我正在编写一个应用程序,用户可以从多个“作业”(实际上是 URL)开始。在开始(主例程)时,我将这些 URL 添加到队列中,然后启动 x 个处理这些 URL 的 goroutines。
在特殊情况下,URL 指向的资源可能包含更多必须添加到队列中的 URL。这 3 名 worker 正在等待新工作的到来并处理它们。问题是:一旦每个 worker 都在等待工作(并且没有人在生产), worker 应该完全停止。因此,要么所有人都工作,要么没有人工作。
我当前的实现看起来像这样,但我认为它并不优雅。不幸的是,我想不出一个不包含竞争条件的更好方法,而且我不完全确定这个实现是否真的按预期工作:
var queue // from somewhere
const WORKER_COUNT = 3
var done chan struct{}
func work(working chan int) {
absent := make(chan struct{}, 1)
// if x>1 jobs in sequence are popped, send to "absent" channel only 1 struct.
// This implementation also assumes that the select statement will be evaluated "in-order" (channel 2 only if channel 1 yields nothing) - is this actually correct? EDIT: It is, according to the specs.
one := false
for {
select {
case u, ok := <-queue.Pop():
if !ok {
close(absent)
return
}
if !one {
// I have started working (delta + 1)
working <- 1
absent <- struct{}{}
one = true
}
// do work with u (which may lead to queue.Push(urls...))
case <-absent: // no jobs at the moment. consume absent => wait
one = false
working <- -1
}
}
}
func Start() {
working := make(chan int)
for i := 0; i < WORKER_COUNT; i++ {
go work(working)
}
// the amount of actually working workers...
sum := 0
for {
delta := <-working
sum += delta
if sum == 0 {
queue.Close() // close channel -> kill workers.
done <- struct{}{}
return
}
}
}
有没有更好的方法来解决这个问题?
最佳答案
您可以 use a sync.WaitGroup (参见 docs )控制工作人员的生命周期,并使用非阻塞发送,这样工作人员在尝试排队更多作业时就不会死锁:
package main
import "sync"
const workers = 4
type job struct{}
func (j *job) do(enqueue func(job)) {
// do the job, calling enqueue() for subtasks as needed
}
func main() {
jobs, wg := make(chan job), new(sync.WaitGroup)
var enqueue func(job)
// workers
for i := 0; i < workers; i++ {
go func() {
for j := range jobs {
j.do(enqueue)
wg.Done()
}
}()
}
// how to queue a job
enqueue = func(j job) {
wg.Add(1)
select {
case jobs <- j: // another worker took it
default: // no free worker; do the job now
j.do(enqueue)
wg.Done()
}
}
todo := make([]job, 1000)
for _, j := range todo {
enqueue(j)
}
wg.Wait()
close(jobs)
}
尝试使用缓冲 channel 避免死锁的困难在于,您必须预先分配一个足够大的 channel ,以确保在不阻塞的情况下容纳所有待处理的任务。有问题,除非,比如说,您要抓取的 URL 数量很少且已知。
当您回退到在当前线程中执行普通递归时,您就没有静态缓冲区大小限制。当然,仍然存在限制:如果有太多待处理的工作,您可能会耗尽 RAM,并且理论上您可以通过深度递归耗尽堆栈(但这很难!)。因此,如果您要抓取整个 Web,则需要以更复杂的方式跟踪未决任务。
最后,作为一个更完整的例子,我对这段代码并不感到非常自豪,但我碰巧写了一个 function to kick off a parallel sort这与您的 URL 提取的递归方式相同。
关于concurrency - 潜在递归任务的工作池(即,每个作业都可以排队其他作业),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29616241/
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
我在从html页面生成PDF时遇到问题。我正在使用PDFkit。在安装它的过程中,我注意到我需要wkhtmltopdf。所以我也安装了它。我做了PDFkit的文档所说的一切......现在我在尝试加载PDF时遇到了这个错误。这里是错误:commandfailed:"/usr/local/bin/wkhtmltopdf""--margin-right""0.75in""--page-size""Letter""--margin-top""0.75in""--margin-bottom""0.75in""--encoding""UTF-8""--margin-left""0.75in""-
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚
使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta
查看Ruby的CSV库的文档,我非常确定这是可能且简单的。我只需要使用Ruby删除CSV文件的前三列,但我没有成功运行它。 最佳答案 csv_table=CSV.read(file_path_in,:headers=>true)csv_table.delete("header_name")csv_table.to_csv#=>ThenewCSVinstringformat检查CSV::Table文档:http://ruby-doc.org/stdlib-1.9.2/libdoc/csv/rdoc/CSV/Table.html
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
我花了三天的时间用头撞墙,试图弄清楚为什么简单的“rake”不能通过我的规范文件。如果您遇到这种情况:任何文件夹路径中都不要有空格!。严重地。事实上,从现在开始,您命名的任何内容都没有空格。这是我的控制台输出:(在/Users/*****/Desktop/LearningRuby/learn_ruby)$rake/Users/*******/Desktop/LearningRuby/learn_ruby/00_hello/hello_spec.rb:116:in`require':cannotloadsuchfile--hello(LoadError) 最佳
我发现ActiveRecord::Base.transaction在复杂方法中非常有效。我想知道是否可以在如下事务中从AWSS3上传/删除文件:S3Object.transactiondo#writeintofiles#raiseanexceptionend引发异常后,每个操作都应在S3上回滚。S3Object这可能吗?? 最佳答案 虽然S3API具有批量删除功能,但它不支持事务,因为每个删除操作都可以独立于其他操作成功/失败。该API不提供任何批量上传功能(通过PUT或POST),因此每个上传操作都是通过一个独立的API调用完成的