从零开始的RPC(一)
2026/1/26大约 15 分钟
RPC
简介
- 远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议
- 该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
- 如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用
Golang中的RPC
- golang中实现RPC非常简单,官方提供了封装好的库,还有一些第三方的库
- golang官方的
net/rpc库使用encoding/gob进行编解码,支持tcp和http数据传输方式,由于其他语言不支持gob编解码方式,所以golang的RPC只支持golang开发的服务器与客户端之间的交互 - 官方还提供了
net/rpc/jsonrpc库实现RPC方法,jsonrpc采用JSON进行数据编解码,因而支持跨语言调用,目前jsonrpc库是基于tcp协议实现的,暂不支持http传输方式
例一:用RPC实现求矩形面积和方法
注
- golang写RPC程序,必须符合4个基本条件,不然RPC用不了
- 结构体字段首字母要大写,可以别人调用(结构体可导出)
- 函数名必须首字母大写(接收者方法可导出)
- 函数第一参数是接收参数,第二个参数是返回给客户端的参数,必须是指针类型
- 函数还必须有一个返回值error
package main
import (
"log"
"net/http"
"net/rpc"
"testing"
"time"
)
// 例: 使用RPC实现求矩形面积和州长
type ServerParams struct {
Width int
Height int
}
type Rectangle struct {
}
// 服务端方法
func (r *Rectangle) Area(p ServerParams, reply *int) error {
*reply = p.Width * p.Height
return nil
}
func (r *Rectangle) Perimeter(p ServerParams, reply *int) error {
*reply = 2 * (p.Width + p.Height)
return nil
}
func runServer() {
// 注册服务
rect := new(Rectangle)
// 注册该服务
_ = rpc.Register(rect)
// 绑定到HTTP上
rpc.HandleHTTP()
// 监听服务
if err := http.ListenAndServe(":1234", nil); err != nil {
log.Fatal("ListenAndServe:", err)
}
}
// 客户端参数和方法
type ClientParams struct { // 不同名是为了避免symbol干扰
Width int
Height int
}
func runClient() {
// 连接RPC服务
conn, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
defer func() {
_ = conn.Close()
}()
if err != nil {
log.Fatal("dialing:", err)
}
// 调用方法
// 面积
ret := 0
if err := conn.Call("Rectangle.Area", ClientParams{Width: 10, Height: 20}, &ret); err != nil {
log.Fatal("calling:", err)
}
log.Println("Area:", ret)
// 周长
ret = 0
if err := conn.Call("Rectangle.Perimeter", ClientParams{Width: 10, Height: 20}, &ret); err != nil {
log.Fatal("calling:", err)
}
log.Println("Perimeter:", ret)
}
func TestRPC(t *testing.T) {
go runServer()
time.Sleep(time.Second * 5)
go runClient()
time.Sleep(time.Second * 30)
}
例二:实现RPC程序,服务端接收2个参数,可以做乘法运算,也可以做商和余数的运算,客户端进行传参和访问
package rpc_2
import (
"errors"
"log" "net/http" "net/rpc" "testing" "time")
var (
ErrDivideByZero = errors.New("除数不能为0")
)
// 服务端
type ServerParams struct {
Former int
Latter int
}
func (p *ServerParams) Product(req ServerParams, reply *int) error {
*reply = req.Former * req.Latter
return nil
}
func (p *ServerParams) Divide(req ServerParams, reply *int) error {
if req.Latter == 0 {
return ErrDivideByZero
}
*reply = req.Former / req.Latter
return nil
}
func (p *ServerParams) Modulo(req ServerParams, reply *int) error {
*reply = req.Former % req.Latter
return nil
}
func (p *ServerParams) Add(req ServerParams, reply *int) error {
*reply = req.Former + req.Latter
return nil
}
func (p *ServerParams) Subtract(req ServerParams, reply *int) error {
*reply = req.Former - req.Latter
return nil
}
func runServer() {
algae := new(ServerParams)
if err := rpc.Register(algae); err != nil {
log.Fatalf("rpc.Register: %v", err)
}
rpc.HandleHTTP()
log.Println("listening...")
log.Fatal(http.ListenAndServe(":8080", nil))
}
// 客户端
type ClientParams struct {
Former int
Latter int
}
func runClient() {
conn, err := rpc.DialHTTP("tcp", "127.0.0.1:8080")
if err != nil {
log.Fatalf("dialing: %v", err)
}
defer func() {
_ = conn.Close()
}()
// 调用方法
// 乘法运算
var reply int
err = conn.Call("ServerParams.Product", ClientParams{Former: 2, Latter: 3}, &reply)
if err != nil {
log.Fatalf("arith error: %v", err)
}
log.Println("乘法运算结果:", reply)
// 余数运算
err = conn.Call("ServerParams.Modulo", ClientParams{Former: 5, Latter: 2}, &reply)
if err != nil {
log.Fatalf("arith error: %v", err)
}
log.Println("余数运算结果:", reply)
// 除0测试
err = conn.Call("ServerParams.Divide", ClientParams{Former: 5, Latter: 0}, &reply)
if err != nil {
log.Printf("arith error: %v", err)
return
}
log.Println("除数运算结果:", reply)
}
func TestRPC(t *testing.T) {
go runServer()
time.Sleep(time.Second * 3)
runClient()
}
jsonrpc版的例一
func runServer() {
// 注册服务
rect := new(Rectangle)
// 注册该服务
_ = rpc.Register(rect)
// 先占用8080端口
listener, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal("listen error:", err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("accept error:", err)
}
go func(conn net.Conn) {
log.Printf("accepted connection from %s", conn.RemoteAddr())
jsonrpc.ServeConn(conn)
}(conn)
}
}
func runClient() {
// 连接RPC服务
conn, err := jsonrpc.Dial("tcp", "127.0.0.1:8080")
defer func() {
_ = conn.Close()
}()
if err != nil {
log.Fatal("dialing:", err)
}
// 调用方法
// 面积
ret := 0
if err := conn.Call("Rectangle.Area", ClientParams{Width: 10, Height: 20}, &ret); err != nil {
log.Fatal("calling:", err)
}
log.Println("Area:", ret)
// 周长
ret = 0
if err := conn.Call("Rectangle.Perimeter", ClientParams{Width: 10, Height: 20}, &ret); err != nil {
log.Fatal("calling:", err)
}
log.Println("Perimeter:", ret)
}
RPC调用流程
- 微服务架构下数据交互一般是对内 RPC,对外 REST
- 将业务按功能模块拆分到各个微服务,具有提高项目协作效率、降低模块耦合度、提高系统可用性等优点,但是开发门槛比较高,比如 RPC 框架的使用、后期的服务监控等工作
- 一般情况下,我们会将功能代码在本地直接调用,微服务架构下,我们需要将这个函数作为单独的服务运行,客户端通过网络调用
数据传输格式
- 两端要约定好数据包的格式
- 成熟的RPC框架会有自定义传输协议,这里网络传输格式定义如下,前面是固定长度消息头,后面是变长消息体

自定义数据读写

package rpc_4
import (
"encoding/binary"
"log"
"net"
"sync"
"testing"
)
// 自定义数据读写
type Session struct {
conn net.Conn
}
func NewSession(conn net.Conn) *Session {
return &Session{conn: conn}
}
// io 读写实现
// RPC消息格式是: 4字节头部 + 变长消息体
// 头部是一个uint32类型数据
func (s *Session) Write(p []byte) (n int, err error) {
buf := make([]byte, 4+len(p))
// 前四个字节作为消息体长度
binary.BigEndian.PutUint32(buf, uint32(len(p)))
// 把数据拷贝到buffer头部之后
_ = copy(buf[4:], p)
return s.conn.Write(buf)
}
func (s *Session) Read() (data []byte, err error) {
header := make([]byte, 4)
n, err := s.conn.Read(header)
if err != nil {
return nil, err
} else {
log.Printf("read header length: %d", n)
}
dataLen := binary.BigEndian.Uint32(header) // 解析长度
data = make([]byte, dataLen)
if n, err := s.conn.Read(data); err != nil {
return nil, err
} else {
log.Printf("read data length: %d, data content: %v", n, string(data))
}
return data, nil
}
func TestSession(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(2)
data := []byte("hello world")
// 写数据的协程
go func() {
defer wg.Done()
listener, err := net.Listen("tcp", "127.0.0.1:8080")
if err != nil {
t.Errorf("listen error: %v", err)
return
}
// Accept() 方法会阻塞式等待连接
conn, _ := listener.Accept()
s := NewSession(conn)
if _, err := s.Write(data); err != nil {
t.Errorf("write error: %v", err)
return
}
t.Logf("write data: %s successfully", data)
}()
// 读数据的协程
go func() {
defer wg.Done()
conn, err := net.Dial("tcp", "127.0.0.1:8080")
if err != nil {
t.Errorf("dial error: %v", err)
return
}
s := NewSession(conn)
if buf, err := s.Read(); err != nil {
t.Errorf("read error: %v", err)
return
} else {
if string(buf) == string(data) {
t.Logf("read data: %v successfully, size: %d", string(buf), len(buf))
} else {
t.Errorf("read data: %v error, size: %d", string(buf), len(buf))
return
}
}
}()
wg.Wait()
}自定义数据编解码
package rpc
import (
"bytes"
"encoding/gob"
)
// 定义RPC交互的数据结构
type RPCData struct {
// 访问的函数
Name string
// 访问时的参数
Args []interface{}
}
// 编码
func encode(data RPCData) ([]byte, error) {
//得到字节数组的编码器
var buf bytes.Buffer
bufEnc := gob.NewEncoder(&buf)
// 编码器对数据编码
if err := bufEnc.Encode(data); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// 解码
func decode(b []byte) (RPCData, error) {
buf := bytes.NewBuffer(b)
// 得到字节数组解码器
bufDec := gob.NewDecoder(buf)
// 解码器对数据节码
var data RPCData
if err := bufDec.Decode(&data); err != nil {
return data, err
}
return data, nil
}实现RPC服务端
- 服务端接收到的数据需要包括什么?
- 调用的函数名、参数列表,还有一个返回值error类型
- 服务端需要解决的问题是什么?
- Map维护客户端传来调用函数,服务端知道去调谁
- 服务端的核心功能有哪些?
- 维护函数map
- 客户端传来的东西进行解析
- 函数的返回值打包,传给客户端
package rpc_5_server
import (
"log"
"net"
"reflect"
)
// 自定义RPC服务端
type Server struct {
addr string
// funcs 用于维护关系
funcs map[string]reflect.Value // reflect.Value 包含运行时类型信息, 可以直接调用方法获取原值
}
func NewServer(addr string) *Server {
return &Server{
addr: addr,
funcs: make(map[string]reflect.Value),
}
}
func (s *Server) Register(rpcName string, f any) {
if _, exists := s.funcs[rpcName]; exists {
log.Printf("rpcName %s already exists", rpcName)
return // 已经有了就不需要添加了
}
s.funcs[rpcName] = reflect.ValueOf(f)
log.Printf("registered rpcName %s", rpcName)
return
}
func (s *Server) Run() {
listener, err := net.Listen("tcp", s.addr)
if err != nil {
log.Fatalf("[server] listen %s error: %v", s.addr, err)
}
// 服务端循环获取客户端连接
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("[server] accept error: %v", err)
return
} else {
log.Printf("[server] accept new connection from %s", conn.RemoteAddr())
}
// 这里需要前面定义的Session和RPCData结构体
sess := NewSession(conn)
data, err := sess.Read() // RPC方式读取数据
if err != nil {
log.Printf("[server] read error: %v", err)
}
// 数据解码
rpcData, err := decode(data)
if err != nil {
log.Printf("[server] decode error: %v", err)
return
}
// 根据rpcName获取对应的方法
fn, exists := s.funcs[rpcData.FuncName]
if !exists {
log.Printf("[server] rpcName %s not exists", rpcData.FuncName)
return
}
// 遍历客户端的传参
inArgs := make([]reflect.Value, len(rpcData.Args))
for i, arg := range rpcData.Args {
inArgs[i] = reflect.ValueOf(arg)
}
// 反射调用方法
res := fn.Call(inArgs)
outArgs := make([]any, len(res))
for i, res := range res {
outArgs[i] = res.Interface()
}
// 数据编码
encodedRPCData, err := encode(RPCData{
FuncName: rpcData.FuncName,
Args: outArgs,
})
if err != nil {
log.Printf("[server] encode error: %v", err)
return
}
if n, err := sess.Write(encodedRPCData); err != nil {
log.Printf("[server] write error: %v", err)
} else {
log.Printf("[server] write %d bytes success", n)
}
}
}实现RPC客户端
- 客户端只有函数原型,使用
reflect.MakeFunc()可以完成原型到函数的调用 reflect.MakeFunc()是Client从函数原型到网络调用的关键
package rpc_5_server
import (
"log"
"net"
"reflect"
)
// 自定义RPC客户端
// 客户端
type Client struct {
conn net.Conn
}
func NewClient(conn net.Conn) *Client {
return &Client{
conn: conn,
}
}
func (c *Client) callRPC(rpcName string, fPtr any) {
// Elem方法用于获取指针指向的内容
fn := reflect.ValueOf(fPtr).Elem() // 反射获取fPtr未初始化的函数原型
// 需要另一个函数来操作函数的参数
f := func(args []reflect.Value) []reflect.Value {
// 处理参数
inArgs := make([]any, len(args))
for i, arg := range args {
inArgs[i] = arg.Interface()
}
// 连接
sess := NewSession(c.conn)
// 编码数据
encodedRPCData, err := encode(RPCData{
FuncName: rpcName,
Args: inArgs,
})
if err != nil {
log.Fatalf("[client] encode error: %v", err)
}
// 写入数据
if n, err := sess.Write(encodedRPCData); err != nil {
log.Fatalf("[client] write error: %v", err)
} else {
log.Printf("[client] write %d bytes success", n)
}
// 接收数据
respBytes, err := sess.Read()
if err != nil {
log.Fatalf("[client] read response error: %v", err)
}
// 解码
rpcData, err := decode(respBytes)
if err != nil {
log.Fatalf("[client] decode response error: %v", err)
}
// 处理返回值
outArgs := make([]reflect.Value, len(rpcData.Args))
for i, arg := range rpcData.Args {
if arg == nil {
// Out函数用于获取函数的返回值类型
// reflect.Zero函数用于获取类型的零值
outArgs[i] = reflect.Zero(fn.Type().Out(i))
} else {
outArgs[i] = reflect.ValueOf(arg)
}
}
return outArgs
}
// 完成原型到函数调用的转换
v := reflect.MakeFunc(fn.Type(), f)
// 为函数指针赋值
fn.Set(v)
}RPC通信
- 给服务端注册一个查询用户的方法,客户端使用RPC方式调用
package rpc_5_server
import (
"encoding/gob"
"errors"
"net"
"testing"
)
// 测试用户查询方法, 客户端使用RPC方式调用
type User struct {
Name string
Age int
}
var users = []User{
User{Name: "张三", Age: 18},
User{Name: "李四", Age: 19},
User{Name: "王五", Age: 20},
}
func queryUserByIndex(index int) (User, error) {
if index < 0 || index >= len(users) {
return User{}, errors.New("用户不存在")
}
return users[index], nil
}
func queryUserByName(name string) (User, error) {
for _, user := range users {
if user.Name == name {
return user, nil
}
}
return User{}, errors.New("用户不存在")
}
func TestRPC(t *testing.T) {
// 注册User类型
gob.Register(User{})
addr := "127.0.0.1:8080"
// 创建服务端
srv := NewServer(addr)
// 注册服务端方法
srv.Register("queryUserByIndex", queryUserByIndex)
srv.Register("queryUserByName", queryUserByName)
go srv.Run()
// 客户端获取连接
conn, err := net.Dial("tcp", addr)
if err != nil {
t.Fatalf("[client] dial %s error: %v", addr, err)
}
defer func() {
_ = conn.Close()
}()
// 创建客户端对象
cli := NewClient(conn)
// 声明函数原型
var query func(index int) (res User, err error)
cli.callRPC("queryUserByIndex", &query)
// 获取查询结果
res, err := query(0)
if err != nil {
t.Errorf("[client] query error: %v", err)
} else if res.Name != users[0].Name {
t.Errorf("[client] query result error: %v", res)
} else {
t.Logf("[client] query result: %v", res)
}
// 服务端的实现不支持连接复用, 第二次连接会报错服务已关闭
/*
var query2 func(name string) (res User, err error)
cli.callRPC("queryUserByName", &query2)
res, err = query2("王五")
if err != nil {
t.Errorf("[client] query error: %v", err)
} else if res.Name != users[2].Name {
t.Errorf("[client] query result error: %v", res)
} else {
t.Logf("[client] query result: %v", res)
}
*/
}Raft
注
Raft是consoul和etcd的核心算法
介绍
- Raft提供了一种在计算系统集群中分布状态机的通用方法,确保集群中的每个节点都同意一系列相同的状态转换
- 它有许多开源参考实现,具有Go,C ++,Java和Scala中的完整规范实现
- 一个Raft集群包含若干个服务器节点,通常是5个,这允许整个系统容忍2个节点的失效,每个节点处于以下三种状态之一
- follower(跟随者) :所有节点都以 follower 的状态开始。如果没收到 leader消息则会变成 candidate状态
- candidate(候选人):会向其他节点“拉选票”,如果得到大部分的票则成为leader,这个过程就叫做Leader选举(Leader Election)
- leader(领导者):所有对系统的修改都会先经过leader
Raft一致性算法
- Raft通过选出一个leader来简化日志副本的管理,例如,日志项(log entry)只允许从leader流向follower
- 基于leader的方法,Raft算法可以分解成三个子问题
- Leader election (领导选举):原来的leader挂掉后,必须选出一个新的leader
- Log replication (日志复制):leader从客户端接收日志,并复制到整个集群中
- Safety (安全性):如果有任意的server将日志项回放到状态机中了,那么其他的server只会回放相同的日志项
Raft VS Fan-Out
从分布式一致性的严谨定义来看,Fan-Out 并不等同于 Raft 这种领导者模型。它们的设计初衷和假设完全不同。
区别一:对 Follower 存活的假设
- Fan-Out(扇出): 通常出现在消息队列或负载均衡中。Leader(发送端)往往默认 Follower 存在即存活 。它只管把消息推出去,或者推给 MQ 就不管了。它不强制要求 Follower 必须反馈“我收到了且存盘了”,因此它不能保证强一致性。
- Raft Leader: 它是疑心病很重的领导者。它从不假设 Follower 存活,而是通过心跳和响应来确认。如果 Follower 没回应,Leader 就不敢提交数据(直到多数派回应)。
区别二:是否具有“选举”和“反向制约”
- Fan-Out: 通常是单向的、锁死的。如果发送端(Leader)挂了,通常需要外部干预(比如人工切换或 VIP 漂移),Follower 本身通常没有“自荐为王”的能力。
- Raft: 是动态民主。Follower 一旦发现 Leader 不发心跳,会立即触发选举。Leader 的权力来自于 Follower 的投票。
区别三:一致性目标
- Fan-Out: 追求的是吞吐量和扩散速度(比如把一个事件分发给 100 个下游服务)。它允许一部分服务处理失败。
- Raft: 追求的是数据绝对正确。只要有一笔数据在 Leader 看来成功了,那么在整个集群的视角里,这笔数据就必须永久生效。
选举
- Raft 使用一种心跳机制来触发领导人选举
- 当服务器程序启动时,节点都是 follower(跟随者) 身份
- 如果一个跟随者在一段时间里没有接收到任何消息,也就是选举超时,然后他就会认为系统中没有可用的领导者然后开始进行选举以选出新的领导者
- 要开始一次选举过程,follower 会给当前term加1并且转换成candidate状态,然后它会并行的向集群中的其他服务器节点发送请求投票的 RPCs 来给自己投票。
- 候选人的状态维持直到发生以下任何一个条件发生的时候
- 他自己赢得了这次的选举
- 其他的服务器成为领导者
- 一段时间之后没有任何一个获胜的人
日志复制
- 当选出
leader后,它会开始接收客户端请求,每个请求会带有一个指令,可以被回放到状态机中 leader把指令追加成一个log entry,然后通过AppendEntries RPC并行地发送给其他的server,当该entry被多数server复制后,leader会把该entry回放到状态机中,然后把结果返回给客户端- 当
follower宕机或者运行较慢时,leader会无限地重发AppendEntries给这些follower,直到所有的follower都复制了该log entry - raft的
log replication要保证如果两个log entry有相同的index和term,那么它们存储相同的指令 leader在一个特定的term和index下,只会创建一个log entry
自制实现
gRPC
介绍
- 微服务架构中,由于每个服务对应的代码库是独立运行的,无法直接调用,彼此间的通信就是个大问题
- gRPC可以实现微服务,将大的项目拆分为多个小且独立的业务模块,也就是服务,各服务间使用高效的protobuf协议进行RPC调用,gRPC默认使用protocol buffers,这是google开源的一套成熟的结构数据序列化机制(当然也可以使用其他数据格式如JSON)
- 可以用proto files创建gRPC服务,用message类型来定义方法参数和返回类型
安装依赖
- 安装编译器(本体)
去 Protobuf Releases 下载最新版的protoc-xxx-win64.zip。解压后把里面的bin/protoc.exe放到任意目录,并添加进系统环境变量 PATH - 安装Go插件(最新版 v1.36+)
# 核心插件:负责生成 .pb.go (消息结构) go install google.golang.org/protobuf/cmd/protoc-gen-go@latest # gRPC 插件:负责生成 _grpc.pb.go (服务接口) go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

