diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/advanced/ClientDistributeSingleClientRealAutoReadConnectTypeAdvanced.java b/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/advanced/ClientDistributeSingleClientRealAutoReadConnectTypeAdvanced.java index 0e20cc6..6b5ecec 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/advanced/ClientDistributeSingleClientRealAutoReadConnectTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/advanced/ClientDistributeSingleClientRealAutoReadConnectTypeAdvanced.java @@ -22,7 +22,7 @@ public class ClientDistributeSingleClientRealAutoReadConnectTypeAdvanced extends // 获取访客ID byte[] visitorId = nettyProxyMsg.getVisitorId(); // 获取访客对应的真实代理通道 - Channel realChannel = NettyRealIdContext.getVisitor(visitorId); + Channel realChannel = NettyRealIdContext.getReal(visitorId); if (realChannel != null) { realChannel.config().setOption(ChannelOption.AUTO_READ, true); } diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/advanced/ClientReportChannelTransferTypeAdvanced.java b/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/advanced/ClientReportChannelTransferTypeAdvanced.java index 0a10b88..bbbdbae 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/advanced/ClientReportChannelTransferTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/advanced/ClientReportChannelTransferTypeAdvanced.java @@ -41,7 +41,7 @@ public class ClientReportChannelTransferTypeAdvanced extends AbstractDistributeC byte[] clientTargetPort = nettyProxyMsg.getClientTargetPort(); byte[] visitorId = nettyProxyMsg.getVisitorId(); // 真实服务通道 - Channel realChannel = NettyRealIdContext.getVisitor(new String(visitorId)); + Channel realChannel = NettyRealIdContext.getReal(new String(visitorId)); if (realChannel == null) { log.error("无法获取访客:{} 真实服务", new String(visitorId)); return; diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/advanced/DistributeConnectSuccessNotificationTypeAdvanced.java b/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/advanced/DistributeConnectSuccessNotificationTypeAdvanced.java index 24e5a16..8498338 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/advanced/DistributeConnectSuccessNotificationTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/advanced/DistributeConnectSuccessNotificationTypeAdvanced.java @@ -43,7 +43,7 @@ public class DistributeConnectSuccessNotificationTypeAdvanced extends AbstractDi String clientId = nettyServerProperties.getClientId(); NettyProxyMsg nettyMsg = new NettyProxyMsg(); nettyMsg.setClientId(clientId.getBytes(StandardCharsets.UTF_8)); - ChannelContext.push(channel, nettyMsg); + ChannelContext.push(channel, clientId); ChannelAttributeKeyUtils.buildClientId(channel,clientId); // 存储其他客户端状态 List clientIdList = JSONObject.parseArray(new String(msg.getData()), String.class); diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/socket/NettyClientRealSocket.java b/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/socket/NettyClientRealSocket.java index 9506871..f9fbaa7 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/socket/NettyClientRealSocket.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/wu/framework/lazy/cloud/heartbeat/client/netty/socket/NettyClientRealSocket.java @@ -3,7 +3,6 @@ package wu.framework.lazy.cloud.heartbeat.client.netty.socket; import wu.framework.lazy.cloud.heartbeat.client.netty.config.NettyServerProperties; import wu.framework.lazy.cloud.heartbeat.common.*; -import wu.framework.lazy.cloud.heartbeat.common.*; import wu.framework.lazy.cloud.heartbeat.common.adapter.ChannelTypeAdapter; import wu.framework.lazy.cloud.heartbeat.common.advanced.ChannelTypeAdvanced; import wu.framework.lazy.cloud.heartbeat.common.utils.ChannelAttributeKeyUtils; @@ -64,7 +63,7 @@ public class NettyClientRealSocket { String visitorId = internalNetworkPenetrationRealClient.getVisitorId(); log.info("访客通过 客户端:【{}】,绑定本地服务,IP:{},端口:{} 新建通道成功", clientId, clientTargetIp1, clientTargetPort1); // 客户端真实通道 - NettyRealIdContext.pushVisitor(realChannel, visitorId); + NettyRealIdContext.pushReal(realChannel, visitorId); // 绑定访客ID到当前真实通道属性 ChannelAttributeKeyUtils.buildVisitorId(realChannel, visitorId); ChannelAttributeKeyUtils.buildClientId(realChannel, clientId); @@ -157,7 +156,7 @@ public class NettyClientRealSocket { ChannelAttributeKeyUtils.buildVisitorId(channel, visitorId); ChannelAttributeKeyUtils.buildClientId(channel, clientId); // 客户端真实通道自动读写打开 - Channel visitor = NettyRealIdContext.getVisitor(visitorId); + Channel visitor = NettyRealIdContext.getReal(visitorId); visitor.config().setOption(ChannelOption.AUTO_READ, true); diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/ChannelContext.java b/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/ChannelContext.java index ec6da9a..4897a50 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/ChannelContext.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/ChannelContext.java @@ -17,28 +17,29 @@ import java.util.concurrent.ConcurrentHashMap; @Slf4j public class ChannelContext { - private final static ConcurrentHashMap + private final static ConcurrentHashMap channelIdClientChannelDTOConcurrentHashMap = new ConcurrentHashMap<>(); /** * 新增通道 * * @param channel 通道 - * @param nettyMsg 通道中的信息 + * @param clientId 客户端ID */ - public static void push(Channel channel, NettyProxyMsg nettyMsg) { + public static void push(Channel channel, String clientId) { ChannelId channelId = channel.id(); - byte[] clientId = nettyMsg.getClientId(); - ClientChannelImpl clientChannelImpl = new ClientChannelImpl(); clientChannelImpl.setChannelId(channelId); clientChannelImpl.setChannel(channel); - clientChannelImpl.setClientId(clientId); - channelIdClientChannelDTOConcurrentHashMap.put(channelId, clientChannelImpl); + clientChannelImpl.setClientId(clientId.getBytes(StandardCharsets.UTF_8)); + // 如果客户端已经存在 移除 + if(channelIdClientChannelDTOConcurrentHashMap.containsKey(clientId)){ + clear(clientId); + } + channelIdClientChannelDTOConcurrentHashMap.put(clientId, clientChannelImpl); } - /** * 新增通道 * @@ -52,7 +53,7 @@ public class ChannelContext { clientChannelImpl.setChannelId(channelId); clientChannelImpl.setChannel(channel); clientChannelImpl.setClientId(clientId); - channelIdClientChannelDTOConcurrentHashMap.put(channelId, clientChannelImpl); + channelIdClientChannelDTOConcurrentHashMap.put(new String(clientId), clientChannelImpl); } @@ -65,20 +66,6 @@ public class ChannelContext { return new ArrayList<>(channelIdClientChannelDTOConcurrentHashMap.values()); } - /** - * 根据通道ID获取通道信息 - * - * @param channelId 通道ID - * @return 通道信息 - */ - public static ClientChannel get(ChannelId channelId) { - if (channelIdClientChannelDTOConcurrentHashMap.containsKey(channelId)) { - return channelIdClientChannelDTOConcurrentHashMap.get(channelId); - } else { - log.error("无法通过通道ID[" + channelId + "]获取通道信息"); - return null; - } - } /** * 根据通道ID获取通道信息 @@ -88,13 +75,9 @@ public class ChannelContext { */ public static ClientChannel get(byte[] clientId) { if (channelIdClientChannelDTOConcurrentHashMap - .values().stream() - .anyMatch(clientChannelImpl -> new String(clientChannelImpl.getClientId()).equals(new String(clientId)))) { + .containsKey(new String(clientId))) { return channelIdClientChannelDTOConcurrentHashMap - .values() - .stream() - .filter(clientChannelImpl -> new String(clientChannelImpl.getClientId()).equals(new String(clientId))) - .findFirst().get(); + .get(new String(clientId)); } else { log.error("无法通过客户端ID[" + new String(clientId) + "]获取通道信息"); return null; @@ -111,19 +94,6 @@ public class ChannelContext { return get(clientId.getBytes(StandardCharsets.UTF_8)); } - /** - * 通过客户端通道ID移除客户端通道 - * - * @param channelId 客户端通道ID - */ - public static void remove(ChannelId channelId) { - if (channelIdClientChannelDTOConcurrentHashMap.containsKey(channelId)) { - channelIdClientChannelDTOConcurrentHashMap.remove(channelId); - } else { - // log warm - log.warn("无法通过客户端通道ID:[{}]移除客户端", channelId); - } - } /** * 关闭通道 @@ -152,7 +122,7 @@ public class ChannelContext { public static void remove(byte[] clientId) { ClientChannel clientChannel = get(clientId); if (clientChannel != null) { - channelIdClientChannelDTOConcurrentHashMap.remove(clientChannel.getChannelId()); + channelIdClientChannelDTOConcurrentHashMap.remove(new String(clientId)); } else { // log warm log.warn("无法通过客户ID:[{}]移除客户端", new String(clientId)); @@ -167,7 +137,7 @@ public class ChannelContext { public static void remove(String clientId) { ClientChannel clientChannel = get(clientId); if (clientChannel != null) { - channelIdClientChannelDTOConcurrentHashMap.remove(clientChannel.getChannelId()); + channelIdClientChannelDTOConcurrentHashMap.remove(clientId); } else { // log warm log.warn("无法通过客户ID:[{}]移除客户端", clientId); diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyRealIdContext.java b/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyRealIdContext.java index 3737b52..b61ac3f 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyRealIdContext.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyRealIdContext.java @@ -5,7 +5,7 @@ import io.netty.channel.Channel; import java.util.concurrent.ConcurrentHashMap; /** - * 真实通道对应上下文 + * 真实通道对应上下文 客户端、服务端真实代理通道 */ public class NettyRealIdContext { @@ -13,12 +13,12 @@ public class NettyRealIdContext { /** - * 添加访客 + * 添加真实通道 * * @param visitorId 访客id - * @param visitor 访客 + * @param visitor 访客真实通道 */ - public static void pushVisitor(T visitor, String visitorId) { + public static void pushReal(T visitor, String visitorId) { REAL.put(visitorId, visitor); } @@ -30,7 +30,7 @@ public class NettyRealIdContext { * @param 访客范型 * @return 访客 */ - public static T getVisitor(String visitorId) { + public static T getReal(String visitorId) { return (T) REAL.get(visitorId); } @@ -41,8 +41,8 @@ public class NettyRealIdContext { * @param 访客范型 * @return 访客 */ - public static T getVisitor(byte[] visitorId) { - return getVisitor(new String(visitorId)); + public static T getReal(byte[] visitorId) { + return getReal(new String(visitorId)); } @@ -52,7 +52,7 @@ public class NettyRealIdContext { * @param visitorId 访客ID */ public static void clear(String visitorId) { - Channel visitor = getVisitor(visitorId); + Channel visitor = getReal(visitorId); if (visitor != null) { REAL.remove(visitorId); visitor.close(); diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyVisitorIdContext.java b/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyVisitorIdContext.java index fe08142..3e2dcfe 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyVisitorIdContext.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyVisitorIdContext.java @@ -5,7 +5,7 @@ import io.netty.channel.Channel; import java.util.concurrent.ConcurrentHashMap; /** - * 访客ID对应上下文 + * 访客通信通道上下文(服务端、客户端 通信) */ @Deprecated public class NettyVisitorIdContext { diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyVisitorContext.java b/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyVisitorPortContext.java similarity index 91% rename from wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyVisitorContext.java rename to wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyVisitorPortContext.java index 8283bf7..2194f12 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyVisitorContext.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyVisitorPortContext.java @@ -1,13 +1,11 @@ package wu.framework.lazy.cloud.heartbeat.common; -import io.netty.channel.Channel; - import java.util.concurrent.ConcurrentHashMap; /** * 访客端口对应上下文 */ -public class NettyVisitorContext { +public class NettyVisitorPortContext { protected static final ConcurrentHashMap VISITOR_PORT = new ConcurrentHashMap<>(); diff --git a/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-client-sample/src/main/resources/application-dev.yml b/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-client-sample/src/main/resources/application-dev.yml index fba541b..696d6f4 100644 --- a/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-client-sample/src/main/resources/application-dev.yml +++ b/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-client-sample/src/main/resources/application-dev.yml @@ -1,12 +1,12 @@ spring: lazy: netty: - inet-host: 127.0.0.1 - inet-port: 7001 - inet-path: middleground-on-cloud-heartbeat-server -# inet-host: 124.222.48.62 # 服务端地址 -# inet-port: 30676 #服务端端口 +# inet-host: 127.0.0.1 +# inet-port: 7001 # inet-path: middleground-on-cloud-heartbeat-server + inet-host: 124.222.48.62 # 服务端地址 + inet-port: 30676 #服务端端口 + inet-path: middleground-on-cloud-heartbeat-server client-id: local # 客户端ID data: redis: diff --git a/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/README.md b/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/README.md index c21cf7a..f8e6f57 100644 --- a/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/README.md +++ b/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/README.md @@ -6,8 +6,10 @@ #docker login --username=1207537021@qq.com registry.cn-hangzhou.aliyuncs.com -docker build -t registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:server-jdk17-master . -docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:server-jdk17-master +mvn clean install + +docker build -t registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:1.2.2-JDK17-SNAPSHOT . +docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:1.2.2-JDK17-SNAPSHOT ``` @@ -24,7 +26,7 @@ mvn native:build ``` ```RUN -docker run -d -it -p 18080:18080 --name wu-lazy-cloud-heartbeat-server registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:server-jdk17-master +docker run -d -it -p 18080:18080 --name wu-lazy-cloud-heartbeat-server registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:1.2.2-JDK17-SNAPSHOT http://127.0.0.1:18080/swagger-ui/index.html diff --git a/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/k8s.yaml b/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/k8s.yaml new file mode 100644 index 0000000..104ac44 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/k8s.yaml @@ -0,0 +1,89 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + annotations: {} + labels: + k8s.kuboard.cn/layer: gateway + k8s.kuboard.cn/name: wu-lazy-cloud-heartbeat-server + name: wu-lazy-cloud-heartbeat-server + namespace: default + resourceVersion: '3503304' +spec: + progressDeadlineSeconds: 600 + replicas: 1 + revisionHistoryLimit: 10 + selector: + matchLabels: + k8s.kuboard.cn/layer: gateway + k8s.kuboard.cn/name: wu-lazy-cloud-heartbeat-server + strategy: + rollingUpdate: + maxSurge: 25% + maxUnavailable: 25% + type: RollingUpdate + template: + metadata: + annotations: + kubectl.kubernetes.io/restartedAt: '2024-01-16T21:34:28+08:00' + creationTimestamp: null + labels: + k8s.kuboard.cn/layer: gateway + k8s.kuboard.cn/name: wu-lazy-cloud-heartbeat-server + spec: + containers: + - env: + - name: spring.datasource.url + value: >- + jdbc:mysql://cloud-mysql:3306/wu_lazy_cloud_netty_server?allowMultiQueries=true&useUnicode=true&autoReconnect=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&databaseTerm=SCHEMA + - name: JAVA_OPTS + value: '-Xms64m -Xmx128m' + image: >- + registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:1.2.2-JDK17-SNAPSHOT + imagePullPolicy: Always + name: wu-lazy-cloud-heartbeat-server + resources: {} + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + dnsPolicy: ClusterFirst + restartPolicy: Always + schedulerName: default-scheduler + securityContext: {} + terminationGracePeriodSeconds: 30 + +--- +apiVersion: v1 +kind: Service +metadata: + annotations: {} + labels: + k8s.kuboard.cn/layer: gateway + k8s.kuboard.cn/name: wu-lazy-cloud-heartbeat-server + name: wu-lazy-cloud-heartbeat-server + namespace: default + resourceVersion: '3500792' +spec: + clusterIP: 10.96.41.191 + externalTrafficPolicy: Cluster + ports: + - name: pecjjh + nodePort: 30676 + port: 7001 + protocol: TCP + targetPort: 7001 + - name: z4bg3n + nodePort: 30273 + port: 30273 + protocol: TCP + targetPort: 30273 + - name: wfcigf + nodePort: 30576 + port: 6001 + protocol: TCP + targetPort: 6001 + selector: + k8s.kuboard.cn/layer: gateway + k8s.kuboard.cn/name: wu-lazy-cloud-heartbeat-server + sessionAffinity: None + type: NodePort + diff --git a/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/src/main/resources/application-dev.yml b/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/src/main/resources/application-dev.yml index 0de0018..58c71e1 100644 --- a/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/src/main/resources/application-dev.yml +++ b/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/src/main/resources/application-dev.yml @@ -6,7 +6,7 @@ spring: password: laihui database: 2 datasource: - url: jdbc:mysql://127.0.0.1:3306/lazy_cloud_netty_server?allowMultiQueries=true&useUnicode=true&autoReconnect=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&databaseTerm=SCHEMA + url: jdbc:mysql://127.0.0.1:3306/wu_lazy_cloud_netty_server?allowMultiQueries=true&useUnicode=true&autoReconnect=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&databaseTerm=SCHEMA username: root password: wujiawei driver-class-name: com.mysql.cj.jdbc.Driver diff --git a/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/src/main/resources/application-prod.yml b/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/src/main/resources/application-prod.yml index 6628e7b..865e25e 100644 --- a/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/src/main/resources/application-prod.yml +++ b/wu-lazy-cloud-heartbeat-sample/wu-lazy-cloud-heartbeat-server-sample/src/main/resources/application-prod.yml @@ -1,6 +1,6 @@ spring: datasource: - url: jdbc:mysql://${MAIN_DB_HOST}/middleground_cloud_netty_server?allowMultiQueries=true&useUnicode=true&autoReconnect=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&databaseTerm=SCHEMA + url: jdbc:mysql://${MAIN_DB_HOST}/wu_lazy_cloud_netty_server?allowMultiQueries=true&useUnicode=true&autoReconnect=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&databaseTerm=SCHEMA username: middleground_cloud_netty_server password: laihui driver-class-name: com.mysql.cj.jdbc.Driver diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/InternalNetworkPenetrationMappingApplicationImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/InternalNetworkPenetrationMappingApplicationImpl.java index cceb9e0..b6f551f 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/InternalNetworkPenetrationMappingApplicationImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/InternalNetworkPenetrationMappingApplicationImpl.java @@ -1,5 +1,6 @@ package wu.framework.lazy.cloud.heartbeat.server.application.impl; +import lombok.extern.slf4j.Slf4j; import wu.framework.lazy.cloud.heartbeat.common.InternalNetworkPenetrationRealClient; import wu.framework.lazy.cloud.heartbeat.server.application.InternalNetworkPenetrationMappingApplication; import wu.framework.lazy.cloud.heartbeat.server.application.assembler.InternalNetworkPenetrationMappingDTOAssembler; @@ -26,6 +27,7 @@ import java.util.stream.Collectors; * @date 2023/12/29 05:21 下午 * @see com.wu.framework.inner.lazy.persistence.reverse.lazy.ddd.DefaultDDDLazyApplicationImpl **/ +@Slf4j @LazyApplication public class InternalNetworkPenetrationMappingApplicationImpl implements InternalNetworkPenetrationMappingApplication { @@ -45,6 +47,7 @@ public class InternalNetworkPenetrationMappingApplicationImpl implements Interna @Override public Result story(InternalNetworkPenetrationMappingStoryCommand internalNetworkPenetrationMappingStoryCommand) { InternalNetworkPenetrationMapping internalNetworkPenetrationMapping = InternalNetworkPenetrationMappingDTOAssembler.INSTANCE.toInternalNetworkPenetrationMapping(internalNetworkPenetrationMappingStoryCommand); + internalNetworkPenetrationMapping.setIsDeleted(false); return internalNetworkPenetrationMappingRepository.story(internalNetworkPenetrationMapping); } @@ -167,17 +170,11 @@ public class InternalNetworkPenetrationMappingApplicationImpl implements Interna VisitorFilter visitorFilter = new VisitorFilter(internalNetworkPenetrationRealClient); NettyVisitorSocket nettyVisitorSocket = new NettyVisitorSocket(visitorFilter); - Thread thread = new Thread(() -> { - try { - nettyVisitorSocket.startServer(visitorPort); - } catch (Exception e) { - throw new RuntimeException(e); - } - - }); - // 使用线程池 TODO - thread.run(); - + try { + nettyVisitorSocket.startServer(visitorPort); + } catch (Exception e) { + log.error("客户端:{},网络端口:{},开放失败",clientId,visitorPort); + } // 发送客户端代理连接请求 客户端创建代理连接 diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/NettyClientStateApplicationImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/NettyClientStateApplicationImpl.java index 8b4f047..6187102 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/NettyClientStateApplicationImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/NettyClientStateApplicationImpl.java @@ -3,7 +3,6 @@ package wu.framework.lazy.cloud.heartbeat.server.application.impl; import wu.framework.lazy.cloud.heartbeat.common.ChannelContext; -import wu.framework.lazy.cloud.heartbeat.common.NettyVisitorContext; import wu.framework.lazy.cloud.heartbeat.server.application.NettyClientStateApplication; import wu.framework.lazy.cloud.heartbeat.server.application.assembler.NettyClientStateDTOAssembler; import wu.framework.lazy.cloud.heartbeat.server.application.command.netty.client.state.NettyClientStateStoryCommand; diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/infrastructure/entity/NettyServerVisitorDO.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/infrastructure/entity/NettyServerVisitorDO.java index 7a27174..0a4f177 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/infrastructure/entity/NettyServerVisitorDO.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/infrastructure/entity/NettyServerVisitorDO.java @@ -25,7 +25,7 @@ import java.lang.Integer; **/ @Data @Accessors(chain = true) -@LazyTable(tableName = "netty_server_visitor",schema = "lazy_cloud_netty_server",comment = "服务端提前开放出来的端口") +@LazyTable(tableName = "netty_server_visitor",comment = "服务端提前开放出来的端口") @Schema(title = "netty_server_visitor",description = "服务端提前开放出来的端口") public class NettyServerVisitorDO { diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportChannelTransferTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportChannelTransferTypeAdvanced.java index 6772675..88166eb 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportChannelTransferTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportChannelTransferTypeAdvanced.java @@ -29,7 +29,7 @@ public class ServerReportChannelTransferTypeAdvanced extends AbstractReportChann log.debug("接收到客户端:[{}]内网穿透返回的数据:[{}]", new String(msg.getClientId()), new String(msg.getData())); // 将数据转发访客通道 byte[] visitorId = msg.getVisitorId(); - Channel visitor = NettyRealIdContext.getVisitor(visitorId); + Channel visitor = NettyRealIdContext.getReal(visitorId); if (visitor != null) { ByteBuf buf = visitor.config().getAllocator().buffer(msg.getData().length); buf.writeBytes(msg.getData()); diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportConnectSuccessTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportConnectSuccessTypeAdvanced.java index c54aaf3..5052877 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportConnectSuccessTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportConnectSuccessTypeAdvanced.java @@ -46,9 +46,10 @@ public class ServerReportConnectSuccessTypeAdvanced extends AbstractReportConnec */ @Override public void doHandler(Channel newChannel, NettyProxyMsg msg) { - ChannelContext.push(newChannel, msg); + String clientId = new String(msg.getClientId()); + ChannelContext.push(newChannel, clientId); ChannelAttributeKeyUtils.buildClientId(newChannel,clientId); log.info("客户端:{}连接成功",new String(msg.getClientId())); @@ -76,8 +77,10 @@ public class ServerReportConnectSuccessTypeAdvanced extends AbstractReportConnec // 发送所有客户端ID channel.writeAndFlush(nettyMsg); } + log.info("开始开启客户端:【{}】,端口映射",clientId); // 创建访问者(内网穿透连接创建) internalNetworkPenetrationMappingApplication.createVisitor(clientId); + log.info("结束开启客户端:【{}】,端口映射",clientId); }else { // 黑名单客户端 diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportDisconnectTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportDisconnectTypeAdvanced.java index 473525d..3f076bd 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportDisconnectTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportDisconnectTypeAdvanced.java @@ -39,14 +39,14 @@ public class ServerReportDisconnectTypeAdvanced extends AbstractReportDisconnect byte[] clientIdByte = msg.getClientId(); log.info("关闭客户端:{} 的通道",new String(clientIdByte)); ChannelId deathChannelId = deathChannel.id(); - ChannelContext.ClientChannel deathClientChannelDTO = ChannelContext.get(deathChannelId); + ChannelContext.ClientChannel deathClientChannelDTO = ChannelContext.get(clientIdByte); if (deathClientChannelDTO != null) { byte[] clientId = deathClientChannelDTO.getClientId(); // 服务状态离线 String tenantId = new String(clientId); serverNettyConfigApplication.clientOffLine(tenantId); - ChannelContext.remove(deathChannelId); + ChannelContext.remove(clientIdByte); List clientChannels = ChannelContext.get(); // 通知其他客户端 channelId 关闭了 diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportSingleClientRealConnectTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportSingleClientRealConnectTypeAdvanced.java index b1b0c87..e8df762 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportSingleClientRealConnectTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportSingleClientRealConnectTypeAdvanced.java @@ -34,7 +34,7 @@ public class ServerReportSingleClientRealConnectTypeAdvanced extends AbstractRep ChannelAttributeKeyUtils.buildVisitorId(channel, visitorId); ChannelAttributeKeyUtils.buildClientId(channel, clientId); // 访客通道开启自动读取 - Channel visitorRealChannel = NettyRealIdContext.getVisitor(new String(visitorId)); + Channel visitorRealChannel = NettyRealIdContext.getReal(new String(visitorId)); visitorRealChannel.config().setOption(ChannelOption.AUTO_READ, true); // 或许此处还应该通知服务端 这个访客绑定的客户端真实通道打开 diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportStagingClosedTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportStagingClosedTypeAdvanced.java index 112bb96..8749178 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportStagingClosedTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportStagingClosedTypeAdvanced.java @@ -39,7 +39,7 @@ public class ServerReportStagingClosedTypeAdvanced extends AbstractReportStaging // 获取所有通道 List clientChannels = ChannelContext.get(); ChannelId stagingClosedChannelId = stagingClosedChannel.id(); - ChannelContext.ClientChannel stagingOpenedClientChannel = ChannelContext.get(stagingClosedChannelId); + ChannelContext.ClientChannel stagingOpenedClientChannel = ChannelContext.get(clientIdBytes); if (stagingOpenedClientChannel != null) { String clientId = new String(clientIdBytes); // 存储当前客户端暂存关闭 diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportStagingOpenedTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportStagingOpenedTypeAdvanced.java index d44110d..33b56db 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportStagingOpenedTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerReportStagingOpenedTypeAdvanced.java @@ -38,7 +38,7 @@ public class ServerReportStagingOpenedTypeAdvanced extends AbstractReportStaging byte[] clientIdBytes = msg.getClientId(); List clientChannels = ChannelContext.get(); ChannelId stagingOpenedChannelId = stagingOpenedChannel.id(); - ChannelContext.ClientChannel stagingOpenedClientChannel = ChannelContext.get(stagingOpenedChannelId); + ChannelContext.ClientChannel stagingOpenedClientChannel = ChannelContext.get(clientIdBytes); if (stagingOpenedClientChannel != null) { for (ChannelContext.ClientChannel clientChannel : clientChannels) { // 存储当前客户端暂存关闭 diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/handler/NettyServerHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/handler/NettyServerHandler.java index 04a4e79..4fead56 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/handler/NettyServerHandler.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/handler/NettyServerHandler.java @@ -1,9 +1,6 @@ package wu.framework.lazy.cloud.heartbeat.server.netty.handler; -import wu.framework.lazy.cloud.heartbeat.common.MessageType; -import wu.framework.lazy.cloud.heartbeat.common.NettyCommunicationIdContext; -import wu.framework.lazy.cloud.heartbeat.common.NettyProxyMsg; -import wu.framework.lazy.cloud.heartbeat.common.NettyRealIdContext; +import wu.framework.lazy.cloud.heartbeat.common.*; import wu.framework.lazy.cloud.heartbeat.common.adapter.ChannelTypeAdapter; import wu.framework.lazy.cloud.heartbeat.common.utils.ChannelAttributeKeyUtils; import io.netty.channel.*; @@ -62,10 +59,10 @@ public class NettyServerHandler extends SimpleChannelInboundHandler 2) { - if(ObjectUtils.isEmpty(visitorId)){ + if (ObjectUtils.isEmpty(visitorId)) { log.warn("关闭这个不活跃的channel client:{}", clientId); // 给所有客户端发送 这个客户端离线了 NettyProxyMsg nettyMsg = new NettyProxyMsg(); @@ -74,8 +71,8 @@ public class NettyServerHandler extends SimpleChannelInboundHandler { String clientTargetIp = internalNetworkPenetrationRealClient.getClientTargetIp(); Integer clientTargetPort = internalNetworkPenetrationRealClient.getClientTargetPort(); // 绑定访客真实通道 - NettyRealIdContext.pushVisitor(visitorChannel, visitorId); + NettyRealIdContext.pushReal(visitorChannel, visitorId); // 当前通道绑定访客ID ChannelAttributeKeyUtils.buildVisitorId(visitorChannel, visitorId); ChannelAttributeKeyUtils.buildClientId(visitorChannel, clientId); @@ -51,6 +49,7 @@ public class VisitorHandler extends SimpleChannelInboundHandler { myMsg.setVisitorId(visitorId); + // 客户端心跳通道 ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); if (clientChannel != null) { Channel channel = clientChannel.getChannel(); diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/socket/NettyVisitorSocket.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/socket/NettyVisitorSocket.java index e4875ea..c6947e0 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/socket/NettyVisitorSocket.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/socket/NettyVisitorSocket.java @@ -1,6 +1,6 @@ package wu.framework.lazy.cloud.heartbeat.server.netty.socket; -import wu.framework.lazy.cloud.heartbeat.common.NettyVisitorContext; +import wu.framework.lazy.cloud.heartbeat.common.NettyVisitorPortContext; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -32,7 +32,7 @@ public class NettyVisitorSocket { */ public void startServer(int visitorPort) throws Exception { - Channel visitor = NettyVisitorContext.getVisitor(visitorPort); + Channel visitor = NettyVisitorPortContext.getVisitor(visitorPort); if (visitor == null) { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) @@ -42,7 +42,7 @@ public class NettyVisitorSocket { if(future.isSuccess()){ Channel channel = future.channel(); log.info("访客端口:{} 开启", visitorPort); - NettyVisitorContext.pushVisitor(visitorPort, channel); + NettyVisitorPortContext.pushVisitor(visitorPort, channel); } });