出产者顾客模型(CP模型)是一种非常经典的设计,在实践开发中被广泛运用,因为它在线程场景中非常高效

1、出产者顾客模型

1.1 什么是出产者顾客模型

出产者顾客模型是经过一个买卖场所来处理出产者与顾客的强耦合联系,出产者与顾客之间不直接进行通讯,而是运用 买卖场所来进行通讯

现实中的超市作业模式便是一个生动形象的出产者顾客模型

Linux运用编程根底07-出产者顾客模型(多线程)

  • 超市从工厂进货,工厂需求向超市供给产品
  • 顾客在超市选购,超市需求向顾客供给产品

得益于超市(买卖场所),顾客(顾客)不需求跑到工厂购买产品,工厂(出产者)也不需求将产品配送到顾客手中,这便是处理出产者与顾客间的强耦合联系

超市是买卖场所,通常是一种特定的缓冲区,常见的有堵塞行列环形行列

买卖场所会被多个出产者顾客(多个线程) 看到,是一个同享资源;在多线程环境中,需求保证同享资源被多线程并发拜访时的安全

1.2 出产者顾客模型的特点

出产者顾客模型是一个存在出产者、顾客、买卖场所三个条件,以及不同人物间的同步、互斥联系的高效模型

出产者与出产者:互斥

比方多个工厂供应同一种产品时,为了抢占更多的商场,总会经过一些促销手段来扫除竞品,但商场(超市中的货架位置)是有限的

顾客与顾客:互斥

当超市只要一个产品时,顾客之间会竞争

出产者与顾客:互斥、同步

出产者不断出产,买卖场所堆满产品后,需求告诉顾客进行消费,顾客不断消费,买卖场所为空时,需求告诉出产者进行出产

[管道](Linux运用编程根底05-进程通信 – (juejin.cn))实质上便是一个天然的出产者顾客模型,因为它答应多个进程一起拜访,而且不会出现问题,意味着它维护好了互斥、同步联系;当写端写满管道时,无法再写,告诉读端进行读取;当管道为空时,无法读取,告诉写端写入数据

1.3 出产者顾客模型的长处

  • 出产者、顾客 能够在同一个买卖场所中进行操作
  • 出产者在出产时,无需重视顾客的状况,只需重视买卖场所中是否有闲暇位置
  • 顾客在消费时,无需重视出产者的状况,只需重视买卖场所中是否有就绪数据
  • 能够依据不同的战略,调整出产者与顾客间的协同联系

出产者、顾客、买卖场所 各司其职,能够依据详细需求自在设计,很好地做到了解耦,便于维护和扩展

2、基于堵塞行列的出产者顾客模型

2.1 堵塞行列

堵塞行列是一种特殊的行列,作为行列宗族的一员,它具有 先进先出 FIFO 的基本特性,与普通行列不同的是: 堵塞行列的巨细是固定的

将其带入出产者顾客模型中,入队便是出产产品,而出队则是消费产品

  • 堵塞行列为满时:无法入队 -> 无法出产(堵塞)
  • 堵塞行列为空时:无法出队 -> 无法消费(堵塞)

至于怎么处理队空/队满的特殊状况,就需求凭借互斥、同步相关常识

2.2 出产者顾客模型

堵塞行列模板类

#pragma once
#include <queue>
#include <mutex>
#include <pthread.h>
namespace MyBlockQueue
{
#define DEF_SIZE 10 // 堵塞行列长度
    template <class T>
    class BlockQueue
    {
    private:
        std::queue<T> _queue; // 行列
        size_t _cap;          // 堵塞行列的容量
        // 无论是「出产者」仍是「顾客」,它们需求看到同一个堵塞行列,因而运用一把互斥锁进行维护
        pthread_mutex_t _mtx; // 互斥锁(存疑)
        //「出产者」关怀是否为满,「顾客」关怀是否为空,两者重视的点不相同,不能只运用一个条件变量,
        pthread_cond_t _pro_cond; // 出产者条件变量
        pthread_cond_t _con_cond; // 出产者条件变量
    public:
        BlockQueue(size_t cap = DEF_SIZE) : _cap(cap){
            // 初始化锁和条件变量
            pthread_mutex_init(&_mtx, nullptr);
            pthread_cond_init(&_pro_cond, nullptr);
            pthread_cond_init(&_con_cond, nullptr);
        }
        ~BlockQueue(){
            // 毁掉锁和条件变量
            pthread_mutex_destroy(&_mtx);
            pthread_cond_destroy(&_pro_cond);
            pthread_cond_destroy(&_con_cond);
        }
        // 出产数据(入队)
        void push(const T &inData){
            // 加锁
            pthread_mutex_lock(&_mtx);
            // 判别条件是否满意
            while (isFull()){
                //堵塞等候条件满意(堵塞的时分需求把锁作为参数进行传递)
                // -堵塞时,需求开释锁,否则其他线程得不到锁,就会导致死锁
                // 过了一段时间,当条件满意时(顾客现已消费数据了),代码从 pthread_cond_wait 函数之后持续运转
                pthread_cond_wait(&_pro_cond, &_mtx);
            }
            _queue.push(inData);
            // 顾客也会有堵塞的状况,当有数据时,唤醒顾客
            pthread_cond_signal(&_con_cond);
            pthread_mutex_unlock(&_mtx);
        }
        // 消费数据(出队)
        void pop(T *outData){
            // 加锁
            pthread_mutex_lock(&_mtx);
            // 判别条件运用while不运用if的理由:
            // 1) pthread_cond_wait 函数可能调用失败(误唤醒、伪唤醒),此刻假如是 if 就会向后持续运转,导致在条件不满意的时分进行了 出产/消费
            // 2) 在多线程场景中,可能会运用 pthread_cond_broadcast 唤醒一切等候线程,假如在只出产了一个数据的状况下,唤醒一切线程,会导致只要一个线程进行了合法操作,其他线程都是非法操作了
            while (isEmpty()){
                pthread_cond_wait(&_con_cond, &_mtx);
            }
            *outData = _queue.front();
            _queue.pop();
            // 能够加战略唤醒,比方消费完后才唤醒出产者
            pthread_cond_signal(&_pro_cond);
            pthread_mutex_unlock(&_mtx);
        }
    private:
        bool isFull(){
            return _queue.size() == _cap;
        }
        bool isEmpty(){
            return _queue.empty();
        }
    };
}

main.cpp

#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include "BlockingQueue.hpp"
// 出产线程
void *Producer(void *args)
{
    MyBlockQueue::BlockQueue<int> *bq = static_cast<MyBlockQueue::BlockQueue<int> *>(args);
    while (true)
    {
        // 1.出产产品(经过某种途径获取数据)
        int num = rand() % 10;
        // 2.将产品推送至堵塞行列中
        bq->push(num);
        std::cout << "Producer 出产了一个数据: " << num << std::endl;
        std::cout << "------------------------" << std::endl;
    }
    pthread_exit((void *)0);
}
// 消费线程
void *Consumer(void *args)
{
    MyBlockQueue::BlockQueue<int> *bq = static_cast<MyBlockQueue::BlockQueue<int> *>(args);
    while (true)
    {
        sleep(1); // 每隔1s消费一个
        // 1.从堵塞行列中获取产品
        int num;
        bq->pop(&num);
        // 2.消费产品(结合某种详细事务进行处理)
        std::cout << "Consumer 消费了一个数据: " << num << std::endl;
        std::cout << "------------------------" << std::endl;
    }
    pthread_exit((void *)0);
}
int main()
{
    // 创立堵塞行列
    MyBlockQueue::BlockQueue<int> *bq = new MyBlockQueue::BlockQueue<int>;
    // 创立两个线程(出产、消费)
    pthread_t pro, con;
    pthread_create(&pro, nullptr, Producer, bq); // 出产者、顾客需求看到同一个堵塞行列
    pthread_create(&con, nullptr, Consumer, bq);
    pthread_join(pro, nullptr);
    pthread_join(con, nullptr);
    delete bq;
}

Linux运用编程根底07-出产者顾客模型(多线程)

3、基于循环行列完成出产者顾客模型

3.1 POSIX 信号量

互斥、同步不只能经过 互斥锁、条件变量 完成,还能经过 信号量 sem、互斥锁 完成

信号量的实质便是一个 计数器,只要在计数器不为 0 的状况下,才能进行资源请求

  • 请求到资源,计数器 –(P 操作)
  • 开释完资源,计数器 (V 操作)

假如信号量只要两种状况:1、0,能够完成类似 互斥锁 的效果,即完成线程互斥(二元信号量)

信号量不止能够用于互斥,它的首要目的是描绘临界资源中的资源数目,比方把堵塞行列切割成 N 份,初始化信号量的值为N,当某一份资源就绪时,sem–,资源被开释后,sem ,这样能够像条件变量相同完成同步(多元信号量)

  • 当 sem == N 时,堵塞行列现已空了,顾客无法消费
  • 当 sem == 0 时,堵塞行列现已满了,出产者无法出产

将信号量实践带入之前的出产者顾客模型中,是不需求进行资源条件判别的,因为信号量本身就现已是资源的计数器

在完成 互斥、同步 时,该怎么选择?

结合事务场景进行分析,假如待操作的同享资源是一个全体,比较合适运用 互斥锁 条件变量 的方案,但假如同享资源是多份资源,运用 信号量 就比较便利

信号量相关操作:

初始化信号量:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
/*
* sem:需求初始化的信号量,sem_t 实践便是一个联合体,里面包含了一个 char 数组,以及一个 long int
* pshared:表明当时信号量的同享状况,传递 0 表明线程间同享,传递 非0 表明进程间同享
* value:信号量的初始值,能够设置为双元或多元信号量
*/

毁掉信号量:

#include <semaphore.h>
int sem_destroy(sem_t *sem);

请求信号量(等候信号量):

#include <semaphore.h>
int sem_wait(sem_t *sem); // 表明从哪个信号量中请求(堵塞)
int sem_trywait(sem_t *sem); // 测验请求,假如没有请求到资源,就会放弃请求(非堵塞)
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);//每隔一段时间进行请求

开释信号量(发布信号量):

#include <semaphore.h>
int sem_post(sem_t *sem);// 将资源开释到哪个信号量中

运用信号量标识资源的运用状况,出产者和顾客重视的资源并不相同,所以需求运用两个信号量来进行操作

  • 出产者信号量:标识当时有多少可用空间
  • 顾客信号量:标识当时有多少数据

3.2 出产者顾客模型

经过两个信号量,当两个信号量都不为 0 时,两边能够并发操作,这是循环行列最大的特点

  • 当出产者信号量为 0 时,出产者陷入堵塞等候,等候顾客消费
  • 当顾客信号量为 0 时,顾客也会堵塞住,在这儿堵塞便是互斥的表现

当对方完成 出产 / 消费 后,自己会解除堵塞状况,而这便是 同步

循环行列模板类:

#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <mutex>
#include <pthread.h>
namespace MyRingQueue
{
#define DEF_SIZE 10 // 循环行列长度
    template <class T>
    class RingQueue
    {
    private:
        std::vector<T> _queue; //循环行列(用数组表明)
        size_t _cap;           // 容量
        sem_t _pro_sem; // 出产者信号量
        sem_t _con_sem; // 顾客信号量
        size_t _pro_step; // 出产者下标
        size_t _con_step; // 顾客下标
        // 这儿需求两把锁:因为当时的出产者和顾客重视的资源不相同,一个重视剩下空间,一个重视是否有产品
        // (堵塞行列只需求一把锁:因为同享资源是一整个行列,出产者和顾客拜访的是同一份资源)
        pthread_mutex_t _pro_mtx;
        pthread_mutex_t _con_mtx;
    public:
        RingQueue(size_t cap = DEF_SIZE) : _cap(cap)
        {
            _queue.resize(_cap);
            // 初始化信号量
            sem_init(&_pro_sem, 0, _cap);
            sem_init(&_con_sem, 0, 0);
            // 初始化互斥锁
            pthread_mutex_init(&_pro_mtx, nullptr);
            pthread_mutex_init(&_con_mtx, nullptr);
        }
        ~RingQueue()
        {
            // 毁掉信号量
            sem_destroy(&_pro_sem);
            sem_destroy(&_con_sem);
            // 毁掉互斥锁
            pthread_mutex_destroy(&_pro_mtx);
            pthread_mutex_destroy(&_con_mtx);
        }
        // 出产数据(入队)
        void push(const T &inData)
        {
            // 请求信号量(空位-1)
            sem_wait(&_pro_sem); // 因为操作信号量是原子操作,能够保证线程安全,也就不需求加锁维护
            // 加锁
            pthread_mutex_lock(&_pro_mtx);
            // 出产(循环行列入队,不需求再单独判别对满,因为信号量现已判别)
            _queue[_pro_step  ] = inData;
            _pro_step %= _cap;
            pthread_mutex_unlock(&_pro_mtx);
            // 开释信号量(可消费量 1)
            sem_post(&_con_sem);
        }
        // 消费数据(出队)
        void pop(T *outData)
        {
            // 请求信号量
            sem_wait(&_con_sem);
            pthread_mutex_lock(&_con_mtx);
            // 消费
            *outData = _queue[_con_step  ];
            _con_step %= _cap;
            pthread_mutex_unlock(&_con_mtx);
            // 开释信号量
            sem_post(&_pro_sem);
        }
    };
}

多线程:

#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include "RingQueue.hpp"
// 出产线程
void *Producer(void *args)
{
    MyRingQueue::RingQueue<int> *bq = static_cast<MyRingQueue::RingQueue<int> *>(args);
    while (true)
    {
        sleep(1); // 每隔1s消费一个
        // 1.出产产品(经过某种途径获取数据)
        int num = rand() % 10;
        // 2.将产品推送至堵塞行列中
        bq->push(num);
        std::cout << "Producer 出产了一个数据: " << num << std::endl;
        std::cout << "------------------------" << std::endl;
    }
    pthread_exit((void *)0);
}
// 消费线程
void *Consumer(void *args)
{
    MyRingQueue::RingQueue<int> *bq = static_cast<MyRingQueue::RingQueue<int> *>(args);
    while (true)
    {
        sleep(1); // 每隔1s消费一个
        // 1.从堵塞行列中获取产品
        int num;
        bq->pop(&num);
        // 2.消费产品(结合某种详细事务进行处理)
        std::cout << "Consumer 消费了一个数据: " << num << std::endl;
        std::cout << "------------------------" << std::endl;
    }
    pthread_exit((void *)0);
}
int main()
{
    // 种子
    srand((size_t)time(nullptr));
    // 创立循环行列
    MyRingQueue::RingQueue<int> *rq = new MyRingQueue::RingQueue<int>;
    // 创立多个线程(出产者、顾客)
    pthread_t pro[10], con[20];
    for (int i = 0; i < 10; i  )
        pthread_create(pro   i, nullptr, Producer, rq);
    for (int i = 0; i < 20; i  )
        pthread_create(con   i, nullptr, Consumer, rq);
    for (int i = 0; i < 10; i  )
        pthread_join(pro[i], nullptr);
    for (int i = 0; i < 20; i  )
        pthread_join(con[i], nullptr);
    delete rq;
}

4、比较堵塞行列和循环行列

首先要理解出产者顾客模型高效的当地历来都不是往缓冲区中放数据、从缓冲区中拿数据

需求重视的点在于出产数据和消费数据,这是比较消耗时间的,堵塞行列至多支撑获取一次数据获取或一次数据消费,在代码中的详细表现便是一切线程都在运用一把锁,而且每次只能 push、pop 一个数据;

而循环行列就不相同了,出产者、顾客 能够经过信号量知晓数据获取、数据消费次数,而且因为数据获取、消费操作没有加锁,支撑并发,因而效率非常高

循环行列必定优于堵塞行列吗?

Linux运用编程根底07-出产者顾客模型(多线程)