草庐IT

c++ - pthread广播然后等待?

coder 2024-02-21 原文

我正在尝试设置多个线程处于等待状态,直到它们收到 pthread_cond_broadcast()

完成一项工作后,我希望线程回到它们的等待状态。

我还希望调用 pthread_cond_broadcast() 的进程在继续之前等待所有线程返回到它们的等待状态。在这种情况下,调用广播的是 main() 函数。我正在尝试让 main(0 在调用广播后执行 pthread_cond_wait()

void* Work::job(void* id)
{
    int idx = (long)id;

    while(1)
    {
        pthread_mutex_lock(&job_lock);

        while(!jobs_complete)
        {
            // wait for main to broadcast
            pthread_cond_wait(&can_work, &job_lock);
            pthread_mutex_unlock(&job_lock);

            // work here

            pthread_mutex_lock(&job_lock);
            ++jobs_completed;

            if(jobs_completed == NUM_THREADS)
            {
                jobs_complete = true;
                pthread_cond_signal(&jobs_done);
                pthread_mutex_unlock(&job_lock);
            }
            pthread_mutex_unlock(&job_lock);
        }

        pthread_mutex_unlock(&job_lock);
    }

    return NULL;
}

NUM_THREADS 是 4,job_lock 是一个 pthread_mutex_tcan_workjobs_donepthread_cond_tjobs_completed 是一个 booljobs_complete 是一个 int

// work

jobs_completed = false;
jobs_complete = 0;
pthread_mutex_lock(&job_lock);
pthread_cond_broadcast(&can_work);
pthread_cond_wait(&jobs_complete);
pthread_mutex_unlock(&job_lock);

// work that depends on jobs_complete

现在,我正在通过调用 pthread_cond_broadcast() 然后在它之后调用 pthread_cond_wait() 来执行此操作,但这似乎陷入了僵局。

任何人都可以解释我应该如何做或哪里出错了吗?如果有任何帮助,我将不胜感激。

谢谢!

最佳答案

我只是发布这个(这几乎是所有 C 代码,但 pthreads 也是如此,所以请稍微放松一下)来演示一种做我认为你正在尝试的事情的方法去完成。显然,您可能希望将其中的大部分内容正确地封装在适当的类等中。希望向您展示的是条件变量、互斥量以及它们与谓词管理和通知的关系是如何工作的。

希望您觉得它有用。祝你有美好的一天。

#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;

// our global condition variable and mutex
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

// our predicate values.
bool finished = false;
int jobs_waiting = 0;
int jobs_completed = 0;

// our thread proc
static void *worker_proc(void* p)
{
    intptr_t id = (intptr_t)p;  // our id
    size_t n_completed = 0;     // our job completion count

    // always latch prior to eval'ing predicate vars.
    pthread_mutex_lock(&mtx);
    while (!finished)
    {
        // wait for finish or work-waiting predicate
        while (!finished && jobs_waiting == 0)
            pthread_cond_wait(&cv, &mtx);

        // we own the mutex, so we're free to look at, modify
        //  etc. the values(s) that we're using for our predicate
        if (finished)
            break;

        // must be a job_waiting, reduce that number by one, then
        //  unlock the mutex and start our work. Note that we're
        //  changing the predicate (jobs_waiting is part of it) and
        //  we therefore need to let anyone that is monitoring know.
        --jobs_waiting;
        pthread_cond_broadcast(&cv);
        pthread_mutex_unlock(&mtx);

        // DO WORK HERE (this just runs a lame summation)
        for (int i=0,x=0;i<1048576; x += ++i);
        ++n_completed;

        // finished work latch mutex and setup changes
        pthread_mutex_lock(&mtx);
        ++jobs_completed;
        pthread_cond_broadcast(&cv);
    }

    // final report
    cout << id << ": jobs completed = " << n_completed << endl;

    // we always exit owning the mutex, so unlock it now. but
    //  let anyone else know they should be quitting as well.
    pthread_cond_broadcast(&cv);
    pthread_mutex_unlock(&mtx);
    return p;
}

// sets up a batch of work and waits for it to finish.
void run_batch(int num)
{
    pthread_mutex_lock(&mtx);
    jobs_waiting = num;
    jobs_completed = 0;
    pthread_cond_broadcast(&cv);

    // wait or all jobs to complete.
    while (jobs_completed != num)
        pthread_cond_wait(&cv, &mtx);

    // we own this coming out, so let it go.
    pthread_mutex_unlock(&mtx);
}

// main entry point.
int main()
{
    // number of threads in our crew
    static const size_t N = 7;
    pthread_t thrds[N] = {0};

    // startup thread crew.
    intptr_t id = 0;
    for (size_t i=0; i<N; ++i)
        pthread_create(thrds + i, NULL, worker_proc, (void*)(++id));

    // run through batches. each batch is one larger
    //  than the prior batch. this should result in some
    //  interesting job-counts per-thread.
    for (int i=0; i<64; ++i)
        run_batch(i);

    // flag for shutdown state.
    pthread_mutex_lock(&mtx);
    finished = true;
    pthread_cond_broadcast(&cv);
    pthread_mutex_unlock(&mtx);
    for (size_t i=0; i<N; pthread_join(thrds[i++], NULL));

    return 0;
}

示例输出 #1

3: jobs completed = 256
6: jobs completed = 282
5: jobs completed = 292
2: jobs completed = 242
1: jobs completed = 339
4: jobs completed = 260
7: jobs completed = 409

示例输出 #2

6: jobs completed = 882
1: jobs completed = 210
4: jobs completed = 179
5: jobs completed = 178
2: jobs completed = 187
7: jobs completed = 186
3: jobs completed = 194

示例输出 #3

1: jobs completed = 268
6: jobs completed = 559
3: jobs completed = 279
5: jobs completed = 270
2: jobs completed = 164
4: jobs completed = 317
7: jobs completed = 159

固定批量大小

相同的代码,但改变了这个:

for (int i=0; i<64; ++i)
    run_batch(i);

为此:

for (int i=0; i<64; ++i)
    run_batch(N);

给出以下内容,这可能更接近您真正要找的内容。

示例输出 #1

4: jobs completed = 65
2: jobs completed = 63
5: jobs completed = 66
3: jobs completed = 63
1: jobs completed = 64
7: jobs completed = 63
6: jobs completed = 64

示例输出 #2

3: jobs completed = 65
5: jobs completed = 62
1: jobs completed = 67
7: jobs completed = 63
2: jobs completed = 65
6: jobs completed = 61
4: jobs completed = 65

示例输出 #3

2: jobs completed = 58
4: jobs completed = 61
5: jobs completed = 69
7: jobs completed = 68
3: jobs completed = 61
1: jobs completed = 64
6: jobs completed = 67

关于c++ - pthread广播然后等待?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15676027/

有关c++ - pthread广播然后等待?的更多相关文章

  1. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  2. ruby - 按值降序排列散列,然后按升序键入 ruby - 2

    我有这样的哈希trial_hash={"key1"=>1000,"key2"=>34,"key3"=>500,"key4"=>500,"key5"=>500,"key6"=>500}我按值降序排列:my_hash=trial_hash.sort_by{|k,v|v}.reverse我现在是这样理解的:[["key1",1000],["key4",500],["key5",500],["key6",500],["key3",500],["key2",34]]但我希望当值相同时按键的升序排序。我该怎么做?例如:上面的散列将以这种方式排序:[["key1",1000],["key3",500

  3. ruby - 使用 `+=` 和 `send` 方法 - 2

    如何将send与+=一起使用?a=20;a.send"+=",10undefinedmethod`+='for20:Fixnuma=20;a+=10=>30 最佳答案 恐怕你不能。+=不是方法,而是语法糖。参见http://www.ruby-doc.org/docs/ProgrammingRuby/html/tut_expressions.html它说Incommonwithmanyotherlanguages,Rubyhasasyntacticshortcut:a=a+2maybewrittenasa+=2.你能做的最好的事情是:

  4. ruby - 如何计算 Liquid 中的变量 +1 - 2

    我对如何计算通过{%assignvar=0%}赋值的变量加一完全感到困惑。这应该是最简单的任务。到目前为止,这是我尝试过的:{%assignamount=0%}{%forvariantinproduct.variants%}{%assignamount=amount+1%}{%endfor%}Amount:{{amount}}结果总是0。也许我忽略了一些明显的东西。也许有更好的方法。我想要存档的只是获取运行的迭代次数。 最佳答案 因为{{incrementamount}}将输出您的变量值并且不会影响{%assign%}定义的变量,我

  5. ruby-on-rails - 在 Ruby on Rails 中发送响应之前如何等待多个异步操作完成? - 2

    在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.

  6. arrays - Ruby 数组 += vs 推送 - 2

    我有一个数组数组,想将元素附加到子数组。+=做我想做的,但我想了解为什么push不做。我期望的行为(并与+=一起工作):b=Array.new(3,[])b[0]+=["apple"]b[1]+=["orange"]b[2]+=["frog"]b=>[["苹果"],["橙子"],["Frog"]]通过推送,我将推送的元素附加到每个子数组(为什么?):a=Array.new(3,[])a[0].push("apple")a[1].push("orange")a[2].push("frog")a=>[[“苹果”、“橙子”、“Frog”]、[“苹果”、“橙子”、“Frog”]、[“苹果”、“

  7. ruby - 按数字(从大到大)然后按字母(字母顺序)对对象集合进行排序 - 2

    我正在构建一个小部件来显示奥运会的奖牌数。我有一个“国家”对象的集合,其中每个对象都有一个“名称”属性,以及奖牌计数的“金”、“银”、“铜”。列表应该排序:1.首先是奖牌总数2.如果奖牌相同,按类型分割(金>银>铜,即2金>1金+1银)3.如果奖牌和类型相同,则按字母顺序子排序我正在用ruby​​做这件事,但我想语言并不重要。我确实找到了一个解决方案,但如果感觉必须有更优雅的方法来实现它。这是我做的:使用加权奖牌总数创建一个虚拟属性。因此,如果他们有2个金牌和1个银牌,加权总数将为“3.020100”。1金1银1铜为“3.010101”由于我们希望将奖牌数排序为最高的,因此列表按降序排

  8. += 的 Ruby 方法 - 2

    有没有办法让Ruby能够做这样的事情?classPlane@moved=0@x=0defx+=(v)#thisiserror@x+=v@moved+=1enddefto_s"moved#{@moved}times,currentxis#{@x}"endendplane=Plane.newplane.x+=5plane.x+=10putsplane.to_s#moved2times,currentxis15 最佳答案 您不能在Ruby中覆盖复合赋值运算符。任务在内部处理。您应该覆盖+,而不是+=。plane.a+=b与plane.a=

  9. ruby - Sinatra + Heroku + Datamapper 使用 dm-sqlite-adapter 部署问题 - 2

    出于某种原因,heroku尝试要求dm-sqlite-adapter,即使它应该在这里使用Postgres。请注意,这发生在我打开任何URL时-而不是在gitpush本身期间。我构建了一个默认的Facebook应用程序。gem文件:source:gemcuttergem"foreman"gem"sinatra"gem"mogli"gem"json"gem"httparty"gem"thin"gem"data_mapper"gem"heroku"group:productiondogem"pg"gem"dm-postgres-adapter"endgroup:development,:t

  10. ruby - Ruby 中字符串运算符 + 和 << 的区别 - 2

    我是Ruby和这个网站的新手。下面两个函数是不同的,一个在函数外修改变量,一个不修改。defm1(x)x我想确保我理解正确-当调用m1时,对str的引用被复制并传递给将其视为x的函数。运算符当调用m2时,对str的引用被复制并传递给将其视为x的函数。运算符+创建一个新字符串,赋值x=x+"4"只是将x重定向到新字符串,而原始str变量保持不变。对吧?谢谢 最佳答案 String#+::str+other_str→new_strConcatenation—ReturnsanewStringcontainingother_strconc

随机推荐