草庐IT

go - 同步 worker 以进行递归抓取

coder 2024-07-09 原文

我想用 n 个 worker 实现一个“爬虫”,每个 worker 都可以添加额外的工作。当没有剩下的工作并且所有 worker 都完成了工作时,程序应该停止。

我有以下代码(您可以在 https://play.golang.org/p/_j22p_OfYv 中使用它):

package main

import (
    "fmt"
    "sync"
)

func main() {
    pathChan := make(chan string)
    fileChan := make(chan string)
    workers := 3
    var wg sync.WaitGroup

    paths := map[string][]string{
        "/":     {"/test", "/foo", "a", "b"},
        "/test": {"aa", "bb", "cc"},
        "/foo":  {"/bar", "bbb", "ccc"},
        "/bar":  {"aaaa", "bbbb", "cccc"},
    }

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            for {
                path, ok := <-pathChan
                if !ok {
                    break
                }

                for _, f := range paths[path] {
                    if f[0] == '/' {
                        pathChan <- f
                    } else {
                        fileChan <- f
                    }
                }
            }

            wg.Done()
        }()
    }

    pathChan <- "/"

    for {
        filePath, ok := <-fileChan
        if !ok {
            break
        }

        fmt.Println(filePath)
    }

    wg.Wait()
    close(pathChan)
}

不幸的是,这以死锁告终。问题到底出在哪里?另外,编写此类功能的最佳做法是什么? channel 是正确使用的功能吗?

编辑:

我更新了我的代码以使用两个 WaitGroup ,一个用于作业,一个用于工作人员(参见 https://play.golang.org/p/bueUJzMhqj):

package main

import (
    "fmt"
    "sync"
)

func main() {
    pathChan := make(chan string)
    fileChan := make(chan string)
    jobs := new(sync.WaitGroup)
    workers := new(sync.WaitGroup)
    nworkers := 2

    paths := map[string][]string{
        "/":     {"/test", "/foo", "a", "b"},
        "/test": {"aa", "bb", "cc"},
        "/foo":  {"/bar", "bbb", "ccc"},
        "/bar":  {"aaaa", "bbbb", "cccc"},
    }

    for i := 0; i < nworkers; i++ {
        workers.Add(1)
        go func() {
            defer workers.Done()
            for {
                path, ok := <-pathChan
                if !ok {
                    break
                }

                for _, f := range paths[path] {
                    if f[0] == '/' {
                        jobs.Add(1)
                        pathChan <- f
                    } else {
                        fileChan <- f
                    }
                }

                jobs.Done()
            }

        }()
    }

    jobs.Add(1)
    pathChan <- "/"

    go func() {
        jobs.Wait()
        close(pathChan)
        workers.Wait()
        close(fileChan)
    }()

    for {
        filePath, ok := <-fileChan
        if !ok {
            break
        }

        fmt.Println(filePath)
    }

}

这似乎确实可行,但显然如果将nworkers设置为1,仍然会发生死锁,因为单个worker在向 channel 添加内容时将永远等待路径陈。要解决此问题,可以增加 channel 缓冲区(例如 pathChan := make(chan string, 2)),但这仅在两个缓冲区未完全满时才有效。当然,缓冲区大小可以设置为一个很大的数字,比如 10000,但是代码仍然会遇到死锁。此外,这对我来说似乎不是一个干净的解决方案。

这是我意识到使用某种队列而不是 channel 会更容易的地方,在 channel 中可以添加和删除元素而不会阻塞,并且队列的大小不固定。 Go 标准库中是否存在这样的队列?

最佳答案

如果你想等待任意数量的 worker 完成,标准库包括sync.WaitGroup正是为了这个目的。

还有其他并发问题:

  • 您正在使用 channel 关闭信号,但您有多个 goroutines 在同一个 channel 上发送。这通常是一种不好的做法:因为每个例程永远无法知道其他例程何时完成 channel ,您永远无法正确关闭 channel 。
  • 关闭一个 channel 会等待另一个 channel 先关闭,但它永远不会关闭,因此会出现死锁。
  • 它不会立即死锁的唯一原因是您的示例碰巧拥有比“/”下的目录更多的 worker 。在“/”下再添加两个目录,立即死锁。

有一些解决方案:

  • 转储工作池并为每个子目录旋转一个 goroutine,让调度程序处理其余部分:https://play.golang.org/p/ck2DkNFnyF
  • 每个根级目录使用一个 worker,并让每个 worker 递归地处理其目录,而不是将它找到的子目录排队到一个 channel 。

关于go - 同步 worker 以进行递归抓取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44788548/

有关go - 同步 worker 以进行递归抓取的更多相关文章

  1. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  2. ruby-on-rails - 按天对 Mongoid 对象进行分组 - 2

    在控制台中反复尝试之后,我想到了这种方法,可以按发生日期对类似activerecord的(Mongoid)对象进行分组。我不确定这是完成此任务的最佳方法,但它确实有效。有没有人有更好的建议,或者这是一个很好的方法?#eventsisanarrayofactiverecord-likeobjectsthatincludeatimeattributeevents.map{|event|#converteventsarrayintoanarrayofhasheswiththedayofthemonthandtheevent{:number=>event.time.day,:event=>ev

  3. ruby - 使用 C 扩展开发 ruby​​gem 时,如何使用 Rspec 在本地进行测试? - 2

    我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当

  4. ruby - 如何进行排列以有效地定制输出 - 2

    这是一道面试题,我没有答对,但还是很好奇怎么解。你有N个人的大家庭,分别是1,2,3,...,N岁。你想给你的大家庭拍张照片。所有的家庭成员都排成一排。“我是家里的friend,建议家庭成员安排如下:”1岁的家庭成员坐在这一排的最左边。每两个坐在一起的家庭成员的年龄相差不得超过2岁。输入:整数N,1≤N≤55。输出:摄影师可以拍摄的照片数量。示例->输入:4,输出:4符合条件的数组:[1,2,3,4][1,2,4,3][1,3,2,4][1,3,4,2]另一个例子:输入:5输出:6符合条件的数组:[1,2,3,4,5][1,2,3,5,4][1,2,4,3,5][1,2,4,5,3][

  5. ruby - 即使失败也继续进行多主机测试 - 2

    我已经构建了一些serverspec代码来在多个主机上运行一组测试。问题是当任何测试失败时,测试会在当前主机停止。即使测试失败,我也希望它继续在所有主机上运行。Rakefile:namespace:specdotask:all=>hosts.map{|h|'spec:'+h.split('.')[0]}hosts.eachdo|host|begindesc"Runserverspecto#{host}"RSpec::Core::RakeTask.new(host)do|t|ENV['TARGET_HOST']=hostt.pattern="spec/cfengine3/*_spec.r

  6. ruby - 是否可以覆盖 gemfile 进行本地开发? - 2

    我们的git存储库中目前有一个Gemfile。但是,有一个gem我只在我的环境中本地使用(我的团队不使用它)。为了使用它,我必须将它添加到我们的Gemfile中,但每次我checkout到我们的master/dev主分支时,由于与跟踪的gemfile冲突,我必须删除它。我想要的是类似Gemfile.local的东西,它将继承从Gemfile导入的gems,但也允许在那里导入新的gems以供使用只有我的机器。此文件将在.gitignore中被忽略。这可能吗? 最佳答案 设置BUNDLE_GEMFILE环境变量:BUNDLE_GEMFI

  7. ruby - 在 Windows 机器上使用 Ruby 进行开发是否会适得其反? - 2

    这似乎非常适得其反,因为太多的gem会在window上破裂。我一直在处理很多mysql和ruby​​-mysqlgem问题(gem本身发生段错误,一个名为UnixSocket的类显然在Windows机器上不能正常工作,等等)。我只是在浪费时间吗?我应该转向不同的脚本语言吗? 最佳答案 我在Windows上使用Ruby的经验很少,但是当我开始使用Ruby时,我是在Windows上,我的总体印象是它不是Windows原生系统。因此,在主要使用Windows多年之后,开始使用Ruby促使我切换回原来的系统Unix,这次是Linux。Rub

  8. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  9. ruby - 我需要从 facebook 游戏中抓取数据——使用 ruby - 2

    修改(澄清问题)我已经花了几天时间试图弄清楚如何从Facebook游戏中抓取特定信息;但是,我遇到了一堵又一堵砖墙。据我所知,主要问题如下。我可以使用Chrome的检查元素工具手动查找我需要的html-它似乎位于iframe中。但是,当我尝试抓取该iframe时,它​​是空的(属性除外):如果我使用浏览器的“查看页面源代码”工具,这与我看到的输出相同。我不明白为什么我看不到iframe中的数据。答案不是它是由AJAX之后添加的。(我知道这既是因为“查看页面源代码”可以读取Ajax添加的数据,也是因为我有b/c我一直等到我可以看到数据页面之后才抓取它,但它仍然不存在)。发生这种情况是因为

  10. RUBY - 网页抓取 - (OpenURI::HTTPError) - 2

    我正在尝试用ruby​​编写一个简单的网络抓取代码。它一直工作到第29个url,然后我收到此错误消息:C:/Ruby193/lib/ruby/1.9.1/open-uri.rb:346:in`open_http':500InternalServerError(OpenURI::HTTPError)fromC:/Ruby193/lib/ruby/1.9.1/open-uri.rb:775:in`buffer_open'fromC:/Ruby193/lib/ruby/1.9.1/open-uri.rb:203:in`blockinopen_loop'fromC:/Ruby193/lib/r

随机推荐