From 6d0bcd2f63fc013c2eb5e5fa377b85d5213f89e7 Mon Sep 17 00:00:00 2001 From: wujiawei <1207537021@qq.com> Date: Sun, 15 Jun 2025 00:50:16 +0800 Subject: [PATCH] =?UTF-8?q?[add]=20=E5=AE=A2=E6=88=B7=E7=AB=AF=E4=B8=8B?= =?UTF-8?q?=E7=BA=BF=E5=BC=82=E5=B8=B8=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../heartbeat/common/ChannelContext.java | 36 +++++++++---------- .../advanced/payload/NettyProxyMsg.java | 4 +-- .../LazyNettyServerRouteQueryListCommand.java | 5 +++ .../LazyNettyServerRouteQueryOneCommand.java | 5 +++ .../LazyNettyServerRouteRemoveCommand.java | 5 +++ .../LazyNettyServerRouteStoryCommand.java | 5 +++ .../LazyNettyServerRouteUpdateCommand.java | 5 +++ .../LazyNettyClientStateApplicationImpl.java | 9 ++--- .../entity/LazyNettyNamespaceDO.java | 2 +- .../entity/LazyNettyServerRouteDO.java | 2 +- .../LazyNettyServerRouteRepositoryImpl.java | 1 + 11 files changed, 49 insertions(+), 30 deletions(-) diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/ChannelContext.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/ChannelContext.java index 4901c3e5..5183592c 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/ChannelContext.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/ChannelContext.java @@ -37,26 +37,22 @@ public class ChannelContext { ConcurrentHashMap/*通道*/> clientChannelListConcurrentHashMap = cacheClientChannelConcurrentHashMap.get(namespace); - - // 判断是否存在客户端 - if (clientChannelListConcurrentHashMap.containsKey(clientId)) { - List existChannelList = new ArrayList<>(); - List oldChannels = clientChannelListConcurrentHashMap.get(clientId); - for (Channel existChannel : oldChannels) { - if (existChannel != null) { - if (existChannel.isActive()) { - existChannelList.add(existChannel); - } else { - log.warn("close channel with namespace:{} client:{}", namespace, ChannelAttributeKeyUtils.getClientId(existChannel)); - existChannel.close(); - } + List existChannelList = new ArrayList<>(); + List oldChannels = clientChannelListConcurrentHashMap.getOrDefault(clientId, new ArrayList<>()); + for (Channel existChannel : oldChannels) { + if (existChannel != null) { + if (existChannel.isActive()) { + existChannelList.add(existChannel); + } else { + log.warn("close channel with namespace:{} client:{}", namespace, ChannelAttributeKeyUtils.getClientId(existChannel)); + existChannel.close(); } } - existChannelList.add(channel); - clientChannelListConcurrentHashMap.put(clientId, existChannelList); - cacheClientChannelConcurrentHashMap.put(namespace, clientChannelListConcurrentHashMap); } - }else { + existChannelList.add(channel); + clientChannelListConcurrentHashMap.put(clientId, existChannelList); + cacheClientChannelConcurrentHashMap.put(namespace, clientChannelListConcurrentHashMap); + } else { ConcurrentHashMap/*通道*/> clientChannelListConcurrentHashMap = new ConcurrentHashMap<>(); clientChannelListConcurrentHashMap.put(clientId, Collections.synchronizedList(new ArrayList<>(List.of(channel)))); @@ -104,6 +100,7 @@ public class ChannelContext { return channelListConcurrentHashMap; } + /** * 获取所有通道 * @@ -140,6 +137,7 @@ public class ChannelContext { public static List get(String namespace, String clientId) { return get(namespace.getBytes(StandardCharsets.UTF_8), clientId.getBytes(StandardCharsets.UTF_8)); } + /** * 根据通道ID获取通道信息 * @@ -148,11 +146,11 @@ public class ChannelContext { */ public static Channel getLoadBalance(byte[] namespace, byte[] clientId) { List channels = get(namespace, clientId); - if(ObjectUtils.isEmpty(channels)){ + if (ObjectUtils.isEmpty(channels)) { return null; } channels = channels.stream().filter(Channel::isActive).collect(Collectors.toList()); - if(ObjectUtils.isEmpty(channels)){ + if (ObjectUtils.isEmpty(channels)) { return null; } // TODO 负载问题 diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/payload/NettyProxyMsg.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/payload/NettyProxyMsg.java index 7ede6e6c..797fa65f 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/payload/NettyProxyMsg.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/payload/NettyProxyMsg.java @@ -116,8 +116,8 @@ public class NettyProxyMsg { this.namespace = namespace; } - public void setNamespace(String serverId) { - this.namespace = serverId.getBytes(StandardCharsets.UTF_8); + public void setNamespace(String namespace) { + this.namespace = namespace.getBytes(StandardCharsets.UTF_8); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteQueryListCommand.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteQueryListCommand.java index eec4b1d1..df6c33e8 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteQueryListCommand.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteQueryListCommand.java @@ -62,5 +62,10 @@ public class LazyNettyServerRouteQueryListCommand { */ @Schema(description ="更新时间",name ="updateTime",example = "") private LocalDateTime updateTime; + /** + * 命名空间 + */ + @Schema(description ="命名空间",name ="namespace",example = "") + private String namespace; } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteQueryOneCommand.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteQueryOneCommand.java index 2cda1a36..84f35138 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteQueryOneCommand.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteQueryOneCommand.java @@ -62,5 +62,10 @@ public class LazyNettyServerRouteQueryOneCommand { */ @Schema(description ="更新时间",name ="updateTime",example = "") private LocalDateTime updateTime; + /** + * 命名空间 + */ + @Schema(description ="命名空间",name ="namespace",example = "") + private String namespace; } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteRemoveCommand.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteRemoveCommand.java index b475f887..8764ffae 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteRemoveCommand.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteRemoveCommand.java @@ -62,5 +62,10 @@ public class LazyNettyServerRouteRemoveCommand { */ @Schema(description ="更新时间",name ="updateTime",example = "") private LocalDateTime updateTime; + /** + * 命名空间 + */ + @Schema(description ="命名空间",name ="namespace",example = "") + private String namespace; } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteStoryCommand.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteStoryCommand.java index 436499ce..76a616dd 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteStoryCommand.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteStoryCommand.java @@ -56,5 +56,10 @@ public class LazyNettyServerRouteStoryCommand { */ @Schema(description ="更新时间",name ="updateTime",example = "") private LocalDateTime updateTime; + /** + * 命名空间 + */ + @Schema(description ="命名空间",name ="namespace",example = "") + private String namespace; } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteUpdateCommand.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteUpdateCommand.java index 7fe5239f..8da39b40 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteUpdateCommand.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/command/lazy/netty/server/route/LazyNettyServerRouteUpdateCommand.java @@ -62,5 +62,10 @@ public class LazyNettyServerRouteUpdateCommand { */ @Schema(description ="更新时间",name ="updateTime",example = "") private LocalDateTime updateTime; + /** + * 命名空间 + */ + @Schema(description ="命名空间",name ="namespace",example = "") + private String namespace; } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientStateApplicationImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientStateApplicationImpl.java index 1d02e954..b3c5c7c6 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientStateApplicationImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientStateApplicationImpl.java @@ -164,8 +164,6 @@ public class LazyNettyClientStateApplicationImpl implements LazyNettyClientState @Override public Result> findListGroupByClient(LazyNettyClientStateQueryListCommand lazyNettyClientStateQueryListCommand) { LazyNettyClientState lazyNettyClientState = NettyClientStateDTOAssembler.INSTANCE.toNettyClientState(lazyNettyClientStateQueryListCommand); - String serverId = serverNodeProperties.getNodeId(); - lazyNettyClientState.setNamespace(serverId); return lazyNettyClientStateRepository.findListGroupByClient(lazyNettyClientState).convert(nettyClientStates -> nettyClientStates.stream().map(NettyClientStateDTOAssembler.INSTANCE::fromNettyClientState).collect(Collectors.toList())); } @@ -183,8 +181,6 @@ public class LazyNettyClientStateApplicationImpl implements LazyNettyClientState @Override public Result> findPageGroupByClient(int size, int current, LazyNettyClientStateQueryListCommand lazyNettyClientStateQueryListCommand) { LazyNettyClientState lazyNettyClientState = NettyClientStateDTOAssembler.INSTANCE.toNettyClientState(lazyNettyClientStateQueryListCommand); - String serverId = serverNodeProperties.getNodeId(); - lazyNettyClientState.setNamespace(serverId); return lazyNettyClientStateRepository.findPageGroupByClient(size, current, lazyNettyClientState).convert(page -> page.convert(NettyClientStateDTOAssembler.INSTANCE::fromNettyClientState)); } @@ -200,12 +196,11 @@ public class LazyNettyClientStateApplicationImpl implements LazyNettyClientState @Override public Result remove(LazyNettyClientStateRemoveCommand lazyNettyClientStateRemoveCommand) { LazyNettyClientState lazyNettyClientState = NettyClientStateDTOAssembler.INSTANCE.toNettyClientState(lazyNettyClientStateRemoveCommand); - String serverId = serverNodeProperties.getNodeId(); - lazyNettyClientState.setNamespace(serverId); // 获取当前客户端通道 而后关闭 String clientId = lazyNettyClientStateRemoveCommand.getClientId(); + String namespace = lazyNettyClientStateRemoveCommand.getNamespace(); // 心跳关闭 - ChannelContext.clear(serverId, clientId); + ChannelContext.clear(namespace, clientId); // 关闭访客 List nettyTcpServerPermeateClientVisitorSocketList = NettyClientVisitorContext.getVisitorSockets(clientId); if (!ObjectUtils.isEmpty(nettyTcpServerPermeateClientVisitorSocketList)) { diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/infrastructure/entity/LazyNettyNamespaceDO.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/infrastructure/entity/LazyNettyNamespaceDO.java index 8a97946b..73aeb429 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/infrastructure/entity/LazyNettyNamespaceDO.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/infrastructure/entity/LazyNettyNamespaceDO.java @@ -66,7 +66,7 @@ public class LazyNettyNamespaceDO { * 命名空间 */ @Schema(description ="命名空间",name ="namespace",example = "") - @LazyTableField(name="namespace",comment="命名空间",columnType="varchar(255)") + @LazyTableFieldUnique(name="namespace",comment="命名空间",columnType="varchar(25)") private String namespace; /** diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/infrastructure/entity/LazyNettyServerRouteDO.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/infrastructure/entity/LazyNettyServerRouteDO.java index d321008c..8394f1bb 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/infrastructure/entity/LazyNettyServerRouteDO.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/infrastructure/entity/LazyNettyServerRouteDO.java @@ -82,7 +82,7 @@ public class LazyNettyServerRouteDO { * 命名空间 */ @Schema(description ="命名空间",name ="namespace",example = "") - @LazyTableFieldUnique(name="namespace",comment="命名空间",columnType="varchar(25)",defaultValue = "DEFAULT") + @LazyTableField(name="namespace",comment="命名空间",columnType="varchar(25)",defaultValue = "DEFAULT") private String namespace; } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/infrastructure/persistence/LazyNettyServerRouteRepositoryImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/infrastructure/persistence/LazyNettyServerRouteRepositoryImpl.java index a4956f0d..9cdebf28 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/infrastructure/persistence/LazyNettyServerRouteRepositoryImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/infrastructure/persistence/LazyNettyServerRouteRepositoryImpl.java @@ -53,6 +53,7 @@ public class LazyNettyServerRouteRepositoryImpl implements LazyNettyServerRout @Override public Result story(LazyNettyServerRoute lazyNettyServerRoute) { LazyNettyServerRouteDO lazyNettyServerRouteDO = LazyNettyServerRouteConverter.INSTANCE.fromLazyNettyServerRoute(lazyNettyServerRoute); + lazyLambdaStream.upsert(lazyNettyServerRouteDO); String routeIp = lazyNettyServerRoute.getRouteIp(); String routePort = NormalUsedString.ASTERISK;