Celery 根底架构

根本组件

Celery 有五大组件,至于为什么是五大,这个是我从网上搜到的,咱们都说是五个, 查了查官方文档,也没说,那就姑且以为它是五个吧。

  1. worker ,不必多说,提到celery的组件,worker究竟拥有一席之地,也是咱们最为了解的巨中心的组件之一。worker是celery中的中心组件,首要承担的职责是使命履行者,说人话便是监听音讯行列,从broker中拉取使命,然后分配给worker内部保护的子进程去履行。worker能够是分布式的,能够部署在多台服务器,一个使命终究被决定到分配到哪个worker上去履行是经过Gossip协议选出来的。
  2. broker: celery中的音讯中间件,首要的使命是,接受使命音讯,将音讯存进行列中按顺序分发给指定的使命消费方,broker支撑多种不同的使命消费方法,比较常用的有rabbitmqredis 数据库。
  3. beat: 使命调度器,一般有守时使命的时分会开启beat,beat的作用首要是周期性的沦亡守时使命,看到哪个时间到了就放到使命行列里边交给worker去履行。beat供给了接口,允许开发者自界说自己的 scheduler
  4. producer: 有顾客,自然就有出产者,producer是celery中的使命出产者,出产使命的方法有很多种,一般咱们经过函数、装饰器和调用celery api来出产使命。
  5. result backend:使命成果存储,使命处理完后保存状况信息和成果,以供查询。存储的方位首要在于你指定来什么样的媒介来存储使命的履行成果,celery支撑多种backend,常用的有rabbitmq。

关于上面的五大组件,详细又是怎么协同完结一个使命完好的生命周期的呢?

下面咱们以一个简略的使命履行模型为例:

@task
def add(x, y):
  return x+y
add.delay((2, 3))

上面的代码很简略,履行这段代码并不会返回给我5,而是会返回我一个AsyncResult对象。假设咱们用的brokerrabbitmq, 一般来说. 一个使命的生命周期大致是这样的。

  1. add.delay((2, 3)) 这行代码实际上就对应着咱们上面组件的使用celery api 方法出产使命的 producer,到这步,使命出产出来了
  2. 使命就像产品相同,出产出来得有地儿卖啊,讲究一个渠道,所以使命出产者出产出来使命之后,就扔给了broker,留意,出产者并不直接和音讯行列打交道,就像格力空调总部也会直接和各个城镇的专卖店直接接触相同,这儿的broker就和格力各个省份的总署理是一个意思,出产者把使命给broker之后,broker 就把出产出来的使命给丢给他署理的音讯行列RabbitMq里边去了。
  3. 这个时分顾客可没闲着,worker可一直在监听行列,行列里边一旦发现有了使命,就匆促消费,留意,使命的消费实际上是由worker的子进程去消费的。为什么?咱们想一想,一般上面有什么销售使命,都是格力专卖店的老板去干的?当然是给服务员去做哇。因为服务员能够有多个,老板只能有一个哇。使用子进程的方法首要是为了提高worker自身的履行效率。
  4. worker 履行完事儿之后,把成果再怼回去到RabbitMQ中。

流程图大致长这样:

Celery 源码剖析(二): 根底架构
这个时分有人就有疑问了,不对,Exchange你玩意你也没说啊?

Exchange

咱们以rabbitmq为例,exchange的作用其实便是一个路由器

Celery 源码剖析(二): 根底架构

你告知Exhange说, 给我把音讯送到这个行列里边去,这个是地址(route_key)。Exhange就会帮你把音讯扔进指定的行列里边去。别忘记咱们的Worker发动的时分但是能够指定监听哪个行列的。那你就能够挑选出产使命的时分把使命扔到指定的行列里边交给指定的worker去消费。

咱们能够经过配置celery task_routes的方法指定某个task的行列:

{
  "test.add":{
      "queue":"addtask",
      "routing_key":"test"
  }
}

然后发动worker的时分指定这个行列就好了:

 celery -A proj worker  --loglevel=info -Q addtask

kombu

celery自身也并不直接操作音讯行列,实际上,celery操作音讯行列是经过Kombu来完结的。Kombu的根本概念如下:

Kombu 的定位是一个兼容 AMQP 协议的音讯行列抽象,是一个把音讯传递封装成统一接口的库。其特点是支撑多种的契合APMQ协议的音讯行列体系。不仅支撑原生的AMQP音讯行列如RabbitMQ、Qpid,还支撑虚拟的音讯行列如redis、

关于AMQP协议和kombu的中心概念比及今后再讲,详细的原因十分实在,我现在也不是十分的了解这一块的内容。

Worker之间的交互

在前面那一章咱们讲到,Celery是一个分布式的使命行列框架,这儿的分布式咱们一般讲的是Worker, Beat不是分布式的。既然是分布式,那就有一点点分布式的样子,各个Worker之间是怎么交互的呢? 最简略的一个例子便是,那么多的worker,我往行列里边发送一个音讯,交给哪个Worker去消费是个问题,让他们自己去抢吧,听起来不太优雅,现在究竟是法治社会,咱们要调和,要民主。

文章开头咱们说过,Worker之间的推举自身是经过Gossip协议完结的,Celery在Worker的层面并没有所畏的Master存在去协调所有Worker的运转。所以终究交由哪个Worker去履行,是由Worker互相之间的民主协商决定的,并不是由一个Master进程自己决定决定的。

关于Gossip协议,咱们能够看这篇文章:

cloud.tencent.com/developer/a…

详细在Celery源码中的方位是:

celery.worker.consumer.gossip.Gossip

在此之前咱们需要明确这两点:

  1. 每个Worker都保护一个逻辑时钟clock,逻辑时钟的值便是当前Worker自身的排序
  2. 每次推举会有三个根本信息:
    • id 标明这是哪一次的推举
    • topic: 主题,标明action的类型
    • action: 本次推举的目的,推举成功的worker将会履行这个 action

大致的推举顺序如下:

  1. 当control意识到需要一次推举的时分,就会调用本worker的gossip发送一个worker-event 给所有其他的Worker。
  2. 其他的Worker收到之后,就将自己的Clock给回复曩昔。
  3. 一段时间之后(一般很快),每个worker都能收到其他所有Worker的clock,然后在自己本地进行比照,看看自己是不是最大的那一个。假如是的话,自己处理这个使命,假如不是的话,那就不管它。因为与此同时,其他的worker也在重复相同的进程,clock最大的那个worker迟早会发现自己是最大的,然后顾客个使命,所以不必担心使命无法被消费的问题。除非音讯传送的进程中丢了。

除了推举之外,worker之间几乎谁也不搭理谁,互不干涉,谁拿到使命谁就去履行就好了。

听起来十分的简略,但实际上Gossip和其他的协议相同,也存在不可避免的缺陷:

Celery 源码剖析(二): 根底架构

  1. 音讯推迟:节点随机向少量几个节点发送音讯,音讯终究是经过多个轮次的散播而抵达全网,不可避免的形成音讯推迟。这关于及时性要求十分高的体系是不适用的。
  2. 音讯冗余:节点守时随机挑选周围节点发送音讯,而收到音讯的节点也会重复该步骤,因此不可避免地引起同一节点屡次接收同一音讯,增加音讯处理的压力。一次通讯会对网路带宽、CUP资源形成很大的负载,而这些负载又受限于 通讯频率,该频率又影响着算法收敛的速度。
  3. 拜占庭问题:假如有一个歹意传播音讯的节点,Gossip协议的分布式体系就会出问题。

Worker的组成部分

在celery中,怎么快速知道某个组件的组成部分十分的有规律,咱们只需要找到对应的中心组件,然后找到对应的steps就行, 比如在Worker中,咱们先是找到了Worker这个类,然后发现Worker继承了WorkController这个类, 点进去一看,Blueprint这个子类映入眼帘。

    class Blueprint(bootsteps.Blueprint):
        """Worker bootstep blueprint."""
        name = 'Worker'
        default_steps = {
            'celery.worker.components:Hub',
            'celery.worker.components:Pool',
            'celery.worker.components:Beat',
            'celery.worker.components:Timer',
            'celery.worker.components:StateDB',
            'celery.worker.components:Consumer',
            'celery.worker.autoscale:WorkerComponent',
        }
  1. Hub: Eventloop 的封装对象,

  2. Pool: Worker保护的池子(进程池,线程池,协程), 实际上使命的履行是交由这些子进程or线程完结的。

  3. Beat: 只要再加 -beat 的时分才会开启。

  4. Timer: 用于履行守时使命的 Timer, 当咱们设置一个使命五秒后履行的时分,这个使命并不会直接进入使命行列,而是先进入Timer中保护的行列,时间到了在扔到使命履行行列中去。

  5. StateDB: 用户耐久化Worker重启的时分的数据。

  6. Consumer: 使命顾客,负责从broker那里接受音讯,然后封装音讯交由对应的逻辑去处理。

  7. WorkerComponent: 在线调节进程池巨细。

问题:

  1. celery 是怎么发动的?
  2. 咱们界说的task celery 是怎么扫描并注册到celery的?
  3. worker 是怎么消费使命的?
  4. celery beat 轮询使命的?

后面的篇幅咱们将在翻看源码的时分,去解答这些问题。