[fix] 通信通道统一绑定客户端ID,访客ID

This commit is contained in:
wujiawei 2024-09-24 21:54:53 +08:00
parent 9784cc65b6
commit 485d0602af
13 changed files with 45 additions and 63 deletions

View File

@ -28,11 +28,6 @@ public class InternalNetworkPermeateServerVisitor implements InternalNetworkPerm
* 访问端口
*/
private Integer visitorPort;
/**
* 流量适配器
*/
private ChannelFlowAdapter channelFlowAdapter;
/**
* 服务端地址信息
*/

View File

@ -7,7 +7,6 @@ import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientPermeateServerVisitorHandler;
import org.framework.lazy.cloud.network.heartbeat.client.netty.socket.NettyClientPermeateServerVisitorTransferSocket;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.client.AbstractHandleDistributeClientTransferServerPermeateChannelConnectionSuccessfulTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.enums.MessageTypeEnums;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;

View File

@ -4,6 +4,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientPermeateServerTransferHandler;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.NettyProxyMsgDecoder;
@ -36,6 +37,7 @@ public class NettyClientPermeateServerTransferFilter extends DebugChannelInitial
// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
// pipeline.addLast(new NettMsgEncoder());
pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
pipeline.addLast(new IdleStateHandler(0, 4, 0));
pipeline.addLast(new NettyProxyMsgEncoder());
pipeline.addLast(new NettyClientPermeateServerTransferHandler(channelTypeAdapter));
}

View File

@ -4,12 +4,16 @@ package org.framework.lazy.cloud.network.heartbeat.client.netty.handler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import java.nio.charset.StandardCharsets;
/**
* 客户端访客通信通道 处理器
*/
@ -67,4 +71,23 @@ public class NettyClientPermeateServerTransferHandler extends SimpleChannelInbou
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
/**
* 心跳请求处理 * 每4秒发送一次心跳请求; *
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
if (obj instanceof IdleStateEvent event) {
if (IdleState.WRITER_IDLE.equals(event.state())) { //如果写通道处于空闲状态,就发送心跳命令
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setType(MessageType.TYPE_HEARTBEAT);
ctx.writeAndFlush(nettyMsg);// 发送心跳数据
} else if (event.state() == IdleState.WRITER_IDLE) { // 如果检测到写空闲状态关闭连接
// 离线暂存通知
ctx.close();
}
} else {
super.userEventTriggered(ctx, obj);
}
}
}

View File

@ -41,8 +41,9 @@ public class NettyClientPermeateServerVisitorHandler extends SimpleChannelInboun
// 生成访客ID
String visitorId = UUID.randomUUID().toString();
Integer visitorPort = internalNetworkPermeateServerVisitor.getVisitorPort();
log.info("this channel{} use visitorId:{}", visitorChannel.id().asLongText(), visitorId);
log.info("this channel with visitor port:{} use visitorId:{}", visitorPort, visitorId);
ChannelAttributeKeyUtils.buildVisitorId(visitorChannel, visitorId);
// 判断是否有可用的通道 如果没有创建新的通道
// Channel transferChannel = nettyChannelPool.availableChannel(visitorId);
@ -63,7 +64,7 @@ public class NettyClientPermeateServerVisitorHandler extends SimpleChannelInboun
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
// 获取客户端通道而后进行数据下发
log.info("【客户端渗透服务端】访客端口成功接收数据:{}", new String(bytes));
log.debug("【客户端渗透服务端】访客端口成功接收数据:{}", new String(bytes));
// 使用访客的通信通道
Integer visitorPort = internalNetworkPermeateServerVisitor.getVisitorPort();
@ -92,9 +93,6 @@ public class NettyClientPermeateServerVisitorHandler extends SimpleChannelInboun
// 通信通道自动读写打开 然后关闭通信通道
if (nextChannel != null && nextChannel.isActive()) {
nextChannel.config().setOption(ChannelOption.AUTO_READ, true);
// 通知服务端 关闭访问通道真实通道
NettyProxyMsg myMsg = new NettyProxyMsg();
myMsg.setType(MessageType.REPORT_CLIENT_PERMEATE_SERVER_TRANSFER_CLOSE);
@ -102,11 +100,14 @@ public class NettyClientPermeateServerVisitorHandler extends SimpleChannelInboun
nextChannel.writeAndFlush(myMsg);
//通信通道
nextChannel.close();
log.info("关闭访问通道、真实通道 with visitorId:{}", visitorId);
}else {
log.info("channel inactive:{}", nextChannel);
}
// 访客通道关闭
channel.close();
log.warn("【客户端渗透服务端】访客端口断开连接");
log.warn("【客户端渗透服务端】访客端口断开连接,访客ID:{}", visitorId);
super.channelInactive(ctx);
}

View File

@ -8,7 +8,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf;
import org.framework.lazy.cloud.network.heartbeat.common.NettyCommunicationIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.wu.framework.core.utils.ObjectUtils;
@ -56,7 +55,7 @@ public class NettyServerPermeateClientRealHandler extends SimpleChannelInboundHa
if (visitor != null) {
// 上报关闭这个客户端的访客通道
NettyProxyMsg closeVisitorMsg = new NettyProxyMsg();
closeVisitorMsg.setType(MessageType.REPORT_SINGLE_CLIENT_CLOSE_VISITOR);
closeVisitorMsg.setType(MessageType.REPORT_SERVICE_PERMEATE_CLIENT_CLIENT_CLOSE_VISITOR);
closeVisitorMsg.setVisitorId(visitorId);
visitor.writeAndFlush(closeVisitorMsg);
}

View File

@ -44,7 +44,7 @@ public class NettyServerPermeateClientTransferHandler extends SimpleChannelInbou
if (nextChannel != null) {
// 上报关闭这个客户端的访客通道
NettyProxyMsg closeVisitorMsg = new NettyProxyMsg();
closeVisitorMsg.setType(MessageType.REPORT_SINGLE_CLIENT_CLOSE_VISITOR);
closeVisitorMsg.setType(MessageType.REPORT_SERVICE_PERMEATE_CLIENT_CLIENT_CLOSE_VISITOR);
closeVisitorMsg.setVisitorId(visitorId);
nextChannel.writeAndFlush(closeVisitorMsg);
}
@ -61,7 +61,8 @@ public class NettyServerPermeateClientTransferHandler extends SimpleChannelInbou
return;
}
Channel realChannel = NettyRealIdContext.getReal(visitorId);
// Channel realChannel = NettyRealIdContext.getReal(visitorId);
Channel realChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
if (realChannel != null) {
log.debug("visitorId:{} transfer AUTO_READ:{} ",visitorId,ctx.channel().isWritable());
realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());

View File

@ -11,7 +11,6 @@ import org.framework.lazy.cloud.network.heartbeat.client.netty.InternalNetworkPe
import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientPermeateServerVisitorFilter;
import org.framework.lazy.cloud.network.heartbeat.common.NettyClientVisitorContext;
import org.framework.lazy.cloud.network.heartbeat.common.NettyVisitorPortContext;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.socket.PermeateVisitorSocket;
@ -46,7 +45,7 @@ public class NettyClientPermeateServerVisitorSocket implements PermeateVisitorSo
@Override
public void start() {
Channel visitor = NettyVisitorPortContext.getVisitorChannel(visitorPort);
PermeateVisitorSocket visitor = NettyVisitorPortContext.getVisitorSocket(visitorPort);
if (visitor == null) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
@ -74,7 +73,6 @@ public class NettyClientPermeateServerVisitorSocket implements PermeateVisitorSo
if (future.isSuccess()) {
// 这里时异步处理
log.info("客户端:[{}]访客端口:[{}] 开启", clientId, visitorPort);
NettyVisitorPortContext.pushVisitorChannel(visitorPort, future.channel());
} else {
log.error("客户端:[{}]访客端口:[{}]绑定失败", clientId, visitorPort);
@ -97,15 +95,8 @@ public class NettyClientPermeateServerVisitorSocket implements PermeateVisitorSo
if (!workerGroup.isShutdown()) {
workerGroup.shutdownGracefully();
}
Channel visitor = NettyVisitorPortContext.getVisitorChannel(visitorPort);
if (visitor != null) {
// close channel
visitor.close();
// remove visitor
NettyVisitorPortContext.removeVisitorChannel(visitorPort);
// remove client this
NettyVisitorPortContext.removeVisitorSocket(visitorPort);
PermeateVisitorSocket permeateVisitorSocket = NettyVisitorPortContext.getVisitorSocket(visitorPort);
if (permeateVisitorSocket != null) {
log.warn("关闭客户端 :【{}】 访客户端口:【{}】", clientId, visitorPort);
} else {
log.warn("关闭访客端口失败 未找到客户端通道 客户端 :【{}】 访客户端口:【{}】", clientId, visitorPort);
@ -135,15 +126,6 @@ public class NettyClientPermeateServerVisitorSocket implements PermeateVisitorSo
* 访问端口
*/
private Integer visitorPort;
/**
* 访客ID
*/
private String visitorId;
/**
* 流量适配器
*/
private ChannelFlowAdapter channelFlowAdapter;
/**
* 服务端地址信息
*/
@ -203,15 +185,6 @@ public class NettyClientPermeateServerVisitorSocket implements PermeateVisitorSo
}
/**
* 绑定流量适配器
*
* @param channelFlowAdapter 流量适配器
* @return 当前对象
*/
public NettyVisitorSocketBuilder builderChannelFlowAdapter(ChannelFlowAdapter channelFlowAdapter) {
this.channelFlowAdapter = channelFlowAdapter;
return this;
} /**
* 绑定流量适配器
*
* @param handleChannelTypeAdvancedList 流量适配器
@ -232,16 +205,6 @@ public class NettyClientPermeateServerVisitorSocket implements PermeateVisitorSo
return this;
}
/**
* 绑定访客ID
*
* @param visitorId 访客ID
* @return 当前对象
*/
public NettyVisitorSocketBuilder builderVisitorId(String visitorId) {
this.visitorId = visitorId;
return this;
}
public NettyClientPermeateServerVisitorSocket build() {
if (clientTargetIp == null) {
@ -259,7 +222,6 @@ public class NettyClientPermeateServerVisitorSocket implements PermeateVisitorSo
internalNetworkPermeateServerVisitor.setTargetPort(clientTargetPort);
internalNetworkPermeateServerVisitor.setVisitorPort(visitorPort);
internalNetworkPermeateServerVisitor.setNettyClientProperties(nettyClientProperties);
internalNetworkPermeateServerVisitor.setChannelFlowAdapter(channelFlowAdapter);
internalNetworkPermeateServerVisitor.setHandleChannelTypeAdvancedList(handleChannelTypeAdvancedList);

View File

@ -72,7 +72,7 @@ public class MessageType {
* @see MessageTypeEnums#REPORT_SINGLE_CLIENT_CLOSE_VISITOR
* @see AbstractHandleReportServicePermeateClientCloseVisitorTypeAdvanced
*/
public static final byte REPORT_SINGLE_CLIENT_CLOSE_VISITOR = 0X08;
public static final byte REPORT_SERVICE_PERMEATE_CLIENT_CLIENT_CLOSE_VISITOR = 0X08;
/**
* 上报 客户端消息到另一个客户端

View File

@ -9,7 +9,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 服务端处理客户端 关闭一个访客
* REPORT_SINGLE_CLIENT_CLOSE_VISITOR
* REPORT_SERVICE_PERMEATE_CLIENT_CLIENT_CLOSE_VISITOR
*/
public abstract class AbstractHandleReportServicePermeateClientCloseVisitorTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {

View File

@ -9,7 +9,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 服务端处理客户端 关闭一个访客
* REPORT_SINGLE_CLIENT_CLOSE_VISITOR
* REPORT_SERVICE_PERMEATE_CLIENT_CLIENT_CLOSE_VISITOR
*/
public abstract class AbstractHandleReportSingleClientMessage2OtherClientTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {

View File

@ -47,7 +47,7 @@ public enum MessageTypeEnums {
/**
* @see AbstractHandleReportServicePermeateClientCloseVisitorTypeAdvanced
*/
REPORT_SINGLE_CLIENT_CLOSE_VISITOR(MessageType.REPORT_SINGLE_CLIENT_CLOSE_VISITOR, "上报 客户端关闭一个访客通道"),
REPORT_SINGLE_CLIENT_CLOSE_VISITOR(MessageType.REPORT_SERVICE_PERMEATE_CLIENT_CLIENT_CLOSE_VISITOR, "上报 客户端关闭一个访客通道"),
/**
* @see AbstractHandleReportSingleClientMessage2OtherClientTypeAdvanced
*/

View File

@ -66,7 +66,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
if (ObjectUtils.isEmpty(visitorId)) {
// 关闭这个不活跃的channel client:{}
log.warn("Close this inactive channel client:{} with no visitor", clientId);
log.warn("close this inactive channel client:{} with no visitor", clientId);
// 给所有客户端发送 这个客户端离线了
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setClientId(clientId);
@ -74,7 +74,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
channelTypeAdapter.handler(channel, nettyMsg);
channel.close();
} else {
log.warn("Close visitor: [{}]'s connection", visitorId);
log.warn("close client:{} visitor: [{}]'s connection",clientId, visitorId);
NettyCommunicationIdContext.clear(visitorId);
NettyRealIdContext.clear(visitorId);
// 关闭通信通道