diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/filter/ClientPermeateServerRealFilter.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/filter/ClientPermeateServerRealFilter.java new file mode 100644 index 0000000..4262a00 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/filter/ClientPermeateServerRealFilter.java @@ -0,0 +1,30 @@ +package org.framework.lazy.cloud.network.heartbeat.server.netty.filter; + +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder; +import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder; +import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer; +import org.framework.lazy.cloud.network.heartbeat.server.netty.handler.ClientPermeateServerRealHandler; +import org.framework.lazy.cloud.network.heartbeat.server.netty.handler.PermeateClientRealHandler; + +/** + * 客户端渗透服务端 + */ +public class ClientPermeateServerRealFilter extends DebugChannelInitializer { + /** + * This method will be called once the {@link Channel} was registered. After the method returns this instance + * will be removed from the {@link ChannelPipeline} of the {@link Channel}. + * + * @param ch the {@link Channel} which was registered. + */ + @Override + protected void initChannel0(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + // 解码、编码 + pipeline.addLast(new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024*10)); + pipeline.addLast(new TransferEncoder()); + pipeline.addLast(new ClientPermeateServerRealHandler()); + + } +} \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/ClientPermeateServerRealHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/ClientPermeateServerRealHandler.java new file mode 100644 index 0000000..fb910c8 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/ClientPermeateServerRealHandler.java @@ -0,0 +1,82 @@ +package org.framework.lazy.cloud.network.heartbeat.server.netty.handler; + + +import com.alibaba.fastjson.JSON; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOption; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.MessageType; +import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf; +import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; + +/** + * 客户端渗透服务端 + */ +@Slf4j +public class ClientPermeateServerRealHandler extends SimpleChannelInboundHandler { + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // 根据访客ID 确认真实通道 读写打开 + Channel channel = ctx.channel(); + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel); + + channel.config().setOption(ChannelOption.AUTO_READ, true); + super.channelActive(ctx); + } + + + + @Override + public void channelRead0(ChannelHandlerContext ctx,NettyByteBuf nettyByteBuf) { + + Channel channel = ctx.channel(); + byte[] bytes = nettyByteBuf.getData(); + log.debug("bytes.length:{}",bytes.length); + log.debug("接收客户端真实服务数据:{}", new String(bytes)); + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel); + // 消息下发到客户端 + + NettyProxyMsg nettyMsg = new NettyProxyMsg(); + nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_TRANSFER); + nettyMsg.setData(bytes); + + nextChannel.writeAndFlush(nettyMsg); + } + + + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + // 客户端真实通信通道 + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); + if (nextChannel != null) { + // 上报关闭这个客户端的访客通道 + nextChannel.close(); + } + + super.channelInactive(ctx); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + + + // 获取访客的传输通道 + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); + if (nextChannel != null) { + log.debug("transfer AUTO_READ:{} ",ctx.channel().isWritable()); + nextChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); + } + + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } +} \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyClientPermeateServerConnectRealSocket.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyClientPermeateServerConnectRealSocket.java index 14aaa48..ed4d46f 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyClientPermeateServerConnectRealSocket.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyClientPermeateServerConnectRealSocket.java @@ -10,6 +10,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPermeate import org.framework.lazy.cloud.network.heartbeat.common.MessageType; import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.ClientPermeateServerRealFilter; import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.PermeateClientRealFilter; /** @@ -37,7 +38,7 @@ public class NettyClientPermeateServerConnectRealSocket { // .option(ChannelOption.SO_BACKLOG, 128)//务端接受连接的队列长度 默认128 // .option(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2)) - .handler(new PermeateClientRealFilter()) + .handler(new ClientPermeateServerRealFilter()) ;