diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientPermeateTransferFilter.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientPermeateTransferFilter.java deleted file mode 100644 index 7c15188..0000000 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientPermeateTransferFilter.java +++ /dev/null @@ -1,42 +0,0 @@ -package org.framework.lazy.cloud.network.heartbeat.client.netty.filter; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.socket.SocketChannel; -import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientPermeateTransferHandler; -import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; -import org.framework.lazy.cloud.network.heartbeat.common.decoder.NettyProxyMsgDecoder; -import org.framework.lazy.cloud.network.heartbeat.common.encoder.NettyProxyMsgEncoder; -import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer; - -/** - * netty 客户端渗透通信通道 - */ -public class NettyClientPermeateTransferFilter extends DebugChannelInitializer { - private final ChannelTypeAdapter channelTypeAdapter; - - public NettyClientPermeateTransferFilter(ChannelTypeAdapter channelTypeAdapter) { - this.channelTypeAdapter = channelTypeAdapter; - } - - /** - * This method will be called once the {@link Channel} was registered. After the method returns this instance - * will be removed from the {@link ChannelPipeline} of the {@link Channel}. - * - * @param ch the {@link Channel} which was registered. - * @throws Exception is thrown if an error occurs. In that case it will be handled by - * {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default connectionClose - * the {@link Channel}. - */ - @Override - protected void initChannel0(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); -// // 解码、编码 -// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); -// pipeline.addLast(new NettMsgEncoder()); - pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); - pipeline.addLast(new NettyProxyMsgEncoder()); - pipeline.addLast(new NettyClientPermeateTransferHandler(channelTypeAdapter)); - } -} diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientPermeateServerTransferHandler.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientPermeateServerTransferHandler.java index 2ba52d1..72ff3fb 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientPermeateServerTransferHandler.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientPermeateServerTransferHandler.java @@ -3,16 +3,13 @@ package org.framework.lazy.cloud.network.heartbeat.client.netty.handler; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.MessageType; import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg; -import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; -import org.wu.framework.core.utils.ObjectUtils; /** * 客户端访客通信通道 处理器 @@ -43,14 +40,14 @@ public class NettyClientPermeateServerTransferHandler extends SimpleChannelInbou String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel()); String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); // 关闭访客 + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); - if (clientChannel != null) { - Channel channel = clientChannel.getChannel(); + if (nextChannel != null) { // 上报关闭服务端客户端真实通道 NettyProxyMsg closeVisitorMsg = new NettyProxyMsg(); closeVisitorMsg.setType(MessageType.REPORT_CLIENT_PERMEATE_SERVER_TRANSFER_CLOSE); closeVisitorMsg.setVisitorId(visitorId); - channel.writeAndFlush(closeVisitorMsg); + nextChannel.writeAndFlush(closeVisitorMsg); } super.channelInactive(ctx); @@ -58,18 +55,14 @@ public class NettyClientPermeateServerTransferHandler extends SimpleChannelInbou @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - // 处理客户端本地真实通道问题 - String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); - if(ObjectUtils.isEmpty(visitorId)) { - super.channelWritabilityChanged(ctx); - return; - } - - Channel realChannel = NettyRealIdContext.getReal(visitorId); - if (realChannel != null) { - log.debug("visitorId:{} transfer AUTO_READ:{} ",visitorId,ctx.channel().isWritable()); - realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); + if (ctx.channel().isWritable()) { + log.debug("Channel is writable again"); + // 恢复之前暂停的操作,如写入数据 + } else { + log.debug("Channel is not writable"); + // 暂停写入操作,等待可写状态 } + log.info("channelWritabilityChanged!"); } @Override diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientPermeateTransferHandler.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientPermeateTransferHandler.java deleted file mode 100644 index 95ce821..0000000 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientPermeateTransferHandler.java +++ /dev/null @@ -1,79 +0,0 @@ -package org.framework.lazy.cloud.network.heartbeat.client.netty.handler; - - -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOption; -import io.netty.channel.SimpleChannelInboundHandler; -import lombok.extern.slf4j.Slf4j; -import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext; -import org.framework.lazy.cloud.network.heartbeat.common.MessageType; -import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg; -import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext; -import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; -import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; -import org.wu.framework.core.utils.ObjectUtils; - -/** - * 客户端访客通信通道 处理器 - */ -@Slf4j -public class NettyClientPermeateTransferHandler extends SimpleChannelInboundHandler { - private final ChannelTypeAdapter channelTypeAdapter; - - public NettyClientPermeateTransferHandler(ChannelTypeAdapter channelTypeAdapter) { - this.channelTypeAdapter = channelTypeAdapter; - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - } - - @Override - public void channelRead0(ChannelHandlerContext ctx, NettyProxyMsg nettyProxyMsg) throws Exception { - Channel channel = ctx.channel(); - channelTypeAdapter.handler(channel, nettyProxyMsg); - - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - - String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel()); - String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); - // 关闭访客 - ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); - if (clientChannel != null) { - Channel channel = clientChannel.getChannel(); - // 上报关闭这个客户端的访客通道 - NettyProxyMsg closeVisitorMsg = new NettyProxyMsg(); - closeVisitorMsg.setType(MessageType.REPORT_SINGLE_CLIENT_CLOSE_VISITOR); - closeVisitorMsg.setVisitorId(visitorId); - channel.writeAndFlush(closeVisitorMsg); - } - - super.channelInactive(ctx); - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - // 处理客户端本地真实通道问题 - String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); - if(ObjectUtils.isEmpty(visitorId)) { - super.channelWritabilityChanged(ctx); - return; - } - - Channel realChannel = NettyRealIdContext.getReal(visitorId); - if (realChannel != null) { - log.debug("visitorId:{} transfer AUTO_READ:{} ",visitorId,ctx.channel().isWritable()); - realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - super.exceptionCaught(ctx, cause); - } -} \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyServerPermeateClientTransferHandler.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyServerPermeateClientTransferHandler.java index 1ebc772..5745e1f 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyServerPermeateClientTransferHandler.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyServerPermeateClientTransferHandler.java @@ -40,14 +40,13 @@ public class NettyServerPermeateClientTransferHandler extends SimpleChannelInbou String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel()); String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); // 关闭访客 - ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); - if (clientChannel != null) { - Channel channel = clientChannel.getChannel(); + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); + if (nextChannel != null) { // 上报关闭这个客户端的访客通道 NettyProxyMsg closeVisitorMsg = new NettyProxyMsg(); closeVisitorMsg.setType(MessageType.REPORT_SINGLE_CLIENT_CLOSE_VISITOR); closeVisitorMsg.setVisitorId(visitorId); - channel.writeAndFlush(closeVisitorMsg); + nextChannel.writeAndFlush(closeVisitorMsg); } super.channelInactive(ctx);