netty分布式服务如何实现

在说nettty分布式之前,首先说下,netty是基于nio编程的,如果大家对nio不熟悉还是先看下nio相关的知识。

netty的线程模型和核心组件

1:netty的线程模型
netty通过Reactor模型基于多路复用器接收并处理用户请求(能讲就多讲一点),内部实现了两个线程池,boss线程池和work线程池,其中boss线程池的线程负责处理请求的accept事件,当接收到accept事件的请求时,把对应的socket封装到一个NioSocketChannel中,并交给work线程池,其中work线程池负责请求的read和write事件
在这里插入图片描述

2.Netty核心组件
Bootstrap和ServerBootstrap:Netty应用程序通过设置bootstrap引导类来完成,该类提供了一个用于应用程序网络层配置的容器。Bootstrap服务端的是ServerBootstrap,客户端的是Bootstrap。
Channel:Netty 中的接口 Channel 定义了与 socket 丰富交互的操作集:bind, close, config, connect, isActive, isOpen, isWritable, read, write 等等。
ChannelHandler:ChannelHandler 支持很多协议,并且提供用于数据处理的容器,ChannelHandler由特定事件触发, 常用的一个接口是ChannelInboundHandler,该类型处理入站读数据(socket读事件)。
ChannelPipeline:ChannelPipeline 提供了一个容器给 ChannelHandler 链并提供了一个API 用于管理沿着链入站和出站事件的流动。每个 Channel 都有自己的ChannelPipeline,当 Channel 创建时自动创建的。 下图说明了ChannelHandler和ChannelPipeline二者的关系:
在这里插入图片描述

EventLoop:EventLoop 用于处理 Channel 的 I/O 操作。一个单一的 EventLoop通常会处理多个 Channel 事件。一个 EventLoopGroup 可以含有多于一个的 EventLoop 和 提供了一种迭代用于检索清单中的下一个。
ChannelFuture:Netty 所有的 I/O 操作都是异步。因为一个操作可能无法立即返回,我们需要有一种方法在以后获取它的结果。出于这个目的,Netty 提供了接口 ChannelFuture,它的 addListener 方法
Netty 是一个非阻塞、事件驱动的网络框架。Netty 实际上是使用 Threads( 多线程) 处理 I/O事件的,对于熟悉多线程编程的读者可能会需要关注同步代码。这样的方式不好,因为同步会影响程序的性能,Netty 的设计保证程序处理事件不会有同步。因为某个Channel事件是被添加到一个EventLoop中的,以后该Channel事件都是由该EventLoop来处理的,而EventLoop是一个线程来处理的,也就是说Netty不需要同步IO操作,EventLoop与EventLoopGroup的关系可以理解为线程与线程池的关系一样。

单机版netty

服务端代码

public class NettySever {
    public static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public static void main(String[] args) throws  Exception{

        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        try {


            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            //获取到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //向pipeline加入解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己的业务处理handler
                            pipeline.addLast(new MyHandler());
                        }}
                        )
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture sync = serverBootstrap.bind(8888).sync();
            ChannelFuture channelFuture = sync.channel().closeFuture().sync();


        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

客户端代码

public class NettyClient {
    public static void main(String[] args) throws Exception{
        NioEventLoopGroup eventLoopGroup=new NioEventLoopGroup();
        Bootstrap bootstrap =new Bootstrap();
        try {

            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            //得到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //加入相关handler
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自定义的handler
                            pipeline.addLast(new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                                    System.out.println(s);
                                }
                            });
                        }
                    });
            ChannelFuture sync = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8888)).sync();
            Channel channel = sync.channel();
            Scanner sc =new Scanner(System.in);
            while (sc.hasNextLine()){
               // System.out.println(sc.next());
                channel.writeAndFlush(sc.nextLine());
            }
        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

自定义handle

public class MyhandlerAdapter extends ChannelInitializer<SocketChannel> {


    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new StringDecoder())
                .addLast(new StringEncoder())
                .addLast(new MyHandler());
    }
}

public class MyHandler extends SimpleChannelInboundHandler<String> {
// 每一个服务端都可以维护注册在自己上面的channel,当然有些需要自己去维护,比如上线的时候新增,下线删除。也可以自定义一个。
    public static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 给其他服务端说,某某某上线了
        Channel channel = ctx.channel();
        ChannelId id = channel.id();

        channelGroup.add(channel);
        System.out.println(channel.remoteAddress()+"上线了");
        channelGroup.forEach(c -> {
            if(channel==c){
                c.writeAndFlush("我自己上线了");
            }else {
                c.writeAndFlush(channel.remoteAddress()+"在"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"上线了");
            }
        });


    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 给其他服务端说,某某某上线了
        Channel channel = ctx.channel();
        channelGroup.forEach(c -> {
            if(channel==c){
                c.writeAndFlush("我自己下线了");
            }else {
                c.writeAndFlush(channel.remoteAddress()+"在"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"下线了");
            }
        });

    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {

            System.out.println(s);
            Channel channel = channelHandlerContext.channel();
            channelGroup.forEach(c -> {
                if(channel==c){
                    c.writeAndFlush(channel.remoteAddress()+"在"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"我自己说  " + s);
                }else {
                    c.writeAndFlush(channel.remoteAddress()+"在"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"说  " + s);
                }
            });
        }
    }

上面的代码基本是就是一个单机版本的netty,但是如果并发量大,用户多的情况下,单机肯定不满足,那么如何做到分布式呢,分布式的难点又在哪呢?

分布式netty的解决思路

1 : 假如有两台服务器,服务器A,服务器B,小王登录到了服务器A上,小李登录到了服务器B上,那么小王想跟小李聊天,怎么样才行呢,我想大家最先想到的是每台服务器都可以维护注册在自己服务器上的SorketChannel,想让所有的服务器共享的话,怎么才能实现呢?
2:那么我们直接用第三方的redis,可以实现数据共享,但是SorketChannel是不能被序列化的,所以这种借助第三方的的是不可行的。
3 :借助路由的形式,比如小王发消息给小李,我们可以直接给小李所在的服务器B发消息,这样消息肯定能送达,但是小王怎么知道小李在哪一台服务器上呢?这个时候其实在登录的时候我们会去选择一台服务器,我们可以在redis里面存储起来,那么不论谁给谁发消息都是可以实现的

// 用户登录的时候保存用户和自己被分配的NioSocketChannel,后面可以根据用户id查到相关的NioSocketChannel,发送消息
        if (msg.getType() == Constants.CommandType.LOGIN) {
            //保存客户端与 Channel 之间的关系
            SessionSocketHolder.put(msg.getRequestId(), (NioSocketChannel) ctx.channel());
            SessionSocketHolder.saveSession(msg.getRequestId(), msg.getReqMsg());
            LOGGER.info("客户端[{}]上线成功", msg.getReqMsg());
        }
 @Override
    public void saveRouteInfo(LoginReqVO loginReqVO, String msg) throws Exception {
    // key是一个固定前缀加上用户id,msg就是登录服务器的地址
        String key = ROUTE_PREFIX + loginReqVO.getUserId();
        redisTemplate.opsForValue().set(key, msg);
    }

讲到这里你应该有些思路了吧,但是真正的想要做一个im及时通讯,还有好多地方要考虑,服务器A突然挂掉,那么小王怎么办,要是小王自己没网了,那么服务器A还要保存小王的NioSocketChannel吗?
当然真正的应该是服务器A挂了,应该让小王重新注册到B上,如果小王自己网络不好,应该先重连,重连到给定的最大次数,应该就踢掉小王,避免不必要的资源浪费。

当然这个可以用心跳机制去完成。netty本身就有一个IdleStateHandler,我们可以ping-pong去看服务器和客户端是否正常连接。但是什么时候去ping,不可能我们正常聊天的时候去触发这个事件吧,我们可以在写空闲的时候去发送ping,客户端如果不是正常退出的话会触发

@Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        if (shutDownMsg == null){
            shutDownMsg = SpringBeanFactory.getBean(ShutDownMsg.class) ;
        }

        //用户主动退出,不执行重连逻辑
        if (shutDownMsg.checkStatus()){
            return;
        }

        if (scheduledExecutorService == null){
            scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
        }
        LOGGER.info("客户端断开了,重新连接!");
       // 开启一个线程去重连,重连  scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ;

重连就是先看自己是不是好了,还没好的话就下线,清楚自己的路由,重新选择一个服务器(这个要可以自己实现算法,hash,随机,权重)

 public void reconnect() throws Exception {
        if (channel != null && channel.isActive()) {
            return;
        }
        //首先清除路由信息,下线
        routeRequest.offLine();

        LOGGER.info("reconnect....");
        start();
        LOGGER.info("reconnect success");
    }

转自https://blog.csdn.net/wzx7612302/article/details/104794550