草庐IT

producer-consumer

全部标签

c++ - 快速 C++ 单生产者单消费者实现

我正在寻找一个单一生产者、单一消费者FIFO实现,它比普通的锁-写-解锁-信号/waitForSignal-锁-读-解锁东西执行得更快。我正在寻找用C或C++编写的大多数POSIX操作系统(特定于x86很好)支持的东西。我不想传递任何比指针更大的东西。我不一定喜欢无锁的想法,但我确实想要快速和正确的东西。我读过的一篇关于这个主题的论文提到了一种看起来很有趣的双队列方法,但从那以后我就没能找到太多关于它的信息。从目前为止我所做的研究来看,0mq(据说它的inproc://方案使用无锁结构)看起来是最有吸引力的选择。话虽这么说,但在我走这条路之前,我想确定我没有错过任何东西。另一种选择可能

c++ - 高吞吐量非阻塞服务器设计 : Alternatives to busy wait

我一直在构建一个用于多媒体消息传递的高吞吐量服务器应用程序,实现语言是C++。每个服务器都可以独立使用,也可以将许多服务器连接在一起以创建基于DHT的覆盖网络;服务器就像Skype中的super节点一样。工作正在进行中。目前,服务器每秒可以处理大约200,000条消息(256字节消息),并且在我的机器(Inteli3Mobile2GHz、FedoraCore18(64位)、4GBRAM)上的最大吞吐量约为256MB/s长度为4096字节的消息。服务器有两个线程,一个线程用于处理所有IO(基于epoll,边缘触发),另一个线程用于处理传入消息。覆盖管理还有另一个线程,但在当前讨论中无关紧

c++ - 为什么condition_variable在producer-consumer中等待锁呢? C++

看下面经典的生产者消费者代码:intmain(){std::queueproduced_nums;std::mutexm;std::condition_variablecond_var;booldone=false;boolnotified=false;std::threadproducer([&](){for(inti=0;ilock(m);std::coutlock(m);while(!done){while(!notified){//looptoavoidspuriouswakeupscond_var.wait(lock);}while(!produced_nums.empty(

C++ memory_order_consume, kill_dependency, dependency-ordered-before, 同步

我正在阅读C++ConcurrencyinAction安东尼·威廉姆斯。目前我在他描述memory_order_consume的地方。在那block之后有:NowthatI’vecoveredthebasicsofthememoryorderings,it’stimetolookatthemorecomplexparts这让我有点害怕,因为我不完全理解几件事:dependency-ordered-before与synchronizes-with有何不同?他们都创建了先发生后发生的关系。确切的区别是什么?我对以下示例感到困惑:intglobal_data[]={…};std::atomi

c++ - 如何将并发解决方案应用于类似生产者-消费者的情况

我有一个包含一系列节点的XML文件。每个节点代表一个我需要解析并添加到排序列表中的元素(顺序必须与文件中找到的节点的顺序相同)。目前我使用的是顺序解决方案:structGraphic{boolparse(){//parsing...returnparse_outcome;}};vector>graphics;voidproducer(){for(size_ti=0;iparse())graphics.emplace_back(g);elsedeleteg;}}因此,仅当图形(实际上是派生自Graphic的类的实例、Line、Rectangle等时,这就是new的原因)可以正确解析,它将

Kafka - 消费进度监控(Consumer Lag)

所谓滞后程度,就是指消费者当前落后于生产者的程度。Lag应该算是最最重要的监控指标了。它直接反映了一个消费者的运行情况。一个正常工作的消费者,它的Lag值应该很小,甚至是接近于0的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小。反之,如果一个消费者Lag值很大,通常就表明它无法跟上生产者的速度,最终Lag会越来越大,从而拖慢下游消息的处理速度。通常来说,Lag的单位是消息数,而且我们一般是在主题这个级别上讨论Lag的,但实际上,Kafka 监控Lag的层级是在分区上的。如果要计算主题级别的,你需要手动汇总所有主题分区的Lag,将它们累加起来,合并成最终的Lag值。在实际业务场

c++ - 生产者完成后通知消费者的优雅方式?

我正在实现一个具有最少功能的concurrent_blocking_queue://athinwrapperoverstd::queuetemplateclassconcurrent_blocking_queue{std::queuem_internal_queue;//...public:voidadd(Tconst&item);T&remove();boolempty();};我打算将其用于producer-consumerproblem(我想,这是人们使用这种数据结构的地方?)。但我被困在一个问题上:生产者完成后如何优雅地通知消费者?生产者如何在完成后通知队列?通过调用特定的成员

c++ - 如何使用 POSIX 线程实现阻塞读取

我想实现一个遵循大致如下接口(interface)的生产者/消费者场景:classConsumer{private:vectorread(size_tn){//Iftheinternalbufferhas`n`elements,thendequeuethem//Otherwisewaitformoredataandtryagain}public:voidrun(){read(10);read(4839);//etc}voidfeed(constvector&more){//Safelyqueuethedata//Notify`read`thatthereisnowmoredata}};

【Kafka】生产者Producer详解

目录消息发送消息生产流程ProducerRecord序列化器分区器拦截器生产者原理剖析主线程消息累加器发送线程生产者参数消息发送消息生产流程整个流程如下:Producer创建时,会创建一个Sender线程并设置为守护线程。生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端

kafka消费者API consumer.poll()没有犯错,没有例外,只是阻止

我正在学习kafka之后的apachekafka文档。我从默认配置开始。bin/zookeeper-server-start.shconfig/zookeeper.propertiesbin/kafka-server-start.shconfig/server.properties&我运行了kafka-console-producer.sh和kafka-console-consumer.sh来制作和消费消息,这是成功的。我使用ProducerAPI编写了Java代码来产生消息,这没关系。这是由Kafka-Console-Consumer.sh验证的。该代码与ApacheKafka指南相同:Pr