From 4f8d1d206c6cca12ccee50aea3652773513cac7c Mon Sep 17 00:00:00 2001 From: wujiawei <12345678> Date: Tue, 5 Sep 2023 18:57:14 +0800 Subject: [PATCH] =?UTF-8?q?fix=20=E6=9C=8D=E5=8A=A1=E7=AB=AF=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E5=BF=83=E8=B7=B3=E7=9B=91=E5=90=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../proxy/NettyProxyAutoConfiguration.java | 6 +- .../proxy/client/proxy/netty/ProxySocket.java | 3 + .../lazy/netty/proxy/server/ServerSocket.java | 6 +- .../lazy/netty/proxy/server/ServerStart.java | 4 +- .../netty/proxy/server/VisitorSocket.java | 9 +- .../handler/LazyServerIdleStateHandler.java | 553 ++++++++++++++++++ .../NettyServerProxyAutoConfiguration.java | 10 +- 7 files changed, 580 insertions(+), 11 deletions(-) create mode 100644 lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/handler/LazyServerIdleStateHandler.java diff --git a/lazy-netty-proxy-client/src/main/java/com/lazy/netty/proxy/client/proxy/NettyProxyAutoConfiguration.java b/lazy-netty-proxy-client/src/main/java/com/lazy/netty/proxy/client/proxy/NettyProxyAutoConfiguration.java index d4523ea..9ebb8ac 100644 --- a/lazy-netty-proxy-client/src/main/java/com/lazy/netty/proxy/client/proxy/NettyProxyAutoConfiguration.java +++ b/lazy-netty-proxy-client/src/main/java/com/lazy/netty/proxy/client/proxy/NettyProxyAutoConfiguration.java @@ -4,10 +4,12 @@ import com.lazy.netty.proxy.client.proxy.config.ClientProxyConfigurationProperti import com.lazy.netty.proxy.client.proxy.netty.Constant; import com.lazy.netty.proxy.client.proxy.netty.ProxySocket; import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; +@Slf4j @Component public class NettyProxyAutoConfiguration { @@ -20,12 +22,14 @@ public class NettyProxyAutoConfiguration { } @PostConstruct - public void xx() { + public void clientRun() { new Thread(() -> { Constant.serverIp = clientProxyConfigurationProperties.getServerIp(); Constant.serverPort = clientProxyConfigurationProperties.getServerPort(); Constant.realPort = serverProperties.getPort(); + + log.info("netty客户端连接服务端IP:{},服务端端口:{},客户端自己的端口:{}",Constant.serverIp,Constant.serverPort,Constant.realPort); // 连接代理服务 try { ProxySocket.connectProxyServer(); diff --git a/lazy-netty-proxy-client/src/main/java/com/lazy/netty/proxy/client/proxy/netty/ProxySocket.java b/lazy-netty-proxy-client/src/main/java/com/lazy/netty/proxy/client/proxy/netty/ProxySocket.java index 0890b7d..e086344 100644 --- a/lazy-netty-proxy-client/src/main/java/com/lazy/netty/proxy/client/proxy/netty/ProxySocket.java +++ b/lazy-netty-proxy-client/src/main/java/com/lazy/netty/proxy/client/proxy/netty/ProxySocket.java @@ -13,11 +13,13 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.internal.StringUtil; +import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +@Slf4j public class ProxySocket { /** * 重连代理服务 @@ -67,6 +69,7 @@ public class ProxySocket { } }); + log.info("连接服务端IP:{},连接服务端端口:{}",Constant.serverIp, Constant.serverPort); bootstrap.connect(Constant.serverIp, Constant.serverPort).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { diff --git a/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/ServerSocket.java b/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/ServerSocket.java index d631169..f0a35e4 100644 --- a/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/ServerSocket.java +++ b/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/ServerSocket.java @@ -4,6 +4,7 @@ import com.lazy.netty.proxy.msg.MyMsgDecoder; import com.lazy.netty.proxy.msg.MyMsgEncoder; import com.lazy.netty.proxy.server.handler.ClientHandler; import com.lazy.netty.proxy.server.handler.HeartBeatServerHandler; +import com.lazy.netty.proxy.server.handler.LazyServerIdleStateHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; @@ -21,7 +22,7 @@ public class ServerSocket { * * @throws Exception */ - public static void startServer() throws Exception { + public static void startServer(int serverPort) throws Exception { try { ServerBootstrap b = new ServerBootstrap(); @@ -33,12 +34,13 @@ public class ServerSocket { pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); pipeline.addLast(new MyMsgEncoder()); pipeline.addLast(new IdleStateHandler(40, 10, 0)); + pipeline.addLast(new LazyServerIdleStateHandler(40, 10, 0)); pipeline.addLast(new ClientHandler()); pipeline.addLast(new HeartBeatServerHandler()); } }); - channelFuture = b.bind(Constant.serverPort).sync(); + channelFuture = b.bind(serverPort).sync(); channelFuture.addListener((ChannelFutureListener) channelFuture -> { // 服务器已启动 diff --git a/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/ServerStart.java b/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/ServerStart.java index 5614dcf..fd24863 100644 --- a/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/ServerStart.java +++ b/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/ServerStart.java @@ -10,8 +10,8 @@ public class ServerStart { Constant.serverPort = serverPort; } // 启动访客服务端,用于接收访客请求 - VisitorSocket.startServer(); + VisitorSocket.startServer(Constant.visitorPort); // 启动代理服务端,用于接收客户端请求 - ServerSocket.startServer(); + ServerSocket.startServer(Constant.serverPort); } } \ No newline at end of file diff --git a/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/VisitorSocket.java b/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/VisitorSocket.java index 47570df..5fdde34 100644 --- a/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/VisitorSocket.java +++ b/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/VisitorSocket.java @@ -9,16 +9,19 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +/** + * 访客链接socket + */ public class VisitorSocket { private static EventLoopGroup bossGroup = new NioEventLoopGroup(); private static EventLoopGroup workerGroup = new NioEventLoopGroup(); /** * 启动服务代理 - * + * @param visitorPort 访客代理端口 * @throws Exception */ - public static void startServer() throws Exception { + public static void startServer(int visitorPort) throws Exception { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) @@ -30,7 +33,7 @@ public class VisitorSocket { pipeline.addLast(new VisitorHandler()); } }); - b.bind(Constant.visitorPort).get(); + b.bind(visitorPort).get(); } diff --git a/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/handler/LazyServerIdleStateHandler.java b/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/handler/LazyServerIdleStateHandler.java new file mode 100644 index 0000000..171a185 --- /dev/null +++ b/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/handler/LazyServerIdleStateHandler.java @@ -0,0 +1,553 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.lazy.netty.proxy.server.handler; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.handler.timeout.*; +import io.netty.util.concurrent.Future; +import io.netty.util.internal.ObjectUtil; + +import java.util.concurrent.TimeUnit; + +/** + * Raises a {@link ReadTimeoutException} when no data was read within a certain + * period of time. + * + *
+ * // The connection is closed when there is no inbound traffic + * // for 30 seconds. + * + * public class MyChannelInitializer extends {@link ChannelInitializer}<{@link Channel}> { + * public void initChannel({@link Channel} channel) { + * channel.pipeline().addLast("readTimeoutHandler", new {@link LazyServerIdleStateHandler}(30)); + * channel.pipeline().addLast("myHandler", new MyHandler()); + * } + * } + * + * // Handler should handle the {@link ReadTimeoutException}. + * public class MyHandler extends {@link ChannelDuplexHandler} { + * {@code @Override} + * public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause) + * throws {@link Exception} { + * if (cause instanceof {@link ReadTimeoutException}) { + * // do something + * } else { + * super.exceptionCaught(ctx, cause); + * } + * } + * } + * + * {@link ServerBootstrap} bootstrap = ...; + * ... + * bootstrap.childHandler(new MyChannelInitializer()); + * ... + *+ * @see WriteTimeoutHandler + * @see IdleStateHandler + */ +public class LazyServerIdleStateHandler extends ChannelDuplexHandler { + private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1); + + // Not create a new ChannelFutureListener per write operation to reduce GC pressure. + private final ChannelFutureListener writeListener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + lastWriteTime = ticksInNanos(); + firstWriterIdleEvent = firstAllIdleEvent = true; + } + }; + + private final boolean observeOutput; + private final long readerIdleTimeNanos; + private final long writerIdleTimeNanos; + private final long allIdleTimeNanos; + + private Future> readerIdleTimeout; + private long lastReadTime; + private boolean firstReaderIdleEvent = true; + + private Future> writerIdleTimeout; + private long lastWriteTime; + private boolean firstWriterIdleEvent = true; + + private Future> allIdleTimeout; + private boolean firstAllIdleEvent = true; + + private byte state; // 0 - none, 1 - initialized, 2 - destroyed + private boolean reading; + + private long lastChangeCheckTimeStamp; + private int lastMessageHashCode; + private long lastPendingWriteBytes; + private long lastFlushProgress; + + /** + * Creates a new instance firing {@link IdleStateEvent}s. + * + * @param readerIdleTimeSeconds + * an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE} + * will be triggered when no read was performed for the specified + * period of time. Specify {@code 0} to disable. + * @param writerIdleTimeSeconds + * an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE} + * will be triggered when no write was performed for the specified + * period of time. Specify {@code 0} to disable. + * @param allIdleTimeSeconds + * an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE} + * will be triggered when neither read nor write was performed for + * the specified period of time. Specify {@code 0} to disable. + */ + public LazyServerIdleStateHandler( + int readerIdleTimeSeconds, + int writerIdleTimeSeconds, + int allIdleTimeSeconds) { + + this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, + TimeUnit.SECONDS); + } + + /** + * @see #LazyServerIdleStateHandler(boolean, long, long, long, TimeUnit) + */ + public LazyServerIdleStateHandler( + long readerIdleTime, long writerIdleTime, long allIdleTime, + TimeUnit unit) { + this(false, readerIdleTime, writerIdleTime, allIdleTime, unit); + } + + /** + * Creates a new instance firing {@link IdleStateEvent}s. + * + * @param observeOutput + * whether or not the consumption of {@code bytes} should be taken into + * consideration when assessing write idleness. The default is {@code false}. + * @param readerIdleTime + * an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE} + * will be triggered when no read was performed for the specified + * period of time. Specify {@code 0} to disable. + * @param writerIdleTime + * an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE} + * will be triggered when no write was performed for the specified + * period of time. Specify {@code 0} to disable. + * @param allIdleTime + * an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE} + * will be triggered when neither read nor write was performed for + * the specified period of time. Specify {@code 0} to disable. + * @param unit + * the {@link TimeUnit} of {@code readerIdleTime}, + * {@code writeIdleTime}, and {@code allIdleTime} + */ + public LazyServerIdleStateHandler(boolean observeOutput, + long readerIdleTime, long writerIdleTime, long allIdleTime, + TimeUnit unit) { + ObjectUtil.checkNotNull(unit, "unit"); + + this.observeOutput = observeOutput; + + if (readerIdleTime <= 0) { + readerIdleTimeNanos = 0; + } else { + readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS); + } + if (writerIdleTime <= 0) { + writerIdleTimeNanos = 0; + } else { + writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS); + } + if (allIdleTime <= 0) { + allIdleTimeNanos = 0; + } else { + allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS); + } + } + + /** + * Return the readerIdleTime that was given when instance this class in milliseconds. + * + */ + public long getReaderIdleTimeInMillis() { + return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos); + } + + /** + * Return the writerIdleTime that was given when instance this class in milliseconds. + * + */ + public long getWriterIdleTimeInMillis() { + return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos); + } + + /** + * Return the allIdleTime that was given when instance this class in milliseconds. + * + */ + public long getAllIdleTimeInMillis() { + return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + if (ctx.channel().isActive() && ctx.channel().isRegistered()) { + // channelActive() event has been fired already, which means this.channelActive() will + // not be invoked. We have to initialize here instead. + initialize(ctx); + } else { + // channelActive() event has not been fired yet. this.channelActive() will be invoked + // and initialization will occur there. + } + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + destroy(); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + // Initialize early if channel is active already. + if (ctx.channel().isActive()) { + initialize(ctx); + } + super.channelRegistered(ctx); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // This method will be invoked only if this handler was added + // before channelActive() event is fired. If a user adds this handler + // after the channelActive() event, initialize() will be called by beforeAdd(). + initialize(ctx); + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + destroy(); + super.channelInactive(ctx); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { + reading = true; + firstReaderIdleEvent = firstAllIdleEvent = true; + } + ctx.fireChannelRead(msg); + System.out.println("channelRead"); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) { + lastReadTime = ticksInNanos(); + reading = false; + } + ctx.fireChannelReadComplete(); + System.out.println("channelReadComplete"); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + // Allow writing with void promise if handler is only configured for read timeout events. + if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { + ctx.write(msg, promise.unvoid()).addListener(writeListener); + } else { + ctx.write(msg, promise); + } + System.out.println("write"); + } + + private void initialize(ChannelHandlerContext ctx) { + // Avoid the case where destroy() is called before scheduling timeouts. + // See: https://github.com/netty/netty/issues/143 + switch (state) { + case 1: + case 2: + return; + default: + break; + } + + state = 1; + initOutputChanged(ctx); + + lastReadTime = lastWriteTime = ticksInNanos(); + if (readerIdleTimeNanos > 0) { + readerIdleTimeout = schedule(ctx, new LazyServerIdleStateHandler.ReaderIdleTimeoutTask(ctx), + readerIdleTimeNanos, TimeUnit.NANOSECONDS); + } + if (writerIdleTimeNanos > 0) { + writerIdleTimeout = schedule(ctx, new LazyServerIdleStateHandler.WriterIdleTimeoutTask(ctx), + writerIdleTimeNanos, TimeUnit.NANOSECONDS); + } + if (allIdleTimeNanos > 0) { + allIdleTimeout = schedule(ctx, new LazyServerIdleStateHandler.AllIdleTimeoutTask(ctx), + allIdleTimeNanos, TimeUnit.NANOSECONDS); + } + } + + /** + * This method is visible for testing! + */ + long ticksInNanos() { + return System.nanoTime(); + } + + /** + * This method is visible for testing! + */ + Future> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { + return ctx.executor().schedule(task, delay, unit); + } + + private void destroy() { + state = 2; + + if (readerIdleTimeout != null) { + readerIdleTimeout.cancel(false); + readerIdleTimeout = null; + } + if (writerIdleTimeout != null) { + writerIdleTimeout.cancel(false); + writerIdleTimeout = null; + } + if (allIdleTimeout != null) { + allIdleTimeout.cancel(false); + allIdleTimeout = null; + } + } + + /** + * Is called when an {@link IdleStateEvent} should be fired. This implementation calls + * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}. + */ + protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { + ctx.fireUserEventTriggered(evt); + } + + /** + * Returns a {@link IdleStateEvent}. + */ + protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) { + switch (state) { + case ALL_IDLE: + return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT; + case READER_IDLE: + return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT; + case WRITER_IDLE: + return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT; + default: + throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first); + } + } + + /** + * @see #hasOutputChanged(ChannelHandlerContext, boolean) + */ + private void initOutputChanged(ChannelHandlerContext ctx) { + if (observeOutput) { + Channel channel = ctx.channel(); + Channel.Unsafe unsafe = channel.unsafe(); + ChannelOutboundBuffer buf = unsafe.outboundBuffer(); + + if (buf != null) { + lastMessageHashCode = System.identityHashCode(buf.current()); + lastPendingWriteBytes = buf.totalPendingWriteBytes(); + lastFlushProgress = buf.currentProgress(); + } + } + } + + /** + * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed + * with {@link #observeOutput} enabled and there has been an observed change in the + * {@link ChannelOutboundBuffer} between two consecutive calls of this method. + * + * https://github.com/netty/netty/issues/6150 + */ + private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) { + if (observeOutput) { + + // We can take this shortcut if the ChannelPromises that got passed into write() + // appear to complete. It indicates "change" on message level and we simply assume + // that there's change happening on byte level. If the user doesn't observe channel + // writability events then they'll eventually OOME and there's clearly a different + // problem and idleness is least of their concerns. + if (lastChangeCheckTimeStamp != lastWriteTime) { + lastChangeCheckTimeStamp = lastWriteTime; + + // But this applies only if it's the non-first call. + if (!first) { + return true; + } + } + + Channel channel = ctx.channel(); + Channel.Unsafe unsafe = channel.unsafe(); + ChannelOutboundBuffer buf = unsafe.outboundBuffer(); + + if (buf != null) { + int messageHashCode = System.identityHashCode(buf.current()); + long pendingWriteBytes = buf.totalPendingWriteBytes(); + + if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) { + lastMessageHashCode = messageHashCode; + lastPendingWriteBytes = pendingWriteBytes; + + if (!first) { + return true; + } + } + + long flushProgress = buf.currentProgress(); + if (flushProgress != lastFlushProgress) { + lastFlushProgress = flushProgress; + return !first; + } + } + } + + return false; + } + + private abstract static class AbstractIdleTask implements Runnable { + + private final ChannelHandlerContext ctx; + + AbstractIdleTask(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + @Override + public void run() { + if (!ctx.channel().isOpen()) { + return; + } + + run(ctx); + } + + protected abstract void run(ChannelHandlerContext ctx); + } + + private final class ReaderIdleTimeoutTask extends LazyServerIdleStateHandler.AbstractIdleTask { + + ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { + super(ctx); + } + + @Override + protected void run(ChannelHandlerContext ctx) { + long nextDelay = readerIdleTimeNanos; + if (!reading) { + nextDelay -= ticksInNanos() - lastReadTime; + } + + if (nextDelay <= 0) { + // Reader is idle - set a new timeout and notify the callback. + readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); + + boolean first = firstReaderIdleEvent; + firstReaderIdleEvent = false; + + try { + IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); + channelIdle(ctx, event); + } catch (Throwable t) { + ctx.fireExceptionCaught(t); + } + } else { + // Read occurred before the timeout - set a new timeout with shorter delay. + readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); + } + } + } + + private final class WriterIdleTimeoutTask extends LazyServerIdleStateHandler.AbstractIdleTask { + + WriterIdleTimeoutTask(ChannelHandlerContext ctx) { + super(ctx); + } + + @Override + protected void run(ChannelHandlerContext ctx) { + + long lastWriteTime = LazyServerIdleStateHandler.this.lastWriteTime; + long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime); + if (nextDelay <= 0) { + // Writer is idle - set a new timeout and notify the callback. + writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS); + + boolean first = firstWriterIdleEvent; + firstWriterIdleEvent = false; + + try { + if (hasOutputChanged(ctx, first)) { + return; + } + + IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first); + channelIdle(ctx, event); + } catch (Throwable t) { + ctx.fireExceptionCaught(t); + } + } else { + // Write occurred before the timeout - set a new timeout with shorter delay. + writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); + } + } + } + + private final class AllIdleTimeoutTask extends LazyServerIdleStateHandler.AbstractIdleTask { + + AllIdleTimeoutTask(ChannelHandlerContext ctx) { + super(ctx); + } + + @Override + protected void run(ChannelHandlerContext ctx) { + + long nextDelay = allIdleTimeNanos; + if (!reading) { + nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime); + } + if (nextDelay <= 0) { + // Both reader and writer are idle - set a new timeout and + // notify the callback. + allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS); + + boolean first = firstAllIdleEvent; + firstAllIdleEvent = false; + + try { + if (hasOutputChanged(ctx, first)) { + return; + } + + IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first); + channelIdle(ctx, event); + } catch (Throwable t) { + ctx.fireExceptionCaught(t); + } + } else { + // Either read or write occurred before the timeout - set a new + // timeout with shorter delay. + allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); + } + } + } +} diff --git a/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/netty/NettyServerProxyAutoConfiguration.java b/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/netty/NettyServerProxyAutoConfiguration.java index 193684b..e5e0a6d 100644 --- a/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/netty/NettyServerProxyAutoConfiguration.java +++ b/lazy-netty-proxy-server/src/main/java/com/lazy/netty/proxy/server/netty/NettyServerProxyAutoConfiguration.java @@ -5,10 +5,12 @@ import com.lazy.netty.proxy.server.ServerSocket; import com.lazy.netty.proxy.server.VisitorSocket; import com.lazy.netty.proxy.server.config.ServerProxyConfigurationProperties; import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; +@Slf4j @Component public class NettyServerProxyAutoConfiguration { @@ -22,20 +24,22 @@ public class NettyServerProxyAutoConfiguration { @PostConstruct - public void xx() { + public void runServer() { new Thread(() -> { Constant.visitorPort = serverProxyConfigurationProperties.getVisitorPort(); Constant.serverPort = serverProperties.getPort(); // 启动访客服务端,用于接收访客请求 try { - VisitorSocket.startServer(); + VisitorSocket.startServer(Constant.visitorPort); + log.info("访客链接端口:"+Constant.visitorPort); } catch (Exception e) { e.printStackTrace(); } // 启动代理服务端,用于接收客户端请求 try { - ServerSocket.startServer(); + log.info("服务端端口:"+Constant.serverPort); + ServerSocket.startServer(Constant.serverPort); } catch (Exception e) { e.printStackTrace(); }