【fix】 优化读写缓冲区设置为度缓冲区2m 写1m 低水位1m 高水位2m

This commit is contained in:
wujiawei
2024-09-09 15:51:01 +08:00
parent 4ec1401712
commit 10d2d74ca3
18 changed files with 208 additions and 63 deletions

View File

@ -3,14 +3,17 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.advanced;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.server.AbstractHandleReportHandleChannelTransferTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum;
import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerChannelFlow;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
@ -37,15 +40,17 @@ public class ServerHandleReportHandleChannelTransferTypeAdvanced extends Abstrac
public void doHandler(Channel channel, NettyProxyMsg msg) {
String clientId = new String(msg.getClientId());
Integer visitorPort = Integer.valueOf(new String(msg.getVisitorPort()));
log.info("访客端口:[{}] 接收到客户端:[{}]",visitorPort, clientId);
log.debug("接收到客户端:[{}]内网穿透返回的数据:[{}]", clientId, new String(msg.getData()));
// 将数据转发访客通道
byte[] visitorId = msg.getVisitorId();
// log.info("访客ID:【{}】 访客端口:[{}] 接收到客户端:[{}]", new String(visitorId), visitorPort, clientId);
// log.debug("访客ID:【{}】接收到客户端:[{}] 返回数据大小:[{}] 内网穿透返回的数据:[{}]", new String(visitorId), clientId, msg.getData().length, new String(msg.getData()));
// 将数据转发访客通道
Channel visitor = NettyRealIdContext.getReal(visitorId);
if (visitor != null) {
ByteBuf buf = visitor.config().getAllocator().buffer(msg.getData().length);
buf.writeBytes(msg.getData());
visitor.writeAndFlush(buf);
ChannelFuture channelFuture = visitor.writeAndFlush(buf);
boolean success = channelFuture.isSuccess();
log.debug("visitor writerAndFlush status: {}", success);
// 记录出口数据
ServerChannelFlow serverChannelFlow = ServerChannelFlow
.builder()
@ -56,7 +61,7 @@ public class ServerHandleReportHandleChannelTransferTypeAdvanced extends Abstrac
.build();
channelFlowAdapter.asyncHandler(channel, serverChannelFlow);
}
log.debug("访客ID:【{}】接收到客户端:[{}] 发送真实数据成功", new String(visitorId), clientId);
}
}

View File

@ -10,6 +10,7 @@ import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer;
import org.framework.lazy.cloud.network.heartbeat.server.netty.handler.NettyServerHandler;
import org.springframework.stereotype.Component;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
@ -27,7 +28,7 @@ import java.util.concurrent.TimeUnit;
* @date 2023/09/13 10:26
*/
@Component
public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
public class NettyServerFilter extends DebugChannelInitializer<SocketChannel> {
private final List<HandleChannelTypeAdvanced> handleChannelTypeAdvancedList;
public NettyServerFilter(List<HandleChannelTypeAdvanced> handleChannelTypeAdvancedList) {
@ -35,7 +36,7 @@ public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
protected void initChannel0(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 以("\n")为结尾分割的 解码器
// 解码、编码

View File

@ -1,12 +1,16 @@
package org.framework.lazy.cloud.network.heartbeat.server.netty.filter;
import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPenetrationRealClient;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer;
import org.framework.lazy.cloud.network.heartbeat.server.netty.handler.VisitorHandler;
public class VisitorFilter extends ChannelInitializer<SocketChannel> {
public class VisitorFilter extends DebugChannelInitializer<SocketChannel> {
private final InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient;
private final ChannelFlowAdapter channelFlowAdapter;
@ -25,10 +29,11 @@ public class VisitorFilter extends ChannelInitializer<SocketChannel> {
* the {@link Channel}.
*/
@Override
protected void initChannel(SocketChannel ch) throws Exception {
protected void initChannel0(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ch.config().setReceiveBufferSize(1024 * 1024);
ch.config().setSendBufferSize(1024 * 1024);
pipeline.addLast(new ChannelDuplexHandler());
pipeline.addLast(new VisitorHandler(internalNetworkPenetrationRealClient, channelFlowAdapter));
}
}

View File

@ -130,4 +130,19 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
//……
if (channel.isActive()) ctx.close();
}
/**
* Calls {@link ChannelHandlerContext#fireChannelWritabilityChanged()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
* <p>
* Sub-classes may override this method to change behavior.
*
* @param ctx
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
log.info("netty server handler channel writability changed: {}", ctx.channel());
super.channelWritabilityChanged(ctx);
}
}

View File

@ -13,6 +13,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdap
import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerChannelFlow;
import org.wu.framework.core.utils.ObjectUtils;
import java.util.UUID;
@ -149,25 +150,36 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// Channel visitorChannel = ctx.channel();
// String vid = visitorChannel.attr(Constant.VID).get();
// if (StringUtil.isNullOrEmpty(vid)) {
// super.channelWritabilityChanged(ctx);
// return;
// }
// Channel clientChannel = Constant.vcc.get(vid);
// if (clientChannel != null) {
// clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable());
// }
// 获取访客的传输通道
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
if(ObjectUtils.isEmpty(visitorId)) {
super.channelWritabilityChanged(ctx);
return;
}
Channel visitorCommunicationChannel = NettyCommunicationIdContext.getVisitor(visitorId);
if (visitorCommunicationChannel != null) {
log.debug("visitorId:{} transfer AUTO_READ:{} ",visitorId,ctx.channel().isWritable());
visitorCommunicationChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
}
// Channel visitorChannel = ctx.channel();
// String vid = visitorChannel.attr(Constant.VID).get();
// if (StringUtil.isNullOrEmpty(vid)) {
// super.channelWritabilityChanged(ctx);
// return;
// }
// Channel clientChannel = Constant.vcc.get(vid);
// if (clientChannel != null) {
// clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable());
// }
if (ctx.channel().isWritable()) {
System.out.println("Channel is writable again");
log.debug("Channel is writable again");
// 恢复之前暂停的操作,如写入数据
} else {
System.out.println("Channel is not writable");
log.debug("Channel is not writable");
// 暂停写入操作,等待可写状态
}
log.info("channelWritabilityChanged");
super.channelWritabilityChanged(ctx);
log.info("visitorId:{} channelWritabilityChanged!",visitorId);
}
@Override

View File

@ -2,10 +2,7 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.socket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.NettyServerFilter;
@ -31,8 +28,6 @@ public class NettyOnCloudNettyServerSocket {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
// 给服务端channel设置属性
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 设置读缓冲区为2M
.childOption(ChannelOption.SO_RCVBUF, 2048 * 1024)
// 设置写缓冲区为1M
@ -42,7 +37,7 @@ public class NettyOnCloudNettyServerSocket {
// .childOption(ChannelOption.TCP_NODELAY, false)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60 秒
// .childOption(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2))
.childHandler(nettyServerFilter);
channelFuture = b.bind(serverPort).sync();

View File

@ -10,6 +10,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPenetrat
import org.framework.lazy.cloud.network.heartbeat.common.NettyClientVisitorContext;
import org.framework.lazy.cloud.network.heartbeat.common.NettyVisitorPortContext;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.allocator.NettyRecvByteBufAllocator;
import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.VisitorFilter;
import java.io.IOException;
@ -41,12 +42,12 @@ public class NettyVisitorSocket {
*
* @throws Exception
*/
public void startServer() throws Exception {
public void startVisitorServer() throws Exception {
Channel visitor = NettyVisitorPortContext.getVisitor(visitorPort);
if (visitor == null) {
ServerBootstrap b = new ServerBootstrap();
b
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
@ -60,11 +61,12 @@ public class NettyVisitorSocket {
.childOption(ChannelOption.SO_KEEPALIVE, true)
// .childOption(ChannelOption.TCP_NODELAY, false)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60 秒
// .childOption(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT
// .childOption(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认 AdaptiveRecvByteBufAllocator.DEFAULT
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2))
.childHandler(visitorFilter);
ChannelFuture sync = b.bind(visitorPort).sync();
ChannelFuture sync = bootstrap.bind(visitorPort).sync();
sync.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
// 这里时异步处理

View File

@ -8,7 +8,6 @@ import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.internal.network.penetration.mapping.*;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.internal.network.penetration.mapping.LazyInternalNetworkPenetrationMapping;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.internal.network.penetration.mapping.LazyInternalNetworkPenetrationMappingRepository;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.internal.network.penetration.mapping.*;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.dto.LazyInternalNetworkPenetrationMappingDTO;
import org.springframework.transaction.annotation.Transactional;
import org.wu.framework.core.utils.ObjectUtils;
@ -292,7 +291,7 @@ public class LazyInternalNetworkPenetrationMappingApplicationImpl implements Laz
.builderChannelFlowAdapter(channelFlowAdapter)
.build();
try {
nettyVisitorSocket.startServer();
nettyVisitorSocket.startVisitorServer();
} catch (Exception e) {
log.error("客户端:{},网络端口:{},开放失败", clientId, visitorPort);
throw new RuntimeException(e);