目录
因为设备的通信协议准备采用
protobuf
,所以准备这篇protobuf
的使用入门,golang
作为客户端,java
作为服务端,这才能真正体现出protobuf
的无关语言特性。本文采用
protobuf2
,注重于如何快速入门使用,并不会涉及到具体的细节知识点。
整体结构说明
golang
作为客户端,java
作为服务端,protobuf2
为两者的通信协议格式。
protobuf2文件
protobuf2简介
helloworld.proto
syntax = "proto2";package proto;message ProtocolMessage { message SearchRequest{ required string name = 1; optional int32 search = 2 ; } message ActionRequest{ required string name = 1; optional int32 action = 2 ; } message SearchResponse{ required string name = 1; optional int32 search = 2 ; } message ActionResponse{ required string name = 1; optional int32 action = 2 ; } optional SearchRequest searchRequest = 1; optional ActionRequest actionRequest = 2; optional SearchResponse searchResponse = 3; optional ActionResponse actionResponse = 4;}
SearchRequest
和SearchResponse
为对应的请求和相应message;ActionRequest
和ActionResponse
为对应的请求和相应message;- 由于服务端使用
netty
框架,限制了只能接受一个message进行编码解码,所以把SearchRequest
、SearchResponse
、ActionRequest
和ActionResponse
都内嵌到ProtocolMessage
中,通过对ProtocolMessage
编码解码进行数据交互。
golang客户端
目录结构
client_proto/├── api│ ├── proto # 存放proto协议文件以及生产的pd.go文件│ ├── helloworld.pb.go│ └── helloworld.proto├── cmd│ ├── main.go│ ├── util│ └── util.go
采用go mod
进行开发
生成pb.go文件
安装proto
自行百度......
在.proto文件处,输入
protoc --go_out=./ helloworld.proto
即可生成
helloworld.pb.go
文件
main.go
package mainimport ( "github.com/gin-gonic/gin" proto "grpc/api/grpc_proto" "grpc/cmd/demo3/util" "net/http" "time")func init() { util.InitTransfer()}func main() { router := gin.Default() // search 测试 router.GET("/search", func(c *gin.Context) { name := "search" search := int32(12) message := &proto.ProtocolMessage{ SearchRequest:&proto.ProtocolMessage_SearchRequest{ Name:&name, Search:&search, }, } if err := util.G_transfer.SendMsg(message); err != nil { c.JSON(500, gin.H{ "err": err.Error(), }) return } if err := util.G_transfer.ReadResponse(message); err != nil { c.JSON(500, gin.H{ "err": err.Error(), }) return } c.JSON(200, gin.H{ "message": message.SearchResponse.Name, }) }) // action测试 router.GET("/action", func(c *gin.Context) { name := "action" action := int32(34) message := &proto.ProtocolMessage{ ActionRequest: &proto.ProtocolMessage_ActionRequest{ Name: &name, Action: &action, }, } if err := util.G_transfer.SendMsg(message); err != nil { c.JSON(500, gin.H{ "err": err.Error(), }) } if err := util.G_transfer.ReadResponse(message); err != nil { c.JSON(500, gin.H{ "err": err.Error(), }) } c.JSON(200, gin.H{ "message": message.ActionResponse.Name, }) }) ReadTimeout := time.Duration(60) * time.Second WriteTimeout := time.Duration(60) * time.Second s := &http.Server{ Addr: ":8090", Handler: router, ReadTimeout: ReadTimeout, WriteTimeout: WriteTimeout, MaxHeaderBytes: 1 << 20, } s.ListenAndServe()}
util.go
package utilimport ( "encoding/binary" "errors" "github.com/gogo/protobuf/proto" grpc_proto "grpc/api/grpc_proto" "net")var ( G_transfer *Transfer)func InitTransfer() { var ( pTCPAddr *net.TCPAddr conn net.Conn err error ) if pTCPAddr, err = net.ResolveTCPAddr("tcp", "127.0.0.1:3210"); err != nil { return } if conn, err = net.DialTCP("tcp", nil, pTCPAddr); err != nil { return } // 定义 Transfer 指针变量 G_transfer = &Transfer{ Conn: conn, }}// 声明 Transfer 结构体type Transfer struct { Conn net.Conn // 连接 Buf [1024 * 2]byte // 传输时,使用的缓冲}// 获取并解析服务器的消息func (transfer *Transfer) ReadResponse(response *grpc_proto.ProtocolMessage) (err error) { _, err = transfer.Conn.Read(transfer.Buf[:4]) if err != nil { return } // 根据 buf[:4] 转成一个 uint32 类型 var pkgLen uint32 pkgLen = binary.BigEndian.Uint32(transfer.Buf[:4]) //根据pkglen 读取消息内容 n, err := transfer.Conn.Read(transfer.Buf[:pkgLen]) if n != int(pkgLen) || err != nil { return } if err = proto.Unmarshal(transfer.Buf[:pkgLen], response); err != nil { return } return}// 发送消息到服务器func (transfer *Transfer) SendMsg(action *grpc_proto.ProtocolMessage) (err error) { var ( sendBytes []byte readLen int ) //sendBytes, ints := action.Descriptor() if sendBytes, err = proto.Marshal(action); err != nil { return } pkgLen := uint32(len(sendBytes)) var buf [4]byte binary.BigEndian.PutUint32(buf[:4],pkgLen) if readLen, err = transfer.Conn.Write(buf[:4]); readLen != 4 && err != nil { if readLen == 0 { return errors.New("发送数据长度发生异常,长度为0") } return } // 发送消息 if readLen, err = transfer.Conn.Write(sendBytes); err != nil { if readLen == 0 { return errors.New("检查到服务器关闭,客户端也关闭") } return } return}
- 这里发送消息和读取消息都需要先发送/解析数据的长度,然后发送/解析数据本身;
- 这里与服务端怎么样解析/发送数据有关,这是由于
netty
框架中定义的编码解码器决定的。
java服务端
目录结构
server_proto/├── src│ ├── main │ ├── java│ ├── com│ ├── dust│ ├── proto_server│ ├── config│ └── NettyConfig.java │ ├── netty│ └── NettyServerListener.java │ └── SocketServerHandler.java │ ├── proto│ └── Helloworld.java │ └── helloworld.proto # proto配置文件│ └── Application.java # 启动配置类│ ├── resources│ └── application.yml #配置文件│ ├── test└── pom.xml # maven配置文件
采用springBoot
+netty
+maven
开发
pom.xml
4.0.0 org.springframework.boot spring-boot-starter-parent 2.1.6.RELEASE com.dust proto_server 0.0.1-SNAPSHOT proto_server Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test com.google.protobuf protobuf-java 3.8.0 com.googlecode.protobuf-java-format protobuf-java-format 1.2 io.netty netty-all 4.1.19.Final org.springframework.boot spring-boot-maven-plugin
- 注意:
protobuf-java
的版本为3.8.0
,必须和安装proto.exe
的版本保持一致。
application.yml
# netty配置netty: # 端口号 port: 3210 # 最大线程数 maxThreads: 1024 # 数据包的最大长度 max_frame_length: 65535
NettyConfig.java
package com.dust.proto_server.config;import lombok.Data;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;@Data@Component@ConfigurationProperties(prefix = "netty")public class NettyConfig { private int port;}
生成Helloworld.java
- 在.proto文件处,输入
protoc --java_out=./ helloworld.proto
- 即可生成
Helloworld.java
文件
SocketServerHandler.java
package com.dust.proto_server.netty;import com.dust.proto_server.proto.Helloworld;import io.netty.channel.Channel;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.util.concurrent.GlobalEventExecutor;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;@Component@ChannelHandler.Sharablepublic class SocketServerHandler extends ChannelInboundHandlerAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(SocketServerHandler.class); public ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx){ Channel channel = ctx.channel(); LOGGER.info(channel.id().toString()+"加入"); CHANNEL_GROUP.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx){ Channel channel = ctx.channel(); LOGGER.info(channel.id().toString()+"退出"); CHANNEL_GROUP.remove(channel); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); }// @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { LOGGER.info("开始读取客户端发送过来的数据"); Helloworld.ProtocolMessage protocolMessage = (Helloworld.ProtocolMessage) msg; Helloworld.ProtocolMessage.Builder builder = Helloworld.ProtocolMessage.newBuilder(); if (protocolMessage.getSearchRequest().getSerializedSize() != 0) { Helloworld.ProtocolMessage.SearchRequest searchRequest = protocolMessage.getSearchRequest(); LOGGER.info("searchRequest--{}",searchRequest); Helloworld.ProtocolMessage.SearchResponse searchResponse = Helloworld.ProtocolMessage.SearchResponse.newBuilder().setName("i am SearchResponse").setSearch(45).build(); builder.setSearchResponse(searchResponse); } else if (protocolMessage.getActionRequest().getSerializedSize() != 0) { Helloworld.ProtocolMessage.ActionRequest actionRequest = protocolMessage.getActionRequest(); LOGGER.info("actionRequest--{}",actionRequest); Helloworld.ProtocolMessage.ActionResponse actionResponse = Helloworld.ProtocolMessage.ActionResponse.newBuilder().setName("i am ActionResponse").setAction(67).build(); builder.setActionResponse(actionResponse); } Helloworld.ProtocolMessage message = builder.build(); // 发送数据长度 ctx.channel().writeAndFlush(message.toByteArray().length); // 发送数据本身 ctx.channel().writeAndFlush(message); }}
NettyServerListener.java
package com.dust.proto_server.netty;import com.dust.proto_server.config.NettyConfig;import com.dust.proto_server.proto.Helloworld;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.LengthFieldPrepender;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;import javax.annotation.Resource;@Componentpublic class NettyServerListener { /** * NettyServerListener 日志输出器 * */ private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class); /** * 创建bootstrap */ ServerBootstrap serverBootstrap = new ServerBootstrap(); /** * BOSS */ EventLoopGroup boss = new NioEventLoopGroup(); /** * Worker */ EventLoopGroup work = new NioEventLoopGroup(); @Resource private SocketServerHandler socketServerHandler; /** * NETT服务器配置类 */ @Resource private NettyConfig nettyConfig; /** * 关闭服务器方法 */ @PreDestroy public void close() { LOGGER.info("关闭服务器...."); //优雅退出 boss.shutdownGracefully(); work.shutdownGracefully(); } /** * 开启及服务线程 */ public void start() { // 从配置文件中(application.yml)获取服务端监听端口号 int port = nettyConfig.getPort(); serverBootstrap.group(boss, work).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 负责通过4字节Header指定的Body长度将消息切割 pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)); // 负责将frameDecoder处理后的完整的一条消息的protobuf字节码转成ProtocolMessage对象 pipeline.addLast("protobufDecoder", new ProtobufDecoder(Helloworld.ProtocolMessage.getDefaultInstance())); // 负责将写入的字节码加上4字节Header前缀来指定Body长度 pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 负责将ProtocolMessage对象转成protobuf字节码 pipeline.addLast("protobufEncoder", new ProtobufEncoder()); pipeline.addLast(socketServerHandler); } }).option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)); try { LOGGER.info("netty服务器在[{}]端口启动监听", port); ChannelFuture f = serverBootstrap.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { LOGGER.info("[出现异常] 释放资源"); boss.shutdownGracefully(); work.shutdownGracefully(); } }}
- 这个类就定义服务端是怎么样处理接受和发送数据的;
frameDecoder
和protobufDecoder
对应的handler用于解码Protobuf package数据包,他们都是Upstream Handles:先处理长度,然后再处理数据本身;frameEncoder
和protobufEncoder
对应的handler用于编码Protobuf package数据包,他们都是Downstream Handles;此外还有一个handler,是一个自定义的Upstream Handles,用于开发者从网络数据中解析得到自己所需的数据
socketServerHandler
;上例Handles的执行顺序为
upstream:frameDecoder,protobufDecoder,handler //解码从Socket收到的数据 downstream:frameEncoder,protobufEncoder //编码要通过Socket发送出去的数据
Application.java
package com.dust.proto_server;import com.dust.proto_server.netty.NettyServerListener;import org.springframework.boot.CommandLineRunner;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.Resource;@SpringBootApplicationpublic class Application implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Resource private NettyServerListener nettyServerListener; @Override public void run(String... args) throws Exception { nettyServerListener.start(); }}
测试
先启动服务端,再启动客户端
search测试
- action测试