我目前正在从事一个搜索引擎项目。为了更快的爬行速度,我在每次链接访问时使用一个 goroutine。但是我遇到了两个让我疑惑的问题!
第一个是代码示例:
package main
import "fmt"
import "sync"
import "time"
type test struct {
running int
max int
mu sync.Mutex
}
func main() {
t := &test{max: 1000}
t.start()
}
func (t *test) start() {
for {
if t.running >= t.max {
time.Sleep(200 * time.Millisecond)
continue
}
go t.visit()
}
}
func (t *test) visit() {
t.inc()
defer t.dec()
fmt.Println("visit called")
fmt.Printf("running: %d, max: %d\n", t.running, t.max)
fmt.Println()
time.Sleep(time.Second)
}
func (t *test) inc() {
t.mu.Lock()
t.running++
t.mu.Unlock()
}
func (t *test) dec() {
t.mu.Lock()
t.running--
t.mu.Unlock()
}
输出(裁剪):
running: 2485, max: 1000
running: 2485, max: 1000
running: 2485, max: 1000
visit called
running: 2485, max: 1000
running: 2485, max: 1000
running: 2485, max: 1000
running: 2485, max: 1000
visit called
running: 2485, max: 1000
running: 2485, max: 1000
虽然我在循环中明确检查允许的最大 goroutines,但为什么运行的 goroutines 超过了最大值?
第二个是部分真实项目代码:
更新:这实际上已修复,问题出在 LinkProvider.Get() 实现中,返回时间太长。 parser.visit() 同时返回,但是 Parser.Start() 中的循环正在等待一个新链接...并且输出似乎是连续的!
package worker
import (
"errors"
"fmt"
"sync"
"time"
"bitbucket.org/codictive/ise/components/crawler/models"
"bitbucket.org/codictive/ise/components/log/logger"
"bitbucket.org/codictive/ise/core/component"
"bitbucket.org/codictive/ise/core/database"
)
// Worker is a service that processes crawlable links.
type Worker interface {
Start() error
Stop() error
Restart() error
Status() Status
}
// Status contains runtime status of a worker.
type Status struct {
Running bool
RunningParsersCount int
}
// New return a new defaultWorker with given config.
func New() Worker {
return &defaultWorker{
flow: make(chan bool),
stop: make(chan bool),
}
}
// defaultWorker is a Worker implementation.
type defaultWorker struct {
linkProvider LinkProvider
handlersLimit int
runningHandlersCount int
running bool
mu sync.Mutex
flow chan bool
stop chan bool
}
func (w *defaultWorker) init() {
prate, _ := component.IntConfig("crawler.crawlInterval")
arate, _ := component.IntConfig("crawler.ad_crawlInterval")
concLimit, _ := component.IntConfig("crawler.concurrent_workers_limit")
w.linkProvider = NewLinkProvider(time.Duration(prate)*time.Hour, time.Duration(arate)*time.Hour)
w.handlersLimit = concLimit
}
// Start runs worker.
func (w *defaultWorker) Start() error {
logger.Info("Starting crawler worker...")
w.running = true
w.init()
defer func() {
w.running = false
logger.Info("Worker stopped.")
}()
for {
select {
case <-w.stop:
w.flow <- true
return nil
default:
fmt.Printf("running: %d limit: %d\n", w.runningHandlersCount, w.handlersLimit)
if w.runningHandlersCount >= w.handlersLimit {
time.Sleep(200 * time.Millisecond)
continue
}
link := w.linkProvider.Get()
if link.ID == 0 {
logger.Debug("no link to crawl")
time.Sleep(time.Minute)
continue
}
go func(l *models.CrawlLink) {
go w.visit(l)
}(link)
}
}
}
// Stop stops worker.
func (w *defaultWorker) Stop() error {
logger.Info("Stopping crawler worker...")
w.stop <- true
select {
case <-w.flow:
return nil
case <-time.After(2 * time.Minute):
return errors.New("worker did not stopped properly")
}
}
// Restart re-starts worker.
func (w *defaultWorker) Restart() error {
logger.Info("Re-starting crawler worker...")
w.stop <- true
select {
case <-w.flow:
return w.Start()
case <-time.After(2 * time.Minute):
return errors.New("can not restart worker")
}
}
// Status reports current worker status.
func (w *defaultWorker) Status() Status {
return Status{
Running: w.running,
RunningParsersCount: w.runningHandlersCount,
}
}
func (w *defaultWorker) visit(cl *models.CrawlLink) {
w.incrementRunningWorkers()
defer w.decrementRunningWorkers()
if cl == nil {
logger.Warning("[crawler.worker.visit] Can not visit a nil link.")
return
}
if err := cl.LoadFull(); err != nil {
logger.Error("[crawler.worker.visit] Can not load link relations. (%v)", err)
return
}
parser := NewParser(cl)
if parser == nil {
logger.Error("[crawler.worker.visit] Parser instantiation failed.")
return
}
before := time.Now()
if err := parser.Parse(); err != nil {
cl.Error = err.Error()
logger.Error("[crawler.worker.visit] Parser finished with error: %v.", err)
db := database.Open()
if err := db.Save(&cl).Error; err != nil {
logger.Error("[crawler.worker.visit] can not update crawl link. (%v)", err)
}
}
logger.Debug("[crawler.worker.visit] Parsing %q took %s.", cl.URL, time.Since(before))
fmt.Printf("[crawler.worker.visit] Parsing %q took %s.\n", cl.URL, time.Since(before))
}
func (w *defaultWorker) incrementRunningWorkers() {
w.mu.Lock()
w.runningHandlersCount++
w.mu.Unlock()
fmt.Printf("increment called. current: %d\n", w.runningHandlersCount)
}
func (w *defaultWorker) decrementRunningWorkers() {
w.mu.Lock()
w.runningHandlersCount--
w.mu.Unlock()
fmt.Printf("decrement called. current: %d\n", w.runningHandlersCount)
}
输出:
2017/12/03 11:24:36 profile: cpu profiling enabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof
running: 0 limit: 1000
Running server on :8080
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%81%D8%B1%D8%A7%D8%B4%D8%A8%D9%86%D8%AF/%D8%A7%D9%85%D9%84%D8%A7%DA%A9/%D9%81%D8%B1%D9%88%D8%B4-%D8%A7%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88-%D8%AA%D8%AC%D8%A7%D8%B1%DB%8C" took 370.140513ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D8%B3%D8%A7%D9%85%D8%B3%D9%88%D9%86%DA%AF-s3-neo-24252682.html" took 193.193357ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%85%DB%8C%D8%B2%D9%88%D8%B5%D9%86%D8%AF%D9%84%DB%8C-%D8%AA%D8%A7%D9%84%D8%A7%D8%B1-22399505.html" took 201.636741ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/50000%D9%85%D8%AA%D8%B1-%D8%B2%D9%85%DB%8C%D9%86-%D9%85%D8%B1%D8%BA%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88%D8%A7%D9%82%D8%B9-%D8%AF%D8%B1-%D8%AE%D8%B1%D9%85%D8%AF%D8%B1%D9%87-23075331.html" took 210.360596ms.
decrement called. current: 0
^C2017/12/03 11:24:43 profile: caught interrupt, stopping profiles
2017/12/03 11:24:43 profile: cpu profiling disabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof
如您所见,visit 方法完全按顺序运行!我是用 go visit(link) 还是用上面的代码调用它。
为什么会这样?是什么阻止循环迭代?
最佳答案
我会使用 channel 和阻塞功能解决这个问题 - https://play.golang.org/p/KbYOI1oGNs
主要的变化是我们有一个 channel guard,我们在 goroutine 启动时将新项目放在那里(如果大小达到限制它会阻塞),完成后释放。
func (t *test) start() {
maxGoroutines := t.max
guard := make(chan struct{}, maxGoroutines)
for {
guard <- struct{}{}
go func() {
t.visit()
<-guard
}()
}
}
关于go - 并发性:限制 goroutines 没有按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47616537/
我在从html页面生成PDF时遇到问题。我正在使用PDFkit。在安装它的过程中,我注意到我需要wkhtmltopdf。所以我也安装了它。我做了PDFkit的文档所说的一切......现在我在尝试加载PDF时遇到了这个错误。这里是错误:commandfailed:"/usr/local/bin/wkhtmltopdf""--margin-right""0.75in""--page-size""Letter""--margin-top""0.75in""--margin-bottom""0.75in""--encoding""UTF-8""--margin-left""0.75in""-
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我好像记得Lua有类似Ruby的method_missing的东西。还是我记错了? 最佳答案 表的metatable的__index和__newindex可以用于与Ruby的method_missing相同的效果。 关于ruby-难道Lua没有和Ruby的method_missing相媲美的东西吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/7732154/
我有一个奇怪的问题:我在rvm上安装了rubyonrails。一切正常,我可以创建项目。但是在我输入“railsnew”时重新启动后,我有“程序'rails'当前未安装。”。SystemUbuntu12.04ruby-v"1.9.3p194"gemlistactionmailer(3.2.5)actionpack(3.2.5)activemodel(3.2.5)activerecord(3.2.5)activeresource(3.2.5)activesupport(3.2.5)arel(3.0.2)builder(3.0.0)bundler(1.1.4)coffee-rails(
我花了三天的时间用头撞墙,试图弄清楚为什么简单的“rake”不能通过我的规范文件。如果您遇到这种情况:任何文件夹路径中都不要有空格!。严重地。事实上,从现在开始,您命名的任何内容都没有空格。这是我的控制台输出:(在/Users/*****/Desktop/LearningRuby/learn_ruby)$rake/Users/*******/Desktop/LearningRuby/learn_ruby/00_hello/hello_spec.rb:116:in`require':cannotloadsuchfile--hello(LoadError) 最佳
我想在一个没有Sass引擎的类中使用Sass颜色函数。我已经在项目中使用了sassgem,所以我认为搭载会像以下一样简单:classRectangleincludeSass::Script::FunctionsdefcolorSass::Script::Color.new([0x82,0x39,0x06])enddefrender#hamlengineexecutedwithcontextofself#sothatwithintemlateicouldcall#%stop{offset:'0%',stop:{color:lighten(color)}}endend更新:参见上面的#re
关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭8年前。Improvethisquestion在首页我有:汽车:VolvoSaabMercedesAudistatic_pages_spec.rb中的测试代码:it"shouldhavetherightselect"dovisithome_pathit{shouldhave_select('cars',:options=>['volvo','saab','mercedes','audi'])}end响应是rspec./spec/request
在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo
大家好!我想知道Ruby中未使用语法ClassName.method_name调用的方法是如何工作的。我头脑中的一些是puts、print、gets、chomp。可以在不使用点运算符的情况下调用这些方法。为什么是这样?他们来自哪里?我怎样才能看到这些方法的完整列表? 最佳答案 Kernel中的所有方法都可用于Object类的所有对象或从Object派生的任何类。您可以使用Kernel.instance_methods列出它们。 关于没有类的Ruby方法?,我们在StackOverflow
我真的为这个而疯狂。我一直在搜索答案并尝试我找到的所有内容,包括相关问题和stackoverflow上的答案,但仍然无法正常工作。我正在使用嵌套资源,但无法使表单正常工作。我总是遇到错误,例如没有路线匹配[PUT]"/galleries/1/photos"表格在这里:/galleries/1/photos/1/edit路线.rbresources:galleriesdoresources:photosendresources:galleriesresources:photos照片Controller.rbdefnew@gallery=Gallery.find(params[:galle