草庐IT

go - 在生产者速度慢,消费者速度快的情况下,如何处理 channel 关闭同步?

coder 2024-07-12 原文

关闭。这个问题是opinion-based .它目前不接受答案。












想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题.

3年前关闭。




Improve this question




我是新手,找不到这个问题的答案。我正在做的是在生产者中读取 CSV 文件,做一些可能需要时间的事情,然后通过 channel 将输出发送给消费者。有一连串生产者-消费者 s,并且任何生产者最终都可能比它的消费者慢。

producer(1 goroutine) -> chan0 -> consumer-producer-1(>1 goroutines) -> chan1 -> consumer-producer-2(>1 goroutines) -> chan2 -> consumer(>1 goroutines)



这里最多可以有 15 个消费者。

现在我面临的问题是,如果生产者完成了,我们如何在消费者端决定,我们可以停止处理。

我需要实现的是:
  • 一旦生产者完成,所有消费者最终应该做一些清理并在完成剩余的
  • 后退出。
  • 如果消费者在特定的超时时间内没有获得任何数据,它可以退出(最好是有信号)而不会进一步阻塞。
  • 它发生在整个序列中的所有生产者-消费者对中。

  • 我使用了以下方法。
  • 为每个数据 channel 保留一个信号 channel ,并为其下一个消费者的每个 goroutine 发布一个“完成”。
  • 读完之后,每个消费者应该只读取 channel 中剩余的缓冲数据,然后在下一个信号 channel 上放置,比如 5 “done”。确保每个 goroutine 只有 5 个,而不是 5 个(使用 https://golang.org/pkg/sync/#Once.Do )。
  • 下面是我能想到的。
    processRemaining = false
    for processRemaining == false{
            select {
            case stuff, ok := <-input_messages:
                    do_stuff(stuff)
                    if ok == false { // if channel has been closed
                        processRemaining = true
                    }
                    if result != nil {
                            //send to channel output_messages
                    }
            case sig := <-input_signals: // if signaled to stopped.
                    fmt.Println("received signal", sig)
                    processRemaining = true
            default:
                    fmt.Println("no activity")
            }
    }
    if processRemaining {
            for stuff := range input_messages {
                    do_stuff(stuff)
                    if result != nil {
                            //send to channel output_messages
                    }
            }
            // send "output_routine" number of "done" to a channel "output_signals".
    }
    

  • 但即使在这种方法中,我也无法想出任何与关闭的“input_messages” channel 相同的方式,如果没有可用的时间,比如 10 秒。

    我忽略了这种方法有什么问题吗?解决此问题的可能方法(或并发模式)是什么?确保:
  • 一旦第一个“chan0”关闭,所有后续 channel 都会关闭。
  • 所有生产者在关闭它们的输出 channel 之前都会更新,并且只有在它们都完成写入后才会关闭 channel 。
  • 如果消费者在指定的超时时间内未从 channel 获取数据,则应将其视为已关闭,并自行解除阻塞。
  • 最佳答案

    使用 sync.WaitGroup跟踪正在运行的 goroutine 的数量。每个 goroutine 在不再从 channel 获取数据后退出。曾经WaitGroup完成后,清理工作就可以完成了。

    像这样的东西:

    import (
            "sync"
            "time"
    )
    
    type Data interface{} // just an example
    
    type Consumer interface {
            Consume(Data) Data
            CleanUp()
            Count() int
            Timeout() time.Duration
    }
    
    func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
            wg := sync.WaitGroup{}
            for i := 0; i < consumer.Count(); i++ {
                    wg.Add(1)
                    go func() {
                    consumeLoop:
                            for {
                                    select {
                                    case v, ok := <-inCh: // 'ok' says if the channel is still open
                                            if !ok {
                                                    break consumeLoop
                                            }
                                            outCh <- consumer.Consume(v)
                                    case <-time.After(consumer.Timeout()):
                                            break consumeLoop
                                    }
                            }
    
                            wg.Done()
                    }()
            }
            wg.Wait()
    
            consumer.CleanUp()
            close(outCh)
    }
    

    在管道的每个阶段,您都可以使用与上述类似的过程来启动消费者。

    关于go - 在生产者速度慢,消费者速度快的情况下,如何处理 channel 关闭同步?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52127485/

    有关go - 在生产者速度慢,消费者速度快的情况下,如何处理 channel 关闭同步?的更多相关文章

    1. ruby - 默认情况下使选项为 false - 2

      这是在Ruby中设置默认值的常用方法:classQuietByDefaultdefinitialize(opts={})@verbose=opts[:verbose]endend这是一个容易落入的陷阱:classVerboseNoMatterWhatdefinitialize(opts={})@verbose=opts[:verbose]||trueendend正确的做法是:classVerboseByDefaultdefinitialize(opts={})@verbose=opts.include?(:verbose)?opts[:verbose]:trueendend编写Verb

    2. ruby - 在没有 sass 引擎的情况下使用 sass 颜色函数 - 2

      我想在一个没有Sass引擎的类中使用Sass颜色函数。我已经在项目中使用了sassgem,所以我认为搭载会像以下一样简单:classRectangleincludeSass::Script::FunctionsdefcolorSass::Script::Color.new([0x82,0x39,0x06])enddefrender#hamlengineexecutedwithcontextofself#sothatwithintemlateicouldcall#%stop{offset:'0%',stop:{color:lighten(color)}}endend更新:参见上面的#re

    3. ruby-on-rails - Enumerator.new 如何处理已通过的 block ? - 2

      我在理解Enumerator.new方法的工作原理时遇到了一些困难。假设文档中的示例:fib=Enumerator.newdo|y|a=b=1loopdoy[1,1,2,3,5,8,13,21,34,55]循环中断条件在哪里,它如何知道循环应该迭代多少次(因为它没有任何明确的中断条件并且看起来像无限循环)? 最佳答案 Enumerator使用Fibers在内部。您的示例等效于:require'fiber'fiber=Fiber.newdoa=b=1loopdoFiber.yieldaa,b=b,a+bendend10.times.m

    4. ruby-on-rails - s3_direct_upload 在生产服务器中不工作 - 2

      在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo

    5. ruby - 在不使用 RVM 的情况下在 Mac 上卸载和升级 Ruby - 2

      我最近决定从我的系统中卸载RVM。在thispage提出的一些论点说服我:实际上,我的决定是,我根本不想担心Ruby的多个版本。我只想使用1.9.2-p290版本而不用担心其他任何事情。但是,当我在我的Mac上运行ruby--version时,它告诉我我的版本是1.8.7。我四处寻找如何简单地从我的Mac上卸载这个Ruby,但奇怪的是我没有找到任何东西。似乎唯一想卸载Ruby的人运行linux,而使用Mac的每个人都推荐RVM。如何从我的Mac上卸载Ruby1.8.7?我想升级到1.9.2-p290版本,并且我希望我的系统上只有一个版本。 最佳答案

    6. ruby - 如何关闭 ruby​​ gem "Spreadsheet?"中的文件 - 2

      下面的代码在我第一次运行它时就可以正常工作:require'rubygems'require'spreadsheet'book=Spreadsheet.open'/Users/me/myruby/Mywks.xls'sheet=book.worksheet0row=sheet.row(1)putsrow[1]book.write'/Users/me/myruby/Mywks.xls'当我再次运行它时,我会收到更多消息,例如:/Library/Ruby/Gems/1.8/gems/spreadsheet-0.6.5.9/lib/spreadsheet/excel/reader.rb:11

    7. ruby-on-rails - 如何处理 Grape 中特定操作的过滤器之前? - 2

      我正在我的Rails项目中安装Grape以构建RESTfulAPI。现在一些端点的操作需要身份验证,而另一些则不需要身份验证。例如,我有users端点,看起来像这样:moduleBackendmoduleV1classUsers现在如您所见,除了password/forget之外的所有操作都需要用户登录/验证。创建一个新的端点也没有意义,比如passwords并且只是删除password/forget从逻辑上讲,这个端点应该与用户资源。问题是Grapebefore过滤器没有像except,only这样的选项,我可以在其中说对某些操作应用过滤器。您通常如何干净利落地处理这种情况?

    8. ruby - 在什么情况下会使用 Sinatra 或 Merb? - 2

      我正在学习Rails,对Sinatra和Merb知之甚少。我想知道您会在哪些情况下使用Merb/Sinatra。感谢您的反馈! 最佳答案 Sinatra是一个比Rails更小、更轻的框架。如果你想让一些东西快速运行,只需发送几个URL并返回一些简单的内容,就可以使用它。看看Sinatrahomepage;这就是启动和运行“Hello,World”所需的全部内容,而在Rails中,您需要生成整个项目结构、设置Controller和View、设置路由等等(我还没有有一段时间写了一个Rails应用程序,所以我不知道“Hello,World

    9. ruby - 是否可以在不实际发送或读取数据的情况下查明 ruby​​ 套接字是否处于 ESTABLISHED 或 CLOSE_WAIT 状态? - 2

      s=Socket.new(Socket::AF_INET,Socket::SOCK_STREAM,0)s.connect(Socket.pack_sockaddr_in('port','hostname'))ssl=OpenSSL::SSL::SSLSocket.new(s,sslcert)ssl.connect从这里开始,如果ssl连接和底层套接字仍然是ESTABLISHED,或者它是否在默认值7200之后进入CLOSE_WAIT,我想检查一个线程几秒钟甚至更糟的是在实际上不需要.write()或.read()的情况下关闭。是用select()、IO.select()还是其他方法完成

    10. ruby-on-rails - 在这种情况下我如何模拟一个对象?没有明显的方法可以用模拟替换对象 - 2

      假设我在Store的模型中有这个非常简单的方法:defgeocode_addressloc=Store.geocode(address)self.lat=loc.latself.lng=loc.lngend如果我想编写一些不受地理编码服务影响的测试脚本,这些脚本可能已关闭、有限制或取决于我的互联网连接,我该如何模拟地理编码服务?如果我可以将地理编码对象传递到该方法中,那将很容易,但我不知道在这种情况下该怎么做。谢谢!特里斯坦 最佳答案 使用内置模拟和stub的rspecs,你可以做这样的事情:setupdo@subject=MyCl

    随机推荐