diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/advanced/NettySocketProtocolHandleSocketLocalProxyTypeAdvanced.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/advanced/NettySocketProtocolHandleSocketLocalProxyTypeAdvanced.java index 05970b53..f79fe2d4 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/advanced/NettySocketProtocolHandleSocketLocalProxyTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/advanced/NettySocketProtocolHandleSocketLocalProxyTypeAdvanced.java @@ -14,6 +14,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyP import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettySocketChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.AbstractHandleSocketLocalProxyTypeAdvanced; import org.framework.lazy.cloud.network.heartbeat.common.factory.EventLoopGroupFactory; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettyProxy2RealInboundHandler; import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettySocketBackendHandler; import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettySocks5CommandRequestHandler; @@ -34,11 +35,11 @@ public class NettySocketProtocolHandleSocketLocalProxyTypeAdvanced @Override protected void doHandler(NettyChannelContext nettyChannelContext, NettyProxyMsg nettyProxyMsg) { NettySocketChannelContext nettySocketChannelContext = (NettySocketChannelContext) nettyChannelContext; - Channel channel = nettySocketChannelContext.channel(); - ChannelHandlerContext channelHandlerContext = nettySocketChannelContext.channelHandlerContext(); + Channel proxyChannel = nettySocketChannelContext.channel(); EventLoopGroup group = EventLoopGroupFactory.createClientWorkGroup(); String host = nettyProxyMsg.targetIp(); Integer port = Integer.parseInt(nettyProxyMsg.targetPort()); + String visitorId = nettyProxyMsg.visitorId(); Bootstrap b = new Bootstrap(); Socks5AddressType socks5AddressType = nettySocketChannelContext.getSocks5AddressType(); @@ -48,25 +49,30 @@ public class NettySocketProtocolHandleSocketLocalProxyTypeAdvanced .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast(new NettySocketBackendHandler(channelHandlerContext)); + ch.pipeline().addLast(new NettySocketBackendHandler()); } }); log.info("准备连接目标服务器,ip={},port={}", host, port); ChannelFuture f = b.connect(new InetSocketAddress(host, port)); f.addListener((ChannelFutureListener) future -> { + Channel realChannel = future.channel(); if (future.isSuccess()) { log.info("目标服务器连接成功"); + // 绑定next通道 + ChannelAttributeKeyUtils.buildNextChannel(realChannel, proxyChannel); + ChannelAttributeKeyUtils.buildNextChannel(proxyChannel, realChannel); + ChannelAttributeKeyUtils.buildVisitorId(realChannel, visitorId); //添加客户端转发请求到服务端的Handler - channel.pipeline().addLast(new NettyProxy2RealInboundHandler(future)); + proxyChannel.pipeline().addLast(new NettyProxy2RealInboundHandler()); DefaultSocks5CommandResponse commandResponse = new DefaultSocks5CommandResponse(Socks5CommandStatus.SUCCESS, socks5AddressType); - channel.writeAndFlush(commandResponse); - channel.pipeline().remove(NettySocks5CommandRequestHandler.class); - channel.pipeline().remove(Socks5CommandRequestDecoder.class); + proxyChannel.writeAndFlush(commandResponse); + proxyChannel.pipeline().remove(NettySocks5CommandRequestHandler.class); + proxyChannel.pipeline().remove(Socks5CommandRequestDecoder.class); } else { log.error("连接目标服务器失败,address={},port={}", host, port); DefaultSocks5CommandResponse commandResponse = new DefaultSocks5CommandResponse(Socks5CommandStatus.FAILURE, socks5AddressType); - channel.writeAndFlush(commandResponse); - future.channel().close(); + proxyChannel.writeAndFlush(commandResponse); + realChannel.close(); } }); diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyProxy2RealInboundHandler.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyProxy2RealInboundHandler.java index 96fcace4..0a72b743 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyProxy2RealInboundHandler.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyProxy2RealInboundHandler.java @@ -2,22 +2,18 @@ package org.framework.lazy.cloud.network.heartbeat.protocol.handler; import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.*; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +/** + * 代理,真实通道发送数据 + */ @Slf4j public class NettyProxy2RealInboundHandler extends ChannelInboundHandlerAdapter { - private final ChannelFuture dstChannelFuture; - - public NettyProxy2RealInboundHandler(ChannelFuture dstChannelFuture) { - this.dstChannelFuture = dstChannelFuture; - } @Override public void channelActive(ChannelHandlerContext ctx) { @@ -27,8 +23,9 @@ public class NettyProxy2RealInboundHandler extends ChannelInboundHandlerAdapter @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("本地转发客户端的请求到代理服务器"); - if (dstChannelFuture.channel().isActive()) { - dstChannelFuture.channel().writeAndFlush(msg); + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); + if (nextChannel.isActive()) { + nextChannel.writeAndFlush(msg); } else { log.info("释放内存"); ReferenceCountUtil.release(msg); @@ -38,7 +35,8 @@ public class NettyProxy2RealInboundHandler extends ChannelInboundHandlerAdapter @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.debug("客户端与代理服务器的连接已经断开,即将断开代理服务器和目标服务器的连接"); - if (dstChannelFuture.channel().isActive()) { + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); + if (nextChannel.isActive()) { if (ctx.channel().isActive()) { ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettySocketBackendHandler.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettySocketBackendHandler.java index f1f3faf0..78fb0bd1 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettySocketBackendHandler.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettySocketBackendHandler.java @@ -1,21 +1,18 @@ package org.framework.lazy.cloud.network.heartbeat.protocol.handler; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; @Slf4j public class NettySocketBackendHandler extends ChannelInboundHandlerAdapter { - private final ChannelHandlerContext clientChannelHandlerContext; - - public NettySocketBackendHandler(ChannelHandlerContext clientChannelHandlerContext) { - this.clientChannelHandlerContext = clientChannelHandlerContext; - } @Override public void channelActive(ChannelHandlerContext ctx) { @@ -25,8 +22,9 @@ public class NettySocketBackendHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.trace("开始写回客户端数据"); - if (clientChannelHandlerContext.channel().isActive()) { - clientChannelHandlerContext.writeAndFlush(msg); + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); + if (nextChannel.isActive()) { + nextChannel.writeAndFlush(msg); } else { log.info("释放内存"); ReferenceCountUtil.release(msg); @@ -36,8 +34,9 @@ public class NettySocketBackendHandler extends ChannelInboundHandlerAdapter { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.trace("代理服务器和目标服务器的连接已经断开,即将断开客户端和代理服务器的连接"); - if (clientChannelHandlerContext.channel().isActive()) { - clientChannelHandlerContext.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); + if (nextChannel.isActive()) { + nextChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } } diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettySocks5CommandRequestHandler.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettySocks5CommandRequestHandler.java index 087488a0..4475ff53 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettySocks5CommandRequestHandler.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettySocks5CommandRequestHandler.java @@ -56,6 +56,9 @@ public class NettySocks5CommandRequestHandler extends SimpleChannelInboundHandle int originPort = request.dstPort(); Socks5AddressType socks5AddressType = request.dstAddrType(); String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); + + ChannelAttributeKeyUtils.buildTargetIp(ctx.channel(),originHost); + ChannelAttributeKeyUtils.buildTargetPort(ctx.channel(),originPort); NettyProxyMsg proxyMsg = new NettyProxyMsg(); proxyMsg.setVisitorId(visitorId);