【fix】 添加流量控制开关

This commit is contained in:
wujiawei 2024-08-26 16:17:20 +08:00
parent 5ddf1337ad
commit fd364c78e7
13 changed files with 96 additions and 31 deletions

View File

@ -2,6 +2,7 @@ package org.framework.lazy.cloud.network.heartbeat.server.config;
import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerHandlerInFlowHandler; import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerHandlerInFlowHandler;
import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerHandlerOutFlowHandler; import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerHandlerOutFlowHandler;
import org.framework.lazy.cloud.network.heartbeat.server.properties.ServerNodeProperties;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyVisitorPortFlowApplication; import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyVisitorPortFlowApplication;
import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -25,8 +26,8 @@ public class ServerFlowConfiguration {
*/ */
@Role(BeanDefinition.ROLE_INFRASTRUCTURE) @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@Bean @Bean
public ServerHandlerInFlowHandler serverHandlerInFlowHandler(LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication) { public ServerHandlerInFlowHandler serverHandlerInFlowHandler(LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication,ServerNodeProperties serverNodeProperties) {
return new ServerHandlerInFlowHandler(lazyVisitorPortFlowApplication); return new ServerHandlerInFlowHandler(lazyVisitorPortFlowApplication, serverNodeProperties);
} }
/** /**
@ -36,8 +37,8 @@ public class ServerFlowConfiguration {
*/ */
@Role(BeanDefinition.ROLE_INFRASTRUCTURE) @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@Bean @Bean
public ServerHandlerOutFlowHandler serverHandlerOutFlowHandler(LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication) { public ServerHandlerOutFlowHandler serverHandlerOutFlowHandler(LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication, ServerNodeProperties serverNodeProperties) {
return new ServerHandlerOutFlowHandler(lazyVisitorPortFlowApplication); return new ServerHandlerOutFlowHandler(lazyVisitorPortFlowApplication,serverNodeProperties);
} }

View File

@ -1,20 +1,23 @@
package org.framework.lazy.cloud.network.heartbeat.server.netty.flow; package org.framework.lazy.cloud.network.heartbeat.server.netty.flow;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyVisitorPortFlowApplication;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.visitor.flow.LazyVisitorPortFlowStoryCommand;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.AbstractHandleChannelFlowAdvanced; import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.AbstractHandleChannelFlowAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.ChannelFlow; import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.ChannelFlow;
import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum;
import org.framework.lazy.cloud.network.heartbeat.server.properties.ServerNodeProperties;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyVisitorPortFlowApplication;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.visitor.flow.LazyVisitorPortFlowStoryCommand;
/** /**
* 进口流量处理 * 进口流量处理
*/ */
public class ServerHandlerInFlowHandler extends AbstractHandleChannelFlowAdvanced { public class ServerHandlerInFlowHandler extends AbstractHandleChannelFlowAdvanced {
private final LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication; private final LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication;
private final ServerNodeProperties serverNodeProperties;
public ServerHandlerInFlowHandler(LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication) { public ServerHandlerInFlowHandler(LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication, ServerNodeProperties serverNodeProperties) {
this.lazyVisitorPortFlowApplication = lazyVisitorPortFlowApplication; this.lazyVisitorPortFlowApplication = lazyVisitorPortFlowApplication;
this.serverNodeProperties = serverNodeProperties;
} }
/** /**
@ -41,11 +44,13 @@ public class ServerHandlerInFlowHandler extends AbstractHandleChannelFlowAdvance
Integer flow = channelFlow.flow(); Integer flow = channelFlow.flow();
// 进口流量处理 // 进口流量处理
LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand(); if (serverNodeProperties.getEnableFlowControl()) {
visitorPortFlow.setInFlow(flow); LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand();
visitorPortFlow.setClientId(clientId); visitorPortFlow.setInFlow(flow);
visitorPortFlow.setVisitorPort(port); visitorPortFlow.setClientId(clientId);
visitorPortFlow.setIsDeleted(false); visitorPortFlow.setVisitorPort(port);
lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow); visitorPortFlow.setIsDeleted(false);
lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow);
}
} }
} }

View File

@ -1,20 +1,23 @@
package org.framework.lazy.cloud.network.heartbeat.server.netty.flow; package org.framework.lazy.cloud.network.heartbeat.server.netty.flow;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyVisitorPortFlowApplication;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.visitor.flow.LazyVisitorPortFlowStoryCommand;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.AbstractHandleChannelFlowAdvanced; import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.AbstractHandleChannelFlowAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.ChannelFlow; import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.ChannelFlow;
import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum;
import org.framework.lazy.cloud.network.heartbeat.server.properties.ServerNodeProperties;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyVisitorPortFlowApplication;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.visitor.flow.LazyVisitorPortFlowStoryCommand;
/** /**
* 出口流量处理 * 出口流量处理
*/ */
public class ServerHandlerOutFlowHandler extends AbstractHandleChannelFlowAdvanced { public class ServerHandlerOutFlowHandler extends AbstractHandleChannelFlowAdvanced {
private final LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication; private final LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication;
private final ServerNodeProperties serverNodeProperties;
public ServerHandlerOutFlowHandler(LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication) { public ServerHandlerOutFlowHandler(LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication, ServerNodeProperties serverNodeProperties) {
this.lazyVisitorPortFlowApplication = lazyVisitorPortFlowApplication; this.lazyVisitorPortFlowApplication = lazyVisitorPortFlowApplication;
this.serverNodeProperties = serverNodeProperties;
} }
/** /**
@ -42,11 +45,13 @@ public class ServerHandlerOutFlowHandler extends AbstractHandleChannelFlowAdvanc
// 出口流量处理 // 出口流量处理
LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand(); if(serverNodeProperties.getEnableFlowControl()){
visitorPortFlow.setOutFlow(flow); LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand();
visitorPortFlow.setClientId(clientId); visitorPortFlow.setOutFlow(flow);
visitorPortFlow.setVisitorPort(port); visitorPortFlow.setClientId(clientId);
visitorPortFlow.setIsDeleted(false); visitorPortFlow.setVisitorPort(port);
lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow); visitorPortFlow.setIsDeleted(false);
lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow);
}
} }
} }

View File

@ -9,7 +9,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.*; import org.framework.lazy.cloud.network.heartbeat.common.*;
import org.framework.lazy.cloud.network.heartbeat.common.*;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
@ -55,6 +54,9 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
nettyProxyMsg.setVisitorId(visitorId); nettyProxyMsg.setVisitorId(visitorId);
// 判断是否有可用的通道 如果没有创建新的通道
// 客户端心跳通道 // 客户端心跳通道
ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId);
if (clientChannel != null) { if (clientChannel != null) {
@ -62,7 +64,7 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
Channel channel = clientChannel.getChannel(); Channel channel = clientChannel.getChannel();
channel.writeAndFlush(nettyProxyMsg); channel.writeAndFlush(nettyProxyMsg);
} else { } else {
log.error("无法通过客户端ID获取客户端通道"); log.error("客户端:【{}】已经下线,无法通过客户端ID获取客户端通道", clientId);
} }
@ -76,19 +78,20 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) {
Channel realChannel = ctx.channel(); // 访客通道
Channel visitorChannel = ctx.channel();
String clientId = internalNetworkPenetrationRealClient.getClientId(); String clientId = internalNetworkPenetrationRealClient.getClientId();
String clientTargetIp = internalNetworkPenetrationRealClient.getClientTargetIp(); String clientTargetIp = internalNetworkPenetrationRealClient.getClientTargetIp();
Integer clientTargetPort = internalNetworkPenetrationRealClient.getClientTargetPort(); Integer clientTargetPort = internalNetworkPenetrationRealClient.getClientTargetPort();
Integer visitorPort = internalNetworkPenetrationRealClient.getVisitorPort(); Integer visitorPort = internalNetworkPenetrationRealClient.getVisitorPort();
String visitorId = ChannelAttributeKeyUtils.getVisitorId(realChannel); String visitorId = ChannelAttributeKeyUtils.getVisitorId(visitorChannel);
if (StringUtil.isNullOrEmpty(clientId)) { if (StringUtil.isNullOrEmpty(clientId)) {
return; return;
} }
byte[] bytes = new byte[buf.readableBytes()]; byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes); buf.readBytes(bytes);
// 获取客户端通道而后进行数据下发 // 获取客户端通道而后进行数据下发
log.debug("服务端访客端口成功接收数据:{}", new String(bytes)); log.debug("服务端访客端口成功接收数据:{}", new String(bytes));
// 使用访客的通信通道 // 使用访客的通信通道
Channel visitorCommunicationChannel = NettyCommunicationIdContext.getVisitor(visitorId); Channel visitorCommunicationChannel = NettyCommunicationIdContext.getVisitor(visitorId);
@ -111,7 +114,7 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
.clientId(clientId) .clientId(clientId)
.flow(bytes.length) .flow(bytes.length)
.build(); .build();
channelFlowAdapter.handler(realChannel, serverChannelFlow); channelFlowAdapter.handler(visitorChannel, serverChannelFlow);
log.debug("服务端访客端口成功发送数据了"); log.debug("服务端访客端口成功发送数据了");
} }

View File

@ -36,4 +36,9 @@ public class ServerNodeProperties {
* 集群节点端口 * 集群节点端口
*/ */
private Integer nodePort; private Integer nodePort;
/**
* 开启流量监控
*/
private Boolean enableFlowControl=false;
} }

View File

@ -54,7 +54,13 @@ public class LazyNettyServerVisitorQueryListCommand {
* 访客端口 * 访客端口
*/ */
@Schema(description = "访客端口", name = "visitorPort", example = "") @Schema(description = "访客端口", name = "visitorPort", example = "")
private Integer visitorPort; /** private Integer visitorPort;
/**
* 访客端口池大小
*/
@Schema(description = "访客端口池大小", name = "poolSize", example = "")
private Integer poolSize;
/**
* 服务端ID * 服务端ID
*/ */
@Schema(description = "服务端ID", name = "serverId", example = "") @Schema(description = "服务端ID", name = "serverId", example = "")

View File

@ -55,6 +55,11 @@ public class LazyNettyServerVisitorQueryOneCommand {
*/ */
@Schema(description = "访客端口", name = "visitorPort", example = "") @Schema(description = "访客端口", name = "visitorPort", example = "")
private Integer visitorPort; private Integer visitorPort;
/**
* 访客端口池大小
*/
@Schema(description = "访客端口池大小", name = "poolSize", example = "")
private Integer poolSize;
/** /**
* 服务端ID * 服务端ID
*/ */

View File

@ -55,6 +55,11 @@ public class LazyNettyServerVisitorRemoveCommand {
*/ */
@Schema(description = "访客端口", name = "visitorPort", example = "") @Schema(description = "访客端口", name = "visitorPort", example = "")
private Integer visitorPort; private Integer visitorPort;
/**
* 访客端口池大小
*/
@Schema(description = "访客端口池大小", name = "poolSize", example = "")
private Integer poolSize;
/** /**
* 服务端ID * 服务端ID
*/ */

View File

@ -55,6 +55,11 @@ public class LazyNettyServerVisitorStoryCommand {
*/ */
@Schema(description = "访客端口", name = "visitorPort", example = "") @Schema(description = "访客端口", name = "visitorPort", example = "")
private Integer visitorPort; private Integer visitorPort;
/**
* 访客端口池大小
*/
@Schema(description = "访客端口池大小", name = "poolSize", example = "")
private Integer poolSize=20;
/** /**
* 服务端ID * 服务端ID
*/ */

View File

@ -55,6 +55,11 @@ public class LazyNettyServerVisitorUpdateCommand {
*/ */
@Schema(description = "访客端口", name = "visitorPort", example = "") @Schema(description = "访客端口", name = "visitorPort", example = "")
private Integer visitorPort; private Integer visitorPort;
/**
* 访客端口池大小
*/
@Schema(description = "访客端口池大小", name = "poolSize", example = "")
private Integer poolSize;
/** /**
* 服务端ID * 服务端ID
*/ */

View File

@ -54,7 +54,13 @@ public class LazyNettyServerVisitorDTO {
* 访客端口 * 访客端口
*/ */
@Schema(description = "访客端口", name = "visitorPort", example = "") @Schema(description = "访客端口", name = "visitorPort", example = "")
private Integer visitorPort; /** private Integer visitorPort;
/**
* 访客端口池大小
*/
@Schema(description = "访客端口池大小", name = "poolSize", example = "")
private Integer poolSize;
/**
* 服务端ID * 服务端ID
*/ */
@Schema(description = "服务端ID", name = "serverId", example = "") @Schema(description = "服务端ID", name = "serverId", example = "")

View File

@ -4,6 +4,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data; import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import org.wu.framework.lazy.orm.core.persistence.reverse.lazy.ddd.DefaultDDDLazyDomain; import org.wu.framework.lazy.orm.core.persistence.reverse.lazy.ddd.DefaultDDDLazyDomain;
import org.wu.framework.lazy.orm.core.stereotype.LazyTableFieldUnique;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -54,7 +55,13 @@ public class LazyNettyServerVisitor {
* 访客端口 * 访客端口
*/ */
@Schema(description = "访客端口", name = "visitorPort", example = "") @Schema(description = "访客端口", name = "visitorPort", example = "")
private Integer visitorPort; /** private Integer visitorPort;
/**
* 访客端口池大小
*/
@Schema(description = "访客端口池大小", name = "poolSize", example = "")
private Integer poolSize;
/**
* 服务端ID * 服务端ID
*/ */
@Schema(description = "服务端ID", name = "serverId", example = "") @Schema(description = "服务端ID", name = "serverId", example = "")

View File

@ -64,6 +64,13 @@ public class LazyNettyServerVisitorDO {
@Schema(description = "访客端口", name = "visitorPort", example = "") @Schema(description = "访客端口", name = "visitorPort", example = "")
@LazyTableFieldUnique(name = "visitor_port", comment = "访客端口", columnType = "int") @LazyTableFieldUnique(name = "visitor_port", comment = "访客端口", columnType = "int")
private Integer visitorPort; private Integer visitorPort;
/**
* 访客端口池大小
*/
@Schema(description = "访客端口池大小", name = "poolSize", example = "")
@LazyTableFieldUnique(name = "pool_size", comment = "访客端口池大小", columnType = "int",defaultValue = "'20")
private Integer poolSize;
/** /**
* 服务端ID * 服务端ID
*/ */