【fix】 本地socket代理通道添加属性。目标ip、目标端口、访客ID

This commit is contained in:
wujiawei
2025-07-12 12:07:40 +08:00
parent f2fede4a96
commit 7178974abb
4 changed files with 36 additions and 30 deletions

View File

@@ -14,6 +14,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyP
import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettySocketChannelContext;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.AbstractHandleSocketLocalProxyTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.factory.EventLoopGroupFactory;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettyProxy2RealInboundHandler;
import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettySocketBackendHandler;
import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettySocks5CommandRequestHandler;
@@ -34,11 +35,11 @@ public class NettySocketProtocolHandleSocketLocalProxyTypeAdvanced
@Override
protected void doHandler(NettyChannelContext nettyChannelContext, NettyProxyMsg nettyProxyMsg) {
NettySocketChannelContext nettySocketChannelContext = (NettySocketChannelContext) nettyChannelContext;
Channel channel = nettySocketChannelContext.channel();
ChannelHandlerContext channelHandlerContext = nettySocketChannelContext.channelHandlerContext();
Channel proxyChannel = nettySocketChannelContext.channel();
EventLoopGroup group = EventLoopGroupFactory.createClientWorkGroup();
String host = nettyProxyMsg.targetIp();
Integer port = Integer.parseInt(nettyProxyMsg.targetPort());
String visitorId = nettyProxyMsg.visitorId();
Bootstrap b = new Bootstrap();
Socks5AddressType socks5AddressType = nettySocketChannelContext.getSocks5AddressType();
@@ -48,25 +49,30 @@ public class NettySocketProtocolHandleSocketLocalProxyTypeAdvanced
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettySocketBackendHandler(channelHandlerContext));
ch.pipeline().addLast(new NettySocketBackendHandler());
}
});
log.info("准备连接目标服务器ip={},port={}", host, port);
ChannelFuture f = b.connect(new InetSocketAddress(host, port));
f.addListener((ChannelFutureListener) future -> {
Channel realChannel = future.channel();
if (future.isSuccess()) {
log.info("目标服务器连接成功");
// 绑定next通道
ChannelAttributeKeyUtils.buildNextChannel(realChannel, proxyChannel);
ChannelAttributeKeyUtils.buildNextChannel(proxyChannel, realChannel);
ChannelAttributeKeyUtils.buildVisitorId(realChannel, visitorId);
//添加客户端转发请求到服务端的Handler
channel.pipeline().addLast(new NettyProxy2RealInboundHandler(future));
proxyChannel.pipeline().addLast(new NettyProxy2RealInboundHandler());
DefaultSocks5CommandResponse commandResponse = new DefaultSocks5CommandResponse(Socks5CommandStatus.SUCCESS, socks5AddressType);
channel.writeAndFlush(commandResponse);
channel.pipeline().remove(NettySocks5CommandRequestHandler.class);
channel.pipeline().remove(Socks5CommandRequestDecoder.class);
proxyChannel.writeAndFlush(commandResponse);
proxyChannel.pipeline().remove(NettySocks5CommandRequestHandler.class);
proxyChannel.pipeline().remove(Socks5CommandRequestDecoder.class);
} else {
log.error("连接目标服务器失败,address={},port={}", host, port);
DefaultSocks5CommandResponse commandResponse = new DefaultSocks5CommandResponse(Socks5CommandStatus.FAILURE, socks5AddressType);
channel.writeAndFlush(commandResponse);
future.channel().close();
proxyChannel.writeAndFlush(commandResponse);
realChannel.close();
}
});

View File

@@ -2,22 +2,18 @@ package org.framework.lazy.cloud.network.heartbeat.protocol.handler;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.*;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
/**
* 代理,真实通道发送数据
*/
@Slf4j
public class NettyProxy2RealInboundHandler extends ChannelInboundHandlerAdapter {
private final ChannelFuture dstChannelFuture;
public NettyProxy2RealInboundHandler(ChannelFuture dstChannelFuture) {
this.dstChannelFuture = dstChannelFuture;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
@@ -27,8 +23,9 @@ public class NettyProxy2RealInboundHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("本地转发客户端的请求到代理服务器");
if (dstChannelFuture.channel().isActive()) {
dstChannelFuture.channel().writeAndFlush(msg);
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
if (nextChannel.isActive()) {
nextChannel.writeAndFlush(msg);
} else {
log.info("释放内存");
ReferenceCountUtil.release(msg);
@@ -38,7 +35,8 @@ public class NettyProxy2RealInboundHandler extends ChannelInboundHandlerAdapter
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("客户端与代理服务器的连接已经断开,即将断开代理服务器和目标服务器的连接");
if (dstChannelFuture.channel().isActive()) {
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
if (nextChannel.isActive()) {
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}

View File

@@ -1,21 +1,18 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.handler;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
@Slf4j
public class NettySocketBackendHandler extends ChannelInboundHandlerAdapter {
private final ChannelHandlerContext clientChannelHandlerContext;
public NettySocketBackendHandler(ChannelHandlerContext clientChannelHandlerContext) {
this.clientChannelHandlerContext = clientChannelHandlerContext;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
@@ -25,8 +22,9 @@ public class NettySocketBackendHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.trace("开始写回客户端数据");
if (clientChannelHandlerContext.channel().isActive()) {
clientChannelHandlerContext.writeAndFlush(msg);
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
if (nextChannel.isActive()) {
nextChannel.writeAndFlush(msg);
} else {
log.info("释放内存");
ReferenceCountUtil.release(msg);
@@ -36,8 +34,9 @@ public class NettySocketBackendHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.trace("代理服务器和目标服务器的连接已经断开,即将断开客户端和代理服务器的连接");
if (clientChannelHandlerContext.channel().isActive()) {
clientChannelHandlerContext.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
if (nextChannel.isActive()) {
nextChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}

View File

@@ -56,6 +56,9 @@ public class NettySocks5CommandRequestHandler extends SimpleChannelInboundHandle
int originPort = request.dstPort();
Socks5AddressType socks5AddressType = request.dstAddrType();
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
ChannelAttributeKeyUtils.buildTargetIp(ctx.channel(),originHost);
ChannelAttributeKeyUtils.buildTargetPort(ctx.channel(),originPort);
NettyProxyMsg proxyMsg = new NettyProxyMsg();
proxyMsg.setVisitorId(visitorId);