【fix】 使用线程池处理流量信息及业务

This commit is contained in:
wujiawei 2024-09-03 16:37:46 +08:00
parent 58aae7a67d
commit b9d75715de
13 changed files with 141 additions and 31 deletions

View File

@ -44,13 +44,6 @@
</dependency> </dependency>
</dependencies> </dependencies>
<repositories>
<repository>
<id>maven_central</id>
<name>Maven Central</name>
<url>https://repo.maven.apache.org/maven2/</url>
</repository>
</repositories>
</project> </project>

View File

@ -188,6 +188,8 @@ public class NettyClientRealSocket {
visitor.config().setOption(ChannelOption.AUTO_READ, true); visitor.config().setOption(ChannelOption.AUTO_READ, true);
} else { } else {
log.info("每隔2s重连...."); log.info("每隔2s重连....");
// 离线 // 离线

View File

@ -6,6 +6,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.ChannelFl
import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.HandleChannelFlowAdvanced; import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.HandleChannelFlowAdvanced;
import java.util.List; import java.util.List;
import java.util.concurrent.*;
/** /**
* 通道流量适配器 * 通道流量适配器
@ -15,6 +16,13 @@ import java.util.List;
@Slf4j @Slf4j
public class ChannelFlowAdapter { public class ChannelFlowAdapter {
ThreadPoolExecutor CHANNEL_FLOW_ADAPTER_EXECUTOR =
new ThreadPoolExecutor(20, 200, 3L, TimeUnit.MINUTES,
new LinkedBlockingDeque<>(500));
protected final List<HandleChannelFlowAdvanced> handleChannelFlowAdvancedList; protected final List<HandleChannelFlowAdvanced> handleChannelFlowAdvancedList;
public ChannelFlowAdapter(List<HandleChannelFlowAdvanced> handleChannelFlowAdvancedList) { public ChannelFlowAdapter(List<HandleChannelFlowAdvanced> handleChannelFlowAdvancedList) {
@ -38,4 +46,14 @@ public class ChannelFlowAdapter {
} }
} }
} }
/**
* 异步处理当前数据
*
* @param channelFlow 通道数据
*/
public void asyncHandler(Channel channel, ChannelFlow channelFlow) {
CHANNEL_FLOW_ADAPTER_EXECUTOR.submit(() -> handler(channel, channelFlow));
}
} }

View File

@ -0,0 +1,8 @@
package org.framework.lazy.cloud.network.heartbeat.common.pool;
/**
* 通道连接池抽象类
*/
public abstract class AbstractNettyChannelPool implements NettyChannelPool {
}

View File

@ -0,0 +1,65 @@
package org.framework.lazy.cloud.network.heartbeat.common.pool;
import io.netty.channel.Channel;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* 默认netty 连接池
*/
public class DefaultNettyChannelPool extends AbstractNettyChannelPool implements NettyChannelPool {
/**
* 连接池大小
*/
private final int poolSize;
// 绑定访客的通道
private final ConcurrentHashMap<String, Channel> visitorChannelMap = new ConcurrentHashMap<>();
// 所有的通道
private final List<Channel> allChannelList = new ArrayList<>();
// 闲置的通道
private final List<Channel> idleChannelList = new CopyOnWriteArrayList<Channel>();
public DefaultNettyChannelPool(int poolSize) {
this.poolSize = poolSize;
}
/**
* 根据访客ID获取可以使用的通道
*
* @param visitorId 访客ID
* @return Channel 如果无法获取到闲置通道返回null
*/
@Override
public Channel availableChannel(String visitorId) {
synchronized (idleChannelList) {
if (idleChannelList.isEmpty()) {
return null;
}
// 获取通道
Channel visitorChannel = null;
for (Channel idleChannel : idleChannelList) {
if (idleChannel.isActive()) {
visitorChannel = idleChannel;
}
idleChannelList.remove(idleChannel);
}
if (visitorChannel == null) {
return null;
}
// 绑定 通道
ChannelAttributeKeyUtils.buildVisitorId(visitorChannel, visitorId);
visitorChannelMap.put(visitorId, visitorChannel);
return visitorChannel;
}
}
}

View File

@ -0,0 +1,17 @@
package org.framework.lazy.cloud.network.heartbeat.common.pool;
import io.netty.channel.Channel;
/**
* 通道连接池
*/
public interface NettyChannelPool {
/**
* 根据访客ID获取可以使用的通道
* @param visitorId 访客ID
* @return Channel
*/
Channel availableChannel(String visitorId);
}

View File

@ -46,7 +46,6 @@ public class ServerHandleReportHandleChannelTransferTypeAdvanced extends Abstrac
ByteBuf buf = visitor.config().getAllocator().buffer(msg.getData().length); ByteBuf buf = visitor.config().getAllocator().buffer(msg.getData().length);
buf.writeBytes(msg.getData()); buf.writeBytes(msg.getData());
visitor.writeAndFlush(buf); visitor.writeAndFlush(buf);
log.info("writeAndFlush");
// 记录出口数据 // 记录出口数据
ServerChannelFlow serverChannelFlow = ServerChannelFlow ServerChannelFlow serverChannelFlow = ServerChannelFlow
.builder() .builder()
@ -55,7 +54,7 @@ public class ServerHandleReportHandleChannelTransferTypeAdvanced extends Abstrac
.clientId(clientId) .clientId(clientId)
.flow(msg.getData().length) .flow(msg.getData().length)
.build(); .build();
channelFlowAdapter.handler(channel, serverChannelFlow); channelFlowAdapter.asyncHandler(channel, serverChannelFlow);
} }
} }

View File

@ -44,13 +44,11 @@ public class ServerHandlerInFlowHandler extends AbstractHandleChannelFlowAdvance
Integer flow = channelFlow.flow(); Integer flow = channelFlow.flow();
// 进口流量处理 // 进口流量处理
if (serverNodeProperties.getEnableFlowControl()) { LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand();
LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand(); visitorPortFlow.setInFlow(flow);
visitorPortFlow.setInFlow(flow); visitorPortFlow.setClientId(clientId);
visitorPortFlow.setClientId(clientId); visitorPortFlow.setVisitorPort(port);
visitorPortFlow.setVisitorPort(port); visitorPortFlow.setIsDeleted(false);
visitorPortFlow.setIsDeleted(false); lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow);
lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow);
}
} }
} }

View File

@ -44,14 +44,12 @@ public class ServerHandlerOutFlowHandler extends AbstractHandleChannelFlowAdvanc
Integer flow = channelFlow.flow(); Integer flow = channelFlow.flow();
// 出口流量处理 // 出口流量处理
LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand();
visitorPortFlow.setOutFlow(flow);
visitorPortFlow.setClientId(clientId);
visitorPortFlow.setVisitorPort(port);
visitorPortFlow.setIsDeleted(false);
lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow);
if(serverNodeProperties.getEnableFlowControl()){
LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand();
visitorPortFlow.setOutFlow(flow);
visitorPortFlow.setClientId(clientId);
visitorPortFlow.setVisitorPort(port);
visitorPortFlow.setIsDeleted(false);
lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow);
}
} }
} }

View File

@ -20,6 +20,7 @@ import java.util.UUID;
public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> { public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient; private final InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient;
private final ChannelFlowAdapter channelFlowAdapter;// 流量适配器 private final ChannelFlowAdapter channelFlowAdapter;// 流量适配器
// private final NettyChannelPool nettyChannelPool = new DefaultNettyChannelPool(10);
public VisitorHandler(InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient, ChannelFlowAdapter channelFlowAdapter) { public VisitorHandler(InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient, ChannelFlowAdapter channelFlowAdapter) {
this.internalNetworkPenetrationRealClient = internalNetworkPenetrationRealClient; this.internalNetworkPenetrationRealClient = internalNetworkPenetrationRealClient;
@ -55,8 +56,8 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
nettyProxyMsg.setVisitorId(visitorId); nettyProxyMsg.setVisitorId(visitorId);
// 判断是否有可用的通道 如果没有创建新的通道 // 判断是否有可用的通道 如果没有创建新的通道
// Channel transferChannel = nettyChannelPool.availableChannel(visitorId);
// if (transferChannel == null) {
// 客户端心跳通道 // 客户端心跳通道
ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId);
if (clientChannel != null) { if (clientChannel != null) {
@ -66,7 +67,7 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
} else { } else {
log.error("客户端:【{}】已经下线无法通过客户端ID获取客户端通道", clientId); log.error("客户端:【{}】已经下线无法通过客户端ID获取客户端通道", clientId);
} }
// }
// 等待访客ID传输到客户端后绑定客户端真实服务后开启 // 等待访客ID传输到客户端后绑定客户端真实服务后开启
@ -114,7 +115,7 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
.clientId(clientId) .clientId(clientId)
.flow(bytes.length) .flow(bytes.length)
.build(); .build();
channelFlowAdapter.handler(visitorChannel, serverChannelFlow); channelFlowAdapter.asyncHandler(visitorChannel, serverChannelFlow);
log.debug("服务端访客端口成功发送数据了"); log.debug("服务端访客端口成功发送数据了");
} }

View File

@ -40,7 +40,7 @@ docker run -d -it --privileged --name client -p 6004:6004 registry.cn-hangzhou.a
```yaml ```yaml
# 只在 worker 节点执行 # 只在 worker 节点执行
# 替换 x.x.x.x 为 master 节点的内网 IP # 替换 x.x.x.x 为 master 节点的内网 IP
export MASTER_IP=124.222.48.62 export MASTER_IP=124.222.152.160
# 替换 apiserver.demo 为初始化 master 节点时所使用的 APISERVER_NAME # 替换 apiserver.demo 为初始化 master 节点时所使用的 APISERVER_NAME
export APISERVER_NAME=apiserver.demo export APISERVER_NAME=apiserver.demo
echo "${MASTER_IP} ${APISERVER_NAME}" >> /etc/hosts echo "${MASTER_IP} ${APISERVER_NAME}" >> /etc/hosts

View File

@ -1,5 +1,6 @@
package org.framework.lazy.cloud.network.heartbeat.client; package org.framework.lazy.cloud.network.heartbeat.client;
import io.netty.util.internal.PlatformDependent;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.wu.framework.lazy.orm.core.stereotype.LazyScan; import org.wu.framework.lazy.orm.core.stereotype.LazyScan;
@ -12,6 +13,11 @@ import org.wu.framework.lazy.orm.core.stereotype.LazyScan;
@SpringBootApplication @SpringBootApplication
public class LazyCloudHeartbeatClientStart { public class LazyCloudHeartbeatClientStart {
public static void main(String[] args) { public static void main(String[] args) {
String normalizedArch = PlatformDependent.normalizedArch();
String normalizedOs = PlatformDependent.normalizedOs();
System.out.println("normalizedArch: " + normalizedArch+"\nnormalizedOs: " + normalizedOs);
SpringApplication.run(LazyCloudHeartbeatClientStart.class,args); SpringApplication.run(LazyCloudHeartbeatClientStart.class,args);
} }
} }

View File

@ -16,4 +16,9 @@ spring:
# client: # client:
# client-id: 1024 # client-id: 1024
# inet-host: 127.0.0.1 # inet-host: 127.0.0.1
# inet-port: 7101 # inet-port: 7101
---
logging:
level:
root: DEBUG