From 6cb17fbd585fe098341a21a66122f68b205986bb Mon Sep 17 00:00:00 2001 From: wujiawei <12345678> Date: Sun, 12 Feb 2023 10:34:46 +0800 Subject: [PATCH] init --- netty-proxy-client/pom.xml | 4 +- .../java/com/luck/client/ClientStart.java | 3 - .../main/java/com/luck/client/Constant.java | 150 ++++++------ .../java/com/luck/client/ProxyHandler.java | 175 +++++++------- .../java/com/luck/client/ProxySocket.java | 184 ++++++++------- .../java/com/luck/client/RealHandler.java | 129 ++++++----- .../main/java/com/luck/client/RealSocket.java | 108 +++++---- .../client/controller/ClientController.java | 2 +- netty-proxy-common/pom.xml | 4 +- .../src/main/java/com/luck/msg/MyMsg.java | 96 ++++---- .../main/java/com/luck/msg/MyMsgDecoder.java | 70 +++--- .../main/java/com/luck/msg/MyMsgEncoder.java | 44 ++-- netty-proxy-server/pom.xml | 4 +- .../java/com/luck/server/ClientHandler.java | 216 +++++++++--------- .../main/java/com/luck/server/Constant.java | 142 ++++++------ .../java/com/luck/server/ServerSocket.java | 106 +++++---- .../java/com/luck/server/VisitorHandler.java | 175 +++++++------- .../java/com/luck/server/VisitorSocket.java | 52 ++--- .../server/controller/ServerController.java | 2 +- pom.xml | 2 +- 20 files changed, 837 insertions(+), 831 deletions(-) diff --git a/netty-proxy-client/pom.xml b/netty-proxy-client/pom.xml index c43cc09..db5ff27 100644 --- a/netty-proxy-client/pom.xml +++ b/netty-proxy-client/pom.xml @@ -1,6 +1,6 @@ - netty-proxy diff --git a/netty-proxy-client/src/main/java/com/luck/client/ClientStart.java b/netty-proxy-client/src/main/java/com/luck/client/ClientStart.java index 6776f21..3117db7 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/ClientStart.java +++ b/netty-proxy-client/src/main/java/com/luck/client/ClientStart.java @@ -1,8 +1,5 @@ package com.luck.client; -import org.springframework.boot.SpringApplication; - - public class ClientStart { public static void main(String[] args) throws Exception { diff --git a/netty-proxy-client/src/main/java/com/luck/client/Constant.java b/netty-proxy-client/src/main/java/com/luck/client/Constant.java index d6119fd..18e09a7 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/Constant.java +++ b/netty-proxy-client/src/main/java/com/luck/client/Constant.java @@ -1,75 +1,87 @@ package com.luck.client; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - + import io.netty.channel.Channel; import io.netty.util.AttributeKey; import io.netty.util.internal.StringUtil; - + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + public class Constant { - /** 代理服务channel */ - public static Channel proxyChannel = null; - - /** 绑定访客id */ - public static final AttributeKey VID = AttributeKey.newInstance("vid"); - - /** 访客,代理服务channel */ - public static Map vpc = new ConcurrentHashMap<>(); - - /** 访客,真实服务channel */ - public static Map vrc = new ConcurrentHashMap<>(); - - /** 真实服务端口 */ - public static int realPort = 8080; - - /** 服务端口 */ - public static int serverPort = 16001; - - /** 服务IP */ - public static String serverIp = "127.0.0.1"; - - /** - * 清除连接 - * - * @param vid 访客ID - */ - public static void clearvpcvrc(String vid) { - if (StringUtil.isNullOrEmpty(vid)) { - return; - } - Channel clientChannel = vpc.get(vid); - if (null != clientChannel) { - clientChannel.attr(VID).set(null); - vpc.remove(vid); - } - Channel visitorChannel = vrc.get(vid); - if (null != visitorChannel) { - visitorChannel.attr(VID).set(null); - vrc.remove(vid); - } - } - - /** - * 清除关闭连接 - * - * @param vid 访客ID - */ - public static void clearvpcvrcAndClose(String vid) { - if (StringUtil.isNullOrEmpty(vid)) { - return; - } - Channel clientChannel = vpc.get(vid); - if (null != clientChannel) { - clientChannel.attr(VID).set(null); - vpc.remove(vid); - clientChannel.close(); - } - Channel visitorChannel = vrc.get(vid); - if (null != visitorChannel) { - visitorChannel.attr(VID).set(null); - vrc.remove(vid); - visitorChannel.close(); - } - } + /** + * 绑定访客id + */ + public static final AttributeKey VID = AttributeKey.newInstance("vid"); + /** + * 代理服务channel + */ + public static Channel proxyChannel = null; + /** + * 访客,代理服务channel + */ + public static Map vpc = new ConcurrentHashMap<>(); + + /** + * 访客,真实服务channel + */ + public static Map vrc = new ConcurrentHashMap<>(); + + /** + * 真实服务端口 + */ + public static int realPort = 8080; + + /** + * 服务端口 + */ + public static int serverPort = 16001; + + /** + * 服务IP + */ + public static String serverIp = "127.0.0.1"; + + /** + * 清除连接 + * + * @param vid 访客ID + */ + public static void clearvpcvrc(String vid) { + if (StringUtil.isNullOrEmpty(vid)) { + return; + } + Channel clientChannel = vpc.get(vid); + if (null != clientChannel) { + clientChannel.attr(VID).set(null); + vpc.remove(vid); + } + Channel visitorChannel = vrc.get(vid); + if (null != visitorChannel) { + visitorChannel.attr(VID).set(null); + vrc.remove(vid); + } + } + + /** + * 清除关闭连接 + * + * @param vid 访客ID + */ + public static void clearvpcvrcAndClose(String vid) { + if (StringUtil.isNullOrEmpty(vid)) { + return; + } + Channel clientChannel = vpc.get(vid); + if (null != clientChannel) { + clientChannel.attr(VID).set(null); + vpc.remove(vid); + clientChannel.close(); + } + Channel visitorChannel = vrc.get(vid); + if (null != visitorChannel) { + visitorChannel.attr(VID).set(null); + vrc.remove(vid); + visitorChannel.close(); + } + } } \ No newline at end of file diff --git a/netty-proxy-client/src/main/java/com/luck/client/ProxyHandler.java b/netty-proxy-client/src/main/java/com/luck/client/ProxyHandler.java index 3bc202d..35932bc 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/ProxyHandler.java +++ b/netty-proxy-client/src/main/java/com/luck/client/ProxyHandler.java @@ -1,7 +1,6 @@ package com.luck.client; - + import com.luck.msg.MyMsg; - import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -9,91 +8,91 @@ import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.internal.StringUtil; - + public class ProxyHandler extends SimpleChannelInboundHandler { - - @Override - public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) { - // 客户端读取到代理过来的数据了 - byte type = myMsg.getType(); - String vid = new String(myMsg.getData()); - switch (type) { - case MyMsg.TYPE_HEARTBEAT: - break; - case MyMsg.TYPE_CONNECT: - RealSocket.connectRealServer(vid); - break; - case MyMsg.TYPE_DISCONNECT: - Constant.clearvpcvrcAndClose(vid); - break; - case MyMsg.TYPE_TRANSFER: - // 把数据转到真实服务 - ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length); - buf.writeBytes(myMsg.getData()); - - String visitorId = ctx.channel().attr(Constant.VID).get(); - Channel rchannel = Constant.vrc.get(visitorId); - if (null != rchannel) { - rchannel.writeAndFlush(buf); - } - break; - default: - // 操作有误 - } - // 客户端发数据到真实服务了 - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelWritabilityChanged(ctx); - return; - } - Channel realChannel = Constant.vrc.get(vid); - if (realChannel != null) { - realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); - } - - super.channelWritabilityChanged(ctx); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelInactive(ctx); - return; - } - Channel realChannel = Constant.vrc.get(vid); - if (realChannel != null && realChannel.isActive()) { - realChannel.close(); - } - super.channelInactive(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - super.exceptionCaught(ctx, cause); - cause.printStackTrace(); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent) { - IdleStateEvent event = (IdleStateEvent) evt; - switch (event.state()) { - case READER_IDLE: - ctx.channel().close(); - break; - case WRITER_IDLE: - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_HEARTBEAT); - ctx.channel().writeAndFlush(myMsg); - break; - case ALL_IDLE: - break; - } - } - } + + @Override + public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) { + // 客户端读取到代理过来的数据了 + byte type = myMsg.getType(); + String vid = new String(myMsg.getData()); + switch (type) { + case MyMsg.TYPE_HEARTBEAT: + break; + case MyMsg.TYPE_CONNECT: + RealSocket.connectRealServer(vid); + break; + case MyMsg.TYPE_DISCONNECT: + Constant.clearvpcvrcAndClose(vid); + break; + case MyMsg.TYPE_TRANSFER: + // 把数据转到真实服务 + ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length); + buf.writeBytes(myMsg.getData()); + + String visitorId = ctx.channel().attr(Constant.VID).get(); + Channel rchannel = Constant.vrc.get(visitorId); + if (null != rchannel) { + rchannel.writeAndFlush(buf); + } + break; + default: + // 操作有误 + } + // 客户端发数据到真实服务了 + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelWritabilityChanged(ctx); + return; + } + Channel realChannel = Constant.vrc.get(vid); + if (realChannel != null) { + realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); + } + + super.channelWritabilityChanged(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelInactive(ctx); + return; + } + Channel realChannel = Constant.vrc.get(vid); + if (realChannel != null && realChannel.isActive()) { + realChannel.close(); + } + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + cause.printStackTrace(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + switch (event.state()) { + case READER_IDLE: + ctx.channel().close(); + break; + case WRITER_IDLE: + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_HEARTBEAT); + ctx.channel().writeAndFlush(myMsg); + break; + case ALL_IDLE: + break; + } + } + } } \ No newline at end of file diff --git a/netty-proxy-client/src/main/java/com/luck/client/ProxySocket.java b/netty-proxy-client/src/main/java/com/luck/client/ProxySocket.java index 8d6ae8d..236a2a7 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/ProxySocket.java +++ b/netty-proxy-client/src/main/java/com/luck/client/ProxySocket.java @@ -1,107 +1,101 @@ package com.luck.client; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - + import com.luck.msg.MyMsg; import com.luck.msg.MyMsgDecoder; import com.luck.msg.MyMsgEncoder; - import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; +import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.internal.StringUtil; - + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + public class ProxySocket { - private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); - - /** 重连代理服务 */ - private static final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(); - - public static Channel connectProxyServer() throws Exception { - reconnectExecutor.scheduleAtFixedRate(() -> { - try { - connectProxyServer(null); - } catch (Exception e) { - e.printStackTrace(); - } - }, 3, 3, TimeUnit.SECONDS); - return connectProxyServer(null); - } - - public static Channel connectProxyServer(String vid) throws Exception { - if (StringUtil.isNullOrEmpty(vid)) { - if (Constant.proxyChannel == null || !Constant.proxyChannel.isActive()) { - newConnect(null); - } - return null; - } else { - Channel channel = Constant.vpc.get(vid); - if (null == channel) { - newConnect(vid); - channel = Constant.vpc.get(vid); - } - return channel; - } - } - - private static void newConnect(String vid) throws InterruptedException { - Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); - pipeline.addLast(new MyMsgEncoder()); - pipeline.addLast(new IdleStateHandler(40, 8, 0)); - pipeline.addLast(new ProxyHandler()); - } - }); - - bootstrap.connect(Constant.serverIp, Constant.serverPort).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - // 客户端链接代理服务器成功 - Channel channel = future.channel(); - if (StringUtil.isNullOrEmpty(vid)) { - // 告诉服务端这条连接是client的连接 - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_CONNECT); - myMsg.setData("client".getBytes()); - channel.writeAndFlush(myMsg); - - Constant.proxyChannel = channel; - } else { - - // 告诉服务端这条连接是vid的连接 - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_CONNECT); - myMsg.setData(vid.getBytes()); - channel.writeAndFlush(myMsg); - - // 客户端绑定通道关系 - Constant.vpc.put(vid, channel); - channel.attr(Constant.VID).set(vid); - - Channel realChannel = Constant.vrc.get(vid); - if (null != realChannel) { - realChannel.config().setOption(ChannelOption.AUTO_READ, true); - } - } - } - } - }); - } + /** + * 重连代理服务 + */ + private static final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(); + private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + + public static Channel connectProxyServer() throws Exception { + reconnectExecutor.scheduleAtFixedRate(() -> { + try { + connectProxyServer(null); + } catch (Exception e) { + e.printStackTrace(); + } + }, 3, 3, TimeUnit.SECONDS); + return connectProxyServer(null); + } + + public static Channel connectProxyServer(String vid) throws Exception { + if (StringUtil.isNullOrEmpty(vid)) { + if (Constant.proxyChannel == null || !Constant.proxyChannel.isActive()) { + newConnect(null); + } + return null; + } else { + Channel channel = Constant.vpc.get(vid); + if (null == channel) { + newConnect(vid); + channel = Constant.vpc.get(vid); + } + return channel; + } + } + + private static void newConnect(String vid) throws InterruptedException { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); + pipeline.addLast(new MyMsgEncoder()); + pipeline.addLast(new IdleStateHandler(40, 8, 0)); + pipeline.addLast(new ProxyHandler()); + } + }); + + bootstrap.connect(Constant.serverIp, Constant.serverPort).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + // 客户端链接代理服务器成功 + Channel channel = future.channel(); + if (StringUtil.isNullOrEmpty(vid)) { + // 告诉服务端这条连接是client的连接 + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_CONNECT); + myMsg.setData("client".getBytes()); + channel.writeAndFlush(myMsg); + + Constant.proxyChannel = channel; + } else { + + // 告诉服务端这条连接是vid的连接 + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_CONNECT); + myMsg.setData(vid.getBytes()); + channel.writeAndFlush(myMsg); + + // 客户端绑定通道关系 + Constant.vpc.put(vid, channel); + channel.attr(Constant.VID).set(vid); + + Channel realChannel = Constant.vrc.get(vid); + if (null != realChannel) { + realChannel.config().setOption(ChannelOption.AUTO_READ, true); + } + } + } + } + }); + } } \ No newline at end of file diff --git a/netty-proxy-client/src/main/java/com/luck/client/RealHandler.java b/netty-proxy-client/src/main/java/com/luck/client/RealHandler.java index 3707fe4..5f1d679 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/RealHandler.java +++ b/netty-proxy-client/src/main/java/com/luck/client/RealHandler.java @@ -1,75 +1,74 @@ package com.luck.client; - + import com.luck.msg.MyMsg; - import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.internal.StringUtil; - + public class RealHandler extends SimpleChannelInboundHandler { - - @Override - public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { - // 客户读取到真实服务数据了 - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_TRANSFER); - myMsg.setData(bytes); - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - return; - } - Channel proxyChannel = Constant.vpc.get(vid); - if (null != proxyChannel) { - proxyChannel.writeAndFlush(myMsg); - } - // 客户端发送真实数据到代理了 - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelInactive(ctx); - return; - } - Channel proxyChannel = Constant.vpc.get(vid); - if (proxyChannel != null) { - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_DISCONNECT); - myMsg.setData(vid.getBytes()); - proxyChannel.writeAndFlush(myMsg); - } - - super.channelInactive(ctx); - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelWritabilityChanged(ctx); - return; - } - Channel proxyChannel = Constant.vpc.get(vid); - if (proxyChannel != null) { - proxyChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); - } - - super.channelWritabilityChanged(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - super.exceptionCaught(ctx, cause); - } + + @Override + public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { + // 客户读取到真实服务数据了 + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_TRANSFER); + myMsg.setData(bytes); + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + return; + } + Channel proxyChannel = Constant.vpc.get(vid); + if (null != proxyChannel) { + proxyChannel.writeAndFlush(myMsg); + } + // 客户端发送真实数据到代理了 + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelInactive(ctx); + return; + } + Channel proxyChannel = Constant.vpc.get(vid); + if (proxyChannel != null) { + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_DISCONNECT); + myMsg.setData(vid.getBytes()); + proxyChannel.writeAndFlush(myMsg); + } + + super.channelInactive(ctx); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelWritabilityChanged(ctx); + return; + } + Channel proxyChannel = Constant.vpc.get(vid); + if (proxyChannel != null) { + proxyChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); + } + + super.channelWritabilityChanged(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } } \ No newline at end of file diff --git a/netty-proxy-client/src/main/java/com/luck/client/RealSocket.java b/netty-proxy-client/src/main/java/com/luck/client/RealSocket.java index 82b84a7..2b5d84f 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/RealSocket.java +++ b/netty-proxy-client/src/main/java/com/luck/client/RealSocket.java @@ -1,65 +1,59 @@ package com.luck.client; - + import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; +import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.internal.StringUtil; - + public class RealSocket { - static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); - - /** - * 连接真实服务 - * - * @param vid 访客ID - * @return - */ - public static Channel connectRealServer(String vid) { - if (StringUtil.isNullOrEmpty(vid)) { - return null; - } - Channel channel = Constant.vrc.get(vid); - if (null == channel) { - newConnect(vid); - channel = Constant.vrc.get(vid); - } - return channel; - } - - private static void newConnect(String vid) { - try { - Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new RealHandler()); - } - - }); - bootstrap.connect("127.0.0.1", Constant.realPort).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - // 客户端链接真实服务成功 - future.channel().config().setOption(ChannelOption.AUTO_READ, false); - future.channel().attr(Constant.VID).set(vid); - Constant.vrc.put(vid, future.channel()); - ProxySocket.connectProxyServer(vid); - } - } - }); - } catch (Exception e) { - e.printStackTrace(); - } - } + static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + + /** + * 连接真实服务 + * + * @param vid 访客ID + * @return + */ + public static Channel connectRealServer(String vid) { + if (StringUtil.isNullOrEmpty(vid)) { + return null; + } + Channel channel = Constant.vrc.get(vid); + if (null == channel) { + newConnect(vid); + channel = Constant.vrc.get(vid); + } + return channel; + } + + private static void newConnect(String vid) { + try { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new RealHandler()); + } + + }); + bootstrap.connect("127.0.0.1", Constant.realPort).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + // 客户端链接真实服务成功 + future.channel().config().setOption(ChannelOption.AUTO_READ, false); + future.channel().attr(Constant.VID).set(vid); + Constant.vrc.put(vid, future.channel()); + ProxySocket.connectProxyServer(vid); + } + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + } } \ No newline at end of file diff --git a/netty-proxy-client/src/main/java/com/luck/client/controller/ClientController.java b/netty-proxy-client/src/main/java/com/luck/client/controller/ClientController.java index ee95e7b..7d33c6a 100644 --- a/netty-proxy-client/src/main/java/com/luck/client/controller/ClientController.java +++ b/netty-proxy-client/src/main/java/com/luck/client/controller/ClientController.java @@ -11,7 +11,7 @@ import org.springframework.web.bind.annotation.GetMapping; public class ClientController { @GetMapping("/version") - public Result version(){ + public Result version() { return ResultFactory.successOf("客户端版本"); } } diff --git a/netty-proxy-common/pom.xml b/netty-proxy-common/pom.xml index b94a89d..5e9a2e9 100644 --- a/netty-proxy-common/pom.xml +++ b/netty-proxy-common/pom.xml @@ -1,6 +1,6 @@ - netty-proxy diff --git a/netty-proxy-common/src/main/java/com/luck/msg/MyMsg.java b/netty-proxy-common/src/main/java/com/luck/msg/MyMsg.java index d828684..c9d1087 100644 --- a/netty-proxy-common/src/main/java/com/luck/msg/MyMsg.java +++ b/netty-proxy-common/src/main/java/com/luck/msg/MyMsg.java @@ -1,46 +1,58 @@ package com.luck.msg; - + import java.util.Arrays; - + public class MyMsg { - - /** 心跳 */ - public static final byte TYPE_HEARTBEAT = 0X00; - - /** 连接成功 */ - public static final byte TYPE_CONNECT = 0X01; - - /** 数据传输 */ - public static final byte TYPE_TRANSFER = 0X02; - - /** 连接断开 */ - public static final byte TYPE_DISCONNECT = 0X09; - - /** 数据类型 */ - private byte type; - - /** 消息传输数据 */ - private byte[] data; - - public byte getType() { - return type; - } - - public void setType(byte type) { - this.type = type; - } - - public byte[] getData() { - return data; - } - - public void setData(byte[] data) { - this.data = data; - } - - @Override - public String toString() { - return "MyMsg [type=" + type + ", data=" + Arrays.toString(data) + "]"; - } - + + /** + * 心跳 + */ + public static final byte TYPE_HEARTBEAT = 0X00; + + /** + * 连接成功 + */ + public static final byte TYPE_CONNECT = 0X01; + + /** + * 数据传输 + */ + public static final byte TYPE_TRANSFER = 0X02; + + /** + * 连接断开 + */ + public static final byte TYPE_DISCONNECT = 0X09; + + /** + * 数据类型 + */ + private byte type; + + /** + * 消息传输数据 + */ + private byte[] data; + + public byte getType() { + return type; + } + + public void setType(byte type) { + this.type = type; + } + + public byte[] getData() { + return data; + } + + public void setData(byte[] data) { + this.data = data; + } + + @Override + public String toString() { + return "MyMsg [type=" + type + ", data=" + Arrays.toString(data) + "]"; + } + } \ No newline at end of file diff --git a/netty-proxy-common/src/main/java/com/luck/msg/MyMsgDecoder.java b/netty-proxy-common/src/main/java/com/luck/msg/MyMsgDecoder.java index 38af367..6f9809e 100644 --- a/netty-proxy-common/src/main/java/com/luck/msg/MyMsgDecoder.java +++ b/netty-proxy-common/src/main/java/com/luck/msg/MyMsgDecoder.java @@ -1,41 +1,41 @@ package com.luck.msg; - + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; - + public class MyMsgDecoder extends LengthFieldBasedFrameDecoder { - - public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, - int initialBytesToStrip) { - super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); - } - - public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, - int initialBytesToStrip, boolean failFast) { - super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); - } - - @Override - protected MyMsg decode(ChannelHandlerContext ctx, ByteBuf in2) throws Exception { - ByteBuf in = (ByteBuf) super.decode(ctx, in2); - if (in == null) { - return null; - } - - if (in.readableBytes() < 4) { - return null; - } - - MyMsg myMsg = new MyMsg(); - int dataLength = in.readInt(); - byte type = in.readByte(); - myMsg.setType(type); - byte[] data = new byte[dataLength - 5]; - in.readBytes(data); - myMsg.setData(data); - in.release(); - - return myMsg; - } + + public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, + int initialBytesToStrip) { + super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); + } + + public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, + int initialBytesToStrip, boolean failFast) { + super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); + } + + @Override + protected MyMsg decode(ChannelHandlerContext ctx, ByteBuf in2) throws Exception { + ByteBuf in = (ByteBuf) super.decode(ctx, in2); + if (in == null) { + return null; + } + + if (in.readableBytes() < 4) { + return null; + } + + MyMsg myMsg = new MyMsg(); + int dataLength = in.readInt(); + byte type = in.readByte(); + myMsg.setType(type); + byte[] data = new byte[dataLength - 5]; + in.readBytes(data); + myMsg.setData(data); + in.release(); + + return myMsg; + } } \ No newline at end of file diff --git a/netty-proxy-common/src/main/java/com/luck/msg/MyMsgEncoder.java b/netty-proxy-common/src/main/java/com/luck/msg/MyMsgEncoder.java index f645e90..ec10ecf 100644 --- a/netty-proxy-common/src/main/java/com/luck/msg/MyMsgEncoder.java +++ b/netty-proxy-common/src/main/java/com/luck/msg/MyMsgEncoder.java @@ -1,28 +1,28 @@ package com.luck.msg; - + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; - + public class MyMsgEncoder extends MessageToByteEncoder { - - public MyMsgEncoder() { - - } - - @Override - protected void encode(ChannelHandlerContext ctx, MyMsg msg, ByteBuf out) throws Exception { - int bodyLength = 5; - if (msg.getData() != null) { - bodyLength += msg.getData().length; - } - - out.writeInt(bodyLength); - - out.writeByte(msg.getType()); - - if (msg.getData() != null) { - out.writeBytes(msg.getData()); - } - } + + public MyMsgEncoder() { + + } + + @Override + protected void encode(ChannelHandlerContext ctx, MyMsg msg, ByteBuf out) throws Exception { + int bodyLength = 5; + if (msg.getData() != null) { + bodyLength += msg.getData().length; + } + + out.writeInt(bodyLength); + + out.writeByte(msg.getType()); + + if (msg.getData() != null) { + out.writeBytes(msg.getData()); + } + } } \ No newline at end of file diff --git a/netty-proxy-server/pom.xml b/netty-proxy-server/pom.xml index cc1a880..fad37ad 100644 --- a/netty-proxy-server/pom.xml +++ b/netty-proxy-server/pom.xml @@ -1,6 +1,6 @@ - netty-proxy diff --git a/netty-proxy-server/src/main/java/com/luck/server/ClientHandler.java b/netty-proxy-server/src/main/java/com/luck/server/ClientHandler.java index 15c0041..0bbb5f0 100644 --- a/netty-proxy-server/src/main/java/com/luck/server/ClientHandler.java +++ b/netty-proxy-server/src/main/java/com/luck/server/ClientHandler.java @@ -1,119 +1,113 @@ - package com.luck.server; - + import com.luck.msg.MyMsg; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOption; -import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.*; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.internal.StringUtil; - + public class ClientHandler extends SimpleChannelInboundHandler { - - @Override - public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) { - // 代理服务器读到客户端数据了 - byte type = myMsg.getType(); - switch (type) { - case MyMsg.TYPE_HEARTBEAT: - MyMsg hb = new MyMsg(); - hb.setType(MyMsg.TYPE_HEARTBEAT); - ctx.channel().writeAndFlush(hb); - break; - case MyMsg.TYPE_CONNECT: - String vid = new String(myMsg.getData()); - if (StringUtil.isNullOrEmpty(vid) || "client".equals(vid)) { - Constant.clientChannel = ctx.channel(); - } else { - // 绑定访客和客户端的连接 - Channel visitorChannel = Constant.vvc.get(vid); - if (null != visitorChannel) { - ctx.channel().attr(Constant.VID).set(vid); - Constant.vcc.put(vid, ctx.channel()); - - // 通道绑定完成可以读取访客数据 - visitorChannel.config().setOption(ChannelOption.AUTO_READ, true); - } - } - break; - case MyMsg.TYPE_DISCONNECT: - String disVid = new String(myMsg.getData()); - Constant.clearVccVvcAndClose(disVid); - break; - case MyMsg.TYPE_TRANSFER: - // 把数据转到用户服务 - ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length); - buf.writeBytes(myMsg.getData()); - - String visitorId = ctx.channel().attr(Constant.VID).get(); - Channel vchannel = Constant.vvc.get(visitorId); - if (null != vchannel) { - vchannel.writeAndFlush(buf); - } - break; - default: - // 操作有误 - } - // 代理服务器发送数据到用户了 - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if(StringUtil.isNullOrEmpty(vid)) { - super.channelWritabilityChanged(ctx); - return; - } - Channel visitorChannel = Constant.vvc.get(vid); - if (visitorChannel != null) { - visitorChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); - } - - super.channelWritabilityChanged(ctx); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelInactive(ctx); - return; - } - Channel visitorChannel = Constant.vvc.get(vid); - if (visitorChannel != null && visitorChannel.isActive()) { - // 数据发送完成后再关闭连接,解决http1.0数据传输问题 - visitorChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); - visitorChannel.close(); - } else { - ctx.channel().close(); - } - Constant.clearVccVvc(vid); - super.channelInactive(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - super.exceptionCaught(ctx, cause); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent) { - IdleStateEvent event = (IdleStateEvent) evt; - switch (event.state()) { - case READER_IDLE: - ctx.channel().close(); - break; - case WRITER_IDLE: - break; - case ALL_IDLE: - break; - } - } - } + + @Override + public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) { + // 代理服务器读到客户端数据了 + byte type = myMsg.getType(); + switch (type) { + case MyMsg.TYPE_HEARTBEAT: + MyMsg hb = new MyMsg(); + hb.setType(MyMsg.TYPE_HEARTBEAT); + ctx.channel().writeAndFlush(hb); + break; + case MyMsg.TYPE_CONNECT: + String vid = new String(myMsg.getData()); + if (StringUtil.isNullOrEmpty(vid) || "client".equals(vid)) { + Constant.clientChannel = ctx.channel(); + } else { + // 绑定访客和客户端的连接 + Channel visitorChannel = Constant.vvc.get(vid); + if (null != visitorChannel) { + ctx.channel().attr(Constant.VID).set(vid); + Constant.vcc.put(vid, ctx.channel()); + + // 通道绑定完成可以读取访客数据 + visitorChannel.config().setOption(ChannelOption.AUTO_READ, true); + } + } + break; + case MyMsg.TYPE_DISCONNECT: + String disVid = new String(myMsg.getData()); + Constant.clearVccVvcAndClose(disVid); + break; + case MyMsg.TYPE_TRANSFER: + // 把数据转到用户服务 + ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length); + buf.writeBytes(myMsg.getData()); + + String visitorId = ctx.channel().attr(Constant.VID).get(); + Channel vchannel = Constant.vvc.get(visitorId); + if (null != vchannel) { + vchannel.writeAndFlush(buf); + } + break; + default: + // 操作有误 + } + // 代理服务器发送数据到用户了 + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelWritabilityChanged(ctx); + return; + } + Channel visitorChannel = Constant.vvc.get(vid); + if (visitorChannel != null) { + visitorChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); + } + + super.channelWritabilityChanged(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelInactive(ctx); + return; + } + Channel visitorChannel = Constant.vvc.get(vid); + if (visitorChannel != null && visitorChannel.isActive()) { + // 数据发送完成后再关闭连接,解决http1.0数据传输问题 + visitorChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + visitorChannel.close(); + } else { + ctx.channel().close(); + } + Constant.clearVccVvc(vid); + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + switch (event.state()) { + case READER_IDLE: + ctx.channel().close(); + break; + case WRITER_IDLE: + break; + case ALL_IDLE: + break; + } + } + } } \ No newline at end of file diff --git a/netty-proxy-server/src/main/java/com/luck/server/Constant.java b/netty-proxy-server/src/main/java/com/luck/server/Constant.java index 1abe930..ed6cde3 100644 --- a/netty-proxy-server/src/main/java/com/luck/server/Constant.java +++ b/netty-proxy-server/src/main/java/com/luck/server/Constant.java @@ -1,72 +1,82 @@ package com.luck.server; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - + import io.netty.channel.Channel; import io.netty.util.AttributeKey; import io.netty.util.internal.StringUtil; - + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + public class Constant { - /** 客户端服务channel */ - public static Channel clientChannel = null; - - /** 绑定channel_id */ - public static final AttributeKey VID = AttributeKey.newInstance("vid"); - - /** 访客,客户服务channel */ - public static Map vcc = new ConcurrentHashMap<>(); - - /** 访客,访客服务channel */ - public static Map vvc = new ConcurrentHashMap<>(); - - /** 服务代理端口 */ - public static int visitorPort = 16002; - - /** 服务端口 */ - public static int serverPort = 16001; - - /** - * 清除连接 - * - * @param vid 访客ID - */ - public static void clearVccVvc(String vid) { - if (StringUtil.isNullOrEmpty(vid)) { - return; - } - Channel clientChannel = vcc.get(vid); - if (null != clientChannel) { - clientChannel.attr(VID).set(null); - vcc.remove(vid); - } - Channel visitorChannel = vvc.get(vid); - if (null != visitorChannel) { - visitorChannel.attr(VID).set(null); - vvc.remove(vid); - } - } - - /** - * 清除关闭连接 - * - * @param vid 访客ID - */ - public static void clearVccVvcAndClose(String vid) { - if (StringUtil.isNullOrEmpty(vid)) { - return; - } - Channel clientChannel = vcc.get(vid); - if (null != clientChannel) { - clientChannel.attr(VID).set(null); - vcc.remove(vid); - clientChannel.close(); - } - Channel visitorChannel = vvc.get(vid); - if (null != visitorChannel) { - visitorChannel.attr(VID).set(null); - vvc.remove(vid); - visitorChannel.close(); - } - } + /** + * 绑定channel_id + */ + public static final AttributeKey VID = AttributeKey.newInstance("vid"); + /** + * 客户端服务channel + */ + public static Channel clientChannel = null; + /** + * 访客,客户服务channel + */ + public static Map vcc = new ConcurrentHashMap<>(); + + /** + * 访客,访客服务channel + */ + public static Map vvc = new ConcurrentHashMap<>(); + + /** + * 服务代理端口 + */ + public static int visitorPort = 16002; + + /** + * 服务端口 + */ + public static int serverPort = 16001; + + /** + * 清除连接 + * + * @param vid 访客ID + */ + public static void clearVccVvc(String vid) { + if (StringUtil.isNullOrEmpty(vid)) { + return; + } + Channel clientChannel = vcc.get(vid); + if (null != clientChannel) { + clientChannel.attr(VID).set(null); + vcc.remove(vid); + } + Channel visitorChannel = vvc.get(vid); + if (null != visitorChannel) { + visitorChannel.attr(VID).set(null); + vvc.remove(vid); + } + } + + /** + * 清除关闭连接 + * + * @param vid 访客ID + */ + public static void clearVccVvcAndClose(String vid) { + if (StringUtil.isNullOrEmpty(vid)) { + return; + } + Channel clientChannel = vcc.get(vid); + if (null != clientChannel) { + clientChannel.attr(VID).set(null); + vcc.remove(vid); + clientChannel.close(); + } + Channel visitorChannel = vvc.get(vid); + if (null != visitorChannel) { + visitorChannel.attr(VID).set(null); + vvc.remove(vid); + visitorChannel.close(); + } + } } \ No newline at end of file diff --git a/netty-proxy-server/src/main/java/com/luck/server/ServerSocket.java b/netty-proxy-server/src/main/java/com/luck/server/ServerSocket.java index bca1b03..4efca69 100644 --- a/netty-proxy-server/src/main/java/com/luck/server/ServerSocket.java +++ b/netty-proxy-server/src/main/java/com/luck/server/ServerSocket.java @@ -1,65 +1,61 @@ package com.luck.server; - + import com.luck.msg.MyMsgDecoder; import com.luck.msg.MyMsgEncoder; - import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; +import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; - + public class ServerSocket { - private static EventLoopGroup bossGroup = new NioEventLoopGroup(); - private static EventLoopGroup workerGroup = new NioEventLoopGroup(); - private static ChannelFuture channelFuture; - - /** - * 启动服务端 - * @throws Exception - */ - public static void startServer() throws Exception { - try { - - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); - pipeline.addLast(new MyMsgEncoder()); - pipeline.addLast(new IdleStateHandler(40, 10, 0)); - pipeline.addLast(new ClientHandler()); - } - - }); - channelFuture = b.bind(Constant.serverPort).sync(); - - channelFuture.addListener((ChannelFutureListener) channelFuture -> { - // 服务器已启动 - }); - channelFuture.channel().closeFuture().sync(); - } finally { - shutdown(); - // 服务器已关闭 - } - } - - public static void shutdown() { - if (channelFuture != null) { - channelFuture.channel().close().syncUninterruptibly(); - } - if ((bossGroup != null) && (!bossGroup.isShutdown())) { - bossGroup.shutdownGracefully(); - } - if ((workerGroup != null) && (!workerGroup.isShutdown())) { - workerGroup.shutdownGracefully(); - } - } + private static EventLoopGroup bossGroup = new NioEventLoopGroup(); + private static EventLoopGroup workerGroup = new NioEventLoopGroup(); + private static ChannelFuture channelFuture; + + /** + * 启动服务端 + * + * @throws Exception + */ + public static void startServer() throws Exception { + try { + + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); + pipeline.addLast(new MyMsgEncoder()); + pipeline.addLast(new IdleStateHandler(40, 10, 0)); + pipeline.addLast(new ClientHandler()); + } + + }); + channelFuture = b.bind(Constant.serverPort).sync(); + + channelFuture.addListener((ChannelFutureListener) channelFuture -> { + // 服务器已启动 + }); + channelFuture.channel().closeFuture().sync(); + } finally { + shutdown(); + // 服务器已关闭 + } + } + + public static void shutdown() { + if (channelFuture != null) { + channelFuture.channel().close().syncUninterruptibly(); + } + if ((bossGroup != null) && (!bossGroup.isShutdown())) { + bossGroup.shutdownGracefully(); + } + if ((workerGroup != null) && (!workerGroup.isShutdown())) { + workerGroup.shutdownGracefully(); + } + } } \ No newline at end of file diff --git a/netty-proxy-server/src/main/java/com/luck/server/VisitorHandler.java b/netty-proxy-server/src/main/java/com/luck/server/VisitorHandler.java index dd340ae..bc2124a 100644 --- a/netty-proxy-server/src/main/java/com/luck/server/VisitorHandler.java +++ b/netty-proxy-server/src/main/java/com/luck/server/VisitorHandler.java @@ -1,98 +1,97 @@ package com.luck.server; - -import java.util.UUID; - + import com.luck.msg.MyMsg; - import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.internal.StringUtil; - + +import java.util.UUID; + public class VisitorHandler extends SimpleChannelInboundHandler { - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - // 访客连接上代理服务器了 - Channel visitorChannel = ctx.channel(); - // 先不读取访客数据 - visitorChannel.config().setOption(ChannelOption.AUTO_READ, false); - - // 生成访客ID - String vid = UUID.randomUUID().toString(); - - // 绑定访客通道 - visitorChannel.attr(Constant.VID).set(vid); - Constant.vvc.put(vid, visitorChannel); - - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_CONNECT); - myMsg.setData(vid.getBytes()); - Constant.clientChannel.writeAndFlush(myMsg); - - super.channelActive(ctx); - } - - @Override - public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - return; - } - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_TRANSFER); - myMsg.setData(bytes); - - // 代理服务器发送数据到客户端了 - Channel clientChannel = Constant.vcc.get(vid); - clientChannel.writeAndFlush(myMsg); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - String vid = ctx.channel().attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelInactive(ctx); - return; - } - Channel clientChannel = Constant.vcc.get(vid); - if (clientChannel != null && clientChannel.isActive()) { - - clientChannel.config().setOption(ChannelOption.AUTO_READ, true); - - // 通知客户端,访客连接已经断开 - MyMsg myMsg = new MyMsg(); - myMsg.setType(MyMsg.TYPE_DISCONNECT); - myMsg.setData(vid.getBytes()); - clientChannel.writeAndFlush(myMsg); - } - Constant.clearVccVvc(vid); - super.channelInactive(ctx); - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - - Channel visitorChannel = ctx.channel(); - String vid = visitorChannel.attr(Constant.VID).get(); - if (StringUtil.isNullOrEmpty(vid)) { - super.channelWritabilityChanged(ctx); - return; - } - Channel clientChannel = Constant.vcc.get(vid); - if (clientChannel != null) { - clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable()); - } - - super.channelWritabilityChanged(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ctx.close(); - } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // 访客连接上代理服务器了 + Channel visitorChannel = ctx.channel(); + // 先不读取访客数据 + visitorChannel.config().setOption(ChannelOption.AUTO_READ, false); + + // 生成访客ID + String vid = UUID.randomUUID().toString(); + + // 绑定访客通道 + visitorChannel.attr(Constant.VID).set(vid); + Constant.vvc.put(vid, visitorChannel); + + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_CONNECT); + myMsg.setData(vid.getBytes()); + Constant.clientChannel.writeAndFlush(myMsg); + + super.channelActive(ctx); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + return; + } + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_TRANSFER); + myMsg.setData(bytes); + + // 代理服务器发送数据到客户端了 + Channel clientChannel = Constant.vcc.get(vid); + clientChannel.writeAndFlush(myMsg); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + String vid = ctx.channel().attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelInactive(ctx); + return; + } + Channel clientChannel = Constant.vcc.get(vid); + if (clientChannel != null && clientChannel.isActive()) { + + clientChannel.config().setOption(ChannelOption.AUTO_READ, true); + + // 通知客户端,访客连接已经断开 + MyMsg myMsg = new MyMsg(); + myMsg.setType(MyMsg.TYPE_DISCONNECT); + myMsg.setData(vid.getBytes()); + clientChannel.writeAndFlush(myMsg); + } + Constant.clearVccVvc(vid); + super.channelInactive(ctx); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + + Channel visitorChannel = ctx.channel(); + String vid = visitorChannel.attr(Constant.VID).get(); + if (StringUtil.isNullOrEmpty(vid)) { + super.channelWritabilityChanged(ctx); + return; + } + Channel clientChannel = Constant.vcc.get(vid); + if (clientChannel != null) { + clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable()); + } + + super.channelWritabilityChanged(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } } \ No newline at end of file diff --git a/netty-proxy-server/src/main/java/com/luck/server/VisitorSocket.java b/netty-proxy-server/src/main/java/com/luck/server/VisitorSocket.java index 9aba8b5..2affe79 100644 --- a/netty-proxy-server/src/main/java/com/luck/server/VisitorSocket.java +++ b/netty-proxy-server/src/main/java/com/luck/server/VisitorSocket.java @@ -1,5 +1,5 @@ package com.luck.server; - + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelInitializer; @@ -8,30 +8,30 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; - + public class VisitorSocket { - private static EventLoopGroup bossGroup = new NioEventLoopGroup(); - private static EventLoopGroup workerGroup = new NioEventLoopGroup(); - - /** - * 启动服务代理 - * - * @throws Exception - */ - public static void startServer() throws Exception { - - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new ChannelDuplexHandler()); - pipeline.addLast(new VisitorHandler()); - } - }); - b.bind(Constant.visitorPort).get(); - - } - + private static EventLoopGroup bossGroup = new NioEventLoopGroup(); + private static EventLoopGroup workerGroup = new NioEventLoopGroup(); + + /** + * 启动服务代理 + * + * @throws Exception + */ + public static void startServer() throws Exception { + + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new ChannelDuplexHandler()); + pipeline.addLast(new VisitorHandler()); + } + }); + b.bind(Constant.visitorPort).get(); + + } + } \ No newline at end of file diff --git a/netty-proxy-server/src/main/java/com/luck/server/controller/ServerController.java b/netty-proxy-server/src/main/java/com/luck/server/controller/ServerController.java index 126da41..9e09362 100644 --- a/netty-proxy-server/src/main/java/com/luck/server/controller/ServerController.java +++ b/netty-proxy-server/src/main/java/com/luck/server/controller/ServerController.java @@ -11,7 +11,7 @@ import org.springframework.web.bind.annotation.GetMapping; public class ServerController { @GetMapping("/version") - public Result version(){ + public Result version() { return ResultFactory.successOf("服务端端版本"); } } diff --git a/pom.xml b/pom.xml index e60f5c0..d2d7958 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,5 @@ - 4.0.0 pom