【fix】修复流量计费bug

This commit is contained in:
wujiawei
2024-09-22 18:13:51 +08:00
parent 4f58a675d0
commit c04ba09763
21 changed files with 603 additions and 65 deletions

View File

@ -131,6 +131,10 @@ public class HeartbeatClientConfiguration {
public ClientHandleDistributeClientPermeateServerCloseTypeAdvanced clientHandleDistributeClientPermeateServerCloseTypeAdvanced( ) {
return new ClientHandleDistributeClientPermeateServerCloseTypeAdvanced();
}
@Bean
public ClientHandleDistributeClientPermeateServerTransferTypeAdvanced clientHandleDistributeClientPermeateServerTransferTypeAdvanced( ) {
return new ClientHandleDistributeClientPermeateServerTransferTypeAdvanced();
}
@Bean
public ClientHandleDistributeClientPermeateClientCloseTypeAdvanced clientHandleDistributeClientPermeateClientCloseTypeAdvanced() {

View File

@ -0,0 +1,53 @@
package org.framework.lazy.cloud.network.heartbeat.client.netty.advanced;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.client.config.NettyClientProperties;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.client.AbstractHandleDistributeClientPermeateServerTransferTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.enums.MessageTypeEnums;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
/**
* 服务端处理客户端数据传输
*
* @see MessageTypeEnums#DISTRIBUTE_CLIENT_TRANSFER
*/
@Slf4j
public class ClientHandleDistributeClientPermeateServerTransferTypeAdvanced extends AbstractHandleDistributeClientPermeateServerTransferTypeAdvanced<NettyProxyMsg> {
/**
* 处理当前数据
*
* @param channel 当前通道
* @param nettyProxyMsg 通道数据
*/
@Override
public void doHandler(Channel channel, NettyProxyMsg nettyProxyMsg) {
log.debug("接收到服务端需要内网穿透的数据:{}" , nettyProxyMsg);
byte[] visitorPort = nettyProxyMsg.getVisitorPort();
byte[] clientTargetIp = nettyProxyMsg.getClientTargetIp();
byte[] clientTargetPort = nettyProxyMsg.getClientTargetPort();
byte[] visitorId = nettyProxyMsg.getVisitorId();
// 真实服务通道
// Channel realChannel = NettyRealIdContext.getReal(new String(visitorId));
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel);
if (nextChannel == null) {
log.error("无法获取访客:{} 真实服务", new String(visitorId));
return;
}
// 把数据转到真实服务
ByteBuf buf = channel.config().getAllocator().buffer(nettyProxyMsg.getData().length);
buf.writeBytes(nettyProxyMsg.getData());
nextChannel.writeAndFlush(buf);
}
}

View File

@ -0,0 +1,42 @@
package org.framework.lazy.cloud.network.heartbeat.client.netty.filter;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientPermeateClientTransferHandler;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.NettyProxyMsgDecoder;
import org.framework.lazy.cloud.network.heartbeat.common.encoder.NettyProxyMsgEncoder;
import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer;
/**
* netty 客户端渗透通信通道
*/
public class NettyClientPermeateClientTransferFilter extends DebugChannelInitializer<SocketChannel> {
private final ChannelTypeAdapter channelTypeAdapter;
public NettyClientPermeateClientTransferFilter(ChannelTypeAdapter channelTypeAdapter) {
this.channelTypeAdapter = channelTypeAdapter;
}
/**
* This method will be called once the {@link Channel} was registered. After the method returns this instance
* will be removed from the {@link ChannelPipeline} of the {@link Channel}.
*
* @param ch the {@link Channel} which was registered.
* @throws Exception is thrown if an error occurs. In that case it will be handled by
* {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default connectionClose
* the {@link Channel}.
*/
@Override
protected void initChannel0(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// // 解码、编码
// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
// pipeline.addLast(new NettMsgEncoder());
pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
pipeline.addLast(new NettyProxyMsgEncoder());
pipeline.addLast(new NettyClientPermeateClientTransferHandler(channelTypeAdapter));
}
}

View File

@ -0,0 +1,42 @@
package org.framework.lazy.cloud.network.heartbeat.client.netty.filter;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientPermeateServerTransferHandler;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.NettyProxyMsgDecoder;
import org.framework.lazy.cloud.network.heartbeat.common.encoder.NettyProxyMsgEncoder;
import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer;
/**
* netty 客户端渗透通信通道
*/
public class NettyClientPermeateServerTransferFilter extends DebugChannelInitializer<SocketChannel> {
private final ChannelTypeAdapter channelTypeAdapter;
public NettyClientPermeateServerTransferFilter(ChannelTypeAdapter channelTypeAdapter) {
this.channelTypeAdapter = channelTypeAdapter;
}
/**
* This method will be called once the {@link Channel} was registered. After the method returns this instance
* will be removed from the {@link ChannelPipeline} of the {@link Channel}.
*
* @param ch the {@link Channel} which was registered.
* @throws Exception is thrown if an error occurs. In that case it will be handled by
* {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default connectionClose
* the {@link Channel}.
*/
@Override
protected void initChannel0(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// // 解码、编码
// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
// pipeline.addLast(new NettMsgEncoder());
pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
pipeline.addLast(new NettyProxyMsgEncoder());
pipeline.addLast(new NettyClientPermeateServerTransferHandler(channelTypeAdapter));
}
}

View File

@ -0,0 +1,79 @@
package org.framework.lazy.cloud.network.heartbeat.client.netty.handler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.wu.framework.core.utils.ObjectUtils;
/**
* 客户端访客通信通道 处理器
*/
@Slf4j
public class NettyClientPermeateClientTransferHandler extends SimpleChannelInboundHandler<NettyProxyMsg> {
private final ChannelTypeAdapter channelTypeAdapter;
public NettyClientPermeateClientTransferHandler(ChannelTypeAdapter channelTypeAdapter) {
this.channelTypeAdapter = channelTypeAdapter;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void channelRead0(ChannelHandlerContext ctx, NettyProxyMsg nettyProxyMsg) throws Exception {
Channel channel = ctx.channel();
channelTypeAdapter.handler(channel, nettyProxyMsg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel());
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
// 关闭访客
ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId);
if (clientChannel != null) {
Channel channel = clientChannel.getChannel();
// 上报关闭这个客户端的访客通道
NettyProxyMsg closeVisitorMsg = new NettyProxyMsg();
closeVisitorMsg.setType(MessageType.REPORT_SINGLE_CLIENT_CLOSE_VISITOR);
closeVisitorMsg.setVisitorId(visitorId);
channel.writeAndFlush(closeVisitorMsg);
}
super.channelInactive(ctx);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// 处理客户端本地真实通道问题
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
if(ObjectUtils.isEmpty(visitorId)) {
super.channelWritabilityChanged(ctx);
return;
}
Channel realChannel = NettyRealIdContext.getReal(visitorId);
if (realChannel != null) {
log.debug("visitorId:{} transfer AUTO_READ:{} ",visitorId,ctx.channel().isWritable());
realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}

View File

@ -0,0 +1,79 @@
package org.framework.lazy.cloud.network.heartbeat.client.netty.handler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.wu.framework.core.utils.ObjectUtils;
/**
* 客户端访客通信通道 处理器
*/
@Slf4j
public class NettyClientPermeateServerTransferHandler extends SimpleChannelInboundHandler<NettyProxyMsg> {
private final ChannelTypeAdapter channelTypeAdapter;
public NettyClientPermeateServerTransferHandler(ChannelTypeAdapter channelTypeAdapter) {
this.channelTypeAdapter = channelTypeAdapter;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void channelRead0(ChannelHandlerContext ctx, NettyProxyMsg nettyProxyMsg) throws Exception {
Channel channel = ctx.channel();
channelTypeAdapter.handler(channel, nettyProxyMsg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel());
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
// 关闭访客
ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId);
if (clientChannel != null) {
Channel channel = clientChannel.getChannel();
// 上报关闭服务端客户端真实通道
NettyProxyMsg closeVisitorMsg = new NettyProxyMsg();
closeVisitorMsg.setType(MessageType.REPORT_CLIENT_PERMEATE_SERVER_TRANSFER_CLOSE);
closeVisitorMsg.setVisitorId(visitorId);
channel.writeAndFlush(closeVisitorMsg);
}
super.channelInactive(ctx);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// 处理客户端本地真实通道问题
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
if(ObjectUtils.isEmpty(visitorId)) {
super.channelWritabilityChanged(ctx);
return;
}
Channel realChannel = NettyRealIdContext.getReal(visitorId);
if (realChannel != null) {
log.debug("visitorId:{} transfer AUTO_READ:{} ",visitorId,ctx.channel().isWritable());
realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}

View File

@ -76,7 +76,7 @@ public class NettyClientPermeateServerVisitorHandler extends SimpleChannelInboun
Integer visitorPort = internalNetworkPermeateServerVisitor.getVisitorPort();
String clientId = internalNetworkPermeateServerVisitor.getNettyClientProperties().getClientId();
NettyProxyMsg nettyProxyMsg = new NettyProxyMsg();
nettyProxyMsg.setType(MessageType.REPORT_CLIENT_TRANSFER);
nettyProxyMsg.setType(MessageType.REPORT_CLIENT_PERMEATE_SERVER_TRANSFER);
nettyProxyMsg.setVisitorId(visitorId);
nettyProxyMsg.setClientId(clientId);
nettyProxyMsg.setVisitorPort(visitorPort);
@ -112,7 +112,7 @@ public class NettyClientPermeateServerVisitorHandler extends SimpleChannelInboun
// 通知服务端 关闭访问通道、真实通道
NettyProxyMsg myMsg = new NettyProxyMsg();
myMsg.setType(MessageType.DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR);
myMsg.setType(MessageType.REPORT_CLIENT_PERMEATE_SERVER_TRANSFER_CLOSE);
myMsg.setVisitorId(visitorId);
nextChannel.writeAndFlush(myMsg);
}
@ -126,28 +126,6 @@ public class NettyClientPermeateServerVisitorHandler extends SimpleChannelInboun
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// 获取访客的传输通道
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
if(ObjectUtils.isEmpty(visitorId)) {
super.channelWritabilityChanged(ctx);
return;
}
Channel visitorCommunicationChannel = NettyCommunicationIdContext.getVisitor(visitorId);
if (visitorCommunicationChannel != null) {
log.debug("visitorId:{} transfer AUTO_READ:{} ",visitorId,ctx.channel().isWritable());
visitorCommunicationChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
}
// Channel visitorChannel = ctx.channel();
// String vid = visitorChannel.attr(Constant.VID).get();
// if (StringUtil.isNullOrEmpty(vid)) {
// super.channelWritabilityChanged(ctx);
// return;
// }
// Channel clientChannel = Constant.vcc.get(vid);
// if (clientChannel != null) {
// clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable());
// }
if (ctx.channel().isWritable()) {
log.debug("Channel is writable again");
// 恢复之前暂停的操作,如写入数据
@ -155,7 +133,7 @@ public class NettyClientPermeateServerVisitorHandler extends SimpleChannelInboun
log.debug("Channel is not writable");
// 暂停写入操作,等待可写状态
}
log.info("visitorId:{} channelWritabilityChanged!",visitorId);
log.info("channelWritabilityChanged!");
}
@Override
@ -171,7 +149,7 @@ public class NettyClientPermeateServerVisitorHandler extends SimpleChannelInboun
if (nextChannel != null) {
// 下发关闭访客
NettyProxyMsg closeRealClient = new NettyProxyMsg();
closeRealClient.setType(MessageType.DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT_AUTO_READ);
closeRealClient.setType(MessageType.REPORT_CLIENT_PERMEATE_SERVER_TRANSFER_CLOSE);
closeRealClient.setClientId(clientId);
closeRealClient.setVisitorId(visitorId);
nextChannel.writeAndFlush(closeRealClient);

View File

@ -8,7 +8,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.client.config.NettyClientProperties;
import org.framework.lazy.cloud.network.heartbeat.client.netty.InternalNetworkClientPermeateClientVisitor;
import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientPermeateTransferFilter;
import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientPermeateClientTransferFilter;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
import org.framework.lazy.cloud.network.heartbeat.common.NettyCommunicationIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
@ -16,8 +16,6 @@ import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import java.util.concurrent.TimeUnit;
/**
* 客户端渗透服务端传输通道
*/
@ -54,8 +52,7 @@ public class NettyClientPermeateClientVisitorTransferSocket {
// .option(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2))
.handler(new NettyClientPermeateTransferFilter(new ChannelTypeAdapter(internalNetworkClientPermeateClientVisitor.getHandleChannelTypeAdvancedList())))
.handler(new NettyClientPermeateClientTransferFilter(new ChannelTypeAdapter(internalNetworkClientPermeateClientVisitor.getHandleChannelTypeAdvancedList())))
;
NettyClientProperties nettyClientProperties = internalNetworkClientPermeateClientVisitor.getNettyClientProperties();
String inetHost = nettyClientProperties.getInetHost();

View File

@ -8,7 +8,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.client.config.NettyClientProperties;
import org.framework.lazy.cloud.network.heartbeat.client.netty.InternalNetworkPermeateServerVisitor;
import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientPermeateTransferFilter;
import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientPermeateServerTransferFilter;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
import org.framework.lazy.cloud.network.heartbeat.common.NettyCommunicationIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
@ -16,8 +16,6 @@ import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import java.util.concurrent.TimeUnit;
/**
* 客户端渗透服务端传输通道
*/
@ -55,7 +53,7 @@ public class NettyClientPermeateServerVisitorTransferSocket {
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2))
.handler(new NettyClientPermeateTransferFilter(new ChannelTypeAdapter(internalNetworkPermeateServerVisitor.getHandleChannelTypeAdvancedList())))
.handler(new NettyClientPermeateServerTransferFilter(new ChannelTypeAdapter(internalNetworkPermeateServerVisitor.getHandleChannelTypeAdvancedList())))
;
NettyClientProperties nettyClientProperties = internalNetworkPermeateServerVisitor.getNettyClientProperties();
String inetHost = nettyClientProperties.getInetHost();