gprc知识整理

1 grpc概述

grpc是一个高性能、开源的rpc框架,目前提供了多种语言版本,基于HTTP/2标准设计,拥有双向流、流控、头部压缩、单TCP连接上的多复用请求特性,接口描述语言使用protobuf。

grpc调用流程:

  • 客户端发起调用,即在程序中调用某个方法;
  • 对请求信息使用protobuf进行对象序列化后发给服务端;
  • 服务端接收请求后,解码请求信息,进行业务逻辑处理;
  • 对处理结果使用protobuf进行对象序列化压缩后返回给客户端;
  • 客户端接收到服务端响应后,解码结果。


grpc优点:

  • 性能好,比json编解码数读快几十倍。
  • 代码生成方便,使用proto工具自动生成对应语言代码。
  • 流传输,支持一元RPC、服务端流式RPC、客户端流式RPC、双流向RPC共4中传输流。
  • 超时和取消,客户端和服务端在截至时间后对取消事件进行相关处理。


grpc缺点:

  • 可读性差
  • 不支持浏览器调用
  • 外部组件支持性差


2 protobuf简介

protobuf是一种与语言无关、平台无关、可扩展的可序列化和结构化的数据描述语言,通常其IDL,常用于通信协议、数据存储等,比json、XML更小,编码解码速度快得多。

语法模板:

syntax = "proto3";

package helloworld;

service Greeter {
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
    string name = 1;
}

message HelloReply {
    string message = 1;
}


protobuf与go语言常见数据类型映射表:

proto go
bool bool
string string
bytes []byte
int32 int32
int64 int64
uint32 uint32
uint64 uint64
float float32
double float64
sint32, sfixed32 int32
sint64, sfixed64 int64
fixed32 unit32
fixed64 unit64


复合类型映射表:

(1) 数组类型

message HelloRequest {
    repeated string name = 1;  # 等价go的[]string
}


(2) 嵌套类型

message User {
    string name = 1;
}

message HelloRequest {
    repeated User users = 1;  # 等价go的[]User
}


(3) map

message HelloRequest {
    map<string, string> names = 2;  # 等价go的map[string]striing
}


安装protobuf、生成go语言的插件,grpc库:

# 打开protoc的gihub官网 https://github.com/protocolbuffers/protobuf/releases/tag/v3.14.0,下载对应系统文件
wget https://github.com/protocolbuffers/protobuf/releases/download/v3.14.0/protoc-3.14.0-win64.zip
# 解压文件,把protoc.exe和include目录都移动到$GOROOT/bin目录下,注:protoc.exe和include要在同一级目录
# 查看版本号
protoc.exec --version


# 下载编译生成proto-gen-go可执行文件
wget https://github.com/golang/protobuf/archive/v1.4.3.zip
# 解压进入protobuf-1.4.3\protoc-gen-go目录,执行命令go build生成二可执行进制文件protoc-gen-go.exe,
# 然后把protoc-gen-go.exe移动到$GOROOT/bin目录下


# 下载安装grpc库
wget https://github.com/grpc/grpc-go/archive/v1.33.2.zip
# 解压文件,把grpc-go-1.33.2目录改名为grpc,然后把grpc移动到$GOPATH/srcgoogle.golang.org目录下


# 执行proto命令,生成对应的pb.go文件
protoc.exe --go_out=plugins=grpc:. .\proto\*.proto


3 grpc简单示例

在grpc中,一共有4种调用方式:

  • 一元RPC(unary RPC): 称为单次RPC,也就是一问一答RPC请求,是最基础最常用的调用方式。
  • 服务端流式RPC(Server-side Streaming RPC): 是一个单向流,客户端发起一次普通RPC请求,服务端通过流式返回数据集。
  • 客户端流式RPC(Client-side Streaming RPC): 是一个单向流,客户端通过流式发送数据集,服务端回复一次普通RPC请求。
  • 双向流式RPC(Bidirectional Streaming RPC): 由客户端以流式发起请求,服务端同样以流式方式响应请求。一定有客户端发起,但交互方式(谁先谁后、一次发多少、相应多少、什么时候关闭)则由程序编写的方式来控制(可以结合协程)。


3.1 grpc调用方式示例

protobuf文件内容如下:

syntax = "proto3";

package proto;

service Greeter {
    // 一元RPC
    rpc SayHello (HelloRequest) returns (HelloReply) {}
    // 服务端流式RPC
    rpc SayList (HelloRequest) returns (stream HelloReply) {}
    // 客户端流式RPC
    rpc SayRecord (stream HelloRequest) returns (HelloReply) {}
    // 双向流式RPC
    rpc SayRoute (stream HelloRequest) returns (stream HelloReply) {}
}

message HelloRequest {
    string name = 1;
}

message HelloReply {
    string message = 1;
}


服务端代码如下:

package main

import (
    "context"
    "fmt"
    "io"
    "net"
    "strings"
    "time"

    pb "grpc/demo/proto"

    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"
)

type GreeterServer struct{}

func (g *GreeterServer) SayHello(ctx context.Context, r *pb.HelloRequest) (*pb.HelloReply, error) {
    fmt.Println("\nSayHello receive req: " + r.Name)
    return &pb.HelloReply{Message: "hello " + r.Name}, nil
}

func (g *GreeterServer) SayList(r *pb.HelloRequest, stream pb.Greeter_SayListServer) error {
    var err error
    fmt.Println("\nSayList receive req: " + r.Name)

    for i := 0; i < 5; i++ {
        err = stream.Send(&pb.HelloReply{Message: "hello " + r.Name + fmt.Sprintf(" %d", i)})
        if err != nil {
            return err
        }
        time.Sleep(time.Second)
    }

    return nil
}

func (g *GreeterServer) SayRecord(stream pb.Greeter_SayRecordServer) error {
    values := []string{}
    defer func() {
        fmt.Println("\nSayRecord receive req: ", values)
    }()

    for {
        // 阻塞等待接收流数据,当结束时会受到EOF表示结束,当出现错误会返回rpc错误信息
        // 默认的MaxReceiveMessageSize值为1024x1024x4字节,如果有特殊需求可以调整
        resp, err := stream.Recv()
        if err != nil {
            if err == io.EOF { // 判断是否数据流结束
                return stream.SendAndClose(&pb.HelloReply{
                    Message: "hello " + strings.Join(values, ","),
                })
            }
            return err
        }

        values = append(values, resp.Name)
    }

    return nil
}

func (g *GreeterServer) SayRoute(stream pb.Greeter_SayRouteServer) error {
    recValues := []string{}
    sendValues := []string{}

    defer func() {
        fmt.Println("\nSayRoute receive req: ", recValues)
        fmt.Println("SayRoute send req: ", sendValues)
    }()

    for {
        resp, err := stream.Recv()
        if err != nil {
            if err == io.EOF { // 判断是否数据流结束
                return nil
            }
            return err
        }
        recValues = append(recValues, resp.Name)

        err = stream.Send(&pb.HelloReply{Message: "hello " + resp.Name})
        if err != nil {
            return err
        }
        sendValues = append(sendValues, resp.Name)
    }

    return nil
}

func main() {
    server := grpc.NewServer()
    pb.RegisterGreeterServer(server, &GreeterServer{})
    reflection.Register(server) // 注册给反射服务,通过grpcurl工具可以调试接口

    list, err := net.Listen("tcp", "127.0.0.1:8080")
    if err != nil {
        panic(err)
    }
    err = server.Serve(list)
    if err != nil {
        panic(err)
    }
}


客户端代码如下:

package main

import (
    "context"
    "fmt"
    "io"
    "time"

    pb "grpc/demo/proto"

    "google.golang.org/grpc"
)

func SayHello(client pb.GreeterClient) error {
    resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "zhangsan"})
    if err != nil {
        return err
    }

    fmt.Println("resp:", resp.Message)
    return nil
}

func SayList(client pb.GreeterClient) error {
    stream, err := client.SayList(context.Background(), &pb.HelloRequest{Name: "zhangsan"})
    if err != nil {
        return err
    }

    for {
        // 阻塞等待接收流数据,当结束时会受到EOF表示结束,当出现错误会返回rpc错误信息
        // 默认的MaxReceiveMessageSize值为1024x1024x4字节,如果有特殊需求可以调整
        resp, err := stream.Recv()
        if err != nil {
            if err == io.EOF { // 判断是否数据流结束
                break
            }
            return err
        }

        fmt.Println("resp:", resp.Message)
    }

    return nil
}

func SayRecord(client pb.GreeterClient) error {
    stream, err := client.SayRecord(context.Background())
    if err != nil {
        return err
    }

    names := []string{"zhangsan", "lisi", "wangwu"}
    for _, name := range names {
        err := stream.Send(&pb.HelloRequest{Name: name})
        if err != nil {
            return err
        }
        time.Sleep(time.Second)
    }

    resp, err := stream.CloseAndRecv()
    if err != nil {
        return err
    }
    fmt.Println("resp:", resp.Message)

    return nil
}

func SayRoute(client pb.GreeterClient) error {
    stream, err := client.SayRoute(context.Background())
    if err != nil {
        return err
    }

    names := []string{"zhangsan", "lisi", "wangwu"}
    for _, name := range names {
        err := stream.Send(&pb.HelloRequest{Name: name})
        if err != nil {

            return err
        }

        resp, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                break
            }
            return err
        }

        fmt.Println("resp:", resp.Message)
    }

    time.Sleep(10 * time.Millisecond)
    err = stream.CloseSend()
    if err != nil {
        return err
    }

    return nil
}

func main() {
    conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure())
    if err != nil {
        panic(err)
    }

    client := pb.NewGreeterClient(conn)

    fmt.Println("\n一元RPC调用示例:SayHello")
    if err := SayHello(client); err != nil {
        panic(err)
    }

    fmt.Println("\n服务端流式RPC:SayList")
    if err := SayList(client); err != nil {
        panic(err)
    }

    fmt.Println("\n客户端流式RPC:SayRecord")
    if err := SayRecord(client); err != nil {
        panic(err)
    }

    fmt.Println("\n双向流式RPC:SayRoute")
    if err := SayRoute(client); err != nil {
        panic(err)
    }
}


3.2 调试grpc接口

grpcurl是一个命令行工具,可让您与gRPC服务器进行交互,基本上是curl针对gRPC服务器的。

(1) 安装grpcurl工具

# 打开grpcurl的github官网 https://github.com/fullstorydev/grpcurl/releases/tag/v1.7.0,根据系统类型下载对应文件
wget https://github.com/fullstorydev/grpcurl/releases/download/v1.7.0/grpcurl_1.7.0_windows_x86_64.zip

# 解压文件,然后把grpcurl移动到$GOROOT/bin目录下,查看版本
grpcurl --version


(2) 调试

使用grpcurl前提是,grpc服务端已经注册了反射服务,所以需要在服务代码加上一句

reflection.Register(server)

反射使用的包是 google.golang.org/grpc/reflection。

# 查看grpc服务提供哪些接口列表
grpcurl -plaintext 127.0.0.1:8080 list

# 查看grpc服务提供接口的方法列表
grpcurl -plaintext 127.0.0.1:8080 list proto.Greeter
    # proto.Greeter.SayHello
    # proto.Greeter.SayList
    # proto.Greeter.SayRecord
    # proto.Greeter.SayRoute

# 请求方法
grpcurl -plaintext -d '{"name":"grpc"}' 127.0.0.1:8080 proto.Greeter.SayHello

grpcurl参数说明:

  • plaintext: grpcurl工具默认使用TLS认证,使用-plaintext参数忽略TLS认证。
  • list: 指定执行命令,获取该服务的rpc方法列表信息。
  • -d: 请求数据,输入内容必须为json格式。


3.3 grpc错误处理

grpc返回字段有Code和Message两部分,官方定义的状态码如下:

Code 状态码 说明
0 OK 成功
1 Canceled 该操作被调用方取消
2 Unknown 未知错误,如果不是grpc状态类型都统一归为未知错误,一般是用户自定义错误
3 InvalidArgument 无效参数
4 DeadlineExceeded 在操作完成之前超过了约定的最后期限
5 NotFound 找不到
6 AlreadyExists 已经存在
7 PermissionDenied 权限不足
8 ResourceExhausted 资源耗尽
9 FailedPrecondition 该操作被拒绝,因为未处于执行该操作所需的态
10 Aborted 该操作被中止
11 OutOfRange 超出范围,尝试执行的操作超出了约定的有
12 Unimplemented 未实现
13 Internal 内部错误
14 Unavailable 该服务当前不可用
15 DataLoss 不可恢复的数据丢失或损坏
16 Unauthenticated 身份验证元数据无效或凭据回调失败


在grpc的状态信息中一共包含三个属性,分别是错误码(code)、错误消息(message)、错误信息详情(Details),从any.proto文件引入detail字段,作为应用程序的错误码原型,rpc_error.proto文件内容如下:

syntax = "proto3";

package proto;

import "google/protobuf/any.proto";

message Error {
    int32 code = 1;
    string message = 2;
    google.protobuf.Any detail = 3;
}

使用protoc命令生成rpc_error.pb.go。


重新封装grpc错误码和业务错误码,文件errCode.go内容如下:

package errCode

import (
    "fmt"

    pb "grpc/errCode/proto"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

var (
    // 自定义业务错误码
    ERROR_LOGIN_DAIL = NewError(20001, "登录失败")

    // grpc内部错误码
    Success          = NewError(0, "ok")
    Fail             = NewError(10000, "内部错误")
    InvalidParams    = NewError(10001, "无效参数")
    Unauthorized     = NewError(10002, "认证错误")
    NotFound         = NewError(10003, "没有找到")
    Unknown          = NewError(10004, "未知")
    DeadlineExceeded = NewError(10005, "超出最后止期限")
    AccessDenied     = NewError(10006, "访问被拒绝")
    LimitExceed      = NewError(10007, "访问限制")
    MethodNotAllowed = NewError(10008, "不支持该方法")
)

type Error struct {
    code int
    msg  string
}

var errorCodes = map[int]string{}

func NewError(code int, msg string) *Error {
    if _, ok := errorCodes[code]; ok {
        panic(fmt.Sprintf("code %d 已经存在"))
    }

    errorCodes[code] = msg

    return &Error{code: code, msg: msg}
}

func (e *Error) Code() int {
    return e.code
}

func (e *Error) Msg() string {
    return e.msg
}

func (e *Error) String() string {
    return fmt.Sprintf("code: %d, msg: %s", e.code, e.msg)
}

// ToRPCCode 自定义错误码转换为RPC识别的错误码,避免返回Unknown状态码
func ToRPCCode(code int) codes.Code {
    var statusCode codes.Code

    switch code {
    case Fail.code:
        statusCode = codes.Internal
    case InvalidParams.code:
        statusCode = codes.InvalidArgument
    case Unauthorized.code:
        statusCode = codes.Unauthenticated
    case NotFound.code:
        statusCode = codes.NotFound
    case DeadlineExceeded.code:
        statusCode = codes.DeadlineExceeded
    case AccessDenied.code:
        statusCode = codes.PermissionDenied
    case LimitExceed.code:
        statusCode = codes.ResourceExhausted
    case MethodNotAllowed.code:
        statusCode = codes.Unimplemented
    default:
        statusCode = codes.Unknown
    }

    return statusCode
}

// ----------------------------------------------------------------------------------

type Status struct {
    *status.Status
}

func FromError(err error) *Status {
    s, _ := status.FromError(err)
    return &Status{s}
}

// ToGRPCStatus 除了原始业务错误码,新增其他说明信息msg,主要给内部客户端
func ToGRPCStatus(err *Error, msg string) *Status {
    s, _ := status.New(ToRPCCode(err.code), msg).WithDetails(&pb.Error{Code: int32(err.code), Message: err.msg})
    return &Status{s}
}

// ToGRPCError 通过Details属性返回错误信息给外部客户端
func ToGRPCError(err *Error) error {
    s, _ := status.New(ToRPCCode(err.code), err.msg).WithDetails(&pb.Error{Code: int32(err.code), Message: err.msg})
    return s.Err()
}

外部客户端可以直接调用 errCode.ToGRPCERROR(errCode.ERROR_LOGIN_DAIL)返回错误信息,而内部客户端获取错误详情代码如下:

    err:=errCode.ToGRPCERROR(errCode.ERROR_LOGIN_DAIL)
    details:=errCode.FromError(err).Details()


3.4 restful接口调用grpc

grpc-proxy是谷歌的协议缓冲区编译器的一个插件 protoc,它读取protobuf服务定义并生成一个反向代理服务器,该服务器将RESTful HTTP API转换为gRPC。该服务器是根据google.api.http 服务定义中的注释生成的 。

(1) 安装protoc-gen-grpc-gateway和go库

# 打开grpc-gateway的github官网 https://github.com/grpc-ecosystem/grpc-gateway/releases/tag/v1.16.0,下载系统对应版本
# 安装grpc-gateway
wget https://github.com/grpc-ecosystem/grpc-gateway/releases/download/v1.16.0/protoc-gen-grpc-gateway-v1.16.0-windows-x86_64.exe -O protoc-gen-grpc-gateway.exe
# 把protoc-gen-grpc-gateway.exe移动到$GOROOT/bin目录下

# 安装go语言使用的grpc-gateway库
wget https://github.com/grpc-ecosystem/grpc-gateway/archive/v1.16.0.zip
# 解压后,把目录改名为grpc-gateway,然后把grpc-gateway移动到$GOPATH/src/github.com/grpc-ecosystem/
# (可选)把grpc-gateway/third_party/googleapis/google/下使用的第三方proto文件复制到$GOROOT/bin/include/google/目录下,主要为了让protoc使用默认路径就可以使用第三方proto文件,如果不复制,需要-I参数来指定目录


syntax = "proto3";

package proto;

// 这里导入的annotations.proto文件在$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis/google/api目录下,在安装grpc-gateway的go库的目录
import "google/api/annotations.proto";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {
    option (google.api.http) = {
      post: "/v1/sayHello"
      body: "*"
    };
  }
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}


执行命令生成.bp.go和.pb.gw.go文件

# 生成bp.go文件
protoc --go_out=plugins=grpc:. .\proto\*.proto

# 生成pb.gw.go文件
protoc --proto_path=$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --grpc-gateway_out=logtostderr=true:. .\proto\*.proto
# 如果文件移动到了$GOROOT/bin/include/google/目录下,就不需要导入proto文件路径了
protoc --grpc-gateway_out=logtostderr=true:. .\proto\*.proto


服务端程序:

package main

import (
    "context"
    "fmt"
    "net"
    "net/http"

    pb "grpc/http2grpc/unary/proto"

    "github.com/grpc-ecosystem/grpc-gateway/runtime"
    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"
)

const (
    httpAddr = "127.0.0.1:8080"
    grpcAddr = "127.0.0.1:9090"
)

type GreeterServer struct{}

func (g *GreeterServer) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
    return &pb.HelloReply{Message: "Hello " + req.Name}, nil
}

// 启动grpc服务
func grpcRun() {
    server := grpc.NewServer()
    pb.RegisterGreeterServer(server, &GreeterServer{})
    reflection.Register(server) // 注册给反射服务,通过grpcurl工具可以调试接口
    list, err := net.Listen("tcp", grpcAddr)
    if err != nil {
        panic(err)
    }

    fmt.Println("start up grpc server ", grpcAddr)
    err = server.Serve(list)
    if err != nil {
        panic(err)
    }
}

// 启动web服务
func httpRun() {
    mux := runtime.NewServeMux()
    opts := []grpc.DialOption{grpc.WithInsecure()}
    err := pb.RegisterGreeterHandlerFromEndpoint(context.Background(), mux, grpcAddr, opts)
    if err != nil {
        panic(err)
    }

    fmt.Println("start up web server ", httpAddr)
    err = http.ListenAndServe(httpAddr, mux)
    if err != nil {
        panic(err)
    }
}

func main() {
    go grpcRun()

    httpRun()
}


验证:

# http 请求
curl -XPOST http://127.0.0.1:8080/v1/sayHello -d '{"name":"http"}'

# grpc 请求
grpcurl -plaintext  -d '{"name":"grpc"}' 127.0.0.1:9090 proto.Greeter.SayHello


3.5 生成swagger接口文档

(1) 安装插件 protoc-gen-swagger

# 下载
wget https://github.com/grpc-ecosystem/grpc-gateway/releases/download/v1.16.0/protoc-gen-swagger-v1.16.0-windows-x86_64.exe -O protoc-gen-swagger.exe

# 把protoc-gen-swagger.exe移动到$GOROOT/bin/目录下


(2) 下载swagger UI文件

# 在$GOROOT/bin/include/目录下新建swagger目录
mkdir -p $GOROOT/bin/include/swagger

# 下载
wget https://github.com/swagger-api/swagger-ui/archive/v3.37.0.zip

# 解压,把swagger-ui里的dist目录下所有文件(不包括dist目录)移动到目录$GOROOT/bin/include/swagger-ui/


(3) 安装go-bindata和go-bindata-assetfs库

go-bindata工具主要为了将swagger-ui静态文件转为go代码,go-bindata-assetfs库是为了使外部访问能够swagger UI。

go get -u github.com/go-bindata/go-bindata/...
go get -u github.com/elazarl/go-bindata-assetfs/...


(4) 将swagger静态资源转为go代码

# 创建目录swagger-ui和swagger,其中swagger-ui存放dist目录下所有静态文件,swagger存放把静态文件转换后的go文件
mkdir -p pkg/swagger-ui pkg/swagger

# 转换为go
go-bindata --nocompress -pkg=swagger -o=pkg/swagger/data.go pkg/swagger-ui/...


(5) 测试swagger UI服务

把转换后的data.go文件复制到当前swagger目录下。

package main

import (
    "fmt"
    "net/http"

    "grpc/swagger-ui/swagger"

    assetfs "github.com/elazarl/go-bindata-assetfs"
)

func main() {
    mux := http.NewServeMux()

    prefix := "/swagger-ui/"
    fileServer := http.FileServer(&assetfs.AssetFS{
        Asset:    swagger.Asset, // 由于ide原因,这里显示为红色,不需要理会
        AssetDir: swagger.AssetDir,
        Prefix:   "pkg/swagger-ui",
    })
    mux.Handle(prefix, http.StripPrefix(prefix, fileServer))

    swaggerAddr := "127.0.0.1:7070"
    fmt.Println("start up swagger server ", swaggerAddr)
    err := http.ListenAndServe(swaggerAddr, mux)
    if err != nil {
        panic(err)
    }
}

启动服务,在浏览器访问 http://127.0.0.1:7070/swagger-ui/ ,如果能看到界面,说明swagger服务正常,这个界面默认读取的是swagger.json的远程地址(https://petstore.swagger.io/v2/swagger.json)。


(5) 生成本地swagger描述文件和测试调用接口

# 生成*.swagger.json文件
protoc --swagger_out=logtostderr=true:. .\proto\*.proto

一个完整的http+grpc+swagger程序,目录结构如下:

.
├── pkg
│   └── swagger
│       └── data.go
├── proto
│   ├── hello.pb.go
│   ├── hello.pb.gw.go
│   ├── hello.proto
│   └── hello.swagger.json
└── server
    └── main.go


服务端程序如下:

package main

import (
    "context"
    "fmt"
    "net"
    "net/http"
    "path"
    "strings"

    "grpc/demo/pkg/swagger"
    pb "grpc/demo/proto"

    assetfs "github.com/elazarl/go-bindata-assetfs"
    "github.com/grpc-ecosystem/grpc-gateway/runtime"
    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"
)

const (
    httpAddr = "127.0.0.1:8080"
    grpcAddr = "127.0.0.1:9090"
)

type GreeterServer struct{}

func (g *GreeterServer) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
    return &pb.HelloReply{Message: "Hello " + req.Name}, nil
}

// 启动grpc服务
func grpcRun() {
    server := grpc.NewServer()
    pb.RegisterGreeterServer(server, &GreeterServer{})
    reflection.Register(server) // 注册给反射服务,通过grpcurl工具可以调试接口
    list, err := net.Listen("tcp", grpcAddr)
    if err != nil {
        panic(err)
    }

    fmt.Println("start up grpc server ", grpcAddr)
    err = server.Serve(list)
    if err != nil {
        panic(err)
    }
}

// 启动web服务
func httpRun() {
    gwMux := runtime.NewServeMux()
    opts := []grpc.DialOption{grpc.WithInsecure()}
    err := pb.RegisterGreeterHandlerFromEndpoint(context.Background(), gwMux, grpcAddr, opts)
    if err != nil {
        panic(err)
    }

    // web服务
    mux := http.NewServeMux()
    mux.Handle("/", gwMux)

    prefix := "/swagger-ui/"
    fileServer := http.FileServer(&assetfs.AssetFS{
        Asset:    swagger.Asset,
        AssetDir: swagger.AssetDir,
        Prefix:   "pkg/swagger-ui",
    })
    mux.Handle(prefix, http.StripPrefix(prefix, fileServer))

    // helloSwagger 文档
    mux.HandleFunc("/swagger/", helloSwagger)

    fmt.Println("start up web server ", httpAddr)
    err = http.ListenAndServe(httpAddr, mux)
    if err != nil {
        panic(err)
    }
}

func helloSwagger(w http.ResponseWriter, r *http.Request) {
    if !strings.HasSuffix(r.URL.Path, "swagger.json") {
        http.NotFound(w, r)
        return
    }

    p := strings.TrimPrefix(r.URL.Path, "/swagger/")
    p = path.Join("proto", p) // 指定生成*.swagger.json所在路径
    fmt.Println(p)

    http.ServeFile(w, r, p)
}

func main() {
    go grpcRun()

    httpRun()
}

启动服务,在浏览器访问 http://127.0.0.1:8080/swagger-ui/ ,把 http://127.0.0.1:8080/swagger/hello.swagger.json 复制到swagger界面执行,就可以执行接口测试了。

注:hello.swagger.json中的schemes字段值为空,在swagger测试时默认使用https,回导致本来http接口无法访问,所以需要在生成的*.swagger.json文档中手动加入schemes字段,这样可以选择https或http来测试接口。

  "schemes":[
    "https",
    "http"
  ],


3.6 grpc拦截器

grpc拦截器(Interceptor)可以在每一个RPC方法的前面或后面做统一的特殊处理,并且不直接侵入业务代码, 例如鉴权校验、超时控制、日志记录、链路跟踪等。拦截器的类型分为两种:

  • 一元拦截器(UnaryInterceptor):拦截和处理一元RPC调用。
  • 流拦截器(StreamInterceptor):拦截和处理流式RPC调用。

由于客户端和服务端有各自的一元拦截器和流拦截器, 因此,在gRPC中, 也可以说共有四种类型的拦截器。

  • 服务端一元拦截器(StreamServerInterceptor)
  • 服务端流拦截器(StreamServerInterceptor)
  • 客户端一元拦截器(UnaryClientInterceptor)
  • 客户端流拦截器(StreamClientInterceptor)

因为grpc拦截器类型不能重复,当需要多个拦截器时,借助go-grpc-middleware库来实现,安装库

go get -u github.com/grpc-ecosystem/go-grpc-middleware

安装多个拦截器示例

package main

import (
    "context"
    "fmt"
    "time"
    "net"

    grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"

    pb "grpc/interceptor/proto"

    "google.golang.org/grpc"
)

type GreeterServer struct{}

func (g *GreeterServer) SayHello(ctx context.Context, r *pb.HelloRequest) (*pb.HelloReply, error) {
    fmt.Println("receive req: ", r.Name)
    return &pb.HelloReply{Message: "hello " + r.Name}, nil
}

// 拦截器1
func interceptor1(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    fmt.Println("interceptor1 请求前处理")
    resp, err := handler(ctx, req)
    time.Sleep(time.Second)
    fmt.Println("interceptor1 请求完毕后,处理结束")
    return resp, err
}

// 拦截器2
func interceptor2(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    fmt.Println("interceptor2 请求前处理")
    resp, err := handler(ctx, req)
    time.Sleep(time.Second)
    fmt.Println("interceptor2 请求完毕后,处理结束")
    return resp, err
}

func main() {
    // 拦截器
    opts := []grpc.ServerOption{
        grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(interceptor1, interceptor2)),
    }

    server := grpc.NewServer(opts...)
    pb.RegisterGreeterServer(server, &GreeterServer{})
    list, err := net.Listen("tcp", "127.0.0.1:8080")
    if err != nil {
        panic(err)
    }
    err = server.Serve(list)
    if err != nil {
        panic(err)
    }
}


(1) grpc服务端常用拦截器

package middleware

import (
    "context"
    "time"

    "github.com/zhufuyi/logger"
    "google.golang.org/grpc"
)

// AccessLog 请求日志
func AccessLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    beginTime := time.Now().Local().Unix()
    logger.Infof("grpc request, method=%s, beginTime=%d, request=%v", info.FullMethod, beginTime, req)

    resp, err := handler(ctx, req)

    endTime := time.Now().Local().Unix()
    logger.Infof("grpc response, method=%s, beginTime=%d, end_time=%d, response=%v", info.FullMethod, beginTime, endTime, resp)
    return resp, err
}

// ErrorLog 错误日志
func ErrorLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    resp, err := handler(ctx, req)
    if err != nil {
        logger.Errorf("grpc error, method=%s, err=%s", info.FullMethod, err.Error())
    }

    return resp, err
}

// Recovery 异常捕获
func Recovery(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    defer func() {
        if e := recover(); e != nil {
            logger.Errorf("grpc recovery, method=%s, message=%v", info.FullMethod, e)
        }
    }()

    return handler(ctx, req)
}


(2) grpc客户端常用拦截器

package middleware

import (
    "context"
    "time"

    grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
)

// 默认超时
func defaultContextTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
    var cancel context.CancelFunc
    if _, ok := ctx.Deadline(); !ok {
        ctx, cancel = context.WithTimeout(ctx, 10*time.Second)
    }

    return ctx, cancel
}

// ContextTimeout 一元调用超时中间件
func ContextTimeout() grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        ctx, cancel := defaultContextTimeout(ctx)
        if cancel != nil {
            defer cancel()
        }
        return invoker(ctx, method, req, resp, cc, opts...)
    }
}

// StreamContextTimeout 流式调用超时中间件
func StreamContextTimeout() grpc.StreamClientInterceptor {
    return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
        ctx, cancel := defaultContextTimeout(ctx)
        if cancel != nil {
            defer cancel()
        }
        return streamer(ctx, desc, cc, method, opts...)
    }
}

// Retry 重试,可以根据自己定义的错误码重试
func Retry() grpc.UnaryClientInterceptor {
    return grpc_retry.UnaryClientInterceptor(
        grpc_retry.WithMax(2),
        grpc_retry.WithCodes(codes.Unknown, codes.Internal, codes.DeadlineExceeded),
    )
}


3.7 在grpc调用中传递metadata数据

在HTTP/1.1中,通常通过Header来传递数据,对于grpc(HTTP/2)来说,很少直接用Header传递数据,一般使用metadata来传递和操作数据,metadata是一个map结构(map[string][]string),共有两种创建方式:

  • 直接使用函数 metadata.New(map[string]string{})
  • 直接调用函数 metadata.Pairs(key,value),默认会把key转为小写,如果key相同,会追加到对应key的[]string上

在grpc中,为了防止metadata从入站rpc直接转发到出站rpc情况,因此metadata分为传入和传出两种:

  • metadata.NewIncomingContext: 创建一个附加了传入metadata的新上下文,仅供自身的grpc使用。
  • metadata.NewOutgoingContext: 创建一个附加了传出metadata的新上下文,仅供外部的grpc使用。

在grpc中,metadata是存储在context的,context中的数据是在请求的Header中的,因此通过Header可以看到metadata数据。

客户端传入自定义metadata信息示例:

func SayHello(ctx contex.Contex,client pb.GreeterClient) error {
    kv := []string{"color1", "red", "color2", "green", "color3", "blue"}
    newCtx := metadata.AppendToOutgoingContext(ctx, kv...) // 这里使用Apend方法,如果直接创建,会导致原来metadata被替换

    resp, err := client.SayHello(newCtx, &pb.HelloRequest{Name: "zhangsan"})
    if err != nil {
        return err
    }

    fmt.Println("resp:", resp.Message)
    return nil
}


服务端读出自定义metadata信息示例:

func (g *GreeterServer) SayHello(ctx context.Context, r *pb.HelloRequest) (*pb.HelloReply, error) {
    md, _ := metadata.FromIncomingContext(ctx)
    fmt.Printf("metadata: %+v\n", md)

    fmt.Println("receive req: ", r.Name)

    return &pb.HelloReply{Message: "hello " + r.Name}, nil
}


3.8 在grpc中使用链路跟踪

在微服务复制的分布式场景下,注入链路追踪是非常重要和必要的。做链路追踪的基本条件是注入追踪信息,而最简单的方法就是使用服务端和客户端拦截器组成完整的链路信息,具体如下:

  • 服务端拦截器:从metadata中提取链路信息, 将其设置并追加到服务端的调用上下文中。也就是说,如果发现本次调用并没有上一级的链路信息,那么它将会生成对应的父级信息,自己成为父级;如果发现本次调用存在既有的上一级链路信息,那么它将会根据上一级链路信息进行设置,成为其子级。
  • 客户端拦截器:从调用的上下文中提取链路信息, 并将其作为metadata追加到rpc调用中。

借助OpenTracing API和Jaeger Client两个go库实现与追踪系统对接,安装命令如下:

go get -u -v github.com/opentracing/opentracing-go
go get -v -u https://github.com/jaegertracing/jaeger-client-go

服务端拦截器:

// ServerTracing 链路追踪
func ServerTracing(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        md = metadata.New(nil)
    }

    parentSpanContext, _ := global.Tracer.Extract(opentracing.TextMap, metatext.MetadataTextMap{md})
    spanOpts := []opentracing.StartSpanOption{
        opentracing.Tag{Key: string(ext.Component), Value: "grpc"},
        ext.SpanKindRPCServer,
        ext.RPCServerOption(parentSpanContext),
    }

    span := global.Tracer.StartSpan(info.FullMethod, spanOpts...)
    defer span.Finish()

    ctx = opentracing.ContextWithSpan(ctx, span)

    return handler(ctx, req)
}


客户端拦截器:

// ClientTracing 链路追踪
func ClientTracing() grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        var (
            parentCtx opentracing.SpanContext
            spanOpts  []opentracing.StartSpanOption
        )

        parentSpan := opentracing.SpanFromContext(ctx)
        if parentSpan != nil {
            parentCtx = parentSpan.Context()
            spanOpts = append(spanOpts, opentracing.ChildOf(parentCtx))
        }

        spanOpts = append(spanOpts, []opentracing.StartSpanOption{
            opentracing.Tag{Key: string(ext.Component), Value: "grpc"},
            ext.SpanKindRPCClient,
        }...)

        span := global.Tracer.StartSpan(method, spanOpts...)
        defer span.Finish()

        md, ok := metadata.FromOutgoingContext(ctx)
        if !ok {
            md = metadata.New(nil)
        }

        global.Tracer.Inject(span.Context(), opentracing.TextMap, metatext.MetadataTextMap{md})
        newCtx := opentracing.ContextWithSpan(metadata.NewOutgoingContext(ctx, md), span)

        return invoker(newCtx, method, req, resp, cc, opts...)
    }
}


在服务端和客户端使用链路追踪拦截器之前需要先初始化,初始化代码如下:

package tracer

import (
    "io"
    "time"

    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go/config"
)

func InitJaegerTracer(serviceName string, agentHostPort string) (opentracing.Tracer, io.Closer, error) {
    cfg := &config.Configuration{
        ServiceName: serviceName,
        Sampler:     &config.SamplerConfig{Type: "const", Param: 1},
        Reporter:    &config.ReporterConfig{LogSpans: true, BufferFlushInterval: time.Second, LocalAgentHostPort: agentHostPort},
    }

    tracer, closer, err := cfg.NewTracer()
    if err != nil {
        return nil, nil, err
    }

    opentracing.SetGlobalTracer(tracer)

    return tracer, closer, nil
}


3.9 grpc负载均衡

在分布式系统中,为了实现高可用,通常同一个服务会部署多个,为了使访问流量要均衡分散到多个服务上,需要负载均衡器。

客户端要访问服务端,需要知道服务端ip地址和端口,如果服务数量比较少,并且服务不会频繁更改ip和端口,人还可以处理,如果服务数量多了,通过人工力量处理就非常麻烦了,需要动态获取服务端地址,也就是服务注册与发现,常见角色:

  • 注册中心:承担对服务信息进行注册、协调、管理等工作。
  • 服务提供者(服务端): 暴露特定端口,并提供一个到多个的服务来允许外部访问。
  • 服务消费者(客户端): 调用服务方。

服务注册与发现原理:”服务提供者”在启动服务时会将自己的服务信息(ip地址、端口号、版本号等)注册到”注册中心”。”服务消费者”在进行调用时,会以约定命名标识(如服务名)到”注册中心”查询,发现当前哪些具体的服务可以调用。”注册中心”再根据约定的负载均衡算法进行调度,最终请求到服务提供者。 另外,当”服务提供者”出现问题时,或是当定期的”心跳检测”发现”服务提供者”无正确响应时,那么这个出现问题的服务就会被下线,并标识为不可用。即在启动时上报”注册中心”进行注册,把被检测到出问题的服务下线,以此来维护服务注册和发现。


常见的负载均衡有客户端负载均衡和服务端负载均衡。

(1) 客户端负载均衡

客户端负载是指在调用时,由客户端到”注册中心”对服务提供者进行查询,并获取所需的服务清单。服务清单中包含各个服务的实际信息(如ip地址、端口号、集群命名空间等)。由客户端使用特定的负载均衡策略(如轮询)在服务清单中选择一个或多个服务进行调用。

  • 优点: 高性能、去中心化,并且不需要借助独立的外部负载均衡组件。
  • 缺点: 实现成本较高, 要对不同语言的客户端实现各自对应的SDK及其负载均衡策略。


(2) 服务端负载均衡

服务端负载,又被称为”代理”模式,在服务端侧搭设独立的负载均衡器,负载均衡器再根据给定的目标名称(如服务名)找到适合调用的服务实例,因此它具备负载均衡和反向代理两项功能。

  • 优点: 简单、透明,客户端不需要知道背后的逻辑,只需按给定的目标名称调用、访问即可,由服务端侧管理负载、均衡策略及代理
  • 缺点: 外部的负载均衡器理论上可能成为性能瓶颈,会受到负载均衡器的吞吐率影响,并且与客户端负载相比,有可能出现更高的网络延迟。同时,必须要保持高可用,因为它是整个系统的 关键节点,一旦出现问题,影响非常大。


(3) grpc官方设计思路

  • 客户端根据服务名称发起请求。
  • 名称解析器解析服务名称并返回,服务名称解析成一个或多个ip地址,每个ip都会有标识,标识分为服务端地址、负载均衡地址、客户端使用的负载均衡策略。
  • 客户端根据服务端类型选择相应的策略,如果grpc客户端获取的地址是负载均衡器地址,那么客户端将使用grpclb策略,否则使用服务配置请求的负载均衡策略;如果服务配置未请求负载均衡策略,则客户端默认选择第一个可用的服务端地址。
  • 最后根据不同的策略进行实际调用。

grpc默认支持两种负载均衡算法pick_first 和 round_robin


可能会遇到新版本grpc不兼容etcd库问题,下面使用grpc-1.33.2和etcd-3.4.13库实现grpc服务注册、发现、负载均衡的示例。

grpc服务端代码如下:

package main

import (
    "context"
    "flag"
    "fmt"
    "net"
    "time"

    pb "grpc/loadbalance/proto"

    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/proxy/grpcproxy"
    "google.golang.org/grpc"
)

const (
    schema      = "grpc"
    authority   = "demo"
    serviceName = "hello-service"
)

var targetAddr = fmt.Sprintf("%s://%s/%s", schema, authority, serviceName)

// InitETCD 连接etcd服务
func InitETCD(endPoints []string) (*clientv3.Client, error) {
    config := clientv3.Config{
        Endpoints:   endPoints,
        DialTimeout: 30 * time.Second,
    }

    return clientv3.New(config)
}

type GreeterServer struct{}

func (g *GreeterServer) SayHello(ctx context.Context, r *pb.HelloRequest) (*pb.HelloReply, error) {
    fmt.Println("receive req: ", r.Name)
    return &pb.HelloReply{Message: "hello " + r.Name}, nil
}

var port string

func init() {
    flag.StringVar(&port, "port", "8080", "service port")
    flag.Parse()
}

func main() {
    serviceAddr := "127.0.0.1:" + port

    etcdCli, err := InitETCD([]string{"192.168.3.5:2379"})
    if err != nil {
        panic(err)
    }
    // 服务注册到etcd
    grpcproxy.Register(etcdCli, targetAddr, serviceAddr, 60)

    server := grpc.NewServer()
    pb.RegisterGreeterServer(server, &GreeterServer{})
    list, err := net.Listen("tcp", serviceAddr)
    if err != nil {
        panic(err)
    }
    fmt.Println("start up grpc server success.")
    err = server.Serve(list)
    if err != nil {
        panic(err)
    }
}


在客户端自定义实现resolver代码如下:

package main

import (
    "context"
    "fmt"
    "strings"
    "sync"

    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/mvcc/mvccpb"
    "google.golang.org/grpc/resolver"
)

type Builder struct {
    Client *clientv3.Client
}

func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    prefix := targetAddr + "/" // etcd的key前缀

    r := &Resolver{
        Client: b.Client,
        cc:     cc,
        prefix: prefix,
    }

    go r.watcher()
    r.ResolveNow(resolver.ResolveNowOptions{})

    return r, nil
}

func (b *Builder) Scheme() string {
    return schema
}

type Resolver struct {
    sync.RWMutex
    Client    *clientv3.Client
    cc        resolver.ClientConn
    prefix    string
    addresses map[string]resolver.Address
}

// watch 有变化以后会调用
func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) {
    address := r.getAddresses()
    endpoints := []string{}
    for _, addr := range address {
        endpoints = append(endpoints, addr.Addr)
    }

    fmt.Println("ResolveNow endpoints: ", endpoints)
}

func (r *Resolver) Close() {
    // todo
    fmt.Println("close resolver")
}

func (r *Resolver) watcher() {
    r.addresses = make(map[string]resolver.Address)

    resp, err := r.Client.Get(context.Background(), r.prefix, clientv3.WithPrefix())
    if err != nil {
        fmt.Println("获取服务地址列表失败:", err)
    } else {
        for _, kv := range resp.Kvs {
            r.setAddress(string(kv.Key), string(kv.Value))
        }

        r.cc.UpdateState(resolver.State{
            Addresses: r.getAddresses(),
        })
    }

    watch := r.Client.Watch(context.Background(), r.prefix, clientv3.WithPrefix())
    for response := range watch {
        for _, event := range response.Events {
            switch event.Type {
            case mvccpb.PUT:
                r.setAddress(string(event.Kv.Key), string(event.Kv.Value))
            case mvccpb.DELETE:
                r.delAddress(string(event.Kv.Key))
            }
        }

        r.cc.UpdateState(resolver.State{
            Addresses: r.getAddresses(),
        })
    }
}

func (r *Resolver) setAddress(key, value string) {
    r.Lock()
    defer r.Unlock()

    addr := strings.TrimPrefix(key, r.prefix) // 服务端真实连接地址
    r.addresses[key] = resolver.Address{Addr: addr}
}

func (r *Resolver) getAddresses() []resolver.Address {
    r.Lock()
    defer r.Unlock()

    addresses := []resolver.Address{}
    for _, address := range r.addresses {
        addresses = append(addresses, address)
    }
    return addresses
}

func (r *Resolver) delAddress(key string) {
    r.Lock()
    defer r.Unlock()
    delete(r.addresses, key)
}

客户端代码如下:

package main

import (
    "context"
    "fmt"
    "strconv"
    "time"

    pb "grpc/loadbalance/proto"

    "go.etcd.io/etcd/clientv3"
    "google.golang.org/grpc"
    "google.golang.org/grpc/balancer/roundrobin"
    "google.golang.org/grpc/resolver"
)

const (
    schema      = "grpc"
    authority   = "demo"
    serviceName = "hello-service"
)

var targetAddr = fmt.Sprintf("%s://%s/%s", schema, authority, serviceName)

func InitETCD(endPoints []string) (*clientv3.Client, error) {
    config := clientv3.Config{
        Endpoints:   endPoints,
        DialTimeout: 30 * time.Second,
    }

    return clientv3.New(config)
}

func SayHello(client pb.GreeterClient, i int) error {
    resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "name " + strconv.Itoa(i)})
    if err != nil {
        return err
    }

    fmt.Println("resp:", resp.Message)
    return nil
}

func main() {
    etcdCli, err := InitETCD([]string{"192.168.3.5:2379"})
    if err != nil {
        panic(err)
    }
    builder := &Builder{Client: etcdCli}
    resolver.Register(builder)

    var opts []grpc.DialOption
    opts = append(opts,
        grpc.WithInsecure(),
        grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)), // 可以自定义负载均衡策略,默认提供的均衡策略在$GOPATH/src/google.golang.org/grpc/balancer目录下
    )

    conn, err := grpc.Dial(targetAddr, opts...)
    if err != nil {
        panic(err)
    }
    client := pb.NewGreeterClient(conn)

    // 调用测试
    for i := 0; i < 100; i++ {
        err = SayHello(client, i)
        if err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}


测试负载均衡:

# 启动三个grpc服务端
server.exe --port=8080
server.exe --port=8081
server.exe --port=8082

# 启动客户端请求
client.exe

# 查看各个服务端打印是否轮流请求

# 关掉一个或两个服务,查看是否影响正常请求


专题「golang相关」的其它文章 »