[fix] 通信通道统一绑定客户端ID,访客ID

This commit is contained in:
wujiawei 2024-09-24 20:19:43 +08:00
parent 576498afc7
commit 9784cc65b6
11 changed files with 51 additions and 37 deletions

View File

@ -11,6 +11,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.advanced.client.Abstrac
import org.framework.lazy.cloud.network.heartbeat.common.enums.MessageTypeEnums;
import org.wu.framework.spring.utils.SpringContextHolder;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@ -45,7 +46,7 @@ public class ClientHandleDistributeClientTransferClientPermeateChannelConnection
String visitorId=new String(msgVisitorId);
Integer visitorPort=Integer.parseInt(new String(msgVisitorPort));
NettyClientProperties nettyClientProperties = SpringContextHolder.getBean(NettyClientProperties.class);
List<HandleChannelTypeAdvanced> handleChannelTypeAdvancedList = SpringContextHolder.getApplicationContext().getBeansOfType(HandleChannelTypeAdvanced.class).values().stream().collect(Collectors.toList());
List<HandleChannelTypeAdvanced> handleChannelTypeAdvancedList = new ArrayList<>(SpringContextHolder.getApplicationContext().getBeansOfType(HandleChannelTypeAdvanced.class).values());
NettyClientPermeateClientRealSocket.buildRealServer(
clientId,
clientTargetIp,

View File

@ -21,7 +21,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeK
import java.util.UUID;
@Slf4j
public class NettyClientPermeateClientVisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
public class NettyClientPermeateClientVisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final InternalNetworkClientPermeateClientVisitor internalNetworkClientPermeateClientVisitor;
// private final NettyChannelPool nettyChannelPool = new DefaultNettyChannelPool(10);
@ -50,11 +50,10 @@ public class NettyClientPermeateClientVisitorHandler extends SimpleChannelInboun
// 当前通道绑定访客ID
ChannelAttributeKeyUtils.buildVisitorId(visitorChannel, visitorId);
internalNetworkClientPermeateClientVisitor.setVisitorId(visitorId);
// 判断是否有可用的通道 如果没有创建新的通道
// Channel transferChannel = nettyChannelPool.availableChannel(visitorId);
// 创建访客连接客户端通道
NettyClientPermeateClientVisitorTransferSocket.buildTransferServer(internalNetworkClientPermeateClientVisitor);
NettyClientPermeateClientVisitorTransferSocket.buildTransferServer(internalNetworkClientPermeateClientVisitor,visitorChannel);
log.info("客户端渗透客户端访客端口连接成功了");
super.channelActive(ctx);
}

View File

@ -28,18 +28,9 @@ public class NettyClientPermeateClientVisitorTransferSocket {
/**
* 连接服务端通信通道
*/
public static void buildTransferServer(InternalNetworkClientPermeateClientVisitor internalNetworkClientPermeateClientVisitor) {
newTransferConnect2Server(internalNetworkClientPermeateClientVisitor);
}
public static void buildTransferServer(InternalNetworkClientPermeateClientVisitor internalNetworkClientPermeateClientVisitor,Channel visitorChannel) {
/**
* 连接服务端通信通道
* <p>
* internalNetworkPermeateServerVisitor
*/
protected static void newTransferConnect2Server(InternalNetworkClientPermeateClientVisitor internalNetworkClientPermeateClientVisitor) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
@ -66,7 +57,7 @@ public class NettyClientPermeateClientVisitorTransferSocket {
String targetIp = internalNetworkClientPermeateClientVisitor.getTargetIp();
Integer targetPort = internalNetworkClientPermeateClientVisitor.getTargetPort();
String visitorId = internalNetworkClientPermeateClientVisitor.getVisitorId();
String visitorId = ChannelAttributeKeyUtils.getVisitorId(visitorChannel);
Integer visitorPort = internalNetworkClientPermeateClientVisitor.getVisitorPort();
String toClientId = internalNetworkClientPermeateClientVisitor.getToClientId();
@ -94,17 +85,15 @@ public class NettyClientPermeateClientVisitorTransferSocket {
ChannelAttributeKeyUtils.buildVisitorId(transferChannel, visitorId);
ChannelAttributeKeyUtils.buildClientId(transferChannel, clientId);
// 传输通道打开后自动读取
Channel visitor = NettyRealIdContext.getReal(visitorId);
ChannelAttributeKeyUtils.buildNextChannel(visitor, transferChannel);
ChannelAttributeKeyUtils.buildNextChannel(transferChannel, visitor);
ChannelAttributeKeyUtils.buildNextChannel(visitorChannel, transferChannel);
ChannelAttributeKeyUtils.buildNextChannel(transferChannel, visitorChannel);
} else {
log.info("无法连接到服务端....");
transferChannel.eventLoop().schedule(() -> {
try {
buildTransferServer(internalNetworkClientPermeateClientVisitor);
buildTransferServer(internalNetworkClientPermeateClientVisitor,visitorChannel);
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -212,6 +212,8 @@ public class MessageType {
/**
* 下发 客户端接收连接成功通知
*
@ -270,7 +272,7 @@ public class MessageType {
* @see MessageTypeEnums#DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR
* @see AbstractHandleDistributeSingleClientRealCloseVisitorTypeAdvanced
*/
public static final byte DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR = -0X08;
public static final byte DISTRIBUTE_SERVER_PERMEATE_CLIENT_REAL_CLOSE_VISITOR = -0X08;
/**
* 下发 客户端消息

View File

@ -156,7 +156,7 @@ public enum MessageTypeEnums {
/**
* @see AbstractHandleDistributeSingleClientRealCloseVisitorTypeAdvanced
*/
DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR(MessageType.DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR, "下发 客户端关闭代理服务通道"),
DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR(MessageType.DISTRIBUTE_SERVER_PERMEATE_CLIENT_REAL_CLOSE_VISITOR, "下发 客户端关闭代理服务通道"),
/**
* @see AbstractHandleDistributeSingleClientMessageTypeAdvanced
*/

View File

@ -4,6 +4,7 @@ import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.*;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.server.AbstractHandleReportClientTransferClientPermeateChannelConnectionSuccessfulTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.springframework.stereotype.Component;
/**
@ -15,19 +16,22 @@ public class ServerHandleReportClientTransferClientPermeateChannelConnectionSucc
/**
* 处理当前数据
*
* @param channel 当前通道
* @param transferChannel 当前通道
* @param nettyProxyMsg 通道数据
*/
@Override
protected void doHandler(Channel channel, NettyProxyMsg nettyProxyMsg) {
protected void doHandler(Channel transferChannel, NettyProxyMsg nettyProxyMsg) {
// 创建目标地址连接
byte[] msgVisitorId = nettyProxyMsg.getVisitorId();
byte[] msgVisitorPort = nettyProxyMsg.getVisitorPort();
byte[] msgClientTargetIp = nettyProxyMsg.getClientTargetIp();
byte[] msgClientTargetPort = nettyProxyMsg.getClientTargetPort();
byte[] clientId = nettyProxyMsg.getClientId();// 目标客户端ID
ChannelAttributeKeyUtils.buildClientId(transferChannel, clientId);
ChannelAttributeKeyUtils.buildVisitorId(transferChannel, msgVisitorId);
// 绑定访客通道
NettyTransferChannelContext.pushVisitor(channel,msgVisitorId);
NettyTransferChannelContext.pushVisitor(transferChannel,msgVisitorId);
ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId);
NettyProxyMsg clientConnectTagetNettyProxyMsg = new NettyProxyMsg();

View File

@ -18,11 +18,11 @@ public class ServerHandleReportClientTransferClientPermeateChannelInitSuccessful
/**
* 处理当前数据
*
* @param channel 当前通道
* @param transferChannel 当前通道
* @param nettyProxyMsg 通道数据
*/
@Override
protected void doHandler(Channel channel, NettyProxyMsg nettyProxyMsg) {
protected void doHandler(Channel transferChannel, NettyProxyMsg nettyProxyMsg) {
// 创建目标地址连接
byte[] msgVisitorId = nettyProxyMsg.getVisitorId();
byte[] msgVisitorPort = nettyProxyMsg.getVisitorPort();
@ -33,11 +33,12 @@ public class ServerHandleReportClientTransferClientPermeateChannelInitSuccessful
// next translation
Channel nextTransferChannel = NettyTransferChannelContext.getVisitor(msgVisitorId);
ChannelAttributeKeyUtils.buildTransferNextChannel(nextTransferChannel,channel);
ChannelAttributeKeyUtils.buildTransferNextChannel(channel,nextTransferChannel);
NettyTransferChannelContext.clear(msgVisitorId);
ChannelAttributeKeyUtils.buildTransferNextChannel(nextTransferChannel,transferChannel);
ChannelAttributeKeyUtils.buildTransferNextChannel(transferChannel,nextTransferChannel);
ChannelAttributeKeyUtils.buildClientId(transferChannel,clientId);
ChannelAttributeKeyUtils.buildVisitorId(transferChannel,msgVisitorId);
Channel nextChannel = ChannelAttributeKeyUtils.getTransferNextChannel(channel);
Channel nextChannel = ChannelAttributeKeyUtils.getTransferNextChannel(transferChannel);
NettyProxyMsg clientConnectTagetNettyProxyMsg = new NettyProxyMsg();
clientConnectTagetNettyProxyMsg.setVisitorId(msgVisitorId);

View File

@ -3,6 +3,7 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.advanced;
import io.netty.channel.Channel;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.server.AbstractHandleReportClientTransferServerPermeateChannelConnectionSuccessfulTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.server.netty.socket.NettyClientPermeateServerConnectRealSocket;
import org.springframework.stereotype.Component;
@ -24,6 +25,11 @@ public class ServerHandleReportClientTransferServerPermeateChannelConnectionSucc
byte[] msgVisitorPort = nettyProxyMsg.getVisitorPort();
byte[] msgClientTargetIp = nettyProxyMsg.getClientTargetIp();
byte[] msgClientTargetPort = nettyProxyMsg.getClientTargetPort();
byte[] msgClientId = nettyProxyMsg.getClientId();
String clientId = new String(msgClientId);
// 绑定客户端ID
ChannelAttributeKeyUtils.buildClientId(transferChannel,clientId);
ChannelAttributeKeyUtils.buildVisitorId(transferChannel,msgVisitorId);
NettyClientPermeateServerConnectRealSocket.buildNewRealServer(new String(msgVisitorId),
Integer.parseInt(new String(msgVisitorPort)),
new String(msgClientTargetIp),

View File

@ -40,7 +40,7 @@ public class ServerHandleReportDisconnectTypeAdvanced extends AbstractHandleRepo
public void doHandler(Channel deathChannel, NettyProxyMsg msg) {
// 关闭连接通知
byte[] clientIdByte = msg.getClientId();
log.warn("close client :{} channel", new String(clientIdByte));
log.warn("close client :{} channel", new String(clientIdByte));
ChannelId deathChannelId = deathChannel.id();
ChannelContext.ClientChannel deathClientChannelDTO = ChannelContext.get(clientIdByte);

View File

@ -4,13 +4,13 @@ import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.wu.framework.core.utils.ObjectUtils;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
import org.framework.lazy.cloud.network.heartbeat.common.NettyCommunicationIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.wu.framework.core.utils.ObjectUtils;
/**
* description 服务端数据处理器
@ -66,7 +66,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
if (ObjectUtils.isEmpty(visitorId)) {
// 关闭这个不活跃的channel client:{}
log.warn("Close this inactive channel client: {}", clientId);
log.warn("Close this inactive channel client:{} with no visitor", clientId);
// 给所有客户端发送 这个客户端离线了
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setClientId(clientId);
@ -77,6 +77,12 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
log.warn("Close visitor: [{}]'s connection", visitorId);
NettyCommunicationIdContext.clear(visitorId);
NettyRealIdContext.clear(visitorId);
// 关闭通信通道
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel);
Channel transferNextChannel = ChannelAttributeKeyUtils.getTransferNextChannel(channel);
channel.close();
nextChannel.close();
transferNextChannel.close();
}
}
@ -107,13 +113,19 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
if (!ObjectUtils.isEmpty(visitorId)) {
// 客户端:{},断开访客的连接:{}
log.warn("Client: {}, disconnect guest: {}", clientId, visitorId);
log.warn("Client: {}, disconnect guest:{}", clientId, visitorId);
// 访客通道 关闭访客通道
NettyCommunicationIdContext.clear(visitorId);
// 关闭通信通道
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel);
Channel transferNextChannel = ChannelAttributeKeyUtils.getTransferNextChannel(channel);
channel.close();
nextChannel.close();
transferNextChannel.close();
super.channelInactive(ctx);
} else if (!ObjectUtils.isEmpty(clientId)) {
// 断开客户端的连接:{}
log.warn("Disconnect client: {}", clientId);
log.warn("Disconnect client:{}", clientId);
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setType(MessageType.REPORT_CLIENT_DISCONNECTION);
nettyMsg.setClientId(clientId);

View File

@ -137,7 +137,7 @@ public class NettyServerPermeateClientVisitorHandler extends SimpleChannelInboun
// 通知服务端 关闭访问通道真实通道
NettyProxyMsg myMsg = new NettyProxyMsg();
myMsg.setType(MessageType.DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR);
myMsg.setType(MessageType.DISTRIBUTE_SERVER_PERMEATE_CLIENT_REAL_CLOSE_VISITOR);
myMsg.setVisitorId(visitorId);
nextChannel.writeAndFlush(myMsg);
}