前语
RPC协议是咱们在开发进程中最常常接触的技能之一,Go 规范库net/rpc供给了一个简略、强大且高性能的 RPC 完结,仅需编写很少的代码就能完结 RPC 服务,现在现已广泛应用到微服务架构体系中。对于Golang RPC规范库的学习有助于咱们了解RPC的服务调用原理、协议、通讯等。
初学者可能会好奇,RPC协议是怎么处理衔接,怎么完结办法调用的。本文首要对RPC中的基本概念进行简略的介绍,之后对net/rpc库的源码进行解析,相信你看完本篇文章,必定能够对Golang RPC规范库的底层原理以及RPC协议的原理有更深的了解和体会。
现在源码专栏已更新《Go源码解析——Channel篇》、《Go源码解析——Map篇》、《Golang规范库net/rpc》3篇Golang源码文章,现在浏览量2500+,后边也会持续更新Go相关源码内容,感爱好的同学能够持续关注。
专栏地址:Golang源码解析
RPC简介
RPC(Remote Procedure Call),首要是协助咱们屏蔽网络编程细节 ,是咱们更专注于事务逻辑,完结调用长途办法就像调用本地办法相同。
RPC通讯进程
RPC通讯进程如下图所示:
由服务供给者给出事务接口声明,在调用方的程序里边,RPC 结构依据调用的服务接口提前生成动态代理完结类,并经过依靠注入等技能注入到声明晰该接口的相关事务逻辑里边。该代理完结类会拦截一切的办法调用,在供给的办法处理逻辑里边完结一整套的长途调用,并把长途调用成果回来给调用方,这样调用方在调用长途办法的时候就获得了像调用本地接口相同的体验。
RPC规划组成
以下是对RPC的四种人物的解说和说明:
-
客户端(Client): 服务调用建议方,也称为服务消费者。
-
客户端存根(Client Stub): 该程序运转在客户端所在的计算机机器上,首要用来存储要调用的服务器的地址,另外,该程序还负责将客户端恳求远端服务器程序的数据信息打包成数据包,经过网络发送给服务端Stub程序;其次,还要接纳服务端Stub程序发送的调用成果数据包,并解析回来给客户端。
-
服务端(Server): 远端的计算机机器上运转的程序,其间有客户端要调用的办法。
-
服务端存根(Server Stub): 接纳客户Stub程序经过网络发送的恳求音讯数据包,并调用服务端中真正的程序功用办法,完结功用调用;其次,将服务端履行调用的成果进行数据处理打包发送给客户端Stub程序。
RPC原理和调用进程
实际上,假如咱们想要在网络中的任意两台计算机上完结长途调用进程,要处理许多问题,比如:
-
两台物理机器在网络中要树立安稳牢靠的通讯衔接。
-
两台服务器的通讯协议的界说问题,即两台服务器上的程序怎么辨认对方的恳求和回来成果。也便是说两台计算机必须都能够辨认对方发来的信息,并且能够辨认出其间的恳求意义和回来意义,然后才能进行处理。这其实便是通讯协议所要完结的作业。
咱们来看看RPC具体是怎么处理这些问题的,RPC具体的调用进程图如下:
在上述图中,经过1-10的进程图解的形式,说明晰RPC每一步的调用进程。具体描述为:
-
1、客户端想要建议一个长途进程调用,首要经过调用本地客户端Stub程序的办法调用想要运用的功用办法名;
-
2、客户端Stub程序接纳到了客户端的功用调用恳求,将客户端恳求调用的办法名,带着的参数等信息做序列化操作,并打包成数据包。
-
3、客户端Stub查找到长途服务器程序的IP地址,调用Socket通讯协议,经过网络发送给服务端。
-
4、服务端Stub程序接纳到客户端发送的数据包信息,并经过约好好的协议将数据进行反序列化,得到恳求的办法名和恳求参数等信息。
-
5、服务端Stub程序准备相关数据,调用本地Server对应的功用办法进行,并传入相应的参数,进行事务处理。
-
6、服务端程序依据已有事务逻辑履行调用进程,待事务履行完毕,将履行成果回来给服务端Stub程序。
-
7、服务端Stub程序将程序调用成果依照约好的协议进行序列化, 并经过网络发送回客户端Stub程序。
-
8、客户端Stub程序接纳到服务端Stub发送的回来数据,对数据进行反序列化操作, 并将调用回来的数据传递给客户端恳求建议者。
-
9、客户端恳求建议者得到调用成果,整个RPC调用进程完毕。
简略的
在对RPC进行简略介绍之后,咱们先看一个简略的RPC服务注册和调用的demo,之后会别离从server和client的中心代码出发,深入到底层去了解RPC库的具体完结,并在学习的进程中考虑完结一个RPC协议所需求做的作业以及怎么依据RPC根底库进行进一步开发。
server.go
服务端代码如下:
package main
import (
"log"
"net"
"net/rpc"
)
func main() {
rpc.RegisterName("HelloService", new(HelloService))
listener, err := net.Listen("tcp", ":8888")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
rpc.Accept(listener)
}
type HelloService struct{}
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request
log.Println("got req", request)
return nil
}
client.go
调用端代码如下:
package main
import (
"fmt"
"log"
"net/rpc"
)
func main() {
client, err := rpc.Dial("tcp", "localhost:8888")
if err != nil {
log.Fatal("dialing:", err)
}
var reply string
err = client.Call("HelloService.Hello", "RPC", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
启动server.go 之后启动client.go ,RPC调用成功,操控台别离打印:
// server
got req RPC
// client
hello:RPC
接下来咱们别离从服务端和客户端两个角度对其原理进行解析。
服务端
在代码中,能够看到服务端办法首要包含RegisterName、Listen和Accept, 我将服务端首要的作业流程分为三个进程:
- 服务办法注册
- 监听和参数处理
- 调用 RPC 办法
服务办法注册
首要咱们来看注册进程,注册进程中首要调用的办法是server.register办法,其实首要需求注册的便是服务目标和服务办法,接下来经过代码来看是怎么为完结的:
相关结构体:
type service struct {// 表示服务的结构体,用于注册服务目标
name string // 服务名
rcvr reflect.Value // 注册服务的结构体实例
typ reflect.Type // 注册服务的结构体类型
method map[string]*methodType // 办法名与办法的映射列表
}
type methodType struct {// 表示办法的结构体,用于注册服务办法
sync.Mutex
method reflect.Method // 办法名
ArgType reflect.Type // 参数类型
ReplyType reflect.Type // 回来值类型
numCalls uint // 调用次数
}
注册首要流程:
func (server *Server) RegisterName(name string, rcvr interface{}) error {
return server.register(rcvr, name, true)
}
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
// 1. 服务相关字段的注册
s := new(service)
s.typ = reflect.TypeOf(rcvr) // 获取实例类型
s.rcvr = reflect.ValueOf(rcvr) // 获取实例本身
// 当Type为指针时Name()回来空字符串,所以要先经过Indirect取指针的值
sname := reflect.Indirect(s.rcvr).Type().Name() // 取结构体类型名
// 是否指定服务名的处理:useName==true默许运用参数中的name
if useName {
sname = name
}
if sname == "" {
s := "rpc.Register: no service name for type " + s.typ.String()
log.Print(s)
return errors.New(s)
}
// IsExported:判别字符串首字符是否为大写字母
if !token.IsExported(sname) && !useName {
s := "rpc.Register: type " + sname + " is not exported"
log.Print(s)
return errors.New(s)
}
s.name = sname
// 2. 注册RPC办法
s.method = suitableMethods(s.typ, true)
if len(s.method) == 0 {
str := ""
// reflect.PtrTo(s.typ)获取s.typ的指针类型 提示用户修改注册服务的代码
method := suitableMethods(reflect.PtrTo(s.typ), false)
if len(method) != 0 {// 提示运用方注册服务时传递结构体实例指针
str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
} else {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
}
log.Print(str)
return errors.New(str)
}
// 存入一个sync.map
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
}
return nil
}
这个办法的首要逻辑便是将传入的办法名和结构体进行校验解析之后整理成结构体service存入一个map(serviceMap) 中
其间办法的注册首要经过suitableMethods函数处理,首要便是经过很多的反射办法对注册办法的参数、回来值等进行合法性校验:
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m) // 获取实例类型对应的办法
mtype := method.Type // 获取办法类型
mname := method.Name // 获取办法名
// 经过PkgPath判别办法是否可导出
if method.PkgPath != "" {
continue
}
// 一个办法必须有且仅有三个参数:结构体实例、rpc参数、rpc呼应值
if mtype.NumIn() != 3 { // NumIn()获取参数个数
if reportErr {
log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
}
continue
}
// 下标从0开端,第一个参数是实例本身
// 对于Object.Method(req, resp),第一个参数是Object
argType := mtype.In(1) // 参数类型
if !isExportedOrBuiltinType(argType) {// rpc参数必须为可导出或许内置类型
if reportErr {
log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
}
continue
}
replyType := mtype.In(2) // resp必须为指针
if replyType.Kind() != reflect.Ptr {
if reportErr {
log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
}
continue
}
// 呼应值必须为可导出或许内置类型
if !isExportedOrBuiltinType(replyType) {
if reportErr {
log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
}
continue
}
if mtype.NumOut() != 1 { // 办法回来值个数
if reportErr {
log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
}
continue
}
// 办法回来值必须为error
if returnType := mtype.Out(0); returnType != typeOfError {
if reportErr {
log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
}
continue
}
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
在办法注册完结后,后边服务端接纳到客户端的恳求之后,会经过读取map(serviceMap) ,来完结办法的调用和处理,详情可参考后边解析。
监听和参数处理
介绍完注册进程,重点介绍Accept办法即服务端接纳到客户端之后恳求的处理进程。代码如下:
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Print("rpc.Serve: accept:", err.Error())
return
}
go server.ServeConn(conn)
}
}
Accept办法首要是一个不断承受新的恳求衔接的for循环 ,一旦监听器接纳了一个衔接,之后为每个衔接开一个go协程调用ServerConn 进行处理。
接下来咱们看下ServerConn办法的首要逻辑,代码如下:
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
server.ServeCodec(srv)
}
ServerConn 办法也很简略,首要构建一个Codec结构体,去处理RPC协议,参数包含衔接、序列化反序列化办法以及Writer,规范库的默许序列化办法为GobServeCodec。
golang官方还供给了net/ rpc /jsonrpc库完结RPC办法,JSON RPC选用JSON进行数据编解码,因而支持跨语言调用。有爱好的同学能够自行查看。
在这里咱们能够看到server.ServeCodec办法的参数为一个接口, 假如你要自己完结一个rpc协议的话,只需求完结ServerCodec接口对应的的办法就能够进行个性化开发了。
后边代码解读中会发现,处理进程中首要调用的便是这个接口的办法。
接口如下:
type ServerCodec interface {
ReadRequestHeader(*Request) error
ReadRequestBody(interface{}) error
WriteResponse(*Response, interface{}) error
// Close can be called multiple times and must be idempotent.
Close() error
}
默许序列化办法GobServeCodec的办法完结如下:
// 读取恳求头信息,将c.dec中的数据写入到r
func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
return c.dec.Decode(r)
}
func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
return c.dec.Decode(body)
}
// 依次编码呼应头和呼应内容
func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
// 序列化呼应头
if err = c.enc.Encode(r); err != nil {
if c.encBuf.Flush() == nil {
log.Println("rpc: gob error encoding response:", err)
c.Close()
}
return
}
// 序列化呼应体
if err = c.enc.Encode(body); err != nil {
if c.encBuf.Flush() == nil {
log.Println("rpc: gob error encoding body:", err)
c.Close()
}
return
}
// 将buffer中的数据写入writer
return c.encBuf.Flush()
}
func (c *gobServerCodec) Close() error {
if c.closed {
return nil
}
c.closed = true
return c.rwc.Close()
}
接下来咱们来看ServeCodec办法:
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
// 解析恳求信息
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
// 假如无法正确解析恳求头部信息则keepReading==false 退出循环,封闭衔接
if !keepReading {
break
}
// 发送解析恳求信息犯错的呼应信息 invalidRequest
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error()) // 开释req目标回链表
server.freeRequest(req)
}
continue
}
wg.Add(1)
// 调用对应RPC函数
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
wg.Wait()
codec.Close()
}
ServeCodec办法的首要逻辑是一个for循环,在for循环中首要有两个办法:
- server.readRequest——读取恳求数据并解码
- server.call——调用客户端要调用的办法,将回来值回来给客户端 接下来咱们看server.readRequest办法:
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
// 解析头部信息:若犯错且挑选越过这个恳求(keepReading==true)等候处理下一个恳求,这时需求取出衔接中的本次恳求的音讯主体避免影响对读取下一次恳求时犯错
service, mtype, req, keepReading, err = server.readRequestHeader(codec)
if err != nil {
if !keepReading {
return
}
// discard body
codec.ReadRequestBody(nil)
return
}
// 解析恳求主体信息(即参数):经过要调用的办法对应的参数类型来结构参数实例指针(经过reflect.New),然后再经过ReadRequestBody解码信息。
argIsValue := false // if true, need to indirect before calling.
if mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(mtype.ArgType.Elem())
} else {
argv = reflect.New(mtype.ArgType)
argIsValue = true
}
if err = codec.ReadRequestBody(argv.Interface()); err != nil {
return
}
if argIsValue {
argv = argv.Elem()
}
//结构呼应值实例:呼应类型为指针,所以需求.Elem()获取具体类型。当呼应值为slice或许map时要调用反射的MakeSlice或许 MakeMap办法来申请内存(创立实例)
replyv = reflect.New(mtype.ReplyType.Elem())
switch mtype.ReplyType.Elem().Kind() {
case reflect.Map:
replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
case reflect.Slice:
replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
}
return
}
readRequest办法相同包含三个进程:
- 解析头部信息:若犯错且挑选越过本次恳求(keepReading==true)等候处理下一个恳求,这时需求取出衔接中的本次恳求的音讯主体避免影响对读取下一次恳求时犯错
- 解析恳求主体信息:经过要调用的办法对应的参数类型来结构参数实例指针(经过reflect.New),然后再经过ReadRequestBody解码参数信息。
- 结构呼应值实例:调用反射的MakeSlice或许 MakeMap办法来申请内存(创立实例),结构呼应实例
在介绍恳求头以及恳求体的数据解析进程之前咱们首要对net/rpc界说的音讯格式进行介绍:
net/rpc将音讯分为头部和主体两部分,
-
对于request:头部包含恳求的序列号、服务名等根底信息;主体为调用RPC办法需求的参数
-
对于response:头部相同包含恳求的序列号、服务名等根底信息;主体则为办法回来值
- 恳求头部Request
type Request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *Request // for free list in Server
}
- 呼应头部Response
type Response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *Response // for free list in Server
}
- 由于每次恳求和呼应都需求界说Request/Response目标,为了减少内存分配,net/rpc完结了目标的复用,经过链表(freeReq/freeResp)的办法完结了一个目标池。复用流程如图所示:
getRequest和freeRequest办法代码如下:
func (server *Server) getRequest() *Request {
server.reqLock.Lock()
req := server.freeReq
if req == nil {
req = new(Request)
} else {
server.freeReq = req.next
*req = Request{}
}
server.reqLock.Unlock()
return req
}
func (server *Server) freeRequest(req *Request) {
server.reqLock.Lock()
req.next = server.freeReq
serve.freeReq = req
server.reqLock.Unlock()
}
咱们来看readRequestHeader办法:
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
req = server.getRequest() // 复用request
err = codec.ReadRequestHeader(req) // 读取恳求头信息
if err != nil {
req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
}
err = errors.New("rpc: server cannot decode request: " + err.Error())
return
}
keepReading = true
// 获取服务名和调用办法
dot := strings.LastIndex(req.ServiceMethod, ".")
if dot < 0 {
err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
return
}
serviceName := req.ServiceMethod[:dot]
methodName := req.ServiceMethod[dot+1:]
svci, ok := server.serviceMap.Load(serviceName)
if !ok {
err = errors.New("rpc: can't find service " + req.ServiceMethod)
return
}
svc = svci.(*service)
mtype = svc.method[methodName]
if mtype == nil {
err = errors.New("rpc: can't find method " + req.ServiceMethod)
}
return
}
readRequestHeader办法的首要办法包含:
- codec.ReadRequestHeader办法读取恳求头中信息
- 获取服务名和办法名
- 加载办法注册进程中存储在map中的service信息
调用RPC办法
经过上面的进程,咱们现已从恳求中获取了调用的服务、办法以及参数等信息,接下来就能够经过service.call调用对应的RPC办法处理恳求了。
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
if wg != nil {
defer wg.Done()
}
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
function := mtype.method.Func
// Invoke the method, providing a new value for the reply.
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
// The return value for the method is an error.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
}
需求留意的几点包含:
- 反射的Method类型的Func字段记录了调用办法所需的信息,包含办法地址等;
- 调用rpc办法时需求传递参数:调用的办法所属的结构体实例、办法参数、办法呼应值;
- returnValues中的目标是reflect.Value类型,转为interface{}类型再转为确切的类型。
之后调用server.sendResponse办法发送呼应给客户端:
func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
resp := server.getResponse()
// 编辑呼应头部信息
resp.ServiceMethod = req.ServiceMethod
if errmsg != "" {
resp.Error = errmsg
reply = invalidRequest
}
resp.Seq = req.Seq
sending.Lock()
err := codec.WriteResponse(resp, reply)
if debugLog && err != nil {
log.Println("rpc: writing response:", err)
}
sending.Unlock()
server.freeResponse(resp)
}
首要规划包含:
- 经过sending互斥锁避免异步处理恳求时,对同一链接写入相应信息造成抵触
- 发送音讯后开释Response目标以便复用
客户端
在代码中能够看到客户端办法首要包含dial和call两个办法,我将首要流程相同划分为三个进程:
- 树立 RPC 衔接
- 调用 RPC 办法
- 回来值处理
树立RPC衔接
Dial办法首要包含两个办法:
-
net.Dial——作用是依据传入的传输层协议和地址树立衔接并初始化一个RPC客户端
-
NewClient——初始化client,编码办法默许运用god,与服务端的初始化办法相同
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), nil
}
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}
// 依据解码办法创立客户端
// 参数类型ClientCodec是接口类型,任何完结了ClientCodec接口的结构体都能够作为客户端的编解码办法。
// 成功创立客户端的同时也异步调用了client.input()用于处理rpc服务端的呼应音讯
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}
type ClientCodec interface {
WriteRequest(*Request, interface{}) error
ReadResponseHeader(*Response) error
ReadResponseBody(interface{}) error
Close() error
}
之前在服务端代码解析中,说过完结ServerCodec接口的办法就能够进行个性化开发,这里也是这样,完结ClientCodec接口,才能进行个性化开发。差异在于客户端是写恳求读呼应,而服务端是读恳求写呼应。
在这里需求留意的是,在NewClientWithCodec办法中,经过异步的办法调用了client.input办法,这个办法其实是对回来值的处理,在后边第三步(回来值处理)中会具体解析。
默许gobClientCodec的完结办法如下:
type gobClientCodec struct {
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer
}
func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
// 序列化恳求头
if err = c.enc.Encode(r); err != nil {
return
}
// 序列化恳求体
if err = c.enc.Encode(body); err != nil {
return
}
// 将buffer中数据写入writer
return c.encBuf.Flush()
}
func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
return c.dec.Decode(r)
}
func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
return c.dec.Decode(body)
}
func (c *gobClientCodec) Close() error {
return c.rwc.Close()
}
调用RPC办法
恳求结构体:
type Call struct {
ServiceMethod string // The name of the service and method to call.
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct).
Error error // After completion, the error status.
Done chan *Call // Receives *Call when Go is complete.
}
经过client.Call办法调用指定的RPC办法,归于同步调用,实质上调用了Go办法,然后等候接纳调用完毕信号,信号由Done传递。同步异步调用的操控也是在这里经过done这个channel来操控的。
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10)
} else {
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
client.Go办法首要做的进程便是初始化call结构体,之后经过client.send办法发送恳求。
func (client *Client) send(call *Call) {
client.reqMutex.Lock() // 互斥锁,避免写入抵触
defer client.reqMutex.Unlock()
client.mutex.Lock()
if client.shutdown || client.closing { // 客户端是否封闭,主动封闭(closing),被动封闭(shutdowm)
client.mutex.Unlock()
call.Error = ErrShutdown
call.done()
return
}
seq := client.seq
client.seq++// 递加序列号仅有符号恳求
client.pending[seq] = call
client.mutex.Unlock()
// 恳求头
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
// 发送恳求
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)// 恳求失利,移除恳求
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done() // 发送完毕恳求信号
}
}
回来值处理
在前面,咱们说到,在创立客户端的进程中,异步调用了client.input的办法用于对RPC调用的回来值进行处理。代码如下:
func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
// We've got no pending call. That usually means that
// WriteRequest partially failed, and call was already
// removed; response is a server telling us about an
// error reading request body. We should still attempt
// to read error body, but there's no one to give it to.
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
case response.Error != "":
// We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody
// error if there is one.
call.Error = ServerError(response.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
call.done()
default:
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// Terminate pending calls.
client.reqMutex.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.reqMutex.Unlock()
if debugLog && err != io.EOF && !closing {
log.Println("rpc: client protocol error:", err)
}
}
首要流程包含:
- 从socket衔接中轮询获取呼应音讯(音讯头+音讯体)
- 首要读取音讯头,经过序列号seq获取待处理恳求
-
读取恳求体信息
- Call == nil:此刻咱们没有pending的call,意味着写恳求失利了,并且call现已删除,response是一个error信息,此刻咱们仍需求去读response body。
- response.Error != “”:RPC办法内部犯错,需求将呼应音讯读取出来但是不需求得到具体的音讯内容。call.Error = ServerError(response.Error)设置恳求的回来值err,ServerError是string的别号。
- 正常的处理流程
-
处理进程犯错,退出循环,封闭衔接
- 处理服务端呼应是启动一个goroutine进行轮询,为了避免在向服务端发送恳求时该goroutine因犯错而要封闭衔接,因而选用client.reqMutex。
- 运用client.mutex是该逻辑涉及对map的读取,对client一些属性的写入,避免写入/读取抵触
- client.shutdown = true表示客户端异常退出,因而需求处理client.pending中待处理的call,避免一些RPC调用在Call办法处堵塞等候(<-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done)
总结
以上,咱们别离从服务端和客户端角度别离对net/rpc库的源码进行了具体的解析,个人认为比较好的规划在于request和response结构体的复用,避免了每次恳求都需求创立结构体。
相关资料
pkg.go.dev/net/rpc
pkg.go.dev/encoding/go…