From 42e8e5afeca492f487023da5833e7d4c0cc74e25 Mon Sep 17 00:00:00 2001 From: wujiawei <12345678> Date: Thu, 10 Oct 2024 22:20:09 +0800 Subject: [PATCH] =?UTF-8?q?[fix]=20=20=E8=B0=83=E6=95=B4=E8=AE=A1=E5=88=92?= =?UTF-8?q?=E9=80=82=E9=85=8D=E5=A4=9A=E5=AE=A2=E6=88=B7=E7=AB=AF=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../heartbeat/common/ChannelContext.java | 111 +++++++++--------- ...andleClientConnectSuccessTypeAdvanced.java | 17 ++- ...annelConnectionSuccessfulTypeAdvanced.java | 6 +- ...verHandleReportDisconnectTypeAdvanced.java | 43 ++++--- ...HandleReportStagingClosedTypeAdvanced.java | 27 ++--- ...HandleReportStagingOpenedTypeAdvanced.java | 38 +++--- ...ttyServerPermeateClientVisitorHandler.java | 5 +- ...tPermeateClientMappingApplicationImpl.java | 35 +++--- ...tPermeateServerMappingApplicationImpl.java | 14 +-- ...zyNettyClientBlacklistApplicationImpl.java | 5 +- .../LazyNettyClientStateApplicationImpl.java | 5 +- .../controller/LazyChannelController.java | 29 ++--- 12 files changed, 156 insertions(+), 179 deletions(-) diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/ChannelContext.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/ChannelContext.java index 1bd917e..9950d9d 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/ChannelContext.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/ChannelContext.java @@ -1,7 +1,6 @@ package org.framework.lazy.cloud.network.heartbeat.common; import io.netty.channel.Channel; -import io.netty.channel.ChannelId; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -9,6 +8,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * 通道上下文 @@ -16,7 +16,7 @@ import java.util.concurrent.ConcurrentHashMap; @Slf4j public class ChannelContext { - private final static ConcurrentHashMap + private final static ConcurrentHashMap/*通道*/> channelIdClientChannelDTOConcurrentHashMap = new ConcurrentHashMap<>(); /** @@ -26,17 +26,21 @@ public class ChannelContext { * @param clientId 客户端ID */ public static void push(Channel channel, String clientId) { - - ChannelId channelId = channel.id(); - ClientChannelImpl clientChannelImpl = new ClientChannelImpl(); - clientChannelImpl.setChannelId(channelId); - clientChannelImpl.setChannel(channel); - clientChannelImpl.setClientId(clientId.getBytes(StandardCharsets.UTF_8)); // 如果客户端已经存在 移除 if (channelIdClientChannelDTOConcurrentHashMap.containsKey(clientId)) { // clear(clientId); + List channels = channelIdClientChannelDTOConcurrentHashMap.get(clientId); + for (Channel existChannel : channels) { + if (existChannel != null && !existChannel.isActive()) { + existChannel.close(); + }else { + channels.remove(existChannel); + } + } + channels.add(channel); + }else { + channelIdClientChannelDTOConcurrentHashMap.putIfAbsent(clientId, List.of(channel)); } - channelIdClientChannelDTOConcurrentHashMap.put(clientId, clientChannelImpl); } @@ -47,14 +51,7 @@ public class ChannelContext { * @param clientId 客户端ID */ public static void push(Channel channel, byte[] clientId) { - - ChannelId channelId = channel.id(); - ClientChannelImpl clientChannelImpl = new ClientChannelImpl(); - clientChannelImpl.setChannelId(channelId); - clientChannelImpl.setChannel(channel); - clientChannelImpl.setClientId(clientId); - channelIdClientChannelDTOConcurrentHashMap.put(new String(clientId), clientChannelImpl); - + push(channel,new String(clientId, StandardCharsets.UTF_8)); } /** @@ -62,8 +59,16 @@ public class ChannelContext { * * @return 返回所有通道信息 */ - public static List get() { - return new ArrayList<>(channelIdClientChannelDTOConcurrentHashMap.values()); + public static ConcurrentMap/*通道*/> getChannels() { + return channelIdClientChannelDTOConcurrentHashMap; + } + /** + * 获取所有通道 + * + * @return 返回所有通道信息 + */ + public static List getClientIds() { + return new ArrayList<>(channelIdClientChannelDTOConcurrentHashMap.keySet().stream().toList()); } @@ -73,7 +78,7 @@ public class ChannelContext { * @param clientId 客户端ID * @return 通道信息 */ - public static ClientChannel get(byte[] clientId) { + public static List get(byte[] clientId) { if (channelIdClientChannelDTOConcurrentHashMap .containsKey(new String(clientId))) { return channelIdClientChannelDTOConcurrentHashMap @@ -91,10 +96,29 @@ public class ChannelContext { * @param clientId 客户端ID * @return 通道信息 */ - public static ChannelContext.ClientChannel get(String clientId) { + public static List get(String clientId) { return get(clientId.getBytes(StandardCharsets.UTF_8)); } + /** + * 根据通道ID获取通道信息 + * + * @param clientId 客户端ID + * @return 通道信息 + */ + public static Channel getLoadBalance(byte[] clientId) { + List channels = get(clientId); + return channels.get(0); + } + /** + * 根据通道ID获取通道信息 + * + * @param clientId 客户端ID + * @return 通道信息 + */ + public static Channel getLoadBalance(String clientId) { + return getLoadBalance(clientId.getBytes(StandardCharsets.UTF_8)); + } /** * 关闭通道 @@ -102,12 +126,13 @@ public class ChannelContext { * @param clientId 客户端ID */ public static void clear(String clientId) { - ClientChannel clientChannel = get(clientId); - if (clientChannel != null) { + List channels = get(clientId); + if (channels != null) { remove(clientId); - Channel channel = clientChannel.getChannel(); - if (channel != null && channel.isActive()) { - channel.close(); + for (Channel channel : channels) { + if (channel != null && channel.isActive()) { + channel.close(); + } } } else { // log warm @@ -122,7 +147,7 @@ public class ChannelContext { * @param clientId 客户端ID */ public static void remove(byte[] clientId) { - ClientChannel clientChannel = get(clientId); + List clientChannel = get(clientId); if (clientChannel != null) { channelIdClientChannelDTOConcurrentHashMap.remove(new String(clientId)); } else { @@ -137,7 +162,7 @@ public class ChannelContext { * @param clientId 客户端ID */ public static void remove(String clientId) { - ClientChannel clientChannel = get(clientId); + List clientChannel = get(clientId); if (clientChannel != null) { channelIdClientChannelDTOConcurrentHashMap.remove(clientId); } else { @@ -157,10 +182,6 @@ public class ChannelContext { */ byte[] getClientId(); - /** - * 通道ID - */ - ChannelId getChannelId(); /** * 通道 @@ -171,29 +192,3 @@ public class ChannelContext { } -/** - * 客户端通道信息 - */ -@Data -class ClientChannelImpl implements ChannelContext.ClientChannel { - /** - * 客户端ID - */ - private byte[] clientId; - /** - * 通道ID - */ - private ChannelId channelId; - /** - * 通道 - */ - private Channel channel; - - @Override - public String toString() { - return "ClientChannelImpl{" + - "clientId=" + new String(clientId) + - ", channelId=" + channelId.asLongText() + - '}'; - } -} diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleClientConnectSuccessTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleClientConnectSuccessTypeAdvanced.java index 748c35d..a58fb6f 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleClientConnectSuccessTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleClientConnectSuccessTypeAdvanced.java @@ -68,26 +68,23 @@ public class ServerHandleClientConnectSuccessTypeAdvanced extends AbstractHandle if (!exists) { // 服务状态在线 lazyClientStatsChangeApplication.clientOnLine(clientId); - List clientChannels = ChannelContext.get(); // 当前在线客户端数量:{} - log.info("Current number of online clients: {}", clientChannels.size()); + log.info("Current number of online clients: {}", ChannelContext.getClientIds().size()); // 所有的客户端ID - List clientIdList = clientChannels - .stream() - .map(activeClientChannel -> new String(activeClientChannel.getClientId())) - .toList(); + List clientIdList = ChannelContext.getClientIds(); // TODO 多副本本地channel 无法共享问题 // 通知所有客户端有人上线了 - for (ChannelContext.ClientChannel clientChannel : clientChannels) { - Channel channel = clientChannel.getChannel(); + ChannelContext.getChannels().forEach((existClientId, channels) -> { NettyProxyMsg nettyMsg = new NettyProxyMsg(); nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_CONNECTION_SUCCESS_NOTIFICATION); nettyMsg.setData((JSON.toJSONString(clientIdList) .getBytes(StandardCharsets.UTF_8))); // 发送所有客户端ID - channel.writeAndFlush(nettyMsg); - } + for (Channel channel : channels) { + channel.writeAndFlush(nettyMsg); + } + }); // 开始开启客户端:【{}】,端口映射 log.info("Start opening client: [{}], port mapping", clientId); // 创建访问者(内网穿透连接创建) diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportClientTransferClientPermeateChannelConnectionSuccessfulTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportClientTransferClientPermeateChannelConnectionSuccessfulTypeAdvanced.java index a9137c6..2bb414e 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportClientTransferClientPermeateChannelConnectionSuccessfulTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportClientTransferClientPermeateChannelConnectionSuccessfulTypeAdvanced.java @@ -7,6 +7,8 @@ import org.framework.lazy.cloud.network.heartbeat.common.advanced.server.Abstrac import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; import org.springframework.stereotype.Component; +import java.util.List; + /** * 上报 客户端渗透客户端数据传输通道连接成功 */ @@ -32,7 +34,7 @@ public class ServerHandleReportClientTransferClientPermeateChannelConnectionSucc ChannelAttributeKeyUtils.buildVisitorId(transferChannel, msgVisitorId); // 绑定访客通道 NettyTransferChannelContext.pushVisitor(transferChannel,msgVisitorId); - ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); + Channel clientChannel = ChannelContext.getLoadBalance(clientId); NettyProxyMsg clientConnectTagetNettyProxyMsg = new NettyProxyMsg(); clientConnectTagetNettyProxyMsg.setVisitorId(msgVisitorId); @@ -42,7 +44,7 @@ public class ServerHandleReportClientTransferClientPermeateChannelConnectionSucc clientConnectTagetNettyProxyMsg.setClientId(clientId); clientConnectTagetNettyProxyMsg.setType(MessageType.DISTRIBUTE_CLIENT_TRANSFER_CLIENT_PERMEATE_CHANNEL_CONNECTION_SUCCESSFUL); if (clientChannel != null) { - clientChannel.getChannel().writeAndFlush(clientConnectTagetNettyProxyMsg); + clientChannel.writeAndFlush(clientConnectTagetNettyProxyMsg); }else { log.error("can not find the client:【】 channel",clientId); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportDisconnectTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportDisconnectTypeAdvanced.java index d7926a5..7f15bde 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportDisconnectTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportDisconnectTypeAdvanced.java @@ -14,6 +14,7 @@ import org.framework.lazy.cloud.network.heartbeat.server.standalone.application. import org.springframework.stereotype.Component; import org.wu.framework.core.utils.ObjectUtils; +import java.nio.charset.StandardCharsets; import java.util.List; @@ -39,36 +40,34 @@ public class ServerHandleReportDisconnectTypeAdvanced extends AbstractHandleRepo @Override public void doHandler(Channel deathChannel, NettyProxyMsg msg) { // 关闭连接通知 - byte[] clientIdByte = msg.getClientId(); - log.warn("close client :{} channel", new String(clientIdByte)); - ChannelId deathChannelId = deathChannel.id(); - ChannelContext.ClientChannel deathClientChannelDTO = ChannelContext.get(clientIdByte); + byte[] clientId = msg.getClientId(); + log.warn("close client :{} channel", new String(clientId)); + Channel deathClientChannelDTO = ChannelContext.getLoadBalance(clientId); if (deathClientChannelDTO != null) { - byte[] clientId = deathClientChannelDTO.getClientId(); // 服务状态离线 String tenantId = new String(clientId); lazyClientStatsChangeApplication.clientOffLine(tenantId); - ChannelContext.remove(clientIdByte); - List clientChannels = ChannelContext.get(); + ChannelContext.remove(clientId); // 通知其他客户端 channelId 关闭了 - for (ChannelContext.ClientChannel clientChannel : clientChannels) { - Channel channel = clientChannel.getChannel(); + ChannelContext.getChannels().forEach((existClientId, channels) -> { + for (Channel channel : channels) { + // 离线通知 + NettyProxyMsg nettyMsg = new NettyProxyMsg(); + nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_DISCONNECTION_NOTIFICATION); + nettyMsg.setClientId(clientId); + nettyMsg.setData(clientId); + channel.writeAndFlush(nettyMsg); + // 暂存通知 + NettyProxyMsg stagingNettyProxyMsg = new NettyProxyMsg(); + stagingNettyProxyMsg.setType(MessageType.DISTRIBUTE_CLIENT_STAGING_OPENED_NOTIFICATION); + stagingNettyProxyMsg.setData(clientId); + stagingNettyProxyMsg.setClientId(clientId); + channel.writeAndFlush(stagingNettyProxyMsg); + } - // 离线通知 - NettyProxyMsg nettyMsg = new NettyProxyMsg(); - nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_DISCONNECTION_NOTIFICATION); - nettyMsg.setClientId(clientId); - nettyMsg.setData(clientId); - channel.writeAndFlush(nettyMsg); - // 暂存通知 - NettyProxyMsg stagingNettyProxyMsg = new NettyProxyMsg(); - stagingNettyProxyMsg.setType(MessageType.DISTRIBUTE_CLIENT_STAGING_OPENED_NOTIFICATION); - stagingNettyProxyMsg.setData(clientId); - stagingNettyProxyMsg.setClientId(clientId); - channel.writeAndFlush(stagingNettyProxyMsg); - } + }); // 关闭绑定的访客端口 List visitorSockets = NettyClientVisitorContext.getVisitorSockets(new String(clientId)); if (!ObjectUtils.isEmpty(visitorSockets)) { diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportStagingClosedTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportStagingClosedTypeAdvanced.java index 67f981d..be15ebd 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportStagingClosedTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportStagingClosedTypeAdvanced.java @@ -35,24 +35,23 @@ public class ServerHandleReportStagingClosedTypeAdvanced extends AbstractHandleR protected void doHandler(Channel stagingClosedChannel, NettyProxyMsg msg) { byte[] clientIdBytes = msg.getClientId(); // 获取所有通道 - List clientChannels = ChannelContext.get(); - ChannelId stagingClosedChannelId = stagingClosedChannel.id(); - ChannelContext.ClientChannel stagingOpenedClientChannel = ChannelContext.get(clientIdBytes); + List stagingOpenedClientChannel = ChannelContext.get(clientIdBytes); if (stagingOpenedClientChannel != null) { String clientId = new String(clientIdBytes); // 存储当前客户端暂存关闭 lazyClientStatsChangeApplication.stagingClosed(clientId); - for (ChannelContext.ClientChannel clientChannel : clientChannels) { - // 告诉他们 当前参数这个通道 暂存关闭了 - Channel channel = clientChannel.getChannel(); - NettyProxyMsg nettyMsg = new NettyProxyMsg(); - nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_STAGING_CLOSED_NOTIFICATION); - nettyMsg.setData((clientId - .getBytes(StandardCharsets.UTF_8))); - nettyMsg.setClientId((clientId - .getBytes(StandardCharsets.UTF_8))); - channel.writeAndFlush(nettyMsg); - } + ChannelContext.getChannels().forEach((existClientId, channels) -> { + for (Channel channel : channels) { + // 告诉他们 当前参数这个通道 暂存关闭了 + NettyProxyMsg nettyMsg = new NettyProxyMsg(); + nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_STAGING_CLOSED_NOTIFICATION); + nettyMsg.setData((clientId + .getBytes(StandardCharsets.UTF_8))); + nettyMsg.setClientId((clientId + .getBytes(StandardCharsets.UTF_8))); + channel.writeAndFlush(nettyMsg); + } + }); } } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportStagingOpenedTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportStagingOpenedTypeAdvanced.java index ef1d171..4ba04d1 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportStagingOpenedTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportStagingOpenedTypeAdvanced.java @@ -1,14 +1,13 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.advanced; import io.netty.channel.Channel; -import io.netty.channel.ChannelId; import lombok.extern.slf4j.Slf4j; -import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyClientStatsChangeApplication; -import org.springframework.stereotype.Component; import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.MessageType; import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.advanced.server.AbstractHandleReportStagingOpenedTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyClientStatsChangeApplication; +import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.util.List; @@ -36,24 +35,23 @@ public class ServerHandleReportStagingOpenedTypeAdvanced extends AbstractHandleR protected void doHandler(Channel stagingOpenedChannel, NettyProxyMsg msg) { // 获取所有通道 byte[] clientIdBytes = msg.getClientId(); - List clientChannels = ChannelContext.get(); - ChannelId stagingOpenedChannelId = stagingOpenedChannel.id(); - ChannelContext.ClientChannel stagingOpenedClientChannel = ChannelContext.get(clientIdBytes); + List stagingOpenedClientChannel = ChannelContext.get(clientIdBytes); if (stagingOpenedClientChannel != null) { - for (ChannelContext.ClientChannel clientChannel : clientChannels) { - // 存储当前客户端暂存关闭 - String clientId = new String(clientIdBytes); - lazyClientStatsChangeApplication.stagingOpened(clientId); - // 告诉他们 当前参数这个通道 暂存开启了 - Channel channel = clientChannel.getChannel(); - NettyProxyMsg nettyMsg = new NettyProxyMsg(); - nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_STAGING_OPENED_NOTIFICATION); - nettyMsg.setData((clientId - .getBytes(StandardCharsets.UTF_8))); - nettyMsg.setClientId((clientId - .getBytes(StandardCharsets.UTF_8))); - channel.writeAndFlush(nettyMsg); - } + ChannelContext.getChannels().forEach((existClientId, channels) -> { + for (Channel channel : channels) { + // 存储当前客户端暂存关闭 + String clientId = new String(clientIdBytes); + lazyClientStatsChangeApplication.stagingOpened(clientId); + // 告诉他们 当前参数这个通道 暂存开启了 + NettyProxyMsg nettyMsg = new NettyProxyMsg(); + nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_STAGING_OPENED_NOTIFICATION); + nettyMsg.setData((clientId + .getBytes(StandardCharsets.UTF_8))); + nettyMsg.setClientId((clientId + .getBytes(StandardCharsets.UTF_8))); + channel.writeAndFlush(nettyMsg); + } + }); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/NettyServerPermeateClientVisitorHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/NettyServerPermeateClientVisitorHandler.java index e49b9f4..cb5bf2a 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/NettyServerPermeateClientVisitorHandler.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/NettyServerPermeateClientVisitorHandler.java @@ -60,11 +60,10 @@ public class NettyServerPermeateClientVisitorHandler extends SimpleChannelInboun // Channel transferChannel = nettyChannelPool.availableChannel(visitorId); // if (transferChannel == null) { // 客户端心跳通道 - ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); + Channel clientChannel = ChannelContext.getLoadBalance(clientId); if (clientChannel != null) { log.info("通过客户端:{},获取通道而后创建连接", clientId); - Channel channel = clientChannel.getChannel(); - channel.writeAndFlush(nettyProxyMsg); + clientChannel.writeAndFlush(nettyProxyMsg); } else { log.error("客户端:【{}】已经下线,无法通过客户端ID获取客户端通道", clientId); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkClientPermeateClientMappingApplicationImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkClientPermeateClientMappingApplicationImpl.java index 3f1abad..eecbce1 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkClientPermeateClientMappingApplicationImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkClientPermeateClientMappingApplicationImpl.java @@ -1,27 +1,23 @@ package org.framework.lazy.cloud.network.heartbeat.server.standalone.application.impl; import io.netty.channel.Channel; +import jakarta.annotation.Resource; import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.MessageType; import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg; -import org.wu.framework.database.lazy.web.plus.stereotype.LazyApplication; import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyInternalNetworkClientPermeateClientMappingApplication; -import org.wu.framework.web.response.Result; -import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.lazy.netty.client.permeate.client.mapping.LazyInternalNetworkClientPermeateClientMapping; -import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.permeate.client.mapping.LazyInternalNetworkClientPermeateClientMappingRemoveCommand; -import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.permeate.client.mapping.LazyInternalNetworkClientPermeateClientMappingStoryCommand; -import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.permeate.client.mapping.LazyInternalNetworkClientPermeateClientMappingUpdateCommand; -import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.permeate.client.mapping.LazyInternalNetworkClientPermeateClientMappingQueryListCommand; -import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.permeate.client.mapping.LazyInternalNetworkClientPermeateClientMappingQueryOneCommand; import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.assembler.LazyInternalNetworkClientPermeateClientMappingDTOAssembler; +import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.permeate.client.mapping.*; import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.dto.LazyInternalNetworkClientPermeateClientMappingDTO; +import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.lazy.netty.client.permeate.client.mapping.LazyInternalNetworkClientPermeateClientMapping; +import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.lazy.netty.client.permeate.client.mapping.LazyInternalNetworkClientPermeateClientMappingRepository; +import org.wu.framework.database.lazy.web.plus.stereotype.LazyApplication; +import org.wu.framework.lazy.orm.database.lambda.domain.LazyPage; +import org.wu.framework.web.response.Result; import java.nio.charset.StandardCharsets; -import java.util.stream.Collectors; -import jakarta.annotation.Resource; -import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.lazy.netty.client.permeate.client.mapping.LazyInternalNetworkClientPermeateClientMappingRepository; import java.util.List; -import org.wu.framework.lazy.orm.database.lambda.domain.LazyPage; +import java.util.stream.Collectors; /** * describe 客户端渗透客户端映射 * @@ -160,9 +156,9 @@ public class LazyInternalNetworkClientPermeateClientMappingApplicationImpl imple public void closeClientPermeateClientSocketMessage(LazyInternalNetworkClientPermeateClientMapping lazyInternalNetworkClientPermeateClientMapping){ // 发送客户端初始化渗透 String clientId = lazyInternalNetworkClientPermeateClientMapping.getFromClientId(); - ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); - if(clientChannel!=null && clientChannel.getChannel()!=null&&clientChannel.getChannel().isActive()){ - Channel channel = clientChannel.getChannel(); + Channel clientChannel = ChannelContext.getLoadBalance(clientId); + if (clientChannel != null && clientChannel.isActive()) { + String permeateTargetIp = lazyInternalNetworkClientPermeateClientMapping.getPermeateTargetIp(); Integer permeateTargetPort = lazyInternalNetworkClientPermeateClientMapping.getPermeateTargetPort(); Integer visitorPort = lazyInternalNetworkClientPermeateClientMapping.getVisitorPort(); @@ -171,7 +167,7 @@ public class LazyInternalNetworkClientPermeateClientMappingApplicationImpl imple nettyMsg.setClientTargetIp(permeateTargetIp); nettyMsg.setClientTargetPort(permeateTargetPort); nettyMsg.setVisitorPort(visitorPort); - channel.writeAndFlush(nettyMsg); + clientChannel.writeAndFlush(nettyMsg); } } @@ -184,9 +180,8 @@ public class LazyInternalNetworkClientPermeateClientMappingApplicationImpl imple // 发送客户端初始化渗透 String fromClientId = lazyInternalNetworkClientPermeateClientMapping.getFromClientId(); String toClientId = lazyInternalNetworkClientPermeateClientMapping.getToClientId(); - ChannelContext.ClientChannel clientChannel = ChannelContext.get(fromClientId); - if(clientChannel!=null && clientChannel.getChannel()!=null&&clientChannel.getChannel().isActive()){ - Channel channel = clientChannel.getChannel(); + Channel clientChannel = ChannelContext.getLoadBalance(fromClientId); + if(clientChannel!=null &&clientChannel.isActive()){ String permeateTargetIp = lazyInternalNetworkClientPermeateClientMapping.getPermeateTargetIp(); Integer permeateTargetPort = lazyInternalNetworkClientPermeateClientMapping.getPermeateTargetPort(); Integer visitorPort = lazyInternalNetworkClientPermeateClientMapping.getVisitorPort(); @@ -197,7 +192,7 @@ public class LazyInternalNetworkClientPermeateClientMappingApplicationImpl imple nettyMsg.setVisitorPort(visitorPort); nettyMsg.setClientId(fromClientId); nettyMsg.setData(toClientId.getBytes(StandardCharsets.UTF_8)); - channel.writeAndFlush(nettyMsg); + clientChannel.writeAndFlush(nettyMsg); } } } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkClientPermeateServerMappingApplicationImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkClientPermeateServerMappingApplicationImpl.java index 18e44b0..13aa58c 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkClientPermeateServerMappingApplicationImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkClientPermeateServerMappingApplicationImpl.java @@ -172,9 +172,8 @@ public class LazyInternalNetworkClientPermeateServerMappingApplicationImpl imple public void closeClientPermeateServerSocketMessage(LazyInternalNetworkClientPermeateServerMapping lazyInternalNetworkClientPermeateServerMapping){ // 发送客户端初始化渗透 String clientId = lazyInternalNetworkClientPermeateServerMapping.getClientId(); - ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); - if(clientChannel!=null && clientChannel.getChannel()!=null&&clientChannel.getChannel().isActive()){ - Channel channel = clientChannel.getChannel(); + Channel clientChannel = ChannelContext.getLoadBalance(clientId); + if(clientChannel!=null &&clientChannel.isActive()){ String permeateTargetIp = lazyInternalNetworkClientPermeateServerMapping.getPermeateTargetIp(); Integer permeateTargetPort = lazyInternalNetworkClientPermeateServerMapping.getPermeateTargetPort(); Integer visitorPort = lazyInternalNetworkClientPermeateServerMapping.getVisitorPort(); @@ -183,7 +182,7 @@ public class LazyInternalNetworkClientPermeateServerMappingApplicationImpl imple nettyMsg.setClientTargetIp(permeateTargetIp); nettyMsg.setClientTargetPort(permeateTargetPort); nettyMsg.setVisitorPort(visitorPort); - channel.writeAndFlush(nettyMsg); + clientChannel.writeAndFlush(nettyMsg); } } @@ -194,9 +193,8 @@ public class LazyInternalNetworkClientPermeateServerMappingApplicationImpl imple public void createClientPermeateServerSocketMessage(LazyInternalNetworkClientPermeateServerMapping lazyInternalNetworkClientPermeateServerMapping){ // 发送客户端初始化渗透 String clientId = lazyInternalNetworkClientPermeateServerMapping.getClientId(); - ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); - if(clientChannel!=null && clientChannel.getChannel()!=null&&clientChannel.getChannel().isActive()){ - Channel channel = clientChannel.getChannel(); + Channel clientChannel = ChannelContext.getLoadBalance(clientId); + if(clientChannel!=null &&clientChannel.isActive()){ String permeateTargetIp = lazyInternalNetworkClientPermeateServerMapping.getPermeateTargetIp(); Integer permeateTargetPort = lazyInternalNetworkClientPermeateServerMapping.getPermeateTargetPort(); Integer visitorPort = lazyInternalNetworkClientPermeateServerMapping.getVisitorPort(); @@ -205,7 +203,7 @@ public class LazyInternalNetworkClientPermeateServerMappingApplicationImpl imple nettyMsg.setClientTargetIp(permeateTargetIp); nettyMsg.setClientTargetPort(permeateTargetPort); nettyMsg.setVisitorPort(visitorPort); - channel.writeAndFlush(nettyMsg); + clientChannel.writeAndFlush(nettyMsg); } } } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientBlacklistApplicationImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientBlacklistApplicationImpl.java index 713b6cf..9cf87c2 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientBlacklistApplicationImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientBlacklistApplicationImpl.java @@ -54,15 +54,14 @@ public class LazyNettyClientBlacklistApplicationImpl implements LazyNettyClientB Result story = lazyNettyClientBlacklistRepository.story(lazyNettyClientBlacklist); // 获取客户端channel 发送下下通知 String clientId = lazyNettyClientBlacklist.getClientId(); - ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId.getBytes(StandardCharsets.UTF_8)); + Channel clientChannel = ChannelContext.getLoadBalance(clientId.getBytes(StandardCharsets.UTF_8)); if (null != clientChannel) { // 模拟客户端发送下线通知 - Channel channel = clientChannel.getChannel(); NettyProxyMsg nettyMsg = new NettyProxyMsg(); nettyMsg.setClientId(clientId); nettyMsg.setType(MessageType.REPORT_CLIENT_DISCONNECTION); - channel.writeAndFlush(nettyMsg); + clientChannel.writeAndFlush(nettyMsg); } return story; diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientStateApplicationImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientStateApplicationImpl.java index 43e9cfc..4de1fa2 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientStateApplicationImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientStateApplicationImpl.java @@ -189,17 +189,16 @@ public class LazyNettyClientStateApplicationImpl implements LazyNettyClientState // 获取客户端ID String clientId = lazyNettyClientMessageCommand.getClientId(); String message = lazyNettyClientMessageCommand.getMessage(); - ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); + Channel clientChannel = ChannelContext.getLoadBalance(clientId); if (clientChannel == null) { return ResultFactory.errorOf("客户端:" + clientId + "不存在"); } // 发送消息到客户端 - Channel channel = clientChannel.getChannel(); NettyProxyMsg nettyProxyMsg = new NettyProxyMsg(); nettyProxyMsg.setClientId("服务端"); nettyProxyMsg.setData(message.getBytes(StandardCharsets.UTF_8)); nettyProxyMsg.setType(MessageType.DISTRIBUTE_SINGLE_CLIENT_MESSAGE); - channel.writeAndFlush(nettyProxyMsg); + clientChannel.writeAndFlush(nettyProxyMsg); return ResultFactory.successOf(); } } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/controller/LazyChannelController.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/controller/LazyChannelController.java index 82ce6a2..20b18cf 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/controller/LazyChannelController.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/controller/LazyChannelController.java @@ -31,26 +31,25 @@ public class LazyChannelController { @Operation(summary = "服务端访问客户端") @GetMapping("/{clientId}") public void fetchClientId(@PathVariable("clientId") String clientId) { - log.info("clientId:" + clientId); + log.info("clientId:{}", clientId); // 获取客户端channel - ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId.getBytes(StandardCharsets.UTF_8)); + Channel clientChannel = ChannelContext.getLoadBalance(clientId.getBytes(StandardCharsets.UTF_8)); if (clientChannel == null) { if (log.isDebugEnabled()) { - for (ChannelContext.ClientChannel exisitClientChannel : ChannelContext.get()) { - log.debug("当前存在的通道:{}", new String(exisitClientChannel.getClientId())); - } + ChannelContext.getChannels().forEach((existClientId, channels) -> { + log.debug("当前存在的通道:{}", existClientId); + }); } return; } // 发送消息 - Channel channel = clientChannel.getChannel(); NettyProxyMsg nettyMsg = new NettyProxyMsg(); nettyMsg.setClientId(clientId); // 下发 客户端消息 nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_TRANSFER); - channel.writeAndFlush(nettyMsg); + clientChannel.writeAndFlush(nettyMsg); } /** @@ -59,8 +58,7 @@ public class LazyChannelController { @Operation(summary = "获取当前服务端所有通道") @GetMapping("/findClientIdList") public Result> findClientIdList() { - List clientChannels = ChannelContext.get(); - return ResultFactory.successOf(clientChannels.stream().map(clientChannel -> new String(clientChannel.getClientId())).toList()); + return ResultFactory.successOf( ChannelContext.getClientIds()); } /** @@ -87,23 +85,22 @@ public class LazyChannelController { "Accept-Encoding: gzip, deflate, br\n" + "Accept-Language: zh-CN,zh;q=0.9\n" + "Cookie: XXL_JOB_LOGIN_IDENTITY=7b226964223a312c22757365726e616d65223a2261646d696e222c2270617373776f7264223a226531306164633339343962613539616262653536653035376632306638383365222c22726f6c65223a312c227065726d697373696f6e223a6e756c6c7d; Hm_lvt_173e771eef816c412396d2cb4fe2d632=1703040917\n"; - - List clientChannels = ChannelContext.get(); String clientTargetIp = internalNetworkPenetrationRealClient.getClientTargetIp(); Integer clientTargetPort = internalNetworkPenetrationRealClient.getClientTargetPort(); Integer visitorPort = internalNetworkPenetrationRealClient.getVisitorPort(); - for (ChannelContext.ClientChannel clientChannel : clientChannels) { - Channel channel = clientChannel.getChannel(); + ChannelContext.getChannels().forEach((clientId, channels) -> { NettyProxyMsg nettyProxyMsg = new NettyProxyMsg(); // 下发 客户端消息 nettyProxyMsg.setType(MessageType.DISTRIBUTE_CLIENT_TRANSFER); - nettyProxyMsg.setClientId(clientChannel.getClientId()); + nettyProxyMsg.setClientId(clientId); nettyProxyMsg.setVisitorPort(visitorPort); nettyProxyMsg.setClientTargetIp(clientTargetIp); nettyProxyMsg.setClientTargetPort(clientTargetPort); nettyProxyMsg.setData(data.getBytes(StandardCharsets.UTF_8)); - channel.writeAndFlush(nettyProxyMsg); - } + for (Channel channel : channels) { + channel.writeAndFlush(nettyProxyMsg); + } + }); return ResultFactory.successOf(); }