根据C++规范库完成定时器Timer类
定时器类是多线程编程中经常规划到的工具类
简略的定时器原理其实很简略(是不是有点GNU is not unix的味道;):
- 创立一个新线程
- 在那个线程里等候
- 等候指定时长后做任务
python规范库中就有这么一个定时器类:threading.Timer
此类表明一个操作应该在等候一定的时刻之后运转 — 相当于一个定时器。
Timer 类是 Thread 类的子类,因而能够像一个自定义线程相同工作。
与线程相同,经过调用 start() 办法发动定时器。 而 cancel() 办法能够中止计时器(在计时完毕前)。
例如:
def hello():
print("hello, world")
t = Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed
class threading.Timer(interval, function, args=None, kwargs=None)
创立一个定时器,在经过 interval 秒的距离事件后,将会用参数 args 和关键字参数 kwargs 调用 function。
假如 args 为 None (默认值),则会运用一个空列表。假如 kwargs 为 None (默认值),则会运用一个空字典。
cancel() 中止定时器并取消履行计时器即将履行的操作。仅当计时器仍处于等候状况时有用。
接下来我将给出两种C++的Timer完成,接口类似于python的threading.Timer,不过精度为秒级的,其原因有二:
- 完成代码参阅了Posix多线程编程里的alarm实例程序,为了便利咱们对比C语言版别的完成,这儿也以秒为单位
- 避免一上来过多的触及C++规范库中
<chrono>
的用法,使代码逻辑更清晰的集中在定时器相关的部分
当然,作为一个在产品级代码中可用的Timer,精度至少应该在毫秒级才行,所以文章最终也会给出精度在微秒的代码完成的链接。
首要,给出C++版别的Timer的接口定义:
几乎完全模仿python的threading.Timer,
class Timer {
public:
typedef std::function<void ()> Callback;
Timer(int interval, Callback function);
void Start();
void Cancel();
};
- Callback:类型为std::function<void ()>,即回来类型为void的“函数”,当然在C++里能够是普通函数,函数对象,lambda等。
- Timer(interval, function):结构函数,创立一个Timer,interval秒后到期(相对于调用Start函数时的时刻点),回调函数为function。
- Start():发动定时器
- Cancel():中止定时器并取消履行计时器即将履行的操作。
在给出C++的完成前,咱们先给出测验驱动程序。测验驱动程序来源于《Posix多线程程序规划》(英文原版书名为Programming with POSIX Threads)里的闹钟实例程序。
而我接下来的介绍次序源于我的编码完成次序,如下:
- 先给出书中根据pthread的多线程版别的实例代码,C代码
- 将C版别的代码转化成等价的python代码,根据threading.Timer接口完成的版别
- 将python版别的代码,转化成C++版别,并根据C++的Timer类接口,得到C++的Timer类的测验驱动代码
- 完成C++版的Timer类,而且编译测验驱动代码,运转验证
那么,我先给出根据pthread的多线程版别的实例代码,C代码:
/*
* alarm_fork.c
*
* This version of alarm.c uses pthread_create to create a
* separate thread to wait for each alarm to expire.
*/
#include <pthread.h>
#include "errors.h"
typedef struct alarm_tag {
int seconds;
char message[64];
} alarm_t;
void *alarm_thread (void *arg)
{
alarm_t *alarm = (alarm_t*)arg;
int status;
status = pthread_detach (pthread_self ());
if (status != 0)
err_abort (status, "Detach thread");
sleep (alarm->seconds);
printf ("(%d) %s\n", alarm->seconds, alarm->message);
free (alarm);
return NULL;
}
int main (int argc, char *argv[])
{
int status;
char line[128];
alarm_t *alarm;
pthread_t thread;
while (1) {
printf ("Alarm> ");
if (fgets (line, sizeof (line), stdin) == NULL) exit (0);
if (strlen (line) <= 1) continue;
alarm = (alarm_t*)malloc (sizeof (alarm_t));
if (alarm == NULL)
errno_abort ("Allocate alarm");
/*
* Parse input line into seconds (%d) and a message
* (%64[^\n]), consisting of up to 64 characters
* separated from the seconds by whitespace.
*/
if (sscanf (line, "%d %64[^\n]",
&alarm->seconds, alarm->message) < 2) {
fprintf (stderr, "Bad command\n");
free (alarm);
} else {
status = pthread_create (
&thread, NULL, alarm_thread, alarm);
if (status != 0)
err_abort (status, "Create alarm thread");
}
}
}
代码的完好阐明参见《Posix多线程程序规划》 1.5.3章节,这儿就不再搬运原文了。
接下来是移殖成python版别的代码:
#!/usr/bin/env python3
from threading import Timer
class Alarm:
def __init__(self, seconds:int, message:str):
self.seconds = seconds
self.message = message
def callback(alarm:Alarm):
print("({}) {}\n".format(alarm.seconds, alarm.message))
if __name__ == "__main__":
while True:
line = input("Alarm> ")
if len(line) <= 1:
continue
try:
seconds, *message = line.split(' ')
alarm = Alarm(int(seconds), ' '.join(message))
t = Timer(interval=int(seconds), function=callback, args=(alarm, ))
t.start()
except:
print("Bad command")
python版别的代码咱们有兴趣的能够在本地运转一下,看看效果;)
再然后,我把这段代码翻译成C++版别的,根据C++的Timer类接口:
#include "timer.hpp"
#include <cstdlib>
#include <string>
#include <memory>
#include <iostream>
struct Alarm {
Alarm(int seconds_, const std::string& message_):
seconds(seconds_), message(message_) {
}
int seconds;
std::string message;
};
void callback(std::shared_ptr<Alarm> alarm) {
std::cout << "(" << alarm->seconds << ") " << alarm->message << std::endl;
}
std::tuple<int, std::string> parse_command(const std::string& line) {
auto pos = line.find(' ');
if (pos == std::string::npos)
throw std::runtime_error("invalid line: separator not found");
int seconds = std::stoi(line.substr(0, pos));
std::string message = line.substr(pos+1);
return std::make_tuple(seconds, message);
}
int main()
{
std::string line;
int seconds;
std::string message;
while (true) {
std::cout << "Alarm> ";
if (!std::getline(std::cin, line)) exit(0);
if (line.length() <= 1) continue;
try {
std::tie(seconds, message) = parse_command(line);
auto alarm = std::make_shared<Alarm>(seconds, message);
Timer t(seconds, std::bind(callback, alarm));
t.Start();
}
catch (const std::exception& e) {
std::cout << "Bad command" << std::endl;
}
}
}
这样咱们就有了C++版别Timer类的测验驱动程序,而且能够跟C和python版别的代码对比运转。
接下来给出C++版别的Timer完成:
#pragma once
#include <thread>
#include <chrono>
#include <atomic>
#include <functional>
class Timer {
public:
typedef std::function<void ()> Callback;
Timer(int interval, Callback function) {
pimpl = std::make_shared<Impl>(interval, function);
}
void Start() {
std::thread t([pimpl=pimpl]() {
if(!pimpl->active.load()) return;
std::this_thread::sleep_for(std::chrono::seconds(pimpl->interval));
if(!pimpl->active.load()) return;
pimpl->function();
});
t.detach();
}
void Cancel() {
pimpl->active.store(false);
}
private:
struct Impl {
Impl(int interval_, Callback function_): interval(interval_), function(function_) {}
int interval;
Callback function;
std::atomic<bool> active{true};
};
private:
std::shared_ptr<Impl> pimpl;
};
C++完成部分,基本上是C版别的代码抽离和封装,并把相关函数替换成C++规范库的完成罢了。不过Timer类麻雀虽小,但五脏俱全,其中用到的C++规范库组件有:
- std::function:用于笼统到期回调函数
- std::shared_ptr:用于办理Timer::Impl的生命周期
- std::atomic:用于Cancel Timer的flag,确保线程安全
- std::thread:用于Timer线程,sleep指定时刻,然后调用回调函数
- std::chrono:C++规范库中时刻相关的完成都在其中
- C++ lambda:Timer线程的target函数,捕获了this->pimpl,确保了Timerl::Impl对象不会因为Timer对象的析构而析构
这儿还用的了Pimpl惯用法,虽然目前是把接口和完成都放在了头文件里,但规范的做法是Timer的成员函数完成和Timer::Impl完成都放到源文件中,
这样头文件里能够去掉除了std::shared_ptr和std::function以外的依靠。
这个Timer类的完成优缺陷是非常明显的:长处是代码简练,一望而知,缺陷是太过简练,每个Timer需求一个线程去运转,系统资源耗费大。
于是就引出了根据条件变量版别的Timer,完成“参阅”了《Posix多线程程序规划》3.3.4章节说到闹钟实例的最终版别(与其说“参阅”,改成“直译”也不为过;)。
照例,我先给出根据pthread的条件变量版别的实例代码,C代码:
/*
* alarm_cond.c
*
* This is an enhancement to the alarm_mutex.c program, which
* used only a mutex to synchronize access to the shared alarm
* list. This version adds a condition variable. The alarm
* thread waits on this condition variable, with a timeout that
* corresponds to the earliest timer request. If the main thread
* enters an earlier timeout, it signals the condition variable
* so that the alarm thread will wake up and process the earlier
* timeout first, requeueing the later request.
*/
#include <pthread.h>
#include <time.h>
#include "errors.h"
/*
* The "alarm" structure now contains the time_t (time since the
* Epoch, in seconds) for each alarm, so that they can be
* sorted. Storing the requested number of seconds would not be
* enough, since the "alarm thread" cannot tell how long it has
* been on the list.
*/
typedef struct alarm_tag {
struct alarm_tag *link;
int seconds;
time_t time; /* seconds from EPOCH */
char message[64];
} alarm_t;
pthread_mutex_t alarm_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t alarm_cond = PTHREAD_COND_INITIALIZER;
alarm_t *alarm_list = NULL;
time_t current_alarm = 0;
/*
* Insert alarm entry on list, in order.
*/
void alarm_insert (alarm_t *alarm)
{
int status;
alarm_t **last, *next;
/*
* LOCKING PROTOCOL:
*
* This routine requires that the caller have locked the
* alarm_mutex!
*/
last = &alarm_list;
next = *last;
while (next != NULL) {
if (next->time >= alarm->time) {
alarm->link = next;
*last = alarm;
break;
}
last = &next->link;
next = next->link;
}
/*
* If we reached the end of the list, insert the new alarm
* there. ("next" is NULL, and "last" points to the link
* field of the last item, or to the list header.)
*/
if (next == NULL) {
*last = alarm;
alarm->link = NULL;
}
#ifdef DEBUG
printf ("[list: ");
for (next = alarm_list; next != NULL; next = next->link)
printf ("%d(%d)[\"%s\"] ", next->time,
next->time - time (NULL), next->message);
printf ("]\n");
#endif
/*
* Wake the alarm thread if it is not busy (that is, if
* current_alarm is 0, signifying that it's waiting for
* work), or if the new alarm comes before the one on
* which the alarm thread is waiting.
*/
if (current_alarm == 0 || alarm->time < current_alarm) {
current_alarm = alarm->time;
status = pthread_cond_signal (&alarm_cond);
if (status != 0)
err_abort (status, "Signal cond");
}
}
/*
* The alarm thread's start routine.
*/
void *alarm_thread (void *arg)
{
alarm_t *alarm;
struct timespec cond_time;
time_t now;
int status, expired;
/*
* Loop forever, processing commands. The alarm thread will
* be disintegrated when the process exits. Lock the mutex
* at the start -- it will be unlocked during condition
* waits, so the main thread can insert alarms.
*/
status = pthread_mutex_lock (&alarm_mutex);
if (status != 0)
err_abort (status, "Lock mutex");
while (1) {
/*
* If the alarm list is empty, wait until an alarm is
* added. Setting current_alarm to 0 informs the insert
* routine that the thread is not busy.
*/
current_alarm = 0;
while (alarm_list == NULL) {
status = pthread_cond_wait (&alarm_cond, &alarm_mutex);
if (status != 0)
err_abort (status, "Wait on cond");
}
alarm = alarm_list;
alarm_list = alarm->link;
now = time (NULL);
expired = 0;
if (alarm->time > now) {
#ifdef DEBUG
printf ("[waiting: %d(%d)\"%s\"]\n", alarm->time,
alarm->time - time (NULL), alarm->message);
#endif
cond_time.tv_sec = alarm->time;
cond_time.tv_nsec = 0;
current_alarm = alarm->time;
while (current_alarm == alarm->time) {
status = pthread_cond_timedwait (
&alarm_cond, &alarm_mutex, &cond_time);
if (status == ETIMEDOUT) {
expired = 1;
break;
}
if (status != 0)
err_abort (status, "Cond timedwait");
}
if (!expired)
alarm_insert (alarm);
} else
expired = 1;
if (expired) {
printf ("(%d) %s\n", alarm->seconds, alarm->message);
free (alarm);
}
}
}
int main (int argc, char *argv[])
{
int status;
char line[128];
alarm_t *alarm;
pthread_t thread;
status = pthread_create (
&thread, NULL, alarm_thread, NULL);
if (status != 0)
err_abort (status, "Create alarm thread");
while (1) {
printf ("Alarm> ");
if (fgets (line, sizeof (line), stdin) == NULL) exit (0);
if (strlen (line) <= 1) continue;
alarm = (alarm_t*)malloc (sizeof (alarm_t));
if (alarm == NULL)
errno_abort ("Allocate alarm");
/*
* Parse input line into seconds (%d) and a message
* (%64[^\n]), consisting of up to 64 characters
* separated from the seconds by whitespace.
*/
if (sscanf (line, "%d %64[^\n]",
&alarm->seconds, alarm->message) < 2) {
fprintf (stderr, "Bad command\n");
free (alarm);
} else {
status = pthread_mutex_lock (&alarm_mutex);
if (status != 0)
err_abort (status, "Lock mutex");
alarm->time = time (NULL) + alarm->seconds;
/*
* Insert the new alarm into the list of alarms,
* sorted by expiration time.
*/
alarm_insert (alarm);
status = pthread_mutex_unlock (&alarm_mutex);
if (status != 0)
err_abort (status, "Unlock mutex");
}
}
}
代码有些长,代码阐明见《Posix多线程程序规划》3.3.4章节,我这儿啰嗦几句,简略总结一下,根据条件变量完成定时器的原理:
- 程序维护了一个有序的定时器列表,次序按照到期时刻从小到大摆放
- 独立的alarm_thread线程函数,完成定时器到期回调的逻辑,经过条件变量的pthread_cond_wait函数,处理定时器列表为空的状况,以及等候队列中最早的定时器到期,以及有更早的定时器刺进队列的状况
- 函数alarm_insert函数,完成将定时器刺进到定时器列表的逻辑,并在定时器列表原先为空的状况下,或者在刺进的定时器到期时刻早于定时刻列表中最早到期时,通知alarm_thread线程,唤醒pthread_cond_wait函数。
因为根据pthread的条件变量版别的实例代码中,增加了DEBUG宏和调试代码,所以我的C++版别的测验驱动程序也需求做相应的调整:
#include "timer.hpp"
#include <cstdlib>
#include <string>
#include <memory>
#include <iostream>
struct Alarm {
Alarm(int seconds_, const std::string& message_):
seconds(seconds_), message(message_) {
}
int seconds;
std::string message;
};
void callback(std::shared_ptr<Alarm> alarm) {
std::cout << "(" << alarm->seconds << ") " << alarm->message << std::endl;
}
std::tuple<int, std::string> parse_command(const std::string& line) {
auto pos = line.find(' ');
if (pos == std::string::npos)
throw std::runtime_error("invalid line: separator not found");
int seconds = std::stoi(line.substr(0, pos));
std::string message = line.substr(pos+1);
return std::make_tuple(seconds, message);
}
int main()
{
std::string line;
int seconds;
std::string message;
while (true) {
std::cout << "Alarm> ";
if (!std::getline(std::cin, line)) quick_exit(0);
if (line.length() <= 1) continue;
try {
std::tie(seconds, message) = parse_command(line);
auto alarm = std::make_shared<Alarm>(seconds, message);
Timer t(seconds, std::bind(callback, alarm));
#ifdef DEBUG
t.SetMessage(message);
#endif
t.Start();
}
catch (const std::exception& e) {
std::cout << "Bad command" << std::endl;
}
}
}
其实修改就只有两处:
接下来给出C++的完成,假如C版别的代码看得懂,那C++版别的代码就能够说是一望而知,毕竟C++的版别是“直译”过来的。
#pragma once
#include <functional>
#include <memory>
class Timer {
public:
typedef std::function<void ()> Callback;
Timer(int interval, Callback function);
void Start();
void Cancel();
#ifdef DEBUG
void SetMessage(const std::string& message);
#endif
public:
struct Impl;
private:
std::shared_ptr<Impl> pimpl;
};
首要来看Timer的头文件,这儿就看出Pimpl惯用法的优势了,头文件里完全剥离了对<chrono>、<thread>、<atomic>
的依靠。
别的,增加了SetMessage接口,用于完成C版别中DEBUG宏中的调试信息打印
下面是Timer的源文件,代码和C版别的相同,有点儿长:
#include "timer.hpp"
#include <list>
#include <memory>
#include <chrono>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <thread>
#ifdef DEBUG
#include <iostream>
#endif
using Clock = std::chrono::system_clock;
using TimePoint = Clock::time_point;
using Seconds = std::chrono::seconds;
using AlarmPtr = std::shared_ptr<Timer::Impl>;
#ifdef DEBUG
namespace {
std::ostream& operator<<(std::ostream& out, const TimePoint& tp) {
using namespace std::chrono;
auto d = duration_cast<seconds>(tp.time_since_epoch());
out << d.count();
return out;
}
template <typename Rep, typename Preiod>
std::ostream& operator<<(std::ostream& out, const std::chrono::duration<Rep, Preiod>& d) {
using namespace std::chrono;
auto s = duration_cast<seconds>(d);
out << s.count();
return out;
}
} // namespace
#endif
class Timer::Impl {
public:
Impl(int interval_, Callback function_): interval(interval_), function(function_) {
}
void CalculateTime() {
time = Clock::now() + Seconds(interval);
}
int interval;
Timer::Callback function;
TimePoint time;
std::atomic<bool> active{true};
#ifdef DEBUG
std::string message;
#endif
};
class AlarmLooper {
public:
AlarmLooper() = default;
AlarmLooper(const AlarmLooper&) = delete;
void operator=(const AlarmLooper&) = delete;
void ThreadSafetyInsert(AlarmPtr alarm) {
std::unique_lock<std::mutex> lock(alarm_mutex);
Insert(alarm);
}
void Insert(AlarmPtr alarm);
void Run();
private:
std::list<AlarmPtr> alarm_list;
TimePoint current_alarm;
std::mutex alarm_mutex;
std::condition_variable alarm_cond;
};
/*
* Insert alarm entry on list, in order.
*/
void AlarmLooper::Insert(AlarmPtr alarm) {
auto first = alarm_list.begin();
auto last = alarm_list.end();
/*
* LOCKING PROTOCOL:
*
* This routine requires that the caller have locked the
* alarm_mutex!
*/
for ( ; first != last; ++first) {
if ((*first)->time >= alarm->time) {
alarm_list.insert(first, alarm);
break;
}
}
/*
* If we reached the end of the list, insert the new alarm
* there. ("next" is NULL, and "last" points to the link
* field of the last item, or to the list header.)
*/
if (first == last) {
alarm_list.push_back(alarm);
}
#ifdef DEBUG
std::cout << "[list:";
for (auto item : alarm_list) {
std::cout << item->time << "(" << (item->time - Clock::now()) << ")[\""
<< item->message << "\"] ";
}
std::cout << "]\n" << std::flush;
#endif
/*
* Wake the alarm thread if it is not busy (that is, if
* current_alarm is 0, signifying that it's waiting for
* work), or if the new alarm comes before the one on
* which the alarm thread is waiting.
*/
if (current_alarm == TimePoint{} || alarm->time < current_alarm) {
current_alarm = alarm->time;
alarm_cond.notify_one();
}
}
/*
* The alarm thread's start routine.
*/
void AlarmLooper::Run() {
AlarmPtr alarm;
TimePoint now;
bool expired;
std::cv_status status;
/*
* Loop forever, processing commands. The alarm thread will
* be disintegrated when the process exits. Lock the mutex
* at the start -- it will be unlocked during condition
* waits, so the main thread can insert alarms.
*/
std::unique_lock<std::mutex> lock(alarm_mutex);
while (true) {
/*
* If the alarm list is empty, wait until an alarm is
* added. Setting current_alarm to 0 informs the insert
* routine that the thread is not busy.
*/
current_alarm = TimePoint{};
while (alarm_list.empty()) {
alarm_cond.wait(lock);
}
alarm = alarm_list.front();
alarm_list.pop_front();
now = Clock::now();
expired = false;
if (alarm->time > now) {
#ifdef DEBUG
std::cout << "[waiting: " << alarm->time << "(" << (alarm->time - Clock::now()) << ")\""
<< alarm->message << "\"\n" << std::flush;
#endif
current_alarm = alarm->time;
while (current_alarm == alarm->time) {
status = alarm_cond.wait_until(lock, alarm->time);
if (status == std::cv_status::timeout) {
expired = true;
break;
}
}
if (!expired) {
Insert(alarm);
}
} else {
expired = true;
}
if (expired) {
if (alarm->active) {
alarm->function();
}
}
}
}
class TimerThread {
public:
TimerThread();
void AddTimer(std::shared_ptr<Timer::Impl> timer);
bool CurrentThreadIsAlarmLooperThread() {
return std::this_thread::get_id() == looper_thread.get_id();
}
static TimerThread& GetInstance() {
static TimerThread timer_thread;
return timer_thread;
}
private:
AlarmLooper alarm_looper;
std::thread looper_thread;
};
TimerThread::TimerThread() {
looper_thread = std::thread(&AlarmLooper::Run, &alarm_looper);
looper_thread.detach();
}
void TimerThread::AddTimer(std::shared_ptr<Timer::Impl> timer) {
if (CurrentThreadIsAlarmLooperThread()) {
alarm_looper.Insert(timer);
} else {
alarm_looper.ThreadSafetyInsert(timer);
}
}
Timer::Timer(int interval, Callback function) {
pimpl = std::make_shared<Impl>(interval, function);
}
void Timer::Start() {
pimpl->CalculateTime();
TimerThread::GetInstance().AddTimer(pimpl);
}
void Timer::Cancel() {
pimpl->active = false;
}
#ifdef DEBUG
void Timer::SetMessage(const std::string& message) {
pimpl->message = message;
}
#endif
完成原理和C版别的共同(要不说是“直译”呢;),具体的映射联系如下:
- AlarmLooper类:封装了C版别的函数alarm_thread、alarm_insert逻辑,AlarmLooper::Run对应alarm_thread,AlarmLooper::Insert对应alarm_insert
- TimerThread类:作为单例类,办理运转AlarmLooper::Run的线程
- 然后就是std::list代替了C手写的链表
- std::mutex和std::conditon_variable代替了pthread_mutex_t和pthread_cond_t结构体和函数。
最终,我来“吹一吹”完成C++版别Timer类的价值,以及能够改善优化的方向。
价值方面:
- C版别的代码很优秀,用C++的面向对象方法完成,能够在C++工程中以组件的方法复用(主要是指条件变量版别,多线程版别过于粗陋,不过优点是head only)
- C++版别依靠于C++规范库,而C++规范库是跨渠道的,所以Timer类也是跨渠道的(当然,假如把C版别的pthread函数替换成C11规范库的线程函数也能到达同样目的,但据我所知各编译器厂商对C11的多线程支撑的不是很活跃)
改善方面:
- 到期时刻的精度是秒级的,后面会给出微秒级精度的完成链接(和目前的完成原理共同)
- std::list能够替换成std::multi_set以提高功能,根据是用红黑树的功能要好于链表的刺进排序。
- TimerThread类办理的thread是detach的,造成在linux上,进程退出时,有可能在std::condition_variable的析构函数上把进程hang住,这个也在微秒级的完成版别中做了改善。
最终的最终,给出文章中说到的所有代码的完好完成链接:
- 秒级的完成,包含C版别的原始代码,python版别的实例代码,C++的两个版别的代码,链接如下:github.com/hexu1985/Li…
- 微秒级的完成,包含python版别的实例代码,C++的两个版别的代码,链接如下:github.com/hexu1985/Co…
参阅文档:
- 《Posix多线程程序规划(Programming with POSIX Threads)》
- 3.11.0 Documentation The Python Standard Library Concurrent Execution threading
- A Simple Timer in C++