草庐IT

go - 如何等待所有工作完成再关闭 channel ?

coder 2023-07-01 原文

我正在用 Go 创建一个电子邮件发送应用程序。

我遇到了无法成功发送电子邮件的问题。在这些情况下,我想在 go channel 中返回电子邮件,但该 channel 已关闭。

如何防止 channel 在所有任务完成之前关闭?

这是有这个问题的函数:

func worker(toSend chan combo, tried chan combo, s smptInfo, wg *sync.WaitGroup) {
    for try := range toSend {

        startSend := time.Now()
        delegate := make(chan bool, 1)
        go sendEmailSSH(try, delegate, s, try.line)
        select {
        case res := <-delegate:
            if res == true {
                try.success = res
                tried <- try
            } else {
                toSend <- try // if send not successfull return back to channel
            }
        case <-time.After(smtpTimeout):
            toSend <- try // if send timeout return back to channel
        }

        Pauza(startSend) // if send more fast like limit send, wait.
    }

    wg.Done()
}

可以在 GitHub 上找到完整的应用程序.

最佳答案

这是我建议的方法。为简单起见,我删除了几个 SMTP 特定部分。您可以重新插入它们。

<强>1。示例代码:

package main

import (
    "fmt"
    "log"
    "math/rand"
    "sync"
    "time"
)

type combo struct {
    success bool
    toEmail string
}

func sendEmailSSH(try combo, delegate chan bool) {

    defer close(delegate)

    r := rand.Intn(7)
    time.Sleep(time.Duration(r) * time.Second)
    if r%2 == 0 {
        fmt.Printf("sendEmailSSH: failing randomly after %d seconds: %+v\n", r, try)

        delegate <- false
    } else {
        fmt.Printf("sendEmailSSH: successful after %d seconds: %+v\n", r, try)

        delegate <- true

    }

}

func sendEmailSSHWrapper(try combo, toRetry chan combo) {

    // do something with try
    delegate := make(chan bool, 1)
    smtpTimeout := 3 // if no success within 3 seconds, mark it as failed

    go sendEmailSSH(try, delegate)


    go func() {
                select {
        case res := <-delegate:
            if res == true {
                try.success = res
            } else {
                toRetry <- try // if send not successfull, add to retry channel
            }
        case <-time.After(time.Duration(smtpTimeout) * time.Second):
            fmt.Printf("sendEmailSSHWrapper: failing due to timeout: %+v\n", try)

            toRetry <- try // if send timeout, add to retry channel
        }

    }()

}

// where we actually do the work
func worker(toSend chan combo, wg *sync.WaitGroup, id int) {

    toRetry := make(chan combo, 5)
    retryDone := false

    for try := range toSend {
        go sendEmailSSHWrapper(try, toRetry)
    }

    for !retryDone {
        select {

        case try, ok := <-toRetry:
            if !ok {
                log.Println("toRetry is already closed")
                retryDone = true
                continue
            }
            fmt.Printf("worker %d: picking up %+v for retry\n", id, try)
            go sendEmailSSHWrapper(try, toRetry)
        case <-time.After(time.Duration(15) * time.Second):
            fmt.Printf("worker %d: no data in toRetry(%d) channel for 15 seconds. closing toRetry now\n", id, id)
            close(toRetry)
            toRetry = nil
            retryDone = true
        }

    }

    wg.Done()
}

func main() {

    var wg sync.WaitGroup // keep track of the workers

    toSend := make(chan combo) // to the workers

    // emailList := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
    emailList := []string{"a", "b", "c", "d", "e", "f"}


    // initialize n workers

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go worker(toSend, &wg, i)
    }

    var count int = 0

    for i := 0; i < len(emailList); i++ {
        count++
        toSend <- combo{toEmail: emailList[i], success: false}
    }

    close(toSend)
    wg.Wait()

    fmt.Println("Send emails Done!")
}

在这里试试: https://play.golang.org/p/W7ur2wr3HeD

<强>2。我做了什么:

  1. 只要 main 添加完数据,它就应该关闭 toSend channel 。它不会再被使用。
  2. 为 sendEmailSSH 函数创建一个包装器。您当前调用 go sendEmailSSH(try, delegate, s, try.line) 的实现不是很优化,因为在启动 go 例程后,它无论如何都会在一元大小的 channel 上被阻塞,就像适合连续通话。
  3. 或者,我调用了 go sendEmailSSHWrapper(try, toRetry),它在内部调用实际的 sendEmailSSHWrapper 函数并监控委托(delegate) channel 。
  4. sendEmailSSH 随机失败一些请求,用于演示目的。
  5. sendEmailSSHWrapper 监控 delegate 它自己的失败 channel ,并检查超时(我将其保留 3 秒用于演示)
  6. Worker 首先完全耗尽 toSend channel 。然后移动到 toRetry channel 。它可以正常工作,除非它在 ​​15 秒内在此 channel 上没有看到任何数据(再次保持高电平以进行演示)

<强>3。您需要添加的内容:

  1. 您确实应该实现某种机制,以便您对任何给定的电子邮件地址仅重试给定的次数。否则,对于任何错误的电子邮件地址或任何意外情况(由于延迟导致多次连续超时),您可能会永远陷入 for 循环,可能会使用映射来跟踪重试条目的计数。或者更简单,将 retryCount int 添加到您的 combo 结构,并在每次重试时递增它,并在每次重试之前进行检查。

  2. 请注意,在您和我的方法中,我们都会在超时时重试。您确定如果发生超时,将永远不会发送电子邮件吗?如果没有,您真的应该使用 context with cancellation,或者如果需要更多时间,则以某种方式终止实际的 go 例程,然后再重试。否则,在某个糟糕的日子,如果延迟时间更长,您可能会超时很多,然后继续重试。而实际上,您会向用户发送大量电子邮件垃圾邮件。

关于go - 如何等待所有工作完成再关闭 channel ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52265146/

有关go - 如何等待所有工作完成再关闭 channel ?的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  3. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  4. ruby-on-rails - 由于 "wkhtmltopdf",PDFKIT 显然无法正常工作 - 2

    我在从html页面生成PDF时遇到问题。我正在使用PDFkit。在安装它的过程中,我注意到我需要wkhtmltopdf。所以我也安装了它。我做了PDFkit的文档所说的一切......现在我在尝试加载PDF时遇到了这个错误。这里是错误:commandfailed:"/usr/local/bin/wkhtmltopdf""--margin-right""0.75in""--page-size""Letter""--margin-top""0.75in""--margin-bottom""0.75in""--encoding""UTF-8""--margin-left""0.75in""-

  5. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

    给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

  6. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  7. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  8. ruby - 如何以所有可能的方式将字符串拆分为长度最多为 3 的连续子字符串? - 2

    我试图获取一个长度在1到10之间的字符串,并输出将字符串分解为大小为1、2或3的连续子字符串的所有可能方式。例如:输入:123456将整数分割成单个字符,然后继续查找组合。该代码将返回以下所有数组。[1,2,3,4,5,6][12,3,4,5,6][1,23,4,5,6][1,2,34,5,6][1,2,3,45,6][1,2,3,4,56][12,34,5,6][12,3,45,6][12,3,4,56][1,23,45,6][1,2,34,56][1,23,4,56][12,34,56][123,4,5,6][1,234,5,6][1,2,345,6][1,2,3,456][123

  9. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  10. ruby - 如何每月在 Heroku 运行一次 Scheduler 插件? - 2

    在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/

随机推荐