From 32a641ce999dbbfb7e7e3484aa3b027721028c56 Mon Sep 17 00:00:00 2001 From: wujiawei <12345678> Date: Tue, 13 May 2025 11:15:21 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90fix=E3=80=91init=20route=20ip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...ettySocksServerProxyClientRealHandler.java | 7 +- ...SocksServerProxyClientTransferHandler.java | 13 ++-- ...NettySocksServerProxyClientRealSocket.java | 1 + .../NettyProxy2RealInboundHandler.java | 2 +- .../protocol/route/RouteContext.java | 1 + ...xyClientConnectionSuccessTypeAdvanced.java | 4 ++ ...tySocksServerProxyClientVisitorFilter.java | 41 ++++++++++++ ...ySocksServerProxyClientVisitorHandler.java | 64 +++++++++++++++++++ 8 files changed, 119 insertions(+), 14 deletions(-) create mode 100644 wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/filter/NettySocksServerProxyClientVisitorFilter.java create mode 100644 wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksServerProxyClientVisitorHandler.java diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/socks/handler/NettySocksServerProxyClientRealHandler.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/socks/handler/NettySocksServerProxyClientRealHandler.java index 2e7491e..a2fc648 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/socks/handler/NettySocksServerProxyClientRealHandler.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/socks/handler/NettySocksServerProxyClientRealHandler.java @@ -8,7 +8,6 @@ import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.constant.ProxyMessageType; -import org.framework.lazy.cloud.network.heartbeat.common.constant.TcpMessageType; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; /** @@ -49,13 +48,13 @@ public class NettySocksServerProxyClientRealHandler extends SimpleChannelInbound public void channelInactive(ChannelHandlerContext ctx) throws Exception { String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); // 客户端真实通信通道 - Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); - if (nextChannel != null) { + Channel transferChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); + if (transferChannel != null&&transferChannel.isActive()) { // 上报关闭这个客户端的访客通道 NettyProxyMsg closeVisitorMsg = new NettyProxyMsg(); closeVisitorMsg.setType(ProxyMessageType.SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_CLOSE_); closeVisitorMsg.setVisitorId(visitorId); - nextChannel.writeAndFlush(closeVisitorMsg); + transferChannel.writeAndFlush(closeVisitorMsg); } super.channelInactive(ctx); diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/socks/handler/NettySocksServerProxyClientTransferHandler.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/socks/handler/NettySocksServerProxyClientTransferHandler.java index 4e766ed..ff22040 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/socks/handler/NettySocksServerProxyClientTransferHandler.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/socks/handler/NettySocksServerProxyClientTransferHandler.java @@ -35,16 +35,11 @@ public class NettySocksServerProxyClientTransferHandler extends SimpleChannelInb String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel()); String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); - Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); + Channel realChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); log.warn("close server proxy client transfer real clientId:{} visitorId:{}", clientId, visitorId); - // 关闭访客 - if (nextChannel != null) { - - // 上报关闭这个客户端的访客通道 - NettyProxyMsg closeVisitorMsg = new NettyProxyMsg(); - closeVisitorMsg.setType(ProxyMessageType.SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_CLOSE_); - closeVisitorMsg.setVisitorId(visitorId); - nextChannel.writeAndFlush(closeVisitorMsg); + // 关闭真实通道 + if (realChannel != null) { + realChannel.close(); } super.channelInactive(ctx); diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/socks/socket/NettySocksServerProxyClientRealSocket.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/socks/socket/NettySocksServerProxyClientRealSocket.java index 4dacebb..3c4f1f9 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/socks/socket/NettySocksServerProxyClientRealSocket.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/socks/socket/NettySocksServerProxyClientRealSocket.java @@ -129,6 +129,7 @@ public class NettySocksServerProxyClientRealSocket { future.addListener((ChannelFutureListener) futureListener -> { Channel transferChannel = futureListener.channel(); if (futureListener.isSuccess()) { + log.info("服务端代理客户端socks,客户端连接服务端传输通道成功"); realChannel.config().setOption(ChannelOption.AUTO_READ, true); // 通知服务端访客连接成功 NettyProxyMsg nettyProxyMsg = new NettyProxyMsg(); diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyProxy2RealInboundHandler.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyProxy2RealInboundHandler.java index 063fa23..7fb4e1f 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyProxy2RealInboundHandler.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyProxy2RealInboundHandler.java @@ -26,7 +26,7 @@ public class NettyProxy2RealInboundHandler extends ChannelInboundHandlerAdapter @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - log.info("转发客户端的请求到代理服务器"); + log.info("本地转发客户端的请求到代理服务器"); if (dstChannelFuture.channel().isActive()) { dstChannelFuture.channel().writeAndFlush(msg); } else { diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/route/RouteContext.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/route/RouteContext.java index 2a53b4a..315be49 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/route/RouteContext.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/route/RouteContext.java @@ -25,6 +25,7 @@ public class RouteContext { String virtualPort = route.getVirtualPort(); String key = virtualIp + ":" + virtualPort + "_" + routeType; if (m.containsKey(key)) { + m.put(key, route); return; } // TODO 验证ip、端口、类型 diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientConnectionSuccessTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientConnectionSuccessTypeAdvanced.java index 1cf7df6..6bee422 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientConnectionSuccessTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientConnectionSuccessTypeAdvanced.java @@ -14,6 +14,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.se import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettySocks5CommandRequestHandler; +import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.filter.NettySocksServerProxyClientVisitorFilter; import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.handler.NettyServerProxyClientVisitorInboundHandler; import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.handler.NettySocksClientProxyServerRealHandler; import org.springframework.beans.factory.config.BeanDefinition; @@ -59,6 +60,9 @@ public class ServerHandleSocksReportServerProxyClientConnectionSuccessTypeAdvanc DefaultSocks5CommandResponse commandResponse = new DefaultSocks5CommandResponse(Socks5CommandStatus.SUCCESS, socks5AddressType); nextChannel.writeAndFlush(commandResponse); + + nextChannel.pipeline().addLast(new NettySocksServerProxyClientVisitorFilter()); + nextChannel.pipeline().remove(NettySocks5CommandRequestHandler.class); nextChannel.pipeline().remove(Socks5CommandRequestDecoder.class); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/filter/NettySocksServerProxyClientVisitorFilter.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/filter/NettySocksServerProxyClientVisitorFilter.java new file mode 100644 index 0000000..3fe9999 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/filter/NettySocksServerProxyClientVisitorFilter.java @@ -0,0 +1,41 @@ +package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.filter; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer; +import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.handler.NettySocksServerProxyClientVisitorHandler; +import org.wu.framework.spring.utils.SpringContextHolder; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public class NettySocksServerProxyClientVisitorFilter extends DebugChannelInitializer { + + + /** + * This method will be called once the {@link Channel} was registered. After the method returns this instance + * will be removed from the {@link ChannelPipeline} of the {@link Channel}. + * + * @param ch the {@link Channel} which was registered. + * @throws Exception is thrown if an error occurs. In that case it will be handled by + * {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default connectionClose + * the {@link Channel}. + */ + @Override + protected void initChannel0(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new ChannelDuplexHandler()); + + List handleChannelTypeAdvancedList = new ArrayList<>(SpringContextHolder.getApplicationContext().getBeansOfType(HandleChannelTypeAdvanced.class).values()); + + pipeline.addLast(new NettySocksServerProxyClientVisitorHandler(new ChannelTypeAdapter(handleChannelTypeAdvancedList))); + } +} \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksServerProxyClientVisitorHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksServerProxyClientVisitorHandler.java new file mode 100644 index 0000000..e4b4c1f --- /dev/null +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksServerProxyClientVisitorHandler.java @@ -0,0 +1,64 @@ +package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.handler; + + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; + + +@Slf4j +public class NettySocksServerProxyClientVisitorHandler extends SimpleChannelInboundHandler { + private final ChannelTypeAdapter channelTypeAdapter; + + public NettySocksServerProxyClientVisitorHandler(ChannelTypeAdapter channelTypeAdapter) { + this.channelTypeAdapter = channelTypeAdapter; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, NettyProxyMsg nettyProxyMsg) throws Exception { + channelTypeAdapter.handler(ctx, nettyProxyMsg); + + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + + String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel()); + String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); + Channel realChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); + log.warn("close server proxy client transfer real clientId:{} visitorId:{}", clientId, visitorId); + // 关闭真实通道 + if (realChannel != null) { + realChannel.close(); + } + + super.channelInactive(ctx); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + // 处理客户端本地真实通道问题 + if (ctx.channel().isWritable()) { + log.debug("Channel is writable again"); + // 恢复之前暂停的操作,如写入数据 + } else { + log.debug("Channel is not writable"); + // 暂停写入操作,等待可写状态 + } + log.info("channelWritabilityChanged!"); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } +} \ No newline at end of file