diff --git a/wu-lazy-cloud-heartbeat-client/pom.xml b/wu-lazy-cloud-heartbeat-client/pom.xml index cd38f32..fc582d8 100644 --- a/wu-lazy-cloud-heartbeat-client/pom.xml +++ b/wu-lazy-cloud-heartbeat-client/pom.xml @@ -44,13 +44,6 @@ - - - maven_central - Maven Central - https://repo.maven.apache.org/maven2/ - - \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/socket/NettyClientRealSocket.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/socket/NettyClientRealSocket.java index 055d5e5..7b8e772 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/socket/NettyClientRealSocket.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/socket/NettyClientRealSocket.java @@ -188,6 +188,8 @@ public class NettyClientRealSocket { visitor.config().setOption(ChannelOption.AUTO_READ, true); + + } else { log.info("每隔2s重连...."); // 离线 diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/adapter/ChannelFlowAdapter.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/adapter/ChannelFlowAdapter.java index 03db508..bf884f7 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/adapter/ChannelFlowAdapter.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/adapter/ChannelFlowAdapter.java @@ -6,6 +6,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.ChannelFl import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.HandleChannelFlowAdvanced; import java.util.List; +import java.util.concurrent.*; /** * 通道流量适配器 @@ -15,6 +16,13 @@ import java.util.List; @Slf4j public class ChannelFlowAdapter { + + ThreadPoolExecutor CHANNEL_FLOW_ADAPTER_EXECUTOR = + new ThreadPoolExecutor(20, 200, 3L, TimeUnit.MINUTES, + new LinkedBlockingDeque<>(500)); + + + protected final List handleChannelFlowAdvancedList; public ChannelFlowAdapter(List handleChannelFlowAdvancedList) { @@ -38,4 +46,14 @@ public class ChannelFlowAdapter { } } } + + /** + * 异步处理当前数据 + * + * @param channelFlow 通道数据 + */ + public void asyncHandler(Channel channel, ChannelFlow channelFlow) { + CHANNEL_FLOW_ADAPTER_EXECUTOR.submit(() -> handler(channel, channelFlow)); + } + } diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/pool/AbstractNettyChannelPool.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/pool/AbstractNettyChannelPool.java new file mode 100644 index 0000000..8e8feab --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/pool/AbstractNettyChannelPool.java @@ -0,0 +1,8 @@ +package org.framework.lazy.cloud.network.heartbeat.common.pool; + + +/** + * 通道连接池抽象类 + */ +public abstract class AbstractNettyChannelPool implements NettyChannelPool { +} diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/pool/DefaultNettyChannelPool.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/pool/DefaultNettyChannelPool.java new file mode 100644 index 0000000..98617af --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/pool/DefaultNettyChannelPool.java @@ -0,0 +1,65 @@ +package org.framework.lazy.cloud.network.heartbeat.common.pool; + +import io.netty.channel.Channel; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * 默认netty 连接池 + */ +public class DefaultNettyChannelPool extends AbstractNettyChannelPool implements NettyChannelPool { + + /** + * 连接池大小 + */ + private final int poolSize; + + + // 绑定访客的通道 + private final ConcurrentHashMap visitorChannelMap = new ConcurrentHashMap<>(); + + // 所有的通道 + private final List allChannelList = new ArrayList<>(); + + // 闲置的通道 + private final List idleChannelList = new CopyOnWriteArrayList(); + + public DefaultNettyChannelPool(int poolSize) { + this.poolSize = poolSize; + } + + /** + * 根据访客ID获取可以使用的通道 + * + * @param visitorId 访客ID + * @return Channel 如果无法获取到闲置通道返回null + */ + @Override + public Channel availableChannel(String visitorId) { + synchronized (idleChannelList) { + if (idleChannelList.isEmpty()) { + return null; + } + // 获取通道 + Channel visitorChannel = null; + for (Channel idleChannel : idleChannelList) { + if (idleChannel.isActive()) { + visitorChannel = idleChannel; + } + idleChannelList.remove(idleChannel); + } + if (visitorChannel == null) { + return null; + } + // 绑定 通道 + ChannelAttributeKeyUtils.buildVisitorId(visitorChannel, visitorId); + visitorChannelMap.put(visitorId, visitorChannel); + return visitorChannel; + } + + } +} diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/pool/NettyChannelPool.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/pool/NettyChannelPool.java new file mode 100644 index 0000000..6bfd140 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/pool/NettyChannelPool.java @@ -0,0 +1,17 @@ +package org.framework.lazy.cloud.network.heartbeat.common.pool; + +import io.netty.channel.Channel; + +/** + * 通道连接池 + */ +public interface NettyChannelPool { + + /** + * 根据访客ID获取可以使用的通道 + * @param visitorId 访客ID + * @return Channel + */ + Channel availableChannel(String visitorId); + +} diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportHandleChannelTransferTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportHandleChannelTransferTypeAdvanced.java index f9cc677..86f8a53 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportHandleChannelTransferTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportHandleChannelTransferTypeAdvanced.java @@ -46,7 +46,6 @@ public class ServerHandleReportHandleChannelTransferTypeAdvanced extends Abstrac ByteBuf buf = visitor.config().getAllocator().buffer(msg.getData().length); buf.writeBytes(msg.getData()); visitor.writeAndFlush(buf); - log.info("writeAndFlush"); // 记录出口数据 ServerChannelFlow serverChannelFlow = ServerChannelFlow .builder() @@ -55,7 +54,7 @@ public class ServerHandleReportHandleChannelTransferTypeAdvanced extends Abstrac .clientId(clientId) .flow(msg.getData().length) .build(); - channelFlowAdapter.handler(channel, serverChannelFlow); + channelFlowAdapter.asyncHandler(channel, serverChannelFlow); } } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/ServerHandlerInFlowHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/ServerHandlerInFlowHandler.java index 0f3a164..5b87cd9 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/ServerHandlerInFlowHandler.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/ServerHandlerInFlowHandler.java @@ -44,13 +44,11 @@ public class ServerHandlerInFlowHandler extends AbstractHandleChannelFlowAdvance Integer flow = channelFlow.flow(); // 进口流量处理 - if (serverNodeProperties.getEnableFlowControl()) { - LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand(); - visitorPortFlow.setInFlow(flow); - visitorPortFlow.setClientId(clientId); - visitorPortFlow.setVisitorPort(port); - visitorPortFlow.setIsDeleted(false); - lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow); - } + LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand(); + visitorPortFlow.setInFlow(flow); + visitorPortFlow.setClientId(clientId); + visitorPortFlow.setVisitorPort(port); + visitorPortFlow.setIsDeleted(false); + lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow); } } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/ServerHandlerOutFlowHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/ServerHandlerOutFlowHandler.java index 6d8194e..25d6959 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/ServerHandlerOutFlowHandler.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/ServerHandlerOutFlowHandler.java @@ -44,14 +44,12 @@ public class ServerHandlerOutFlowHandler extends AbstractHandleChannelFlowAdvanc Integer flow = channelFlow.flow(); // 出口流量处理 + LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand(); + visitorPortFlow.setOutFlow(flow); + visitorPortFlow.setClientId(clientId); + visitorPortFlow.setVisitorPort(port); + visitorPortFlow.setIsDeleted(false); + lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow); - if(serverNodeProperties.getEnableFlowControl()){ - LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand(); - visitorPortFlow.setOutFlow(flow); - visitorPortFlow.setClientId(clientId); - visitorPortFlow.setVisitorPort(port); - visitorPortFlow.setIsDeleted(false); - lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow); - } } } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/VisitorHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/VisitorHandler.java index c6b10a1..40b8bd3 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/VisitorHandler.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/VisitorHandler.java @@ -20,6 +20,7 @@ import java.util.UUID; public class VisitorHandler extends SimpleChannelInboundHandler { private final InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient; private final ChannelFlowAdapter channelFlowAdapter;// 流量适配器 +// private final NettyChannelPool nettyChannelPool = new DefaultNettyChannelPool(10); public VisitorHandler(InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient, ChannelFlowAdapter channelFlowAdapter) { this.internalNetworkPenetrationRealClient = internalNetworkPenetrationRealClient; @@ -55,8 +56,8 @@ public class VisitorHandler extends SimpleChannelInboundHandler { nettyProxyMsg.setVisitorId(visitorId); // 判断是否有可用的通道 如果没有创建新的通道 - - +// Channel transferChannel = nettyChannelPool.availableChannel(visitorId); +// if (transferChannel == null) { // 客户端心跳通道 ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); if (clientChannel != null) { @@ -66,7 +67,7 @@ public class VisitorHandler extends SimpleChannelInboundHandler { } else { log.error("客户端:【{}】已经下线,无法通过客户端ID获取客户端通道", clientId); } - +// } // 等待访客ID传输到客户端后绑定客户端真实服务后开启 @@ -114,7 +115,7 @@ public class VisitorHandler extends SimpleChannelInboundHandler { .clientId(clientId) .flow(bytes.length) .build(); - channelFlowAdapter.handler(visitorChannel, serverChannelFlow); + channelFlowAdapter.asyncHandler(visitorChannel, serverChannelFlow); log.debug("服务端访客端口成功发送数据了"); } diff --git a/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/README.md b/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/README.md index 64c2e20..91a80d6 100644 --- a/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/README.md +++ b/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/README.md @@ -40,7 +40,7 @@ docker run -d -it --privileged --name client -p 6004:6004 registry.cn-hangzhou.a ```yaml # 只在 worker 节点执行 # 替换 x.x.x.x 为 master 节点的内网 IP -export MASTER_IP=124.222.48.62 +export MASTER_IP=124.222.152.160 # 替换 apiserver.demo 为初始化 master 节点时所使用的 APISERVER_NAME export APISERVER_NAME=apiserver.demo echo "${MASTER_IP} ${APISERVER_NAME}" >> /etc/hosts diff --git a/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/LazyCloudHeartbeatClientStart.java b/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/LazyCloudHeartbeatClientStart.java index 90e3d1d..2bbb2c9 100644 --- a/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/LazyCloudHeartbeatClientStart.java +++ b/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/LazyCloudHeartbeatClientStart.java @@ -1,5 +1,6 @@ package org.framework.lazy.cloud.network.heartbeat.client; +import io.netty.util.internal.PlatformDependent; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.wu.framework.lazy.orm.core.stereotype.LazyScan; @@ -12,6 +13,11 @@ import org.wu.framework.lazy.orm.core.stereotype.LazyScan; @SpringBootApplication public class LazyCloudHeartbeatClientStart { public static void main(String[] args) { + + String normalizedArch = PlatformDependent.normalizedArch(); + String normalizedOs = PlatformDependent.normalizedOs(); + System.out.println("normalizedArch: " + normalizedArch+"\nnormalizedOs: " + normalizedOs); + SpringApplication.run(LazyCloudHeartbeatClientStart.class,args); } } diff --git a/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/src/main/resources/application.yml b/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/src/main/resources/application.yml index 1cf11e2..5796039 100644 --- a/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/src/main/resources/application.yml +++ b/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/src/main/resources/application.yml @@ -16,4 +16,9 @@ spring: # client: # client-id: 1024 # inet-host: 127.0.0.1 -# inet-port: 7101 \ No newline at end of file +# inet-port: 7101 + +--- +logging: + level: + root: DEBUG \ No newline at end of file