Communicating between Java and Golang over the protobuf protocol
Published: 2019-06-12


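Our devices are going to use protobuf as their communication protocol, so I put together this getting-started guide. Golang acts as the client and Java as the server, which is what really shows off protobuf's language-neutral nature.

This article uses protobuf2 (proto2 syntax) and focuses on getting up and running quickly; it does not go into the finer details.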


Overall structure

Golang is the client, Java is the server, and protobuf2 is the message format they use to talk to each other.

1481607-20190724104939976-1145677807.png


The protobuf2 file

  • Introduction to protobuf2

  • helloworld.proto

    syntax = "proto2";

    package proto;

    message ProtocolMessage {
        message SearchRequest {
            required string name = 1;
            optional int32 search = 2;
        }
        message ActionRequest {
            required string name = 1;
            optional int32 action = 2;
        }
        message SearchResponse {
            required string name = 1;
            optional int32 search = 2;
        }
        message ActionResponse {
            required string name = 1;
            optional int32 action = 2;
        }
        optional SearchRequest searchRequest = 1;
        optional ActionRequest actionRequest = 2;
        optional SearchResponse searchResponse = 3;
        optional ActionResponse actionResponse = 4;
    }
    • SearchRequest and SearchResponse are a matching request/response message pair;
    • ActionRequest and ActionResponse are a matching request/response message pair;
    • The server uses the Netty framework, whose protobuf decoder is configured with a single message type to encode and decode. For that reason SearchRequest, SearchResponse, ActionRequest and ActionResponse are all nested inside ProtocolMessage, and the two sides exchange data by encoding and decoding ProtocolMessage.
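
To make the envelope idea concrete, here is a minimal sketch that hand-encodes a ProtocolMessage using the protobuf wire format and only the Go standard library. The helper names (appendVarint, appendTag, appendBytes, encodeEnvelope) are made up for this illustration; real code would use the generated helloworld.pb.go instead. The point is that the outer field number (1 to 4) tells the receiver which nested message is present.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendVarint appends v as a base-128 varint, the integer encoding protobuf uses.
func appendVarint(b []byte, v uint64) []byte {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(tmp[:], v)
	return append(b, tmp[:n]...)
}

// appendTag appends a field key: (fieldNumber << 3) | wireType.
func appendTag(b []byte, field, wireType uint64) []byte {
	return appendVarint(b, field<<3|wireType)
}

// appendBytes appends a length-delimited field (wire type 2).
func appendBytes(b []byte, field uint64, payload []byte) []byte {
	b = appendTag(b, field, 2)
	b = appendVarint(b, uint64(len(payload)))
	return append(b, payload...)
}

// encodeEnvelope hand-encodes a nested message shaped like SearchRequest or
// ActionRequest (string name = 1, int32 value = 2) and wraps it in the
// ProtocolMessage field given by envelopeField.
func encodeEnvelope(envelopeField uint64, name string, value int32) []byte {
	inner := appendBytes(nil, 1, []byte(name))
	inner = appendTag(inner, 2, 0) // wire type 0 = varint
	inner = appendVarint(inner, uint64(value))
	return appendBytes(nil, envelopeField, inner)
}

func main() {
	// searchRequest is field 1 of ProtocolMessage, actionRequest is field 2.
	fmt.Printf("% x\n", encodeEnvelope(1, "search", 12)) // 0a 0a 0a 06 73 65 61 72 63 68 10 0c
	fmt.Printf("% x\n", encodeEnvelope(2, "action", 34)) // 12 0a 0a 06 61 63 74 69 6f 6e 10 22
}
```

The two outputs differ in their first byte (0a versus 12), which is the key of the envelope field. That single byte is how the server can tell a search request from an action request even though both arrive as a ProtocolMessage.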

Golang client

Directory layout

client_proto/
├── api
│   └── proto              # holds the .proto file and the generated pb.go file
│       ├── helloworld.pb.go
│       └── helloworld.proto
└── cmd
    ├── main.go
    └── util
        └── util.go

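The project is developed with Go modules (go mod).

Generating the pb.go file

  • Install protoc

    Look up the installation steps online......

  • In the directory containing the .proto file, run protoc --go_out=./ helloworld.proto (the protoc-gen-go plugin must be on your PATH for --go_out to work)

  • This generates the helloworld.pb.go file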

main.go

package main

import (
	"log"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"

	// Import paths are kept as in the original project; adjust them to your own module layout.
	proto "grpc/api/grpc_proto"
	"grpc/cmd/demo3/util"
)

func init() {
	util.InitTransfer()
}

func main() {
	router := gin.Default()

	// search test
	router.GET("/search", func(c *gin.Context) {
		name := "search"
		search := int32(12)
		message := &proto.ProtocolMessage{
			SearchRequest: &proto.ProtocolMessage_SearchRequest{
				Name:   &name,
				Search: &search,
			},
		}
		if err := util.G_transfer.SendMsg(message); err != nil {
			c.JSON(500, gin.H{"err": err.Error()})
			return
		}
		if err := util.G_transfer.ReadResponse(message); err != nil {
			c.JSON(500, gin.H{"err": err.Error()})
			return
		}
		// The generated getters are nil-safe, so a missing SearchResponse cannot panic here.
		c.JSON(200, gin.H{"message": message.GetSearchResponse().GetName()})
	})

	// action test
	router.GET("/action", func(c *gin.Context) {
		name := "action"
		action := int32(34)
		message := &proto.ProtocolMessage{
			ActionRequest: &proto.ProtocolMessage_ActionRequest{
				Name:   &name,
				Action: &action,
			},
		}
		if err := util.G_transfer.SendMsg(message); err != nil {
			c.JSON(500, gin.H{"err": err.Error()})
			return // stop here, otherwise a second response would be written
		}
		if err := util.G_transfer.ReadResponse(message); err != nil {
			c.JSON(500, gin.H{"err": err.Error()})
			return
		}
		c.JSON(200, gin.H{"message": message.GetActionResponse().GetName()})
	})

	s := &http.Server{
		Addr:           ":8090",
		Handler:        router,
		ReadTimeout:    60 * time.Second,
		WriteTimeout:   60 * time.Second,
		MaxHeaderBytes: 1 << 20,
	}
	if err := s.ListenAndServe(); err != nil {
		log.Fatal(err)
	}
}

util.go

package util

import (
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"net"

	"github.com/gogo/protobuf/proto"

	// Import path kept as in the original project; adjust it to your own module layout.
	grpc_proto "grpc/api/grpc_proto"
)

var G_transfer *Transfer

// InitTransfer connects to the server and stores the connection in G_transfer.
func InitTransfer() {
	pTCPAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:3210")
	if err != nil {
		// Fail loudly: without a connection G_transfer would stay nil and every request would panic.
		log.Fatalf("resolve server address: %v", err)
	}
	conn, err := net.DialTCP("tcp", nil, pTCPAddr)
	if err != nil {
		log.Fatalf("connect to server: %v", err)
	}
	G_transfer = &Transfer{Conn: conn}
}

// Transfer wraps the connection to the server.
type Transfer struct {
	Conn net.Conn       // the connection
	Buf  [1024 * 2]byte // buffer used while receiving
}

// ReadResponse reads one length-prefixed message from the server and parses it.
func (transfer *Transfer) ReadResponse(response *grpc_proto.ProtocolMessage) error {
	// A single Conn.Read may return fewer bytes than requested; io.ReadFull
	// keeps reading until the 4-byte length header is complete.
	if _, err := io.ReadFull(transfer.Conn, transfer.Buf[:4]); err != nil {
		return err
	}
	// Interpret the header as a big-endian uint32.
	pkgLen := binary.BigEndian.Uint32(transfer.Buf[:4])
	if pkgLen > uint32(len(transfer.Buf)) {
		return fmt.Errorf("message length %d exceeds buffer size %d", pkgLen, len(transfer.Buf))
	}
	// Read exactly pkgLen bytes of message body.
	if _, err := io.ReadFull(transfer.Conn, transfer.Buf[:pkgLen]); err != nil {
		return err
	}
	return proto.Unmarshal(transfer.Buf[:pkgLen], response)
}

// SendMsg sends one length-prefixed message to the server.
func (transfer *Transfer) SendMsg(action *grpc_proto.ProtocolMessage) error {
	sendBytes, err := proto.Marshal(action)
	if err != nil {
		return err
	}
	var buf [4]byte
	binary.BigEndian.PutUint32(buf[:], uint32(len(sendBytes)))
	// Conn.Write returns a non-nil error whenever it writes fewer bytes than requested.
	if _, err = transfer.Conn.Write(buf[:]); err != nil {
		return fmt.Errorf("send message length: %v", err)
	}
	if _, err = transfer.Conn.Write(sendBytes); err != nil {
		return fmt.Errorf("send message body (server may have closed the connection): %v", err)
	}
	return nil
}
  • Both sending and reading a message deal with the length of the data first and only then with the data itself;
  • This mirrors how the server parses and sends data, which is determined by the encoders and decoders configured in the Netty pipeline.
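
The length-then-body framing described above can be sketched on its own, without protobuf or a real socket. The block below is a minimal illustration under stated assumptions: writeFrame, readFrame, roundTrip and the maxFrame limit are names invented for this example, and a bytes.Buffer stands in for the TCP connection.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// maxFrame is a hypothetical upper bound that protects the reader from bogus lengths.
const maxFrame = 1 << 20

// writeFrame writes a 4-byte big-endian length header followed by the payload.
func writeFrame(w io.Writer, payload []byte) error {
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads the 4-byte header, then exactly that many payload bytes.
func readFrame(r io.Reader) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	n := binary.BigEndian.Uint32(hdr[:])
	if n > maxFrame {
		return nil, fmt.Errorf("frame of %d bytes exceeds limit %d", n, maxFrame)
	}
	payload := make([]byte, n)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// roundTrip writes every payload as a frame into an in-memory buffer that
// stands in for the connection, then reads the frames back one by one.
func roundTrip(payloads ...[]byte) ([][]byte, error) {
	var wire bytes.Buffer
	for _, p := range payloads {
		if err := writeFrame(&wire, p); err != nil {
			return nil, err
		}
	}
	var out [][]byte
	for wire.Len() > 0 {
		p, err := readFrame(&wire)
		if err != nil {
			return out, err
		}
		out = append(out, p)
	}
	return out, nil
}

func main() {
	frames, err := roundTrip([]byte("hello"), []byte("protobuf"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, f := range frames {
		fmt.Printf("%d bytes: %s\n", len(f), f)
	}
}
```

Because the two frames sit back to back in the buffer, the example also shows why the header is needed: TCP is a byte stream, and the length is the only thing that tells the reader where one message ends and the next begins.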

Java server

Directory layout

server_proto/
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── dust
│   │   │           └── proto_server
│   │   │               ├── config
│   │   │               │   └── NettyConfig.java
│   │   │               ├── netty
│   │   │               │   ├── NettyServerListener.java
│   │   │               │   └── SocketServerHandler.java
│   │   │               ├── proto
│   │   │               │   ├── Helloworld.java
│   │   │               │   └── helloworld.proto   # proto definition file
│   │   │               └── Application.java       # application entry class
│   │   └── resources
│   │       └── application.yml                    # configuration file
│   └── test
└── pom.xml                                        # Maven build file

Built with Spring Boot + Netty + Maven.

pom.xml

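<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
    </parent>
    <groupId>com.dust</groupId>
    <artifactId>proto_server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>proto_server</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.googlecode.protobuf-java-format</groupId>
            <artifactId>protobuf-java-format</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.19.Final</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
  • Note: protobuf-java is at version 3.8.0, which must match the version of the installed protoc compiler (protoc.exe).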

application.yml

# Netty configuration
netty:
  # port number
  port: 3210
  # maximum number of threads
  maxThreads: 1024
  # maximum packet length
  max_frame_length: 65535

NettyConfig.java

package com.dust.proto_server.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyConfig {
    // Only the port is bound here; maxThreads and max_frame_length from
    // application.yml are not mapped by this class.
    private int port;
}

Generating Helloworld.java

  • In the directory containing the .proto file, run protoc --java_out=./ helloworld.proto
  • This generates the Helloworld.java file

SocketServerHandler.java

package com.dust.proto_server.netty;

import com.dust.proto_server.proto.Helloworld;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@ChannelHandler.Sharable
public class SocketServerHandler extends ChannelInboundHandlerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(SocketServerHandler.class);

    public ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        LOGGER.info(channel.id().toString() + " joined");
        CHANNEL_GROUP.add(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        LOGGER.info(channel.id().toString() + " left");
        CHANNEL_GROUP.remove(channel);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        LOGGER.info("Reading data sent by the client");
        // protobufDecoder has already turned the frame into a ProtocolMessage.
        Helloworld.ProtocolMessage protocolMessage = (Helloworld.ProtocolMessage) msg;
        Helloworld.ProtocolMessage.Builder builder = Helloworld.ProtocolMessage.newBuilder();
        // hasXxx() reports whether the optional nested message was set by the sender.
        if (protocolMessage.hasSearchRequest()) {
            Helloworld.ProtocolMessage.SearchRequest searchRequest = protocolMessage.getSearchRequest();
            LOGGER.info("searchRequest--{}", searchRequest);
            Helloworld.ProtocolMessage.SearchResponse searchResponse =
                    Helloworld.ProtocolMessage.SearchResponse.newBuilder()
                            .setName("i am SearchResponse")
                            .setSearch(45)
                            .build();
            builder.setSearchResponse(searchResponse);
        } else if (protocolMessage.hasActionRequest()) {
            Helloworld.ProtocolMessage.ActionRequest actionRequest = protocolMessage.getActionRequest();
            LOGGER.info("actionRequest--{}", actionRequest);
            Helloworld.ProtocolMessage.ActionResponse actionResponse =
                    Helloworld.ProtocolMessage.ActionResponse.newBuilder()
                            .setName("i am ActionResponse")
                            .setAction(67)
                            .build();
            builder.setActionResponse(actionResponse);
        }
        Helloworld.ProtocolMessage message = builder.build();
        // Only the message itself is written. The frameEncoder (LengthFieldPrepender)
        // in the pipeline prepends the 4-byte length. The original code also wrote the
        // length as an Integer, which no encoder in this pipeline can handle.
        ctx.channel().writeAndFlush(message);
    }
}

NettyServerListener.java

package com.dust.proto_server.netty;

import com.dust.proto_server.config.NettyConfig;
import com.dust.proto_server.proto.Helloworld;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;

@Component
public class NettyServerListener {

    /** Logger for NettyServerListener. */
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class);

    /** The server bootstrap. */
    ServerBootstrap serverBootstrap = new ServerBootstrap();

    /** Boss group: accepts incoming connections. */
    EventLoopGroup boss = new NioEventLoopGroup();

    /** Worker group: handles I/O on accepted connections. */
    EventLoopGroup work = new NioEventLoopGroup();

    @Resource
    private SocketServerHandler socketServerHandler;

    /** Netty server configuration. */
    @Resource
    private NettyConfig nettyConfig;

    /** Shuts the server down. */
    @PreDestroy
    public void close() {
        LOGGER.info("Shutting down the server....");
        // graceful shutdown
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }

    /** Starts the server and blocks until the server channel is closed. */
    public void start() {
        // Read the listening port from the configuration file (application.yml)
        int port = nettyConfig.getPort();
        serverBootstrap.group(boss, work)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Splits the byte stream into messages using the body length given in the 4-byte header
                        pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
                        // Turns the protobuf bytes of one complete frame into a ProtocolMessage object
                        pipeline.addLast("protobufDecoder", new ProtobufDecoder(Helloworld.ProtocolMessage.getDefaultInstance()));
                        // Prepends a 4-byte header holding the body length to the outgoing bytes
                        pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                        // Turns a ProtocolMessage object into protobuf bytes
                        pipeline.addLast("protobufEncoder", new ProtobufEncoder());
                        pipeline.addLast(socketServerHandler);
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 100)
                // SO_KEEPALIVE applies to accepted connections, so it is set as a child option
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .handler(new LoggingHandler(LogLevel.INFO));
        try {
            LOGGER.info("Netty server listening on port [{}]", port);
            ChannelFuture f = serverBootstrap.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            LOGGER.info("[Exception] releasing resources");
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}
  • This class defines how the server handles incoming and outgoing data;

1481607-20190724105029327-1103281993.png

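  • frameDecoder and protobufDecoder are the handlers that decode incoming protobuf packets. Both are upstream (inbound) handlers: the length is processed first, then the data itself;

  • frameEncoder and protobufEncoder are the handlers that encode outgoing protobuf packets. Both are downstream (outbound) handlers;

  • There is one more handler, socketServerHandler, a custom upstream handler in which the developer extracts the data they need from what arrived over the network;

  • The handlers in this example execute in the following order (outbound handlers run from the tail of the pipeline towards the head, so protobufEncoder runs before frameEncoder):

    upstream: frameDecoder, protobufDecoder, handler   // decodes data received from the socket
    downstream: protobufEncoder, frameEncoder          // encodes data to be sent out through the socket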
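
To illustrate what the frameDecoder stage does with a TCP byte stream, here is a simplified Go sketch of a length-field frame splitter. It is not Netty code: the frameDecoder type, its feed method and the frame helper are invented for this example, and the size check is a simplification of Netty's maxFrameLength handling. It assumes the same layout as LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4): a 4-byte big-endian length at offset 0, stripped before the body is passed on.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errFrameTooLong = errors.New("frame exceeds maximum length")

// frameDecoder buffers bytes as they arrive and yields complete message bodies.
type frameDecoder struct {
	buf      []byte
	maxFrame uint32
}

// feed appends newly received bytes and returns every complete body that is
// now available, with the 4-byte length header stripped.
func (d *frameDecoder) feed(chunk []byte) ([][]byte, error) {
	d.buf = append(d.buf, chunk...)
	var frames [][]byte
	for len(d.buf) >= 4 {
		n := binary.BigEndian.Uint32(d.buf[:4])
		if n > d.maxFrame {
			return frames, errFrameTooLong
		}
		end := 4 + int(n)
		if len(d.buf) < end {
			break // body not fully received yet; wait for more bytes
		}
		body := make([]byte, n)
		copy(body, d.buf[4:end])
		frames = append(frames, body)
		d.buf = d.buf[end:]
	}
	return frames, nil
}

// frame builds header+body, which is what the frameEncoder stage produces.
func frame(body []byte) []byte {
	out := make([]byte, 4+len(body))
	binary.BigEndian.PutUint32(out[:4], uint32(len(body)))
	copy(out[4:], body)
	return out
}

func main() {
	d := &frameDecoder{maxFrame: 1 << 20}
	stream := append(frame([]byte("hello")), frame([]byte("abc"))...)

	// Deliver the stream in two arbitrary chunks, as TCP might.
	for _, chunk := range [][]byte{stream[:6], stream[6:]} {
		frames, err := d.feed(chunk)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("received %d bytes, decoded %d frame(s)\n", len(chunk), len(frames))
		for _, f := range frames {
			fmt.Printf("  body: %s\n", f)
		}
	}
}
```

The first chunk contains a full header but only part of the body, so nothing is emitted. The second chunk completes the first message and carries the whole second one, so two bodies come out at once. Only after this step would the protobuf decoder run on each body.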

Application.java

package com.dust.proto_server;

import com.dust.proto_server.netty.NettyServerListener;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;

@SpringBootApplication
public class Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Resource
    private NettyServerListener nettyServerListener;

    @Override
    public void run(String... args) throws Exception {
        nettyServerListener.start();
    }
}

Testing

  • Start the server first, then the client

  • search test

1481607-20190724105108830-1428280513.png

  • action test

1481607-20190724105123762-1071867984.png

Reposted from: https://www.cnblogs.com/dust90/p/11236581.html
