[fix] 统一使用next 通道进行数据处理

This commit is contained in:
wujiawei 2024-09-18 22:33:24 +08:00
parent 138752e56d
commit 37d9065c5a
11 changed files with 56 additions and 43 deletions

View File

@ -2,6 +2,7 @@ package org.framework.lazy.cloud.network.heartbeat.client.netty;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.framework.lazy.cloud.network.heartbeat.client.config.NettyClientProperties;
import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPermeate;
@ -10,8 +11,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelT
import java.util.List;
@Builder
@Accessors(chain = true)
@NoArgsConstructor
@Data
public class InternalNetworkPermeateServerVisitor implements InternalNetworkPermeate {

View File

@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.client.AbstractHandleDistributeSingleClientRealAutoReadConnectTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
@Slf4j
public class ClientHandleDistributeSingleClientRealAutoReadConnectTypeAdvanced extends AbstractHandleDistributeSingleClientRealAutoReadConnectTypeAdvanced<NettyProxyMsg> {
@ -21,8 +22,9 @@ public class ClientHandleDistributeSingleClientRealAutoReadConnectTypeAdvanced e
byte[] visitorId = nettyProxyMsg.getVisitorId();
// 获取访客对应的真实代理通道
Channel realChannel = NettyRealIdContext.getReal(visitorId);
if (realChannel != null) {
realChannel.config().setOption(ChannelOption.AUTO_READ, true);
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel);
if (nextChannel != null) {
nextChannel.config().setOption(ChannelOption.AUTO_READ, true);
}
}

View File

@ -9,6 +9,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.client.AbstractHandleDistributeChannelTransferTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.enums.MessageTypeEnums;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
/**
@ -40,8 +41,9 @@ public class ClientReportHandleChannelTransferTypeAdvancedHandleDistribute exten
byte[] clientTargetPort = nettyProxyMsg.getClientTargetPort();
byte[] visitorId = nettyProxyMsg.getVisitorId();
// 真实服务通道
Channel realChannel = NettyRealIdContext.getReal(new String(visitorId));
if (realChannel == null) {
// Channel realChannel = NettyRealIdContext.getReal(new String(visitorId));
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel);
if (nextChannel == null) {
log.error("无法获取访客:{} 真实服务", new String(visitorId));
return;
}
@ -51,7 +53,7 @@ public class ClientReportHandleChannelTransferTypeAdvancedHandleDistribute exten
ByteBuf buf = channel.config().getAllocator().buffer(nettyProxyMsg.getData().length);
buf.writeBytes(nettyProxyMsg.getData());
realChannel.writeAndFlush(buf);
nextChannel.writeAndFlush(buf);
}

View File

@ -233,15 +233,14 @@ public class NettyClientPermeateServerVisitorSocket {
if (visitorPort == null) {
throw new IllegalArgumentException("visitorPort must not null");
}
InternalNetworkPermeateServerVisitor internalNetworkPermeateServerVisitor = InternalNetworkPermeateServerVisitor
.builder()
.targetIp(clientTargetIp)
.targetPort(clientTargetPort)
.visitorPort(visitorPort)
.build();
InternalNetworkPermeateServerVisitor internalNetworkPermeateServerVisitor = new InternalNetworkPermeateServerVisitor();
internalNetworkPermeateServerVisitor.setTargetIp(clientTargetIp);
internalNetworkPermeateServerVisitor.setTargetPort(clientTargetPort);
internalNetworkPermeateServerVisitor.setVisitorPort(visitorPort);
NettyClientPermeateServerVisitorFilter visitorFilter = new NettyClientPermeateServerVisitorFilter();
NettyClientPermeateServerVisitorFilter visitorFilter = new NettyClientPermeateServerVisitorFilter(internalNetworkPermeateServerVisitor);
return new NettyClientPermeateServerVisitorSocket(visitorFilter, clientId, visitorPort);
}

View File

@ -10,6 +10,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.server.AbstractHandleReportHandleChannelTransferTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerChannelFlow;
import org.springframework.stereotype.Component;
@ -45,10 +46,11 @@ public class ServerHandleReportHandleChannelTransferTypeAdvanced extends Abstrac
// log.debug("访客ID:【{}】接收到客户端:[{}] 返回数据大小:[{}] 内网穿透返回的数据:[{}]", new String(visitorId), clientId, msg.getData().length, new String(msg.getData()));
// 将数据转发访客通道
Channel visitor = NettyRealIdContext.getReal(visitorId);
if (visitor != null) {
ByteBuf buf = visitor.config().getAllocator().buffer(msg.getData().length);
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel);
if (nextChannel != null) {
ByteBuf buf = nextChannel.config().getAllocator().buffer(msg.getData().length);
buf.writeBytes(msg.getData());
ChannelFuture channelFuture = visitor.writeAndFlush(buf);
ChannelFuture channelFuture = nextChannel.writeAndFlush(buf);
boolean success = channelFuture.isSuccess();
log.debug("visitor writerAndFlush status: {}", success);
// 记录出口数据

View File

@ -10,6 +10,9 @@ import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.server.AbstractHandleReportSingleClientRealConnectTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
/**
* 服务端渗透客户端通信通道
*/
@Slf4j
@Component
public class ServerHandleReportSingleClientRealConnectTypeAdvanced extends AbstractHandleReportSingleClientRealConnectTypeAdvanced<NettyProxyMsg> {
@ -38,6 +41,9 @@ public class ServerHandleReportSingleClientRealConnectTypeAdvanced extends Abstr
Channel visitorRealChannel = NettyRealIdContext.getReal(new String(visitorId));
visitorRealChannel.config().setOption(ChannelOption.AUTO_READ, true);
ChannelAttributeKeyUtils.buildNextChannel(channel, visitorRealChannel);
ChannelAttributeKeyUtils.buildNextChannel(visitorRealChannel, channel);
// 或许此处还应该通知服务端 这个访客绑定的客户端真实通道打开
// 下发客户端 真实通道自动读写开启

View File

@ -8,13 +8,13 @@ import io.netty.channel.socket.SocketChannel;
import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPenetrationRealClient;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer;
import org.framework.lazy.cloud.network.heartbeat.server.netty.handler.VisitorHandler;
import org.framework.lazy.cloud.network.heartbeat.server.netty.handler.NettyServerPermeateClientVisitorHandler;
public class VisitorFilter extends DebugChannelInitializer<SocketChannel> {
public class NettyServerPermeateClientVisitorFilter extends DebugChannelInitializer<SocketChannel> {
private final InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient;
private final ChannelFlowAdapter channelFlowAdapter;
public VisitorFilter(InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient, ChannelFlowAdapter channelFlowAdapter) {
public NettyServerPermeateClientVisitorFilter(InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient, ChannelFlowAdapter channelFlowAdapter) {
this.internalNetworkPenetrationRealClient = internalNetworkPenetrationRealClient;
this.channelFlowAdapter = channelFlowAdapter;
}
@ -32,6 +32,6 @@ public class VisitorFilter extends DebugChannelInitializer<SocketChannel> {
protected void initChannel0(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChannelDuplexHandler());
pipeline.addLast(new VisitorHandler(internalNetworkPenetrationRealClient, channelFlowAdapter));
pipeline.addLast(new NettyServerPermeateClientVisitorHandler(internalNetworkPenetrationRealClient, channelFlowAdapter));
}
}

View File

@ -5,12 +5,10 @@ import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPenetrationRealClient;
import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPermeateRealServer;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer;
import org.framework.lazy.cloud.network.heartbeat.server.netty.handler.PermeateVisitorHandler;
import org.framework.lazy.cloud.network.heartbeat.server.netty.handler.VisitorHandler;
public class PermeateVisitorFilter extends DebugChannelInitializer<SocketChannel> {
private final InternalNetworkPermeateRealServer internalNetworkPermeateRealServer;

View File

@ -18,12 +18,12 @@ import org.wu.framework.core.utils.ObjectUtils;
import java.util.UUID;
@Slf4j
public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
public class NettyServerPermeateClientVisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient;
private final ChannelFlowAdapter channelFlowAdapter;// 流量适配器
// private final NettyChannelPool nettyChannelPool = new DefaultNettyChannelPool(10);
public VisitorHandler(InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient, ChannelFlowAdapter channelFlowAdapter) {
public NettyServerPermeateClientVisitorHandler(InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient, ChannelFlowAdapter channelFlowAdapter) {
this.internalNetworkPenetrationRealClient = internalNetworkPenetrationRealClient;
this.channelFlowAdapter = channelFlowAdapter;
}
@ -97,8 +97,9 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
// 使用访客的通信通道
Channel visitorCommunicationChannel = NettyCommunicationIdContext.getVisitor(visitorId);
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(visitorChannel);
// 绑定数据流量
ChannelAttributeKeyUtils.buildInFlow(visitorCommunicationChannel, bytes.length);
ChannelAttributeKeyUtils.buildInFlow(nextChannel, bytes.length);
NettyProxyMsg nettyProxyMsg = new NettyProxyMsg();
nettyProxyMsg.setType(MessageType.DISTRIBUTE_CLIENT_TRANSFER);
nettyProxyMsg.setClientId(clientId);
@ -107,7 +108,7 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
nettyProxyMsg.setVisitorPort(visitorPort);
nettyProxyMsg.setVisitorId(visitorId);
nettyProxyMsg.setData(bytes);
visitorCommunicationChannel.writeAndFlush(nettyProxyMsg);
nextChannel.writeAndFlush(nettyProxyMsg);
// 处理访客流量
ServerChannelFlow serverChannelFlow = ServerChannelFlow
.builder()
@ -129,16 +130,17 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
return;
}
// 通信通道自动读写打开 然后关闭通信通道
Channel visitorChannel = NettyCommunicationIdContext.getVisitor(visitorId);
if (visitorChannel != null && visitorChannel.isActive()) {
// Channel visitorChannel = NettyCommunicationIdContext.getVisitor(visitorId);
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
if (nextChannel != null && nextChannel.isActive()) {
visitorChannel.config().setOption(ChannelOption.AUTO_READ, true);
nextChannel.config().setOption(ChannelOption.AUTO_READ, true);
// 通知服务端 关闭访问通道真实通道
NettyProxyMsg myMsg = new NettyProxyMsg();
myMsg.setType(MessageType.DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR);
myMsg.setVisitorId(visitorId);
visitorChannel.writeAndFlush(myMsg);
nextChannel.writeAndFlush(myMsg);
}
// 关闭 访客通信通道访客真实通道
NettyRealIdContext.clear(visitorId);
@ -158,9 +160,10 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
}
Channel visitorCommunicationChannel = NettyCommunicationIdContext.getVisitor(visitorId);
if (visitorCommunicationChannel != null) {
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
if (nextChannel != null) {
log.debug("visitorId:{} transfer AUTO_READ:{} ",visitorId,ctx.channel().isWritable());
visitorCommunicationChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
nextChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
}
// Channel visitorChannel = ctx.channel();
// String vid = visitorChannel.attr(Constant.VID).get();
@ -191,13 +194,14 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
String visitorId = ChannelAttributeKeyUtils.getVisitorId(channel);
// 使用通信通道 下发关闭访客
Channel visitorChannel = NettyCommunicationIdContext.getVisitor(visitorId);
if (visitorChannel != null) {
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
if (nextChannel != null) {
// 下发关闭访客
NettyProxyMsg closeRealClient = new NettyProxyMsg();
closeRealClient.setType(MessageType.DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT_AUTO_READ);
closeRealClient.setClientId(clientId);
closeRealClient.setVisitorId(visitorId);
visitorChannel.writeAndFlush(closeRealClient);
nextChannel.writeAndFlush(closeRealClient);
}
ctx.close();

View File

@ -10,7 +10,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPenetrat
import org.framework.lazy.cloud.network.heartbeat.common.NettyClientVisitorContext;
import org.framework.lazy.cloud.network.heartbeat.common.NettyVisitorPortContext;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter;
import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.VisitorFilter;
import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.NettyServerPermeateClientVisitorFilter;
import java.io.IOException;
@ -24,14 +24,14 @@ import java.io.IOException;
public class NettyServerPermeateClientVisitorSocket {
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final VisitorFilter visitorFilter;
private final NettyServerPermeateClientVisitorFilter nettyServerPermeateClientVisitorFilter;
@Getter
private final String clientId;
@Getter
private final int visitorPort;
public NettyServerPermeateClientVisitorSocket(VisitorFilter visitorFilter, String clientId, int visitorPort) {
this.visitorFilter = visitorFilter;
public NettyServerPermeateClientVisitorSocket(NettyServerPermeateClientVisitorFilter nettyServerPermeateClientVisitorFilter, String clientId, int visitorPort) {
this.nettyServerPermeateClientVisitorFilter = nettyServerPermeateClientVisitorFilter;
this.clientId = clientId;
this.visitorPort = visitorPort;
}
@ -64,7 +64,7 @@ public class NettyServerPermeateClientVisitorSocket {
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2))
.childHandler(visitorFilter);
.childHandler(nettyServerPermeateClientVisitorFilter);
ChannelFuture sync = bootstrap.bind(visitorPort).sync();
sync.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
@ -229,8 +229,8 @@ public class NettyServerPermeateClientVisitorSocket {
.visitorPort(visitorPort)
.visitorId(visitorId).build();
VisitorFilter visitorFilter = new VisitorFilter(internalNetworkPenetrationRealClient, channelFlowAdapter);
return new NettyServerPermeateClientVisitorSocket(visitorFilter, clientId, visitorPort);
NettyServerPermeateClientVisitorFilter nettyServerPermeateClientVisitorFilter = new NettyServerPermeateClientVisitorFilter(internalNetworkPenetrationRealClient, channelFlowAdapter);
return new NettyServerPermeateClientVisitorSocket(nettyServerPermeateClientVisitorFilter, clientId, visitorPort);
}

View File

@ -7,7 +7,7 @@ spring:
inet-host: 127.0.0.1
inet-port: 7001
inet-path: wu-lazy-cloud-heartbeat-server
client-id: wujiawei-acw # 客户端ID
client-id: wujiawei # 客户端ID
# inet-host: 124.222.48.62 # 服务端地址
# inet-port: 30676 #服务端端口
# # inet-path: wu-lazy-cloud-heartbeat-server