From 3f4a05ba5c29e218fd998c74b55d611794a07585 Mon Sep 17 00:00:00 2001 From: wujiawei <1207537021@qq.com> Date: Tue, 11 Nov 2025 21:40:49 +0800 Subject: [PATCH] [fix] fix --- version.md | 4 +- .../test1/TrafficInterceptHandler.java | 210 ++++++++++++++++++ .../protocol/test1/TrafficInterceptor.java | 134 +++++++++++ 3 files changed, 347 insertions(+), 1 deletion(-) create mode 100644 wu-lazy-cloud-heartbeat-protocol-proxy/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/test1/TrafficInterceptHandler.java create mode 100644 wu-lazy-cloud-heartbeat-protocol-proxy/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/test1/TrafficInterceptor.java diff --git a/version.md b/version.md index 9b6d0aa1..b371252e 100644 --- a/version.md +++ b/version.md @@ -56,4 +56,6 @@ [add]默认客户端ID为当前设备唯一标识 -#### 下一版本计划 \ No newline at end of file +#### 下一版本计划 + [add] 流媒体抓取 + [add] 监听本地网卡进行代理 \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/test1/TrafficInterceptHandler.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/test1/TrafficInterceptHandler.java new file mode 100644 index 00000000..00e6e397 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/test1/TrafficInterceptHandler.java @@ -0,0 +1,210 @@ +package org.framework.lazy.cloud.network.heartbeat.protocol.test1; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +import java.net.InetSocketAddress; + +/** + * 流量拦截处理器:执行拦截策略,处理流量转发/丢弃/修改 + */ +public class TrafficInterceptHandler extends ChannelDuplexHandler { + private final TrafficInterceptor.InterceptStrategy strategy; + private Channel forwardChannel; // 转发通道(连接原目标服务器) + private InetSocketAddress originalTarget; // 流量的原目标地址(需解析获取) + + public TrafficInterceptHandler(TrafficInterceptor.InterceptStrategy strategy) { + this.strategy = strategy; + } + + /** + * 客户端连接建立时,解析原目标地址(关键:透明代理需知道流量要发往的原目标) + * 注:原目标地址的解析方式需根据场景调整(如通过路由表、ARP 缓存、或 RAW socket 抓包解析 IP 头部) + */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + InetSocketAddress clientAddr = (InetSocketAddress) ctx.channel().remoteAddress(); + System.out.println("拦截到新连接:客户端 " + clientAddr); + + // 关键步骤:解析流量的原目标地址(此处为模拟,实际需通过以下方式获取) + // 方式 1:若拦截器作为网关,原目标地址即客户端请求的目标 IP:Port(需解析应用层协议,如 HTTP Host 头) + // 方式 2:用 RAW socket 抓包,解析 IP 头部的「目的地址」字段(推荐,适用于所有 TCP 流量) + // 方式 3:通过 iptables 端口转发,原目标地址由 iptables 传递(Linux 环境) + // 此处模拟:假设原目标是百度服务器(实际需动态解析) + originalTarget = new InetSocketAddress("220.181.38.251", 80); + + // 根据策略初始化转发通道(若需要转发) + if (strategy == TrafficInterceptor.InterceptStrategy.FORWARD || strategy == TrafficInterceptor.InterceptStrategy.MODIFY) { + initForwardChannel(ctx); + } + + super.channelActive(ctx); + } + + /** + * 初始化转发通道:连接原目标服务器,实现透明转发 + */ + private void initForwardChannel(ChannelHandlerContext ctx) { + Bootstrap bootstrap = new Bootstrap() + .group(ctx.channel().eventLoop()) // 复用客户端事件循环组,性能更优 + .channel(NioSocketChannel.class) + .option(io.netty.channel.ChannelOption.SO_KEEPALIVE, true) + .handler(new ChannelDuplexHandler() { + // 接收原目标服务器的响应,转发回客户端 + @Override + public void channelRead(ChannelHandlerContext forwardCtx, Object msg) throws Exception { + ByteBuf response = (ByteBuf) msg; + System.out.printf("收到原目标 %s 响应,长度:%d 字节%n", originalTarget, response.readableBytes()); + + // 若策略是「修改流量」,则修改响应内容 + if (strategy == TrafficInterceptor.InterceptStrategy.MODIFY) { + modifyData(response, false); // false 表示修改响应流量 + } + + // 转发响应到客户端 + ctx.channel().writeAndFlush(msg); + } + + @Override + public void channelInactive(ChannelHandlerContext forwardCtx) throws Exception { + System.out.println("转发通道断开:原目标 " + originalTarget); + ctx.channel().close(); // 转发通道断开,关闭客户端连接 + } + }); + + // 连接原目标服务器 + bootstrap.connect(originalTarget).addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + forwardChannel = future.channel(); + System.out.println("成功连接原目标:" + originalTarget); + } else { + System.err.println("连接原目标失败:" + originalTarget + ",原因:" + future.cause().getMessage()); + ctx.channel().close(); + } + }); + } + + /** + * 拦截客户端发送的流量(出站流量) + */ + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + ByteBuf request = (ByteBuf) msg; + InetSocketAddress clientAddr = (InetSocketAddress) ctx.channel().remoteAddress(); + System.out.printf("拦截到客户端 %s 发送的流量,长度:%d 字节%n", clientAddr, request.readableBytes()); + + switch (strategy) { + case DROP: + // 丢弃流量,释放缓冲区 + request.release(); + System.out.println("已丢弃该流量(策略:DROP)"); + break; + + case MODIFY: + // 修改流量内容(示例:在 HTTP 请求头添加自定义字段) + modifyData(request, true); // true 表示修改请求流量 + System.out.println("已修改流量,准备转发"); + // fall through 到 FORWARD 逻辑 + case FORWARD: + // 转发流量到原目标服务器 + if (forwardChannel != null && forwardChannel.isActive()) { + forwardChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> { + if (!future.isSuccess()) { + System.err.println("流量转发失败:" + future.cause().getMessage()); + ctx.channel().close(); + } + }); + } else { + request.release(); + System.err.println("转发通道未就绪,丢弃流量"); + } + break; + + case LOG_ONLY: + // 仅记录日志,不拦截,直接放行(透传流量) + super.write(ctx, msg, promise); + break; + } + } + + /** + * 修改流量内容(示例:修改 HTTP 请求/响应) + * @param data 要修改的 ByteBuf 数据 + * @param isRequest 是否为请求流量(true:请求,false:响应) + */ + private void modifyData(ByteBuf data, boolean isRequest) { + // 示例:在 HTTP 请求头添加 X-Intercepted: true 字段 + if (isRequest) { + // 切换为读模式,读取 HTTP 头 + data.markReaderIndex(); + byte[] temp = new byte[data.readableBytes()]; + data.readBytes(temp); + String content = new String(temp); + + // 找到 HTTP 头的结束位置("\r\n\r\n"),插入自定义字段 + if (content.contains("\r\n\r\n")) { + content = content.replace("\r\n\r\n", "\r\nX-Intercepted: true\r\n\r\n"); + // 重置 ByteBuf,写入修改后的数据 + data.resetReaderIndex(); + data.clear(); + data.writeBytes(content.getBytes()); + System.out.println("已修改请求流量:添加 X-Intercepted 头"); + } + } else { + // 示例:修改 HTTP 响应的内容 + data.markReaderIndex(); + byte[] temp = new byte[data.readableBytes()]; + data.readBytes(temp); + String content = new String(temp); + if (content.contains("
")) { + content = content.replace("", "