引子
现在很多云原生体系、分布式体系,例如 Kubernetes,都是用 Go 言语写的,这是由于 Go 言语天然支撑异步编程,而且静态言语能保证运用体系的稳定性。笔者的开源项目 Crawlab 作为爬虫管理渠道,也运用到了分布式体系。本篇文章将介绍如何用 Go 言语编写一个简略的分布式体系。
思路
在开端写代码之前,咱们先思考一下需求完成些什么。
- 主节点(Master Node):中控体系,相当于军队中的指挥官,派发使命指令
- 作业节点(Worker Node):履行者,相当于军队中的战士,履行使命
除了上面的概念以外,咱们需求完成一些简略功能。
- 上报运转状况(Report Status):作业节点向主节点上报当前状况
- 分派使命(Assign Task):经过 API 向主节点建议恳求,主节点再向作业节点分派使命
- 运转脚本(Execute Script):作业节点履行使命中的脚本
整个流程示意图如下。
实战
节点通讯
节点之间的通讯在分布式体系中非常重要,毕竟每个节点或机器假如孤立运转,就失去了分布式体系的意义。因而,节点通讯在分布式体系中是中心模块。
gRPC 协议
首先,咱们来想一下,如何让节点之间进行相互通讯。最常用的通讯办法便是 API,不过这个通讯办法有个缺陷,便是需求将各个节点的 IP 地址及端口显示暴露给其他节点,这在公网中是不太安全的。因而,咱们挑选了 gRPC,一种流行的长途过程调用(Remote Procedure Call,RPC)结构。这里咱们不过多的解释 RPC 或 gRPC 的原理,简而言之,便是能让调用者在长途机器上履行指令的协议办法。
为了运用 gRPC 结构,咱们先创立 go.mod
并输入以下内容,并履行 go mod download
。留意:对于国内的朋友,或许需求增加代理才干正常下载,能够先履行 export GOPROXY=goproxy.cn,direct
后再履行下载指令。
module go-distributed-system
go 1.17
require (
github.com/golang/protobuf v1.5.0
google.golang.org/grpc v1.27.0
google.golang.org/protobuf v1.27.1
)
然后,咱们创立 Protocol Buffers 文件 node.proto
(表示节点对应的 gRPC 协议文件),并输入以下内容。
syntax = "proto3";
package core;
option go_package = ".;core";
message Request {
string action = 1;
}
message Response {
string data = 1;
}
service NodeService {
rpc ReportStatus(Request) returns (Response){}; // Simple RPC
rpc AssignTask(Request) returns (stream Response){}; // Server-Side RPC
}
在这里咱们创立了两个 RPC 服务,别离是担任上报状况的 Simple RPC ReportStatus
以及 Server-Side RPC AssignTask
。Simple RPC 和 Server-Side RPC 的差异如下图所示,主要差异在于 Server-Side RPC 能够从经过流(Stream)向客户端(Client)主动发送数据,而 Simple RPC 只能从客户端向服务端(Server)发恳求。
创立好 .proto
文件后,咱们需求将这个 gRPC 协议文件转化为 .go
代码文件,从而能被 Go 程序引用。在指令行窗口中履行如下指令。留意:编译工具 protoc
不是自带的,需求单独下载,具体能够参阅文档 grpc.io/docs/protoc…。
mkdir core
protoc --go_out=./core \
--go-grpc_out=./core \
node.proto
履行完后,能够在 core
目录下看到两个 Go 代码文件, node.pb.go
和 node_grpc.pb.go
,这相当于 Go 程序中对应的 gRPC 库。
gRPC 服务端
现在开端编写服务端逻辑。
咱们先创立一个新文件 core/node_service_server.go
,输入以下内容。主要逻辑便是完成了之前创立好的 gRPC 协议中的两个调用办法。其间,暴露了 CmdChannel
这个通道(Channel)来获取需求发送到作业节点的指令。
package core
import (
"context"
)
type NodeServiceGrpcServer struct {
UnimplementedNodeServiceServer
// channel to receive command
CmdChannel chan string
}
func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *Request) (*Response, error) {
return &Response{Data: "ok"}, nil
}
func (n NodeServiceGrpcServer) AssignTask(request *Request, server NodeService_AssignTaskServer) error {
for {
select {
case cmd := <-n.CmdChannel:
// receive command and send to worker node (client)
if err := server.Send(&Response{Data: cmd}); err != nil {
return err
}
}
}
}
var server *NodeServiceGrpcServer
// GetNodeServiceGrpcServer singleton service
func GetNodeServiceGrpcServer() *NodeServiceGrpcServer {
if server == nil {
server = &NodeServiceGrpcServer{
CmdChannel: make(chan string),
}
}
return server
}
gRPC 客户端
gRPC 客户端不需求具体完成,咱们一般只需求调用 gRPC 客户端的办法,程序会主动建议向服务端的恳求以及获取后续的呼应。
主节点
编写好了节点通讯的基础部分,现在咱们需求完成主节点了,这是整个中心化分布式体系的中心。
咱们创立一个新的文件 node.go
,输入以下内容。
package core
import (
"github.com/gin-gonic/gin"
"google.golang.org/grpc"
"net"
"net/http"
)
// MasterNode is the node instance
type MasterNode struct {
api *gin.Engine // api server
ln net.Listener // listener
svr *grpc.Server // grpc server
nodeSvr *NodeServiceGrpcServer // node service
}
func (n *MasterNode) Init() (err error) {
// TODO: implement me
panic("implement me")
}
func (n *MasterNode) Start() {
// TODO: implement me
panic("implement me")
}
var node *MasterNode
// GetMasterNode returns the node instance
func GetMasterNode() *MasterNode {
if node == nil {
// node
node = &MasterNode{}
// initialize node
if err := node.Init(); err != nil {
panic(err)
}
}
return node
}
其间,咱们创立了两个占位办法 Init
和 Start
,咱们别离完成。
在初始化办法 Init
中,咱们需求做几件作业:
- 注册 gRPC 服务
- 注册 API 服务
现在,在 Init
办法中加入如下代码。
func (n *MasterNode) Init() (err error) {
// grpc server listener with port as 50051
n.ln, err = net.Listen("tcp", ":50051")
if err != nil {
return err
}
// grpc server
n.svr = grpc.NewServer()
// node service
n.nodeSvr = GetNodeServiceGrpcServer()
// register node service to grpc server
RegisterNodeServiceServer(node.svr, n.nodeSvr)
// api
n.api = gin.Default()
n.api.POST("/tasks", func(c *gin.Context) {
// parse payload
var payload struct {
Cmd string `json:"cmd"`
}
if err := c.ShouldBindJSON(&payload); err != nil {
c.AbortWithStatus(http.StatusBadRequest)
return
}
// send command to node service
n.nodeSvr.CmdChannel <- payload.Cmd
c.AbortWithStatus(http.StatusOK)
})
return nil
}
能够看到,咱们新建了一个 gRPC Server,并将之前的 NodeServiceGrpcServer
注册了进去。别的,咱们还用 gin
结构创立了一个简略的 API 服务,能够 POST 恳求到 /tasks
向 NodeServiceGrpcServer
中的指令通道 CmdChannel
传送指令。这样就将各个部件串接起来了!
发动办法 Start
很简略,便是发动 gRPC Server 以及 API Server。
func (n *MasterNode) Start() {
// start grpc server
go n.svr.Serve(n.ln)
// start api server
_ = n.api.Run(":9092")
// wait for exit
n.svr.Stop()
}
下一步,咱们就要完成实际做使命的作业节点了。
作业节点
现在,咱们创立一个新文件 core/worker_node.go
,输入以下内容。
package core
import (
"context"
"google.golang.org/grpc"
"os/exec"
)
type WorkerNode struct {
conn *grpc.ClientConn // grpc client connection
c NodeServiceClient // grpc client
}
func (n *WorkerNode) Init() (err error) {
// connect to master node
n.conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
return err
}
// grpc client
n.c = NewNodeServiceClient(n.conn)
return nil
}
func (n *WorkerNode) Start() {
// log
fmt.Println("worker node started")
// report status
_, _ = n.c.ReportStatus(context.Background(), &Request{})
// assign task
stream, _ := n.c.AssignTask(context.Background(), &Request{})
for {
// receive command from master node
res, err := stream.Recv()
if err != nil {
return
}
// log command
fmt.Println("received command: ", res.Data)
// execute command
parts := strings.Split(res.Data, " ")
if err := exec.Command(parts[0], parts[1:]...).Run(); err != nil {
fmt.Println(err)
}
}
}
var workerNode *WorkerNode
func GetWorkerNode() *WorkerNode {
if workerNode == nil {
// node
workerNode = &WorkerNode{}
// initialize node
if err := workerNode.Init(); err != nil {
panic(err)
}
}
return workerNode
}
其间,咱们在初始化办法 Init
中创立了gRPC 客户端,并连接了主节点的 gRPC 服务端。
在发动办法 Start
中做了几件作业:
- 调用上报状况(Report Status)的 Simple RPC 办法
- 调用分配使命(Assign Task)的 Server-Side RPC 办法,获取到了流(Stream)
- 经过循环不断承受流传输过来的来自服务端(也便是主节点)的信息,并履行指令
这样,整个包含主节点、作业节点的分布式体系中心逻辑就写好了!
将它们放在一起
最终,咱们需求将这些中心逻辑用指令行工具封装一下,以便启用。
创立主程序文件 main.go
,并输入以下内容。
package main
import (
"go-distributed-system/core"
"os"
)
func main() {
nodeType := os.Args[1]
switch nodeType {
case "master":
core.GetMasterNode().Start()
case "worker":
core.GetWorkerNode().Start()
default:
panic("invalid node type")
}
}
这样,整个简略的分布式体系就创立好了!
代码作用
下面咱们来运转一下代码。
翻开两个指令行窗口,其间一个输入 go run main.go master
发动主节点,另一个输入 go run main.go worker
发动作业节点。
假如主节点发动成功,将会看到如下日志信息。
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
- using env: export GIN_MODE=release
- using code: gin.SetMode(gin.ReleaseMode)
[GIN-debug] POST /tasks --> go-distributed-system/core.(*MasterNode).Init.func1 (3 handlers)
[GIN-debug] [WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value.
Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details.
[GIN-debug] Listening and serving HTTP on :9092
假如作业节点发动成功,将会看到如下日志信息。
worker node started
主节点、作业节点都发动成功后,咱们在别的一个指令行窗口中输入如下指令来建议 API 恳求。
curl -X POST \
-H "Content-Type: application/json" \
-d '{"cmd": "touch /tmp/hello-distributed-system"}' \
http://localhost:9092/tasks
在作业节点窗口应该能够看到日志 received command: touch /tmp/hello-distributed-system
。
然后检查文件是否顺畅生成,履行 ls -l /tmp/hello-distributed-system
。
-rw-r--r-- 1 marvzhang wheel 0B Oct 26 12:22 /tmp/hello-distributed-system
文件成功生成,表示现已经过作业节点履行成功了!大功告成!
总结
本篇文章经过 RPC 结构 gRPC 以及 Go 言语自带的 Channel,将节点串接起来,开发出了一个简略的分布式体系。所用到的中心库和技术:
- gRPC
- Protocol Buffers
- channel
- gin
- os/exec
整个代码示例库房在 GitHub 上: github.com/tikazyq/cod…
社区
假如您对笔者的文章感兴趣,能够加笔者微信 tikazyq1 并注明 “码之道”,笔者会将你拉入 “码之道” 沟通群。