草庐IT

c++ - 查询简单的 C++ 线程池实现

coder 2024-02-05 原文

Stackoverflow 对我帮助很大,我想回馈社区。我一直在使用 TinyThread++ 实现一个简单的线程池 website C++ 可移植线程库,使用我从 Stackoverflow 学到的知识。我是线程编程的新手,所以对互斥量等不太满意。 在展示代码(在 Linux 下运行良好)之后,我有一个最好的问题:

// ThreadPool.h

class ThreadPool
{
 public:

 ThreadPool();
~ThreadPool();

// Creates a pool of threads and gets them ready to be used
void CreateThreads(int numOfThreads);

// Assigns a job to a thread in the pool, but doesn't start the job
// Each SubmitJob call will use up one thread of the pool.
// This operation can only be undone by calling StartJobs and
// then waiting for the jobs to complete. On completion,
// new jobs may be submitted.
void SubmitJob( void (*workFunc)(void *), void *workData );

// Begins execution of all the jobs in the pool.
void StartJobs();

// Waits until all jobs have completed.
// The wait will block the caller.
// On completion, new jobs may be submitted.
void WaitForJobsToComplete();

private:

enum typeOfWorkEnum { e_work, e_quit };

 class ThreadData
 {
   public:

    bool ready;  // thread has been created and is ready for work  
    bool haveWorkToDo;
    typeOfWorkEnum  typeOfWork;

    // Pointer to the work function each thread has to call.
    void (*workFunc)(void *);

    // Pointer to work data
    void *workData;

    ThreadData() : ready(false), haveWorkToDo(false) {  };
 };

struct ThreadArgStruct
{
    ThreadPool *threadPoolInstance;
    int         threadId;
};

// Data for each thread
ThreadData  *m_ThreadData;

ThreadPool(ThreadPool const&); // copy ctor hidden
ThreadPool& operator=(ThreadPool const&); // assign op. hidden

// Static function that provides the function pointer that a thread can call
// By including the ThreadPool instance in the void * parameter,
// we can use it to access other data and methods in the ThreadPool instance.
static void ThreadFuncWrapper(void *arg)
{
    ThreadArgStruct *threadArg = static_cast<ThreadArgStruct *>(arg);
    threadArg->threadPoolInstance->ThreadFunc(threadArg->threadId);
}

// The function each thread calls    
void ThreadFunc( int threadId );

// Called by the thread pool destructor
void DestroyThreadPool();

// Total number of threads available
// (fixed on creation of thread pool)
int m_numOfThreads;
int m_NumOfThreadsDoingWork;
int m_NumOfThreadsGivenJobs;

// List of threads
std::vector<tthread::thread *> m_ThreadList;

// Condition variable to signal each thread has been created and executing
tthread::mutex              m_ThreadReady_mutex;
tthread::condition_variable m_ThreadReady_condvar;

 // Condition variable to signal each thread to start work
tthread::mutex              m_WorkToDo_mutex;
tthread::condition_variable m_WorkToDo_condvar;

// Condition variable to signal the main thread that 
// all threads in the pool have completed their work
tthread::mutex              m_WorkCompleted_mutex;
tthread::condition_variable m_WorkCompleted_condvar;
};

cpp文件:

//
//  ThreadPool.cpp
//

#include "ThreadPool.h"    

// This is the thread function for each thread.
// All threads remain in this function until
// they are asked to quit, which only happens
// when terminating the thread pool.
void ThreadPool::ThreadFunc( int threadId )
{
 ThreadData *myThreadData = &m_ThreadData[threadId]; 
 std::cout << "Hello world: Thread " << threadId << std::endl;

 // Signal that this thread is ready
 m_ThreadReady_mutex.lock();
       myThreadData->ready = true;
       m_ThreadReady_condvar.notify_one(); // notify the main thread
 m_ThreadReady_mutex.unlock();       

 while(true)
 {
    //tthread::lock_guard<tthread::mutex> guard(m);
    m_WorkToDo_mutex.lock();

    while(!myThreadData->haveWorkToDo) // check for work to do
         m_WorkToDo_condvar.wait(m_WorkToDo_mutex); // if no work, wait here 
    myThreadData->haveWorkToDo = false; // need to do this before unlocking the mutex

    m_WorkToDo_mutex.unlock();

    // Do the work
    switch(myThreadData->typeOfWork)
    {
        case e_work:
            std::cout << "Thread " << threadId << ": Woken with work to do\n";

            // Do work
            myThreadData->workFunc(myThreadData->workData);

            std::cout << "#Thread " << threadId  << ": Work is completed\n";
            break;

         case e_quit:
             std::cout << "Thread " << threadId << ": Asked to quit\n";
             return; // ends the thread
    }

    // Now to signal the main thread that my work is completed
    m_WorkCompleted_mutex.lock();
       m_NumOfThreadsDoingWork--;

      // Unsure if this 'if' would make the program more efficient
      // if(m_NumOfThreadsDoingWork == 0)
           m_WorkCompleted_condvar.notify_one(); // notify the main thread
    m_WorkCompleted_mutex.unlock();       
  }

}


ThreadPool::ThreadPool() 
{ 
   m_numOfThreads = 0;  m_NumOfThreadsDoingWork = 0; m_NumOfThreadsGivenJobs = 0;
}


ThreadPool::~ThreadPool()
{
    if(m_numOfThreads)
    {
    DestroyThreadPool(); 
    delete [] m_ThreadData;
    }
}


void ThreadPool::CreateThreads(int numOfThreads)
{
// Check if a thread pool has already been created
if(m_numOfThreads > 0) 
   return;

m_NumOfThreadsGivenJobs = 0;
m_NumOfThreadsDoingWork = 0;
m_numOfThreads = numOfThreads;
m_ThreadData = new ThreadData[m_numOfThreads];
ThreadArgStruct threadArg;

for(int i=0; i<m_numOfThreads; ++i)
 {   
    threadArg.threadId = i;
    threadArg.threadPoolInstance = this;

    // Creates the thread and saves it in a list so we can destroy it later
    m_ThreadList.push_back( new tthread::thread( ThreadFuncWrapper, (void *)&threadArg  ) ); 

    // It takes a little time for a thread to get established.
    // Best wait until it gets established before creating the next thread.
    m_ThreadReady_mutex.lock();
    while(!m_ThreadData[i].ready)  // Check if thread is ready
        m_ThreadReady_condvar.wait(m_ThreadReady_mutex); // If not, wait here
    m_ThreadReady_mutex.unlock();    
 } 
} 


// Assigns a job to a thread, but doesn't start the job
void ThreadPool::SubmitJob(void (*workFunc)(void *), void *workData)
{
 // Check if the thread pool has been created
 if(!m_numOfThreads) 
    return;

 if(m_NumOfThreadsGivenJobs >= m_numOfThreads)
    return;

 m_ThreadData[m_NumOfThreadsGivenJobs].workFunc = workFunc;
 m_ThreadData[m_NumOfThreadsGivenJobs].workData = workData;  

 std::cout << "Submitted job " << m_NumOfThreadsGivenJobs << std::endl;

 m_NumOfThreadsGivenJobs++;  
}

void ThreadPool::StartJobs()
{
// Check that the thread pool has been created
// and some jobs have been assigned
if(!m_numOfThreads || !m_NumOfThreadsGivenJobs) 
   return;

// Set 'haveworkToDo' flag for all threads 
m_WorkToDo_mutex.lock();
   for(int i=0; i<m_NumOfThreadsGivenJobs; ++i)
   {
       m_ThreadData[i].typeOfWork = e_work;  // forgot to do this !
       m_ThreadData[i].haveWorkToDo = true;
   }
   m_NumOfThreadsDoingWork = m_NumOfThreadsGivenJobs;

   // Reset this counter so we can resubmit jobs later
   m_NumOfThreadsGivenJobs = 0;

   // Notify all threads they have work to do
   m_WorkToDo_condvar.notify_all();
   m_WorkToDo_mutex.unlock();
}


void ThreadPool::WaitForJobsToComplete()
{
  // Check that a thread pool has been created
  if(!m_numOfThreads) 
   return;

 m_WorkCompleted_mutex.lock();
 while(m_NumOfThreadsDoingWork > 0)  // Check if all threads have completed their work
   m_WorkCompleted_condvar.wait(m_WorkCompleted_mutex); // If not, wait here
 m_WorkCompleted_mutex.unlock();    
}


void ThreadPool::DestroyThreadPool()
{
std::cout << "Ask threads to quit\n";
m_WorkToDo_mutex.lock();
   for(int i=0; i<m_numOfThreads; ++i)
   {
     m_ThreadData[i].haveWorkToDo = true;
     m_ThreadData[i].typeOfWork = e_quit;
   }
   m_WorkToDo_condvar.notify_all();
m_WorkToDo_mutex.unlock();

// As each thread terminates, catch them here
for(int i=0; i<m_numOfThreads; ++i)
 {
     tthread::thread *t = m_ThreadList[i];

     // Wait for thread to complete
     t->join();
 }
 m_numOfThreads = 0;
}

使用示例: (通过对平方的倒数求和来计算 pi-squared/6) 实际上,此用法示例并行运行相同的计算 10 次。更实际的用法是让每个线程计算一组不同的求和项。池作业完成后,将所有线程结果相加即可获得最终结果。

struct CalculationDataStruct
{
int inputVal;
double outputVal;
};

void LongCalculation( void *theSums )
{
CalculationDataStruct *sums = (CalculationDataStruct *)theSums;

int terms = sums->inputVal;
double sum;
for(int i=1; i<terms; i++)
    sum += 1.0/( double(i)*double(i) );
sums->outputVal = sum;
}


int main(int argc, char** argv)
{ 
int numThreads = 10;

// Create pool
ThreadPool threadPool;
threadPool.CreateThreads(numThreads);

// Create thread workspace
CalculationDataStruct sums[numThreads];

// Set up jobs
for(int i=0; i<numThreads; i++)
{
    sums[i].inputVal = 3000*(i+1);
    threadPool.SubmitJob(LongCalculation, &sums[i]);
}

// Run the jobs
threadPool.StartJobs();
threadPool.WaitForJobsToComplete();

// Print results
for(int i=0; i<numThreads; i++)
   std::cout << "Sum of " << sums[i].inputVal << " terms is " << sums[i].outputVal << std::endl;

 return 0;
}

问题: 在 ThreadPool::ThreadFunc 方法中,如果下面的 if 语句会获得更好的性能

if(NumOfThreadsDoingWork == 0)

被包括在内? 此外,我将感谢批评和改进代码的方法。同时,希望代码对其他人有用。

最佳答案

首先,您可能想查看 C++11 的“std::thread”和“std::mutex”。您可能还想研究英特尔的“Threading Building Blocks”,它提供了多种工作分配模式。对于可移植的、跨平台的、C++ 封装的 API,我通常使用 OpenThreads library。 .最后,您可以使用消息传递库(例如 ZeroMQ)在没有互斥体的情况下构建可扩展的分布式工作负载。 .

查看您当前的代码,我最担心的是您似乎没有锁定用于将工作分配给线程的变量;我假设这是因为您已将 SubmitJob 和 StartWork 分开。

但归根结底,您的 ThreadPool 不是线程安全的。

它还有一些复杂的 API,包括工作类型等。您可能需要抽象出“工作”的概念。这是我这样做的一个示例,您可能希望将大部分代码封装回您的 ThreadPool 类中;终止方法(NULL 作业)也有点人为,您可能想使用 pthread_cancel,但这对演示非常有用。

#include <queue>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

static int jobNo = 0;
class Job {
public:
    Job() : m_i(++jobNo) { printf("Created job %d.\n", m_i); }
    int m_i;
    void Execute() { printf("Job %d executing.\n", m_i); usleep(500 * 1000); }
};

std::queue<Job*> queue;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void AddJob(Job* job) {
    pthread_mutex_lock(&mutex);
    queue.push(job);
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
}

void* QueueWorker(void* /*threadInfo*/) {
    Job* job = NULL;
    for (;;) {
        pthread_mutex_lock(&mutex);
        while ( queue.empty() ) {
            // unlock the mutex until the cond is signal()d or broadcast() to.
            // if this call succeeds, we will have the mutex locked again on the other side.
            pthread_cond_wait(&cond, &mutex);
        }
        // take the first task and then release the lock.
        job = queue.front();
        queue.pop();
        pthread_mutex_unlock(&mutex);

        if ( job == NULL ) {
            // in this demonstration, NULL ends the run, so forward to any other threads.
            AddJob(NULL);
            break;
        }
        job->Execute();
        delete job;
    }
    return NULL;
}

int main(int argc, const char* argv[]) {
    pthread_t worker1, worker2;
    pthread_create(&worker1, NULL, &QueueWorker, NULL);
    pthread_create(&worker2, NULL, &QueueWorker, NULL);

    srand(time(NULL));

    // queue 5 jobs with delays.
    for ( size_t i = 0; i < 5; ++i ) {
        long delay = (rand() % 800) * 1000;
        printf("Producer sleeping %fs\n", (float)delay / (1000*1000));
        usleep(delay);
        Job* job = new Job();
        AddJob(job);
    }
    // 5 more without delays.
    for ( size_t i = 0; i < 5; ++i ) {
        AddJob(new Job);
    }
    // null to end the run.
    AddJob(NULL);

    printf("Done with jobs.\n");
    pthread_join(worker1, NULL);
    pthread_join(worker2, NULL);

    return 0;
}

关于c++ - 查询简单的 C++ 线程池实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11234894/

有关c++ - 查询简单的 C++ 线程池实现的更多相关文章

  1. ruby - ECONNRESET (Whois::ConnectionError) - 尝试在 Ruby 中查询 Whois 时出错 - 2

    我正在用Ruby编写一个简单的程序来检查域列表是否被占用。基本上它循环遍历列表,并使用以下函数进行检查。require'rubygems'require'whois'defcheck_domain(domain)c=Whois::Client.newc.query("google.com").available?end程序不断出错(即使我在google.com中进行硬编码),并打印以下消息。鉴于该程序非常简单,我已经没有什么想法了-有什么建议吗?/Library/Ruby/Gems/1.8/gems/whois-2.0.2/lib/whois/server/adapters/base.

  2. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  3. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  4. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  5. ruby-on-rails - 在 Rails 和 ActiveRecord 中查询时忽略某些字段 - 2

    我知道我可以指定某些字段来使用pluck查询数据库。ids=Item.where('due_at但是我想知道,是否有一种方法可以指定我想避免从数据库查询的某些字段。某种反拔?posts=Post.where(published:true).do_not_lookup(:enormous_field) 最佳答案 Model#attribute_names应该返回列/属性数组。您可以排除其中一些并传递给pluck或select方法。像这样:posts=Post.where(published:true).select(Post.attr

  6. ruby - 简单获取法拉第超时 - 2

    有没有办法在这个简单的get方法中添加超时选项?我正在使用法拉第3.3。Faraday.get(url)四处寻找,我只能先发起连接后应用超时选项,然后应用超时选项。或者有什么简单的方法?这就是我现在正在做的:conn=Faraday.newresponse=conn.getdo|req|req.urlurlreq.options.timeout=2#2secondsend 最佳答案 试试这个:conn=Faraday.newdo|conn|conn.options.timeout=20endresponse=conn.get(url

  7. ruby - 用 Ruby 编写一个简单的网络服务器 - 2

    我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b

  8. ruby-on-rails - 简单的 Ruby on Rails 问题——如何将评论附加到用户和文章? - 2

    我意识到这可能是一个非常基本的问题,但我现在已经花了几天时间回过头来解决这个问题,但出于某种原因,Google就是没有帮助我。(我认为部分问题在于我是一个初学者,我不知道该问什么......)我也看过O'Reilly的RubyCookbook和RailsAPI,但我仍然停留在这个问题上.我找到了一些关于多态关系的信息,但它似乎不是我需要的(尽管如果我错了请告诉我)。我正在尝试调整MichaelHartl'stutorial创建一个包含用户、文章和评论的博客应用程序(不使用脚手架)。我希望评论既属于用户又属于文章。我的主要问题是:我不知道如何将当前文章的ID放入评论Controller。

  9. ruby - 使用 Ruby 通过 Outlook 发送消息的最简单方法是什么? - 2

    我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=

  10. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

随机推荐