[fix] 修复多客户端导致通道被移除问题

This commit is contained in:
wujiawei 2024-01-23 16:51:55 +08:00
parent db704ed3e5
commit fef8149ec4
18 changed files with 152 additions and 43 deletions

View File

@ -0,0 +1,33 @@
package wu.framework.lazy.cloud.heartbeat.client.netty.advanced;
import io.netty.channel.Channel;
import wu.framework.lazy.cloud.heartbeat.client.netty.config.NettyServerProperties;
import wu.framework.lazy.cloud.heartbeat.common.ChannelContext;
import wu.framework.lazy.cloud.heartbeat.common.NettyProxyMsg;
import wu.framework.lazy.cloud.heartbeat.common.advanced.client.AbstractHandleClientChannelActiveAdvanced;
import wu.framework.lazy.cloud.heartbeat.common.utils.ChannelAttributeKeyUtils;
/**
* 客户端通道 is active
*/
public class HandleClientChannelActiveAdvanced extends AbstractHandleClientChannelActiveAdvanced<NettyProxyMsg> {
private final NettyServerProperties nettyServerProperties;
public HandleClientChannelActiveAdvanced(NettyServerProperties nettyServerProperties) {
this.nettyServerProperties = nettyServerProperties;
}
/**
* 处理当前数据
*
* @param channel 当前通道
* @param nettyProxyMsg 通道数据
*/
@Override
protected void doHandler(Channel channel, NettyProxyMsg nettyProxyMsg) {
// 缓存当前通道
String clientId = nettyServerProperties.getClientId();
ChannelContext.push(channel, clientId);
ChannelAttributeKeyUtils.buildClientId(channel, clientId);
}
}

View File

@ -22,11 +22,10 @@ import java.util.List;
public class HandleDistributeConnectSuccessNotificationTypeAdvancedHandle extends AbstractHandleDistributeConnectSuccessNotificationTypeAdvancedHandle<NettyProxyMsg> {
private final ClientNettyConfigApplication clientNettyConfigApplication;
private final NettyServerProperties nettyServerProperties;
public HandleDistributeConnectSuccessNotificationTypeAdvancedHandle(ClientNettyConfigApplication clientNettyConfigApplication, NettyServerProperties nettyServerProperties) {
public HandleDistributeConnectSuccessNotificationTypeAdvancedHandle(ClientNettyConfigApplication clientNettyConfigApplication) {
this.clientNettyConfigApplication = clientNettyConfigApplication;
this.nettyServerProperties = nettyServerProperties;
}
/**
@ -39,12 +38,7 @@ public class HandleDistributeConnectSuccessNotificationTypeAdvancedHandle extend
protected void doHandler(Channel channel, NettyProxyMsg msg) {
log.warn("客户端ID{},客户端:{}连接成功", new String(msg.getClientId()), new String(msg.getData()));
// 缓存当前通道
String clientId = nettyServerProperties.getClientId();
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setClientId(clientId.getBytes(StandardCharsets.UTF_8));
ChannelContext.push(channel, clientId);
ChannelAttributeKeyUtils.buildClientId(channel,clientId);
// 存储其他客户端状态
List<String> clientIdList = JSONObject.parseArray(new String(msg.getData()), String.class);
for (String tenantId : clientIdList) {

View File

@ -27,7 +27,7 @@ public class HeartbeatClientConfiguration {
* @return ClientHandleDistributeSingleClientRealAutoReadConnectTypeAdvanced
*/
@Bean
public ClientHandleDistributeSingleClientRealAutoReadConnectTypeAdvanced clientDistributeSingleClientRealAutoReadConnectTypeAdvanced(){
public ClientHandleDistributeSingleClientRealAutoReadConnectTypeAdvanced handleDistributeSingleClientRealAutoReadConnectTypeAdvanced(){
return new ClientHandleDistributeSingleClientRealAutoReadConnectTypeAdvanced();
}
/**
@ -35,38 +35,42 @@ public class HeartbeatClientConfiguration {
* @return ClientHandleDistributeSingleClientMessageTypeAdvanced
*/
@Bean
public ClientHandleDistributeSingleClientMessageTypeAdvanced clientDistributeSingleClientMessageTypeAdvanced(){
public ClientHandleDistributeSingleClientMessageTypeAdvanced handleDistributeSingleClientMessageTypeAdvanced(){
return new ClientHandleDistributeSingleClientMessageTypeAdvanced();
}
@Bean
public ClientHandleDistributeSingleClientRealCloseVisitorTypeAdvanced clientDistributeSingleClientRealCloseVisitorTypeAdvanced(){
public ClientHandleDistributeSingleClientRealCloseVisitorTypeAdvanced handleDistributeSingleClientRealCloseVisitorTypeAdvanced(){
return new ClientHandleDistributeSingleClientRealCloseVisitorTypeAdvanced();
}
@Bean
public ClientReportHandleChannelTransferTypeAdvancedHandleDistribute clientReportChannelTransferTypeAdvanced(NettyServerProperties nettyServerProperties){
public ClientReportHandleChannelTransferTypeAdvancedHandleDistribute handleChannelTransferTypeAdvancedHandleDistribute(NettyServerProperties nettyServerProperties){
return new ClientReportHandleChannelTransferTypeAdvancedHandleDistribute(nettyServerProperties);
}
@Bean
public HandleDistributeConnectSuccessNotificationTypeAdvancedHandle distributeConnectSuccessNotificationTypeAdvanced(ClientNettyConfigApplication clientNettyConfigApplication, NettyServerProperties nettyServerProperties){
return new HandleDistributeConnectSuccessNotificationTypeAdvancedHandle(clientNettyConfigApplication, nettyServerProperties);
public HandleDistributeConnectSuccessNotificationTypeAdvancedHandle handleDistributeConnectSuccessNotificationTypeAdvancedHandle(ClientNettyConfigApplication clientNettyConfigApplication){
return new HandleDistributeConnectSuccessNotificationTypeAdvancedHandle(clientNettyConfigApplication);
}
@Bean
public HandleDistributeDisconnectTypeAdvancedHandle distributeDisconnectTypeAdvanced(ClientNettyConfigApplication clientNettyConfigApplication){
public HandleClientChannelActiveAdvanced handleClientChannelActiveAdvanced(NettyServerProperties nettyServerProperties){
return new HandleClientChannelActiveAdvanced(nettyServerProperties);
}
@Bean
public HandleDistributeDisconnectTypeAdvancedHandle handleDistributeDisconnectTypeAdvancedHandle(ClientNettyConfigApplication clientNettyConfigApplication){
return new HandleDistributeDisconnectTypeAdvancedHandle(clientNettyConfigApplication);
}
@Bean
public HandleDistributeStagingClosedTypeAdvanced distributeStagingClosedTypeAdvanced(){
public HandleDistributeStagingClosedTypeAdvanced handleDistributeStagingClosedTypeAdvanced(){
return new HandleDistributeStagingClosedTypeAdvanced();
}
@Bean
public HandleDistributeStagingOpenedTypeAdvanced distributeStagingOpenedTypeAdvanced(){
public HandleDistributeStagingOpenedTypeAdvanced handleDistributeStagingOpenedTypeAdvanced(){
return new HandleDistributeStagingOpenedTypeAdvanced();
}
@Bean
public ClientHandleDistributeSingleClientRealConnectTypeAdvanced clientDistributeSingleClientRealConnectTypeAdvanced(NettyServerProperties nettyServerProperties,
public ClientHandleDistributeSingleClientRealConnectTypeAdvanced clientHandleDistributeSingleClientRealConnectTypeAdvanced(NettyServerProperties nettyServerProperties,
List<HandleChannelTypeAdvanced> handleChannelTypeAdvancedList){
return new ClientHandleDistributeSingleClientRealConnectTypeAdvanced(nettyServerProperties, handleChannelTypeAdvancedList);
}

View File

@ -61,7 +61,7 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<NettyProxyMs
// 处理客户端连接成功
Channel channel = ctx.channel();
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setType(MessageType.REPORT_CLIENT_CONNECT_SUCCESS);
nettyMsg.setType(MessageType.CLIENT_CHANNEL_ACTIVE);
nettyMsg.setClientId(clientId);
channelTypeAdapter.handler(channel, nettyMsg);

View File

@ -137,7 +137,7 @@ public class NettyClientRealSocket {
log.info("客户端新建访客通道 连接服务端IP:{},连接服务端端口:{}", inetHost, inetPort);
ChannelFuture future = bootstrap.connect(inetHost, inetPort);
log.info("使用的租户ID:" + clientId);
log.info("使用的客户端ID:" + clientId);
future.addListener((ChannelFutureListener) futureListener -> {
Channel channel = futureListener.channel();
if (futureListener.isSuccess()) {

View File

@ -72,7 +72,7 @@ public class NettyClientSocket {
ChannelFuture future = bootstrap.connect(inetHost, inetPort);
Channel channel = future.channel();
log.info("使用的租户ID:" + clientId);
log.info("使用的客户端ID:" + clientId);
future.addListener((ChannelFutureListener) futureListener -> {
if (futureListener.isSuccess()) {

View File

@ -35,7 +35,7 @@ public class ChannelContext {
clientChannelImpl.setClientId(clientId.getBytes(StandardCharsets.UTF_8));
// 如果客户端已经存在 移除
if(channelIdClientChannelDTOConcurrentHashMap.containsKey(clientId)){
clear(clientId);
// clear(clientId);
}
channelIdClientChannelDTOConcurrentHashMap.put(clientId, clientChannelImpl);

View File

@ -22,7 +22,7 @@ public class MessageType {
* 客户端上报连接成功
*
* @see MessageTypeEnums#REPORT_CLIENT_CONNECT_SUCCESS
* @see AbstractHandleReportConnectSuccessTypeAdvanced
* @see AbstractHandleClientConnectSuccessTypeAdvanced
*/
public static final byte REPORT_CLIENT_CONNECT_SUCCESS = 0X01;
/**
@ -78,6 +78,13 @@ public class MessageType {
* @see AbstractHandleReportSingleClientMessage2OtherClientTypeAdvanced
*/
public static final byte REPORT_SINGLE_CLIENT_MESSAGE = 0X09;
/**
* 服务端通道 is active
*
* @see MessageTypeEnums#SERVER_CHANNEL_ACTIVE
* @see AbstractHandleServerChannelActiveTypeAdvanced
*/
public static final byte SERVER_CHANNEL_ACTIVE = 0X10;
/**
* 下发 客户端接收连接成功通知
@ -146,4 +153,13 @@ public class MessageType {
* @see AbstractHandleDistributeSingleClientMessageTypeAdvanced
*/
public static final byte DISTRIBUTE_SINGLE_CLIENT_MESSAGE = -0X09;
/**
* 客户端通道 is active
*
* @see MessageTypeEnums#CLIENT_CHANNEL_ACTIVE
* @see AbstractHandleClientChannelActiveAdvanced
*/
public static final byte CLIENT_CHANNEL_ACTIVE = -0X10;
}

View File

@ -0,0 +1,25 @@
package wu.framework.lazy.cloud.heartbeat.common.advanced.client;
import wu.framework.lazy.cloud.heartbeat.common.NettyProxyMsg;
import wu.framework.lazy.cloud.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import wu.framework.lazy.cloud.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import wu.framework.lazy.cloud.heartbeat.common.enums.MessageTypeEnums;
/**
* 客户端通道 is active
*/
public abstract class AbstractHandleClientChannelActiveAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.CLIENT_CHANNEL_ACTIVE.getTypeByte() == msg.getType();
}
}

View File

@ -6,9 +6,9 @@ import wu.framework.lazy.cloud.heartbeat.common.advanced.HandleChannelTypeAdvanc
import wu.framework.lazy.cloud.heartbeat.common.enums.MessageTypeEnums;
/**
* 客户端连接成功上报处理器
* 服务端处理客户端连接成功
*/
public abstract class AbstractHandleReportConnectSuccessTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
public abstract class AbstractHandleClientConnectSuccessTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型

View File

@ -0,0 +1,27 @@
package wu.framework.lazy.cloud.heartbeat.common.advanced.server;
import wu.framework.lazy.cloud.heartbeat.common.NettyProxyMsg;
import wu.framework.lazy.cloud.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import wu.framework.lazy.cloud.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import wu.framework.lazy.cloud.heartbeat.common.enums.MessageTypeEnums;
/**
* 服务端通道 is active
* SERVER_CHANNEL_ACTIVE
*/
public abstract class AbstractHandleServerChannelActiveTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.SERVER_CHANNEL_ACTIVE.getTypeByte() == msg.getType();
}
}

View File

@ -18,7 +18,7 @@ public enum MessageTypeEnums {
*/
TYPE_HEARTBEAT(MessageType.TYPE_HEARTBEAT, "心跳"),
/**
* @see AbstractHandleReportConnectSuccessTypeAdvanced
* @see AbstractHandleClientConnectSuccessTypeAdvanced
*/
REPORT_CLIENT_CONNECT_SUCCESS(MessageType.REPORT_CLIENT_CONNECT_SUCCESS, "上报 客户端连接成功"),
/**
@ -50,6 +50,10 @@ public enum MessageTypeEnums {
* @see AbstractHandleReportSingleClientMessage2OtherClientTypeAdvanced
*/
REPORT_SINGLE_CLIENT_MESSAGE(MessageType.REPORT_SINGLE_CLIENT_MESSAGE, "上报 客户端消息到另一个客户端"),
/**
* @see AbstractHandleServerChannelActiveTypeAdvanced
*/
SERVER_CHANNEL_ACTIVE(MessageType.SERVER_CHANNEL_ACTIVE, "服务端通道 is active"),
/**
* @see AbstractHandleDistributeConnectSuccessNotificationTypeAdvancedHandle
*/
@ -91,6 +95,10 @@ public enum MessageTypeEnums {
* @see AbstractHandleDistributeSingleClientMessageTypeAdvanced
*/
DISTRIBUTE_SINGLE_CLIENT_MESSAGE(MessageType.DISTRIBUTE_SINGLE_CLIENT_MESSAGE, "下发 客户端消息"),
/**
* @see AbstractHandleClientChannelActiveAdvanced
*/
CLIENT_CHANNEL_ACTIVE(MessageType.CLIENT_CHANNEL_ACTIVE, "客户端通道 is active"),
;

View File

@ -1,13 +1,13 @@
spring:
lazy:
netty:
# inet-host: 127.0.0.1
# inet-port: 7001
inet-host: 127.0.0.1
inet-port: 7001
# inet-path: middleground-on-cloud-heartbeat-server
inet-host: 124.222.48.62 # 服务端地址
inet-port: 30676 #服务端端口
# inet-host: 124.222.48.62 # 服务端地址
# inet-port: 30676 #服务端端口
# inet-path: middleground-on-cloud-heartbeat-server
client-id: local # 客户端ID
client-id: wujiawei # 客户端ID
data:
redis:
host: 192.168.17.221

View File

@ -6,7 +6,7 @@
#docker login --username=1207537021@qq.com registry.cn-hangzhou.aliyuncs.com
mvn clean install
#mvn clean install
docker build -t registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:1.2.2-JDK17-SNAPSHOT .
docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:1.2.2-JDK17-SNAPSHOT

View File

@ -5,7 +5,7 @@ import com.alibaba.fastjson.JSON;
import wu.framework.lazy.cloud.heartbeat.common.ChannelContext;
import wu.framework.lazy.cloud.heartbeat.common.MessageType;
import wu.framework.lazy.cloud.heartbeat.common.NettyProxyMsg;
import wu.framework.lazy.cloud.heartbeat.common.advanced.server.AbstractHandleReportConnectSuccessTypeAdvanced;
import wu.framework.lazy.cloud.heartbeat.common.advanced.server.AbstractHandleClientConnectSuccessTypeAdvanced;
import wu.framework.lazy.cloud.heartbeat.common.utils.ChannelAttributeKeyUtils;
import wu.framework.lazy.cloud.heartbeat.server.application.InternalNetworkPenetrationMappingApplication;
import wu.framework.lazy.cloud.heartbeat.server.application.NettyClientBlacklistApplication;
@ -25,13 +25,13 @@ import java.util.List;
*/
@Slf4j
@Component
public class ServerHandleReportConnectSuccessTypeAdvanced extends AbstractHandleReportConnectSuccessTypeAdvanced<NettyProxyMsg> {
public class ServerHandleClientConnectSuccessTypeAdvanced extends AbstractHandleClientConnectSuccessTypeAdvanced<NettyProxyMsg> {
private final ServerNettyConfigApplication serverNettyConfigApplication;
private final NettyClientBlacklistApplication nettyClientBlacklistApplication;
private final InternalNetworkPenetrationMappingApplication internalNetworkPenetrationMappingApplication;
public ServerHandleReportConnectSuccessTypeAdvanced(ServerNettyConfigApplication serverNettyConfigApplication, NettyClientBlacklistApplication nettyClientBlacklistApplication, InternalNetworkPenetrationMappingApplication internalNetworkPenetrationMappingApplication) {
public ServerHandleClientConnectSuccessTypeAdvanced(ServerNettyConfigApplication serverNettyConfigApplication, NettyClientBlacklistApplication nettyClientBlacklistApplication, InternalNetworkPenetrationMappingApplication internalNetworkPenetrationMappingApplication) {
this.serverNettyConfigApplication = serverNettyConfigApplication;
this.nettyClientBlacklistApplication = nettyClientBlacklistApplication;
this.internalNetworkPenetrationMappingApplication = internalNetworkPenetrationMappingApplication;

View File

@ -37,11 +37,11 @@ public class HeartbeatServerConfiguration {
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@Bean
public ServerHandleReportConnectSuccessTypeAdvanced serverReportConnectSuccessTypeAdvanced(
public ServerHandleClientConnectSuccessTypeAdvanced serverReportConnectSuccessTypeAdvanced(
ServerNettyConfigApplication serverNettyConfigApplication,
NettyClientBlacklistApplication nettyClientBlacklistApplication,
InternalNetworkPenetrationMappingApplication internalNetworkPenetrationMappingApplication) {
return new ServerHandleReportConnectSuccessTypeAdvanced(serverNettyConfigApplication, nettyClientBlacklistApplication, internalNetworkPenetrationMappingApplication);
return new ServerHandleClientConnectSuccessTypeAdvanced(serverNettyConfigApplication, nettyClientBlacklistApplication, internalNetworkPenetrationMappingApplication);
}
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)

View File

@ -101,12 +101,14 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
String clientId = ChannelAttributeKeyUtils.getClientId(channel);
String visitorId = ChannelAttributeKeyUtils.getVisitorId(channel);
log.info("断开客户端的连接:{}", clientId);
if (!ObjectUtils.isEmpty(visitorId)) {
log.info("客户端:{},断开访客的连接:{}", clientId,visitorId);
// 访客通道 关闭访客通道
NettyCommunicationIdContext.clear(visitorId);
super.channelInactive(ctx);
} else if (!ObjectUtils.isEmpty(clientId)) {
log.info("断开客户端的连接:{}", clientId);
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setType(MessageType.REPORT_CLIENT_DISCONNECTION);
nettyMsg.setClientId(clientId);

View File

@ -56,8 +56,8 @@
<el-table-column fixed="right" label="操作">
<template v-slot:default="{ row }">
<el-button
v-permission="['del']"
@click.prevent="handleDel(row.clientId)"
v-permission="['off_line']"
@click.prevent="handleOffLine(row.clientId)"
type="danger"
size="small"
>
@ -93,7 +93,7 @@ export default {
search: { name: "查询" },
add: { name: "添加" },
edit: { name: "编辑" },
del: { name: "删除" },
off_line: { name: "下线" },
sendMessage: { name: "发送消息" },
export: { name: "导出用户" },
},
@ -156,7 +156,7 @@ const handleArouse2SendMessage = (row = null) => {
* @param {*}
* @return {*}
*/
const handleDel = (clientId) => {
const handleOffLine = (clientId) => {
proxy
.$confirm("此操作将永久删除该数据, 是否继续?", "提示", {
confirmButtonText: "确定",