From 5427ff68db2295919707aa7f33c06684b3977498 Mon Sep 17 00:00:00 2001 From: wujiawei <12345678> Date: Sun, 11 Jan 2026 21:43:02 +0800 Subject: [PATCH] =?UTF-8?q?[update]=20=E6=B7=BB=E5=8A=A0=E6=B5=81=E9=87=8F?= =?UTF-8?q?=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../TrafficStatisticsInboundHandler.java | 47 +++++++++++++++++ .../TrafficStatisticsOutboundHandler.java | 32 ++++++++++++ .../utils/ChannelAttributeKeyUtils.java | 50 +++++++++++++++++-- 3 files changed, 126 insertions(+), 3 deletions(-) create mode 100644 wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/handler/TrafficStatisticsInboundHandler.java create mode 100644 wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/handler/TrafficStatisticsOutboundHandler.java diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/handler/TrafficStatisticsInboundHandler.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/handler/TrafficStatisticsInboundHandler.java new file mode 100644 index 00000000..b06b2549 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/handler/TrafficStatisticsInboundHandler.java @@ -0,0 +1,47 @@ +package org.framework.lazy.cloud.network.heartbeat.common.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; + +/** + * Netty4.2.9 流量+网速统计核心Handler (无侵入、线程安全) + */ +public class TrafficStatisticsInboundHandler extends ChannelInboundHandlerAdapter { + + // ===================== 统计下行流量:接收数据 ===================== + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + int len = buf.readableBytes(); + if (len > 0) { + // 从Channel中获取统计对象,累加字节数 + ChannelAttributeKeyUtils.buildReadBytesLength(ctx.channel(),len); + } + } + super.channelRead(ctx, msg); + } + + + + + // ===================== 连接创建时,初始化统计对象 ===================== + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // 给当前Channel绑定一个独立的统计对象 + super.channelActive(ctx); + } + + // ===================== 连接关闭时,移除统计对象 ===================== + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } +} \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/handler/TrafficStatisticsOutboundHandler.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/handler/TrafficStatisticsOutboundHandler.java new file mode 100644 index 00000000..7292ff06 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/handler/TrafficStatisticsOutboundHandler.java @@ -0,0 +1,32 @@ +package org.framework.lazy.cloud.network.heartbeat.common.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * 出站流量统计处理器,只处理发送数据 + * 继承ChannelOutboundHandlerAdapter 正确重写write()! + */ +public class TrafficStatisticsOutboundHandler extends ChannelOutboundHandlerAdapter { + + // ========== 核心:统计【发送/上行】流量 ========== + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + int readableBytes = buf.readableBytes(); + // 判断通道是否活跃,兼容你的关闭逻辑 + if (ctx.channel().isActive()) { + ChannelAttributeKeyUtils.buildWriteBytesLength(ctx.channel(),readableBytes); + } + } + // 必须传递消息,不影响发送逻辑 + super.write(ctx, msg, promise); + } + +} \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/utils/ChannelAttributeKeyUtils.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/utils/ChannelAttributeKeyUtils.java index 13975edc..e450985a 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/utils/ChannelAttributeKeyUtils.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/utils/ChannelAttributeKeyUtils.java @@ -25,9 +25,12 @@ public class ChannelAttributeKeyUtils { private static final AttributeKey NEXT_CHANNEL = AttributeKey.newInstance("nextChannel"); private static final AttributeKey TRANSFER_NEXT_CHANNEL = AttributeKey.newInstance("transferNextChannel"); - private static final AttributeKey NETTY_BYTE_BUF_DATA = AttributeKey.newInstance("nettyByteBufData"); + private static final AttributeKey NETTY_BYTE_BUFF_DATA = AttributeKey.newInstance("nettyByteBuffData"); private static final AttributeKey NETTY_SOCKS5_ADDRESS_TYPE = AttributeKey.newInstance("socks5AddressType"); + private static final AttributeKey READ_BYTES_LENGTH = AttributeKey.newInstance("readBytesLength"); + private static final AttributeKey WRITE_BYTES_LENGTH = AttributeKey.newInstance("writeBytesLength"); + /** * 为通道绑定 访客属性 @@ -242,7 +245,7 @@ public class ChannelAttributeKeyUtils { * @param nettyByteBuf 传输数据 */ public static void buildNettyByteBufData(Channel channel, NettyByteBuf nettyByteBuf) { - channel.attr(NETTY_BYTE_BUF_DATA).set(nettyByteBuf); + channel.attr(NETTY_BYTE_BUFF_DATA).set(nettyByteBuf); } @@ -252,7 +255,7 @@ public class ChannelAttributeKeyUtils { * @param channel 通道 */ public static NettyByteBuf getNettyByteBufData(Channel channel) { - return channel.attr(NETTY_BYTE_BUF_DATA).get(); + return channel.attr(NETTY_BYTE_BUFF_DATA).get(); } @@ -371,4 +374,45 @@ public class ChannelAttributeKeyUtils { public static Byte getSocks5AddressType(Channel channel) { return channel.attr(NETTY_SOCKS5_ADDRESS_TYPE).get(); } + + + + /** + * 为通道绑定 读数据大小 + * + * @param channel 通道 + * @param readBytesLength 读数据大小 + */ + public static void buildReadBytesLength(Channel channel, Integer readBytesLength) { + channel.attr(READ_BYTES_LENGTH).set(readBytesLength); + } + + /** + * 获取 通道中 读数据大小 + * + * @param channel 通道 + */ + public static Integer getReadBytesLength(Channel channel) { + return channel.attr(READ_BYTES_LENGTH).get(); + } + + /** + * 为通道绑定 写数据大小 + * + * @param channel 通道 + * @param writeBytesLength 写数据大小 + * @see Socks5AddressType#valueOf(byte) + */ + public static void buildWriteBytesLength(Channel channel, Integer writeBytesLength) { + channel.attr(WRITE_BYTES_LENGTH).set(writeBytesLength); + } + + /** + * 获取 通道中 写数据大小 + * + * @param channel 通道 + */ + public static Integer getWriteBytesLength(Channel channel) { + return channel.attr(WRITE_BYTES_LENGTH).get(); + } }