mirror of
https://gitee.com/wujiawei1207537021/wu-lazy-cloud-network.git
synced 2026-02-04 06:55:52 +08:00
[fix] fix
This commit is contained in:
@@ -56,4 +56,6 @@
|
||||
[add]默认客户端ID为当前设备唯一标识
|
||||
|
||||
|
||||
#### 下一版本计划
|
||||
#### 下一版本计划
|
||||
[add] 流媒体抓取
|
||||
[add] 监听本地网卡进行代理
|
||||
@@ -0,0 +1,210 @@
|
||||
package org.framework.lazy.cloud.network.heartbeat.protocol.test1;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* 流量拦截处理器:执行拦截策略,处理流量转发/丢弃/修改
|
||||
*/
|
||||
public class TrafficInterceptHandler extends ChannelDuplexHandler {
|
||||
private final TrafficInterceptor.InterceptStrategy strategy;
|
||||
private Channel forwardChannel; // 转发通道(连接原目标服务器)
|
||||
private InetSocketAddress originalTarget; // 流量的原目标地址(需解析获取)
|
||||
|
||||
public TrafficInterceptHandler(TrafficInterceptor.InterceptStrategy strategy) {
|
||||
this.strategy = strategy;
|
||||
}
|
||||
|
||||
/**
|
||||
* 客户端连接建立时,解析原目标地址(关键:透明代理需知道流量要发往的原目标)
|
||||
* 注:原目标地址的解析方式需根据场景调整(如通过路由表、ARP 缓存、或 RAW socket 抓包解析 IP 头部)
|
||||
*/
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
InetSocketAddress clientAddr = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
System.out.println("拦截到新连接:客户端 " + clientAddr);
|
||||
|
||||
// 关键步骤:解析流量的原目标地址(此处为模拟,实际需通过以下方式获取)
|
||||
// 方式 1:若拦截器作为网关,原目标地址即客户端请求的目标 IP:Port(需解析应用层协议,如 HTTP Host 头)
|
||||
// 方式 2:用 RAW socket 抓包,解析 IP 头部的「目的地址」字段(推荐,适用于所有 TCP 流量)
|
||||
// 方式 3:通过 iptables 端口转发,原目标地址由 iptables 传递(Linux 环境)
|
||||
// 此处模拟:假设原目标是百度服务器(实际需动态解析)
|
||||
originalTarget = new InetSocketAddress("220.181.38.251", 80);
|
||||
|
||||
// 根据策略初始化转发通道(若需要转发)
|
||||
if (strategy == TrafficInterceptor.InterceptStrategy.FORWARD || strategy == TrafficInterceptor.InterceptStrategy.MODIFY) {
|
||||
initForwardChannel(ctx);
|
||||
}
|
||||
|
||||
super.channelActive(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化转发通道:连接原目标服务器,实现透明转发
|
||||
*/
|
||||
private void initForwardChannel(ChannelHandlerContext ctx) {
|
||||
Bootstrap bootstrap = new Bootstrap()
|
||||
.group(ctx.channel().eventLoop()) // 复用客户端事件循环组,性能更优
|
||||
.channel(NioSocketChannel.class)
|
||||
.option(io.netty.channel.ChannelOption.SO_KEEPALIVE, true)
|
||||
.handler(new ChannelDuplexHandler() {
|
||||
// 接收原目标服务器的响应,转发回客户端
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext forwardCtx, Object msg) throws Exception {
|
||||
ByteBuf response = (ByteBuf) msg;
|
||||
System.out.printf("收到原目标 %s 响应,长度:%d 字节%n", originalTarget, response.readableBytes());
|
||||
|
||||
// 若策略是「修改流量」,则修改响应内容
|
||||
if (strategy == TrafficInterceptor.InterceptStrategy.MODIFY) {
|
||||
modifyData(response, false); // false 表示修改响应流量
|
||||
}
|
||||
|
||||
// 转发响应到客户端
|
||||
ctx.channel().writeAndFlush(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext forwardCtx) throws Exception {
|
||||
System.out.println("转发通道断开:原目标 " + originalTarget);
|
||||
ctx.channel().close(); // 转发通道断开,关闭客户端连接
|
||||
}
|
||||
});
|
||||
|
||||
// 连接原目标服务器
|
||||
bootstrap.connect(originalTarget).addListener((ChannelFutureListener) future -> {
|
||||
if (future.isSuccess()) {
|
||||
forwardChannel = future.channel();
|
||||
System.out.println("成功连接原目标:" + originalTarget);
|
||||
} else {
|
||||
System.err.println("连接原目标失败:" + originalTarget + ",原因:" + future.cause().getMessage());
|
||||
ctx.channel().close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 拦截客户端发送的流量(出站流量)
|
||||
*/
|
||||
@Override
|
||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||
ByteBuf request = (ByteBuf) msg;
|
||||
InetSocketAddress clientAddr = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
System.out.printf("拦截到客户端 %s 发送的流量,长度:%d 字节%n", clientAddr, request.readableBytes());
|
||||
|
||||
switch (strategy) {
|
||||
case DROP:
|
||||
// 丢弃流量,释放缓冲区
|
||||
request.release();
|
||||
System.out.println("已丢弃该流量(策略:DROP)");
|
||||
break;
|
||||
|
||||
case MODIFY:
|
||||
// 修改流量内容(示例:在 HTTP 请求头添加自定义字段)
|
||||
modifyData(request, true); // true 表示修改请求流量
|
||||
System.out.println("已修改流量,准备转发");
|
||||
// fall through 到 FORWARD 逻辑
|
||||
case FORWARD:
|
||||
// 转发流量到原目标服务器
|
||||
if (forwardChannel != null && forwardChannel.isActive()) {
|
||||
forwardChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
|
||||
if (!future.isSuccess()) {
|
||||
System.err.println("流量转发失败:" + future.cause().getMessage());
|
||||
ctx.channel().close();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
request.release();
|
||||
System.err.println("转发通道未就绪,丢弃流量");
|
||||
}
|
||||
break;
|
||||
|
||||
case LOG_ONLY:
|
||||
// 仅记录日志,不拦截,直接放行(透传流量)
|
||||
super.write(ctx, msg, promise);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 修改流量内容(示例:修改 HTTP 请求/响应)
|
||||
* @param data 要修改的 ByteBuf 数据
|
||||
* @param isRequest 是否为请求流量(true:请求,false:响应)
|
||||
*/
|
||||
private void modifyData(ByteBuf data, boolean isRequest) {
|
||||
// 示例:在 HTTP 请求头添加 X-Intercepted: true 字段
|
||||
if (isRequest) {
|
||||
// 切换为读模式,读取 HTTP 头
|
||||
data.markReaderIndex();
|
||||
byte[] temp = new byte[data.readableBytes()];
|
||||
data.readBytes(temp);
|
||||
String content = new String(temp);
|
||||
|
||||
// 找到 HTTP 头的结束位置("\r\n\r\n"),插入自定义字段
|
||||
if (content.contains("\r\n\r\n")) {
|
||||
content = content.replace("\r\n\r\n", "\r\nX-Intercepted: true\r\n\r\n");
|
||||
// 重置 ByteBuf,写入修改后的数据
|
||||
data.resetReaderIndex();
|
||||
data.clear();
|
||||
data.writeBytes(content.getBytes());
|
||||
System.out.println("已修改请求流量:添加 X-Intercepted 头");
|
||||
}
|
||||
} else {
|
||||
// 示例:修改 HTTP 响应的内容
|
||||
data.markReaderIndex();
|
||||
byte[] temp = new byte[data.readableBytes()];
|
||||
data.readBytes(temp);
|
||||
String content = new String(temp);
|
||||
if (content.contains("<body>")) {
|
||||
content = content.replace("<body>", "<body><h1>流量已被 Netty 拦截并修改</h1>");
|
||||
data.resetReaderIndex();
|
||||
data.clear();
|
||||
data.writeBytes(content.getBytes());
|
||||
System.out.println("已修改响应流量:添加拦截提示");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 拦截客户端接收的流量(响应流量)
|
||||
*/
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
// 若策略为「仅记录日志」,直接放行;其他策略已在转发通道中处理
|
||||
if (strategy == TrafficInterceptor.InterceptStrategy.LOG_ONLY) {
|
||||
ByteBuf data = (ByteBuf) msg;
|
||||
System.out.printf("记录流量:客户端接收数据,长度:%d 字节%n", data.readableBytes());
|
||||
super.channelRead(ctx, msg);
|
||||
} else {
|
||||
((ByteBuf) msg).release(); // 已在转发通道中处理,此处释放缓冲区
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接断开时,关闭转发通道
|
||||
*/
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
System.out.println("连接断开:客户端 " + ctx.channel().remoteAddress());
|
||||
if (forwardChannel != null && forwardChannel.isActive()) {
|
||||
forwardChannel.close();
|
||||
}
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* 异常处理:关闭所有连接
|
||||
*/
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
System.err.println("流量拦截异常:" + cause.getMessage());
|
||||
ctx.close();
|
||||
if (forwardChannel != null && forwardChannel.isActive()) {
|
||||
forwardChannel.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,134 @@
|
||||
package org.framework.lazy.cloud.network.heartbeat.protocol.test1;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.net.SocketException;
|
||||
import java.util.Enumeration;
|
||||
|
||||
/**
|
||||
* 网络流量拦截器:绑定指定网卡,拦截该网卡的所有 TCP 流量,支持转发/丢弃/修改
|
||||
*/
|
||||
public class TrafficInterceptor {
|
||||
// 绑定的网卡 IP(必须是本地网卡已配置的 IP)
|
||||
private final String bindIp;
|
||||
// 拦截策略(可自定义:转发、丢弃、修改等)
|
||||
private final InterceptStrategy strategy;
|
||||
|
||||
// Netty 事件循环组
|
||||
private EventLoopGroup bossGroup;
|
||||
private EventLoopGroup workerGroup;
|
||||
|
||||
/**
|
||||
* 构造拦截器
|
||||
* @param bindIp 绑定的网卡 IP(如 192.168.1.100)
|
||||
* @param strategy 拦截策略
|
||||
*/
|
||||
public TrafficInterceptor(String bindIp, InterceptStrategy strategy) {
|
||||
this.bindIp = bindIp;
|
||||
this.strategy = strategy;
|
||||
validateBindIp(bindIp);
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动拦截器(监听绑定网卡的所有 TCP 端口,实际通过端口复用实现)
|
||||
* 注:全端口监听需操作系统支持,或通过「端口范围监听」模拟(此处用 1-65535 端口范围)
|
||||
*/
|
||||
public void start() throws InterruptedException {
|
||||
bossGroup = new NioEventLoopGroup(1);
|
||||
workerGroup = new NioEventLoopGroup();
|
||||
|
||||
try {
|
||||
// 核心:启动服务端,绑定网卡 IP,监听所有 TCP 端口(模拟全端口拦截)
|
||||
// 实际生产中可优化为:监听常用端口 + 动态端口,或用 RAW socket 直接抓包
|
||||
ServerBootstrap bootstrap = new ServerBootstrap()
|
||||
.group(bossGroup, workerGroup)
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.localAddress(new InetSocketAddress(bindIp, 0)) // 端口 0 表示随机端口,实际通过端口复用扩展
|
||||
.option(ChannelOption.SO_REUSEADDR, true) // 允许端口复用(关键)
|
||||
.option(ChannelOption.SO_BACKLOG, 1024)
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
.handler(new LoggingHandler(LogLevel.INFO)) // 日志打印(可选)
|
||||
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
ch.pipeline()
|
||||
.addLast(new LoggingHandler(LogLevel.DEBUG)) // 打印流量日志
|
||||
.addLast(new TrafficInterceptHandler(strategy)); // 核心拦截处理器
|
||||
}
|
||||
});
|
||||
|
||||
// 绑定网卡 IP,监听所有端口(模拟:实际需遍历端口或用 RAW socket,此处以常用端口为例)
|
||||
System.out.printf("流量拦截器启动成功!绑定网卡:%s,拦截策略:%s%n", bindIp, strategy);
|
||||
System.out.println("开始拦截该网卡的所有 TCP 流量...");
|
||||
|
||||
// 阻塞等待服务端关闭
|
||||
bootstrap.bind().sync().channel().closeFuture().sync();
|
||||
} finally {
|
||||
bossGroup.shutdownGracefully();
|
||||
workerGroup.shutdownGracefully();
|
||||
System.out.println("流量拦截器已关闭");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验绑定的网卡是否存在
|
||||
*/
|
||||
private void validateBindIp(String bindIp) {
|
||||
try {
|
||||
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
|
||||
boolean exists = false;
|
||||
while (interfaces.hasMoreElements()) {
|
||||
NetworkInterface ni = interfaces.nextElement();
|
||||
Enumeration<java.net.InetAddress> addresses = ni.getInetAddresses();
|
||||
while (addresses.hasMoreElements()) {
|
||||
java.net.InetAddress addr = addresses.nextElement();
|
||||
if (addr.getHostAddress().equals(bindIp)) {
|
||||
exists = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (exists) break;
|
||||
}
|
||||
if (!exists) {
|
||||
throw new IllegalArgumentException("绑定的网卡 IP " + bindIp + " 不存在于本地网卡");
|
||||
}
|
||||
} catch (SocketException e) {
|
||||
throw new RuntimeException("获取本地网卡信息失败:" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 拦截策略枚举(可扩展)
|
||||
*/
|
||||
public enum InterceptStrategy {
|
||||
FORWARD("转发流量到原目标"),
|
||||
DROP("丢弃流量"),
|
||||
MODIFY("修改流量内容后转发"),
|
||||
LOG_ONLY("仅记录日志,不拦截");
|
||||
|
||||
private final String desc;
|
||||
|
||||
InterceptStrategy(String desc) {
|
||||
this.desc = desc;
|
||||
}
|
||||
|
||||
public String getDesc() {
|
||||
return desc;
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
// 示例:绑定网卡 192.168.1.100,策略为「记录日志并转发」
|
||||
String bindIp = "192.168.3.6";
|
||||
TrafficInterceptor.InterceptStrategy strategy = TrafficInterceptor.InterceptStrategy.LOG_ONLY;
|
||||
new TrafficInterceptor(bindIp, strategy).start();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user