【fix】

This commit is contained in:
bug-fix 2024-10-31 20:48:38 +08:00
parent 810579178f
commit aa056dbe07
8 changed files with 48 additions and 37 deletions

View File

@ -4,6 +4,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientPermeateClientTransferHandler; import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientPermeateClientTransferHandler;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.NettyProxyMsgDecoder; import org.framework.lazy.cloud.network.heartbeat.common.decoder.NettyProxyMsgDecoder;
@ -35,6 +36,9 @@ public class NettyClientPermeateClientTransferFilter extends DebugChannelInitial
// // 解码编码 // // 解码编码
// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); // pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
// pipeline.addLast(new NettMsgEncoder()); // pipeline.addLast(new NettMsgEncoder());
pipeline.addLast(new IdleStateHandler(0, 4, 0));
pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
pipeline.addLast(new NettyProxyMsgEncoder()); pipeline.addLast(new NettyProxyMsgEncoder());
pipeline.addLast(new NettyClientPermeateClientTransferHandler(channelTypeAdapter)); pipeline.addLast(new NettyClientPermeateClientTransferHandler(channelTypeAdapter));

View File

@ -4,6 +4,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientPermeateClientTransferRealHandler; import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientPermeateClientTransferRealHandler;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.NettyProxyMsgDecoder; import org.framework.lazy.cloud.network.heartbeat.common.decoder.NettyProxyMsgDecoder;
@ -35,6 +36,9 @@ public class NettyClientPermeateClientTransferRealFilter extends DebugChannelIni
// // 解码编码 // // 解码编码
// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); // pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
// pipeline.addLast(new NettMsgEncoder()); // pipeline.addLast(new NettMsgEncoder());
pipeline.addLast(new IdleStateHandler(0, 4, 0));
pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
pipeline.addLast(new NettyProxyMsgEncoder()); pipeline.addLast(new NettyProxyMsgEncoder());
pipeline.addLast(new NettyClientPermeateClientTransferRealHandler(channelTypeAdapter)); pipeline.addLast(new NettyClientPermeateClientTransferRealHandler(channelTypeAdapter));

View File

@ -36,6 +36,7 @@ public class NettyClientPermeateServerTransferFilter extends DebugChannelInitial
// // 解码编码 // // 解码编码
// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); // pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
// pipeline.addLast(new NettMsgEncoder()); // pipeline.addLast(new NettMsgEncoder());
pipeline.addLast(new IdleStateHandler(0, 4, 0));
pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
pipeline.addLast(new IdleStateHandler(0, 4, 0)); pipeline.addLast(new IdleStateHandler(0, 4, 0));
pipeline.addLast(new NettyProxyMsgEncoder()); pipeline.addLast(new NettyProxyMsgEncoder());

View File

@ -42,6 +42,7 @@ public class NettyClientPermeateClientTransferRealHandler extends SimpleChannelI
String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel()); String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel());
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
log.warn("close client permeate client transfer real clientId:{} visitorId:{}", clientId, visitorId);
// 关闭访客 // 关闭访客
if (nextChannel != null) { if (nextChannel != null) {

View File

@ -18,6 +18,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext; import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import java.util.Objects;
import java.util.UUID; import java.util.UUID;
@Slf4j @Slf4j
@ -54,7 +55,7 @@ public class NettyClientPermeateClientVisitorHandler extends SimpleChannelInbou
// Channel transferChannel = nettyChannelPool.availableChannel(visitorId); // Channel transferChannel = nettyChannelPool.availableChannel(visitorId);
// 创建访客连接客户端通道 // 创建访客连接客户端通道
NettyClientPermeateClientVisitorTransferSocket.buildTransferServer(internalNetworkClientPermeateClientVisitor,visitorChannel); NettyClientPermeateClientVisitorTransferSocket.buildTransferServer(internalNetworkClientPermeateClientVisitor,visitorChannel);
log.info("客户端渗透客户端访客端口连接成功了"); log.info("客户端渗透客户端访客:【{}】端口连接成功了",visitorId);
super.channelActive(ctx); super.channelActive(ctx);
} }
@ -108,7 +109,7 @@ public class NettyClientPermeateClientVisitorHandler extends SimpleChannelInbou
// 关闭 访客通信通道访客真实通道 // 关闭 访客通信通道访客真实通道
NettyRealIdContext.clear(visitorId); NettyRealIdContext.clear(visitorId);
NettyCommunicationIdContext.clear(visitorId); NettyCommunicationIdContext.clear(visitorId);
log.warn("【客户端渗透客户端】访客端口断开连接"); log.warn("【客户端渗透客户端】访客:【{}】端口断开连接",visitorId);
super.channelInactive(ctx); super.channelInactive(ctx);
} }

View File

@ -57,7 +57,7 @@ public class NettyClientPermeateClientRealSocket {
Channel realChannel = future.channel(); Channel realChannel = future.channel();
realChannel.config().setOption(ChannelOption.AUTO_READ, false); realChannel.config().setOption(ChannelOption.AUTO_READ, false);
log.info("访客通过 客户端:【{}】,绑定本地服务,IP:{},端口:{} 新建通道成功", clientId, clientTargetIp, clientTargetPort); log.info("访客通过 客户端:【{}】,visitorId:{},绑定本地服务,IP:{},端口:{} 新建通道成功", clientId,visitorId, clientTargetIp, clientTargetPort);
// 客户端真实通道 // 客户端真实通道
NettyRealIdContext.pushReal(realChannel, visitorId); NettyRealIdContext.pushReal(realChannel, visitorId);
// 绑定访客ID到当前真实通道属性 // 绑定访客ID到当前真实通道属性
@ -126,11 +126,9 @@ public class NettyClientPermeateClientRealSocket {
// 客户端新建访客通道 连接服务端IP:{},连接服务端端口:{} // 客户端新建访客通道 连接服务端IP:{},连接服务端端口:{}
log.info("Client creates a new visitor channel to connect to server IP: {}, connecting to server port: {}", inetHost, inetPort); log.info("client creates a new visitor channel to connect to server IP: {}, connecting to server port: {} with visitorId:{} & clientId:{}", inetHost, inetPort,visitorId,clientId);
ChannelFuture future = bootstrap.connect(inetHost, inetPort); ChannelFuture future = bootstrap.connect(inetHost, inetPort);
// 使用的客户端ID:{}
log.info("Client ID used: {}", clientId);
future.addListener((ChannelFutureListener) futureListener -> { future.addListener((ChannelFutureListener) futureListener -> {
Channel transferChannel = futureListener.channel(); Channel transferChannel = futureListener.channel();
if (futureListener.isSuccess()) { if (futureListener.isSuccess()) {

View File

@ -10,9 +10,7 @@ import org.framework.lazy.cloud.network.heartbeat.client.config.NettyClientPrope
import org.framework.lazy.cloud.network.heartbeat.client.netty.InternalNetworkClientPermeateClientVisitor; import org.framework.lazy.cloud.network.heartbeat.client.netty.InternalNetworkClientPermeateClientVisitor;
import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientPermeateClientTransferFilter; import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientPermeateClientTransferFilter;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType; import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
import org.framework.lazy.cloud.network.heartbeat.common.NettyCommunicationIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
@ -28,7 +26,7 @@ public class NettyClientPermeateClientVisitorTransferSocket {
/** /**
* 连接服务端通信通道 * 连接服务端通信通道
*/ */
public static void buildTransferServer(InternalNetworkClientPermeateClientVisitor internalNetworkClientPermeateClientVisitor,Channel visitorChannel) { public static void buildTransferServer(InternalNetworkClientPermeateClientVisitor internalNetworkClientPermeateClientVisitor, Channel visitorChannel) {
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
@ -62,25 +60,24 @@ public class NettyClientPermeateClientVisitorTransferSocket {
String toClientId = internalNetworkClientPermeateClientVisitor.getToClientId(); String toClientId = internalNetworkClientPermeateClientVisitor.getToClientId();
// 客户端新建访客通道 连接服务端IP:{},连接服务端端口:{} // 客户端新建访客通道 连接服务端IP:{},连接服务端端口:{}
log.info("Client creates a new visitor channel to connect to server IP: {}, connecting to server port: {}", inetHost, inetPort); log.info("Client creates a new visitor channel to connect to server IP: {}, connecting to server port: {} with clientId:【{}】 toClientId:【{}】 & visitorId:【{}】", inetHost, inetPort, clientId, toClientId, visitorId);
ChannelFuture future = bootstrap.connect(inetHost, inetPort); ChannelFuture future = bootstrap.connect(inetHost, inetPort);
// 使用的客户端ID:{} // 使用的客户端ID:{}
log.info("Client ID used: {}", clientId);
future.addListener((ChannelFutureListener) futureListener -> { future.addListener((ChannelFutureListener) futureListener -> {
Channel transferChannel = futureListener.channel(); Channel transferChannel = futureListener.channel();
if (futureListener.isSuccess()) { if (futureListener.isSuccess()) {
NettyProxyMsg myMsg = new NettyProxyMsg(); NettyProxyMsg nettyProxyMsg = new NettyProxyMsg();
myMsg.setType(MessageType.REPORT_CLIENT_TRANSFER_CLIENT_PERMEATE_CHANNEL_CONNECTION_SUCCESSFUL); nettyProxyMsg.setType(MessageType.REPORT_CLIENT_TRANSFER_CLIENT_PERMEATE_CHANNEL_CONNECTION_SUCCESSFUL);
// other clientId // other clientId
myMsg.setClientId(toClientId); nettyProxyMsg.setClientId(toClientId);
myMsg.setVisitorPort(visitorPort); nettyProxyMsg.setVisitorPort(visitorPort);
myMsg.setClientTargetIp(targetIp); nettyProxyMsg.setClientTargetIp(targetIp);
myMsg.setClientTargetPort(targetPort); nettyProxyMsg.setClientTargetPort(targetPort);
myMsg.setVisitorId(visitorId); nettyProxyMsg.setVisitorId(visitorId);
transferChannel.writeAndFlush(myMsg); transferChannel.writeAndFlush(nettyProxyMsg);
// 绑定客户端真实通信通道 // 绑定客户端真实通信通道
ChannelAttributeKeyUtils.buildVisitorId(transferChannel, visitorId); ChannelAttributeKeyUtils.buildVisitorId(transferChannel, visitorId);
ChannelAttributeKeyUtils.buildClientId(transferChannel, clientId); ChannelAttributeKeyUtils.buildClientId(transferChannel, clientId);
@ -93,7 +90,7 @@ public class NettyClientPermeateClientVisitorTransferSocket {
log.info("无法连接到服务端...."); log.info("无法连接到服务端....");
eventLoopGroup.schedule(() -> { eventLoopGroup.schedule(() -> {
try { try {
buildTransferServer(internalNetworkClientPermeateClientVisitor,visitorChannel); buildTransferServer(internalNetworkClientPermeateClientVisitor, visitorChannel);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }

View File

@ -26,6 +26,10 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
* 空闲次数 * 空闲次数
*/ */
private int idle_count = 1; private int idle_count = 1;
/**
* 传出数据延迟次数* 心跳时间作为关闭时间
*/
private int transfer_count = 1;
public NettyServerHandler(ChannelTypeAdapter channelTypeAdapter) { public NettyServerHandler(ChannelTypeAdapter channelTypeAdapter) {
this.channelTypeAdapter = channelTypeAdapter; this.channelTypeAdapter = channelTypeAdapter;
@ -60,21 +64,22 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态说明没有接收到心跳命令 if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态说明没有接收到心跳命令
String clientId = ChannelAttributeKeyUtils.getClientId(channel); String clientId = ChannelAttributeKeyUtils.getClientId(channel);
String visitorId = ChannelAttributeKeyUtils.getVisitorId(channel); String visitorId = ChannelAttributeKeyUtils.getVisitorId(channel);
// 已经5秒没有接收到客户端{}的信息了
log.warn("I haven't received any information from client: {} for 5 seconds", clientId);
if (idle_count > 2) {
if (ObjectUtils.isEmpty(visitorId)) { if (ObjectUtils.isEmpty(visitorId)) {
// 关闭这个不活跃的channel client:{} // 已经5秒没有接收到客户端{}的信息了
log.warn("close this inactive channel client:{} with no visitor", clientId); log.warn("I haven't received any information from client: {} with channel:{} for 5 seconds", clientId, channel.id().toString());
// 给所有客户端发送 这个客户端离线了 // 关闭这个不活跃的channel client:{}
NettyProxyMsg nettyMsg = new NettyProxyMsg(); log.warn("close this inactive channel client:{} with no visitor", clientId);
nettyMsg.setClientId(clientId); // 给所有客户端发送 这个客户端离线了
nettyMsg.setType(MessageType.REPORT_CLIENT_DISCONNECTION); NettyProxyMsg nettyMsg = new NettyProxyMsg();
channelTypeAdapter.handler(channel, nettyMsg); nettyMsg.setClientId(clientId);
channel.close(); nettyMsg.setType(MessageType.REPORT_CLIENT_DISCONNECTION);
} else { channelTypeAdapter.handler(channel, nettyMsg);
log.warn("close client:{} visitor: [{}]'s connection",clientId, visitorId); channel.close();
} else {
// 访客通道数据 5*100秒后关闭
if (transfer_count > 100) {
log.warn("close client:{} visitor: [{}]'s connection", clientId, visitorId);
NettyCommunicationIdContext.clear(visitorId); NettyCommunicationIdContext.clear(visitorId);
NettyRealIdContext.clear(visitorId); NettyRealIdContext.clear(visitorId);
// 关闭通信通道 // 关闭通信通道
@ -84,9 +89,9 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
nextChannel.close(); nextChannel.close();
transferNextChannel.close(); transferNextChannel.close();
} }
transfer_count++;
} }
idle_count++;
} }
} else { } else {
super.userEventTriggered(ctx, obj); super.userEventTriggered(ctx, obj);
@ -113,7 +118,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
if (!ObjectUtils.isEmpty(visitorId)) { if (!ObjectUtils.isEmpty(visitorId)) {
// 客户端:{},断开访客的连接:{} // 客户端:{},断开访客的连接:{}
log.warn("Client: {}, disconnect with visitorId:{}", clientId, visitorId); log.warn("client: {} channel:{}, disconnect with visitorId:{}", clientId, channel.id().toString(), visitorId);
// 访客通道 关闭访客通道 // 访客通道 关闭访客通道
NettyCommunicationIdContext.clear(visitorId); NettyCommunicationIdContext.clear(visitorId);
// 关闭通信通道 // 关闭通信通道