【fix】 自定义netty 接收数据缓冲期控制接收数据大小

This commit is contained in:
wujiawei
2024-08-28 16:57:31 +08:00
parent fd364c78e7
commit 629910e860
19 changed files with 236 additions and 34 deletions

View File

@ -25,7 +25,7 @@
<module>wu-lazy-cloud-heartbeat-common</module>
<!-- 样例 -->
<module>wu-lazy-cloud-heartbeat-start</module>
<!-- <module>wu-lazy-cloud-heartbeat-start</module>-->
</modules>
<properties>

View File

@ -1,11 +1,12 @@
package org.framework.lazy.cloud.network.heartbeat.client.netty.filter;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientRealHandler;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder;
import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder;
public class NettyClientRealFilter extends ChannelInitializer<SocketChannel> {
/**
@ -13,18 +14,13 @@ public class NettyClientRealFilter extends ChannelInitializer<SocketChannel> {
* will be removed from the {@link ChannelPipeline} of the {@link Channel}.
*
* @param ch the {@link Channel} which was registered.
* @throws Exception is thrown if an error occurs. In that case it will be handled by
* {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default connectionClose
* the {@link Channel}.
*/
@Override
protected void initChannel(SocketChannel ch) throws Exception {
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 解码、编码
pipeline.addLast(new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024*10));
pipeline.addLast(new TransferEncoder());
pipeline.addLast(new NettyClientRealHandler());
// // 解码、编码
// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
// pipeline.addLast(new NettMsgEncoder());
// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
// pipeline.addLast(new NettyProxyMsgEncoder());
}
}

View File

@ -7,6 +7,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf;
import org.framework.lazy.cloud.network.heartbeat.common.NettyCommunicationIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
@ -15,14 +16,14 @@ import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeK
* 来自客户端 真实服务器返回的数据请求
*/
@Slf4j
public class NettyClientRealHandler extends SimpleChannelInboundHandler<ByteBuf> {
public class NettyClientRealHandler extends SimpleChannelInboundHandler<NettyByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
public void channelRead0(ChannelHandlerContext ctx,NettyByteBuf nettyByteBuf) {
// 客户端发送真实数据到代理了
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
byte[] bytes = nettyByteBuf.getData();
log.debug("bytes.length:{}",bytes.length);
log.debug("接收客户端真实服务数据:{}", new String(bytes));
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
Integer visitorPort = ChannelAttributeKeyUtils.getVisitorPort(ctx.channel());
@ -37,10 +38,9 @@ public class NettyClientRealHandler extends SimpleChannelInboundHandler<ByteBuf>
returnMessage.setData(bytes);
visitorChannel.writeAndFlush(returnMessage);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);

View File

@ -6,13 +6,13 @@ import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientRealFilter;
import org.framework.lazy.cloud.network.heartbeat.client.config.NettyClientProperties;
import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientRealFilter;
import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientVisitorRealFilter;
import org.framework.lazy.cloud.network.heartbeat.common.*;
import org.framework.lazy.cloud.network.heartbeat.common.*;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.allocator.NettyRecvByteBufAllocator;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import java.util.List;
@ -54,7 +54,19 @@ public class NettyClientRealSocket {
String visitorId = internalNetworkPenetrationRealClient.getVisitorId();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new NettyClientRealFilter());
// 设置读缓冲区为2M
// .option(ChannelOption.SO_RCVBUF, 2048 * 1024)
// 设置写缓冲区为1M
// .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60 秒
.option(ChannelOption.SO_BACKLOG, 128)//务端接受连接的队列长度 默认128
.option(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT
.handler(new NettyClientRealFilter())
;
bootstrap.connect(clientTargetIp, clientTargetPort).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
// 客户端链接真实服务成功 设置自动读写false 等待访客连接成功后设置成true

View File

@ -0,0 +1,18 @@
package org.framework.lazy.cloud.network.heartbeat.common;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@NoArgsConstructor
@Setter
@Getter
public class NettyByteBuf {
// body 长度 data 4
public static final int bodyLength = 4;
/**
* 消息传输数据
* byte[] 长度 4
*/
private byte[] data;
}

View File

@ -0,0 +1,75 @@
package org.framework.lazy.cloud.network.heartbeat.common.allocator;
import io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import static io.netty.util.internal.ObjectUtil.checkPositive;
/**
* The {@link RecvByteBufAllocator} that always yields the same buffer
* size prediction. This predictor ignores the feed back from the I/O thread.
* 接收数据缓冲区
*
* @see FixedRecvByteBufAllocator
*/
@Slf4j
public class NettyRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
private final int bufferSize;
private final class HandleImpl extends MaxMessageHandle {
private final int bufferSize;
HandleImpl(int bufferSize) {
this.bufferSize = bufferSize;
}
@Override
public int guess() {
// log.info("guess :{}", bufferSize);
return bufferSize;
}
@Override
public void attemptedBytesRead(int bytes) {
// log.info("attemptedBytesRead:{}", bytes);
super.attemptedBytesRead(bytes);
}
@Override
public int attemptedBytesRead() {
int attemptedBytesRead = super.attemptedBytesRead();
// log.info("attemptedBytesRead result:{}", attemptedBytesRead);
return attemptedBytesRead;
}
@Override
public void lastBytesRead(int bytes) {
super.lastBytesRead(bytes);
// log.info("lastBytesRead result:{}", bytes);
}
}
/**
* Creates a new predictor that always returns the same prediction of
* the specified buffer size.
*/
public NettyRecvByteBufAllocator(int bufferSize) {
// checkPositive(bufferSize, "bufferSize");
this.bufferSize = bufferSize;
}
@SuppressWarnings("deprecation")
@Override
public Handle newHandle() {
return new NettyRecvByteBufAllocator.HandleImpl(bufferSize);
}
@Override
public NettyRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
super.respectMaybeMoreData(respectMaybeMoreData);
return this;
}
}

View File

@ -0,0 +1,79 @@
package org.framework.lazy.cloud.network.heartbeat.common.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf;
import org.wu.framework.core.exception.RuntimeExceptionFactory;
import java.util.List;
/**
* ByteBuf 解码为 byte[]
*/
@Slf4j
public class TransferDecoder extends
ByteToMessageDecoder {
// 上一次可以读取到到数据长度
private long latestReadableBytes = -1;
private final int maxFrameLength; // 读取最大长度超出会异常
private final int perPackageLimitLength;// 粘包时合并后到包最大大小
public TransferDecoder(int maxFrameLength, int perPackageLimitLength) {
this.maxFrameLength = maxFrameLength;
this.perPackageLimitLength = perPackageLimitLength;
}
protected NettyByteBuf decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in == null) {
log.info("this byteBuf is null");
return null;
}
// if (latestReadableBytes == -1) {
//
// latestReadableBytes = in.readableBytes();
// // 第一次什么也不做
// return null;
// }
// // 第二次了哦
// if (in.readableBytes() > latestReadableBytes && in.readableBytes() < perPackageLimitLength) {
// // 继续粘包
// latestReadableBytes = in.readableBytes();
// log.info("粘包ing:{}",latestReadableBytes);
// return null;
// }
// if (in.readableBytes() > maxFrameLength) {
// RuntimeExceptionFactory.of("readableBytes is max then maxFrameLength:" + maxFrameLength);
// }
// log.info("粘包结束开始处理数据:{}",latestReadableBytes);
byte[] bytes = new byte[in.readableBytes()];
in.readBytes(bytes);
NettyByteBuf nettyByteBuf = new NettyByteBuf();
nettyByteBuf.setData(bytes);
latestReadableBytes = -1;
return nettyByteBuf;
}
/**
* Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
* {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
* {@link ByteBuf}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
* @throws Exception is thrown if an error occurs
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
}

View File

@ -0,0 +1,19 @@
package org.framework.lazy.cloud.network.heartbeat.common.encoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf;
/**
* 编码
*/
public class TransferEncoder extends MessageToByteEncoder<NettyByteBuf> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,NettyByteBuf nettyByteBuf, ByteBuf byteBuf) throws Exception {
byte[] bytes = nettyByteBuf.getData();
byteBuf.writeBytes(bytes);
}
}

View File

@ -37,6 +37,7 @@ public class ServerHandleReportHandleChannelTransferTypeAdvanced extends Abstrac
public void doHandler(Channel channel, NettyProxyMsg msg) {
String clientId = new String(msg.getClientId());
Integer visitorPort = Integer.valueOf(new String(msg.getVisitorPort()));
// log.info("接收到客户端:[{}]内网穿透返回的数据:[{}]", clientId, msg.getData().length);
log.debug("接收到客户端:[{}]内网穿透返回的数据:[{}]", clientId, new String(msg.getData()));
// 将数据转发访客通道
byte[] visitorId = msg.getVisitorId();

View File

@ -67,6 +67,7 @@ public class LazyInternalNetworkPenetrationMappingApplicationImpl implements Laz
Integer clientTargetPort = lazyInternalNetworkPenetrationMapping.getClientTargetPort();
Integer visitorPort = lazyInternalNetworkPenetrationMapping.getVisitorPort();
// 创建访客通道池
this.changeSocket(clientId, clientTargetIp, clientTargetPort, visitorPort);
return lazyInternalNetworkPenetrationMappingRepository.story(lazyInternalNetworkPenetrationMapping);

View File

@ -69,7 +69,7 @@ public class LazyNettyServerVisitorDO {
* 访客端口池大小
*/
@Schema(description = "访客端口池大小", name = "poolSize", example = "")
@LazyTableFieldUnique(name = "pool_size", comment = "访客端口池大小", columnType = "int",defaultValue = "'20")
@LazyTableField(name = "pool_size", comment = "访客端口池大小", columnType = "int",defaultValue = "'20'")
private Integer poolSize;
/**
* 服务端ID

View File

@ -9,9 +9,9 @@ mvn clean package -Pnative
```shell
mvn spring-boot:build-image -Pnative
docker tag docker.io/library/wu-lazy-cloud-heartbeat-client-start:1.2.7-JDK17-SNAPSHOT registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.6-JDK17-NATIVE-SNAPSHOT
docker tag docker.io/library/wu-lazy-cloud-heartbeat-client-start:1.2.7-JDK17-SNAPSHOT registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.7-JDK17-NATIVE-SNAPSHOT
docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.6-JDK17-NATIVE-SNAPSHOT
docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.7-JDK17-NATIVE-SNAPSHOT
```
@ -33,7 +33,7 @@ docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-cl
### run
```shell
docker run -d -it --privileged --name client -p 6004:6004 registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.6-JDK17-NATIVE-SNAPSHOT
docker run -d -it --privileged --name client -p 6004:6004 registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.7-JDK17-NATIVE-SNAPSHOT
```

View File

@ -34,7 +34,7 @@ spec:
- name: spring.lazy.netty.client.client-id
value: wu-lazy-cloud-heartbeat-client-start
image: >-
registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.6-JDK17-NATIVE-SNAPSHOT
registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.7-JDK17-NATIVE-SNAPSHOT
imagePullPolicy: Always
name: wu-lazy-cloud-heartbeat-client-start
resources: {}

View File

@ -11,9 +11,9 @@ mvn clean package -Pnative
```shell
mvn spring-boot:build-image -Pnative
docker tag docker.io/library/wu-lazy-cloud-heartbeat-server-cluster-start:1.2.7-JDK17-SNAPSHOT registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-cluster-start:1.2.6-JDK17-NATIVE-SNAPSHOT
docker tag docker.io/library/wu-lazy-cloud-heartbeat-server-cluster-start:1.2.7-JDK17-SNAPSHOT registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-cluster-start:1.2.7-JDK17-NATIVE-SNAPSHOT
docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-cluster-start:1.2.6-JDK17-NATIVE-SNAPSHOT
docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-cluster-start:1.2.7-JDK17-NATIVE-SNAPSHOT
```

View File

@ -73,7 +73,7 @@ spec:
- configMapRef:
name: wu-lazy-cloud-heartbeat-server-cluster-start-conf
image: >-
registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-cluster-start:1.2.6-JDK17-NATIVE-SNAPSHOT
registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-cluster-start:1.2.7-JDK17-NATIVE-SNAPSHOT
imagePullPolicy: Always
name: network-server-cluster-start
ports:

View File

@ -54,7 +54,7 @@ spec:
- configMapRef:
name: wu-lazy-cloud-heartbeat-server-cluster-start-conf
image: >-
registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-cluster-start:1.2.6-JDK17-NATIVE-SNAPSHOT
registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-cluster-start:1.2.7-JDK17-NATIVE-SNAPSHOT
imagePullPolicy: Always
name: network-server-cluster-start
ports:

View File

@ -11,9 +11,9 @@ mvn clean package -Pnative
```shell
mvn spring-boot:build-image -Pnative
docker tag docker.io/library/wu-lazy-cloud-heartbeat-server-start:1.2.7-JDK17-SNAPSHOT registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-start:1.2.6-JDK17-NATIVE-SNAPSHOT
docker tag docker.io/library/wu-lazy-cloud-heartbeat-server-start:1.2.7-JDK17-SNAPSHOT registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-start:1.2.7-JDK17-NATIVE-SNAPSHOT
docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-start:1.2.6-JDK17-NATIVE-SNAPSHOT
docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-start:1.2.7-JDK17-NATIVE-SNAPSHOT
```
@ -30,7 +30,7 @@ docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-se
```RUN
docker run -d -it -p 6001:6001 --name wu-lazy-cloud-heartbeat-server-start registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-start:1.2.6-JDK17-NATIVE-SNAPSHOT
docker run -d -it -p 6001:6001 --name wu-lazy-cloud-heartbeat-server-start registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-start:1.2.7-JDK17-NATIVE-SNAPSHOT
http://127.0.0.1:6001/swagger-ui/index.html

View File

@ -41,7 +41,7 @@ spec:
- name: spring.datasource.driver-class-name
value: com.mysql.cj.jdbc.Driver
image: >-
registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-start:1.2.6-JDK17-NATIVE-SNAPSHOT
registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server-start:1.2.7-JDK17-NATIVE-SNAPSHOT
imagePullPolicy: Always
name: wu-lazy-cloud-heartbeat-server
resources: {}

View File

@ -12,6 +12,7 @@ spring:
node-id: default #当前服务ID
node-host: 127.0.0.1 # 当前节点host
node-port: 7001 # 当前节点端口
enable-flow-control: true # 允许流量控制,代理性能不太好
ddl-configure:
ddl-auto: create