diff --git a/version.md b/version.md index 9b6d0aa1..b371252e 100644 --- a/version.md +++ b/version.md @@ -56,4 +56,6 @@ [add]默认客户端ID为当前设备唯一标识 -#### 下一版本计划 \ No newline at end of file +#### 下一版本计划 + [add] 流媒体抓取 + [add] 监听本地网卡进行代理 \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/test1/TrafficInterceptHandler.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/test1/TrafficInterceptHandler.java new file mode 100644 index 00000000..00e6e397 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/test1/TrafficInterceptHandler.java @@ -0,0 +1,210 @@ +package org.framework.lazy.cloud.network.heartbeat.protocol.test1; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +import java.net.InetSocketAddress; + +/** + * 流量拦截处理器:执行拦截策略,处理流量转发/丢弃/修改 + */ +public class TrafficInterceptHandler extends ChannelDuplexHandler { + private final TrafficInterceptor.InterceptStrategy strategy; + private Channel forwardChannel; // 转发通道(连接原目标服务器) + private InetSocketAddress originalTarget; // 流量的原目标地址(需解析获取) + + public TrafficInterceptHandler(TrafficInterceptor.InterceptStrategy strategy) { + this.strategy = strategy; + } + + /** + * 客户端连接建立时,解析原目标地址(关键:透明代理需知道流量要发往的原目标) + * 注:原目标地址的解析方式需根据场景调整(如通过路由表、ARP 缓存、或 RAW socket 抓包解析 IP 头部) + */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + InetSocketAddress clientAddr = (InetSocketAddress) ctx.channel().remoteAddress(); + System.out.println("拦截到新连接:客户端 " + clientAddr); + + // 关键步骤:解析流量的原目标地址(此处为模拟,实际需通过以下方式获取) + // 方式 1:若拦截器作为网关,原目标地址即客户端请求的目标 IP:Port(需解析应用层协议,如 HTTP Host 头) + // 方式 2:用 RAW socket 抓包,解析 IP 头部的「目的地址」字段(推荐,适用于所有 TCP 流量) + // 方式 3:通过 iptables 端口转发,原目标地址由 iptables 传递(Linux 环境) + // 此处模拟:假设原目标是百度服务器(实际需动态解析) + originalTarget = new InetSocketAddress("220.181.38.251", 80); + + // 根据策略初始化转发通道(若需要转发) + if (strategy == TrafficInterceptor.InterceptStrategy.FORWARD || strategy == TrafficInterceptor.InterceptStrategy.MODIFY) { + initForwardChannel(ctx); + } + + super.channelActive(ctx); + } + + /** + * 初始化转发通道:连接原目标服务器,实现透明转发 + */ + private void initForwardChannel(ChannelHandlerContext ctx) { + Bootstrap bootstrap = new Bootstrap() + .group(ctx.channel().eventLoop()) // 复用客户端事件循环组,性能更优 + .channel(NioSocketChannel.class) + .option(io.netty.channel.ChannelOption.SO_KEEPALIVE, true) + .handler(new ChannelDuplexHandler() { + // 接收原目标服务器的响应,转发回客户端 + @Override + public void channelRead(ChannelHandlerContext forwardCtx, Object msg) throws Exception { + ByteBuf response = (ByteBuf) msg; + System.out.printf("收到原目标 %s 响应,长度:%d 字节%n", originalTarget, response.readableBytes()); + + // 若策略是「修改流量」,则修改响应内容 + if (strategy == TrafficInterceptor.InterceptStrategy.MODIFY) { + modifyData(response, false); // false 表示修改响应流量 + } + + // 转发响应到客户端 + ctx.channel().writeAndFlush(msg); + } + + @Override + public void channelInactive(ChannelHandlerContext forwardCtx) throws Exception { + System.out.println("转发通道断开:原目标 " + originalTarget); + ctx.channel().close(); // 转发通道断开,关闭客户端连接 + } + }); + + // 连接原目标服务器 + bootstrap.connect(originalTarget).addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + forwardChannel = future.channel(); + System.out.println("成功连接原目标:" + originalTarget); + } else { + System.err.println("连接原目标失败:" + originalTarget + ",原因:" + future.cause().getMessage()); + ctx.channel().close(); + } + }); + } + + /** + * 拦截客户端发送的流量(出站流量) + */ + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + ByteBuf request = (ByteBuf) msg; + InetSocketAddress clientAddr = (InetSocketAddress) ctx.channel().remoteAddress(); + System.out.printf("拦截到客户端 %s 发送的流量,长度:%d 字节%n", clientAddr, request.readableBytes()); + + switch (strategy) { + case DROP: + // 丢弃流量,释放缓冲区 + request.release(); + System.out.println("已丢弃该流量(策略:DROP)"); + break; + + case MODIFY: + // 修改流量内容(示例:在 HTTP 请求头添加自定义字段) + modifyData(request, true); // true 表示修改请求流量 + System.out.println("已修改流量,准备转发"); + // fall through 到 FORWARD 逻辑 + case FORWARD: + // 转发流量到原目标服务器 + if (forwardChannel != null && forwardChannel.isActive()) { + forwardChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> { + if (!future.isSuccess()) { + System.err.println("流量转发失败:" + future.cause().getMessage()); + ctx.channel().close(); + } + }); + } else { + request.release(); + System.err.println("转发通道未就绪,丢弃流量"); + } + break; + + case LOG_ONLY: + // 仅记录日志,不拦截,直接放行(透传流量) + super.write(ctx, msg, promise); + break; + } + } + + /** + * 修改流量内容(示例:修改 HTTP 请求/响应) + * @param data 要修改的 ByteBuf 数据 + * @param isRequest 是否为请求流量(true:请求,false:响应) + */ + private void modifyData(ByteBuf data, boolean isRequest) { + // 示例:在 HTTP 请求头添加 X-Intercepted: true 字段 + if (isRequest) { + // 切换为读模式,读取 HTTP 头 + data.markReaderIndex(); + byte[] temp = new byte[data.readableBytes()]; + data.readBytes(temp); + String content = new String(temp); + + // 找到 HTTP 头的结束位置("\r\n\r\n"),插入自定义字段 + if (content.contains("\r\n\r\n")) { + content = content.replace("\r\n\r\n", "\r\nX-Intercepted: true\r\n\r\n"); + // 重置 ByteBuf,写入修改后的数据 + data.resetReaderIndex(); + data.clear(); + data.writeBytes(content.getBytes()); + System.out.println("已修改请求流量:添加 X-Intercepted 头"); + } + } else { + // 示例:修改 HTTP 响应的内容 + data.markReaderIndex(); + byte[] temp = new byte[data.readableBytes()]; + data.readBytes(temp); + String content = new String(temp); + if (content.contains("")) { + content = content.replace("", "

流量已被 Netty 拦截并修改

"); + data.resetReaderIndex(); + data.clear(); + data.writeBytes(content.getBytes()); + System.out.println("已修改响应流量:添加拦截提示"); + } + } + } + + /** + * 拦截客户端接收的流量(响应流量) + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + // 若策略为「仅记录日志」,直接放行;其他策略已在转发通道中处理 + if (strategy == TrafficInterceptor.InterceptStrategy.LOG_ONLY) { + ByteBuf data = (ByteBuf) msg; + System.out.printf("记录流量:客户端接收数据,长度:%d 字节%n", data.readableBytes()); + super.channelRead(ctx, msg); + } else { + ((ByteBuf) msg).release(); // 已在转发通道中处理,此处释放缓冲区 + } + } + + /** + * 连接断开时,关闭转发通道 + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + System.out.println("连接断开:客户端 " + ctx.channel().remoteAddress()); + if (forwardChannel != null && forwardChannel.isActive()) { + forwardChannel.close(); + } + super.channelInactive(ctx); + } + + /** + * 异常处理:关闭所有连接 + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + System.err.println("流量拦截异常:" + cause.getMessage()); + ctx.close(); + if (forwardChannel != null && forwardChannel.isActive()) { + forwardChannel.close(); + } + } +} \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/test1/TrafficInterceptor.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/test1/TrafficInterceptor.java new file mode 100644 index 00000000..703d9633 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/test1/TrafficInterceptor.java @@ -0,0 +1,134 @@ +package org.framework.lazy.cloud.network.heartbeat.protocol.test1; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; + +/** + * 网络流量拦截器:绑定指定网卡,拦截该网卡的所有 TCP 流量,支持转发/丢弃/修改 + */ +public class TrafficInterceptor { + // 绑定的网卡 IP(必须是本地网卡已配置的 IP) + private final String bindIp; + // 拦截策略(可自定义:转发、丢弃、修改等) + private final InterceptStrategy strategy; + + // Netty 事件循环组 + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + /** + * 构造拦截器 + * @param bindIp 绑定的网卡 IP(如 192.168.1.100) + * @param strategy 拦截策略 + */ + public TrafficInterceptor(String bindIp, InterceptStrategy strategy) { + this.bindIp = bindIp; + this.strategy = strategy; + validateBindIp(bindIp); + } + + /** + * 启动拦截器(监听绑定网卡的所有 TCP 端口,实际通过端口复用实现) + * 注:全端口监听需操作系统支持,或通过「端口范围监听」模拟(此处用 1-65535 端口范围) + */ + public void start() throws InterruptedException { + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + + try { + // 核心:启动服务端,绑定网卡 IP,监听所有 TCP 端口(模拟全端口拦截) + // 实际生产中可优化为:监听常用端口 + 动态端口,或用 RAW socket 直接抓包 + ServerBootstrap bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .localAddress(new InetSocketAddress(bindIp, 0)) // 端口 0 表示随机端口,实际通过端口复用扩展 + .option(ChannelOption.SO_REUSEADDR, true) // 允许端口复用(关键) + .option(ChannelOption.SO_BACKLOG, 1024) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .handler(new LoggingHandler(LogLevel.INFO)) // 日志打印(可选) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline() + .addLast(new LoggingHandler(LogLevel.DEBUG)) // 打印流量日志 + .addLast(new TrafficInterceptHandler(strategy)); // 核心拦截处理器 + } + }); + + // 绑定网卡 IP,监听所有端口(模拟:实际需遍历端口或用 RAW socket,此处以常用端口为例) + System.out.printf("流量拦截器启动成功!绑定网卡:%s,拦截策略:%s%n", bindIp, strategy); + System.out.println("开始拦截该网卡的所有 TCP 流量..."); + + // 阻塞等待服务端关闭 + bootstrap.bind().sync().channel().closeFuture().sync(); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + System.out.println("流量拦截器已关闭"); + } + } + + /** + * 校验绑定的网卡是否存在 + */ + private void validateBindIp(String bindIp) { + try { + Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); + boolean exists = false; + while (interfaces.hasMoreElements()) { + NetworkInterface ni = interfaces.nextElement(); + Enumeration addresses = ni.getInetAddresses(); + while (addresses.hasMoreElements()) { + java.net.InetAddress addr = addresses.nextElement(); + if (addr.getHostAddress().equals(bindIp)) { + exists = true; + break; + } + } + if (exists) break; + } + if (!exists) { + throw new IllegalArgumentException("绑定的网卡 IP " + bindIp + " 不存在于本地网卡"); + } + } catch (SocketException e) { + throw new RuntimeException("获取本地网卡信息失败:" + e.getMessage()); + } + } + + /** + * 拦截策略枚举(可扩展) + */ + public enum InterceptStrategy { + FORWARD("转发流量到原目标"), + DROP("丢弃流量"), + MODIFY("修改流量内容后转发"), + LOG_ONLY("仅记录日志,不拦截"); + + private final String desc; + + InterceptStrategy(String desc) { + this.desc = desc; + } + + public String getDesc() { + return desc; + } + } + + public static void main(String[] args) throws InterruptedException { + // 示例:绑定网卡 192.168.1.100,策略为「记录日志并转发」 + String bindIp = "192.168.3.6"; + TrafficInterceptor.InterceptStrategy strategy = TrafficInterceptor.InterceptStrategy.LOG_ONLY; + new TrafficInterceptor(bindIp, strategy).start(); + } +} \ No newline at end of file