[update] 添加流量控制

This commit is contained in:
wujiawei
2026-01-11 21:43:02 +08:00
parent d797b918a4
commit 5427ff68db
3 changed files with 126 additions and 3 deletions

View File

@@ -0,0 +1,47 @@
package org.framework.lazy.cloud.network.heartbeat.common.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
/**
* Netty4.2.9 流量+网速统计核心Handler (无侵入、线程安全)
*/
public class TrafficStatisticsInboundHandler extends ChannelInboundHandlerAdapter {
// ===================== 统计下行流量:接收数据 =====================
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int len = buf.readableBytes();
if (len > 0) {
// 从Channel中获取统计对象累加字节数
ChannelAttributeKeyUtils.buildReadBytesLength(ctx.channel(),len);
}
}
super.channelRead(ctx, msg);
}
// ===================== 连接创建时,初始化统计对象 =====================
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 给当前Channel绑定一个独立的统计对象
super.channelActive(ctx);
}
// ===================== 连接关闭时,移除统计对象 =====================
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}

View File

@@ -0,0 +1,32 @@
package org.framework.lazy.cloud.network.heartbeat.common.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import java.util.concurrent.atomic.AtomicLong;
/**
* 出站流量统计处理器,只处理发送数据
* 继承ChannelOutboundHandlerAdapter 正确重写write()
*/
public class TrafficStatisticsOutboundHandler extends ChannelOutboundHandlerAdapter {
// ========== 核心:统计【发送/上行】流量 ==========
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
// 判断通道是否活跃,兼容你的关闭逻辑
if (ctx.channel().isActive()) {
ChannelAttributeKeyUtils.buildWriteBytesLength(ctx.channel(),readableBytes);
}
}
// 必须传递消息,不影响发送逻辑
super.write(ctx, msg, promise);
}
}

View File

@@ -25,9 +25,12 @@ public class ChannelAttributeKeyUtils {
private static final AttributeKey<Channel> NEXT_CHANNEL = AttributeKey.newInstance("nextChannel"); private static final AttributeKey<Channel> NEXT_CHANNEL = AttributeKey.newInstance("nextChannel");
private static final AttributeKey<Channel> TRANSFER_NEXT_CHANNEL = AttributeKey.newInstance("transferNextChannel"); private static final AttributeKey<Channel> TRANSFER_NEXT_CHANNEL = AttributeKey.newInstance("transferNextChannel");
private static final AttributeKey<NettyByteBuf> NETTY_BYTE_BUF_DATA = AttributeKey.newInstance("nettyByteBufData"); private static final AttributeKey<NettyByteBuf> NETTY_BYTE_BUFF_DATA = AttributeKey.newInstance("nettyByteBuffData");
private static final AttributeKey<Byte> NETTY_SOCKS5_ADDRESS_TYPE = AttributeKey.newInstance("socks5AddressType"); private static final AttributeKey<Byte> NETTY_SOCKS5_ADDRESS_TYPE = AttributeKey.newInstance("socks5AddressType");
private static final AttributeKey<Integer> READ_BYTES_LENGTH = AttributeKey.newInstance("readBytesLength");
private static final AttributeKey<Integer> WRITE_BYTES_LENGTH = AttributeKey.newInstance("writeBytesLength");
/** /**
* 为通道绑定 访客属性 * 为通道绑定 访客属性
@@ -242,7 +245,7 @@ public class ChannelAttributeKeyUtils {
* @param nettyByteBuf 传输数据 * @param nettyByteBuf 传输数据
*/ */
public static void buildNettyByteBufData(Channel channel, NettyByteBuf nettyByteBuf) { public static void buildNettyByteBufData(Channel channel, NettyByteBuf nettyByteBuf) {
channel.attr(NETTY_BYTE_BUF_DATA).set(nettyByteBuf); channel.attr(NETTY_BYTE_BUFF_DATA).set(nettyByteBuf);
} }
@@ -252,7 +255,7 @@ public class ChannelAttributeKeyUtils {
* @param channel 通道 * @param channel 通道
*/ */
public static NettyByteBuf getNettyByteBufData(Channel channel) { public static NettyByteBuf getNettyByteBufData(Channel channel) {
return channel.attr(NETTY_BYTE_BUF_DATA).get(); return channel.attr(NETTY_BYTE_BUFF_DATA).get();
} }
@@ -371,4 +374,45 @@ public class ChannelAttributeKeyUtils {
public static Byte getSocks5AddressType(Channel channel) { public static Byte getSocks5AddressType(Channel channel) {
return channel.attr(NETTY_SOCKS5_ADDRESS_TYPE).get(); return channel.attr(NETTY_SOCKS5_ADDRESS_TYPE).get();
} }
/**
* 为通道绑定 读数据大小
*
* @param channel 通道
* @param readBytesLength 读数据大小
*/
public static void buildReadBytesLength(Channel channel, Integer readBytesLength) {
channel.attr(READ_BYTES_LENGTH).set(readBytesLength);
}
/**
* 获取 通道中 读数据大小
*
* @param channel 通道
*/
public static Integer getReadBytesLength(Channel channel) {
return channel.attr(READ_BYTES_LENGTH).get();
}
/**
* 为通道绑定 写数据大小
*
* @param channel 通道
* @param writeBytesLength 写数据大小
* @see Socks5AddressType#valueOf(byte)
*/
public static void buildWriteBytesLength(Channel channel, Integer writeBytesLength) {
channel.attr(WRITE_BYTES_LENGTH).set(writeBytesLength);
}
/**
* 获取 通道中 写数据大小
*
* @param channel 通道
*/
public static Integer getWriteBytesLength(Channel channel) {
return channel.attr(WRITE_BYTES_LENGTH).get();
}
} }