Chaichai
生产者与消费者模型

生产者与消费者模型

多线程编程的一个经典模型

一、生产者与消费者模型

生产者与消费者模型(Producer - Consumer Model)是一种经典的多线程同步问题模型,用于描述生产者和消费者之间对共享资源(如缓冲区)的交互过程。它在计算机科学中被广泛应用于解决多线程编程中的线程同步、资源竞争和数据共享等问题。

(一)模型的基本组成

  1. 生产者(Producer)
    • 生产者的作用是生成数据并将其放入共享缓冲区。生产者不断地生产数据,例如在实际应用场景中,它可以是一个数据采集线程,从传感器采集数据后放入缓冲区。
  2. 消费者(Consumer)
    • 消费者从共享缓冲区中取出数据并进行处理。消费者不断地消费数据,比如一个数据分析线程从缓冲区取出数据进行分析处理。
  3. 共享缓冲区(Shared Buffer)
    • 这是生产者和消费者之间共享的资源。它可以是一个队列、数组或其他数据结构。缓冲区的大小是有限的,它存储着生产者生产的、尚未被消费者消费的数据。

(二)模型的核心问题

  1. 同步问题
    • 生产者和消费者需要协调工作。如果缓冲区满了,生产者不能继续生产,否则会导致数据丢失或覆盖;如果缓冲区空了,消费者不能继续消费,否则会因为没有数据可处理而出现错误。
  2. 互斥问题
    • 当生产者向缓冲区添加数据或消费者从缓冲区取出数据时,需要保证对缓冲区的访问是互斥的。否则,可能会出现数据错乱的情况,比如一个生产者正在向缓冲区写入数据时,另一个生产者或消费者同时对缓冲区进行操作。

二、生产者与消费者模型的实现方式

(一)基于信号量的实现

信号量是一种用于线程同步的机制,它是一个计数器,用于记录资源的可用数量。信号量可以用来解决生产者和消费者模型中的同步和互斥问题。

  1. 互斥信号量(mutex)
    • 用于保证对共享缓冲区的互斥访问。当一个线程(生产者或消费者)获得mutex信号量后,其他线程必须等待,直到该线程释放mutex信号量。
  2. 空闲缓冲区信号量(empty)
    • 初始值为缓冲区的大小。当生产者向缓冲区添加数据时,empty信号量的值减1;当消费者从缓冲区取出数据时,empty信号量的值加1。生产者在添加数据前会检查empty信号量,如果empty信号量的值为0,说明缓冲区满了,生产者需要等待。
  3. 满缓冲区信号量(full)
    • 初始值为0。当消费者从缓冲区取出数据时,full信号量的值减1;当生产者向缓冲区添加数据时,full信号量的值加1。消费者在取出数据前会检查full信号量,如果full信号量的值为0,说明缓冲区空了,消费者需要等待。

(二)基于锁和条件变量的实现

在现代操作系统和编程语言中,锁和条件变量也是实现生产者与消费者模型的常用方法。

  1. 锁(Lock)
    • 用于保证对缓冲区的互斥访问。在对缓冲区进行操作时,线程需要先获取锁,操作完成后释放锁。
  2. 条件变量(Condition Variable)
    • 条件变量用于线程间的协调,允许一个或多个线程等待某个条件的发生。它通常与互斥量一起使用,以实现线程间的同步。std::condition_variable用于实现线程间的等待和通知机制。例如,当生产者向缓冲区添加数据后,它可以通过条件变量通知消费者缓冲区中有数据可消费;当消费者从缓冲区取出数据后,它可以通过条件变量通知生产者缓冲区有空闲空间可使用。

三、生产者与消费者模型的应用场景

(一)数据处理领域

在数据采集和处理的场景中,生产者可以是数据采集线程,它从外部设备(如传感器、网络接口等)采集数据并放入缓冲区;消费者可以是数据分析线程,它从缓冲区取出数据进行分析处理。这种模型可以有效地解耦数据采集和数据处理的流程,提高系统的效率和可扩展性。

(二)消息队列系统

在消息队列系统中,生产者向消息队列发送消息,消费者从消息队列中接收消息。消息队列作为共享缓冲区,存储着生产者发送的、尚未被消费者接收的消息。生产者与消费者模型可以很好地解决消息队列系统中的消息同步和并发问题,确保消息的可靠传递和处理。

(三)多线程任务调度

在多线程任务调度场景中,生产者可以是任务生成线程,它根据系统的需要生成任务并放入任务队列;消费者可以是任务执行线程,它从任务队列中取出任务并执行。通过生产者与消费者模型,可以实现任务的动态生成和高效执行,提高系统的并发性能和资源利用率。

四、模型的代码具体实现

引入头文件

1
2
3
4
5
#include <iostream>
#include <thread> // 线程
#include <mutex> // 互斥锁
#include <condition_variable> // 条件变量
#include <queue> // 队列

我们将整个模型封装成一个类,这样这一个类就是一个体系,也更好的体现在项目中

1
2
3
4
5
6
7
8
9
10
class PCM{
public:
void Producer();
void Consumer();

privated:
std::queue<int> my_queue;// 创建一个缓冲区,这个缓冲区是一个队列
std::condition_variable my_cv; // 用于通知消费者,在缓冲区中又新加入了任务
std::mutex mtx; // 互斥锁,若不知道其具体用途请转移至多线程编程中了解或自行搜索
}

生产者函数设计

1
2
3
4
5
6
7
8
9
10
11
12
13
void Producer(){
for(int i = 0; i < 10; i++)
{
{
std::unique_lock<std::mutex> lock(mtx);
my_queue.push(i); // 队列中添加任务
my_cv.notify_one(); // 用于唤醒一个正在等待该条件变量的线程

std::cout << "Task:" << i << std::endl;
}
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}

为什么需要两层括号?

  • 内层代码块被额外的一层花括号 {} 包裹起来,这主要是为了限制 std::unique_lock<std::mutex> 对象 lock 的作用域。具体来说:
    作用域限制:当代码块结束时(即遇到右花括号 }),lock 对象会自动析构。在析构过程中,std::unique_lock 会自动释放它所持有的锁(mtx)。这确保了锁的释放是及时的,避免了锁被长时间占用,从而提高了线程间的协作效率。
  • 避免死锁:如果不使用额外的花括号,lock 对象的作用域会扩展到整个循环体,这意味着每次循环都会持续持有锁,直到循环结束。这会导致其他线程无法获取锁,从而引发死锁或严重的性能问题。

消费者函数设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Consumer()
{
while(1)
{
{
std::unique_lock<std::mutex> lock(mtx);
my_cv.wait(lock,!my_queue.empty());
int value = my_queue.front();
my_queue.pop();
}

std::cout << "Consumer:" << value << std::endl;
}
}

详细解释
my_cv.wait(lock, !my_queue.empty());
这行代码的作用是让当前线程等待条件变量 my_cv,直到队列 my_queue 非空。
作用:

  • 如果 predicate 返回 false(即队列为空),当前线程会进入等待状态,释放锁 lock。
  • 当其他线程调用 my_cv.notify_one() 或 my_cv.notify_all() 时,当前线程会被唤醒。
  • 被唤醒后,线程会重新尝试获取锁 lock,并再次检查 predicate 是否为 true。如果为 true,线程继续执行;否则,线程会再次进入等待状态。

int value = my_queue.front();
这行代码的作用是从队列 my_queue 中取出队列的第一个元素,并将其赋值给变量 value。

my_queue.pop();
从队列 my_queue 中移除第一个元素。确保数据在被消费者消费后正确移除

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <iostream>
#include <thread> // 线程
#include <mutex> // 互斥锁
#include <condition_variable> // 条件变量
#include <queue> // 队列

class PCM{
public:
void Producer(){
for(int i = 0; i < 10; i++)
{
{
std::unique_lock<std::mutex> lock(mtx);
my_queue.push(i); // 队列中添加任务
my_cv.notify_one(); // 用于唤醒一个正在等待该条件变量的线程

std::cout << "Task:" << i << std::endl;
}
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}

void Consumer()
{
while (1)
{
{
std::unique_lock<std::mutex> lock(mtx);
my_cv.wait(lock, [this]()
{ return !my_queue.empty(); });
value = my_queue.front();
my_queue.pop();
}

std::cout << "Consumer: " << value << std::endl;
}
}

private:
std::queue<int> my_queue;// 创建一个缓冲区,这个缓冲区是一个队列
std::condition_variable my_cv; // 用于通知消费者,在缓冲区中又新加入了任务
std::mutex mtx; // 互斥锁,若不知道其具体用途请转移至多线程编程中了解或自行搜索
}

int main()
{
PCM pcm;
std::thread t1(&PCM::Producer, &pcm);
std::thread t2(&PCM::Consumer, &pcm);
t1.join();
t2.join();
return 0;
}
Author:Chaichai
Link:https://chaichai438.github.io/2025/06/16/RM学习/生产者与消费者模型/
版权声明:本文采用 CC BY-NC-SA 3.0 CN 协议进行许可