mirror of
https://gitee.com/wujiawei1207537021/wu-lazy-cloud-network.git
synced 2025-06-06 21:37:56 +08:00
[fix] 修复数据下发与上传问题
This commit is contained in:
parent
7028e001ea
commit
93549cad90
@ -0,0 +1,30 @@
|
|||||||
|
package org.framework.lazy.cloud.network.heartbeat.server.netty.filter;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.socket.SocketChannel;
|
||||||
|
import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder;
|
||||||
|
import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder;
|
||||||
|
import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer;
|
||||||
|
import org.framework.lazy.cloud.network.heartbeat.server.netty.handler.ClientPermeateServerRealHandler;
|
||||||
|
import org.framework.lazy.cloud.network.heartbeat.server.netty.handler.PermeateClientRealHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 客户端渗透服务端
|
||||||
|
*/
|
||||||
|
public class ClientPermeateServerRealFilter extends DebugChannelInitializer<SocketChannel> {
|
||||||
|
/**
|
||||||
|
* This method will be called once the {@link Channel} was registered. After the method returns this instance
|
||||||
|
* will be removed from the {@link ChannelPipeline} of the {@link Channel}.
|
||||||
|
*
|
||||||
|
* @param ch the {@link Channel} which was registered.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void initChannel0(SocketChannel ch) {
|
||||||
|
ChannelPipeline pipeline = ch.pipeline();
|
||||||
|
// 解码、编码
|
||||||
|
pipeline.addLast(new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024*10));
|
||||||
|
pipeline.addLast(new TransferEncoder());
|
||||||
|
pipeline.addLast(new ClientPermeateServerRealHandler());
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,82 @@
|
|||||||
|
package org.framework.lazy.cloud.network.heartbeat.server.netty.handler;
|
||||||
|
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelOption;
|
||||||
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
|
||||||
|
import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf;
|
||||||
|
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
|
||||||
|
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 客户端渗透服务端
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class ClientPermeateServerRealHandler extends SimpleChannelInboundHandler<NettyByteBuf> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// 根据访客ID 确认真实通道 读写打开
|
||||||
|
Channel channel = ctx.channel();
|
||||||
|
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel);
|
||||||
|
|
||||||
|
channel.config().setOption(ChannelOption.AUTO_READ, true);
|
||||||
|
super.channelActive(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead0(ChannelHandlerContext ctx,NettyByteBuf nettyByteBuf) {
|
||||||
|
|
||||||
|
Channel channel = ctx.channel();
|
||||||
|
byte[] bytes = nettyByteBuf.getData();
|
||||||
|
log.debug("bytes.length:{}",bytes.length);
|
||||||
|
log.debug("接收客户端真实服务数据:{}", new String(bytes));
|
||||||
|
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel);
|
||||||
|
// 消息下发到客户端
|
||||||
|
|
||||||
|
NettyProxyMsg nettyMsg = new NettyProxyMsg();
|
||||||
|
nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_TRANSFER);
|
||||||
|
nettyMsg.setData(bytes);
|
||||||
|
|
||||||
|
nextChannel.writeAndFlush(nettyMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// 客户端真实通信通道
|
||||||
|
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
|
||||||
|
if (nextChannel != null) {
|
||||||
|
// 上报关闭这个客户端的访客通道
|
||||||
|
nextChannel.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
super.channelInactive(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
|
||||||
|
|
||||||
|
// 获取访客的传输通道
|
||||||
|
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
|
||||||
|
if (nextChannel != null) {
|
||||||
|
log.debug("transfer AUTO_READ:{} ",ctx.channel().isWritable());
|
||||||
|
nextChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
|
super.exceptionCaught(ctx, cause);
|
||||||
|
}
|
||||||
|
}
|
@ -10,6 +10,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPermeate
|
|||||||
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
|
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
|
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
|
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
|
||||||
|
import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.ClientPermeateServerRealFilter;
|
||||||
import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.PermeateClientRealFilter;
|
import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.PermeateClientRealFilter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -37,7 +38,7 @@ public class NettyClientPermeateServerConnectRealSocket {
|
|||||||
// .option(ChannelOption.SO_BACKLOG, 128)//务端接受连接的队列长度 默认128
|
// .option(ChannelOption.SO_BACKLOG, 128)//务端接受连接的队列长度 默认128
|
||||||
// .option(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT
|
// .option(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT
|
||||||
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2))
|
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2))
|
||||||
.handler(new PermeateClientRealFilter())
|
.handler(new ClientPermeateServerRealFilter())
|
||||||
|
|
||||||
;
|
;
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user