敞开成长之旅!这是我参与「日新方案 12 月更文挑战」的第2天,点击检查活动详情

Hello朋友们,在之前参与云原生活动的时分曾写过一篇文章《浅谈云原生技术组件—etcd》,在其间我首要说明了etcd在依据Kubernetes云原生微服务结构中的定位,首要是用来做服务的长途装备、KV存储等等,那么今日就来简要的弥补讲解下etcd的另一个重要的效果——服务注册和发现,没错,正是和Zookeeper、Eureka、Consul等具有一样人物的开源微服务组件,且毫不逊色于这些,那么咱们就开始进行讲解。

1 依据etcd的服务注册与发现逻辑架构

1.1 服务注册中心抽象

浅谈etcd服务注册与发现
(图片来自网络)

  • Service Registry(服务注册表,通常也成为服务注册中心):内部具有一个数据结构,用于存储已发布服务的装备信息。注册中心的效果一句话概括就是寄存和调度服务的装备,完成服务和注册中心,服务和服务之间的相互通讯,可以说是微服务中的”通讯录“,它记录了服务和服务地址的映射联系。
  • Service Requestor(服务调用者):依据服务注册中心调用已有服务。
  • Service Provider(服务供给者):供给服务到服务注册中心。

1.2 etcd服务注册发现简易版

浅谈etcd服务注册与发现

2 代码完成

2.1 总体流程

服务供给者

(1)监听网络

(2)创立gRPC服务端,并将具体的服务进行注册

(3)利用服务地址、服务名等注册etcd服务装备

(4)gRPC监听服务

服务消费者

(1)注册etcd解析器

(2)衔接etcd服务

(3)获取gRPC客户端

(4)调用gRPC服务

2.2 代码

2.2.1 服务供给方
var (
   cli         *clientv3.Client
   Schema      = "ns"
   Host        = "127.0.0.1"
   Port        = 3000              //端口
   ServiceName = "api_log_service" //服务称号
   EtcdAddr    = "127.0.0.1:2379"  //etcd地址
)
type ApiLogServer struct{}
func (api *ApiLogServer) GetApiLogByUid(ctx context.Context, req *proto.ApiLogRequest) (*proto.ApiLogResponse, error) {
   resp := &proto.ApiLogResponse{
      Msg:  "ok",
      Data: "Hello",
   }
   return resp, nil
}
//将服务地址注册到etcd中
func register(etcdAddr, serviceName, serverAddr string, ttl int64) error {
   var err error
   if cli == nil {
      cli, err = clientv3.New(clientv3.Config{
         Endpoints:   strings.Split(etcdAddr, ";"),
         DialTimeout: 50 * time.Second,
      })
      if err != nil {
         fmt.Printf("connection server err : %s\n", err)
         return err
      }
   }
   //与etcd树立长衔接,并确保衔接不断(心跳检测)
   ticker := time.NewTicker(time.Second * time.Duration(ttl))
   go func() {
      key := "/" + Schema + "/" + serviceName + "/" + serverAddr
      for {
         resp, err := cli.Get(context.Background(), key)
         if err != nil {
            fmt.Printf("get server address err : %s", err)
         } else if resp.Count == 0 { //尚未注册
            err = keepAlive(serviceName, serverAddr, ttl)
            if err != nil {
               fmt.Printf("keepAlive err : %s", err)
            }
         }
         <-ticker.C
      }
   }()
   return nil
}
//坚持服务器与etcd的长衔接
func keepAlive(serviceName, serverAddr string, ttl int64) error {
   //创立租约
   leaseResp, err := cli.Grant(context.Background(), ttl)
   if err != nil {
      fmt.Printf("create grant err : %s\n", err)
      return err
   }
   //将服务地址注册到etcd中
   key := "/" + Schema + "/" + serviceName + "/" + serverAddr
   _, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID))
   if err != nil {
      fmt.Printf("register service err : %s", err)
      return err
   }
   //树立长衔接
   ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
   if err != nil {
      fmt.Printf("KeepAlive err : %s\n", err)
      return err
   }
   //清空keepAlive返回的channel
   go func() {
      for {
         <-ch
      }
   }()
   return nil
}
//撤销注册
func unRegister(serviceName, serverAddr string) {
   if cli != nil {
      key := "/" + Schema + "/" + serviceName + "/" + serverAddr
      cli.Delete(context.Background(), key)
   }
}
func RunApiLog() {
   //监听网络
   listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", Port))
   if err != nil {
      fmt.Println("Listen network err :", err)
      return
   }
   defer listener.Close()
   //创立grpc
   srv := grpc.NewServer()
   defer srv.GracefulStop()
   //注册到grpc服务中
   proto.RegisterApiLogServiceServer(srv, &ApiLogServer{})
   //将服务地址注册到etcd中
   serverAddr := fmt.Sprintf("%s:%d", Host, Port)
   fmt.Printf("rpc server address: %s\n", serverAddr)
   register(EtcdAddr, ServiceName, serverAddr, 10)
   //关闭信号处理
   ch := make(chan os.Signal, 1)
   signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
   go func() {
      s := <-ch
      unRegister(ServiceName, serverAddr)
      if i, ok := s.(syscall.Signal); ok {
         os.Exit(int(i))
      } else {
         os.Exit(0)
      }
   }()
   //监听服务
   err = srv.Serve(listener)
   if err != nil {
      fmt.Println("rpc server err : ", err)
      return
   }
}
2.2.2 服务消费方
var (
   cli         *clientv3.Client
   Schema      = "ns"
   ServiceName = "api_log_service" //服务称号
   EtcdAddr    = "127.0.0.1:2379"  //etcd地址
)
type EtcdResolver struct {
   etcdAddr   string
   clientConn resolver.ClientConn
}
func NewEtcdResolver(etcdAddr string) resolver.Builder {
   return &EtcdResolver{etcdAddr: etcdAddr}
}
func (r *EtcdResolver) Scheme() string {
   return Schema
}
//ResolveNow watch有改变调用
func (r *EtcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
   fmt.Println(rn)
}
//Close 解析器关闭时调用
func (r *EtcdResolver) Close() {
   fmt.Println("Close")
}
//Build 构建解析器 grpc.Dial()时调用
func (r *EtcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
   var err error
   //构建etcd client
   if cli == nil {
      cli, err = clientv3.New(clientv3.Config{
         Endpoints:   strings.Split(r.etcdAddr, ";"),
         DialTimeout: 15 * time.Second,
      })
      if err != nil {
         fmt.Printf("connect etcd err : %s\n", err)
         return nil, err
      }
   }
   r.clientConn = clientConn
   go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
   return r, nil
}
//watch机制:监听etcd中某个key前缀的服务地址列表的改变
func (r *EtcdResolver) watch(keyPrefix string) {
   //初始化服务地址列表
   var addrList []resolver.Address
   resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
   if err != nil {
      fmt.Println("get service list err : ", err)
   } else {
      for i := range resp.Kvs {
         addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)})
      }
   }
   r.clientConn.NewAddress(addrList)
   //监听服务地址列表的改变
   rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
   for n := range rch {
      for _, ev := range n.Events {
         addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
         switch ev.Type {
         case mvccpb.PUT:
            if !exists(addrList, addr) {
               addrList = append(addrList, resolver.Address{Addr: addr})
               r.clientConn.NewAddress(addrList)
            }
         case mvccpb.DELETE:
            if s, ok := remove(addrList, addr); ok {
               addrList = s
               r.clientConn.NewAddress(addrList)
            }
         }
      }
   }
}
func exists(l []resolver.Address, addr string) bool {
   for i := range l {
      if l[i].Addr == addr {
         return true
      }
   }
   return false
}
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
   for i := range s {
      if s[i].Addr == addr {
         s[i] = s[len(s)-1]
         return s[:len(s)-1], true
      }
   }
   return nil, false
}
func RunClient() {
   //注册etcd解析器
   r := NewEtcdResolver(EtcdAddr)
   resolver.Register(r)
   //衔接服务器,同步调用r.Build()
   conn, err := grpc.Dial(r.Scheme()+"://author/"+ServiceName, grpc.WithBalancerName("round_robin"), grpc.WithInsecure())
   if err != nil {
      fmt.Printf("connect err : %s", err)
   }
   defer conn.Close()
   //取得gRPC客户端
   c := proto.NewApiLogServiceClient(conn)
   //调用服务
   resp, err := c.GetApiLogByUid(
      context.Background(),
      &proto.ApiLogRequest{UId: 0},
   )
   if err != nil {
      fmt.Printf("call service err : %s", err)
      return
   }
   fmt.Printf("resp : %s , data : %s", resp.Msg, resp.Data)
}
2.2.3 公共组件
syntax = "proto3";
package proto; 
option go_package = "../api_log";
service ApiLogService {
  rpc GetApiLogByUid(ApiLogRequest) returns (ApiLogResponse){}
}
message ApiLogRequest{
  int32 u_id = 1;
}
message ApiLogResponse{
  int64 code = 1;
  string msg = 2;
  int64 count = 3;
  string data = 4;
}

注意要在编译后进行运用哈

2.3 注意事项

在我编写代码进行完成的过程中遇到过种种问题,可是最让人回忆深入的便是etcd与gRPC版别不兼容的问题,用了很长时刻才搞定,在这里记录下吧:

原因是etcd3.x版别不支撑grpc1.27版别以上,可是grpc1.27以下编译成的中间代码又不支撑新版别的proto buffer,这就陷入了一个两难的处境,最终经过Stack Overflow才查到:

stackoverflow.com/questions/6…

处理,在go.mod中加入这几行代码:

replace (
   github.com/coreos/etcd => github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible
   google.golang.org/grpc => google.golang.org/grpc v1.27.0
)

3 细节剖析

3.1 服务生产端keepAlive

keepAlive是一个陈词滥调的问题了,下到TCP/IP、HTTP衔接,上到Redis集群、MySQL集群,都会有该机制,那么etcd的keepAlive是怎么搞的呢?

下面咱们来看下

etcd运用LeaseKeepAlive API调用创立的双向流来刷新租约。当客户端希望刷新租约时,它经过流发送一个leasekeepaliverrequest:

message LeaseKeepAliveRequest {
  int64 ID = 1;
}
  • ID :keepAlive有效的租约ID。

LeaseKeepAliveResponse作为keepAlive的响应:

message LeaseKeepAliveResponse {
  ResponseHeader header = 1;
  int64 ID = 2;
  int64 TTL = 3;
}
  • ID :用新的TTL刷新的租约。
  • TTL :新的生存时刻,以秒为单位,租约剩下的时刻。

3.2 服务消费端watch机制

Watch API供给了一个依据事情的接口,用于异步监督服务key的更改。etcd3 watch经过继续调查给定的修订(当时的或前史的)来等候键的更改,并将键更新流回客户端。

对每个键的每次更改都用“Event”消息表明。Event消息供给了更新的数据和更新的类型:

message Event {
  enum EventType {
    PUT = 0;
    DELETE = 1;
  }
  EventType type = 1;
  KeyValue kv = 2;
  KeyValue prev_kv = 3;
}
  • type:PUT类型表明新数据的更新,DELETE表明key的删除。
  • kv:与事情相关的键值PUT事情包括kv。
  • prev_kv:事情发生前修正版别的密钥的键值对。为了节省带宽,它只在watch显式启用的情况下填写。

watch流:

watch是长时刻运转的请求,并运用gRPC流来流化事情数据。watch流是双向的;客户端写入流来树立监督,读取流来接收监督事情。经过运用每个watch标识符来标记事情,单个watch流可以将多个不同的手表组合在一起。这种多路复用有助于减少中心etcd集群上的内存占用和衔接开支。

4 总结

微服务是当今互联网范畴的广泛概念,也是一种架构演进的结果,微服务的存在让架构规划愈加的解耦合,让人员的分工愈加清晰,当然他的落地完成也并不止步与某一两种方式,在云原生范畴的Kubernetes+etcd,互联网范畴常用的Spring Cloud全家桶以及Dubbo等都是微服务的具体完成,而etcd也仅仅是微服务中服务注册中心组件人物的一个代表罢了。

参阅:

etcd.io/docs/v3.5/d…

www.jianshu.com/p/217d0e3a8…

www.cnblogs.com/wujuntian/p…

stackoverflow.com/questions/6…

blog.csdn.net/fly910905/a…