草庐IT

concurrency - Go lang关闭管道死锁

coder 2023-06-28 原文

我在用Go语言做一个数据导入的工作,我想把每一步都写成一个闭包,用channels进行通信,即每一步都是并发的。问题可以通过以下结构定义。

  1. 从数据源获取Widgets
    1. 将源 1 的翻译添加到小部件
    2. 将来源 2 的翻译添加到小部件
    3. 将源 1 中的定价添加到小部件
    4. WidgetRevisions 添加到 Widget
      1. 将来源 1 的翻译添加到 WidgetRevisions
      2. 将来源 2 的翻译添加到 WidgetRevisions

出于这个问题的目的,我只处理必须在新的 Widget 上执行的前三个步骤。在此基础上,我假设第四步可以作为一个流水线步骤来实现,它本身是根据一个子三步流水线来实现的,以控制 *WidgetRevision*s

为此,我编写了一小段代码来提供以下 API:

// A Pipeline is just a list of closures, and a smart 
// function to set them all off, keeping channels of
// communication between them.
p, e, d := NewPipeline()

// Add the three steps of the process
p.Add(whizWidgets)
p.Add(popWidgets)
p.Add(bangWidgets)

// Start putting things on the channel, kick off
// the pipeline, and drain the output channel
// (probably to disk, or a database somewhere)
go emit(e)
p.Execute()
drain(d)

我已经实现了它(代码在 GistGo Playground )但它陷入僵局,成功 失败率 100%

当调用 p.Execute() 时出现死锁,因为大概其中一个 channel 最终无事可做,没有任何事情被发送,也没有工作可做。 .

将几行调试输出添加到 emit()drain(),我看到以下输出,我相信闭包调用之间的流水线是正确的,我看到一些小部件被遗漏了。

Emitting A Widget
Input Will Be Emitted On 0x420fdc80
Emitting A Widget
Emitting A Widget
Emitting A Widget
Output Will Drain From 0x420fdcd0
Pipeline reading from 0x420fdc80 writing to 0x420fdd20
Pipeline reading from 0x420fdd20 writing to 0x420fddc0
Pipeline reading from 0x420fddc0 writing to 0x42157000

以下是我对这种方法的一些了解:

  • 我相信这种设计“饿死”一个或另一个协程的情况并不少见,我相信这就是死锁的原因
  • 如果管道首先将东西输入其中,我更愿意(API 将实现 Pipeline.Process(*Widget)
    • 如果我能做到这一点,drain 可能是一个“步骤”,它只是没有将任何东西传递给下一个函数,这可能是一个更干净的 API
  • 我知道我没有实现任何类型的梯级缓冲区,所以我完全有可能让机器的可用内存重载
  • 我真的不相信这是好的 Go 风格......但它似乎利用了很多 Go 功能,但这并不是真正的好处
  • 因为 WidgetRevisions 也需要一个管道,我想让我的管道更通用,也许 interface{} 类型是解决方案,我不知道去够不够确定是否明智。
  • 有人建议我考虑实现互斥锁来防止竞争条件,但我相信我会保留,因为每个闭包都会在 Widget 结构的一个特定单元上运行,但是我很乐意接受教育关于那个话题。

总结:我该如何修复这段代码,应该修复这段代码,如果你是比我更有经验的 go 程序员,你会如何解决这个问题“顺序工作单元”问题?

最佳答案

我只是认为我不会构建远离 channel 的抽象。显式管道。

您可以很容易地为所有实际的管道操作创建一个函数,看起来像这样:

type StageMangler func(*Widget)

func stage(f StageMangler, chi <-chan *Widget, cho chan<- *Widget) {
    for widget := range chi {
                f(widget)
                cho <- widget
    }
    close(cho)
}

然后你可以传入func(w *Widget) { w.Whiz = true} 或类似于stage builder。

此时您的 add 可以收集这些及其 worker 数量,因此特定阶段可以更轻松地拥有 n 个 worker 。

我只是不确定这比直接将 channel 拼接在一起更容易,除非您在运行时构建这些管道。

关于concurrency - Go lang关闭管道死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13810261/

有关concurrency - Go lang关闭管道死锁的更多相关文章

  1. ruby - 如何关闭 ruby​​ gem "Spreadsheet?"中的文件 - 2

    下面的代码在我第一次运行它时就可以正常工作:require'rubygems'require'spreadsheet'book=Spreadsheet.open'/Users/me/myruby/Mywks.xls'sheet=book.worksheet0row=sheet.row(1)putsrow[1]book.write'/Users/me/myruby/Mywks.xls'当我再次运行它时,我会收到更多消息,例如:/Library/Ruby/Gems/1.8/gems/spreadsheet-0.6.5.9/lib/spreadsheet/excel/reader.rb:11

  2. ruby-on-rails - Ruby 的 'open_uri' 是否在读取或失败后可靠地关闭套接字? - 2

    一段时间以来,我一直在使用open_uri下拉ftp路径作为数据源,但突然发现我几乎连续不断地收到“530抱歉,允许的最大客户端数(95)已经连接。”我不确定我的代码是否有问题,或者是否是其他人在访问服务器,不幸的是,我无法真正确定谁有问题。本质上,我正在读取FTPURI:defself.read_uri(uri)beginuri=open(uri).readuri=="Error"?nil:urirescueOpenURI::HTTPErrornilendend我猜我需要在这里添加一些额外的错误处理代码...我想确保我采取一切预防措施来关闭所有连接,这样我的连接就不是问题所在,但是我

  3. ruby - Faye WebSocket,关闭处理程序被触发后重新连接到套接字 - 2

    我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d

  4. ruby-on-rails - Assets 管道损坏 : Not compiling on the fly css and js files - 2

    我开始了一个新的Rails3.2.5项目,Assets管道不再工作了。CSS和Javascript文件不再编译。这是尝试生成Assets时日志的输出:StartedGET"/assets/application.css?body=1"for127.0.0.1at2012-06-1623:59:11-0700Servedasset/application.css-200OK(0ms)[2012-06-1623:59:11]ERRORNoMethodError:undefinedmethod`each'fornil:NilClass/Users/greg/.rbenv/versions/1

  5. ruby - 如何在 watir 测试套件结束时关闭浏览器? - 2

    使用ruby​​的watir测试网络应用程序时,浏览器最后会保持打开状态。网上的一些建议是,要进行真正的单元测试,您应该在每次测试时(在拆卸调用中)打开和关闭浏览器,但这很慢而且毫无意义。或者他们做这样的事情:defself.suites=superdefs.afterClass#Closebrowserenddefs.run(*args)superafterClassendsend但这会导致摘要输出不再显示(诸如“100次测试、100次断言、0次失败、0次错误”之类的内容仍应显示)。我怎样才能让ruby​​或watir在我的测试结束时关闭浏览器? 最佳答案

  6. ruby-on-rails - 如何在一段时间后关闭 Rails 闪现消息? - 2

    我想设置秒数aflash在自动关闭之前向用户显示通知。 最佳答案 您可以在页面上使用一些简单的JavaScript(在此示例中使用jQuery):$('document').ready(function(){setTimeout(function(){$('#flash').slideUp();},3000);});假设保存您的flash消息的HTML元素的id是#flash,这将向上滑动并在3000毫秒(3秒)后将其隐藏。 关于ruby-on-rails-如何在一段时间后关闭Rails

  7. ruby-on-rails - 如何在关闭 cache_classes 的情况下使用来自中间件的域对象? - 2

    在rails开发环境中,cache_classes是关闭的,所以你可以修改app/下的代码,不用重启服务器就可以看到变化。不过,在所有环境中,中间件只会创建一次。所以如果我有这样的中间件:classMyMiddlewaredefinitialize(app)@app=appenddefcall(env)env['model']=MyModel.firstendend我在config/environments/development.rb中执行此操作:config.cache_classes=false#thedefaultfordevelopmentconfig.middleware.

  8. ruby - 重新连接 tcpsocket(或如何检测已关闭的套接字) - 2

    我有一个连接到服务器的ruby​​tcpsocket客户端。在发送数据之前如何检查套接字是否已连接?我是否尝试“拯救”断开连接的tcpsocket,重新连接然后重新发送?如果是这样,有没有人有一个简单的代码示例,因为我不知道从哪里开始:(我很自豪我设法在rails中获得了一个持久连接的客户端tcpsocket。然后服务器决定杀死客户端,一切都崩溃了;)编辑我已经使用此代码解决了一些问题-如果未连接,它将尝试重新连接,但如果服务器已关闭则不会处理这种情况(它将继续重试)。这是正确方法的开始吗?谢谢defself.write(data)begin@@my_connection.write(

  9. ruby-on-rails - 我将 Rails3 与 tinymce 一起使用。如何呈现用户关闭浏览器javascript然后输入xss? - 2

    我有一个用Rails3编写的站点。我的帖子模型有一个名为“内容”的文本列。在帖子面板中,html表单使用tinymce将“content”列设置为textarea字段。在首页,因为使用了tinymce,post.html.erb的代码需要用这样的原始方法来实现。.好的,现在如果我关闭浏览器javascript,这个文本区域可以在没有tinymce的情况下输入,也许用户会输入任何xss,比如alert('xss');.我的前台会显示那个警告框。我尝试sanitize(@post.content)在posts_controller中,但sanitize方法将相互过滤tinymce样式。例如

  10. ruby-on-rails - Rails add_index 算法 : :concurrently still causes database lock up during migration - 2

    为了防止在迁移到生产站点期间出现数据库事务错误,我们遵循了https://github.com/LendingHome/zero_downtime_migrations中列出的建议。(具体由https://robots.thoughtbot.com/how-to-create-postgres-indexes-concurrently-in概述),但在特别大的表上创建索引期间,即使是索引创建的“并发”方法也会锁定表并导致该表上的任何ActiveRecord创建或更新导致各自的事务失败有PG::InFailedSqlTransaction异常。下面是我们运行Rails4.2(使用Acti

随机推荐