mirror of
https://gitee.com/wujiawei1207537021/wu-lazy-cloud-network.git
synced 2025-06-06 21:37:56 +08:00
fix 服务端添加心跳监听
This commit is contained in:
parent
d2cf3944ab
commit
4f8d1d206c
@ -4,10 +4,12 @@ import com.lazy.netty.proxy.client.proxy.config.ClientProxyConfigurationProperti
|
||||
import com.lazy.netty.proxy.client.proxy.netty.Constant;
|
||||
import com.lazy.netty.proxy.client.proxy.netty.ProxySocket;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.web.ServerProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class NettyProxyAutoConfiguration {
|
||||
|
||||
@ -20,12 +22,14 @@ public class NettyProxyAutoConfiguration {
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void xx() {
|
||||
public void clientRun() {
|
||||
new Thread(() -> {
|
||||
Constant.serverIp = clientProxyConfigurationProperties.getServerIp();
|
||||
Constant.serverPort = clientProxyConfigurationProperties.getServerPort();
|
||||
Constant.realPort = serverProperties.getPort();
|
||||
|
||||
|
||||
log.info("netty客户端连接服务端IP:{},服务端端口:{},客户端自己的端口:{}",Constant.serverIp,Constant.serverPort,Constant.realPort);
|
||||
// 连接代理服务
|
||||
try {
|
||||
ProxySocket.connectProxyServer();
|
||||
|
@ -13,11 +13,13 @@ import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
public class ProxySocket {
|
||||
/**
|
||||
* 重连代理服务
|
||||
@ -67,6 +69,7 @@ public class ProxySocket {
|
||||
}
|
||||
});
|
||||
|
||||
log.info("连接服务端IP:{},连接服务端端口:{}",Constant.serverIp, Constant.serverPort);
|
||||
bootstrap.connect(Constant.serverIp, Constant.serverPort).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
|
@ -4,6 +4,7 @@ import com.lazy.netty.proxy.msg.MyMsgDecoder;
|
||||
import com.lazy.netty.proxy.msg.MyMsgEncoder;
|
||||
import com.lazy.netty.proxy.server.handler.ClientHandler;
|
||||
import com.lazy.netty.proxy.server.handler.HeartBeatServerHandler;
|
||||
import com.lazy.netty.proxy.server.handler.LazyServerIdleStateHandler;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
@ -21,7 +22,7 @@ public class ServerSocket {
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
public static void startServer() throws Exception {
|
||||
public static void startServer(int serverPort) throws Exception {
|
||||
try {
|
||||
|
||||
ServerBootstrap b = new ServerBootstrap();
|
||||
@ -33,12 +34,13 @@ public class ServerSocket {
|
||||
pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
|
||||
pipeline.addLast(new MyMsgEncoder());
|
||||
pipeline.addLast(new IdleStateHandler(40, 10, 0));
|
||||
pipeline.addLast(new LazyServerIdleStateHandler(40, 10, 0));
|
||||
pipeline.addLast(new ClientHandler());
|
||||
pipeline.addLast(new HeartBeatServerHandler());
|
||||
}
|
||||
|
||||
});
|
||||
channelFuture = b.bind(Constant.serverPort).sync();
|
||||
channelFuture = b.bind(serverPort).sync();
|
||||
|
||||
channelFuture.addListener((ChannelFutureListener) channelFuture -> {
|
||||
// 服务器已启动
|
||||
|
@ -10,8 +10,8 @@ public class ServerStart {
|
||||
Constant.serverPort = serverPort;
|
||||
}
|
||||
// 启动访客服务端,用于接收访客请求
|
||||
VisitorSocket.startServer();
|
||||
VisitorSocket.startServer(Constant.visitorPort);
|
||||
// 启动代理服务端,用于接收客户端请求
|
||||
ServerSocket.startServer();
|
||||
ServerSocket.startServer(Constant.serverPort);
|
||||
}
|
||||
}
|
@ -9,16 +9,19 @@ import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
|
||||
/**
|
||||
* 访客链接socket
|
||||
*/
|
||||
public class VisitorSocket {
|
||||
private static EventLoopGroup bossGroup = new NioEventLoopGroup();
|
||||
private static EventLoopGroup workerGroup = new NioEventLoopGroup();
|
||||
|
||||
/**
|
||||
* 启动服务代理
|
||||
*
|
||||
* @param visitorPort 访客代理端口
|
||||
* @throws Exception
|
||||
*/
|
||||
public static void startServer() throws Exception {
|
||||
public static void startServer(int visitorPort) throws Exception {
|
||||
|
||||
ServerBootstrap b = new ServerBootstrap();
|
||||
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
|
||||
@ -30,7 +33,7 @@ public class VisitorSocket {
|
||||
pipeline.addLast(new VisitorHandler());
|
||||
}
|
||||
});
|
||||
b.bind(Constant.visitorPort).get();
|
||||
b.bind(visitorPort).get();
|
||||
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,553 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package com.lazy.netty.proxy.server.handler;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.handler.timeout.*;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Raises a {@link ReadTimeoutException} when no data was read within a certain
|
||||
* period of time.
|
||||
*
|
||||
* <pre>
|
||||
* // The connection is closed when there is no inbound traffic
|
||||
* // for 30 seconds.
|
||||
*
|
||||
* public class MyChannelInitializer extends {@link ChannelInitializer}<{@link Channel}> {
|
||||
* public void initChannel({@link Channel} channel) {
|
||||
* channel.pipeline().addLast("readTimeoutHandler", new {@link LazyServerIdleStateHandler}(30));
|
||||
* channel.pipeline().addLast("myHandler", new MyHandler());
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* // Handler should handle the {@link ReadTimeoutException}.
|
||||
* public class MyHandler extends {@link ChannelDuplexHandler} {
|
||||
* {@code @Override}
|
||||
* public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause)
|
||||
* throws {@link Exception} {
|
||||
* if (cause instanceof {@link ReadTimeoutException}) {
|
||||
* // do something
|
||||
* } else {
|
||||
* super.exceptionCaught(ctx, cause);
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* {@link ServerBootstrap} bootstrap = ...;
|
||||
* ...
|
||||
* bootstrap.childHandler(new MyChannelInitializer());
|
||||
* ...
|
||||
* </pre>
|
||||
* @see WriteTimeoutHandler
|
||||
* @see IdleStateHandler
|
||||
*/
|
||||
public class LazyServerIdleStateHandler extends ChannelDuplexHandler {
|
||||
private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
|
||||
|
||||
// Not create a new ChannelFutureListener per write operation to reduce GC pressure.
|
||||
private final ChannelFutureListener writeListener = new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
lastWriteTime = ticksInNanos();
|
||||
firstWriterIdleEvent = firstAllIdleEvent = true;
|
||||
}
|
||||
};
|
||||
|
||||
private final boolean observeOutput;
|
||||
private final long readerIdleTimeNanos;
|
||||
private final long writerIdleTimeNanos;
|
||||
private final long allIdleTimeNanos;
|
||||
|
||||
private Future<?> readerIdleTimeout;
|
||||
private long lastReadTime;
|
||||
private boolean firstReaderIdleEvent = true;
|
||||
|
||||
private Future<?> writerIdleTimeout;
|
||||
private long lastWriteTime;
|
||||
private boolean firstWriterIdleEvent = true;
|
||||
|
||||
private Future<?> allIdleTimeout;
|
||||
private boolean firstAllIdleEvent = true;
|
||||
|
||||
private byte state; // 0 - none, 1 - initialized, 2 - destroyed
|
||||
private boolean reading;
|
||||
|
||||
private long lastChangeCheckTimeStamp;
|
||||
private int lastMessageHashCode;
|
||||
private long lastPendingWriteBytes;
|
||||
private long lastFlushProgress;
|
||||
|
||||
/**
|
||||
* Creates a new instance firing {@link IdleStateEvent}s.
|
||||
*
|
||||
* @param readerIdleTimeSeconds
|
||||
* an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
|
||||
* will be triggered when no read was performed for the specified
|
||||
* period of time. Specify {@code 0} to disable.
|
||||
* @param writerIdleTimeSeconds
|
||||
* an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
|
||||
* will be triggered when no write was performed for the specified
|
||||
* period of time. Specify {@code 0} to disable.
|
||||
* @param allIdleTimeSeconds
|
||||
* an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
|
||||
* will be triggered when neither read nor write was performed for
|
||||
* the specified period of time. Specify {@code 0} to disable.
|
||||
*/
|
||||
public LazyServerIdleStateHandler(
|
||||
int readerIdleTimeSeconds,
|
||||
int writerIdleTimeSeconds,
|
||||
int allIdleTimeSeconds) {
|
||||
|
||||
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
|
||||
TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see #LazyServerIdleStateHandler(boolean, long, long, long, TimeUnit)
|
||||
*/
|
||||
public LazyServerIdleStateHandler(
|
||||
long readerIdleTime, long writerIdleTime, long allIdleTime,
|
||||
TimeUnit unit) {
|
||||
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance firing {@link IdleStateEvent}s.
|
||||
*
|
||||
* @param observeOutput
|
||||
* whether or not the consumption of {@code bytes} should be taken into
|
||||
* consideration when assessing write idleness. The default is {@code false}.
|
||||
* @param readerIdleTime
|
||||
* an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
|
||||
* will be triggered when no read was performed for the specified
|
||||
* period of time. Specify {@code 0} to disable.
|
||||
* @param writerIdleTime
|
||||
* an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
|
||||
* will be triggered when no write was performed for the specified
|
||||
* period of time. Specify {@code 0} to disable.
|
||||
* @param allIdleTime
|
||||
* an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
|
||||
* will be triggered when neither read nor write was performed for
|
||||
* the specified period of time. Specify {@code 0} to disable.
|
||||
* @param unit
|
||||
* the {@link TimeUnit} of {@code readerIdleTime},
|
||||
* {@code writeIdleTime}, and {@code allIdleTime}
|
||||
*/
|
||||
public LazyServerIdleStateHandler(boolean observeOutput,
|
||||
long readerIdleTime, long writerIdleTime, long allIdleTime,
|
||||
TimeUnit unit) {
|
||||
ObjectUtil.checkNotNull(unit, "unit");
|
||||
|
||||
this.observeOutput = observeOutput;
|
||||
|
||||
if (readerIdleTime <= 0) {
|
||||
readerIdleTimeNanos = 0;
|
||||
} else {
|
||||
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
|
||||
}
|
||||
if (writerIdleTime <= 0) {
|
||||
writerIdleTimeNanos = 0;
|
||||
} else {
|
||||
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
|
||||
}
|
||||
if (allIdleTime <= 0) {
|
||||
allIdleTimeNanos = 0;
|
||||
} else {
|
||||
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the readerIdleTime that was given when instance this class in milliseconds.
|
||||
*
|
||||
*/
|
||||
public long getReaderIdleTimeInMillis() {
|
||||
return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the writerIdleTime that was given when instance this class in milliseconds.
|
||||
*
|
||||
*/
|
||||
public long getWriterIdleTimeInMillis() {
|
||||
return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the allIdleTime that was given when instance this class in milliseconds.
|
||||
*
|
||||
*/
|
||||
public long getAllIdleTimeInMillis() {
|
||||
return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
|
||||
// channelActive() event has been fired already, which means this.channelActive() will
|
||||
// not be invoked. We have to initialize here instead.
|
||||
initialize(ctx);
|
||||
} else {
|
||||
// channelActive() event has not been fired yet. this.channelActive() will be invoked
|
||||
// and initialization will occur there.
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
destroy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
||||
// Initialize early if channel is active already.
|
||||
if (ctx.channel().isActive()) {
|
||||
initialize(ctx);
|
||||
}
|
||||
super.channelRegistered(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
// This method will be invoked only if this handler was added
|
||||
// before channelActive() event is fired. If a user adds this handler
|
||||
// after the channelActive() event, initialize() will be called by beforeAdd().
|
||||
initialize(ctx);
|
||||
super.channelActive(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
destroy();
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
|
||||
reading = true;
|
||||
firstReaderIdleEvent = firstAllIdleEvent = true;
|
||||
}
|
||||
ctx.fireChannelRead(msg);
|
||||
System.out.println("channelRead");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
|
||||
lastReadTime = ticksInNanos();
|
||||
reading = false;
|
||||
}
|
||||
ctx.fireChannelReadComplete();
|
||||
System.out.println("channelReadComplete");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||
// Allow writing with void promise if handler is only configured for read timeout events.
|
||||
if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
|
||||
ctx.write(msg, promise.unvoid()).addListener(writeListener);
|
||||
} else {
|
||||
ctx.write(msg, promise);
|
||||
}
|
||||
System.out.println("write");
|
||||
}
|
||||
|
||||
private void initialize(ChannelHandlerContext ctx) {
|
||||
// Avoid the case where destroy() is called before scheduling timeouts.
|
||||
// See: https://github.com/netty/netty/issues/143
|
||||
switch (state) {
|
||||
case 1:
|
||||
case 2:
|
||||
return;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
state = 1;
|
||||
initOutputChanged(ctx);
|
||||
|
||||
lastReadTime = lastWriteTime = ticksInNanos();
|
||||
if (readerIdleTimeNanos > 0) {
|
||||
readerIdleTimeout = schedule(ctx, new LazyServerIdleStateHandler.ReaderIdleTimeoutTask(ctx),
|
||||
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
if (writerIdleTimeNanos > 0) {
|
||||
writerIdleTimeout = schedule(ctx, new LazyServerIdleStateHandler.WriterIdleTimeoutTask(ctx),
|
||||
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
if (allIdleTimeNanos > 0) {
|
||||
allIdleTimeout = schedule(ctx, new LazyServerIdleStateHandler.AllIdleTimeoutTask(ctx),
|
||||
allIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is visible for testing!
|
||||
*/
|
||||
long ticksInNanos() {
|
||||
return System.nanoTime();
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is visible for testing!
|
||||
*/
|
||||
Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
|
||||
return ctx.executor().schedule(task, delay, unit);
|
||||
}
|
||||
|
||||
private void destroy() {
|
||||
state = 2;
|
||||
|
||||
if (readerIdleTimeout != null) {
|
||||
readerIdleTimeout.cancel(false);
|
||||
readerIdleTimeout = null;
|
||||
}
|
||||
if (writerIdleTimeout != null) {
|
||||
writerIdleTimeout.cancel(false);
|
||||
writerIdleTimeout = null;
|
||||
}
|
||||
if (allIdleTimeout != null) {
|
||||
allIdleTimeout.cancel(false);
|
||||
allIdleTimeout = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Is called when an {@link IdleStateEvent} should be fired. This implementation calls
|
||||
* {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
|
||||
*/
|
||||
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
|
||||
ctx.fireUserEventTriggered(evt);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link IdleStateEvent}.
|
||||
*/
|
||||
protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
|
||||
switch (state) {
|
||||
case ALL_IDLE:
|
||||
return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
|
||||
case READER_IDLE:
|
||||
return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
|
||||
case WRITER_IDLE:
|
||||
return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @see #hasOutputChanged(ChannelHandlerContext, boolean)
|
||||
*/
|
||||
private void initOutputChanged(ChannelHandlerContext ctx) {
|
||||
if (observeOutput) {
|
||||
Channel channel = ctx.channel();
|
||||
Channel.Unsafe unsafe = channel.unsafe();
|
||||
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
|
||||
|
||||
if (buf != null) {
|
||||
lastMessageHashCode = System.identityHashCode(buf.current());
|
||||
lastPendingWriteBytes = buf.totalPendingWriteBytes();
|
||||
lastFlushProgress = buf.currentProgress();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
|
||||
* with {@link #observeOutput} enabled and there has been an observed change in the
|
||||
* {@link ChannelOutboundBuffer} between two consecutive calls of this method.
|
||||
*
|
||||
* https://github.com/netty/netty/issues/6150
|
||||
*/
|
||||
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
|
||||
if (observeOutput) {
|
||||
|
||||
// We can take this shortcut if the ChannelPromises that got passed into write()
|
||||
// appear to complete. It indicates "change" on message level and we simply assume
|
||||
// that there's change happening on byte level. If the user doesn't observe channel
|
||||
// writability events then they'll eventually OOME and there's clearly a different
|
||||
// problem and idleness is least of their concerns.
|
||||
if (lastChangeCheckTimeStamp != lastWriteTime) {
|
||||
lastChangeCheckTimeStamp = lastWriteTime;
|
||||
|
||||
// But this applies only if it's the non-first call.
|
||||
if (!first) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
Channel channel = ctx.channel();
|
||||
Channel.Unsafe unsafe = channel.unsafe();
|
||||
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
|
||||
|
||||
if (buf != null) {
|
||||
int messageHashCode = System.identityHashCode(buf.current());
|
||||
long pendingWriteBytes = buf.totalPendingWriteBytes();
|
||||
|
||||
if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
|
||||
lastMessageHashCode = messageHashCode;
|
||||
lastPendingWriteBytes = pendingWriteBytes;
|
||||
|
||||
if (!first) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
long flushProgress = buf.currentProgress();
|
||||
if (flushProgress != lastFlushProgress) {
|
||||
lastFlushProgress = flushProgress;
|
||||
return !first;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private abstract static class AbstractIdleTask implements Runnable {
|
||||
|
||||
private final ChannelHandlerContext ctx;
|
||||
|
||||
AbstractIdleTask(ChannelHandlerContext ctx) {
|
||||
this.ctx = ctx;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (!ctx.channel().isOpen()) {
|
||||
return;
|
||||
}
|
||||
|
||||
run(ctx);
|
||||
}
|
||||
|
||||
protected abstract void run(ChannelHandlerContext ctx);
|
||||
}
|
||||
|
||||
private final class ReaderIdleTimeoutTask extends LazyServerIdleStateHandler.AbstractIdleTask {
|
||||
|
||||
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
|
||||
super(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void run(ChannelHandlerContext ctx) {
|
||||
long nextDelay = readerIdleTimeNanos;
|
||||
if (!reading) {
|
||||
nextDelay -= ticksInNanos() - lastReadTime;
|
||||
}
|
||||
|
||||
if (nextDelay <= 0) {
|
||||
// Reader is idle - set a new timeout and notify the callback.
|
||||
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||
|
||||
boolean first = firstReaderIdleEvent;
|
||||
firstReaderIdleEvent = false;
|
||||
|
||||
try {
|
||||
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
|
||||
channelIdle(ctx, event);
|
||||
} catch (Throwable t) {
|
||||
ctx.fireExceptionCaught(t);
|
||||
}
|
||||
} else {
|
||||
// Read occurred before the timeout - set a new timeout with shorter delay.
|
||||
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class WriterIdleTimeoutTask extends LazyServerIdleStateHandler.AbstractIdleTask {
|
||||
|
||||
WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
|
||||
super(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void run(ChannelHandlerContext ctx) {
|
||||
|
||||
long lastWriteTime = LazyServerIdleStateHandler.this.lastWriteTime;
|
||||
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
|
||||
if (nextDelay <= 0) {
|
||||
// Writer is idle - set a new timeout and notify the callback.
|
||||
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||
|
||||
boolean first = firstWriterIdleEvent;
|
||||
firstWriterIdleEvent = false;
|
||||
|
||||
try {
|
||||
if (hasOutputChanged(ctx, first)) {
|
||||
return;
|
||||
}
|
||||
|
||||
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
|
||||
channelIdle(ctx, event);
|
||||
} catch (Throwable t) {
|
||||
ctx.fireExceptionCaught(t);
|
||||
}
|
||||
} else {
|
||||
// Write occurred before the timeout - set a new timeout with shorter delay.
|
||||
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class AllIdleTimeoutTask extends LazyServerIdleStateHandler.AbstractIdleTask {
|
||||
|
||||
AllIdleTimeoutTask(ChannelHandlerContext ctx) {
|
||||
super(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void run(ChannelHandlerContext ctx) {
|
||||
|
||||
long nextDelay = allIdleTimeNanos;
|
||||
if (!reading) {
|
||||
nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
|
||||
}
|
||||
if (nextDelay <= 0) {
|
||||
// Both reader and writer are idle - set a new timeout and
|
||||
// notify the callback.
|
||||
allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||
|
||||
boolean first = firstAllIdleEvent;
|
||||
firstAllIdleEvent = false;
|
||||
|
||||
try {
|
||||
if (hasOutputChanged(ctx, first)) {
|
||||
return;
|
||||
}
|
||||
|
||||
IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
|
||||
channelIdle(ctx, event);
|
||||
} catch (Throwable t) {
|
||||
ctx.fireExceptionCaught(t);
|
||||
}
|
||||
} else {
|
||||
// Either read or write occurred before the timeout - set a new
|
||||
// timeout with shorter delay.
|
||||
allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -5,10 +5,12 @@ import com.lazy.netty.proxy.server.ServerSocket;
|
||||
import com.lazy.netty.proxy.server.VisitorSocket;
|
||||
import com.lazy.netty.proxy.server.config.ServerProxyConfigurationProperties;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.web.ServerProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class NettyServerProxyAutoConfiguration {
|
||||
|
||||
@ -22,20 +24,22 @@ public class NettyServerProxyAutoConfiguration {
|
||||
|
||||
|
||||
@PostConstruct
|
||||
public void xx() {
|
||||
public void runServer() {
|
||||
new Thread(() -> {
|
||||
Constant.visitorPort = serverProxyConfigurationProperties.getVisitorPort();
|
||||
Constant.serverPort = serverProperties.getPort();
|
||||
|
||||
// 启动访客服务端,用于接收访客请求
|
||||
try {
|
||||
VisitorSocket.startServer();
|
||||
VisitorSocket.startServer(Constant.visitorPort);
|
||||
log.info("访客链接端口:"+Constant.visitorPort);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
// 启动代理服务端,用于接收客户端请求
|
||||
try {
|
||||
ServerSocket.startServer();
|
||||
log.info("服务端端口:"+Constant.serverPort);
|
||||
ServerSocket.startServer(Constant.serverPort);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user