[add] 客户端下线异常问题

This commit is contained in:
wujiawei
2025-06-15 00:50:16 +08:00
parent da7d55c266
commit 6d0bcd2f63
11 changed files with 49 additions and 30 deletions

View File

@@ -37,26 +37,22 @@ public class ChannelContext {
ConcurrentHashMap<String/*clientId*/, List<Channel>/*通道*/> clientChannelListConcurrentHashMap =
cacheClientChannelConcurrentHashMap.get(namespace);
// 判断是否存在客户端
if (clientChannelListConcurrentHashMap.containsKey(clientId)) {
List<Channel> existChannelList = new ArrayList<>();
List<Channel> oldChannels = clientChannelListConcurrentHashMap.get(clientId);
for (Channel existChannel : oldChannels) {
if (existChannel != null) {
if (existChannel.isActive()) {
existChannelList.add(existChannel);
} else {
log.warn("close channel with namespace:{} client:{}", namespace, ChannelAttributeKeyUtils.getClientId(existChannel));
existChannel.close();
}
List<Channel> existChannelList = new ArrayList<>();
List<Channel> oldChannels = clientChannelListConcurrentHashMap.getOrDefault(clientId, new ArrayList<>());
for (Channel existChannel : oldChannels) {
if (existChannel != null) {
if (existChannel.isActive()) {
existChannelList.add(existChannel);
} else {
log.warn("close channel with namespace:{} client:{}", namespace, ChannelAttributeKeyUtils.getClientId(existChannel));
existChannel.close();
}
}
existChannelList.add(channel);
clientChannelListConcurrentHashMap.put(clientId, existChannelList);
cacheClientChannelConcurrentHashMap.put(namespace, clientChannelListConcurrentHashMap);
}
}else {
existChannelList.add(channel);
clientChannelListConcurrentHashMap.put(clientId, existChannelList);
cacheClientChannelConcurrentHashMap.put(namespace, clientChannelListConcurrentHashMap);
} else {
ConcurrentHashMap<String/*clientId*/, List<Channel>/*通道*/> clientChannelListConcurrentHashMap =
new ConcurrentHashMap<>();
clientChannelListConcurrentHashMap.put(clientId, Collections.synchronizedList(new ArrayList<>(List.of(channel))));
@@ -104,6 +100,7 @@ public class ChannelContext {
return channelListConcurrentHashMap;
}
/**
* 获取所有通道
*
@@ -140,6 +137,7 @@ public class ChannelContext {
public static List<Channel> get(String namespace, String clientId) {
return get(namespace.getBytes(StandardCharsets.UTF_8), clientId.getBytes(StandardCharsets.UTF_8));
}
/**
* 根据通道ID获取通道信息
*
@@ -148,11 +146,11 @@ public class ChannelContext {
*/
public static Channel getLoadBalance(byte[] namespace, byte[] clientId) {
List<Channel> channels = get(namespace, clientId);
if(ObjectUtils.isEmpty(channels)){
if (ObjectUtils.isEmpty(channels)) {
return null;
}
channels = channels.stream().filter(Channel::isActive).collect(Collectors.toList());
if(ObjectUtils.isEmpty(channels)){
if (ObjectUtils.isEmpty(channels)) {
return null;
}
// TODO 负载问题

View File

@@ -116,8 +116,8 @@ public class NettyProxyMsg {
this.namespace = namespace;
}
public void setNamespace(String serverId) {
this.namespace = serverId.getBytes(StandardCharsets.UTF_8);
public void setNamespace(String namespace) {
this.namespace = namespace.getBytes(StandardCharsets.UTF_8);
}

View File

@@ -62,5 +62,10 @@ public class LazyNettyServerRouteQueryListCommand {
*/
@Schema(description ="更新时间",name ="updateTime",example = "")
private LocalDateTime updateTime;
/**
* 命名空间
*/
@Schema(description ="命名空间",name ="namespace",example = "")
private String namespace;
}

View File

@@ -62,5 +62,10 @@ public class LazyNettyServerRouteQueryOneCommand {
*/
@Schema(description ="更新时间",name ="updateTime",example = "")
private LocalDateTime updateTime;
/**
* 命名空间
*/
@Schema(description ="命名空间",name ="namespace",example = "")
private String namespace;
}

View File

@@ -62,5 +62,10 @@ public class LazyNettyServerRouteRemoveCommand {
*/
@Schema(description ="更新时间",name ="updateTime",example = "")
private LocalDateTime updateTime;
/**
* 命名空间
*/
@Schema(description ="命名空间",name ="namespace",example = "")
private String namespace;
}

View File

@@ -56,5 +56,10 @@ public class LazyNettyServerRouteStoryCommand {
*/
@Schema(description ="更新时间",name ="updateTime",example = "")
private LocalDateTime updateTime;
/**
* 命名空间
*/
@Schema(description ="命名空间",name ="namespace",example = "")
private String namespace;
}

View File

@@ -62,5 +62,10 @@ public class LazyNettyServerRouteUpdateCommand {
*/
@Schema(description ="更新时间",name ="updateTime",example = "")
private LocalDateTime updateTime;
/**
* 命名空间
*/
@Schema(description ="命名空间",name ="namespace",example = "")
private String namespace;
}

View File

@@ -164,8 +164,6 @@ public class LazyNettyClientStateApplicationImpl implements LazyNettyClientState
@Override
public Result<List<LazyNettyClientStateGroupByClientDTO>> findListGroupByClient(LazyNettyClientStateQueryListCommand lazyNettyClientStateQueryListCommand) {
LazyNettyClientState lazyNettyClientState = NettyClientStateDTOAssembler.INSTANCE.toNettyClientState(lazyNettyClientStateQueryListCommand);
String serverId = serverNodeProperties.getNodeId();
lazyNettyClientState.setNamespace(serverId);
return lazyNettyClientStateRepository.findListGroupByClient(lazyNettyClientState).convert(nettyClientStates -> nettyClientStates.stream().map(NettyClientStateDTOAssembler.INSTANCE::fromNettyClientState).collect(Collectors.toList()));
}
@@ -183,8 +181,6 @@ public class LazyNettyClientStateApplicationImpl implements LazyNettyClientState
@Override
public Result<LazyPage<LazyNettyClientStateGroupByClientDTO>> findPageGroupByClient(int size, int current, LazyNettyClientStateQueryListCommand lazyNettyClientStateQueryListCommand) {
LazyNettyClientState lazyNettyClientState = NettyClientStateDTOAssembler.INSTANCE.toNettyClientState(lazyNettyClientStateQueryListCommand);
String serverId = serverNodeProperties.getNodeId();
lazyNettyClientState.setNamespace(serverId);
return lazyNettyClientStateRepository.findPageGroupByClient(size, current, lazyNettyClientState).convert(page -> page.convert(NettyClientStateDTOAssembler.INSTANCE::fromNettyClientState));
}
@@ -200,12 +196,11 @@ public class LazyNettyClientStateApplicationImpl implements LazyNettyClientState
@Override
public Result<LazyNettyClientState> remove(LazyNettyClientStateRemoveCommand lazyNettyClientStateRemoveCommand) {
LazyNettyClientState lazyNettyClientState = NettyClientStateDTOAssembler.INSTANCE.toNettyClientState(lazyNettyClientStateRemoveCommand);
String serverId = serverNodeProperties.getNodeId();
lazyNettyClientState.setNamespace(serverId);
// 获取当前客户端通道 而后关闭
String clientId = lazyNettyClientStateRemoveCommand.getClientId();
String namespace = lazyNettyClientStateRemoveCommand.getNamespace();
// 心跳关闭
ChannelContext.clear(serverId, clientId);
ChannelContext.clear(namespace, clientId);
// 关闭访客
List<NettyTcpServerPermeateClientVisitorSocket> nettyTcpServerPermeateClientVisitorSocketList = NettyClientVisitorContext.getVisitorSockets(clientId);
if (!ObjectUtils.isEmpty(nettyTcpServerPermeateClientVisitorSocketList)) {

View File

@@ -66,7 +66,7 @@ public class LazyNettyNamespaceDO {
* 命名空间
*/
@Schema(description ="命名空间",name ="namespace",example = "")
@LazyTableField(name="namespace",comment="命名空间",columnType="varchar(255)")
@LazyTableFieldUnique(name="namespace",comment="命名空间",columnType="varchar(25)")
private String namespace;
/**

View File

@@ -82,7 +82,7 @@ public class LazyNettyServerRouteDO {
* 命名空间
*/
@Schema(description ="命名空间",name ="namespace",example = "")
@LazyTableFieldUnique(name="namespace",comment="命名空间",columnType="varchar(25)",defaultValue = "DEFAULT")
@LazyTableField(name="namespace",comment="命名空间",columnType="varchar(25)",defaultValue = "DEFAULT")
private String namespace;
}

View File

@@ -53,6 +53,7 @@ public class LazyNettyServerRouteRepositoryImpl implements LazyNettyServerRout
@Override
public Result<LazyNettyServerRoute> story(LazyNettyServerRoute lazyNettyServerRoute) {
LazyNettyServerRouteDO lazyNettyServerRouteDO = LazyNettyServerRouteConverter.INSTANCE.fromLazyNettyServerRoute(lazyNettyServerRoute);
lazyLambdaStream.upsert(lazyNettyServerRouteDO);
String routeIp = lazyNettyServerRoute.getRouteIp();
String routePort = NormalUsedString.ASTERISK;