我正在尝试在 Golang 中实现类似 mapreduce 的方法。我的设计如下:
Map worker 从 mapper 输入 channel 中提取项目并输出到 mapper 输出 channel
映射器输出 channel 然后由单个 goroutine 读取。这个例程维护一个以前见过的键值对的映射。如果映射器输出的下一项具有匹配键,它会将具有匹配键的新值和旧值发送到归约输入 channel 。
这导致映射器输出和归约输入之间的循环依赖,我现在不知道如何发出映射器输出完成的信号(并关闭 channel )。
打破这种循环依赖或知道何时关闭具有这种循环行为的 channel 的最佳方法是什么?
下面的代码有一个死锁,map 输出 channel 和 reduce 输入 channel 相互等待。
type MapFn func(input int) (int, int)
type ReduceFn func(a int, b int) int
type kvPair struct {
k int
v int
}
type reducePair struct {
k int
v1 int
v2 int
}
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
inputMapChan := make(chan int, len(input))
outputMapChan := make(chan *kvPair, len(input))
reduceInputChan := make(chan *reducePair)
outputMapMap := make(map[int]int)
go func() {
for v := range input {
inputMapChan <- v
}
close(inputMapChan)
}()
for i := 0; i < nMappers; i++ {
go func() {
for v := range inputMapChan {
k, v := mapFn(v)
outputMapChan <- &kvPair{k, v}
}
}()
}
for i := 0; i < nReducers; i++ {
go func() {
for v := range reduceInputChan {
reduceValue := reduceFn(v.v1, v.v2)
outputMapChan <- &kvPair{v.k, reduceValue}
}
}()
}
for v := range outputMapChan {
key := v.k
value := v.v
other, ok := outputMapMap[key]
if ok {
delete(outputMapMap, key)
reduceInputChan <- &reducePair{key, value, other}
} else {
outputMapMap[key] = value
}
}
return outputMapMap, nil
}
最佳答案
试试这个:
package main
import "fmt"
import "sync"
import "sync/atomic"
import "runtime"
import "math/rand"
import "time"
type MapFn func(input int) *kvPair
type ReduceFn func(a int, b int) int
type kvPair struct {
k int
v int
}
type reducePair struct {
k int
v1 int
v2 int
}
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
inputMapChan := make(chan int, len(input))
outputMapChan := make(chan *kvPair, len(input))
reduceInputChan := make(chan *reducePair)
outputMapMap := make(map[int]int)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for _, v := range input {
inputMapChan <- v
}
close(inputMapChan)
}()
for i := 0; i < nMappers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for v := range inputMapChan {
outputMapChan <- mapFn(v)
}
}()
}
finished := false
go func() {
wg.Wait()
finished = true
}()
var count int64
for i := 0; i < nReducers; i++ {
go func() {
for v := range reduceInputChan {
reduceValue := reduceFn(v.v1, v.v2)
outputMapChan <- &kvPair{v.k, reduceValue}
atomic.AddInt64(&count, -1)
}
}()
}
wg2 := sync.WaitGroup{}
wg2.Add(1)
go func() {
defer wg2.Done()
for {
select {
default:
if finished && atomic.LoadInt64(&count) == 0 && len(outputMapChan) == 0 {
return
}
//runtime.Gosched()
case v := <-outputMapChan:
key := v.k
value := v.v
if other, ok := outputMapMap[key]; ok {
delete(outputMapMap, key)
atomic.AddInt64(&count, 1)
reduceInputChan <- &reducePair{key, value, other}
} else {
outputMapMap[key] = value
}
}
}
}()
wg2.Wait()
return outputMapMap, nil
}
func main() {
fmt.Println("NumCPU =", runtime.NumCPU())
t := time.Now()
a := rand.Perm(1000000)
//a = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}
m, err := MapReduce(mp, rdc, a, 2, 2)
if err != nil {
panic(err)
}
fmt.Println(time.Since(t)) //883ms
fmt.Println(m)
fmt.Println("done.")
}
func mp(input int) *kvPair {
return &kvPair{input & 7, input >> 3}
}
func rdc(a int, b int) int {
b <<= 3
if a != 0 {
b |= a
}
return b
}
关于go - 关闭具有循环依赖性的 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39069854/
我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..
我脑子里浮现出一些关于一种新编程语言的想法,所以我想我会尝试实现它。一位friend建议我尝试使用Treetop(Rubygem)来创建一个解析器。Treetop的文档很少,我以前从未做过这种事情。我的解析器表现得好像有一个无限循环,但没有堆栈跟踪;事实证明很难追踪到。有人可以指出入门级解析/AST指南的方向吗?我真的需要一些列出规则、常见用法等的东西来使用像Treetop这样的工具。我的语法分析器在GitHub上,以防有人希望帮助我改进它。class{initialize=lambda(name){receiver.name=name}greet=lambda{IO.puts("He
我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代
我正在尝试修改当前依赖于定义为activeresource的gem:s.add_dependency"activeresource","~>3.0"为了让gem与Rails4一起工作,我需要扩展依赖关系以与activeresource的版本3或4一起工作。我不想简单地添加以下内容,因为它可能会在以后引起问题:s.add_dependency"activeresource",">=3.0"有没有办法指定可接受版本的列表?~>3.0还是~>4.0? 最佳答案 根据thedocumentation,如果你想要3到4之间的所有版本,你可以这
我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("
我正在使用Rails3.1并在一个论坛上工作。我有一个名为Topic的模型,每个模型都有许多Post。当用户创建新主题时,他们也应该创建第一个Post。但是,我不确定如何以相同的形式执行此操作。这是我的代码:classTopic:destroyaccepts_nested_attributes_for:postsvalidates_presence_of:titleendclassPost...但这似乎不起作用。有什么想法吗?谢谢! 最佳答案 @Pablo的回答似乎有你需要的一切。但更具体地说...首先改变你View中的这一行对此#
下面的代码在我第一次运行它时就可以正常工作:require'rubygems'require'spreadsheet'book=Spreadsheet.open'/Users/me/myruby/Mywks.xls'sheet=book.worksheet0row=sheet.row(1)putsrow[1]book.write'/Users/me/myruby/Mywks.xls'当我再次运行它时,我会收到更多消息,例如:/Library/Ruby/Gems/1.8/gems/spreadsheet-0.6.5.9/lib/spreadsheet/excel/reader.rb:11
我从用户Hirolau那里找到了这段代码:defsum_to_n?(a,n)a.combination(2).find{|x,y|x+y==n}enda=[1,2,3,4,5]sum_to_n?(a,9)#=>[4,5]sum_to_n?(a,11)#=>nil我如何知道何时可以将两个参数发送到预定义方法(如find)?我不清楚,因为有时它不起作用。这是重新定义的东西吗? 最佳答案 如果您查看Enumerable#find的文档,您会发现它只接受一个block参数。您可以将它发送两次的原因是因为Ruby可以方便地让您根据它的“并行赋
RSpec似乎按顺序匹配方法接收的消息。我不确定如何使以下代码工作:allow(a).toreceive(:f)expect(a).toreceive(:f).with(2)a.f(1)a.f(2)a.f(3)我问的原因是a.f的一些调用是由我的代码的上层控制的,所以我不能对这些方法调用添加期望。 最佳答案 RSpecspy是测试这种情况的一种方式。要监视一个方法,用allowstub,除了方法名称之外没有任何约束,调用该方法,然后expect确切的方法调用。例如:allow(a).toreceive(:f)a.f(2)a.f(1)
我是Ruby的新手,有些闭包逻辑让我感到困惑。考虑这段代码:array=[]foriin(1..5)array[5,5,5,5,5]这对我来说很有意义,因为i被绑定(bind)在循环之外,所以每次循环都会捕获相同的变量。使用每个block可以解决这个问题对我来说也很有意义:array=[](1..5).each{|i|array[1,2,3,4,5]...因为现在每次通过时都单独声明i。但现在我迷路了:为什么我不能通过引入一个中间变量来修复它?array=[]foriin1..5j=iarray[5,5,5,5,5]因为j每次循环都是新的,我认为每次循环都会捕获不同的变量。例如,这绝对