从零开始的RPC(四):gRPC练习
gRPC构建简易微服务
代码实现查询,客户端向服务端查询用户的信息
定义消息
syntax = "proto3";
package query;
option go_package = "./querypb";
// 用户查询
message UserRequest {
string name = 1;
}
message UserResponse {
int32 id = 1;
string name = 2;
int32 age = 3;
repeated string hobbies = 4;
}
// service定义方法
service UserInfoService {
rpc GetUserInfo(UserRequest) returns (UserResponse);
}生成.go文件
protoc -I . --go_out=. --go-grpc_out=. ./query.proto编写服务端
package main
import (
"context"
"errors"
"from-rpc-to-microservice/querypb"
"log"
"net"
"sync"
"google.golang.org/grpc"
)
var (
ErrUsernameNotExist = errors.New("用户名不存在")
)
// 1.需要监听
// 2.需要实例化gRPC服务端
// 3.在gRPC上注册微服务
// 4.启动服务端
// 实现服务端
type UserInfoService struct {
// 今时不同往日, 现在是要手动嵌入这个结构体来继承那个空方法了
querypb.UnimplementedUserInfoServiceServer
}
func (u UserInfoService) GetUserInfo(ctx context.Context, request *querypb.UserRequest) (*querypb.UserResponse, error) {
name := request.Name
if v, exists := users[name]; exists {
return &v, nil
} else {
return nil, ErrUsernameNotExist
}
}
// 服务端实例
var u = UserInfoService{}
// 模拟数据库
var users = map[string]querypb.UserResponse{
"Tom": {
Id: 1,
Name: "Tom",
Hobbies: []string{"coding", "gaming"},
},
"Jerry": {
Id: 2,
Name: "Jerry",
Hobbies: []string{"gaming", "drawing"},
},
"Mike": {
Id: 3,
Name: "Mike",
Hobbies: []string{"coding", "swimming"},
},
}
func runServer(wg *sync.WaitGroup) {
defer wg.Done()
addr := "127.0.0.1:8080"
listener, err := net.Listen("tcp", addr)
if err != nil {
log.Panicf("TCP监听失败: %v", err)
}
log.Printf("已占用地址: %s", addr)
s := grpc.NewServer() // 实例化gRPC Server
querypb.RegisterUserInfoServiceServer(s, &u)
if err := s.Serve(listener); err != nil {
log.Panicf("启动服务失败: %v", err)
}
}编写客户端
package main
import (
"context"
"from-rpc-to-microservice/querypb"
"log"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// 1.连接服务端
// 2.实例gRPC客户端
// 3.调用
func runClient(wg *sync.WaitGroup) {
defer wg.Done()
// grpc.WithInsecure已经被废弃
// grpc.Dial也被废弃了
conn, err := grpc.NewClient("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Panicf("连接服务端失败: %v", err)
}
defer func() {
_ = conn.Close()
log.Printf("已关闭连接")
}()
// 实例化客户端
client := querypb.NewUserInfoServiceClient(conn)
req := &querypb.UserRequest{
Name: "Kirk",
}
resp, err := client.GetUserInfo(context.Background(), req)
if err != nil {
log.Panicf("调用服务端失败: %v", err)
}
log.Printf("服务端返回: %v", resp)
}
gRPC API 单元编写练习
optional字段测试
注
optional: (recommended) An optional field is in one of two possible states:
- the field is set, and contains a value that was explicitly set or parsed from the wire. It will be serialized to the wire.
- the field is unset, and will return the default value. It will not be serialized to the wire.


不传值的时候可以是nil(最好把数据模型里的类型也换成*类型,这样Gorm扫描时可以忽略nil值);有传值的时候才能被序列化
业务逻辑根据自己需要编写,反正只是测试
gRPC API 情景练习
热插拔登录验证项目
由Gemini设计
情景描述
这个场景不需要真实的业务逻辑,核心在于如何优雅地表达“多种可能性”。
挑战点:
定义一个
LoginRequest,它必须支持:用户名密码登录、手机验证码登录、第三方 Token 登录(使用oneof实现互斥)。嵌套一个
DeviceInfo消息,包含设备 ID、型号和地理位置(练习nested message和optional字段)。进阶:实现一个
Streaming接口,模拟服务器实时推送“异地登录告警”。
消息嵌套与oneof修饰
message DeviceInfo {
optional string device_id = 1; // 设备ID
optional string brand = 2; // 设备型号
string location = 3; // 地理位置
}
// 使用用户名密码登录
message LoginWithPasswd {
string username = 1;
string password = 2;
}
// 使用手机验证码登录
message LoginWithPhone {
string phone_number = 1;
string code = 2; // 验证码
}
// 第三方Token登录
message LoginWithToken {
string api = 1; // 第三方API (目前仅以URL作为调试参照)
string token = 2;
}
// 用于登录请求
message LoginRequest {
// 登录方式, 互斥
// 若使用enum, 则无法为任一登录方式单独设置字段
oneof login_type {
LoginWithPasswd username_password = 1; // 用户名密码登录
LoginWithPhone phone_code = 2; // 手机验证码登录
LoginWithToken third_party_token = 3; // 第三方 Token 登录
}
DeviceInfo device_info = 4; // 客户端提交, 服务端记录设备信息并监控登录消息
}消息嵌套
在一个消息里套另一个消息:
message DeviceInfo {
optional string device_id = 1; // 设备ID
optional string brand = 2; // 设备型号
string location = 3; // 地理位置
}
// 用于登录请求
message LoginRequest {
DeviceInfo device_info = 4; // 客户端提交, 服务端记录设备信息并监控登录消息
}在LoginRequest里将DeviceInfo用作字段类型,这便是消息嵌套
oneof
必须支持三种登录方式,但又不能都启用,这时可以使用oneof修饰这些业务关系上互斥的字段:
// 使用用户名密码登录
message LoginWithPasswd {
}
// 使用手机验证码登录
message LoginWithPhone {
}
// 第三方Token登录
message LoginWithToken {
}
// 用于登录请求
message LoginRequest {
// 登录方式, 互斥
// 若使用enum, 则无法为任一登录方式单独设置字段
oneof login_type {
LoginWithPasswd username_password = 1; // 用户名密码登录
LoginWithPhone phone_code = 2; // 手机验证码登录
LoginWithToken third_party_token = 3; // 第三方 Token 登录
}
}具体的登录消息可暂时为空消息,便于后续特化填充
enum VS oneof
| Feature | Enum | Oneof |
|---|---|---|
| Purpose | To represent a fixed set of predefined, named integer values (e.g., status codes, days of the week, user roles). | To allow a field to hold different data types or messages, but only one at a time (e.g., a message that could contain a string, an int32, or another custom message as its "value"). |
| Data Type | Represents a single integer value internally (typically non-negative for efficiency). | Acts as a discriminated union, capable of holding different data types. |
| Schema Evolution | Adding new enum values is backward-compatible. Renaming or removing values requires careful management (using reserved keywords). | Adding new fields to a oneof is generally backward-compatible. It offers more flexibility for evolving the schema with different types of detailed configurations. |
| Memory Usage | Uses minimal memory, essentially storing a single integer value. | The generated message allocates memory only for the largest field within the oneof block, saving space when only one field is needed at a time. |
| Presence Checking | In proto3, by default, checking if an enum field is set (present) is not always straightforward, as it defaults to the 0 value if unset. | oneof fields explicitly expose presence, allowing the generated code to offer methods to check which field (if any) is currently set. |
oneof的特性
oneof只能存储一个值,如果赋值多个值,则只有最后一个值是有效的,其他值都会被自动清空oneof不可以和repeated共用oneof中的字段ID不是单独计算的,不能在oneof外面、消息里面复用oneof里的ID
判断oneof中的字段类型
func (s *AuthorizationService) Login(ctx context.Context, req *login.LoginRequest) (*login.DeviceInfo, error) {
if loginType := req.GetLoginType(); loginType == nil {
return nil, ErrEmptyLoginRequest
} else {
switch loginType.(type) {
case *login.LoginRequest_UsernamePassword:
log.Printf("使用用户名+密码登录, 请求内容为: %+v", req.GetUsernamePassword())
return loginWithPassword(ctx, req)
case *login.LoginRequest_PhoneCode:
log.Printf("使用手机号+验证码登录, 请求内容为: %+v", req.GetPhoneCode())
return loginWithPhone(ctx, req)
case *login.LoginRequest_ThirdPartyToken:
log.Printf("使用第三方Token登录, 请求内容为: %+v", req.GetThirdPartyToken())
return loginWithThirdParty(ctx, req)
default:
return nil, ErrLoginTypeNotSupported
}
}
}- 使用
req.GetXXXYYY获取该API中使用的oneof字段类型XXXYYY即该字段的命名,下划线全拼xxx_yyy转驼峰首字母大写XXXYYY - 先判空,如果客户端没有传值,那整个字段都不会被序列化,因此为
nil - 如果不为空,再用
switch loginType.(type)断言判断具体的类型
如果不判空就断言会nil pointer panic
由于LoginType这个字段是可导出的,所以也可以通过req.LoginType拿到业务值类型,而GetLoginType只是多了个判空处理:
func (x *LoginRequest) GetLoginType() isLoginRequest_LoginType {
if x != nil {
return x.LoginType
}
return nil
}非oneof的类型则还会有一步类型断言:
func (x *LoginRequest) GetUsernamePassword() *LoginWithPasswd {
if x != nil {
if x, ok := x.LoginType.(*LoginRequest_UsernamePassword); ok {
return x.UsernamePassword
}
}
return nil
}错误处理
在 gRPC 中,错误是一等公民,拥有一套标准化的模型。gRPC的错误类型分为标准错误模型、富错误模型(Rich Error Model) 两种
gRPC API无论是否为流式API,都会返回error
API当然可以接受errors.New的值,但标准库的error只有Error() string一个方法,无法描述错误详情
gRPC 最基础的错误表现形式是标准错误模型 (Standard Error Model) ,每个 gRPC 调用(无论是否流式)在结束时都会返回两个核心信息:
- Status Code (状态码):一个整数枚举,定义在 codes 包中。总共有 16 种标准代码(如
OK,NotFound,PermissionDenied,Internal等)。 - Status Message (错误消息):一个面向开发者的字符串,解释错误原因。
局限性:它不支持传递结构化的数据(比如“到底是哪个字段验证失败了?”)。
为了解决标准模型信息太少的问题,Google 提出了富错误模型。它基于 google/rpc/status.proto 定义。
模型包含三个字段:
- Code: 整数状态码。
- Message: 错误描述字符串。
- Details: 一个 Any 类型的数组。这是最强大的地方,它可以携带任何序列化后的 Protobuf 消息。
常见的 Detail 类型(Google 预定义):
BadRequest: 包含字段级别的验证错误(Field Violations)。RetryInfo: 告诉客户端多久后可以重试。QuotaFailure: 告诉客户端配额耗尽的具体原因。DebugInfo: 包含堆栈追踪信息(生产环境通常关闭)。
参考示例
服务端参考error handling:
不要直接返回 errors.New(),因为那会被 gRPC 识别为 Unknown 状态码。应该使用 status 包
// 1. 返回标准错误
return status.Error(codes.NotFound, "用户不存在")
// 2. 返回富错误 (带详情)
st := status.New(codes.InvalidArgument, "参数校验失败")
desc := &errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequest_FieldViolation{
{Field: "username", Description: "用户名不能为空"},
},
}
st, _ = st.WithDetails(desc)
return st.Err()客户端参考error resolving:
客户端需要判断错误类型并提取详情
res, err := client.GetUser(ctx, req)
if err != nil {
// 将普通 error 转换为 status 对象
st, ok := status.FromError(err)
if ok {
fmt.Printf("状态码: %v, 消息: %v\n", st.Code(), st.Message())
// 提取富错误详情
for _, detail := range st.Details() {
switch t := detail.(type) {
case *errdetails.BadRequest:
for _, v := range t.GetFieldViolations() {
fmt.Printf("字段错误: %s - %s\n", v.Field, v.Description)
}
}
}
}
}注意事项
在流式(Streaming)RPC 中,错误处理更加特殊:
- 发送/接收阶段:如果在流中间发生错误(例如
Recv()失败),该错误通常意味着整个流已经关闭 - EOF 不是错误:在读取流时,
io.EOF表示对方已经正常关闭了发送端,这在 gRPC 中不被视为传统意义上的 RPC 错误。 - 尾随元数据 (Trailing Metadata):gRPC 的状态码和错误消息实际上是放在 HTTP/2 的 Trailers(报尾)中发送的。这意味着只有在所有数据帧传输完成后,客户端才能拿到最终的错误状态。
Best Practices
不要在 Message 中传递敏感信息:Message 可能会被打印到日志或显示给用户,不要包含数据库密码、内部堆栈等。
优先使用标准状态码:不要为了区分细微的业务逻辑而自定义状态码,尽量映射到已有的 16 个代码(如业务逻辑不符统一用
FailedPrecondition)。合理利用 Details:
前端表单验证失败 -> 使用
BadRequest。接口限流 -> 使用
QuotaFailure并配合RetryInfo。
区分可重试错误:
Unavailable和Internal(有时)是可重试的。
拦截器集成:在服务端定义全局拦截器(Interceptor),统一捕获 Panic 并将其转换为 Internal 错误,防止服务崩溃且能给客户端友好反馈。
gRPC流式API (服务端推送API)
服务端:
send发送(单条)消息- API本身就是阻塞的,进行到
return语句后流传输也就结束了
客户端: Recv接收(单条)消息- 一直到
io.EOF后即为传输结束
- 一直到
proto参考定义:
syntax = "proto3";
package chat;
service ChatService {
rpc GetMessageHistory(ChatRequest) returns (stream Message);
}
message ChatRequest {
string user_id = 1;
int32 limit = 2;
}
message Message {
string id = 1;
string sender = 2;
string content = 3;
int64 timestamp = 4;
}服务端参考实现:
func (s *ChatServer) GetMessageHistory(req *chat.ChatRequest, stream grpc.ServerStreamingServer[chat.Message]) error {
messages, err := s.messageStore.GetRecentMessages(req.UserId, req.Limit)
if err != nil {
return err
}
for _, msg := range messages {
if err := stream.Send(msg); err != nil {
return err
}
// 可选:添加延迟以控制发送频率
time.Sleep(100 * time.Millisecond)
}
return nil
}客户端参考实现:
stream, err := client.GetMessageHistory(context.Background(), &chat.ChatRequest{
UserId: "user123",
Limit: 10,
})
if err != nil {
log.Fatalf("Error getting message history: %v", err)
}
for {
msg, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
log.Fatalf("Error receiving message: %v", err)
}
fmt.Printf("Received message: %s\n", msg.Content)
}gRPC流式API中的泛型接口
2024 年初(主要以 gRPC-Go v1.61.0 版本为里程碑),gRPC 官方对 Go 语言泛型(Go 1.18+)进行了全面适配。
在 v1.61.0 及后续版本中,gRPC-Go 在核心包中引入了泛型接口
// 客户端泛型接口
// 位于 google.golang.org/grpc 包中
type ClientStream[Req any, Res any] interface {
Send(Req) error
Recv() (Res, error)
Header() (metadata.MD, error)
Trailer() metadata.MD
CloseSend() error
Context() context.Context
}// 服务端泛型接口
type ServerStreamingServer[Res any] interface {
Send(*Res) error
ServerStream
}
type ServerStream interface {
SetHeader(metadata.MD) error
SendHeader(metadata.MD) error
SetTrailer(metadata.MD)
Context() context.Context
SendMsg(m any) error
RecvMsg(m any) error
}
可观测性指标上报系统 (练 map 和流通信)
proto定义
syntax = "proto3";
package metric;
option go_package = "./pb/metric";
message Empty {}
message Metric {
// labels 存储不固定的维度, 如host、app_name、instance_id
map <string, string> labels = 1;
sint64 delay = 2; // 因为要检测负数情况, 所以特意用优化过的sint存储延迟指标值
float packet_loss = 3; // 丢包率
}
message MetricReport {
Metric metrics = 1;
}
message MetricStatistic {
map <string, float> averages = 1; // 不同指标的平均值
map <string, float> maxes = 2; // 不同指标各自的最大值
}
service MonitorService {
rpc Welcome(Empty) returns (Empty);
rpc Report(stream MetricReport) returns (MetricStatistic);
}客户端流式API
客户端:
Send方法发送单条消息CloseAndRecv关闭流写入并接收响应
**服务端:Recv发送接收单条消息io.EOF通常视为客户端结束数据传输
SendAndClose发送消息并关闭流读取
type ClientStreamingServer[Req any, Res any] interface {
// Recv receives the next request message from the client. The server may
// repeatedly call Recv to read messages from the request stream. If
// io.EOF is returned, it indicates the client called CloseAndRecv on its
// ClientStreamingClient. Any other error indicates the stream was
// terminated unexpectedly, and the handler method should return, as the
// stream is no longer usable.
Recv() (*Req, error)
// SendAndClose sends a single response message to the client and closes
// the stream. This method must be called once and only once after all
// request messages have been processed. Recv should not be called after
// calling SendAndClose.
SendAndClose(*Res) error
// ServerStream is embedded to provide Context, SetHeader, SendHeader, and
// SetTrailer functionality. No other methods in the ServerStream should
// be called directly.
ServerStream
}客户端实际实现:
var maxCycles = 100 // 总共要发送多少条数据
interval := 50 * time.Millisecond
for i := 0; i < maxCycles; i++ {
msg := &metric.MetricReport{
Metrics: &metric.Metric{
Labels: map[string]string{
"host": hosts[rand.Intn(totalRandomData)], // "192.168.100.1", //
"app_name": appNames[rand.Intn(totalRandomData)], // "app.example.com", //
"instance_id": instanceIds[rand.Intn(totalRandomData)], // "docker-1", //
},
Delay: rand.Int63n(1000),
PacketLoss: rand.Float32(),
},
}
// t.Logf("正在发送第%d条数据: \n%#v", i+1, msg.GetMetrics().GetLabels())
if err := stream.Send(msg); err != nil {
t.Fatalf("无法发送数据: %v", err)
}
// t.Logf("已发送第%d条数据: \n%#v", i+1, msg.GetMetrics().GetLabels())
time.Sleep(interval)
}
resp, err := stream.CloseAndRecv()
if err != nil {
t.Fatalf("无法接收响应: %v", err)
}富错误类型
服务端实际实现:
- 其中的
ErrNegativeDelay需要是status.New生成的,才有WithDetails这个方法
// 取出数值放入切片中
if sint64Slice, exists := sint64Cache[joinedLabels.String()]; !exists {
sint64Slice = make([]int64, 0)
sint64Cache[joinedLabels.String()] = sint64Slice
}
if num := resp.GetMetrics().GetDelay(); num >= 0 {
sint64Cache[joinedLabels.String()] = append(sint64Cache[joinedLabels.String()], num)
log.Printf("[%v] 已读取到延迟数值: %v ms", idx, num)
} else {
// 准备错误详情
desc := &errdetails.BadRequest_FieldViolation{Field: "delay", Description: "延迟不能小于0"}
st, _ := ErrNegativeDelay.WithDetails(desc)
return st.Err()
}客户端实际实现:
// 流式API里的
_, err = stream.CloseAndRecv()
if err != nil {
if errors.Is(err, ErrNegativeDelay.Err()) || strings.Contains(err.Error(), "延迟不能小于0") {
t.Logf("已接收到延迟为负数的错误: %v", err)
// 无法接收响应: rpc error: code = InvalidArgument desc = 延迟不能小于0
return
}
t.Fatalf("无法接收响应: %v", err)
}已接收到延迟为负数的错误: rpc error: code = InvalidArgument desc = 延迟不能小于0流式拦截器
服务端参考实现:
// 1. 定义一个包装器,嵌入原有的 ServerStream
type wrappedStream struct {
grpc.ServerStream
}
// 2. 重写 RecvMsg 方法
func (w *wrappedStream) RecvMsg(m any) error {
err := w.ServerStream.RecvMsg(m)
log.Printf("流收到消息: %T", m)
return err
}
// 3. 拦截器实现
func myStreamServerInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// 使用包装器替换原始流
wrapped := &wrappedStream{ss}
return handler(srv, wrapped)
}
// 注册:
s := grpc.NewServer(grpc.StreamInterceptor(myStreamServerInterceptor))实际实现:
- 懒得对
RecvMsg的参数进行断言,索性直接进行JSON序列化。(Protobuf序列化需要有Reset等方法,而any作为空接口没有这些方法)- 建议使用
proto.Size(m.(proto.Message)),原生支持,不需要进行序列化
- 建议使用
// 定义服务端流式拦截器,用于流量和请求耗时计算
// 定义一个包装器,封装原有的ServerStream
type wrappedServerStream struct {
grpc.ServerStream
}
// RecvMsg 重写RecvMsg方法
//
// 每接收到一条消息都会调用一次这个方法
// 重写后的方法包含使用json marshal的反射+字节计算逻辑
func (s *wrappedServerStream) RecvMsg(m interface{}) error {
// start := time.Now()
err := s.ServerStream.RecvMsg(m)
if err != nil {
return err
}
// log.Println("Recv", m, "took", time.Since(start))
byteData, _ := json.Marshal(m)
log.Printf("接收到共计%v字节数据(以JSON格式)", len(byteData))
return nil
}
// 流式拦截器会在流建立的一瞬间被调用
func timeCostStreamInterceptor(srv interface{}, ss grpc.ServerStream,
info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
log.Printf("API名称: %v", info.FullMethod)
// 创建一个包装器,将原始的ServerStream传入
wrapped := &wrappedServerStream{ss}
// 调用原始的StreamHandler方法
start := time.Now()
err := handler(srv, wrapped)
// handler是阻塞式的, 要进行完所有传输才会走到下面的流程
log.Printf("API名称: %v, 传输耗时: %v 毫秒", info.FullMethod, time.Since(start).Milliseconds())
// FullMethod格式: /package.Service/Method
// 例: /metric.MonitorService/Report
return err
}
// (一元)服务端拦截器
func welcomeInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
log.Printf("欢迎访问[%v]API", info.FullMethod)
return handler(ctx, req)
}
模拟任务调度
双向流式API
双向API需要的StreamServer
// BidiStreamingServer represents the server side of a bidirectional-streaming
// (many requests, many responses) RPC. It is generic over both the type of the
// request message stream and the type of the response message stream. It is
// used in generated code.
//
// To terminate the stream, return from the handler method and return
// an error from the status package, or use nil to indicate an OK status code.
type BidiStreamingServer[Req any, Res any] interface {
// Recv receives the next request message from the client. The server may
// repeatedly call Recv to read messages from the request stream. If
// io.EOF is returned, it indicates the client called CloseSend on its
// BidiStreamingClient. Any other error indicates the stream was
// terminated unexpectedly, and the handler method should return, as the
// stream is no longer usable.
Recv() (*Req, error)
// Send sends a response message to the client. The server handler may
// call Send multiple times to send multiple messages to the client. An
// error is returned if the stream was terminated unexpectedly, and the
// handler method should return, as the stream is no longer usable.
Send(*Res) error
// ServerStream is embedded to provide Context, SetHeader, SendHeader, and
// SetTrailer functionality. No other methods in the ServerStream should
// be called directly.
ServerStream
}和StreamClient
// BidiStreamingClient represents the client side of a bidirectional-streaming
// (many requests, many responses) RPC. It is generic over both the type of the
// request message stream and the type of the response message stream. It is
// used in generated code.
type BidiStreamingClient[Req any, Res any] interface {
// Send sends a request message to the server. The client may call Send
// multiple times to send multiple messages to the server. On error, Send
// aborts the stream. If the error was generated by the client, the status
// is returned directly. Otherwise, io.EOF is returned, and the status of
// the stream may be discovered using Recv().
Send(*Req) error
// Recv receives the next response message from the server. The client may
// repeatedly call Recv to read messages from the response stream. If
// io.EOF is returned, the stream has terminated with an OK status. Any
// other error is compatible with the status package and indicates the
// RPC's status code and message.
Recv() (*Res, error)
// ClientStream is embedded to provide Context, Header, Trailer, and
// CloseSend functionality. No other methods in the ClientStream should be
// called directly.
ClientStream
}- 服务端和客户端自己分别起协程跑各自的
Recv和Send;这俩方法都是阻塞式的,所以我多起了一个协程专门把recv的返回值导入Channel里,交给sender协程去消费
(客户端)富错误处理
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Canceled {
log.Printf("任务监听已取消")
quit <- struct{}{}
return
} else if st.Code() == codes.InvalidArgument {
log.Printf("任务参数错误: %v", err)
quit <- struct{}{}
return
}
}对于grpc的错误类型,需要使用status.FromError拿到原始类型再做判断,而不可使用标准库的errors.Is
客户端流式拦截器
需要实现StreamClientInterceptor函数类型:
// StreamClientInterceptor may return a custom ClientStream to intercept all I/O
// operations. The returned error must be compatible with the status package.
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)然后在新建客户端实例时调用grpc.WithStreamInterceptor或grpc.WithChainStreamInterceptor时把拦截i器注册上去:
conn, err := grpc.NewClient(addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStreamInterceptor(ClientIdentityInterceptor),
)
if err != nil {
log.Fatalf("无法连接: %v", err)
}参考实现如下:
// ClientIdentityInterceptor (在流建立的一瞬间)注入客户端身份标识(由faker库生成)
func ClientIdentityInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
clientId := fmt.Sprintf("Client-%v", faker.E164PhoneNumber())
// 注入数据
c := metadata.AppendToOutgoingContext(ctx, "Client-ID", clientId) // 服务端用client-id作为键取出客户端身份标识
// 替换客户端流
clientStream, err := streamer(c, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return clientStream, nil
}- 如果不想考虑Context本身已有的数据的话,可以直接调用
context.Value覆盖上下文中已有的值 - 否则都建议使用
AppendToOutgoingContext追加元数据
服务端上使用metedata.FromIncomingContext(context.Context)拿到元数据,然后使用md.Get获取具体的Header切片
grpc会自动将key处理成全小写,所以在服务端上需要通过client-id拿到元数据
func (s *SchedulerService) getClientId(ctx context.Context) (string, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", ErrNoMetadata
}
clientId := md.Get("client-id")
if len(clientId) == 0 {
detail := &errdetails.BadRequest_FieldViolation{Field: "client_id", Description: "任务终端ID不可为空(随便写点都行)"}
st, _ := ErrInvalidClientId.WithDetails(detail)
return "", st.Err()
}
return clientId[0], nil
}
func (s *SchedulerService) ManageTask(stream grpc.BidiStreamingServer[task.TaskStatus, task.TaskDistribution]) error {
/*
if err := s.processCtx(stream.Context()); err != nil {
return err
}
*/
if id, err := s.getClientId(stream.Context()); err != nil {
return err
} else {
log.Printf("终端[%v]已连接", id)
}
ctx := stream.Context()
currReqRoutineWG := &sync.WaitGroup{}
// 消息转移协程
s.listener(currReqRoutineWG, ctx, stream)
// 消息发送协程
s.sender(currReqRoutineWG, ctx, stream)
// 状态接收协程
s.receiver(currReqRoutineWG, ctx, stream)
currReqRoutineWG.Wait()
return nil
}HTTP/2协议允许请求中存在多个同样的Header Key,所以metadata.Get拿到的值是[]string;一般只取索引[0]的值
成果
初代
- 没有扇出机制,所以一个请求只会对接一个客户端
- 客户端任务缓存使用手动Channel池管理,因此在查找并更新特定任务时存在
O(N)的性能开销 - 服务端和客户端均未加入指数退避机制,亦不会检查报错次数以便中断连接
- 服务端的任务缓存仅作参考,不用于校验客户端提交的任务是否存在;客户端的任务缓存亦仅用于缓存任务,并防止多个协程共同修改同一任务实例
- 受制于代码厚度,无法模拟真实业务的诸多安全措施

二代
- 客户端添加流式拦截器用于注入身份标识

附录
其他流式API
gRPC提供如下四种流式API:
- 一元RPC(Unary RPC)
传统的请求-响应模式,客户端发送单个请求,服务端返回单个响应。
rpc UnaryCall(Request) returns (Response);- 服务端流式(Server Streaming)
客户端发送单个请求,服务端返回消息流。
rpc ServerStreaming(Request) returns (stream Response);使用场景:
- 实时数据推送(如股票价格、传感器数据)
- 大文件分块下载
- 日志流实时查看
- 客户端流式(Client Streaming)
客户端发送消息流,服务端返回单个响应。
rpc ClientStreaming(stream Request) returns (Response);使用场景:
- 大文件分块上传
- 数据批量处理
- 客户端数据聚合
- 双向流式(Bidirectional Streaming)
客户端和服务端都可以发送消息流,双方可以异步地发送和接收消息。
rpc BidiStreaming(stream Request) returns (stream Response);使用场景:
- 实时聊天应用
- 协作编辑工具
- 游戏服务器
- 实时监控系统
gRPC 响应状态码
OK- 操作成功时返回。
Canceled- 操作被取消(通常由调用方发起)。gRPC框架会在请求取消时生成此错误码。
Unknown- 未知错误。例如,从其他地址空间接收到的状态值属于本地址空间未知的错误空间,或者API未返回足够错误信息时可能会转换为此错误。gRPC框架在上述两种情况下会生成此错误码。
InvalidArgument- 客户端指定了无效参数。与
FailedPrecondition不同,它表示无论系统状态如何都有问题的参数(例如,格式错误的文件名)。此错误码不会由gRPC框架生成。
- 客户端指定了无效参数。与
DeadlineExceeded- 操作在完成前超时。对于更改系统状态的操作,即使操作已成功完成也可能返回此错误。例如,服务器响应可能延迟太久导致截止时间过期。gRPC框架会在超过截止时间时生成此错误码。
NotFound- 请求的实体(例如文件或目录)未找到。此错误码不会由gRPC框架生成。
AlreadyExists- 尝试创建实体失败,因为该实体已存在。此错误码不会由gRPC框架生成。
PermissionDenied- 调用方没有执行指定操作的权限。不得用于因资源耗尽而拒绝的情况(应使用
ResourceExhausted),也不得用于无法识别调用方的情况(应使用Unauthenticated)。此错误码不会由gRPC核心框架生成,但期望认证中间件使用它。
- 调用方没有执行指定操作的权限。不得用于因资源耗尽而拒绝的情况(应使用
ResourceExhausted- 某些资源已耗尽,可能是用户配额或整个文件系统空间不足。gRPC框架在内存不足和服务器过载情况或消息大于配置的最大大小时会生成此错误码。
FailedPrecondition- 操作被拒绝,因为系统未处于执行操作所需的状态。例如,要删除的目录非空,对非目录应用了
rmdir操作等。此错误码不会由gRPC框架生成。
- 操作被拒绝,因为系统未处于执行操作所需的状态。例如,要删除的目录非空,对非目录应用了
Aborted- 操作中止,通常是由于并发问题,如序列检查失败、事务中止等。此错误码不会由gRPC框架生成。
OutOfRange- 操作尝试超出有效范围。例如,寻址或读取文件末尾之后的内容。与
InvalidArgument不同,此错误表明如果系统状态改变可能修复的问题。此错误码不会由gRPC框架生成。
- 操作尝试超出有效范围。例如,寻址或读取文件末尾之后的内容。与
Unimplemented- 操作未实现或在此服务中不支持/启用。gRPC框架会生成此错误码,最常见的是当服务器缺少方法实现时。
Internal- 内部错误。表示底层系统的某些不变量已被破坏。如果您看到这些错误之一,说明出现了严重问题。gRPC框架在多种内部错误条件下会生成此错误码。
Unavailable- 服务当前不可用。这很可能是一个临时状况,可通过退避重试来纠正。请注意,并非所有非幂等操作都安全重试。gRPC框架在服务器进程或网络连接突然关闭期间会生成此错误码。
DataLoss- 不可恢复的数据丢失或损坏。此错误码不会由gRPC框架生成。
Unauthenticated- 请求没有有效的身份验证凭据来进行操作。gRPC框架在身份验证元数据无效或凭证回调失败时会生成此错误码,但也期望认证中间件生成它。
拦截器/Interceptor
在 gRPC-Go 中,Interceptor(拦截器)类似于 Web 框架中的中间件(Middleware)。根据调用的类型(一元 vs 流式)以及作用端(客户端 vs 服务端),拦截器分为 4 种基本类型。
1. 一元拦截器 (Unary Interceptors)
处理简单的“一问一答”式 RPC。这是最容易编写的拦截器。
服务端一元拦截器
常用于:日志、认证、Panic 恢复。
func myUnaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
// 1. 前置处理 (如:从 metadata 获取 Token)
log.Printf("调用方法: %s", info.FullMethod)
// 2. 调用真正的业务逻辑
resp, err := handler(ctx, req)
// 3. 后置处理 (如:记录耗时、错误转换)
return resp, err
}
// 注册方式:
s := grpc.NewServer(grpc.UnaryInterceptor(myUnaryServerInterceptor))客户端一元拦截器
func myUnaryClientInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// 前置处理:注入 TraceID
err := invoker(ctx, method, req, reply, cc, opts...)
// 后置处理
return err
}2. 流式拦截器 (Stream Interceptors)
流式拦截器的编写逻辑稍复杂:拦截器本身只在流建立瞬间触发。如果要拦截流中发送/接收的每一条消息,你需要编写一个包装器(Wrapper)。
服务端流拦截器示例
假设我们要拦截流中的每一条消息:
// 1. 定义一个包装器,嵌入原有的 ServerStream
type wrappedStream struct {
grpc.ServerStream
}
// 2. 重写 RecvMsg 方法
func (w *wrappedStream) RecvMsg(m any) error {
err := w.ServerStream.RecvMsg(m)
log.Printf("流收到消息: %T", m)
return err
}
// 3. 拦截器实现
func myStreamServerInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// 使用包装器替换原始流
wrapped := &wrappedStream{ss}
return handler(srv, wrapped)
}
// 注册:
s := grpc.NewServer(grpc.StreamInterceptor(myStreamServerInterceptor))客户端流式拦截器的应用场景
客户端流式拦截器的典型应用场景:
- 客户端侧的“透明重连与重试”
流式 RPC(尤其是长连接的双向流)极易受到网络波动影响。
场景:如果一个长连接流因为网络抖动断开了,业务代码往往需要自己写逻辑去重新拨号、重新创建流。
拦截器作用:你可以在拦截器里包装 SendMsg 方法。当检测到连接断开时,拦截器自动进行指数退避重试并透明地重建流,业务代码对此几乎无感知。
- 消息级别的可观测性 (Metrics & Logging)
一元调用很容易统计耗时,但流式调用往往持续很久。
场景:你想知道在某次双向流通话中,客户端每秒发送了多少个数据包,或者服务端返回每一条消息的平均延迟。
拦截器作用:通过包装 SendMsg 和 RecvMsg,拦截器可以记录每一条消息的发送/接收时间戳,并将指标(Counter/Histogram)上报到 Prometheus 或 Jaeger。
- 动态元数据注入 (Auth & Tracing)
场景:在流的整个生命周期内,你可能需要根据业务状态动态改变 Metadata。或者,你需要确保流中的第一条消息带有一个唯一的 Trace-ID。
拦截器作用:拦截器可以在 Streamer 真正建立流之前,从 Context 或全局状态中提取最新的 JWT Token 或追踪信息,注入到 Outgoing Metadata 中。
- 客户端限流与背压控制 (Rate Limiting)
场景:防止客户端程序在流中发送速度过快导致本地 OOM,或者超过服务端的处理能力。
拦截器作用:拦截器在 SendMsg 时进行令牌桶检查。如果当前发送频率过高,拦截器可以阻塞发送操作或直接返回 ResourceExhausted 错误。
- 数据转换与校验 (Validation & Compression)
场景:客户端在发送大型流数据前进行分块加密,或者对每一条流式消息进行 Schema 校验。
拦截器作用:在消息真正交给 gRPC 底层协议栈前,在 SendMsg 里对其进行拦截,检查字段是否合法。
3. 泛型适配后的变化 (2024 更新点)
在 gRPC-Go v1.61.0+ 中,生成的代码开始使用泛型。这虽然没有改变拦截器的函数签名(为了不破坏生态),但在拦截器内部处理逻辑上提供了更好的支持:
- 类型安全的消息处理:以前在拦截器里拿到的消息是
any,需要用反射或断言。现在随着grpc.ClientStream[Req, Res]等泛型接口在生成的 Stub 中使用,你可以更容易地编写支持泛型的通用工具函数来配合拦截器使用。 - 代码精简:以前你可能需要为不同业务写不同的拦截器逻辑,现在可以利用泛型编写通用的“流包装组件”。
4. 组合多个拦截器 (Chaining)
在旧版本中,你只能设置一个拦截器,必须手动嵌套。现在 gRPC 官方直接支持链式调用:
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(
authInterceptor, // 第一个执行
logInterceptor, // 第二个执行
metricsInterceptor, // 第三个执行
),
grpc.ChainStreamInterceptor(
streamLogInterceptor,
),
)5. 编写拦截器的关键步骤
- 确定位置:是客户端(Client)还是服务端(Server)?
- 确定类型:是一次性的一元(Unary)还是长连接流(Stream)?
- 实现签名:按照
grpc.UnaryServerInterceptor等类型定义实现函数。 - 控制流向:
- 如果你想拦截并阻止请求(如权限不足),直接返回
error,不调用handler。 - 如果你想修改请求/响应,在调用
handler前后修改对应的参数。 - 如果你想统计数据,记录调用
handler的时间。
- 如果你想拦截并阻止请求(如权限不足),直接返回
- 处理流消息:如果是流式,记得定义一个结构体包装
grpc.ServerStream或grpc.ClientStream并重写SendMsg或RecvMsg。
总结
- 一元拦截器:简单的函数,像 Onion 模型。
- 流式拦截器:函数+包装器类,像透明代理。
- 链式支持:通过
grpc.Chain...组合多个中间件。 - 2024 趋势:拦截器本身 API 没变,但内部处理流消息时,正逐步利用 Go 泛型带来的类型安全提升。
客户端拦截器注入元数据
在 gRPC 中,注入 TraceID 或身份令牌(Token)的通用方式是利用 Metadata(元数据)。Metadata 在底层会被映射为 HTTP/2 的 Header。
对于客户端拦截器,注入逻辑的核心在于:在请求发出之前,修改 context.Context 以包含元数据。
以下是针对“一元(Unary)”和“流式(Stream)”两种拦截器的具体实现方法。
1. 核心工具:metadata 包
gRPC 提供 google.golang.org/grpc/metadata 包来处理这些数据。
- TraceID:通常从当前 Context 中提取(如果用了 OpenTelemetry)或手动生成。
- Token:通常从配置文件或缓存中读取。
2. 一元拦截器实现 (Unary Interceptor)
一元拦截器最简单,因为它在调用执行前可以直接操作 ctx。
func UnaryAuthInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// 1. 准备要注入的数据
token := "my-secret-token"
traceID := "trace-8888-9999"
// 2. 创建或更新 Metadata
// 使用 AppendToOutgoingContext 可以保留 Context 中原有的元数据
ctx = metadata.AppendToOutgoingContext(ctx,
"authorization", "Bearer "+token,
"x-trace-id", traceID,
)
// 3. 继续调用
return invoker(ctx, method, req, reply, cc, opts...)
}3. 流式拦截器实现 (Stream Interceptor)
关键点: 对于流式 RPC,Metadata 必须在创建流(Streamer)的瞬间注入。一旦流建立成功(Header 发送完毕),中途再改 Context 就没法通过 Header 发送身份信息了。
func StreamAuthInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
// 1. 注入数据
token := "my-secret-token"
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token)
// 2. 创建流
// 注意:这里传入的是修改后的 ctx
clientStream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return clientStream, nil
}此处需要实现的函数类型是:
// StreamClientInterceptor intercepts the creation of a ClientStream. Stream
// interceptors can be specified as a DialOption, using WithStreamInterceptor()
// or WithChainStreamInterceptor(), when creating a ClientConn. When a stream
// interceptor(s) is set on the ClientConn, gRPC delegates all stream creations
// to the interceptor, and it is the responsibility of the interceptor to call
// streamer.
//
// desc contains a description of the stream. cc is the ClientConn on which the
// RPC was invoked. streamer is the handler to create a ClientStream and it is
// the responsibility of the interceptor to call it. opts contain all applicable
// call options, including defaults from the ClientConn as well as per-call
// options.
//
// StreamClientInterceptor may return a custom ClientStream to intercept all I/O
// operations. The returned error must be compatible with the status package.
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)4. 如何在业务逻辑中传递这些数据?
拦截器通常从 context 中读取业务层传入的动态 ID。例如使用 OpenTelemetry 时:
func OpenTelemetryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// 从 context 中提取 TraceID (假设使用了特定的追踪库)
span := trace.SpanFromContext(ctx)
traceID := span.SpanContext().TraceID().String()
// 注入到 gRPC Metadata
ctx = metadata.AppendToOutgoingContext(ctx, "x-trace-id", traceID)
return invoker(ctx, method, req, reply, cc, opts...)
}5. 注入方式的对比:New vs Append
在编写拦截器时,请务必注意这两个函数的区别:
| 函数 | 行为 | 推荐场景 |
|---|---|---|
metadata.NewOutgoingContext | 覆盖原有 Context 中的所有 Metadata。 | 确定 Context 中没有任何有用信息时。 |
metadata.AppendToOutgoingContext | 追加到现有元数据列表后。 | 推荐做法。防止丢失上游或其他拦截器注入的信息(如重试次数、超时标记)。 |
6. 特殊情况:Per-RPC Credentials (推荐方式)
如果你只是想注入身份令牌,gRPC 其实提供了一个更简洁、更标准化的接口,不需要手写拦截器,即 credentials.PerRPCCredentials。
type authCreds struct {
token string
}
func (c authCreds) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + c.token,
}, nil
}
func (c authCreds) RequireTransportSecurity() bool {
return true // 生产环境通常要求 TLS
}
// 在拨号时直接注入
conn, _ := grpc.Dial("localhost:50051",
grpc.WithPerRPCCredentials(authCreds{token: "my-token"}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)总结
- TraceID:建议通过拦截器注入,因为它是横切关注点(Cross-cutting concerns),不需要业务层感知。
- 身份令牌:如果全局通用,建议使用
PerRPCCredentials;如果需要根据业务逻辑动态改变,建议使用拦截器操作metadata。 - 2024 泛型适配后的视角:虽然
metadata的注入依然基于context,但由于流式接口现在由泛型grpc.ClientStream[Req, Res]承载,你可以更方便地在拦截器外层通过类型安全的方式进一步包装这些注入逻辑。
服务端获取元数据
服务端获取拦截器注入数据(Metadata)的过程与客户端正好相反。在 gRPC 服务端,所有来自客户端的 Header(包括身份令牌、TraceID 等)都会被存放在 IncomingContext 中。
你可以通过 google.golang.org/grpc/metadata 包提供的方法来提取。
以下是具体的实现方式,分为 RPC 业务方法内获取 和 服务端拦截器统一提取 两个场景。
1. 在业务方法中直接获取
这是最简单的方式,直接从当前请求的 ctx 中提取。
func (s *server) GetUser(ctx context.Context, req *pb.UserRequest) (*pb.UserResponse, error) {
// 1. 从 context 中提取入站元数据 (Incoming Metadata)
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "未能获取元数据")
}
// 2. 提取具体字段
// 注意:gRPC 的所有 Key 在传输后都会变成【小写】
// 且返回值是 []string,因为同一个 Key 可能对应多个值
if tokens := md.Get("authorization"); len(tokens) > 0 {
token := tokens[0]
fmt.Printf("收到令牌: %s\n", token)
}
if traceIDs := md.Get("x-trace-id"); len(traceIDs) > 0 {
fmt.Printf("收到 TraceID: %s\n", traceIDs[0])
}
return &pb.UserResponse{...}, nil
}2. 在服务端拦截器中统一获取 (推荐)
在生产环境中,我们通常不希望在每个业务方法里重复写提取代码。我们会写一个服务端拦截器来统一处理认证和追踪。
A. 服务端一元拦截器示例
func UnaryServerAuthInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少元数据")
}
// 统一校验 Token
if tokens := md.Get("authorization"); len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "需要身份认证")
} else {
// 这里可以做具体的 JWT 校验逻辑
fmt.Printf("拦截器验证 Token: %s\n", tokens[0])
}
// 继续执行业务逻辑
return handler(ctx, req)
}B. 服务端流式拦截器示例
流式拦截器同样从 ss.Context() 获取:
func StreamServerAuthInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
md, ok := metadata.FromIncomingContext(ss.Context())
if ok {
traceIDs := md.Get("x-trace-id")
if len(traceIDs) > 0 {
log.Printf("处理流式请求,TraceID: %s", traceIDs[0])
}
}
return handler(srv, ss)
}3. 重要注意事项
① Key 全部自动转为小写
这是初学者最容易踩的坑。无论客户端注入时写的是 Authorization 还是 X-Trace-ID,服务端通过 md.Get() 拿的时候必须用小写:
- 错误:
md.Get("Authorization") - 正确:
md.Get("authorization")
② 值的类型是切片 []string
根据 HTTP/2 协议,一个 Header Key 可以重复出现。因此 md.Get("key") 返回的是一个字符串切片。通常我们只取第一个元素 [0]。
③ Context 的二次封装 (进阶技巧)
如果你希望在拦截器里提取出 UserID 后,让后续业务代码更方便地使用(而不是反复查 Metadata),你可以将提取出的信息放入自定义的 Context Key 中:
type userKey struct{} // 自定义类型防止冲突
func UnaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
md, _ := metadata.FromIncomingContext(ctx)
userID := md.Get("user-id")[0]
// 将信息存入 context,后续业务代码直接 ctx.Value(userKey{}) 即可
newCtx := context.WithValue(ctx, userKey{}, userID)
return handler(newCtx, req)
}总结对照表
| 动作 | 方法 |
|---|---|
| 客户端注入 | metadata.AppendToOutgoingContext(ctx, ...) |
| 服务端获取 | metadata.FromIncomingContext(ctx) |
| 多值读取 | md.Get("key") (返回 []string) |
| Key 大小写 | 强制小写 (由 gRPC 底层处理) |
这种 Metadata 机制配合拦截器,实现了分布式系统中 TraceID 自动透传(Context Propagation)的核心功能。