草庐IT

producer-consumer

全部标签

concurrency - 戈朗 : Producer/Consumer concurrency model but with serialized results

funcmain(){jobs:=[]Job{job1,job2,job3}numOfJobs:=len(jobs)resultsChan:=make(chan*Result,numOfJobs)jobChan:=make(chan*job,numOfJobs)goconsume(numOfJobs,jobChan,resultsChan)fori:=0;i在上面的示例中,作业被推送到jobChan中,goroutines将其从jobChan中拉出并并发执行作业并将结果推送到resultsChan中。然后我们将从resultsChan中提取结果。问题一:在我的代码中,没有序列化/线性化

与多个生产者/多个消费者并发

我可能遗漏了一些东西,或者不理解Go如何处理并发(或者我对并发本身的了解),我设计了一些代码来理解多个生产者/消费者。这是代码:packagemainimport("fmt""time"//"math/rand""sync")varsequint64=0vargeneratorChanchanuint64varrequestChanchanuint64funcmakeTimestamp()int64{returntime.Now().UnixNano()/int64(time.Millisecond)}funcgenerateStuff(genIdint){varcrapuint64f

go - 有条件的 Go Routine/Channel

我想让统计例程有条件地运行,这样它只在某些情况下运行,否则它会浪费一半的时间。现在我有一个例程作为生产者通过缓冲channel提供两个消费者例程。有没有办法让统计例程是有条件的,或者我应该遵循更好的模式?在此先感谢您提供的所有帮助!funcmain(){options()goproduce(readCSV(loc))goprocess()gostatistics()//onlyonflag 最佳答案 将此设置为条件并没有错:varstatschan[]string//Don'tinitializestats.funcmain(){o

go - 如何使用 Sarama Go Kafka Consumer 从最新的偏移量中消费

我有三个问题:“最早的偏移量”是什么意思?最早的偏移量并不意味着偏移量为0?//OffsetOldeststandsfortheoldestoffsetavailableonthebrokerfora//partition.OffsetOldestint64=-2假设一个。三个代理在同一台机器上运行B、消费组只有一个消费线程C.消费者配置OffsetOldest标志。D.已经产生了100条消息,目前消费者线程已经消耗了90条消息。那么如果消费者线程重启了,那么这个消费者会从哪个offset开始消费呢?是91还是0?在我们下面的代码中,似乎每次启动消费者时都会重新消费消息。但实际上它确实

parsing - 在 Go 中并发解析二进制文件中的记录

我有一个要解析的二进制文件。该文件被分解为每条1024字节的记录。所需的高级步骤是:一次从文件中读取1024个字节。解析每个1024字节的“记录”(block)并将解析的数据放入映射或结构中。将解析后的数据和任何错误返回给用户。我不是在寻找代码,只是在寻找设计/方法方面的帮助。由于I/O限制,我认为尝试从文件中并发读取没有意义。但是,我看不出为什么不能使用goroutine解析1024字节的记录,以便同时解析多个1024字节的记录。我是Go的新手,所以我想看看这是否有意义,或者是否有更好(更快)的方法:主函数打开文件并一次将1024个字节读入字节数组(记录)。记录被传递给一个函数,该函

windows - 如何在 Windows 8 Consumer Preview 中使用 TcpClient

我正在Windows8ConsumerPreview中编写Metro应用程序。但是,我无法在.NET4.5中使用TcpClient,似乎没有地方可以添加程序集引用。http://msdn.microsoft.com/en-us/library/1612451t(v=vs.110).aspx 最佳答案 城域网不支持TcpClient。您可以使用StreamSocket类代替。Here是关于如何使用它创建TCP套接字、建立连接、发送和接收数据的示例。这些示例使用JS和C++,但同一类适用于C#。

C++ 模板化生产者-消费者 BlockingQueue,无界缓冲区 : How do I end elegantly?

我写了一个BlockingQueue来让两个线程进行通信。您可以说它遵循生产者-消费者模式,具有无限缓冲区。因此,我使用临界区和信号量实现它,如下所示:#pragmaonce#include"Semaphore.h"#include"Guard.h"#includenamespaceDRA{namespaceCommonCpp{templateclassBlockingQueue{CCriticalSectionm_csQueue;CSemaphorem_semElementCount;std::queuem_Queue;//ForbidcopyandassignmentBlockin

c# - 如何批量消费 BlockingCollection<T>

我想出了一些代码来消耗队列中所有等待的项目。与其一个接一个地处理项目,不如将所有等待的项目作为一个集合来处理。我已经这样声明了我的队列。privateBlockingCollectionitems=newBlockingCollection(newConcurrentQueue);然后,在消费者线程上,我计划像这样批量读取项目,ItemnextItem;while(this.items.TryTake(outnextItem,-1)){varworkToDo=newList();workToDo.Add(nextItem);while(this.items.TryTake(outnex

c# - 在 BlockingCollection<T> 上调用 Dispose

我在Albahari(http://www.albahari.com/threading/part5.aspx#_BlockingCollectionT)的Nutshell书中重用了C#中的示例生产者消费者队列,一位同事评论道:“为什么不在集合的Dispose中对BlockingCollection调用Dispose?”我找不到答案,我能想到的唯一原因是队列剩余工作负载的执行不会被处理。但是,当我处理队列时,为什么它不会停止处理?除了“为什么你不应该处理BlockingCollection?”我还有第二个问题“如果不处理BlockingCollection会有害吗?”。我想当你产生/处

c# - 这是 TPL Dataflow 的工作吗?

我在不同的任务上运行了一个非常典型的生产者/消费者模型。任务1:从二进制文件中读取成批的byte[]并为每个字节数组集合启动一个新任务。(出于内存管理目的,该操作是批处理的)。任务2-n:这些是工作任务,每个任务都对传入的字节数组集合(来自Tasks1)进行操作并反序列化字节数组,按特定标准对它们进行排序,然后存储结果对象的集合(每个字节数组在并发字典中反序列化为此类对象。任务(n+1)我选择了一个并发字典,因为这个任务的工作是合并存储在并发字典中的那些集合,其顺序与它们来自Task1的顺序相同。我通过从Task1一直向下传递到此任务的collectionID(它的类型为int并为Ta