mirror of
https://gitee.com/wujiawei1207537021/wu-lazy-cloud-network.git
synced 2025-06-06 21:37:56 +08:00
【fix】修复客户端渗透客户端问题 通道关闭错误
This commit is contained in:
parent
1a2b5b8c29
commit
3ec0ab3271
@ -1,42 +0,0 @@
|
|||||||
package org.framework.lazy.cloud.network.heartbeat.client.netty.filter;
|
|
||||||
|
|
||||||
import io.netty.channel.Channel;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.channel.ChannelPipeline;
|
|
||||||
import io.netty.channel.socket.SocketChannel;
|
|
||||||
import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientPermeateTransferHandler;
|
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
|
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.decoder.NettyProxyMsgDecoder;
|
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.encoder.NettyProxyMsgEncoder;
|
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* netty 客户端渗透通信通道
|
|
||||||
*/
|
|
||||||
public class NettyClientPermeateTransferFilter extends DebugChannelInitializer<SocketChannel> {
|
|
||||||
private final ChannelTypeAdapter channelTypeAdapter;
|
|
||||||
|
|
||||||
public NettyClientPermeateTransferFilter(ChannelTypeAdapter channelTypeAdapter) {
|
|
||||||
this.channelTypeAdapter = channelTypeAdapter;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This method will be called once the {@link Channel} was registered. After the method returns this instance
|
|
||||||
* will be removed from the {@link ChannelPipeline} of the {@link Channel}.
|
|
||||||
*
|
|
||||||
* @param ch the {@link Channel} which was registered.
|
|
||||||
* @throws Exception is thrown if an error occurs. In that case it will be handled by
|
|
||||||
* {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default connectionClose
|
|
||||||
* the {@link Channel}.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
protected void initChannel0(SocketChannel ch) throws Exception {
|
|
||||||
ChannelPipeline pipeline = ch.pipeline();
|
|
||||||
// // 解码、编码
|
|
||||||
// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
|
|
||||||
// pipeline.addLast(new NettMsgEncoder());
|
|
||||||
pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
|
|
||||||
pipeline.addLast(new NettyProxyMsgEncoder());
|
|
||||||
pipeline.addLast(new NettyClientPermeateTransferHandler(channelTypeAdapter));
|
|
||||||
}
|
|
||||||
}
|
|
@ -3,16 +3,13 @@ package org.framework.lazy.cloud.network.heartbeat.client.netty.handler;
|
|||||||
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelOption;
|
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext;
|
import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext;
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
|
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
|
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
|
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
|
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
|
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
|
||||||
import org.wu.framework.core.utils.ObjectUtils;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 客户端访客通信通道 处理器
|
* 客户端访客通信通道 处理器
|
||||||
@ -43,14 +40,14 @@ public class NettyClientPermeateServerTransferHandler extends SimpleChannelInbou
|
|||||||
String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel());
|
String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel());
|
||||||
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
|
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
|
||||||
// 关闭访客
|
// 关闭访客
|
||||||
|
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
|
||||||
ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId);
|
ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId);
|
||||||
if (clientChannel != null) {
|
if (nextChannel != null) {
|
||||||
Channel channel = clientChannel.getChannel();
|
|
||||||
// 上报关闭服务端客户端真实通道
|
// 上报关闭服务端客户端真实通道
|
||||||
NettyProxyMsg closeVisitorMsg = new NettyProxyMsg();
|
NettyProxyMsg closeVisitorMsg = new NettyProxyMsg();
|
||||||
closeVisitorMsg.setType(MessageType.REPORT_CLIENT_PERMEATE_SERVER_TRANSFER_CLOSE);
|
closeVisitorMsg.setType(MessageType.REPORT_CLIENT_PERMEATE_SERVER_TRANSFER_CLOSE);
|
||||||
closeVisitorMsg.setVisitorId(visitorId);
|
closeVisitorMsg.setVisitorId(visitorId);
|
||||||
channel.writeAndFlush(closeVisitorMsg);
|
nextChannel.writeAndFlush(closeVisitorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
super.channelInactive(ctx);
|
super.channelInactive(ctx);
|
||||||
@ -58,18 +55,14 @@ public class NettyClientPermeateServerTransferHandler extends SimpleChannelInbou
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
|
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
|
||||||
// 处理客户端本地真实通道问题
|
if (ctx.channel().isWritable()) {
|
||||||
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
|
log.debug("Channel is writable again");
|
||||||
if(ObjectUtils.isEmpty(visitorId)) {
|
// 恢复之前暂停的操作,如写入数据
|
||||||
super.channelWritabilityChanged(ctx);
|
} else {
|
||||||
return;
|
log.debug("Channel is not writable");
|
||||||
}
|
// 暂停写入操作,等待可写状态
|
||||||
|
|
||||||
Channel realChannel = NettyRealIdContext.getReal(visitorId);
|
|
||||||
if (realChannel != null) {
|
|
||||||
log.debug("visitorId:{} transfer AUTO_READ:{} ",visitorId,ctx.channel().isWritable());
|
|
||||||
realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
|
|
||||||
}
|
}
|
||||||
|
log.info("channelWritabilityChanged!");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1,79 +0,0 @@
|
|||||||
package org.framework.lazy.cloud.network.heartbeat.client.netty.handler;
|
|
||||||
|
|
||||||
|
|
||||||
import io.netty.channel.Channel;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.channel.ChannelOption;
|
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext;
|
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
|
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
|
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext;
|
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
|
|
||||||
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
|
|
||||||
import org.wu.framework.core.utils.ObjectUtils;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 客户端访客通信通道 处理器
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
public class NettyClientPermeateTransferHandler extends SimpleChannelInboundHandler<NettyProxyMsg> {
|
|
||||||
private final ChannelTypeAdapter channelTypeAdapter;
|
|
||||||
|
|
||||||
public NettyClientPermeateTransferHandler(ChannelTypeAdapter channelTypeAdapter) {
|
|
||||||
this.channelTypeAdapter = channelTypeAdapter;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
super.channelActive(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelRead0(ChannelHandlerContext ctx, NettyProxyMsg nettyProxyMsg) throws Exception {
|
|
||||||
Channel channel = ctx.channel();
|
|
||||||
channelTypeAdapter.handler(channel, nettyProxyMsg);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
|
|
||||||
String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel());
|
|
||||||
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
|
|
||||||
// 关闭访客
|
|
||||||
ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId);
|
|
||||||
if (clientChannel != null) {
|
|
||||||
Channel channel = clientChannel.getChannel();
|
|
||||||
// 上报关闭这个客户端的访客通道
|
|
||||||
NettyProxyMsg closeVisitorMsg = new NettyProxyMsg();
|
|
||||||
closeVisitorMsg.setType(MessageType.REPORT_SINGLE_CLIENT_CLOSE_VISITOR);
|
|
||||||
closeVisitorMsg.setVisitorId(visitorId);
|
|
||||||
channel.writeAndFlush(closeVisitorMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
super.channelInactive(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
// 处理客户端本地真实通道问题
|
|
||||||
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
|
|
||||||
if(ObjectUtils.isEmpty(visitorId)) {
|
|
||||||
super.channelWritabilityChanged(ctx);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Channel realChannel = NettyRealIdContext.getReal(visitorId);
|
|
||||||
if (realChannel != null) {
|
|
||||||
log.debug("visitorId:{} transfer AUTO_READ:{} ",visitorId,ctx.channel().isWritable());
|
|
||||||
realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
||||||
super.exceptionCaught(ctx, cause);
|
|
||||||
}
|
|
||||||
}
|
|
@ -40,14 +40,13 @@ public class NettyServerPermeateClientTransferHandler extends SimpleChannelInbou
|
|||||||
String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel());
|
String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel());
|
||||||
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
|
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
|
||||||
// 关闭访客
|
// 关闭访客
|
||||||
ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId);
|
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
|
||||||
if (clientChannel != null) {
|
if (nextChannel != null) {
|
||||||
Channel channel = clientChannel.getChannel();
|
|
||||||
// 上报关闭这个客户端的访客通道
|
// 上报关闭这个客户端的访客通道
|
||||||
NettyProxyMsg closeVisitorMsg = new NettyProxyMsg();
|
NettyProxyMsg closeVisitorMsg = new NettyProxyMsg();
|
||||||
closeVisitorMsg.setType(MessageType.REPORT_SINGLE_CLIENT_CLOSE_VISITOR);
|
closeVisitorMsg.setType(MessageType.REPORT_SINGLE_CLIENT_CLOSE_VISITOR);
|
||||||
closeVisitorMsg.setVisitorId(visitorId);
|
closeVisitorMsg.setVisitorId(visitorId);
|
||||||
channel.writeAndFlush(closeVisitorMsg);
|
nextChannel.writeAndFlush(closeVisitorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
super.channelInactive(ctx);
|
super.channelInactive(ctx);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user