The previous article covered custom communication protocols; this chapter shows how to support several protocols at once.

We will build a server that supports the Cat, Dog and People protocols at the same time. There are two ways to implement this:

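  • The first approach relies on a custom protocol: when a message is sent, its first few positions (for example, the first 2) carry a custom marker (for example, AB). When the decoder parses the message, a leading AB indicates one protocol type and a leading CD another. This approach does not use protobuf; it solves the problem directly with a Netty custom protocol.
  • The second approach uses protobuf and in practice comes down to a convention for how messages are defined: in Netty the client and the server share a single TCP connection, so each side has to determine what type of object the other side has sent.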
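The marker idea from the first approach can be sketched in plain Java, without Netty. This is only an illustration under assumptions: the class name `PrefixDispatch`, the labels `protocol-1` and `protocol-2`, and the choice of a two-byte ASCII marker are hypothetical and do not come from the article's demo.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: classifies a frame by its two-byte ASCII marker.
class PrefixDispatch {

    // Returns a label for the protocol selected by the first two bytes.
    static String protocolOf(byte[] frame) {
        if (frame.length < 2) {
            throw new IllegalArgumentException("frame shorter than the 2-byte marker");
        }
        String prefix = new String(frame, 0, 2, StandardCharsets.US_ASCII);
        switch (prefix) {
            case "AB": return "protocol-1";
            case "CD": return "protocol-2";
            default:   throw new IllegalArgumentException("unknown marker: " + prefix);
        }
    }

    // The payload is everything after the marker.
    static String payloadOf(byte[] frame) {
        return new String(frame, 2, frame.length - 2, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] frame = "ABhello".getBytes(StandardCharsets.UTF_8);
        System.out.println(protocolOf(frame) + " -> " + payloadOf(frame));
    }
}
```

In a real decoder the marker would select which parsing routine runs on the remaining bytes, which is what the second half of this article does with an int flag.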

Implementing multiple transport protocols in Netty with Protocol Buffers

As we know, using Protocol Buffers starts with defining a .proto file.

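Define one outermost message (MyMessage) that covers every message type to be transferred. All of the message types are carried as fields of this outer message, and each transfer carries exactly one concrete type, identified by the enum declared in the outer message.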

syntax ="proto2";

package com.zhihao.miao.netty.sixthexample;

option optimize_for = SPEED;
option java_package = "com.zhihao.miao.netty.seventhexample";
option java_outer_classname="MyDataInfo";

message MyMessage {

    enum DataType{
        PeopleType = 1;
        DogType = 2;
        CatType = 3;
    }

    required DataType data_type = 1;

    // oneof: when there are several optional fields, at most one of them can be set at any given time, which saves memory
    oneof dataBody {
        People people = 2;
        Dog dog = 3;
        Cat cat = 4;
    }
}

message People{
    optional string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

message Dog{
    optional string name = 1;
    optional string age = 2;
}

message Cat{
    optional string name = 1;
    optional string city = 2;
}

Compile it with the protoc compiler to generate the code:

protoc --java_out=src/main/java src/protobuf/People.proto

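As for oneof in the proto definition: when there are several optional fields, at most one of them can be set at any given time (see the official documentation). Compilation generates the MyDataInfo class, which is too long to list here.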
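The "at most one is set" rule can be pictured with a small plain-Java model. This is not the generated MyDataInfo code: `OneofModel`, `BodyCase` and the String payloads are hypothetical stand-ins, used only to show that setting one alternative replaces whichever one was set before.

```java
// Plain-Java model of oneof semantics; NOT the code that protoc generates.
class OneofModel {

    enum BodyCase { NOT_SET, PEOPLE, DOG, CAT }

    private BodyCase bodyCase = BodyCase.NOT_SET;
    private Object body; // a single slot shared by all alternatives

    // Each setter overwrites the shared slot, so the previous alternative is lost.
    void setPeople(String name) { bodyCase = BodyCase.PEOPLE; body = name; }
    void setDog(String name)    { bodyCase = BodyCase.DOG;    body = name; }
    void setCat(String name)    { bodyCase = BodyCase.CAT;    body = name; }

    BodyCase getBodyCase() { return bodyCase; }
    boolean hasPeople()    { return bodyCase == BodyCase.PEOPLE; }
    Object getBody()       { return body; }

    public static void main(String[] args) {
        OneofModel message = new OneofModel();
        message.setPeople("zhangsan");
        message.setDog("wangcai"); // replaces the People value
        System.out.println(message.getBodyCase() + " hasPeople=" + message.hasPeople());
    }
}
```

Because only one slot is ever occupied, the receiver needs some way to tell which alternative it holds; in this article that role is played by the data_type field.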

Server code:

package com.zhihao.miao.netty.seventhexample;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class TestServer {
    public static void main(String[] args) throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new TestServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Server-side channel initializer:

package com.zhihao.miao.netty.seventhexample;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;


public class TestServerInitializer extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        // Use the outermost message instance
        pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());

        pipeline.addLast(new TestServerHandler());
    }
}

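This is the key to the whole approach: the decoder is given the default instance of MyDataInfo.MyMessage, the outermost message. MyMessage itself is not an enum; it carries the DataType enum and a oneof, and the three message types we defined are exactly the alternatives of that oneof.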
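The ProtobufVarint32LengthFieldPrepender and ProtobufVarint32FrameDecoder in the pipeline frame each serialized message with its length written as a base-128 varint. The following plain-Java sketch shows that framing idea only; `Varint32Framing` and its method names are hypothetical, and this is not Netty's implementation.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch of varint32 length framing.
class Varint32Framing {

    // Base-128 varint: 7 payload bits per byte, high bit set on every byte except the last.
    static byte[] encodeVarint32(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    // Prepends the varint-encoded body length to the body.
    static byte[] frame(byte[] body) {
        byte[] header = encodeVarint32(body.length);
        byte[] framed = new byte[header.length + body.length];
        System.arraycopy(header, 0, framed, 0, header.length);
        System.arraycopy(body, 0, framed, header.length, body.length);
        return framed;
    }

    // Reads the varint at the start of the frame and returns the body length.
    static int decodeLength(byte[] framed) {
        int result = 0;
        int shift = 0;
        for (int i = 0; i < framed.length && i < 5; i++) {
            byte b = framed[i];
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
        throw new IllegalArgumentException("malformed or truncated varint32");
    }

    public static void main(String[] args) {
        byte[] body = new byte[300];
        byte[] framed = frame(body);
        System.out.println("header bytes: " + (framed.length - body.length));
        System.out.println("decoded length: " + decodeLength(framed));
    }
}
```

With the length known up front, the receiving side can wait until a whole message has arrived before handing the bytes to the protobuf decoder.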

The custom server-side handler parses the data into a concrete type according to the dataType value carried on the channel:

package com.zhihao.miao.netty.seventhexample;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
        MyDataInfo.MyMessage.DataType dataType = msg.getDataType();

        if(dataType == MyDataInfo.MyMessage.DataType.PeopleType){
            MyDataInfo.People people = msg.getPeople();

            System.out.println(people.getName());
            System.out.println(people.getAge());
            System.out.println(people.getAddress());
        }else if(dataType == MyDataInfo.MyMessage.DataType.DogType){
            MyDataInfo.Dog dog = msg.getDog();

            System.out.println(dog.getName());
            System.out.println(dog.getAge());
        }else if(dataType == MyDataInfo.MyMessage.DataType.CatType){
            MyDataInfo.Cat cat = msg.getCat();

            System.out.println(cat.getName());
            System.out.println(cat.getCity());
        }
    }
}

Client code:

package com.zhihao.miao.netty.seventhexample;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TestClient {

    public static void main(String[] args) throws Exception{
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new TestClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost",8888).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

Client-side channel initializer:

package com.zhihao.miao.netty.seventhexample;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class TestClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        // Use the outermost message instance
        pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());

        pipeline.addLast(new TestClientHandler());
    }
}

The custom client-side handler, which sends data of a randomly chosen protocol:

package com.zhihao.miao.netty.seventhexample;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Random;

public class TestClientHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {

    }

    // The client sends data to the server once the channel becomes active
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        int randomInt = new Random().nextInt(3);

        MyDataInfo.MyMessage myMessage = null;

        if(0 == randomInt){
            myMessage = MyDataInfo.MyMessage.newBuilder().
                    setDataType(MyDataInfo.MyMessage.DataType.PeopleType).
                    setPeople(MyDataInfo.People.newBuilder().setName("张三").
                            setAddress("上海").setAge(26).build()).build();
        }else if(1 == randomInt){
            myMessage = MyDataInfo.MyMessage.newBuilder().
                    setDataType(MyDataInfo.MyMessage.DataType.DogType).
                    setDog(MyDataInfo.Dog.newBuilder().setName("旺财")
                            .setAge("2").build()).build();
        }else if(2 == randomInt){
            myMessage = MyDataInfo.MyMessage.newBuilder().
                    setDataType(MyDataInfo.MyMessage.DataType.CatType).
                    setCat(MyDataInfo.Cat.newBuilder().setName("汤姆")
                            .setCity("上海").build()).build();
        }

        ctx.channel().writeAndFlush(myMessage);
    }
}

Start the server, then start the client several times. The server console shows:

七月 05, 2017 10:10:37 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x82a26e9f, L:/127.0.0.1:8888 - R:/127.0.0.1:51777]
七月 05, 2017 10:10:37 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
汤姆
上海
七月 05, 2017 10:11:38 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x128da3e7, L:/127.0.0.1:8888 - R:/127.0.0.1:52049]
七月 05, 2017 10:11:38 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
张三
26
上海
七月 05, 2017 10:11:49 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xa8220c73, L:/127.0.0.1:8888 - R:/127.0.0.1:52097]
七月 05, 2017 10:11:49 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
汤姆
上海
七月 05, 2017 10:11:55 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x9ac52ec1, L:/127.0.0.1:8888 - R:/127.0.0.1:52125]
七月 05, 2017 10:11:55 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
张三
26
上海
七月 05, 2017 10:12:07 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x797d03b6, L:/127.0.0.1:8888 - R:/127.0.0.1:52178]
七月 05, 2017 10:12:07 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
旺财
2

Implementing multiple transport protocols with plain Netty

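The official site has a similar demo. It took me a long time, plus consulting the official site, to write this one, and it deepened my understanding of Netty: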

The three protocol entity classes:

Person protocol

package com.zhihao.miao.test.day10;

public class Person {

    private String username;

    private int age;
    
    // getters and setters

}

Dog protocol

package com.zhihao.miao.test.day10;
public class Dog {

    private String name;

    private String age;

    // getters and setters
}

Cat protocol

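package com.zhihao.miao.test.day10;

public class Cat {
    private String name;
    // The decoder, encoder and handler call setColor/getColor, so the field is named color
    private String color;

    // getters and setters
}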

Server:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class MultiServer {

    public static void main(String args[]) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // Set some socket options
        serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)  // use an NIO server channel
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ServerChannelInitializer());

        // Bind the port and start listening for connections on it
        Channel ch = serverBootstrap.bind(8899).sync().channel();

        // Block until the server channel is closed
        ch.closeFuture().sync();

    }
}

Server-side initializer:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Decoding handler
        pipeline.addLast(new ServlerDecoder());
        pipeline.addLast(new TestServerHandler());
    }
}

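The server-side decoder handler: if the flag read from the message is 0, the data is decoded with the Person protocol; if the flag is 1, with the Dog protocol; if the flag is 2, with the Cat protocol: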

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class ServlerDecoder extends ByteToMessageDecoder {

    // Demo assumption: each message arrives complete in a single read, one message per connection.
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        // The leading int identifies the protocol: 0 = Person, 1 = Dog, 2 = Cat.
        int flag = in.readInt();

        if (flag == 0) {
            int usernameLength = in.readInt();

            byte[] usernameBytes = new byte[usernameLength];
            in.readBytes(usernameBytes);

            String username = new String(usernameBytes);

            int age = in.readInt();

            Person person = new Person();
            person.setUsername(username);
            person.setAge(age);

            out.add(person);
        } else if (flag == 1) {
            int nameLength = in.readInt();

            byte[] nameBytes = new byte[nameLength];
            in.readBytes(nameBytes);

            String name = new String(nameBytes);

            // The age has no length prefix, so it is taken to be all remaining bytes.
            byte[] ageBytes = new byte[in.readableBytes()];
            in.readBytes(ageBytes);

            String age = new String(ageBytes);

            Dog dog = new Dog();
            dog.setName(name);
            dog.setAge(age);

            out.add(dog);
        } else if (flag == 2) {
            int nameLength = in.readInt();

            byte[] nameBytes = new byte[nameLength];
            in.readBytes(nameBytes);

            String name = new String(nameBytes);

            // The color has no length prefix, so it is taken to be all remaining bytes.
            byte[] colorBytes = new byte[in.readableBytes()];
            in.readBytes(colorBytes);

            String color = new String(colorBytes);

            Cat cat = new Cat();
            cat.setName(name);
            cat.setColor(color);

            out.add(cat);
        }
    }
}
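The decoder above reads its fields without first checking that they have all arrived, which works in this demo but not when TCP delivers a message in several pieces. The usual remedy is to check how many bytes are available and rewind when the frame is incomplete. In a Netty ByteToMessageDecoder that would involve `in.readableBytes()`, `in.markReaderIndex()` and `in.resetReaderIndex()`. The sketch below shows the same idea with `java.nio.ByteBuffer` so that it runs without Netty; `GuardedDecode` and `tryDecodePerson` are hypothetical names, and UTF-8 is an assumed encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: decode a Person frame only once all of its bytes are present.
class GuardedDecode {

    // Frame layout: [int flag][int nameLength][name bytes][int age].
    // Returns "name:age", or null (position unchanged) when the frame is incomplete.
    static String tryDecodePerson(ByteBuffer in) {
        if (in.remaining() < 8) {
            return null; // flag and length field not available yet
        }
        in.mark();
        int flag = in.getInt();
        int nameLength = in.getInt();
        if (flag != 0 || nameLength < 0) {
            in.reset();
            throw new IllegalArgumentException("not a Person frame");
        }
        if (in.remaining() - 4 < nameLength) {
            in.reset(); // rewind and wait for more bytes
            return null;
        }
        byte[] name = new byte[nameLength];
        in.get(name);
        int age = in.getInt();
        return new String(name, StandardCharsets.UTF_8) + ":" + age;
    }

    public static void main(String[] args) {
        byte[] name = "zhihao".getBytes(StandardCharsets.UTF_8);
        ByteBuffer full = ByteBuffer.allocate(12 + name.length);
        full.putInt(0).putInt(name.length).put(name).putInt(27);
        full.flip();

        // Only the first 10 bytes have "arrived": the decoder must report an incomplete frame.
        ByteBuffer partial = ByteBuffer.wrap(full.array(), 0, 10);
        System.out.println(tryDecodePerson(partial));
        System.out.println(tryDecodePerson(full));
    }
}
```

Note that this only works for frames whose total size can be computed from the header. The Dog and Cat layouts in this article end with a field that has no length prefix, so they would need an extra length field before this check could be applied to them.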

Custom server-side handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class TestServerHandler extends SimpleChannelInboundHandler<Object> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof Person){
            System.out.println(((Person) msg).getUsername());
            System.out.println(((Person) msg).getAge());
        }

        if(msg instanceof Dog){
            System.out.println(((Dog) msg).getName());
            System.out.println(((Dog) msg).getAge());
        }

        if(msg instanceof Cat){
            System.out.println(((Cat) msg).getName());
            System.out.println(((Cat) msg).getColor());
        }
    }
}

Client:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MultiClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class).handler(new ClientChannelInitializer());

        // Start the connection attempt.
        Channel ch = b.connect("127.0.0.1", 8899).sync().channel();

        ch.flush();
    }
}

Client-side initializer:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

import java.util.Random;

public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    

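Three custom encoders mirror the server side: when Person data is sent, the flag 0 is written before it; for Dog data the flag is 1; for Cat data the flag is 2. The flag is then encoded to binary together with the data itself and sent.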

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class PersonEncoder extends MessageToByteEncoder<Person> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {
        // The length prefix must count bytes, not characters: the decoder reads exactly that many bytes.
        byte[] usernameBytes = msg.getUsername().getBytes();
        int age = msg.getAge();

        out.writeInt(0); // flag
        out.writeInt(usernameBytes.length);
        out.writeBytes(usernameBytes);
        out.writeInt(age);
    }
}

Dog protocol encoder:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class DogEncoder extends MessageToByteEncoder<Dog> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Dog msg, ByteBuf out) throws Exception {

        // The length prefix must count bytes, not characters: the decoder reads exactly that many bytes.
        byte[] nameBytes = msg.getName().getBytes();
        String age = msg.getAge();

        out.writeInt(1); // flag
        out.writeInt(nameBytes.length);
        out.writeBytes(nameBytes);
        out.writeBytes(age.getBytes());
    }
}

Cat protocol encoder:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class CatEncoder extends MessageToByteEncoder<Cat> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Cat msg, ByteBuf out) throws Exception {
        // The length prefix must count bytes, not characters: the decoder reads exactly that many bytes.
        byte[] nameBytes = msg.getName().getBytes();
        String color = msg.getColor();

        out.writeInt(2); // flag
        out.writeInt(nameBytes.length);
        out.writeBytes(nameBytes);
        out.writeBytes(color.getBytes());

    }
}
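All three encoders share the layout of a flag, a length, and then the name bytes. One detail is easy to get wrong in such a layout: the length field has to count encoded bytes, which differs from `String.length()` as soon as the text is not plain ASCII. The sketch below illustrates this with `java.nio.ByteBuffer`; `LengthPrefix` and its methods are hypothetical, and UTF-8 is an assumed encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a [int flag][int byteLength][UTF-8 bytes] frame.
class LengthPrefix {

    // The length field counts encoded bytes, not chars.
    static byte[] encode(int flag, String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(8 + bytes.length);
        buf.putInt(flag).putInt(bytes.length).put(bytes);
        return buf.array();
    }

    // Skips the flag, reads the length, then reads exactly that many bytes.
    static String decodeText(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        buf.getInt(); // flag, ignored here
        byte[] bytes = new byte[buf.getInt()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String name = "\u65fa\u8d22"; // two CJK characters, written as escapes to stay ASCII-only
        System.out.println("chars: " + name.length());
        System.out.println("bytes: " + name.getBytes(StandardCharsets.UTF_8).length);
        System.out.println("round trip ok: " + decodeText(encode(1, name)).equals(name));
    }
}
```

For the ASCII names used in this demo (zhihao, wangcai, maomi) the two counts coincide, which is why the difference does not show up in the output.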

Custom client-side handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TestClientHandler extends ChannelInboundHandlerAdapter {

    private Person person;

    private Cat cat;

    private Dog dog;

    public TestClientHandler(Person person){
        this.person = person;
    }

    public TestClientHandler(Dog dog){
        this.dog = dog;
    }

    public TestClientHandler(Cat cat){
        this.cat =cat;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if(person != null){
            ctx.channel().writeAndFlush(person);
        }

        if(dog != null){
            ctx.channel().writeAndFlush(dog);
        }

        if(cat != null){
            ctx.channel().writeAndFlush(cat);
        }
    }
}

Start the server, then start the client several times; the server console prints the results sent over the different protocols:

maomi
yellow
十月 15, 2017 4:33:43 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0xf40f7b07, L:/127.0.0.1:8899 - R:/127.0.0.1:57879]
十月 15, 2017 4:33:43 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
wangcai
2
十月 15, 2017 4:33:48 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x3384f158, L:/127.0.0.1:8899 - R:/127.0.0.1:57914]
十月 15, 2017 4:33:48 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
zhihao
27

Reposted from: https://www.jianshu.com/p/9466c24beaa2