[fix] 优化tcp、udp架构

This commit is contained in:
wujiawei
2024-12-16 18:59:02 +08:00
parent daeba59a43
commit 2b3c7cb7c2
38 changed files with 315 additions and 175 deletions

View File

@ -2,7 +2,7 @@ package org.framework.lazy.cloud.network.heartbeat.server.cluster.application.im
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.client.netty.event.ClientChangeEvent;
import org.framework.lazy.cloud.network.heartbeat.client.netty.tcp.socket.NettyClientSocket;
import org.framework.lazy.cloud.network.heartbeat.client.netty.tcp.socket.NettyTcpClientSocket;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.server.cluster.application.dto.LazyNettyClusterNodeDTO;
import org.framework.lazy.cloud.network.heartbeat.server.cluster.domain.model.cluster.node.LazyNettyClusterNode;
@ -55,7 +55,7 @@ public class LazyNettyClusterNodeApplicationImpl implements LazyNettyClusterNode
ServerNodeProperties serverNodeProperties;
// 缓存连接集群 socket
private final ConcurrentHashMap<LazyNettyClusterNode, NettyClientSocket> cacheClusterNettyClientSocketMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<LazyNettyClusterNode, NettyTcpClientSocket> cacheClusterNettyClientSocketMap = new ConcurrentHashMap<>();
public static final ThreadPoolExecutor NETTY_CLUSTER_CLIENT_EXECUTOR =
new ThreadPoolExecutor(20, 50, 200, TimeUnit.MILLISECONDS,
@ -180,8 +180,8 @@ public class LazyNettyClusterNodeApplicationImpl implements LazyNettyClusterNode
// 当前节点ID
String clusterNodeClientId = serverNodeProperties.getNodeId();
NettyClientSocket nettyClientSocket = new
NettyClientSocket(inetHost, inetPort, clusterNodeClientId,
NettyTcpClientSocket nettyTcpClientSocket = new
NettyTcpClientSocket(inetHost, inetPort, clusterNodeClientId,
clusterNodeId,null,null,
clientChangeEvent, handleChannelTypeAdvancedList);
// 过滤已经存在的
@ -192,11 +192,11 @@ public class LazyNettyClusterNodeApplicationImpl implements LazyNettyClusterNode
log.warn("当前节点注册:{} 已经存在", lazyNettyClusterNode);
return;
}
cacheClusterNettyClientSocketMap.put(lazyNettyClusterNode, nettyClientSocket);
cacheClusterNettyClientSocketMap.put(lazyNettyClusterNode, nettyTcpClientSocket);
Thread thread = new Thread(() -> {
try {
nettyClientSocket.newConnect2Server();
nettyTcpClientSocket.newConnect2Server();
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -237,7 +237,7 @@ public class LazyNettyClusterNodeApplicationImpl implements LazyNettyClusterNode
// 当前节点ID
String clusterNodeId = serverNodeProperties.getNodeId();
// 关闭指定socket
cacheClusterNettyClientSocketMap.forEach(((lazyNettyClusterNode, nettyClientSocket) -> {
cacheClusterNettyClientSocketMap.forEach(((lazyNettyClusterNode, nettyTcpClientSocket) -> {
String inetHost = lazyNettyClusterNode.getClusterNodeHost();
Integer inetPort = lazyNettyClusterNode.getClusterNodePort();
String needCloseInetHost = needCloseLazyNettyClusterNode.getClusterNodeHost();
@ -246,7 +246,7 @@ public class LazyNettyClusterNodeApplicationImpl implements LazyNettyClusterNode
if (Objects.equals(clusterNodeId, needCloseClientId)
&& Objects.equals(inetPort, needCloseInetPort)
&& Objects.equals(inetHost, needCloseInetHost)) {
nettyClientSocket.shutdown();
nettyTcpClientSocket.shutdown();
// 关闭客户端:{}与服务端连接:{}:{}
log.warn("Close client: {} Connect to server: {}: {}", clusterNodeId, inetHost, inetPort);
}
@ -259,8 +259,8 @@ public class LazyNettyClusterNodeApplicationImpl implements LazyNettyClusterNode
@Override
public void destroyClusterNodes() {
// 关闭socket
cacheClusterNettyClientSocketMap.forEach(((lazyNettyClusterNode, nettyClientSocket) -> {
nettyClientSocket.shutdown();
cacheClusterNettyClientSocketMap.forEach(((lazyNettyClusterNode, nettyTcpClientSocket) -> {
nettyTcpClientSocket.shutdown();
String clientId = lazyNettyClusterNode.getClusterNodeId();
String inetHost = lazyNettyClusterNode.getClusterNodeHost();
Integer inetPort = lazyNettyClusterNode.getClusterNodePort();