草庐IT

multithreading - Goroutines, channels select 语句

coder 2024-07-09 原文

我在构建我的 goroutines 和 channel 时遇到问题。我的 select 语句在所有 goroutine 完成之前一直退出,我知道问题出在我发送完成信号的地方。我应该在哪里发送完成信号。

func startWorker(ok chan LeadRes, err chan LeadResErr, quit chan int, verbose bool, wg *sync.WaitGroup) {
    var results ProcessResults
    defer wg.Done()
    log.Info("Starting . . .")
    start := time.Now()

    for {
        select {
        case lead := <-ok:
            results.BackFill = append(results.BackFill, lead.Lead)
        case err := <-err:
            results.BadLeads = append(results.BadLeads, err)
        case <-quit:
            if verbose {
                log.Info("Logging errors from unprocessed leads . . .")
                logBl(results.BadLeads)
            }
            log.WithFields(log.Fields{
                "time-elapsed":                time.Since(start),
                "number-of-unprocessed-leads": len(results.BadLeads),
                "number-of-backfilled-leads":  len(results.BackFill),
            }).Info("Done")
            return
        }
    }
}

//BackFillParallel . . .
func BackFillParallel(leads []Lead, verbose bool) {
    var wg sync.WaitGroup
    gl, bl, d := getChans()
    for i, lead := range leads {
        done := false
        if len(leads)-1 == i {
            done = true
        }
        wg.Add(1)
        go func(lead Lead, done bool, wg *sync.WaitGroup) {
            ProcessLead(lead, gl, bl, d, done, wg)
        }(lead, done, &wg)

    }
    startWorker(gl, bl, d, verbose, &wg)
}

//ProcessLead . . .
func ProcessLead(lead Lead, c1 chan LeadRes, c2 chan LeadResErr, c3 chan int, done bool, wg *sync.WaitGroup) {
    defer wg.Done()
    var payloads []Payload
    for _, p := range lead.Payload {
        decMDStr, err := base64.StdEncoding.DecodeString(p.MetaData)
        if err != nil {
            c2 <- LeadResErr{lead, err.Error()}
        }
        var decMetadata Metadata
        if err := json.Unmarshal(decMDStr, &decMetadata); err != nil {
            goodMetadata, err := FixMDStr(string(decMDStr))
            if err != nil {
                c2 <- LeadResErr{lead, err.Error()}
            }
            p.MetaData = goodMetadata

            payloads = append(payloads, p)
        }
    }

    lead.Payload = payloads
    c1 <- LeadRes{lead}
    if done {
        c3 <- 0
    }
}

最佳答案

首先评论一下我在代码中看到的主要问题:

您将 done 变量传递给最后一个 ProcessLead 调用,您又在 ProcessLead 中使用该变量通过 停止您的工作人员退出 channel 。问题在于,“最后”ProcessLead 调用可能会在其他 ProcessLead 调用之前完成,因为它们是并行执行的。

第一次改进

将您的问题视为管道。您有 3 个步骤:

  1. 检查所有线索并为每个线索启动例程
  2. 程序处理他们的领导
  3. 收集结果

在第 2 步展开后,最简单的同步方法是 WaitGroup。正如已经提到的,您没有调用同步,如果您调用,您当前会创建一个与您的收集例程相关的死锁。您需要另一个 goroutine 将同步与收集例程分开才能工作。

它看起来怎么样(很抱歉删除了一些代码,这样我可以更好地看到结构):

//BackFillParallel . . .
func BackFillParallel(leads []Lead, verbose bool) {
    gl, bl, d := make(chan LeadRes), make(chan LeadResErr), make(chan int)
    // additional goroutine with wg.Wait() and closing the quit channel
    go func(d chan int) {
        var wg sync.WaitGroup
        for i, lead := range leads {
            wg.Add(1)
            go func(lead Lead, wg *sync.WaitGroup) {
                ProcessLead(lead, gl, bl, wg)
            }(lead, &wg)
        }
        wg.Wait()
        // stop routine after all other routines are done
        // if your channels have buffers you might want make sure there is nothing in the buffer before closing
        close(d) // you can simply close a quit channel. just make sure to only close it once
    }(d)

    // now startworker is running parallel to wg.Wait() and close(d)
    startWorker(gl, bl, d, verbose)
}

func startWorker(ok chan LeadRes, err chan LeadResErr, quit chan int, verbose bool) {
    for {
        select {
        case lead := <-ok:
            fmt.Println(lead)
        case err := <-err:
            fmt.Println(err)
        case <-quit:
            return
        }
    }
}

//ProcessLead . . .
func ProcessLead(lead Lead, c1 chan LeadRes, c2 chan LeadResErr, wg *sync.WaitGroup) {
    defer wg.Done()
    var payloads []Payload
    for _, p := range lead.Payload {
        decMDStr, err := base64.StdEncoding.DecodeString(p.MetaData)
        if err != nil {
            c2 <- LeadResErr{lead, err.Error()}
        }
        var decMetadata Metadata
        if err := json.Unmarshal(decMDStr, &decMetadata); err != nil {
            goodMetadata, err := FixMDStr(string(decMDStr))
            if err != nil {
                c2 <- LeadResErr{lead, err.Error()}
            }
            p.MetaData = goodMetadata

            payloads = append(payloads, p)
        }
    }

    lead.Payload = payloads
    c1 <- LeadRes{lead}
}

建议的解决方案

如评论中所述,如果您有缓冲 channel ,您可能会遇到麻烦。复杂性来自您拥有的两个输出 channel (用于 Lead 和 LeadErr)。您可以使用以下结构避免这种情况:

//BackFillParallel . . .
func BackFillParallel(leads []Lead, verbose bool) {
    gl, bl := make(chan LeadRes), make(chan LeadResErr)

    // one goroutine that blocks until all ProcessLead functions are done
    go func(gl chan LeadRes, bl chan LeadResErr) {
        var wg sync.WaitGroup
        for _, lead := range leads {
            wg.Add(1)
            go func(lead Lead, wg *sync.WaitGroup) {
                ProcessLead(lead, gl, bl, wg)
            }(lead, &wg)
        }
        wg.Wait()
    }(gl, bl)

    // main routine blocks until all results and errors are collected
    var wg sync.WaitGroup
    res, errs := []LeadRes{}, []LeadResErr{}
    wg.Add(2) // add 2 for resCollector and errCollector
    go resCollector(&wg, gl, res)
    go errCollector(&wg, bl, errs)
    wg.Wait()

    fmt.Println(res, errs) // in these two variables you will have the results.
}

func resCollector(wg *sync.WaitGroup, ok chan LeadRes, res []LeadRes) {
    defer wg.Done()
    for lead := range ok {
        res = append(res, lead)
    }
}

func errCollector(wg *sync.WaitGroup, ok chan LeadResErr, res []LeadResErr) {
    defer wg.Done()
    for err := range ok {
        res = append(res, err)
    }
}

// ProcessLead function as in "First improvement"

关于multithreading - Goroutines, channels select 语句,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45271129/

有关multithreading - Goroutines, channels select 语句的更多相关文章

  1. ruby - 如何在 Ruby 中向现有方法定义添加语句 - 2

    我注意到类定义,如果我打开classMyClass,并在不覆盖的情况下添加一些东西我仍然得到了之前定义的原始方法。添加的新语句扩充了现有语句。但是对于方法定义,我仍然想要与类定义相同的行为,但是当我打开defmy_method时似乎,def中的现有语句和end被覆盖了,我需要重写一遍。那么有什么方法可以使方法定义的行为与定义相同,类似于super,但不一定是子类? 最佳答案 我想您正在寻找alias_method:classAalias_method:old_func,:funcdeffuncold_func#similartoca

  2. ruby - ruby 乘法语句中星号中断语法前的空格 - 2

    在添加一些空格以使代码更具可读性时(与上面的代码对齐),我遇到了这个:classCdefx42endendm=C.new现在这将给出“错误数量的参数”:m.x*m.x这将给出“语法错误,意外的tSTAR,期待$end”:2/m.x*m.x这里的解析器到底发生了什么?我使用Ruby1.9.2和2.1.5进行了测试。 最佳答案 *用于运算符(42*42)和参数解包(myfun*[42,42])。当你这样做时:m.x*m.x2/m.x*m.xRuby将此解释为参数解包,而不是*运算符(即乘法)。如果您不熟悉它,参数解包(有时也称为“spl

  3. ruby - 有没有办法从 ruby​​ case 语句中访问表达式? - 2

    我想从then子句中访问c​​ase语句表达式,即food="cheese"casefoodwhen"dip"then"carrotsticks"when"cheese"then"#{expr}crackers"else"mayo"end在这种情况下,expr是食物的当前值(value)。在这种情况下,我知道,我可以简单地访问变量food,但是在某些情况下,该值可能无法再访问(array.shift等)。除了将expr移出到局部变量然后访问它之外,是否有直接访问caseexpr值的方法?罗亚附注我知道这个具体示例很简单,只是一个示例场景。 最佳答案

  4. ruby - 在 Ruby 的 if 语句中检查 bash 命令 - 2

    如何在Ruby的if语句中检查bash命令的返回值(true/false)。我想要这样的东西,if("/usr/bin/fswscell>/dev/null2>&1")has_afs="true"elsehas_afs="false"end它会提示以下错误含义,它总是返回true。(irb):5:warning:stringliteralincondition正确的语法是什么?更新:/usr/bin/fswscell寻找afs安装和运行状态。它会抛出这样的字符串,Thisworkstationbelongstocell如果afs没有运行,命令以状态1退出 最

  5. ruby - 变量赋值后的 if 语句 - 有多常见? - 2

    我最近与一位同事讨论了以下Ruby语法:value=ifa==0"foo"elsifa>42"bar"else"fizz"end我个人并没有看到太多这种逻辑,但我的同事指出,这实际上是一种相当普遍的Rubyism。我试着用谷歌搜索这个主题,但没有找到任何文章、页面或SO问题来讨论它,这让我相信这可能是一种非常实际的技术。然而,另一位同事发现语法令人困惑,而是将上面的逻辑写成这样:ifa==0value="foo"elsifa>42value="bar"elsevalue="fizz"end缺点是value=的重复声明和隐式elsenil的丢失,如果我们想使用它的话。这也感觉它与Ruby

  6. ruby - 当你有一个没有参数的 case 语句并且 when 子句是 lambda 时会发生什么? - 2

    这段代码没有像我预期的那样执行:casewhen->{false}then"why?"else"ThisiswhatIexpect"end#=>"why?"这也不是casewhen->(x){false}then"why?"else"ThisiswhatIexpect"end#=>"why?"第一个then子句在两种情况下都被执行,这意味着我提供给when子句的lambda没有被调用。我知道无论when子句的主题是什么,都应该调用大小写相等运算符===。我想知道当没有为case提供参数时,===的另一边会发生什么。我在想它可能是nil,但它不可能是:->{false}===nil#=>

  7. Ruby:用于检查 nil/false 条件语句的干净代码? - 2

    我总是遇到这个Ruby问题,我想把它写得更干净。varacanbenila.valuecanalsobenila.valuehaspossibletrueorfalsevalueif(nota.nil?)&&(nota.value.nil?)&&a.value==falseputs"avalueisnotavailable"elseputs"avalueistrue"end问题在于条件语句过于笨拙且难以阅读。如何改进检查nil和false条件语句?谢谢,我是Ruby新手 最佳答案 Rubyonrails有一个叫做try的扩展,它允许

  8. ruby-on-rails - 放置调试语句 - 2

    当我编写代码时,我非常频繁地使用“puts”语句进行调试。它允许我查看服务器中发生的情况。在调试代码的时候,不知道是什么原因,我把这些“puts”语句去掉了。这是个好主意还是我应该保留它们以使我的服务器日志更加清晰? 最佳答案 您应该使用记录器而不是puts。使用这种语句:Rails.logger.debug"DEBUG:#{self.inspect}#{caller(0).first}"ifRails.logger.debug?如果您想(几乎)实时查看调试,只需在另一个终端窗口中使用tail命令:tail-Flog/develop

  9. ruby - 使用 include 的简短 Ruby Case 语句?不管用 - 2

    我有以下代码,其中有一个小错误,case语句返回值“other”,即使第一个“when”语句评估为true并且应该返回“boats”。我已经关注这个很久了,一定是个小东西。CATEGORY_CLASSES={:boats=>[1,2,3,4,5,6],:houses=>[7,8,9,10],:other=>[11,12,13,14,15,16]}category_id=1category=casecategory_idwhenCATEGORY_CLASSES[:boats].include?(category_id);"boats"whenCATEGORY_CLASSES[:house

  10. ruby - Ruby 中的语句和表达式有什么区别? - 2

    我知道在Ruby中,几乎所有东西都是表达式。即使是其他语言中的if-else语句、case语句、赋值语句、loop语句在Ruby中也是表达式。所以我想从Ruby的角度了解,statement和expression有什么区别? 最佳答案 Ruby中表达式和语句没有区别。一切都计算为一个值,所以一切都是表达式。 关于ruby-Ruby中的语句和表达式有什么区别?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.c

随机推荐