草庐IT

[Linux]带你轻松实现线程池

Sola一轩 2023-07-17 原文

目录

前言

封装

基于RAII思想封装的锁

封装一个自己的Thread类

线程池

线程池概念

线程池的应用场景

模拟实现自己的线程池

测试线程池

测试用任务

使用线程

前言

这是博主有关多线程的第五篇博客,前面没看的这边放上链接,建议去看看。

        这次在实现线程池相关的代码前,我们先封装一下pthread库的锁和线程相关的接口,方便我们的使用和让代码更简洁

封装

        C++11中有关线程库的部分就是对线程相关的系统调用进行了封装,在这里,我们将尝试自己进行封装,以方便接下来的使用。

基于RAII思想封装的锁

        锁控制不好时,可能会造成死锁,最常见的比如在锁中间代码返回,或者在锁的范围内抛异常,因此我们会基于RAII的思想(智能指针的)来封装一个LockGuard,在方便我们使用的同时,提高代码的健壮性。(C++11采用RAII的方式对锁进行了封装,即lock_guard和unique_lock,我们这里尝试自己进行封装)、

#pragma once
#include<pthread.h>
#include<iostream>
//封装一下锁
class Mutex
{
public:
    Mutex(pthread_mutex_t* mutex = nullptr)
    :_mutex(mutex)
    {       
    }
    void Lock()
    {
        if(_mutex)
        pthread_mutex_lock(_mutex);
    }
    void UnLock()
    {
        if(_mutex)
        pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t* _mutex;
};
class LockGuard
{
public:
    LockGuard(pthread_mutex_t* mutex)
    :_mutex(mutex)
    {
        //构造中加锁
        _mutex.Lock();
    }
    ~LockGuard()
    {
        //析构中解锁
        _mutex.UnLock();
    }
private:
    Mutex _mutex;
};

        用LockGuard来接收锁,可以在自动加锁后,如果出了作用域,就会调用析构函数自动解锁。

封装一个自己的Thread类

        由于每次使用pthread库内的线程相关的调用很繁杂,我们这里尝试进行封装一个自己的Thread类。

#pragma once
#include <pthread.h>
#include <iostream>
#include <string>
#include <functional>
#include <cassert>
class Thread
{
    typedef std::function<void*(void*)> func_t;  //用c++11中的function包装器接收方法
private:
    static void* start_routine(void* args)      //只能是静态成员的原因
    {
        Thread* t = static_cast<Thread*>(args);
        return t->callback();
    } 
public:
    Thread()        //创建时先只给线程名字,等正式要用时再创建线程
    {
        char namebuff[128];
        snprintf(namebuff,sizeof(namebuff),"Thread[%d]",ThreadNum++);
        _name = namebuff;
    }
    void Run(func_t func,void* args)
    {
        _func = func;
        _args = args;
        pthread_create(&_tid,nullptr,start_routine,this);     //传this指针的原因    
    }
    void Join()
    {
        pthread_join(_tid,nullptr);
    }
    std::string ThreadName()  //获取当前线程名字
    {
        return _name;
    }
    void* callback()
    {
        return _func(_args);
    }
private:
    std::string _name;      //线程名字
    pthread_t _tid;
    void* _args;    // 调用方法的参数
    func_t _func;     //线程执行的方法

    static int ThreadNum;
};
int Thread::ThreadNum = 1;

注意点:

  1. 我们选择了默认构造时只初始化线程的名字 ,再调用Run接口时再正式创建我们的线程,传入方法和参数。(方便我们测试观察和让线程在需要时再进行创建)

  2. start_routine是静态成员方法,由于this指针的原因。(pthread_create的第三个参数要求是返回值和参数皆为void*的函数,普通的成员函数参数中含有this指针,无法满足要求

  3. 由于start_routine是静态成员,其没有this指针,所以无法访问成员变量,因此我们通过pthread_create( )函数的第四个参数将this指针传给start_routine。

  4. 在start_routine里面,我们通过传入的this指针调用类内的callback(),通过这样来调用我们传给线程的方法。

线程池

线程池概念

一种线程使用模式。

        线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价

        线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。

  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。

  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

模拟实现自己的线程池

模拟实现的线程池功能

  1. 创建固定数量线程池,循环从任务队列中获取任务对象

  2. 获取到任务对象后,执行任务对象中的任务接口

#pragma once
#include "Thread.hpp"
#include "LockGuard.hpp"
#include <vector>
#include <queue>
#include <unistd.h>

template<class T>
class ThreadPool;
template<class T>
class ThreadData    //方便传参时能拿到threadpool的this指针和当前线程的指针
{
public:
    ThreadData(ThreadPool<T>* tp,Thread* td)
    :_tp(tp),_td(td)
    {
    }
    ThreadPool<T>* _tp;
    Thread* _td;
};

template<class T>
class ThreadPool
{
private:
    static void* start_routine(void* args)
    {
        ThreadData<T>* td = static_cast<ThreadData<T>*>(args);   //拿到threadpool的指针
        ThreadPool<T>* tp = td->_tp;            //拿到当前thread的指针
        T task;
        Thread* t = td->_td;
        while(true)
        {    
            {                                     //控制LockGuard的生命周期,拿到任务后就不需要锁保护了。
                LockGuard lock(&tp->_mutex);
                while(tp->QueueIsEmpty())
                {
                    tp->QueueWait();
                }
                tp->QueuePop(task);
            }
            std::cout<< t->ThreadName()<<":";
            task();
        }      
    }   

public:
    ThreadPool(const int num = 3)   //默认初始创建三个线程
    :_num(num),_tp(num)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cond,nullptr);
        for(int i=0;i<_num;++i)
        {
            Thread* t = new Thread;
            _tp[i] = t;
        }
    }

    ~ThreadPool()    //析构
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
        for(const auto& e : _tp)
        delete e;
    }

    void start() //启动线程池获取任务,处理任务
    {
        for(int i=0;i<_num;++i)
        {
            ThreadData<T>* Td = new ThreadData<T>(this,_tp[i]); //给start_routine传入线程相关的数据,方便演示
            _tp[i]->Run(start_routine,Td);
            std::cout<<_tp[i]->ThreadName()<<"启动"<<std::endl;
        }
    }

    void Push(const T& in)      //输入的任务
    {
        LockGuard lock(&_mutex);
        _task.push(in);
        QueueSignal();
    }

    bool QueueIsEmpty()
    {
        if(_task.empty())
        {
            return true;
        }
        return false;
    }
private:
    void QueuePop(T& task)
    {
        task = _task.front();
        _task.pop();
    }
    void QueueWait()
    {
        pthread_cond_wait(&_cond,&_mutex);
    }
    void QueueSignal()
    {
        pthread_cond_signal(&_cond);
    }

private:
    int _num;         //线程池内创建多少个线程
    std::vector<Thread*> _tp;
    std::queue<T> _task;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

注意点:

  1. 我们用vector来存储创建好的线程,用queue来存放待执行的任务。

  2. 从任务队列中取任务要并发和互斥,因此需要一个互斥量和一个条件变量。

  3. 通过Push接口向队列中存放任务,通过QueuePop从任务队列中获取任务。

  4. ThreadData类是用于方便我们向封装的线程传入多个参数,传入this指针方便调用Threadpool类内的成员方法。

  5. LockGuard的生命周期在我们从任务队列中取得任务后就应该结束,线程拿到任务后应该并发的去执行各自拿到的任务,因为拿到任务后,该任务就属于当前线程了,已经与其他线程无关,所以我们用{ }来限LockGuard 的生命周期,没使用LockGuard记得在拿到任务后立即解锁即可。

  6. 通过Push放入任务后,通过QueueSignal()接口来唤醒等待中的线程来拿任务,这里不推荐用pthread_cond_broadcast,一瞬间唤醒大量的线程可能会导致系统震荡,这叫做惊群效应。

测试线程池

测试用任务

        之前用了“亿”次的任务:

#pragma once
#include <iostream>

    class Task
    {
    public:
        Task(int x = 1, int y = 1, char op = '+')
            : a(x), b(y), _op(op)
        {
        }
        ~Task()
        {
        }
        void Run()
        {
            int ret = 0;
            switch (_op)
            {
            case ('+'):
                ret = a+b;
                break;
            case ('-'):
                ret = a-b;
                break;
            case ('*'):
                ret = a*b; 
                break;
            case ('/'):
                ret = a/b;//暂不考虑b为0;
                break;
            case ('%'):
                ret = a%b;
                break;
            default:
                std::cout << "未知操作符" << std::endl;
                break;
            }

            std::cout<< a << _op << b << '=' << ret << std::endl;
        }
        void operator()()   
        {
            Run();
        }
    private:
        int a;
        int b;
        char _op;
    };

        简而言之就是帮助我们两个数进行简单的运算。

使用线程池

        为了简便,我们直接用主线程不断派发任务:

#include "ThreadPool.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"
#include "Task.hpp"
#include <cstdlib>
#include <ctime>
#include<unistd.h>

int main()
{
    srand((unsigned int)time(nullptr)^pthread_self()); //生成随机数种子
    ThreadPool<Task>* tp = new ThreadPool<Task>(8);  //创建了8个线程
    tp->start();                                //开始运行
    while(true)
    {
        int x = rand()%20+1;
        int y = rand()%10+1;
        char op = "+-*/%"[rand()%5];
        Task t(x,y,op);
        tp->Push(t);                           //放入待执行任务队列
        usleep(10000);
    }
    return 0;
}

        演示成功,我们让线程池创造了八个线程,并让他们从任务队列中拿出任务进行了执行。

        知行合一,希望大家也尝试模拟实现出自己的线程池。

有关[Linux]带你轻松实现线程池的更多相关文章

  1. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  2. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  3. 屏幕录制为什么没声音?检查这2项,轻松解决 - 2

    相信很多人在录制视频的时候都会遇到各种各样的问题,比如录制的视频没有声音。屏幕录制为什么没声音?今天小编就和大家分享一下如何录制音画同步视频的具体操作方法。如果你有录制的视频没有声音,你可以试试这个方法。 一、检查是否打开电脑系统声音相信很多小伙伴在录制视频后会发现录制的视频没有声音,屏幕录制为什么没声音?如果当时没有打开音频录制,则录制好的视频是没有声音的。因此,建议在录制前进行检查。屏幕上没有声音,很可能是因为你的电脑系统的声音被禁止了。您只需打开电脑系统的声音,即可录制音频和图画同步视频。操作方法:步骤1:点击电脑屏幕右下侧的“小喇叭”图案,在上方的选项中,选择“声音”。 步骤2:在“声

  4. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  5. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  6. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  7. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  8. ruby - 如何让Ruby捕获线程中的语法错误 - 2

    我正在尝试使用ruby​​编写一个双线程客户端,一个线程从套接字读取数据并将其打印出来,另一个线程读取本地数据并将其发送到远程服务器。我发现的问题是Ruby似乎无法捕获线程内的错误,这是一个示例:#!/usr/bin/rubyThread.new{loop{$stdout.puts"hi"abc.putsefsleep1}}loop{sleep1}显然,如果我在线程外键入abc.putsef,代码将永远不会运行,因为Ruby将报告“undefinedvariableabc”。但是,如果它在一个线程内,则没有错误报告。我的问题是,如何让Ruby捕获这样的错误?或者至少,报告线程中的错误?

  9. ruby - 如何在 ruby​​ 中运行后台线程? - 2

    我是ruby​​的新手,我认为重新构建一个我用C#编写的简单聊天程序是个好主意。我正在使用Ruby2.0.0MRI(Matz的Ruby实现)。问题是我想在服务器运行时为简单的服务器命令提供I/O。这是从示例中获取的服务器。我添加了使用gets()获取输入的命令方法。我希望此方法在后台作为线程运行,但该线程正在阻塞另一个线程。require'socket'#Getsocketsfromstdlibserver=TCPServer.open(2000)#Sockettolistenonport2000defcommandsx=1whilex==1exitProgram=gets.chomp

  10. ruby - Arrays Sets 和 SortedSets 在 Ruby 中是如何实现的 - 2

    通常,数组被实现为内存块,集合被实现为HashMap,有序集合被实现为跳跃列表。在Ruby中也是如此吗?我正在尝试从性能和内存占用方面评估Ruby中不同容器的使用情况 最佳答案 数组是Ruby核心库的一部分。每个Ruby实现都有自己的数组实现。Ruby语言规范只规定了Ruby数组的行为,并没有规定任何特定的实现策略。它甚至没有指定任何会强制或至少建议特定实现策略的性能约束。然而,大多数Rubyist对数组的性能特征有一些期望,这会迫使不符合它们的实现变得默默无闻,因为实际上没有人会使用它:插入、前置或追加以及删除元素的最坏情况步骤复

随机推荐