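The previous article covered custom communication protocols; this chapter shows how to support several protocols at once.
We will build a Server that supports the Cat, Dog and People protocols at the same time. There are two ways to implement it:
- The first approach relies on a custom protocol: when a message is sent, its first few bytes (say, 2) carry a custom marker (say, AB). When the decoder parses the message, a leading AB means one protocol type and a leading CD means another. This approach does not use protobuf; it solves the problem directly with a Netty custom protocol.
- The second approach uses protobuf and essentially comes down to a convention on how messages are defined. With Netty, the client and the server share a single TCP connection, so each side must be able to tell what type of object the other side has sent.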
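The first approach can be sketched in plain Java, without Netty, to show the idea of dispatching on a leading marker. This is only a minimal sketch: the class `PrefixDispatchSketch`, the mapping of AB and CD to People and Dog, and the extra EF marker for Cat are all assumptions made for illustration, not part of the original article or of Netty.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for illustration only; not a Netty class.
final class PrefixDispatchSketch {

    // Reads the 2-byte ASCII marker at the start of a frame and names the protocol.
    // The marker-to-protocol mapping below is an assumption.
    static String protocolOf(byte[] frame) {
        if (frame.length < 2) {
            throw new IllegalArgumentException("frame shorter than the 2-byte marker");
        }
        String marker = new String(frame, 0, 2, StandardCharsets.US_ASCII);
        switch (marker) {
            case "AB": return "People";
            case "CD": return "Dog";
            case "EF": return "Cat";
            default: throw new IllegalArgumentException("unknown marker: " + marker);
        }
    }

    // Strips the marker and returns the bytes a protocol-specific decoder would parse.
    // Call protocolOf first so that frames without a full marker are rejected.
    static byte[] payloadOf(byte[] frame) {
        return Arrays.copyOfRange(frame, 2, frame.length);
    }

    public static void main(String[] args) {
        byte[] frame = "CDrex".getBytes(StandardCharsets.US_ASCII);
        System.out.println(protocolOf(frame) + ", payload bytes: " + payloadOf(frame).length);
    }
}
```

In a real pipeline this check would live inside a custom decoder, which would hand the payload to the decoder of the matching protocol.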
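Implementing multiple Netty transport protocols with Protocol Buffers
As we know, using Protocol Buffers starts with defining a .proto file.
Define one outermost message (MyMessage) that covers every message type to be transmitted: all of the concrete message types are wrapped inside it, and each transmission carries one concrete message together with a value of the outer message's enum type that says which one it is.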
syntax = "proto2";

package com.zhihao.miao.netty.sixthexample;

option optimize_for = SPEED;
option java_package = "com.zhihao.miao.netty.seventhexample";
option java_outer_classname = "MyDataInfo";

message MyMessage {
    enum DataType {
        PeopleType = 1;
        DogType = 2;
        CatType = 3;
    }

    required DataType data_type = 1;

    // oneof: of several optional fields, at most one can be set at any given moment, which saves memory
    oneof dataBody {
        People people = 2;
        Dog dog = 3;
        Cat cat = 4;
    }
}

message People {
    optional string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

message Dog {
    optional string name = 1;
    optional string age = 2;
}

message Cat {
    optional string name = 1;
    optional string city = 2;
}
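Compile it with the protoc compiler to generate the code:
protoc --java_out=src/main/java src/protobuf/People.proto
As for oneof in the proto definition: when there are several optional fields, at most one of them can be set at any given moment (see the official Protocol Buffers documentation). Compilation generates the MyDataInfo class; it is too long to reproduce here.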
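To make the oneof rule concrete, here is a plain-Java model of its behavior: all members share one slot, so setting one member replaces whichever was set before. This is a sketch of the semantics only. `OneofModelSketch` and its methods are hypothetical and are not the code protoc generates, although the generated MyMessage class should offer a comparable accessor (`getDataBodyCase()` for a oneof named dataBody).

```java
// Plain-Java model of oneof semantics for illustration; NOT protoc output.
final class OneofModelSketch {

    // Which member of the oneof is currently set, if any.
    enum BodyCase { NOT_SET, PEOPLE, DOG, CAT }

    private BodyCase bodyCase = BodyCase.NOT_SET;
    private Object body; // single slot shared by all members

    // Each setter overwrites the shared slot, which implicitly clears the other members.
    void setPeople(String name) { bodyCase = BodyCase.PEOPLE; body = name; }
    void setDog(String name)    { bodyCase = BodyCase.DOG;    body = name; }
    void setCat(String name)    { bodyCase = BodyCase.CAT;    body = name; }

    BodyCase getBodyCase() { return bodyCase; }
    boolean hasPeople()    { return bodyCase == BodyCase.PEOPLE; }
    boolean hasDog()       { return bodyCase == BodyCase.DOG; }
    Object getBody()       { return body; }

    public static void main(String[] args) {
        OneofModelSketch msg = new OneofModelSketch();
        msg.setPeople("zhangsan");
        msg.setDog("wangcai"); // replaces the People value set one line earlier
        System.out.println(msg.getBodyCase() + " hasPeople=" + msg.hasPeople());
    }
}
```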
Server code:
package com.zhihao.miao.netty.seventhexample;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class TestServer {
    public static void main(String[] args) throws Exception {
        // bossGroup accepts connections; workerGroup handles I/O on the accepted channels
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new TestServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
            // Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
Server-side channel initializer:
package com.zhihao.miao.netty.seventhexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        // Decode every frame using the default instance of the outermost message
        pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());
        pipeline.addLast(new TestServerHandler());
    }
}
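This is where the key to the implementation lies: the decoder is given the default instance of MyDataInfo.MyMessage. MyMessage is the outermost message (not an enum); its dataBody oneof can hold any one of the three message types we defined, and its DataType enum records which of them a given message carries.
The custom server-side handler parses each message into its concrete type according to the dataType value carried over the channel: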
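The pipeline above also relies on ProtobufVarint32LengthFieldPrepender and ProtobufVarint32FrameDecoder, which frame each message with its length encoded as a Base 128 varint. The following stdlib-only sketch shows that framing idea; the class `Varint32FramingSketch` and its methods are hypothetical and only approximate what the Netty handlers do internally.

```java
import java.io.ByteArrayOutputStream;

// Illustrative model of varint32 length-prefixed framing; not Netty's implementation.
final class Varint32FramingSketch {

    // Encodes an int as a Base 128 varint: 7 bits per byte, lowest group first,
    // with the high bit of a byte set when more bytes follow.
    static byte[] encodeVarint32(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    // Prepends the varint-encoded payload length to the payload.
    static byte[] frame(byte[] payload) {
        byte[] length = encodeVarint32(payload.length);
        byte[] framed = new byte[length.length + payload.length];
        System.arraycopy(length, 0, framed, 0, length.length);
        System.arraycopy(payload, 0, framed, length.length, payload.length);
        return framed;
    }

    // Reads the varint at the start of a frame and returns the payload length it encodes.
    static int decodeVarint32(byte[] framed) {
        int result = 0;
        int shift = 0;
        for (int i = 0; i < framed.length && shift < 35; i++) {
            byte b = framed[i];
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
        throw new IllegalArgumentException("truncated or malformed varint32");
    }

    public static void main(String[] args) {
        byte[] framed = frame(new byte[300]);
        System.out.println("frame size: " + framed.length
                + ", decoded payload length: " + decodeVarint32(framed));
    }
}
```

Because TCP is a byte stream, the receiver needs this length prefix to know where one MyMessage ends and the next begins before it can hand the bytes to ProtobufDecoder.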
package com.zhihao.miao.netty.seventhexample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
        MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
        if (dataType == MyDataInfo.MyMessage.DataType.PeopleType) {
            MyDataInfo.People people = msg.getPeople();
            System.out.println(people.getName());
            System.out.println(people.getAge());
            System.out.println(people.getAddress());
        } else if (dataType == MyDataInfo.MyMessage.DataType.DogType) {
            MyDataInfo.Dog dog = msg.getDog();
            System.out.println(dog.getName());
            System.out.println(dog.getAge());
        } else if (dataType == MyDataInfo.MyMessage.DataType.CatType) {
            MyDataInfo.Cat cat = msg.getCat();
            System.out.println(cat.getName());
            System.out.println(cat.getCity());
        }
    }
}
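As the number of message types grows, the if/else chain in the handler can be replaced by a lookup table keyed by the type enum. The sketch below shows that alternative with stand-in types so that it runs without the generated classes: `DispatchSketch`, `Envelope` and the handler lambdas are hypothetical, and the article's own handler uses the if/else form shown above.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

// Table-driven dispatch on a type enum; all types here are stand-ins for illustration.
final class DispatchSketch {

    // Stand-in for MyDataInfo.MyMessage.DataType.
    enum DataType { PeopleType, DogType, CatType }

    // Stand-in for MyDataInfo.MyMessage: a type tag plus a simplified body.
    static final class Envelope {
        final DataType dataType;
        final String body;

        Envelope(DataType dataType, String body) {
            this.dataType = dataType;
            this.body = body;
        }
    }

    // One handler per message type, looked up by the enum value.
    private static final Map<DataType, Function<Envelope, String>> HANDLERS =
            new EnumMap<>(DataType.class);

    static {
        HANDLERS.put(DataType.PeopleType, m -> "people: " + m.body);
        HANDLERS.put(DataType.DogType, m -> "dog: " + m.body);
        HANDLERS.put(DataType.CatType, m -> "cat: " + m.body);
    }

    // Finds the handler registered for the message's type and applies it.
    static String handle(Envelope msg) {
        Function<Envelope, String> handler = HANDLERS.get(msg.dataType);
        if (handler == null) {
            throw new IllegalStateException("no handler for " + msg.dataType);
        }
        return handler.apply(msg);
    }

    public static void main(String[] args) {
        System.out.println(handle(new Envelope(DataType.DogType, "rex")));
    }
}
```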
Client code:
package com.zhihao.miao.netty.seventhexample;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TestClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new TestClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();
            // Block until the connection is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
Client-side channel initializer:
package com.zhihao.miao.netty.seventhexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        // Decode every frame using the default instance of the outermost message
        pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());
        pipeline.addLast(new TestClientHandler());
    }
}
Custom client-side handler, which randomly sends data of one of the protocols:
package com.zhihao.miao.netty.seventhexample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Random;

public class TestClientHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
    }

    // The client sends data to the server once the channel becomes active
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        int randomInt = new Random().nextInt(3);
        MyDataInfo.MyMessage myMessage = null;
        if (0 == randomInt) {
            myMessage = MyDataInfo.MyMessage.newBuilder().
                    setDataType(MyDataInfo.MyMessage.DataType.PeopleType).
                    setPeople(MyDataInfo.People.newBuilder().setName("张三").
                            setAddress("上海").setAge(26).build()).build();
        } else if (1 == randomInt) {
            myMessage = MyDataInfo.MyMessage.newBuilder().
                    setDataType(MyDataInfo.MyMessage.DataType.DogType).
                    setDog(MyDataInfo.Dog.newBuilder().setName("旺财")
                            .setAge("2").build()).build();
        } else if (2 == randomInt) {
            myMessage = MyDataInfo.MyMessage.newBuilder().
                    setDataType(MyDataInfo.MyMessage.DataType.CatType).
                    setCat(MyDataInfo.Cat.newBuilder().setName("汤姆")
.setCity("上海"