从今天开端将陆陆续续发表一些openstack相关的文章。
openstack服务的发动
根本一切的openstack服务都依靠 evenlet 完成各种并发使命,它的进程可分为两类:
1、WSGIService: 接纳和处理 http 恳求,依靠eventlet.wsgi
的wsgi server
处理 http 恳求,比方nova-api
2、Service: 接纳和处理 rpc 恳求,如nova-operation
等
无论是WSGIService
还是Service
类型的进程,每逢接纳到一个恳求(http 或 rpc),都会在线程池中分配一个协程处理该恳求
一、WSGIService的发动
下面以nova服务为例。
nova-api 由nova/cmd/api.py发动,它初始化一个 WSGIService(由service.py界说) 目标。
def main():
objects.register_all()
CONF(sys.argv[1:], project='nova',
version=version.version_string())
logging.setup(CONF, "nova")
rpc.init(CONF)
launcher = service.get_launcher()
server = service.WSGIService('osapi_nova')
launcher.launch_service(server, workers=server.workers)
launcher.wait()
api中从service层获取一个发动器目标,终究将server目标传入发动器目标的launch_service办法中,launch_service(server, workers=server.workers)办法界说如下:
class Launcher(object):
def __init__(self):
super(Launcher, self).__init__()
self.launch_service = serve
self.wait = wait
该办法被引用到serve办法,serve办法界说如下:
def serve(server, workers=None):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
_launcher = service.launch(CONF, server, workers=workers)
终究调用了oslo_service/service.py下的launch办法,launch办法界说如下:
def launch(conf, service, workers=1, restart_method='reload'):
…
if workers is not None and workers <= 0:
raise ValueError(_("Number of workers should be positive!"))
if workers is None or workers == 1:
launcher = ServiceLauncher(conf, restart_method=restart_method)
else:
launcher = ProcessLauncher(conf, restart_method=restart_method)
launcher.launch_service(service, workers=workers)
能够看到这儿运用到了两种发动器,在进一步解说发动的过程中先介绍下openstack中的发动器
二、Openstack中的Launcher
Openstack中有一个叫Launcher的概念,即专门用来发动服务的,这个类被放在了oslo_service这个包里面,Launcher分为两种:
一种是ServiceLauncher;
另一种为ProcessLauncher。
ServiceLauncher用来发动单进程的服务;
而ProcessLauncher用来发动有多个worker子进程的服务,如各类api服务(nova-api、cinder-api)等
oslo_service/service.py
1、ServiceLauncher
ServiceLauncher承继自Launcher,发动服务的一个重要成员便是launcher_service,ServiceLauncher的该成员便是承继于Launcher
def launch_service(self, service, workers=1):
…
if workers is not None and workers != 1:
raise ValueError(_("Launcher asked to start multiple workers"))
_check_service_base(service)
service.backdoor_port = self.backdoor_port
self.services.add(service)
aucher_service便是将服务添加到self.services成员里面,services成员的类型是class Services,看看它的add办法
class Services(object):
def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()
def add(self, service):
"""Add a service to a list and create a thread to run it.
:param service: service to run
"""
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)
Services这个类的初始化很简略,即创立一个ThreadGroup,ThreadGroup其实是eventlet的GreenPool,Openstack利用eventlet完成并发,add办法,将self.run_service这个办法放入pool中,而service便是它的参数。run_service办法很简略,便是调用service的start办法,这样就完成了服务的发动
2、ProcessLauncher
ProcessLauncher直接承继于Object,相同也有launch_service办法
def launch_service(self, service, workers=1):
…
_check_service_base(service)
wrap = ServiceWrapper(service, workers)
LOG.info('Starting %d workers', wrap.workers)
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
lauch_service除了承受service以外,还需要承受一个workers参数,即子进程的个数,然后调用_start_child发动多个子进程
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
# Limit ourselves to one process a second (over the period of
# number of workers * 1 second). This will allow workers to
# start up quickly but ensure we don't fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info('Forking too fast, sleeping')
time.sleep(1)
wrap.forktimes.pop(0)
wrap.forktimes.append(time.time())
pid = os.fork()
if pid == 0:
self.launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(
self.launcher)
if not _is_sighup_and_daemon(signo):
self.launcher.wait()
break
self.launcher.restart()
os._exit(status)
LOG.debug('Started child %d', pid)
wrap.children.add(pid)
self.children[pid] = wrap
看见了解的fork没有,仅仅简略的调用了一个os.fork(),然后子进程开端运转,子进程调用_child_process
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn_n(self._pipe_watcher)
# Reseed random number generator
random.seed()
launcher = Launcher(self.conf, restart_method=self.restart_method)
launcher.launch_service(service)
return launcher
_child_process其实很简略,创立一个Launcher,调用Laucher.launch_service办法,前面介绍过,其实ServiceLauncher承继自Launcher,也是调用的launcher_service办法,将服务发动,因而接下来的步骤能够参考前面,终究都将调用service.start办法发动服务
三、WSGIService的发动—续
回到前面的发动部分,从launcher节的阐明,咱们知道服务的发动终究调用了service的start办法,而这儿的service便是咱们最开端在api.py中创立的service,然后一层层传进后边的发动器中的,咱们继续回到WSGIService类中的start(self)办法
def start(self):
…
if self.manager:
self.manager.init_host()
self.server.start()
self.port = self.server.port
这儿调用了oslo_service/wsgi.py中的start(self)办法
def start(self):
…
self.dup_socket = self.socket.dup()
if self._use_ssl:
self.dup_socket = sslutils.wrap(self.conf, self.dup_socket)
wsgi_kwargs = {
'func': eventlet.wsgi.server,
'sock': self.dup_socket,
'site': self.app,
'protocol': self._protocol,
'custom_pool': self._pool,
'log': self._logger,
'log_format': self.conf.wsgi_log_format,
'debug': False,
'keepalive': self.conf.wsgi_keep_alive,
'socket_timeout': self.client_socket_timeout
}
if self._max_url_len:
wsgi_kwargs['url_length_limit'] = self._max_url_len
self._server = eventlet.spawn(**wsgi_kwargs)
留意 wsgi_kwargs 中的参数 func,它的值为 eventlet.wsgi.server,在eventlet/wsgi.py的界说如下:
def server(sock, site,
…
try:
serv.log.info("(%s) wsgi starting up on %s" % (
serv.pid, socket_repr(sock)))
while is_accepting:
try:
client_socket = sock.accept()
client_socket[0].settimeout(serv.socket_timeout)
serv.log.debug("(%s) accepted %r" % (
serv.pid, client_socket[1]))
try:
pool.spawn_n(serv.process_request, client_socket)
except AttributeError:
warnings.warn("wsgi's pool should be an instance of "
"eventlet.greenpool.GreenPool, is %s. Please convert your"
" call site to use GreenPool instead" % type(pool),
DeprecationWarning, stacklevel=2)
pool.execute_async(serv.process_request, client_socket)
except ACCEPT_EXCEPTIONS as e:
if support.get_errno(e) not in ACCEPT_ERRNO:
raise
except (KeyboardInterrupt, SystemExit):
serv.log.info("wsgi exiting")
break
finally:
pool.waitall()
…
看,是不是看到了解的一幕了!sock.accept() 监听恳求,每逢接纳到一个新恳求,调用 pool.spawn_n() 发动一个协程处理该恳求
四、Service的发动
Service 类型的进程相同由 nova/cmd/* 目录下某些文件创立:
- nova-schedule: nova/cmd/schedule.py
- ……
作为音讯中间件的消费者,它们监听各自的 queue,每逢有 rpc 恳求来临时,它们创立一个新的协程处理 rpc 恳求。以nova-schedule为例,发动时初始化一个 Server(由service.py界说) 目标。
整个Launcher过程跟WSGIServer相同,仅仅service的start()有些区别罢了
def start(self):
…
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
endpoints.extend(self.manager.additional_endpoints)
serializer = objects_base.KarborObjectSerializer()
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
通过层层调用,终究生成了这样一个RPCServer目标
class RPCServer(msg_server.MessageHandlingServer):
def __init__(self, transport, target, dispatcher, executor='blocking'):
super(RPCServer, self).__init__(transport, dispatcher, executor)
self._target = target
该类承继自MessageHandlingServer;
注:nova 的各个组件都依靠 oslo.messaging 访问音讯服务器,通过oslo/messaging/server.py初始化一个 MessageHandlingServer 的目标,监听音讯行列。
终究调用了该service的start办法
def start(self, override_pool_size=None):
…
if self._started:
LOG.warning(_LW('Restarting a MessageHandlingServer is inherently '
'racy. It is deprecated, and will become a noop '
'in a future release of oslo.messaging. If you '
'need to restart MessageHandlingServer you should '
'instantiate a new object.'))
self._started = True
executor_opts = {}
if self.executor_type in ("threading", "eventlet"):
executor_opts["max_workers"] = (
override_pool_size or self.conf.executor_thread_pool_size
)
self._work_executor = self._executor_cls(**executor_opts)
try:
self.listener = self._create_listener()
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
# HACK(sileht): We temporary pass the executor to the rabbit
# listener to fix a race with the deprecated blocking executor.
# We do this hack because this is need only for 'synchronous'
# executor like blocking. And this one is deprecated. Making
# driver working in an sync and an async way is complicated
# and blocking have 0% tests coverage.
if hasattr(self.listener, '_poll_style_listener'):
l = self.listener._poll_style_listener
if hasattr(l, "_message_operations_handler"):
l._message_operations_handler._executor = (
self.executor_type)
self.listener.start(self._on_incoming)
上述的目标又初始化一个 EventletExecutor(由oslo/messaging/_executors/impl_eventlet.py) 类型的 excuete 目标,它调用self.listener.poll()监听 rpc 恳求,每逢接纳到一个恳求,创立一个协程处理该恳求。
class EventletExecutor(base.ExecutorBase):
......
def start(self):
if self._thread is not None:
return
@excutils.forever_retry_uncaught_exceptions
def _executor_thread():
try:
while True:
incoming = self.listener.poll()
spawn_with(ctxt=self.dispatcher(incoming),
pool=self._greenpool)
except greenlet.GreenletExit:
return
self._thread = eventlet.spawn(_executor_thread)
博客:tunsuy.github.io/
github:github.com/tunsuy