草庐IT

【Linux】POSIX信号量 | 基于环形队列的生产者消费者模型

阿亮joy. 2023-04-08 原文

​🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根

目录

👉POSIX信号量👈

POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于线程间同步。信号量分为二元信号量和多远信号量。二元信号量(Binary Semaphore),即:计数器维护的 value 只有 0 和 1 着两种可能,以此来实现互斥,所以也称为互斥信号量(互斥锁)。1 表示可以访问资源,0 表示不可以访问资源。多元信号量是 value 大于 2 的信号量,以原子的方式对该值进行加加和减减操作。

深入理解信号量

初始化信号量

  • sem:是 sem_t 类型变量的地址,也就是信号量。
  • pshared:0 表示线程间共享,非零表示进程间共享。
  • value:信号量本质是一个计数器,value 就是信号量的初始值。

注:信号量本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。

销毁信号量

等待信号量

  • P 操作:等待信号量,会将信号量的值减 1。

发布信号量

  • V 操作:发布信号量,表示资源使用完毕,可以归还资源了并将信号量值加 1。

注:P 操作和 V 操作都是原子操作。

👉基于环形队列的生产者消费者模型👈

空间资源和数据资源

  • 生产者关注的是空间资源,只有当环形队列中有空间,生产者才能进行生产。消费者关注的是数据资源,只有当环形队列中有数据,消费者才能进行消费。
  • 通过信号量来描述环形队列中的空间资源(space_sem)和数据资源(data_sem)。space_sem 的初始值是为环形队列的容量上限,因为刚开始时环形队列当中全是空间。data_sem 的初始值是 0,因为刚开始环形队列中没有数据。

申请和释放资源

生产者申请空间资源,释放数据资源。

对于生产者来说,生产者每次生产数据前都需要申请 space_sem。

  • 如果 space_sem 不为 0,则申请信号量成功,生产者可以进行生产。
  • 如果 space_sem 为 0,则申请信号量失败,生产者需要在 space_sem 的等待队列中进行阻塞等待,直到环形队列中有新的空间资源,才能被唤醒。

当生产者生产数据后,应该释放 data_sem。

  • 虽然生产者在进行生产前是对 space_sem 进行的 P 操作,但是生产结束后,并不是对 space_sem 进行 V 操作,而是对 data_sem 进行 V 操作。
  • 当生产者生产完数据后,环形队列中就会多了一个数据资源,因此我们要对 data_sem 进行 V 操作。

消费者申请数据资源,释放空间资源。

对于消费者来说,消费者每次消费数据前都要申请 data_sem。

  • 如果 data_sem 不为 0,则申请信号量成功,消费者可以进行消费。
  • 如果 data_sem 为 0,则申请信号量失败,消费者需要在 data_sem 的等待队列中进行阻塞等待,直到环形队列中有新的数据资源,才能被唤醒。

当消费者消费完数据后,需要释放 space_sem。

  • 虽然消费者在进行消费前是对 data_sem 进行的 P 操作,但是消费结束后,并不是对 data_sem 进行 V 操作,而是对 space_sem 进行 V 操作。
  • 当消费者消费完数据后,环形队列中就会多了一个空间资源,因此我们要对 space_sem 进行 V 操作。

两个规则

在基于环形队列的生产者和消费者模型当中,生产者和消费者必须遵守两个规则。

第一个规则:生产者和消费者不能对同一个位置进行访问。

生产者和消费者对环形队列进行访问时

  • 如果生产者和消费者访问的是环形队列中的同一个位置,那么就相当于生产者和消费者同时对一块临界资源进行访问,这样将可能会出现数据不一致问题。
  • 如果生产者和消费者访问的是环形队列中的不同位置,那么生产者和消费就可以并发地进行生产数据和消费数据,并不会出现数据不一致的问题。

如果保证生产者和消费者不会对环形队列的同一个位置进行访问?

  • 当生产者和消费者将要对环形队列的同一个位置进行访问时,此时的环形队列要么为满,要么为空。
  • 当环形队列为满时,space_sem 为 0,生产者不能对环形队列进行访问,需要在 space_sem 的等待队列中进行阻塞等待。
  • 当环形队列为空时,data_sem 为 0,消费者不能对环形队列进行访问,需要在 data_sem 的等待队列中进行等待。
  • 通过信号量就保证了当生产者和消费者指向环形队列的同一个位置时,生产和消费的串行化过程。同时也保证了当生产者和消费者执行的不是同一个位置时,生产者和消费者可以并发地进行生产和消费,以提高效率。

第二规则:生产者不能将消费者套圈,消费者不能超过生产者。

  • 如果生产者将消费者套圈了,那么就会出现这样的情况:消费者还没有将生产者之前生产的数据消费掉,该数据就被覆盖掉了,这很显然是不允许的。所以当生产者生产了一圈后,再次遇到消费者时,生产者就不能再进行生产了,需要等消费者消费数据后,才能进行生产。
  • 如果消费者超过了生产者,那么就会出现这样的情况:消费者会将之前已经消费过的废弃数据再消费一次,这也是不允许的。所以当消费者消费一圈后,再次遇到生产者,消费者就不能再进行消费了,需要等生产者生产数据后,才能进行消费。
  • 很明显,第二个规则也是通过信号量来保证的。

代码实现

Sem 是对信号量的封装,其构造函数是对信号量进行初始化,析构函数是对信号量进行销毁,同时也将等待信号量和发布信号量分别封装成 P 操作和 V 操作。

// sem.hpp
#ifndef _SEM_HPP_
#define _SEM_HPP_

#include <semaphore.h>

class Sem
{
public:
    Sem(int value)
    {
        sem_init(&_sem, 0, value);
    }

    ~Sem()
    {
        sem_destroy(&_sem);
    }

    void P()
    {
        sem_wait(&_sem);
    }

    void V()
    {
        sem_post(&_sem);
    }

private:
    sem_t _sem;
};

#endif

环形队列 RingQueue 是生产者和消费者模型当中的交易场所,我们可以通过 STL 中的 vector 来模拟实现。

// ringQueue.hpp
#ifndef _RING_QUEUE_HPP_
#define _RING_QUEUE_HPP_

#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"

const int DefaultCapacity = 5;

template <class T>
class RingQueue
{
public:
    RingQueue(int cap = DefaultCapacity)
        : _rq(cap)
        , _cap(cap)
        , _p_pos(0)
        , _c_pos(0)
        , _space_sem(cap)
        , _data_sem(0)
    {}

    ~RingQueue()
    {}

    // 生产数据(生产者调用)
    void Push(const T& in)
    {
        _space_sem.P();
        _rq[_p_pos++] = in;
        _p_pos %= _cap;
        _data_sem.V();
    }

    // 消费数据(消费者调用)
    void Pop(T& out)
    {
        _data_sem.P();
        out = _rq[_c_pos++];
        _c_pos %= _cap;
        _space_sem.V();
    }

private:
    std::vector<T> _rq;
    int _cap;
    int _p_pos; // 生产位置
    int _c_pos; // 消费位置
    Sem _space_sem; // 空间资源
    Sem _data_sem;  // 数据资源
};

#endif

相关说明:

  • 当不指定环形队列的大小时,将会使用缺省值 DefaultCapacity 作为环形队列的容量上限。
  • 生产者每次生产时,会将生产的数据放在下标为 _p_pos 的位置上;消费者每次消费时,会取出下标为 _c_pos 的位置上的数据进行消费。
  • 生产者和消费者生产和消费结束后,需要对 _p_pos 和 _c_pos 进行加加操作,以标记下一次放入数据的位置和取数据的位置,最好还需要对下标进行模除操作,以达到环形效果。
  • _p_pos 只会由生产者线程更新,_c_pos 只会由消费者线程更新,所以对它们的访问不需要进行保护,因此它们的更新操作可以放在 V 操作之后。

单生产者单消费者

// Test.cc
#include <iostream>
#include <ctime>
#include <unistd.h>
#include "ringQueue.hpp"

void* Consumer(void* args)
{
    RingQueue<int>* rq = (RingQueue<int>*)(args);

    while(true)
    {
        int data;
        rq->Pop(data);
        std::cout << "Comsumer: " << data << std::endl;
    }

    return nullptr;
}

void* Productor(void* args)
{
    RingQueue<int>* rq = (RingQueue<int>*)(args);

    while(true)
    {
        sleep(1);
        int data = rand() % 100 + 1;
        rq->Push(data);
        std::cout << "Productor: " << data << std::endl;
    }

    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr));

    RingQueue<int>* rq = new RingQueue<int>();
    pthread_t c, p;
    pthread_create(&c, nullptr, Consumer, (void*)rq);
    pthread_create(&p, nullptr, Productor, (void*)rq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete rq;

    return 0;
}


多生产者多消费者

有了多个生产者和多消费者,就会存在生产者和生产者之间的竞争关系、消费者和消费者之间的竞争关系,那么我们就需要对临界资源(下标)进行加锁保护。

// RingQueue.hpp
#ifndef _RING_QUEUE_HPP_
#define _RING_QUEUE_HPP_

#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"

const int DefaultCapacity = 5;

template <class T>
class RingQueue
{
public:
    RingQueue(int cap = DefaultCapacity)
        : _rq(cap)
        , _cap(cap)
        , _p_pos(0)
        , _c_pos(0)
        , _space_sem(cap)
        , _data_sem(0)
    {
        pthread_mutex_init(&_plock, nullptr);
        pthread_mutex_init(&_clock, nullptr);
    }

    ~RingQueue()
    {
        pthread_mutex_destroy(&_plock);
        pthread_mutex_destroy(&_clock);
    }

    // 生产数据(生产者调用)
    void Push(const T& in)
    {
        // 先申请信号量:先将资源分发给线程
        _space_sem.P();
        pthread_mutex_lock(&_plock);
        // 一定是竞争成功的生产者线程 -- 就一个!
        _rq[_p_pos++] = in;
        _p_pos %= _cap;
        pthread_mutex_unlock(&_plock);
        _data_sem.V();
    }

    // 消费数据(消费者调用)
    void Pop(T& out)
    {
        _data_sem.P();
        pthread_mutex_lock(&_clock);
        // 一定是竞争成功的消费者线程 -- 就一个!
        out = _rq[_c_pos++];
        _c_pos %= _cap;
        pthread_mutex_unlock(&_clock);
        _space_sem.V();
    }

private:
    std::vector<T> _rq;
    int _cap;
    int _p_pos; // 生产位置
    int _c_pos; // 消费位置
    Sem _space_sem; // 空间资源
    Sem _data_sem;  // 数据资源
    pthread_mutex_t _plock;
    pthread_mutex_t _clock;
};

#endif

注意:_p_pos 和 c_pos 的更新需要再加锁和解锁之间。如果它们的更新不在加锁和解锁之间,将可能会出现这样的情况:线程 A 释放了锁彬没来得及将下标进行更新,然后线程 B 就获得了锁并执行到更新下标的地方,这样就有可能会出现数据不一致的问题!

// Test.cc
#include <iostream>
#include <ctime>
#include <unistd.h>
#include "ringQueue.hpp"

void* Consumer(void* args)
{
    RingQueue<int>* rq = (RingQueue<int>*)(args);

    while(true)
    {
        sleep(1);
        int data;
        rq->Pop(data);
        std::cout << "Comsumer: " << data << " [" << pthread_self() << "]" << std::endl;
    }

    return nullptr;
}

void* Productor(void* args)
{
    RingQueue<int>* rq = (RingQueue<int>*)(args);

    while(true)
    {
        int data = rand() % 100 + 1;
        rq->Push(data);
        std::cout << "Productor: " << data << " [" << pthread_self() << "]" << std::endl;
    }

    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr));

    RingQueue<int>* rq = new RingQueue<int>();
    pthread_t c[3], p[2];
    pthread_create(c, nullptr, Consumer, (void*)rq);
    pthread_create(c+1, nullptr, Consumer, (void*)rq);
    pthread_create(c+2, nullptr, Consumer, (void*)rq);

    pthread_create(p, nullptr, Productor, (void*)rq);
    pthread_create(p+1, nullptr, Productor, (void*)rq);

    for(int i = 0; i < 3; i++) pthread_join(c[i], nullptr);
    for(int i = 0; i < 2; i++) pthread_join(p[i], nullptr);
    delete rq;

    return 0;
}

多生产多消费的意义

生产的本质是将私有的任务或数据放入到公共空间中,消费的本质是将公共空间中的任务或数据变成私有。生产和消费并不是简单地将任务或数据放入到交易场所或从交易场所中取出,还需要考虑数据或任务放入到交易场所前和拿到任务或数据之后的处理,这两个过程是最耗费时间的。所以,多生产多消费的意义就在于能够并发地生产和处理任务。

信号量的意义

信号量本质是一个计数器,那这个计数器的意义是什么呢?计数器的意义就是不用进入临界区,就可以得知临界资源的情况,甚至可以减少临界区内部的判断!而在基于阻塞队列的生产者和消费者模型中,需要申请锁,然后进行临界资源是否满足的判断再进行访问,最后释放锁,需要进行判断的原因就是我们并不清楚临界资源的情况!而信号量要提前预设资源的情况,在进行 PV 操作的过程中,我们在临界区外部就能得知临界资源的情况。

👉总结👈

本篇博客主要讲解了信号量、信号量的相关函数以及基于环形队列的生产者消费者模型等。那么以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家!💖💝❣️

有关【Linux】POSIX信号量 | 基于环形队列的生产者消费者模型的更多相关文章

  1. 叮咚买菜基于 Apache Doris 统一 OLAP 引擎的应用实践 - 2

    导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵

  2. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  3. kvm虚拟机安装centos7基于ubuntu20.04系统 - 2

    需求:要创建虚拟机,就需要给他提供一个虚拟的磁盘,我们就在/opt目录下创建一个10G大小的raw格式的虚拟磁盘CentOS-7-x86_64.raw命令格式:qemu-imgcreate-f磁盘格式磁盘名称磁盘大小qemu-imgcreate-f磁盘格式-o?1.创建磁盘qemu-imgcreate-fraw/opt/CentOS-7-x86_64.raw10G执行效果#ls/opt/CentOS-7-x86_64.raw2.安装虚拟机使用virt-install命令,基于我们提供的系统镜像和虚拟磁盘来创建一个虚拟机,另外在创建虚拟机之前,提前打开vnc客户端,在创建虚拟机的时候,通过vnc

  4. ruby-on-rails - (Ruby,Rails) 基于角色的身份验证和用户管理...? - 2

    我正在寻找用于Rails的优质管理插件。似乎大多数现有的插件/gem(例如“restful_authentication”、“acts_as_authenticated”)都围绕着self注册等展开。但是,我正在寻找一种功能齐全的基于管理/管理角色的解决方案——但不是简单地附加到另一个非基于角色的解决方案。如果我找不到,我想我会自己动手......只是不想重新发明轮子。 最佳答案 RyanBates最近做了两个关于授权的railscast(注意身份验证和授权之间的区别;身份验证检查用户是否如她所说的那样,授权检查用户是否有权访问资源

  5. ruby - 在 Rakefile 中动态生成 Rake 测试任务(基于现有的测试文件) - 2

    我正在根据Rakefile中的现有测试文件动态生成测试任务。假设您有各种以模式命名的单元测试文件test_.rb.所以我正在做的是创建一个以“测试”命名空间内的文件名命名的任务。使用下面的代码,我可以用raketest:调用所有测试require'rake/testtask'task:default=>'test:all'namespace:testdodesc"Runalltests"Rake::TestTask.new(:all)do|t|t.test_files=FileList['test_*.rb']endFileList['test_*.rb'].eachdo|task|n

  6. ruby - 如何使用 Ruby 基于字母数字字符串生成颜色? - 2

    我想要像“嘿那里”这样的东西变成,例如,#316583。我希望将任意长度的字符串“归结”为十六进制颜色。我不知道从哪里开始。我在想,每个字符串的MD5散列都是不同的-但如何将该散列转换为十六进制颜色数字? 最佳答案 你可以只取几位前几位:require'digest/md5'color=Digest::MD5.hexdigest('Mytext')[0..5] 关于ruby-如何使用Ruby基于字母数字字符串生成颜色?,我们在StackOverflow上找到一个类似的问题:

  7. 【自动驾驶环境感知项目】——基于Paddle3D的点云障碍物检测 - 2

    文章目录1.自动驾驶实战:基于Paddle3D的点云障碍物检测1.1环境信息1.2准备点云数据1.3安装Paddle3D1.4模型训练1.5模型评估1.6模型导出1.7模型部署效果附录show_lidar_pred_on_image.py1.自动驾驶实战:基于Paddle3D的点云障碍物检测项目地址——自动驾驶实战:基于Paddle3D的点云障碍物检测课程地址——自动驾驶感知系统揭秘1.1环境信息硬件信息CPU:2核AI加速卡:v100总显存:16GB总内存:16GB总硬盘:100GB环境配置Python:3.7.4框架信息框架版本:PaddlePaddle2.4.0(项目默认框架版本为2.3

  8. Verilog使用inout信号的方法 - 2

    目录一、inout在设计文件中的使用方法1.1、inout的第一种使用方法1.2、inout实现的第二种使用方法1.3、inout使用总结 二、inout在仿真测试中的使用方法一、inout在设计文件中的使用方法在FPGA的设计过程中,有时候会遇到双向信号(既能作为输出,也能作为输入的信号叫双向信号)。比如,IIC总线中的SDA信号就是一个双向信号,QSPIFlash的四线操作的时候四根信号线均为双向信号。在Verilog中用关键字inout定义双向信号,这里总结一下双向信号的处理方法。1.1、inout的第一种使用方法  实际上,双向信号的本质是由一个三态门组成的,三态门可以输出高电平,低电

  9. 【Linux操作系统】——网络配置与SSH远程 - 2

    Linux操作系统——网络配置与SSH远程安装完VMware与系统后,需要进行网络配置。第一个目标为进行SSH连接,可以从本机到VMware进行文件传送,首先需要进行网络配置。1.下载远程软件首先需要先下载安装一款远程软件:FinalShell或者xhell7FinalShellxhell7FinalShell下载:Windows下载http://www.hostbuf.com/downloads/finalshell_install.exemacOS下载http://www.hostbuf.com/downloads/finalshell_install.pkg2.配置CentOS网络安装好

  10. Linux磁盘分区中物理卷(PV)、卷组(VG)、逻辑卷(LV)创建和(LVM)管理 - 2

    文章目录一基础定义二创建逻辑卷2-1准备物理设备2-2创建物理卷2-3创建卷组2-4创建逻辑卷2-5创建文件系统并挂载文件三扩展卷组和缩减卷组3-1准备物理设备3-2创建物理卷3-3扩展卷组3-4查看卷组的详细信息以验证3-5缩减卷组四扩展逻辑卷4-1检查卷组是否有可用的空间4-2扩展逻辑卷4-3扩展文件系统五删除逻辑卷5-1备份数据5-2卸载文件系统5-3删除逻辑卷5-4删除卷组5-5删除物理卷六LVM逻辑卷缩容6-1缩容注意事项6-2标准缩容步骤一基础定义LVM,LogicalVolumeManger,逻辑卷管理,Linux磁盘分区管理的一种机制,建立在硬盘和分区上的一个逻辑层,提高磁盘分

随机推荐