gRPC入门

knoci 发布于 2025-02-12 102 次阅读


什么是gRPC

        gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java 和 Go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHPC# 支持

grpc通信示意图

从Hello开始的简单使用

proto

        首先,我们要编写proto文件,proto文件的详细语法之后再讲

syntax = "proto3";
option go_package = ".;proto";
service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

        然后生成go文件,-I 指定了编译器的搜索路径。. 表示当前目录。编译器会在当前目录下搜索.proto文件。goods.proto是要编译的 .proto 文件的名称,go_out表示用go语言,plugins是插件,在:后的./proto表示在当前目录下proto生成

protoc -I . goods.proto --go_out=plugins=grpc:./proto

server端

        定义结构体,实现我们要调用的方法,在grpc中,参数的第一个要传入context。

        总的来说,过程可以分为以下四部曲:

  • grpc.NewServer()定义一个gRPC服务端,注意这个只是一个服务端,没有注册任何服务,也可以,注册任何服务
  • Myproto.Register(Xxx)Server(g, &struct),给g的服务端注册Xxx服务,这里的Xxx服务在Myproto文件中,而Myproto是proto生成的go文件,要在import中自行引入,同时结构类型&strcut要实现Xxx Service的方法。
  • net包设置监听获得lis
  • g.Server(lis)在监听地址启动grpc的服务端
package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "grpc_demo/Myproto"
    "net"
)

type Server struct {
}


func (s *Server)  SayHello(ctx context.Context,request *Myproto.HelloRequest)(*hello.HelloReply,error){
    return &Myproto.HelloReply{Message:"Hello "+request.Name},nil
}

func main()  {
    g := grpc.NewServer()
    s := Server{}
    Myproto.RegisterGreeterServer(g,&s)
    lis, err := net.Listen("tcp", fmt.Sprintf(":8080"))
    if err != nil {
        panic("failed to listen: "+err.Error())
    }
    g.Serve(lis)
}

client端

        客户端一般有三步曲:

  • grpc.Dial()拨号,获取grpc连接。其中grpc.WithInsecure()是不启用TLS,在最新的gRPC中Dial()已经被弃用,推荐使用NewClient(),但是许多之前的项目的习惯都沿用Dial()
  • Myproto.New(Xxx)Client(conn),基于conn的grpc连接建立Xxx服务的grpc客户端
  • client.XxxMethod,调用名为client的grpc客户端的XxxMethod方法
package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "grpc_demo/Myproto"
)

func main()  {
    conn,err := grpc.Dial("127.0.0.1:8080",grpc.WithInsecure())
    if err!=nil{
        panic(err)
    }
    defer conn.Close()
    c := Myproto.NewGreeterClient(conn)
    r,err := c.SayHello(context.Background(),&hello.HelloRequest{Name:"bobby"})
    if err!=nil{
        panic(err)
    }
    fmt.Println(r.Message)
}

Proto的语法介绍

        官方地址: https://developers.google.com/protocol-buffers/docs/proto3

定义一个消息类型

        先来看一个非常简单的例子。假设你想定义一个“搜索请求”的消息格式,每一个请求含有一个查询字符串、你感兴趣的查询结果所在的页数,以及每一页多少条查询结果。可以采用如下的方式来定义消息类型的.proto文件了:

syntax = "proto3";
message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}
  • 文件的第一行指定了你正在使用proto3语法:如果你没有指定这个,编译器会使用proto2。这个指定语法行必须是文件的非空非注释的第一个行。
  • SearchRequest消息格式有3个字段,在消息中承载的数据分别对应于每一个字段。其中每个字段都有一个名字和一种类型。

指定字段类型

        在上面的例子中,所有字段都是标量类型:两个整型(page_number和result_per_page),一个string类型(query)。当然,你也可以为字段指定其他的合成类型,包括枚举(enumerations)或其他消息类型。

分配标识号

        正如你所见,在消息定义中,每个字段都有唯一的一个数字标识符。这些标识符是用来在消息的二进制格式中识别各个字段的,一旦开始使用就不能够再改变。注:[1,15]之内的标识号在编码的时候会占用一个字节。[16,2047]之内的标识号则占用2个字节。所以应该为那些频繁出现的消息元素保留 [1,15]之内的标识号。切记:要为将来有可能添加的、频繁出现的标识号预留一些标识号。

        最小的标识号可以从1开始,最大到2^29 - 1, or 536,870,911。不可以使用其中的[19000-19999]( (从FieldDescriptor::kFirstReservedNumber 到 FieldDescriptor::kLastReservedNumber))的标识号, Protobuf协议实现中对这些进行了预留。如果非要在.proto文件中使用这些预留标识号,编译时就会报警。同样你也不能使用早期保留的标识号。

指定字段规则

所指定的消息字段修饰符必须是如下之一:

  • singular:一个格式良好的消息应该有0个或者1个这种字段(但是不能超过1个)。
  • repeated:在一个格式良好的消息中,这种字段可以重复任意多次(包括0次)。重复的值的顺序会被保留。
    在proto3中,repeated的标量域默认情况虾使用packed。
    你可以了解更多的pakced属性在Protocol Buffer 编码

添加更多消息类型

        在一个.proto文件中可以定义多个消息类型。在定义多个相关的消息的时候,这一点特别有用——例如,如果想定义与SearchResponse消息类型对应的回复消息格式的话,你可以将它添加到相同的.proto文件中,如:

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}
message SearchResponse {
 ...
}

保留标识符(Reserved)

        如果你通过删除或者注释所有域,以后的用户可以重用标识号当你重新更新类型的时候。如果你使用旧版本加载相同的.proto文件这会导致严重的问题,包括数据损坏、隐私错误等等。现在有一种确保不会发生这种情况的方法就是指定保留标识符(and/or names, which can also cause issues for JSON serialization不明白什么意思),protocol buffer的编译器会警告未来尝试使用这些域标识符的用户。

message Foo {
  reserved 2, 15, 9 to 11;
  reserved "foo", "bar";
}

        注:不要在同一行reserved声明中同时声明域名字和标识号


从.proto文件生成了什么?

        当用protocol buffer编译器来运行.proto文件时,编译器将生成所选择语言的代码,这些代码可以操作在.proto文件中定义的消息类型,包括获取、设置字段值,将消息序列化到一个输出流中,以及从一个输入流中解析消息。

  • 对C++来说,编译器会为每个.proto文件生成一个.h文件和一个.cc文件,.proto文件中的每一个消息有一个对应的类。
  • 对Java来说,编译器为每一个消息类型生成了一个.java文件,以及一个特殊的Builder类(该类是用来创建消息类接口的)。
  • 对Python来说,有点不太一样——Python编译器为.proto文件中的每个消息类型生成一个含有静态描述符的模块,,该模块与一个元类(metaclass)在运行时(runtime)被用来创建所需的Python数据访问类。
  • 对go来说,编译器会位每个消息类型生成了一个.pd.go文件。
  • 对于Ruby来说,编译器会为每个消息类型生成了一个.rb文件。
  • javaNano来说,编译器输出类似域java但是没有Builder类
  • 对于Objective-C来说,编译器会为每个消息类型生成了一个pbobjc.h文件和pbobjcm文件,.proto文件中的每一个消息有一个对应的类。
  • 对于C#来说,编译器会为每个消息类型生成了一个.cs文件,.proto文件中的每一个消息有一个对应的类。

        你可以从如下的文档链接中获取每种语言更多API。API Reference


标量数值类型

一个标量消息字段可以含有一个如下的类型——该表格展示了定义于.proto文件中的类型,以及与之对应的、在自动生成的访问类中定义的类型:

.proto TypeNotesPython TypeGo Type
doublefloatfloat64
floatfloatfloat32
int32使用变长编码,对于负值的效率很低,如果你的域有可能有负值,请使用sint64替代intint32
uint32使用变长编码intuint32
uint64使用变长编码intuint64
sint32使用变长编码,这些编码在负值时比int32高效的多intint32
sint64使用变长编码,有符号的整型值。编码时比通常的int64高效。intint64
fixed32总是4个字节,如果数值总是比总是比228大的话,这个类型会比uint32高效。intuint32
fixed64总是8个字节,如果数值总是比总是比256大的话,这个类型会比uint64高效。intuint64
sfixed32总是4个字节intint32
sfixed64总是8个字节intint64
boolboolbool
string一个字符串必须是UTF-8编码或者7-bit ASCII编码的文本。strstring
bytes可能包含任意顺序的字节数据。str[]byte

你可以在文章Protocol Buffer 编码中,找到更多“序列化消息时各种类型如何编码”的信息。

  1. 在java中,无符号32位和64位整型被表示成他们的整型对应形似,最高位被储存在标志位中。
  2. 对于所有的情况,设定值会执行类型检查以确保此值是有效。
  3. 64位或者无符号32位整型在解码时被表示成为ilong,但是在设置时可以使用int型值设定,在所有的情况下,值必须符合其设置其类型的要求。
  4. python中string被表示成在解码时表示成unicode。但是一个ASCIIstring可以被表示成str类型。
  5. Integer在64位的机器上使用,string在32位机器上使用

默认值

当一个消息被解析的时候,如果被编码的信息不包含一个特定的singular元素,被解析的对象锁对应的域被设置位一个默认值,对于不同类型指定如下:

  • 对于strings,默认是一个空string
  • 对于bytes,默认是一个空的bytes
  • 对于bools,默认是false
  • 对于数值类型,默认是0
  • 对于枚举,默认是第一个定义的枚举值,必须为0;
  • 对于消息类型(message),域没有被设置,确切的消息是根据语言确定的,详见generated code guide
    对于可重复域的默认值是空(通常情况下是对应语言中空列表)。
    注:对于标量消息域,一旦消息被解析,就无法判断域释放被设置为默认值(例如,例如boolean值是否被设置为false)还是根本没有被设置。你应该在定义你的消息类型时非常注意。例如,比如你不应该定义boolean的默认值false作为任何行为的触发方式。也应该注意如果一个标量消息域被设置为标志位,这个值不应该被序列化传输。
    查看generated code guide选择你的语言的默认值的工作细节。

枚举

        当需要定义一个消息类型的时候,可能想为一个字段指定某“预定义值序列”中的一个值。例如,假设要为每一个SearchRequest消息添加一个 corpus字段,而corpus的值可能是UNIVERSAL,WEB,IMAGES,LOCAL,NEWS,PRODUCTS或VIDEO中的一个。 其实可以很容易地实现这一点:通过向消息定义中添加一个枚举(enum)并且为每个可能的值定义一个常量就可以了。

        在下面的例子中,在消息格式中添加了一个叫做Corpus的枚举类型——它含有所有可能的值 ——以及一个类型为Corpus的字段:

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
  enum Corpus {
    UNIVERSAL = 0;
    WEB = 1;
    IMAGES = 2;
    LOCAL = 3;
    NEWS = 4;
    PRODUCTS = 5;
    VIDEO = 6;
  }
  Corpus corpus = 4;
}

        如你所见,Corpus枚举的第一个常量映射为0:每个枚举类型必须将其第一个类型映射为0,这是因为:

  • 必须有有一个0值,我们可以用这个0值作为默认值。
  • 这个零值必须为第一个元素,为了兼容proto2语义,枚举类的第一个值总是默认值。
    你可以通过将不同的枚举常量指定位相同的值。如果这样做你需要将allow_alias设定位true,否则编译器会在别名的地方产生一个错误信息。
enum EnumAllowingAlias {
  option allow_alias = true;
  UNKNOWN = 0;
  STARTED = 1;
  RUNNING = 1;
}
enum EnumNotAllowingAlias {
  UNKNOWN = 0;
  STARTED = 1;
  // RUNNING = 1;  // Uncommenting this line will cause a compile error inside Google and a warning message outside.
}

        枚举常量必须在32位整型值的范围内。因为enum值是使用可变编码方式的,对负数不够高效,因此不推荐在enum中使用负数。如上例所示,可以在 一个消息定义的内部或外部定义枚举——这些枚举可以在.proto文件中的任何消息定义里重用。当然也可以在一个消息中声明一个枚举类型,而在另一个不同 的消息中使用它——采用MessageType.EnumType的语法格式。

        当对一个使用了枚举的.proto文件运行protocol buffer编译器的时候,生成的代码中将有一个对应的enum(对Java或C++来说),或者一个特殊的EnumDescriptor类(对 Python来说),它被用来在运行时生成的类中创建一系列的整型值符号常量(symbolic constants)。

        在反序列化的过程中,无法识别的枚举值会被保存在消息中,虽然这种表示方式需要依据所使用语言而定。在那些支持开放枚举类型超出指定范围之外的语言中(例如C++和Go),为识别的值会被表示成所支持的整型。在使用封闭枚举类型的语言中(Java),使用枚举中的一个类型来表示未识别的值,并且可以使用所支持整型来访问。在其他情况下,如果解析的消息被序列号,未识别的值将保持原样。


使用其他消息类型

        你可以将其他消息类型用作字段类型。例如,假设在每一个SearchResponse消息中包含Result消息,此时可以在相同的.proto文件中定义一个Result消息类型,然后在SearchResponse消息中指定一个Result类型的字段,如:

message SearchResponse {
  repeated Result results = 1;
}
message Result {
  string url = 1;
  string title = 2;
  repeated string snippets = 3;
}

导入定义

        在上面的例子中,Result消息类型与SearchResponse是定义在同一文件中的。如果想要使用的消息类型已经在其他.proto文件中已经定义过了呢?

        你可以通过导入(importing)其他.proto文件中的定义来使用它们。要导入其他.proto文件的定义,你需要在你的文件中添加一个导入声明,如:

import "myproject/other_protos.proto";

        默认情况下你只能使用直接导入的.proto文件中的定义. 然而, 有时候你需要移动一个.proto文件到一个新的位置, 可以不直接移动.proto文件, 只需放入一个伪 .proto 文件在老的位置, 然后使用import public转向新的位置。import public 依赖性会通过任意导入包含import public声明的proto文件传递。例如:

// 这是新的proto
// All definitions are moved here
// 这是旧的proto
// 这是所有客户端正在导入的包
import public "new.proto";
import "other.proto";
// 客户端proto
import "old.proto";
// 现在你可以使用新久两种包的proto定义了。

        通过在编译器命令行参数中使用-I/--proto_pathprotocal 编译器会在指定目录搜索要导入的文件。如果没有给出标志,编译器会搜索编译命令被调用的目录。通常你只要指定proto_path标志为你的工程根目录就好。并且指定好导入的正确名称就好。

嵌套类型

        你可以在其他消息类型中定义、使用消息类型,在下面的例子中,Result消息就定义在SearchResponse消息内,如:

message SearchResponse {
  message Result {
    string url = 1;
    string title = 2;
    repeated string snippets = 3;
  }
  repeated Result results = 1;
}

        如果你想在它的父消息类型的外部重用这个消息类型,你需要以Parent.Type的形式使用它,如:

message SomeOtherMessage {
  SearchResponse.Result result = 1;
}

        当然,你也可以将消息嵌套任意多层,如:

message Outer {                  // Level 0
  message MiddleAA {  // Level 1
    message Inner {   // Level 2
      int64 ival = 1;
      bool  booly = 2;
    }
  }
  message MiddleBB {  // Level 1
    message Inner {   // Level 2
      int32 ival = 1;
      bool  booly = 2;
    }
  }
}

更新一个消息类型

        如果一个已有的消息格式已无法满足新的需求——如,要在消息中添加一个额外的字段——但是同时旧版本写的代码仍然可用。不用担心!更新消息而不破坏已有代码是非常简单的。在更新时只要记住以下的规则即可。

  • 不要更改任何已有的字段的数值标识。
  • 如果你增加新的字段,使用旧格式的字段仍然可以被你新产生的代码所解析。你应该记住这些元素的默认值这样你的新代码就可以以适当的方式和旧代码产生的数据交互。相似的,通过新代码产生的消息也可以被旧代码解析:只不过新的字段会被忽视掉。注意,未被识别的字段会在反序列化的过程中丢弃掉,所以如果消息再被传递给新的代码,新的字段依然是不可用的(这和proto2中的行为是不同的,在proto2中未定义的域依然会随着消息被序列化)
  • 非required的字段可以移除——只要它们的标识号在新的消息类型中不再使用(更好的做法可能是重命名那个字段,例如在字段前添加“OBSOLETE_”前缀,那样的话,使用的.proto文件的用户将来就不会无意中重新使用了那些不该使用的标识号)。
  • int32, uint32, int64, uint64,和bool是全部兼容的,这意味着可以将这些类型中的一个转换为另外一个,而不会破坏向前、 向后的兼容性。如果解析出来的数字与对应的类型不相符,那么结果就像在C++中对它进行了强制类型转换一样(例如,如果把一个64位数字当作int32来 读取,那么它就会被截断为32位的数字)。
  • sint32和sint64是互相兼容的,但是它们与其他整数类型不兼容。
  • string和bytes是兼容的——只要bytes是有效的UTF-8编码。
  • 嵌套消息与bytes是兼容的——只要bytes包含该消息的一个编码过的版本。
  • fixed32与sfixed32是兼容的,fixed64与sfixed64是兼容的。
  • 枚举类型与int32,uint32,int64和uint64相兼容(注意如果值不相兼容则会被截断),然而在客户端反序列化之后他们可能会有不同的处理方式,例如,未识别的proto3枚举类型会被保留在消息中,但是他的表示方式会依照语言而定。int类型的字段总会保留他们的

Any

        Any类型消息允许你在没有指定他们的.proto定义的情况下使用消息作为一个嵌套类型。一个Any类型包括一个可以被序列化bytes类型的任意消息,以及一个URL作为一个全局标识符和解析消息类型。为了使用Any类型,你需要导入import google/protobuf/any.proto

import "google/protobuf/any.proto";
message ErrorStatus {
  string message = 1;
  repeated google.protobuf.Any details = 2;
}

        对于给定的消息类型的默认类型URL是type.googleapis.com/packagename.messagename

        不同语言的实现会支持动态库以线程安全的方式去帮助封装或者解封装Any值。例如在java中,Any类型会有特殊的pack()unpack()访问器,在C++中会有PackFrom()UnpackTo()方法。

// Storing an arbitrary message type in Any.
NetworkErrorDetails details = ...;
ErrorStatus status;
status.add_details()->PackFrom(details);
// Reading an arbitrary message from Any.
ErrorStatus status = ...;
for (const Any& detail : status.details()) {
  if (detail.Is<NetworkErrorDetails>()) {
    NetworkErrorDetails network_error;
    detail.UnpackTo(&network_error);
    ... processing network_error ...
  }
}

Oneof

        如果你的消息中有很多可选字段, 并且同时至多一个字段会被设置, 你可以加强这个行为,使用oneof特性节省内存.

        Oneof字段就像可选字段, 除了它们会共享内存, 至多一个字段会被设置。 设置其中一个字段会清除其它字段。 你可以使用case()或者WhichOneof() 方法检查哪个oneof字段被设置, 看你使用什么语言了.

使用Oneof

        为了在.proto定义Oneof字段, 你需要在名字前面加上oneof关键字, 比如下面例子的test_oneof:

message SampleMessage {
  oneof test_oneof {
    string name = 4;
    SubMessage sub_message = 9;
  }
}

        然后你可以增加oneof字段到 oneof 定义中. 你可以增加任意类型的字段, 但是不能使用repeated 关键字.

        在产生的代码中, oneof字段拥有同样的 getters 和setters, 就像正常的可选字段一样. 也有一个特殊的方法来检查到底那个字段被设置. 你可以在相应的语言API指南中找到oneof API介绍.

Oneof 特性

  • 设置oneof会自动清楚其它oneof字段的值. 所以设置多次后,只有最后一次设置的字段有值.
SampleMessage message;
message.set_name("name");
CHECK(message.has_name());
message.mutable_sub_message();   // Will clear name field.
CHECK(!message.has_name());
  • 如果解析器遇到同一个oneof中有多个成员,只有最会一个会被解析成消息。
  • oneof不支持repeated.
  • 反射API对oneof 字段有效.
  • 如果使用C++,需确保代码不会导致内存泄漏. 下面的代码会崩溃, 因为sub_message 已经通过set_name()删除了
SampleMessage message;
SubMessage* sub_message = message.mutable_sub_message();
message.set_name("name");      // Will delete sub_message
sub_message->set_...            // Crashes here
  • 在C++中,如果你使用Swap()两个oneof消息,每个消息,两个消息将拥有对方的值,例如在下面的例子中,msg1会拥有sub_message并且msg2会有name
SampleMessage msg1;
msg1.set_name("name");
SampleMessage msg2;
msg2.mutable_sub_message();
msg1.swap(&msg2);
CHECK(msg1.has_sub_message());
CHECK(msg2.has_name());

向后兼容性问题

        当增加或者删除oneof字段时一定要小心. 如果检查oneof的值返回None/NOT_SET, 它意味着oneof字段没有被赋值或者在一个不同的版本中赋值了。 你不会知道是哪种情况,因为没有办法判断如果未识别的字段是一个oneof字段。

Tage 重用问题:

  • 将字段移入或移除oneof:在消息被序列号或者解析后,你也许会失去一些信息(有些字段也许会被清除)
  • 删除一个字段或者加入一个字段:在消息被序列号或者解析后,这也许会清除你现在设置的oneof字段
  • 分离或者融合oneof:行为与移动常规字段相似。

Map(映射)

        如果你希望创建一个关联映射,protocol buffer提供了一种快捷的语法:

map<key_type, value_type> map_field = N;

        其中key_type可以是任意Integer或者string类型(所以,除了floating和bytes的任意标量类型都是可以的)value_type可以是任意类型。

例如,如果你希望创建一个project的映射,每个Projecct使用一个string作为key,你可以像下面这样定义:

map<string, Project> projects = 3;
  • Map的字段可以是repeated。
  • 序列化后的顺序和map迭代器的顺序是不确定的,所以你不要期望以固定顺序处理Map
  • 当为.proto文件产生生成文本格式的时候,map会按照key 的顺序排序,数值化的key会按照数值排序。
  • 从序列化中解析或者融合时,如果有重复的key则后一个key不会被使用,当从文本格式中解析map时,如果存在重复的key。

生成map的API现在对于所有proto3支持的语言都可用了,你可以从API指南找到更多信息。

向后兼容性问题

map语法序列化后等同于如下内容,因此即使是不支持map语法的protocol buffer实现也是可以处理你的数据的:

message MapFieldEntry {
  key_type key = 1;
  value_type value = 2;
}
repeated MapFieldEntry map_field = N;

当然可以为.proto文件新增一个可选的package声明符,用来防止不同的消息类型有命名冲突。如:

package foo.bar;
message Open { ... }

在其他的消息格式定义中可以使用包名+消息名的方式来定义域的类型,如:

message Foo {
  ...
  required foo.bar.Open open = 1;
  ...
}

包的声明符会根据使用语言的不同影响生成的代码。

  • 对于C++,产生的类会被包装在C++的命名空间中,如上例中的Open会被封装在 foo::bar空间中; - 对于Java,包声明符会变为java的一个包,除非在.proto文件中提供了一个明确有java_package
  • 对于 Python,这个包声明符是被忽略的,因为Python模块是按照其在文件系统中的位置进行组织的。
  • 对于Go,包可以被用做Go包名称,除非你显式的提供一个option go_package在你的.proto文件中。
  • 对于Ruby,生成的类可以被包装在内置的Ruby名称空间中,转换成Ruby所需的大小写样式 (首字母大写;如果第一个符号不是一个字母,则使用PB_前缀),例如Open会在Foo::Bar名称空间中。
  • 对于javaNano包会使用Java包,除非你在你的文件中显式的提供一个option java_package
  • 对于C#包可以转换为PascalCase后作为名称空间,除非你在你的文件中显式的提供一个option csharp_namespace,例如,Open会在Foo.Bar名称空间中

包及名称的解析

        Protocol buffer语言中类型名称的解析与C++是一致的:首先从最内部开始查找,依次向外进行,每个包会被看作是其父类包的内部类。当然对于 (foo.bar.Baz)这样以“.”分隔的意味着是从最外围开始的。

        ProtocolBuffer编译器会解析.proto文件中定义的所有类型名。 对于不同语言的代码生成器会知道如何来指向每个具体的类型,即使它们使用了不同的规则。


定义服务(Service)

        如果想要将消息类型用在RPC(远程方法调用)系统中,可以在.proto文件中定义一个RPC服务接口,protocol buffer编译器将会根据所选择的不同语言生成服务接口代码及存根。如,想要定义一个RPC服务并具有一个方法,该方法能够接收 SearchRequest并返回一个SearchResponse,此时可以在.proto文件中进行如下定义:

service SearchService {
  rpc Search (SearchRequest) returns (SearchResponse);
}

        最直观的使用protocol buffer的RPC系统是gRPC一个由谷歌开发的语言和平台中的开源的PRC系统,gRPC在使用protocl buffer时非常有效,如果使用特殊的protocol buffer插件可以直接为您从.proto文件中产生相关的RPC代码。


gRPC的四种数据流

gRPC支持四种数据流类型,分别是:

  1. 简单RPC(Unary RPCs):客户端发送单个请求给服务器,并接收单个响应。这是最接近常规函数调用的RPC形式。
  2. 服务器流式RPC(Server streaming RPCs):客户端发送请求后,服务器返回一系列消息构成的流。客户端从流中读取消息,直到没有更多消息,gRPC保证消息顺序的正确性。
  3. 客户端流式RPC(Client streaming RPCs):客户端通过流式发送一系列消息给服务器,服务器在接收完所有消息后返回单个响应。gRPC保证消息顺序的正确性。
  4. 双向流式RPC(Bidirectional streaming RPCs):客户端和服务器通过读写流独立地发送一系列消息。两个流独立操作,可以以任意顺序读写消息,gRPC保证每个流中消息顺序的正确性。

 proto

syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc

//声明 包名
option go_package=".;proto";

//声明grpc服务
service Greeter {
    /*
    以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。
    */
    rpc GetStream (StreamReqData) returns (stream StreamResData){}
    rpc PutStream (stream StreamReqData) returns (StreamResData){}
    rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
}


//stream请求结构
message StreamReqData {
    string data = 1;
}
//stream返回结构
message StreamResData {
    string data = 1;
}

server端

        这里我们注意到方法传入的参数变成了proto.Greeter_(Xxx)StreamServer,而不是proto定义的Stream(Xxx)Data,这就是流实现的关键,具体方法的参数可以在proto生成的go文件中看,我们可以对(Xxx)StreamServer调用Send()或Recv()实现流式传输。

package main

import (
	"fmt"
	"google.golang.org/grpc"
	"log"
	"net"
	"start/new_stream/proto"
	"sync"
	"time"
)

const PORT  = ":50052"

type server struct {
}

//服务端 单向流
func (s *server)GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error{
	i:= 0
	for{
		i++
		res.Send(&proto.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())})
		time.Sleep(1*time.Second)
		if i >10 {
			break
		}
	}
	return nil
}

//客户端 单向流
func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {

	for {
		if tem, err := cliStr.Recv(); err == nil {
			log.Println(tem)
		} else {
			log.Println("break, err :", err)
			break
		}
	}

	return nil
}

//客户端服务端 双向流
func(s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {

	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		for {
			data, _ := allStr.Recv()
			log.Println(data)
		}
		wg.Done()
	}()

	go func() {
		for {
			allStr.Send(&proto.StreamResData{Data:"ssss"})
			time.Sleep(time.Second)
		}
		wg.Done()
	}()

	wg.Wait()
	return nil
}

func main(){
	//监听端口
	lis,err := net.Listen("tcp",PORT)
	if err != nil{
		panic(err)
		return
	}
	//创建一个grpc 服务器
	s := grpc.NewServer()
	//注册事件
	proto.RegisterGreeterServer(s,&server{})
	//处理链接
	err = s.Serve(lis)
	if err != nil {
		panic(err)
	}
}

client端

        客户端也是同理,变成了参数proto.Greeter_(Xxx)StreamServer,来实现数据流。

package main

import (
	"fmt"
	"google.golang.org/grpc"
	"log"
	"net"
	"start/new_stream/proto"
	"sync"
	"time"
)

const PORT  = ":50052"

type server struct {
}

//服务端 单向流
func (s *server)GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error{
	i:= 0
	for{
		i++
		res.Send(&proto.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())})
		time.Sleep(1*time.Second)
		if i >10 {
			break
		}
	}
	return nil
}

//客户端 单向流
func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {

	for {
		if tem, err := cliStr.Recv(); err == nil {
			log.Println(tem)
		} else {
			log.Println("break, err :", err)
			break
		}
	}

	return nil
}

//客户端服务端 双向流
func(s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {

	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		for {
			data, _ := allStr.Recv()
			log.Println(data)
		}
		wg.Done()
	}()

	go func() {
		for {
			allStr.Send(&proto.StreamResData{Data:"ssss"})
			time.Sleep(time.Second)
		}
		wg.Done()
	}()

	wg.Wait()
	return nil
}

func main(){
	//监听端口
	lis,err := net.Listen("tcp",PORT)
	if err != nil{
		panic(err)
		return
	}
	//创建一个grpc 服务器
	s := grpc.NewServer()
	//注册事件
	proto.RegisterGreeterServer(s,&server{})
	//处理链接
	err = s.Serve(lis)
	if err != nil {
		panic(err)
	}
}

go控制gRPC的metadata

        gRPC让我们可以像本地调用一样实现远程调用,对于每一次的RPC调用中,都可能会有一些有用的数据,而这些数据就可以通过metadata来传递。metadata是以key-value的形式存储数据的,其中key是string类型,而value是[]string,即一个字符串数组类型。metadata使得client和server能够为对方提供关于本次调用的一些信息,就像一次http请求的RequestHeader和ResponseHeader一样。http中header的生命周周期是一次http请求,那么metadata的生命周期就是一次RPC调用。


go中使用metadata

创建metadata

        MD 类型实际上是map,key是string,value是string类型的slice。

type MD map[string][]string

        创建的时候可以像创建普通的map类型一样使用new关键字进行创建:

//第一种方式
md := metadata.New(map[string]string{"key1": "val1", "key2": "val2"})
//第二种方式 key不区分大小写,会被统一转成小写。
md := metadata.Pairs(
    "key1", "val1",
    "key1", "val1-2", // "key1" will have map value []string{"val1", "val1-2"}
    "key2", "val2",
)

发送metadata

md := metadata.Pairs("key", "val")

// 新建一个有 metadata 的 context
ctx := metadata.NewOutgoingContext(context.Background(), md)

// 单向 RPC
response, err := client.SomeRPC(ctx, someRequest)

接收metadata

func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) {
    md, ok := metadata.FromIncomingContext(ctx)
    // do something with metadata
}

grpc中使用metadata

proto

syntax = "proto3";
option go_package=".;proto";

// The greeting service definition.
service Greeter {
    //   Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {
    }
}

// The request message containing the user's name.
message HelloRequest {
    string name = 1;
}

// The response message containing the greetings
message HelloReply {
    string message = 1;
}

server端

package main

import (
	"OldPackageTest/grpc_test/proto"
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

func main(){
	//stream
	conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	c := proto.NewGreeterClient(conn)

	//md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
	md := metadata.New(map[string]string{
		"name":"bobby",
		"pasword":"imooc",
	})
	ctx := metadata.NewOutgoingContext(context.Background(), md)
	r, err := c.SayHello(ctx, &proto.HelloRequest{Name:"bobby"})
	if err != nil {
		panic(err)
	}
	fmt.Println(r.Message)
}

client端

package main

import (
	"OldPackageTest/grpc_test/proto"
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

func main(){
	//stream
	conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	c := proto.NewGreeterClient(conn)

	//md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
	md := metadata.New(map[string]string{
		"name":"bobby",
		"pasword":"imooc",
	})
	ctx := metadata.NewOutgoingContext(context.Background(), md)
	r, err := c.SayHello(ctx, &proto.HelloRequest{Name:"bobby"})
	if err != nil {
		panic(err)
	}
	fmt.Println(r.Message)
}

gRPC拦截器实现Token认证

拦截器类型

gRPC提供了四种类型的拦截器:

  • UnaryServerInterceptor:服务端一元拦截器,用于拦截单个RPC调用。
  • StreamServerInterceptor:服务端流拦截器,用于拦截流式RPC调用。
  • UnaryClientInterceptor:客户端一元拦截器,用于拦截单个RPC调用。
  • StreamClientInterceptor:客户端流拦截器,用于拦截流式RPC调用。

服务端一元拦截器(UnaryServerInterceptor)的定义如下:

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error)

        这个函数接收四个参数:

  • ctx:上下文对象,用于传递请求范围的信息。
  • req:请求对象。
  • info:包含RPC方法信息的结构体。
  • handler:实际处理请求的函数。

        函数返回两个值:

  • 响应对象。
  • 错误对象。

客户端一元拦截器(UnaryClientInterceptor)的类型定义如下:

type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) (err error)

        这个函数接收七个参数:

  • ctx:上下文对象。
  • method:RPC方法名。
  • req:请求对象。
  • reply:响应对象。
  • cc:客户端连接对象。
  • invoker:实际发起RPC调用的函数。
  • opts:调用选项。

        函数返回一个错误值。


proto

syntax = "proto3";
option go_package = ".;proto";
service Greeter {
    rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
    string name = 1;
}

message HelloReply {
    string message = 1;
}

server端

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
	"net"

	"google.golang.org/grpc"

	"start/token_interceptor/proto"
)


type Server struct{}

func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply,
	error){
	return &proto.HelloReply{
		Message: "hello "+request.Name,
	}, nil
}


func main(){
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	var interceptor grpc.UnaryServerInterceptor
	interceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		md, ok := metadata.FromIncomingContext(ctx)
		if !ok {
			return resp, status.Errorf(codes.Unauthenticated, "无Token认证信息")
		}

		var (
			appid  string
			appkey string
		)

		if val, ok := md["appid"]; ok {
			appid = val[0]
		}

		if val, ok := md["appkey"]; ok {
			appkey = val[0]
		}

		if appid != "imooc" || appkey != "bobby" {
			return resp, status.Errorf(codes.Unauthenticated, "Token认证信息无效: appid=%s, appkey=%s", appid, appkey)
		}

		// 继续处理请求
		return handler(ctx, req)
	}
	var opts []grpc.ServerOption
	opts = append(opts, grpc.UnaryInterceptor(interceptor))

	s := grpc.NewServer(opts...)
	ser :=& Server{}
	proto.RegisterGreeterServer(s, ser)
	s.Serve(lis)
}

client端

package  main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"start/token_interceptor/proto"
)

type customCredential struct{}

func (c customCredential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{
		"appid":  "101010",
		"appkey": "i am key",
	}, nil
}

func (c customCredential) RequireTransportSecurity() bool {
	return false
}


func main() {
	var opts []grpc.DialOption

	//opts = append(opts, grpc.WithUnaryInterceptor(interceptor))
	opts = append(opts, grpc.WithInsecure())
	opts = append(opts, grpc.WithPerRPCCredentials(new(customCredential)))
	// 指定客户端interceptor

	conn, err := grpc.Dial("localhost:50051", opts...)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	c := proto.NewGreeterClient(conn)
	//rsp, _ := c.Search(context.Background(), &empty.Empty{})
	rsp, err := c.SayHello(context.Background(), &proto.HelloRequest{
		Name: "bobby",

	})
	if err != nil {
		panic(err)
	}
	fmt.Println(rsp.Message)
}

gRPC中的异常处理和超时机制

异常处理

server端

st := status.New(codes.InvalidArgument, "invalid username")

client端

st, ok := status.FromError(err)
if !ok {
    // Error was not a status error
}
st.Message()
st.Code()

超时机制

ctx, cancel := context.WithTimeout(context.TODO(), time.Second*3)
defer cancel()
 r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})

gRPC和thrift相比

        grpc复用http2协议是个好事,这样能让grpc有高度的兼容性,但是相比thrift来说性能是差点意思。毕竟grpc相对于thrift来说,需要unpack两次数据包,一次是header,一次是protobuf body。但话又说回来,相比性能,grpc有更活跃的社区支持和兼容性。

        像istio的envoy sidecar本就支持http2,那么自然就很容易支持grpc。而像小米为了解决kubernets istio的联合,改了不少istio的控制面板代码来支持thrift协议。 ​