diff --git a/netty-proxy-client/pom.xml b/netty-proxy-client/pom.xml index db5ff27..c43cc09 100644 --- a/netty-proxy-client/pom.xml +++ b/netty-proxy-client/pom.xml @@ -1,6 +1,6 @@ - netty-proxy diff --git a/netty-proxy-client/src/main/java/com/luck/client/ClientStart.java b/netty-proxy-client/src/main/java/com/luck/client/ClientStart.java index 3117db7..6776f21 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/ClientStart.java +++ b/netty-proxy-client/src/main/java/com/luck/client/ClientStart.java @@ -1,5 +1,8 @@ package com.luck.client; +import org.springframework.boot.SpringApplication; + + public class ClientStart { public static void main(String[] args) throws Exception { diff --git a/netty-proxy-client/src/main/java/com/luck/client/Constant.java b/netty-proxy-client/src/main/java/com/luck/client/Constant.java index 18e09a7..d6119fd 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/Constant.java +++ b/netty-proxy-client/src/main/java/com/luck/client/Constant.java @@ -1,87 +1,75 @@ package com.luck.client; - + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import io.netty.channel.Channel; import io.netty.util.AttributeKey; import io.netty.util.internal.StringUtil; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - + public class Constant { - /** - * 绑定访客id - */ - public static final AttributeKey VID = AttributeKey.newInstance("vid"); - /** - * 代理服务channel - */ - public static Channel proxyChannel = null; - /** - * 访客,代理服务channel - */ - public static Map vpc = new ConcurrentHashMap<>(); - - /** - * 访客,真实服务channel - */ - public static Map vrc = new ConcurrentHashMap<>(); - - /** - * 真实服务端口 - */ - public static int realPort = 8080; - - /** - * 服务端口 - */ - public static int serverPort = 16001; - - /** - * 服务IP - */ - public static String serverIp = "127.0.0.1"; - - /** - * 清除连接 - * - * @param vid 访客ID - */ - public static void clearvpcvrc(String vid) { - if (StringUtil.isNullOrEmpty(vid)) { - return; - } - Channel clientChannel = vpc.get(vid); - if (null != clientChannel) { - clientChannel.attr(VID).set(null); - vpc.remove(vid); - } - Channel visitorChannel = vrc.get(vid); - if (null != visitorChannel) { - visitorChannel.attr(VID).set(null); - vrc.remove(vid); - } - } - - /** - * 清除关闭连接 - * - * @param vid 访客ID - */ - public static void clearvpcvrcAndClose(String vid) { - if (StringUtil.isNullOrEmpty(vid)) { - return; - } - Channel clientChannel = vpc.get(vid); - if (null != clientChannel) { - clientChannel.attr(VID).set(null); - vpc.remove(vid); - clientChannel.close(); - } - Channel visitorChannel = vrc.get(vid); - if (null != visitorChannel) { - visitorChannel.attr(VID).set(null); - vrc.remove(vid); - visitorChannel.close(); - } - } + /** 代理服务channel */ + public static Channel proxyChannel = null; + + /** 绑定访客id */ + public static final AttributeKey VID = AttributeKey.newInstance("vid"); + + /** 访客,代理服务channel */ + public static Map vpc = new ConcurrentHashMap<>(); + + /** 访客,真实服务channel */ + public static Map vrc = new ConcurrentHashMap<>(); + + /** 真实服务端口 */ + public static int realPort = 8080; + + /** 服务端口 */ + public static int serverPort = 16001; + + /** 服务IP */ + public static String serverIp = "127.0.0.1"; + + /** + * 清除连接 + * + * @param vid 访客ID + */ + public static void clearvpcvrc(String vid) { + if (StringUtil.isNullOrEmpty(vid)) { + return; + } + Channel clientChannel = vpc.get(vid); + if (null != clientChannel) { + clientChannel.attr(VID).set(null); + vpc.remove(vid); + } + Channel visitorChannel = vrc.get(vid); + if (null != visitorChannel) { + visitorChannel.attr(VID).set(null); + vrc.remove(vid); + } + } + + /** + * 清除关闭连接 + * + * @param vid 访客ID + */ + public static void clearvpcvrcAndClose(String vid) { + if (StringUtil.isNullOrEmpty(vid)) { + return; + } + Channel clientChannel = vpc.get(vid); + if (null != clientChannel) { + clientChannel.attr(VID).set(null); + vpc.remove(vid); + clientChannel.close(); + } + Channel visitorChannel = vrc.get(vid); + if (null != visitorChannel) { + visitorChannel.attr(VID).set(null); + vrc.remove(vid); + visitorChannel.close(); + } + } } \ No newline at end of file diff --git a/netty-proxy-client/src/main/java/com/luck/client/ProxyHandler.java b/netty-proxy-client/src/main/java/com/luck/client/ProxyHandler.java index 35932bc..3bc202d 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/ProxyHandler.java +++ b/netty-proxy-client/src/main/java/com/luck/client/ProxyHandler.java @@ -1,6 +1,7 @@ package com.luck.client; - + import com.luck.msg.MyMsg; + import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -8,91 +9,91 @@ import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.internal.StringUtil; - + public class ProxyHandler extends SimpleChannelInboundHandler { - - @Override - public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) { - // 客户端读取到代理过来的数据了 - byte type = myMsg.getType(); - String vid = new String(myMsg.getData()); - switch (type) { - case MyMsg.TYPE_HEARTBEAT: - break; - case MyMsg.TYPE_CONNECT: - RealSocket.connectRealServer(vid); - break; - case MyMsg.TYPE_DISCONNECT: - Constant.clearvpcvrcAndClose(vid); - break; - case MyMsg.TYPE_TRANSFER: - // 把数据转到真实服务 - ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length); - buf.writeBytes(myMsg.getData()); - - String visitorId = ctx.channel().attr(Constant.VID).get(); - Channel rchannel = Constant.vrc.get(visitorId); - if (null != rchannel) { - rchannel.writeAndFlush(buf); - } - break; - default: - // 操作有误 - } - // 客户端发数据到真实服务了 - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelWritabilityChanged(ctx); - return; - } - Channel realChannel = Constant.vrc.get(vid); - if (realChannel != null) { - realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); - } - - super.channelWritabilityChanged(ctx); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelInactive(ctx); - return; - } - Channel realChannel = Constant.vrc.get(vid); - if (realChannel != null && realChannel.isActive()) { - realChannel.close(); - } - super.channelInactive(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - super.exceptionCaught(ctx, cause); - cause.printStackTrace(); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent) { - IdleStateEvent event = (IdleStateEvent) evt; - switch (event.state()) { - case READER_IDLE: - ctx.channel().close(); - break; - case WRITER_IDLE: - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_HEARTBEAT); - ctx.channel().writeAndFlush(myMsg); - break; - case ALL_IDLE: - break; - } - } - } + + @Override + public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) { + // 客户端读取到代理过来的数据了 + byte type = myMsg.getType(); + String vid = new String(myMsg.getData()); + switch (type) { + case MyMsg.TYPE_HEARTBEAT: + break; + case MyMsg.TYPE_CONNECT: + RealSocket.connectRealServer(vid); + break; + case MyMsg.TYPE_DISCONNECT: + Constant.clearvpcvrcAndClose(vid); + break; + case MyMsg.TYPE_TRANSFER: + // 把数据转到真实服务 + ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length); + buf.writeBytes(myMsg.getData()); + + String visitorId = ctx.channel().attr(Constant.VID).get(); + Channel rchannel = Constant.vrc.get(visitorId); + if (null != rchannel) { + rchannel.writeAndFlush(buf); + } + break; + default: + // 操作有误 + } + // 客户端发数据到真实服务了 + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelWritabilityChanged(ctx); + return; + } + Channel realChannel = Constant.vrc.get(vid); + if (realChannel != null) { + realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); + } + + super.channelWritabilityChanged(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelInactive(ctx); + return; + } + Channel realChannel = Constant.vrc.get(vid); + if (realChannel != null && realChannel.isActive()) { + realChannel.close(); + } + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + cause.printStackTrace(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + switch (event.state()) { + case READER_IDLE: + ctx.channel().close(); + break; + case WRITER_IDLE: + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_HEARTBEAT); + ctx.channel().writeAndFlush(myMsg); + break; + case ALL_IDLE: + break; + } + } + } } \ No newline at end of file diff --git a/netty-proxy-client/src/main/java/com/luck/client/ProxySocket.java b/netty-proxy-client/src/main/java/com/luck/client/ProxySocket.java index 236a2a7..8d6ae8d 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/ProxySocket.java +++ b/netty-proxy-client/src/main/java/com/luck/client/ProxySocket.java @@ -1,101 +1,107 @@ package com.luck.client; - + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import com.luck.msg.MyMsg; import com.luck.msg.MyMsgDecoder; import com.luck.msg.MyMsgEncoder; + import io.netty.bootstrap.Bootstrap; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.internal.StringUtil; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - + public class ProxySocket { - /** - * 重连代理服务 - */ - private static final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(); - private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); - - public static Channel connectProxyServer() throws Exception { - reconnectExecutor.scheduleAtFixedRate(() -> { - try { - connectProxyServer(null); - } catch (Exception e) { - e.printStackTrace(); - } - }, 3, 3, TimeUnit.SECONDS); - return connectProxyServer(null); - } - - public static Channel connectProxyServer(String vid) throws Exception { - if (StringUtil.isNullOrEmpty(vid)) { - if (Constant.proxyChannel == null || !Constant.proxyChannel.isActive()) { - newConnect(null); - } - return null; - } else { - Channel channel = Constant.vpc.get(vid); - if (null == channel) { - newConnect(vid); - channel = Constant.vpc.get(vid); - } - return channel; - } - } - - private static void newConnect(String vid) throws InterruptedException { - Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); - pipeline.addLast(new MyMsgEncoder()); - pipeline.addLast(new IdleStateHandler(40, 8, 0)); - pipeline.addLast(new ProxyHandler()); - } - }); - - bootstrap.connect(Constant.serverIp, Constant.serverPort).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - // 客户端链接代理服务器成功 - Channel channel = future.channel(); - if (StringUtil.isNullOrEmpty(vid)) { - // 告诉服务端这条连接是client的连接 - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_CONNECT); - myMsg.setData("client".getBytes()); - channel.writeAndFlush(myMsg); - - Constant.proxyChannel = channel; - } else { - - // 告诉服务端这条连接是vid的连接 - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_CONNECT); - myMsg.setData(vid.getBytes()); - channel.writeAndFlush(myMsg); - - // 客户端绑定通道关系 - Constant.vpc.put(vid, channel); - channel.attr(Constant.VID).set(vid); - - Channel realChannel = Constant.vrc.get(vid); - if (null != realChannel) { - realChannel.config().setOption(ChannelOption.AUTO_READ, true); - } - } - } - } - }); - } + private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + + /** 重连代理服务 */ + private static final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(); + + public static Channel connectProxyServer() throws Exception { + reconnectExecutor.scheduleAtFixedRate(() -> { + try { + connectProxyServer(null); + } catch (Exception e) { + e.printStackTrace(); + } + }, 3, 3, TimeUnit.SECONDS); + return connectProxyServer(null); + } + + public static Channel connectProxyServer(String vid) throws Exception { + if (StringUtil.isNullOrEmpty(vid)) { + if (Constant.proxyChannel == null || !Constant.proxyChannel.isActive()) { + newConnect(null); + } + return null; + } else { + Channel channel = Constant.vpc.get(vid); + if (null == channel) { + newConnect(vid); + channel = Constant.vpc.get(vid); + } + return channel; + } + } + + private static void newConnect(String vid) throws InterruptedException { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); + pipeline.addLast(new MyMsgEncoder()); + pipeline.addLast(new IdleStateHandler(40, 8, 0)); + pipeline.addLast(new ProxyHandler()); + } + }); + + bootstrap.connect(Constant.serverIp, Constant.serverPort).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + // 客户端链接代理服务器成功 + Channel channel = future.channel(); + if (StringUtil.isNullOrEmpty(vid)) { + // 告诉服务端这条连接是client的连接 + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_CONNECT); + myMsg.setData("client".getBytes()); + channel.writeAndFlush(myMsg); + + Constant.proxyChannel = channel; + } else { + + // 告诉服务端这条连接是vid的连接 + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_CONNECT); + myMsg.setData(vid.getBytes()); + channel.writeAndFlush(myMsg); + + // 客户端绑定通道关系 + Constant.vpc.put(vid, channel); + channel.attr(Constant.VID).set(vid); + + Channel realChannel = Constant.vrc.get(vid); + if (null != realChannel) { + realChannel.config().setOption(ChannelOption.AUTO_READ, true); + } + } + } + } + }); + } } \ No newline at end of file diff --git a/netty-proxy-client/src/main/java/com/luck/client/RealHandler.java b/netty-proxy-client/src/main/java/com/luck/client/RealHandler.java index 5f1d679..3707fe4 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/RealHandler.java +++ b/netty-proxy-client/src/main/java/com/luck/client/RealHandler.java @@ -1,74 +1,75 @@ package com.luck.client; - + import com.luck.msg.MyMsg; + import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.internal.StringUtil; - + public class RealHandler extends SimpleChannelInboundHandler { - - @Override - public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { - // 客户读取到真实服务数据了 - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_TRANSFER); - myMsg.setData(bytes); - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - return; - } - Channel proxyChannel = Constant.vpc.get(vid); - if (null != proxyChannel) { - proxyChannel.writeAndFlush(myMsg); - } - // 客户端发送真实数据到代理了 - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelInactive(ctx); - return; - } - Channel proxyChannel = Constant.vpc.get(vid); - if (proxyChannel != null) { - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_DISCONNECT); - myMsg.setData(vid.getBytes()); - proxyChannel.writeAndFlush(myMsg); - } - - super.channelInactive(ctx); - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelWritabilityChanged(ctx); - return; - } - Channel proxyChannel = Constant.vpc.get(vid); - if (proxyChannel != null) { - proxyChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); - } - - super.channelWritabilityChanged(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - super.exceptionCaught(ctx, cause); - } + + @Override + public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { + // 客户读取到真实服务数据了 + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_TRANSFER); + myMsg.setData(bytes); + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + return; + } + Channel proxyChannel = Constant.vpc.get(vid); + if (null != proxyChannel) { + proxyChannel.writeAndFlush(myMsg); + } + // 客户端发送真实数据到代理了 + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelInactive(ctx); + return; + } + Channel proxyChannel = Constant.vpc.get(vid); + if (proxyChannel != null) { + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_DISCONNECT); + myMsg.setData(vid.getBytes()); + proxyChannel.writeAndFlush(myMsg); + } + + super.channelInactive(ctx); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelWritabilityChanged(ctx); + return; + } + Channel proxyChannel = Constant.vpc.get(vid); + if (proxyChannel != null) { + proxyChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); + } + + super.channelWritabilityChanged(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } } \ No newline at end of file diff --git a/netty-proxy-client/src/main/java/com/luck/client/RealSocket.java b/netty-proxy-client/src/main/java/com/luck/client/RealSocket.java index 2b5d84f..82b84a7 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/RealSocket.java +++ b/netty-proxy-client/src/main/java/com/luck/client/RealSocket.java @@ -1,59 +1,65 @@ package com.luck.client; - + import io.netty.bootstrap.Bootstrap; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.internal.StringUtil; - + public class RealSocket { - static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); - - /** - * 连接真实服务 - * - * @param vid 访客ID - * @return - */ - public static Channel connectRealServer(String vid) { - if (StringUtil.isNullOrEmpty(vid)) { - return null; - } - Channel channel = Constant.vrc.get(vid); - if (null == channel) { - newConnect(vid); - channel = Constant.vrc.get(vid); - } - return channel; - } - - private static void newConnect(String vid) { - try { - Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new RealHandler()); - } - - }); - bootstrap.connect("127.0.0.1", Constant.realPort).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - // 客户端链接真实服务成功 - future.channel().config().setOption(ChannelOption.AUTO_READ, false); - future.channel().attr(Constant.VID).set(vid); - Constant.vrc.put(vid, future.channel()); - ProxySocket.connectProxyServer(vid); - } - } - }); - } catch (Exception e) { - e.printStackTrace(); - } - } + static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + + /** + * 连接真实服务 + * + * @param vid 访客ID + * @return + */ + public static Channel connectRealServer(String vid) { + if (StringUtil.isNullOrEmpty(vid)) { + return null; + } + Channel channel = Constant.vrc.get(vid); + if (null == channel) { + newConnect(vid); + channel = Constant.vrc.get(vid); + } + return channel; + } + + private static void newConnect(String vid) { + try { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new RealHandler()); + } + + }); + bootstrap.connect("127.0.0.1", Constant.realPort).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + // 客户端链接真实服务成功 + future.channel().config().setOption(ChannelOption.AUTO_READ, false); + future.channel().attr(Constant.VID).set(vid); + Constant.vrc.put(vid, future.channel()); + ProxySocket.connectProxyServer(vid); + } + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + } } \ No newline at end of file diff --git a/netty-proxy-client/src/main/java/com/luck/client/controller/ClientController.java b/netty-proxy-client/src/main/java/com/luck/client/controller/ClientController.java index 7d33c6a..ee95e7b 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/controller/ClientController.java +++ b/netty-proxy-client/src/main/java/com/luck/client/controller/ClientController.java @@ -11,7 +11,7 @@ import org.springframework.web.bind.annotation.GetMapping; public class ClientController { @GetMapping("/version") - public Result version() { + public Result version(){ return ResultFactory.successOf("客户端版本"); } } diff --git a/netty-proxy-common/pom.xml b/netty-proxy-common/pom.xml index 5e9a2e9..b94a89d 100644 --- a/netty-proxy-common/pom.xml +++ b/netty-proxy-common/pom.xml @@ -1,6 +1,6 @@ - netty-proxy diff --git a/netty-proxy-common/src/main/java/com/luck/msg/MyMsg.java b/netty-proxy-common/src/main/java/com/luck/msg/MyMsg.java index c9d1087..d828684 100644 --- a/netty-proxy-common/src/main/java/com/luck/msg/MyMsg.java +++ b/netty-proxy-common/src/main/java/com/luck/msg/MyMsg.java @@ -1,58 +1,46 @@ package com.luck.msg; - + import java.util.Arrays; - + public class MyMsg { - - /** - * 心跳 - */ - public static final byte TYPE_HEARTBEAT = 0X00; - - /** - * 连接成功 - */ - public static final byte TYPE_CONNECT = 0X01; - - /** - * 数据传输 - */ - public static final byte TYPE_TRANSFER = 0X02; - - /** - * 连接断开 - */ - public static final byte TYPE_DISCONNECT = 0X09; - - /** - * 数据类型 - */ - private byte type; - - /** - * 消息传输数据 - */ - private byte[] data; - - public byte getType() { - return type; - } - - public void setType(byte type) { - this.type = type; - } - - public byte[] getData() { - return data; - } - - public void setData(byte[] data) { - this.data = data; - } - - @Override - public String toString() { - return "MyMsg [type=" + type + ", data=" + Arrays.toString(data) + "]"; - } - + + /** 心跳 */ + public static final byte TYPE_HEARTBEAT = 0X00; + + /** 连接成功 */ + public static final byte TYPE_CONNECT = 0X01; + + /** 数据传输 */ + public static final byte TYPE_TRANSFER = 0X02; + + /** 连接断开 */ + public static final byte TYPE_DISCONNECT = 0X09; + + /** 数据类型 */ + private byte type; + + /** 消息传输数据 */ + private byte[] data; + + public byte getType() { + return type; + } + + public void setType(byte type) { + this.type = type; + } + + public byte[] getData() { + return data; + } + + public void setData(byte[] data) { + this.data = data; + } + + @Override + public String toString() { + return "MyMsg [type=" + type + ", data=" + Arrays.toString(data) + "]"; + } + } \ No newline at end of file diff --git a/netty-proxy-common/src/main/java/com/luck/msg/MyMsgDecoder.java b/netty-proxy-common/src/main/java/com/luck/msg/MyMsgDecoder.java index 6f9809e..38af367 100644 --- a/netty-proxy-common/src/main/java/com/luck/msg/MyMsgDecoder.java +++ b/netty-proxy-common/src/main/java/com/luck/msg/MyMsgDecoder.java @@ -1,41 +1,41 @@ package com.luck.msg; - + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; - + public class MyMsgDecoder extends LengthFieldBasedFrameDecoder { - - public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, - int initialBytesToStrip) { - super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); - } - - public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, - int initialBytesToStrip, boolean failFast) { - super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); - } - - @Override - protected MyMsg decode(ChannelHandlerContext ctx, ByteBuf in2) throws Exception { - ByteBuf in = (ByteBuf) super.decode(ctx, in2); - if (in == null) { - return null; - } - - if (in.readableBytes() < 4) { - return null; - } - - MyMsg myMsg = new MyMsg(); - int dataLength = in.readInt(); - byte type = in.readByte(); - myMsg.setType(type); - byte[] data = new byte[dataLength - 5]; - in.readBytes(data); - myMsg.setData(data); - in.release(); - - return myMsg; - } + + public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, + int initialBytesToStrip) { + super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); + } + + public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, + int initialBytesToStrip, boolean failFast) { + super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); + } + + @Override + protected MyMsg decode(ChannelHandlerContext ctx, ByteBuf in2) throws Exception { + ByteBuf in = (ByteBuf) super.decode(ctx, in2); + if (in == null) { + return null; + } + + if (in.readableBytes() < 4) { + return null; + } + + MyMsg myMsg = new MyMsg(); + int dataLength = in.readInt(); + byte type = in.readByte(); + myMsg.setType(type); + byte[] data = new byte[dataLength - 5]; + in.readBytes(data); + myMsg.setData(data); + in.release(); + + return myMsg; + } } \ No newline at end of file diff --git a/netty-proxy-common/src/main/java/com/luck/msg/MyMsgEncoder.java b/netty-proxy-common/src/main/java/com/luck/msg/MyMsgEncoder.java index ec10ecf..f645e90 100644 --- a/netty-proxy-common/src/main/java/com/luck/msg/MyMsgEncoder.java +++ b/netty-proxy-common/src/main/java/com/luck/msg/MyMsgEncoder.java @@ -1,28 +1,28 @@ package com.luck.msg; - + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; - + public class MyMsgEncoder extends MessageToByteEncoder { - - public MyMsgEncoder() { - - } - - @Override - protected void encode(ChannelHandlerContext ctx, MyMsg msg, ByteBuf out) throws Exception { - int bodyLength = 5; - if (msg.getData() != null) { - bodyLength += msg.getData().length; - } - - out.writeInt(bodyLength); - - out.writeByte(msg.getType()); - - if (msg.getData() != null) { - out.writeBytes(msg.getData()); - } - } + + public MyMsgEncoder() { + + } + + @Override + protected void encode(ChannelHandlerContext ctx, MyMsg msg, ByteBuf out) throws Exception { + int bodyLength = 5; + if (msg.getData() != null) { + bodyLength += msg.getData().length; + } + + out.writeInt(bodyLength); + + out.writeByte(msg.getType()); + + if (msg.getData() != null) { + out.writeBytes(msg.getData()); + } + } } \ No newline at end of file diff --git a/netty-proxy-server/pom.xml b/netty-proxy-server/pom.xml index fad37ad..cc1a880 100644 --- a/netty-proxy-server/pom.xml +++ b/netty-proxy-server/pom.xml @@ -1,6 +1,6 @@ - netty-proxy diff --git a/netty-proxy-server/src/main/java/com/luck/server/ClientHandler.java b/netty-proxy-server/src/main/java/com/luck/server/ClientHandler.java index 0bbb5f0..15c0041 100644 --- a/netty-proxy-server/src/main/java/com/luck/server/ClientHandler.java +++ b/netty-proxy-server/src/main/java/com/luck/server/ClientHandler.java @@ -1,113 +1,119 @@ -package com.luck.server; +package com.luck.server; + import com.luck.msg.MyMsg; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOption; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.internal.StringUtil; - + public class ClientHandler extends SimpleChannelInboundHandler { - - @Override - public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) { - // 代理服务器读到客户端数据了 - byte type = myMsg.getType(); - switch (type) { - case MyMsg.TYPE_HEARTBEAT: - MyMsg hb = new MyMsg(); - hb.setType(MyMsg.TYPE_HEARTBEAT); - ctx.channel().writeAndFlush(hb); - break; - case MyMsg.TYPE_CONNECT: - String vid = new String(myMsg.getData()); - if (StringUtil.isNullOrEmpty(vid) || "client".equals(vid)) { - Constant.clientChannel = ctx.channel(); - } else { - // 绑定访客和客户端的连接 - Channel visitorChannel = Constant.vvc.get(vid); - if (null != visitorChannel) { - ctx.channel().attr(Constant.VID).set(vid); - Constant.vcc.put(vid, ctx.channel()); - - // 通道绑定完成可以读取访客数据 - visitorChannel.config().setOption(ChannelOption.AUTO_READ, true); - } - } - break; - case MyMsg.TYPE_DISCONNECT: - String disVid = new String(myMsg.getData()); - Constant.clearVccVvcAndClose(disVid); - break; - case MyMsg.TYPE_TRANSFER: - // 把数据转到用户服务 - ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length); - buf.writeBytes(myMsg.getData()); - - String visitorId = ctx.channel().attr(Constant.VID).get(); - Channel vchannel = Constant.vvc.get(visitorId); - if (null != vchannel) { - vchannel.writeAndFlush(buf); - } - break; - default: - // 操作有误 - } - // 代理服务器发送数据到用户了 - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelWritabilityChanged(ctx); - return; - } - Channel visitorChannel = Constant.vvc.get(vid); - if (visitorChannel != null) { - visitorChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); - } - - super.channelWritabilityChanged(ctx); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelInactive(ctx); - return; - } - Channel visitorChannel = Constant.vvc.get(vid); - if (visitorChannel != null && visitorChannel.isActive()) { - // 数据发送完成后再关闭连接,解决http1.0数据传输问题 - visitorChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); - visitorChannel.close(); - } else { - ctx.channel().close(); - } - Constant.clearVccVvc(vid); - super.channelInactive(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - super.exceptionCaught(ctx, cause); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent) { - IdleStateEvent event = (IdleStateEvent) evt; - switch (event.state()) { - case READER_IDLE: - ctx.channel().close(); - break; - case WRITER_IDLE: - break; - case ALL_IDLE: - break; - } - } - } + + @Override + public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) { + // 代理服务器读到客户端数据了 + byte type = myMsg.getType(); + switch (type) { + case MyMsg.TYPE_HEARTBEAT: + MyMsg hb = new MyMsg(); + hb.setType(MyMsg.TYPE_HEARTBEAT); + ctx.channel().writeAndFlush(hb); + break; + case MyMsg.TYPE_CONNECT: + String vid = new String(myMsg.getData()); + if (StringUtil.isNullOrEmpty(vid) || "client".equals(vid)) { + Constant.clientChannel = ctx.channel(); + } else { + // 绑定访客和客户端的连接 + Channel visitorChannel = Constant.vvc.get(vid); + if (null != visitorChannel) { + ctx.channel().attr(Constant.VID).set(vid); + Constant.vcc.put(vid, ctx.channel()); + + // 通道绑定完成可以读取访客数据 + visitorChannel.config().setOption(ChannelOption.AUTO_READ, true); + } + } + break; + case MyMsg.TYPE_DISCONNECT: + String disVid = new String(myMsg.getData()); + Constant.clearVccVvcAndClose(disVid); + break; + case MyMsg.TYPE_TRANSFER: + // 把数据转到用户服务 + ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length); + buf.writeBytes(myMsg.getData()); + + String visitorId = ctx.channel().attr(Constant.VID).get(); + Channel vchannel = Constant.vvc.get(visitorId); + if (null != vchannel) { + vchannel.writeAndFlush(buf); + } + break; + default: + // 操作有误 + } + // 代理服务器发送数据到用户了 + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if(StringUtil.isNullOrEmpty(vid)) { + super.channelWritabilityChanged(ctx); + return; + } + Channel visitorChannel = Constant.vvc.get(vid); + if (visitorChannel != null) { + visitorChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); + } + + super.channelWritabilityChanged(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelInactive(ctx); + return; + } + Channel visitorChannel = Constant.vvc.get(vid); + if (visitorChannel != null && visitorChannel.isActive()) { + // 数据发送完成后再关闭连接,解决http1.0数据传输问题 + visitorChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + visitorChannel.close(); + } else { + ctx.channel().close(); + } + Constant.clearVccVvc(vid); + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + switch (event.state()) { + case READER_IDLE: + ctx.channel().close(); + break; + case WRITER_IDLE: + break; + case ALL_IDLE: + break; + } + } + } } \ No newline at end of file diff --git a/netty-proxy-server/src/main/java/com/luck/server/Constant.java b/netty-proxy-server/src/main/java/com/luck/server/Constant.java index ed6cde3..1abe930 100644 --- a/netty-proxy-server/src/main/java/com/luck/server/Constant.java +++ b/netty-proxy-server/src/main/java/com/luck/server/Constant.java @@ -1,82 +1,72 @@ package com.luck.server; - + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import io.netty.channel.Channel; import io.netty.util.AttributeKey; import io.netty.util.internal.StringUtil; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - + public class Constant { - /** - * 绑定channel_id - */ - public static final AttributeKey VID = AttributeKey.newInstance("vid"); - /** - * 客户端服务channel - */ - public static Channel clientChannel = null; - /** - * 访客,客户服务channel - */ - public static Map vcc = new ConcurrentHashMap<>(); - - /** - * 访客,访客服务channel - */ - public static Map vvc = new ConcurrentHashMap<>(); - - /** - * 服务代理端口 - */ - public static int visitorPort = 16002; - - /** - * 服务端口 - */ - public static int serverPort = 16001; - - /** - * 清除连接 - * - * @param vid 访客ID - */ - public static void clearVccVvc(String vid) { - if (StringUtil.isNullOrEmpty(vid)) { - return; - } - Channel clientChannel = vcc.get(vid); - if (null != clientChannel) { - clientChannel.attr(VID).set(null); - vcc.remove(vid); - } - Channel visitorChannel = vvc.get(vid); - if (null != visitorChannel) { - visitorChannel.attr(VID).set(null); - vvc.remove(vid); - } - } - - /** - * 清除关闭连接 - * - * @param vid 访客ID - */ - public static void clearVccVvcAndClose(String vid) { - if (StringUtil.isNullOrEmpty(vid)) { - return; - } - Channel clientChannel = vcc.get(vid); - if (null != clientChannel) { - clientChannel.attr(VID).set(null); - vcc.remove(vid); - clientChannel.close(); - } - Channel visitorChannel = vvc.get(vid); - if (null != visitorChannel) { - visitorChannel.attr(VID).set(null); - vvc.remove(vid); - visitorChannel.close(); - } - } + /** 客户端服务channel */ + public static Channel clientChannel = null; + + /** 绑定channel_id */ + public static final AttributeKey VID = AttributeKey.newInstance("vid"); + + /** 访客,客户服务channel */ + public static Map vcc = new ConcurrentHashMap<>(); + + /** 访客,访客服务channel */ + public static Map vvc = new ConcurrentHashMap<>(); + + /** 服务代理端口 */ + public static int visitorPort = 16002; + + /** 服务端口 */ + public static int serverPort = 16001; + + /** + * 清除连接 + * + * @param vid 访客ID + */ + public static void clearVccVvc(String vid) { + if (StringUtil.isNullOrEmpty(vid)) { + return; + } + Channel clientChannel = vcc.get(vid); + if (null != clientChannel) { + clientChannel.attr(VID).set(null); + vcc.remove(vid); + } + Channel visitorChannel = vvc.get(vid); + if (null != visitorChannel) { + visitorChannel.attr(VID).set(null); + vvc.remove(vid); + } + } + + /** + * 清除关闭连接 + * + * @param vid 访客ID + */ + public static void clearVccVvcAndClose(String vid) { + if (StringUtil.isNullOrEmpty(vid)) { + return; + } + Channel clientChannel = vcc.get(vid); + if (null != clientChannel) { + clientChannel.attr(VID).set(null); + vcc.remove(vid); + clientChannel.close(); + } + Channel visitorChannel = vvc.get(vid); + if (null != visitorChannel) { + visitorChannel.attr(VID).set(null); + vvc.remove(vid); + visitorChannel.close(); + } + } } \ No newline at end of file diff --git a/netty-proxy-server/src/main/java/com/luck/server/ServerSocket.java b/netty-proxy-server/src/main/java/com/luck/server/ServerSocket.java index 4efca69..bca1b03 100644 --- a/netty-proxy-server/src/main/java/com/luck/server/ServerSocket.java +++ b/netty-proxy-server/src/main/java/com/luck/server/ServerSocket.java @@ -1,61 +1,65 @@ package com.luck.server; - + import com.luck.msg.MyMsgDecoder; import com.luck.msg.MyMsgEncoder; + import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.*; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; - + public class ServerSocket { - private static EventLoopGroup bossGroup = new NioEventLoopGroup(); - private static EventLoopGroup workerGroup = new NioEventLoopGroup(); - private static ChannelFuture channelFuture; - - /** - * 启动服务端 - * - * @throws Exception - */ - public static void startServer() throws Exception { - try { - - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); - pipeline.addLast(new MyMsgEncoder()); - pipeline.addLast(new IdleStateHandler(40, 10, 0)); - pipeline.addLast(new ClientHandler()); - } - - }); - channelFuture = b.bind(Constant.serverPort).sync(); - - channelFuture.addListener((ChannelFutureListener) channelFuture -> { - // 服务器已启动 - }); - channelFuture.channel().closeFuture().sync(); - } finally { - shutdown(); - // 服务器已关闭 - } - } - - public static void shutdown() { - if (channelFuture != null) { - channelFuture.channel().close().syncUninterruptibly(); - } - if ((bossGroup != null) && (!bossGroup.isShutdown())) { - bossGroup.shutdownGracefully(); - } - if ((workerGroup != null) && (!workerGroup.isShutdown())) { - workerGroup.shutdownGracefully(); - } - } + private static EventLoopGroup bossGroup = new NioEventLoopGroup(); + private static EventLoopGroup workerGroup = new NioEventLoopGroup(); + private static ChannelFuture channelFuture; + + /** + * 启动服务端 + * @throws Exception + */ + public static void startServer() throws Exception { + try { + + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); + pipeline.addLast(new MyMsgEncoder()); + pipeline.addLast(new IdleStateHandler(40, 10, 0)); + pipeline.addLast(new ClientHandler()); + } + + }); + channelFuture = b.bind(Constant.serverPort).sync(); + + channelFuture.addListener((ChannelFutureListener) channelFuture -> { + // 服务器已启动 + }); + channelFuture.channel().closeFuture().sync(); + } finally { + shutdown(); + // 服务器已关闭 + } + } + + public static void shutdown() { + if (channelFuture != null) { + channelFuture.channel().close().syncUninterruptibly(); + } + if ((bossGroup != null) && (!bossGroup.isShutdown())) { + bossGroup.shutdownGracefully(); + } + if ((workerGroup != null) && (!workerGroup.isShutdown())) { + workerGroup.shutdownGracefully(); + } + } } \ No newline at end of file diff --git a/netty-proxy-server/src/main/java/com/luck/server/VisitorHandler.java b/netty-proxy-server/src/main/java/com/luck/server/VisitorHandler.java index f42517f..dd340ae 100644 --- a/netty-proxy-server/src/main/java/com/luck/server/VisitorHandler.java +++ b/netty-proxy-server/src/main/java/com/luck/server/VisitorHandler.java @@ -1,103 +1,98 @@ package com.luck.server; - + +import java.util.UUID; + import com.luck.msg.MyMsg; + import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.internal.StringUtil; - -import java.util.UUID; - -/** - * description: 访客处理程序 - * - * @author 吴佳伟 - * @date: 12.2.23 10:21 - */ + public class VisitorHandler extends SimpleChannelInboundHandler { - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - // 访客连接上代理服务器了 - Channel visitorChannel = ctx.channel(); - // 先不读取访客数据 - visitorChannel.config().setOption(ChannelOption.AUTO_READ, false); - - // 生成访客ID - String vid = UUID.randomUUID().toString(); - - // 绑定访客通道 - visitorChannel.attr(Constant.VID).set(vid); - Constant.vvc.put(vid, visitorChannel); - - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_CONNECT); - myMsg.setData(vid.getBytes()); - Constant.clientChannel.writeAndFlush(myMsg); - - super.channelActive(ctx); - } - - @Override - public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - return; - } - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_TRANSFER); - myMsg.setData(bytes); - - // 代理服务器发送数据到客户端了 - Channel clientChannel = Constant.vcc.get(vid); - clientChannel.writeAndFlush(myMsg); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelInactive(ctx); - return; - } - Channel clientChannel = Constant.vcc.get(vid); - if (clientChannel != null && clientChannel.isActive()) { - - clientChannel.config().setOption(ChannelOption.AUTO_READ, true); - - // 通知客户端,访客连接已经断开 - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_DISCONNECT); - myMsg.setData(vid.getBytes()); - clientChannel.writeAndFlush(myMsg); - } - Constant.clearVccVvc(vid); - super.channelInactive(ctx); - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - - Channel visitorChannel = ctx.channel(); - String vid = visitorChannel.attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelWritabilityChanged(ctx); - return; - } - Channel clientChannel = Constant.vcc.get(vid); - if (clientChannel != null) { - clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable()); - } - - super.channelWritabilityChanged(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ctx.close(); - } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // 访客连接上代理服务器了 + Channel visitorChannel = ctx.channel(); + // 先不读取访客数据 + visitorChannel.config().setOption(ChannelOption.AUTO_READ, false); + + // 生成访客ID + String vid = UUID.randomUUID().toString(); + + // 绑定访客通道 + visitorChannel.attr(Constant.VID).set(vid); + Constant.vvc.put(vid, visitorChannel); + + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_CONNECT); + myMsg.setData(vid.getBytes()); + Constant.clientChannel.writeAndFlush(myMsg); + + super.channelActive(ctx); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + return; + } + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_TRANSFER); + myMsg.setData(bytes); + + // 代理服务器发送数据到客户端了 + Channel clientChannel = Constant.vcc.get(vid); + clientChannel.writeAndFlush(myMsg); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelInactive(ctx); + return; + } + Channel clientChannel = Constant.vcc.get(vid); + if (clientChannel != null && clientChannel.isActive()) { + + clientChannel.config().setOption(ChannelOption.AUTO_READ, true); + + // 通知客户端,访客连接已经断开 + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_DISCONNECT); + myMsg.setData(vid.getBytes()); + clientChannel.writeAndFlush(myMsg); + } + Constant.clearVccVvc(vid); + super.channelInactive(ctx); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + + Channel visitorChannel = ctx.channel(); + String vid = visitorChannel.attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelWritabilityChanged(ctx); + return; + } + Channel clientChannel = Constant.vcc.get(vid); + if (clientChannel != null) { + clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable()); + } + + super.channelWritabilityChanged(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } } \ No newline at end of file diff --git a/netty-proxy-server/src/main/java/com/luck/server/VisitorSocket.java b/netty-proxy-server/src/main/java/com/luck/server/VisitorSocket.java index 2affe79..9aba8b5 100644 --- a/netty-proxy-server/src/main/java/com/luck/server/VisitorSocket.java +++ b/netty-proxy-server/src/main/java/com/luck/server/VisitorSocket.java @@ -1,5 +1,5 @@ package com.luck.server; - + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelInitializer; @@ -8,30 +8,30 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; - + public class VisitorSocket { - private static EventLoopGroup bossGroup = new NioEventLoopGroup(); - private static EventLoopGroup workerGroup = new NioEventLoopGroup(); - - /** - * 启动服务代理 - * - * @throws Exception - */ - public static void startServer() throws Exception { - - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new ChannelDuplexHandler()); - pipeline.addLast(new VisitorHandler()); - } - }); - b.bind(Constant.visitorPort).get(); - - } - + private static EventLoopGroup bossGroup = new NioEventLoopGroup(); + private static EventLoopGroup workerGroup = new NioEventLoopGroup(); + + /** + * 启动服务代理 + * + * @throws Exception + */ + public static void startServer() throws Exception { + + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new ChannelDuplexHandler()); + pipeline.addLast(new VisitorHandler()); + } + }); + b.bind(Constant.visitorPort).get(); + + } + } \ No newline at end of file diff --git a/netty-proxy-server/src/main/java/com/luck/server/controller/ServerController.java b/netty-proxy-server/src/main/java/com/luck/server/controller/ServerController.java index 9e09362..126da41 100644 --- a/netty-proxy-server/src/main/java/com/luck/server/controller/ServerController.java +++ b/netty-proxy-server/src/main/java/com/luck/server/controller/ServerController.java @@ -11,7 +11,7 @@ import org.springframework.web.bind.annotation.GetMapping; public class ServerController { @GetMapping("/version") - public Result version() { + public Result version(){ return ResultFactory.successOf("服务端端版本"); } } diff --git a/pom.xml b/pom.xml index d2d7958..e60f5c0 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,5 @@ - 4.0.0 pom