草庐IT

Mit6.824 lab1全解析(推导历程+代码)

ligen1112 2023-07-09 原文

0.前言

mit 6.824分布式系统
课程主页

lab1是第一次作业,本菜鸡用了好几天独立完成,经过一次改版优化了数据结构和解决任务元数据并发环境下的data race问题,建议大家做之前有自己独立的思考,有很多可行方案都能完成任务。比如看到有的小伙伴采用master(coordinator)轮询slave(worker)进行交互,我是用slave定时发送请求触发master懒执行大部分任务(后面会聊到原因)。也有的小伙伴用队列增删加锁实现并发安全,本人用的golang自带的channel作为任务队列。不得不感叹人家本科生就有机会学这么有意思的课程,听说lab2更酸爽,后面会接着去冲塔。总之,集中一段时间做这个lab1挺有趣的。

我的代码在这里 :送餐员小李Gitee

1.实验准备

  • MapReduce论文阅读

  • Golang入门:需要掌握 基本语法/IDE配置/数组/面向对象/管道协程,小案例都实现一遍即可,十几个小时够了

    Golang核心-韩顺平

  • 阅读 lab1 note lab1
    看下mrsequential.go的流程,全程一个worker干到底无coordinator,看下map,reduce函数的加载和数据的流向

2 goland配置调试环境

  • windows

windows不支持动态插件编译,硬塞一个linux编译好的so文件也不行,当然可以通过改代码的形式把map reduce 的wordcount函数硬编码进main/worker.go,但不建议这样做后面会很麻烦。

buildmode=plugin not supported on windows/amd64

VMware搞个Ubuntu,然后golang,goland配置好

Ubuntu 16 镜像下载
VMware
VMware如何安装Ubuntu

  • linux

GOPATH选择git clone下来的文件夹6.824

GOROOT是golang的安装位置

wc.so运行前每次都要重新build,因此在写个build-wc.sh脚本放在main路径下面 configuration取名"build wc"

也可以在脚本加一个去掉"mr-"开头文件的句子,把后面频繁调试mr程序生成的结果文件去掉,省的每次运行前都去手删

go build -buildmode=plugin ../mrapps/wc.go
# rm -f mr-*

mrsequential.go 最下面的Before launch点击加号Run Another Configuration添加"build wc",这样每次运行之前都build一下wc.so

  • 至此mrsequential.go 应该可以在goland运行起来了

同理mrworker.go也需要这样配置

mrcoordinator.go 需要将所有mian下面的txt文本加入进去

pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt pg-grimm.txt pg-huckleberry_finn.txt pg-metamorphosis.txt pg-sherlock_holmes.txt pg-tom_sawyer.txt

2.任务梳理

lab1 note 里面的内容,内容梳理,一开始比较模糊也没关系,做到哪里回看就行

2.1 描述

实现分布式mr,一个coordinator,一个worker(启动多个),在这次实验都在一个机器上运行。worker通过rpc和coordinator交互。worker请求任务,进行运算,写出结果到文件。coordinator需要关心worker的任务是否完成,在超时情况下将任务重新分配给别的worker。

2.2 规则

  • map阶段需要将中间keys分成nReduce个数, nReduce通过main/mrcoordinator.go传给MakeCoordinator()

  • worker需要将第X个reduce task结果放到mr-out-X中。

  • mr-out-X要一行一行生成,kv形式。main/mrsequential.go中有,拿来就完事了

  • main/mrcoordinator.go从mr/coordinator.go 的 Done()方法得知任务完成并关闭自己。

  • 任务都完成后,worker也得关闭

2.3 提示

  • 一开始可以从mr/worker.go的 Worker()方法做,发送rpc给coordinator请求任务,然后coordinator分配任务,然后worker读文件并且map函数处理。

  • map reduce函数都是通过go插件装载 (.so文件)

  • mr/ 文件变了就需要重新build

  • 都在一个文件系统,worker天然实现文件共享,先凑合着起步

  • 中间文件命名 mr-X-Y X是map任务号,y是reduce任务号

  • worker的map方法用json存储中间kv对,reduce再读回来,因为真正分布式worker都不在一个机器上,涉及网络传输,所以用json编码解码走个过场。

  • worker的map可以用 worker.go里面的ihash(key)得到特定key的reduce任务号

  • mrsequential.go 代码可以借鉴

  • coordinator里面的共享数据需要加锁

  • worker有时候需要等待,比如当map任务都分发出去了,有的worker完成后又来申请任务,此时还有map未完成,reduce不能开始,这个worker需要等待下

  • 如果任务重试机制,记得不要生成重复任务

  • mrapps/crash.go 随机干掉map reduce,看crash.go的代码是有一定几率让worker直接退出或者长时间延迟,可以用来测试恢复功能。这个逻辑是整合在map reduce函数里面的,注意worker被干掉时候任务已经拿到手了。

  • 确保没有人在出现崩溃时观察部分写入的文件,用ioutil.TempFile创建临时文件,用os.Rename重命名

3.开始做任务

每一步提示思路,是我的心路历程,应该符合大多数人的逻辑

3.1 小目标:coordinator分配好任务worker取到后打印出来

首先目标锁定在coordinator和worker的互动,其他的都不需要考虑,worker打印出来自己要做任务的文件名

  • mrsequential.go 里面逻辑看懂

  • 看懂coordinator和worker的rpc交互流程

    call 函数通过1234端口传入args和reply的内存地址,调用rpcname(Coordinator.函数名),通过反射机制"远程"调用Coordinator的该函数,Coordinator通过内存地址读取入参写出结果。worker.go里面的Worker方法调用CallExample,先运行Coordinator,再运行worker,看看worker端打印返回来的经过Coordinator加工过的数字

  • 至此worker和coordinator可以互动了

  • 发放给worker任务的结构体

    Job类型(枚举常量):JobType,以便worker知道这是Map任务还是Reduce任务。

    InputFile 文件名数组 :map情况数组里面就一个分配给worker的文件(Hadoop里面大文件是需要切块的,但是这里面的文件都很小就不切了直接一个文件给一个worker),reduce情况下是worker需要选取的一些需要聚合到一起的中间文件

    JobId/ReducerNum worker需要知道这些以便生成中间结果文件"mr-tmp-x-y"
    x是jobid,y是经过hash后的reduce id, y用来标识哪些文件汇入同一个reduce

type Job struct {
   JobType    JobType
   InputFile  []string
   JobId      int
   ReducerNum int
   //TmpFileList []string
}
  • 任务存放

放channel里面就行,省得自己写队列的各种方法,还能天然并发安全

channel 是golang特有的类型化消息的队列,可以通过它们发送类型化的数据在协程之间通信,可以避开所有内存共享导致的坑;通道的通信方式保证了同步性。数据通过通道:同一时间只有一个协程可以访问数据:所以不会出现数据竞争,设计如此。数据的归属(可以读写数据的能力)被传递。

因此Coordinator定义了两个channel来存放做好的map和reduce,jobMetaHolder(元数据管理相关)和CoordinatorCondition(coordinator状态),暂时不用管可以注释掉

type Coordinator struct {
   // channel which hold uncompleted task
   JobChannelMap        chan *Job
   JobChannelReduce     chan *Job
   ReducerNum           int
   MapNum               int
   CoordinatorCondition Condition
   uniqueJobId          int
   jobMetaHolder        JobMetaHolder
}

  • map任务制作
    Coordinator制作map任务,在一开始程序运行的时候就执行
func (c *Coordinator) makeMapJobs(files []string) {
  for _, v := range files {
  	id := c.generateJobId()
  	//fmt.Println("making map job :", id)
  	job := Job{
  		JobType:    MapJob,
  		InputFile:  []string{v},
  		JobId:      id,
  		ReducerNum: c.ReducerNum,
  	}

  //这下面暂时不需要
  	jobMetaINfo := JobMetaInfo{
  		condition: JobWaiting,
  		JobPtr:    &job,
  	}
  	c.jobMetaHolder.putJob(&jobMetaINfo)
  	fmt.Println("making map job :", &job)
  	c.JobChannelMap <- &job
  }
  //这上面暂时不需要
  fmt.Println("done making map jobs")
  c.jobMetaHolder.checkJobDone()
}

照着把那个样例rpc交互函数写个distribute方法,把的coordinate端和worker端入参数据类型分别改下,加入这句话即可将JobChannelMap里面的一个job给reply

 *reply = *<-c.JobChannelMap

worker端对reply取InputFile第一个元素,打印结果如下,worker线程取到任务了

worker get job which is pg-being_ernest.txt
  • 3.1小目标完成

3.2 worker通过传过来的文件名做map任务,写出结果

这个步骤简单粗暴,照着mrsequential.go里面写一下,记得用ihash处理下key分成Nreduce份用json编码后写出到"mr-tmp-x-y"文件。注意mr论文这步是有排序的,因为真正生产活动数据量是非常巨大的,map端提前排序好后,reduce的排序压力会减小很多。这里排不排序无所谓。

  • 3.2小目标完成

3.3 coordinator感知各个job任务运行完毕和map转reduce时机

  • 任务元数据管理

    从coordinator视角看任务分发

    制作任务 -> 放入队列 -> worker来取

    如果worker维护任务的状态显然不合理,每个任务的运行开始时间,任务状态。这些内容worker没必要知晓,是coordinator用来判断任务超时,或者map转reduce的。因此用JobMetaInfo把Job + Job状态包装下

    制作任务 -> 放入JobMetaInfo(元数据) -> 放入队列 -> worker来取

type JobMetaInfo struct {
condition JobCondition
StartTime time.Time
JobPtr *Job
}


 condition job状态: 包含等待,运行,完成
 StartTime : 开始运行的时间(等待变为运行)
 JobPtr  : job内存地址(golang是值传递,用地址更高效)

所有任务用map存储在JobMetaHolder,key是job的唯一id
```go
type JobMetaHolder struct {
  MetaMap map[int]*JobMetaInfo
}

针对这个JobMetaHolder的操作都可以安排上了,比如放入任务,任务状态更新,检查同一阶段任务是否完成

放入任务函数 putJob

 func (j *JobMetaHolder) putJob(JobInfo *JobMetaInfo) bool {
  jobId := JobInfo.JobPtr.JobId
  meta, _ := j.MetaMap[jobId]
  if meta != nil {
  	fmt.Println("meta contains job which id = ", jobId)
  	return false
  } else {
  	j.MetaMap[jobId] = JobInfo
  }
  return true
}

任务发射方法,当channel给出任务后,元数据管理器对任务元数据进行状态变更和运行开始时间记录(后面超时任务有用)。和Kafka的InflightMessage有点像。

func (j *JobMetaHolder) fireTheJob(jobId int) bool {
	ok, jobInfo := j.getJobMetaInfo(jobId)
	if !ok || jobInfo.condition != JobWaiting {
		return false
	}
	jobInfo.condition = JobWorking
	jobInfo.StartTime = time.Now()
	return true
}

检查当前阶段任务是否完成。因为每次制作jobs后,实在加锁情况下一股脑更新到元数据的,因此这边通过遍历先检查reduce完成未完成数量再检查map就能判断两种情况下的
完成情况。每次也会print 任务的数量信息方便调试

func (j *JobMetaHolder) checkJobDone() bool {
  reduceDoneNum := 0
  reduceUndoneNum := 0
  mapDoneNum := 0
  mapUndoneNum := 0
  for _, v := range j.MetaMap {
  	if v.JobPtr.JobType == MapJob {
  		if v.condition == JobDone {
  			mapDoneNum += 1
  		} else {
  			mapUndoneNum++
  		}
  	} else {
  		if v.condition == JobDone {
  			reduceDoneNum++
  		} else {
  			reduceUndoneNum++
  		}
  	}
  }
  fmt.Printf("%d/%d map jobs are done, %d/%d reduce job are done\n",
  	mapDoneNum, mapDoneNum+mapUndoneNum, reduceDoneNum, reduceDoneNum+reduceUndoneNum)

  return (reduceDoneNum > 0 && reduceUndoneNum == 0) || (mapDoneNum > 0 && mapUndoneNum == 0)
}
  • coordinator的状态转换
    每次当channel的长度为0的时候,去checkJobDone检查一下,当这个阶段所有任务都完成以后,进行状态转换并作出相应操作nextPhase()
func (c *Coordinator) nextPhase() {
	if c.CoordinatorCondition == MapPhase {
		c.makeReduceJobs()
		c.CoordinatorCondition = ReducePhase
	} else if c.CoordinatorCondition == ReducePhase {
		c.CoordinatorCondition = AllDone
	}
}
  • 3.3小目标完成

3.4 coordinator 的任务分配函数

  • 任务分配方法
    任务分配方法是coordinator最核心的函数,worker每次来询问都会调用这个方法
func (c *Coordinator) DistributeJob(args *ExampleArgs, reply *Job) error {
	mu.Lock()
	defer mu.Unlock()
	fmt.Println("coordinator get a request from worker :")
	if c.CoordinatorCondition == MapPhase {
		if len(c.JobChannelMap) > 0 {
			*reply = *<-c.JobChannelMap
			if !c.jobMetaHolder.fireTheJob(reply.JobId) {
				fmt.Printf("[duplicated job id]job %d is running\n", reply.JobId)
			}
		} else {
			reply.JobType = WaittingJob
			if c.jobMetaHolder.checkJobDone() {
				c.nextPhase()
			}
			return nil
		}
	} else if c.CoordinatorCondition == ReducePhase {
		if len(c.JobChannelReduce) > 0 {
			*reply = *<-c.JobChannelReduce
			if !c.jobMetaHolder.fireTheJob(reply.JobId) {
				fmt.Printf("job %d is running\n", reply.JobId)
			}
		} else {
			reply.JobType = WaittingJob
			if c.jobMetaHolder.checkJobDone() {
				c.nextPhase()
			}
			return nil
		}
	} else {
		reply.JobType = KillJob
	}
	return nil
}
  • worker完成任务调用的操作
    这里使用了判断元数据的办法看看任务是否会重复完成,在后面的崩溃测试下,worker会失效,coordinator需要重新发放Jobg给另一个worker,加入刚才那个失效的worker恢复了然后写入回来,不能覆盖已经完成的数据(甚至都下一个阶段了)。

    func (c *Coordinator) JobIsDone(args *Job, reply *ExampleReply) error {
      mu.Lock()
      defer mu.Unlock()
      switch args.JobType {
      case MapJob:
      	ok, meta := c.jobMetaHolder.getJobMetaInfo(args.JobId)
      	//prevent a duplicated work which returned from another worker
      	if ok && meta.condition == JobWorking {
      		meta.condition = JobDone
      		fmt.Printf("Map task on %d complete\n", args.JobId)
      	} else {
      		fmt.Println("[duplicated] job done", args.JobId)
      	}
      	break
      case ReduceJob:
      	fmt.Printf("Reduce task on %d complete\n", args.JobId)
      	ok, meta := c.jobMetaHolder.getJobMetaInfo(args.JobId)
      	//prevent a duplicated work which returned from another worker
      	if ok && meta.condition == JobWorking {
      		meta.condition = JobDone
      	} else {
      		fmt.Println("[duplicated] job done", args.JobId)
      	}
      	break
      default:
      	panic("wrong job done")
      }
      return nil
    

``

3.5 加锁

由于我用了channel所以在任务分配队列实现了天然并发安全,但是在别的地方还是遇到了问题,比如Done函数通过mrcoordinator主线程去时不时读取coordinator的状态来判断是否结束死循环。还有在一个worker调coordinator拉取数据的时候,另一个worker调coordinator的checkJobDone()函数进行检查。因此在响应可能发生冲突的地方加锁。

3.6 crash test

这里我是又起了一个线程去检查JobMetaHolder里面超时的任务,具体可以参考代码
但是无法通过test-mr.sh

去看了crash.go和test-mr.sh里面的逻辑,发现起的worker线程太少了,crash.go里面的maybecrash方法很有可能瘫痪这个worker,只起一个甚至三个都会导致最后没有可用worker,即使将任务重新放到channel里面

因此我修改了test-mr.sh,起了更多的worker,通过了测试。

有关Mit6.824 lab1全解析(推导历程+代码)的更多相关文章

  1. Ruby 解析字符串 - 2

    我有一个字符串input="maybe(thisis|thatwas)some((nice|ugly)(day|night)|(strange(weather|time)))"Ruby中解析该字符串的最佳方法是什么?我的意思是脚本应该能够像这样构建句子:maybethisissomeuglynightmaybethatwassomenicenightmaybethiswassomestrangetime等等,你明白了......我应该一个字符一个字符地读取字符串并构建一个带有堆栈的状态机来存储括号值以供以后计算,还是有更好的方法?也许为此目的准备了一个开箱即用的库?

  2. ruby - 如何在 buildr 项目中使用 Ruby 代码? - 2

    如何在buildr项目中使用Ruby?我在很多不同的项目中使用过Ruby、JRuby、Java和Clojure。我目前正在使用我的标准Ruby开发一个模拟应用程序,我想尝试使用Clojure后端(我确实喜欢功能代码)以及JRubygui和测试套件。我还可以看到在未来的不同项目中使用Scala作为后端。我想我要为我的项目尝试一下buildr(http://buildr.apache.org/),但我注意到buildr似乎没有设置为在项目中使用JRuby代码本身!这看起来有点傻,因为该工具旨在统一通用的JVM语言并且是在ruby中构建的。除了将输出的jar包含在一个独特的、仅限ruby​​

  3. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  4. ruby-on-rails - Rails 源代码 : initialize hash in a weird way? - 2

    在rails源中:https://github.com/rails/rails/blob/master/activesupport/lib/active_support/lazy_load_hooks.rb可以看到以下内容@load_hooks=Hash.new{|h,k|h[k]=[]}在IRB中,它只是初始化一个空哈希。和做有什么区别@load_hooks=Hash.new 最佳答案 查看rubydocumentationforHashnew→new_hashclicktotogglesourcenew(obj)→new_has

  5. ruby - 用逗号、双引号和编码解析 csv - 2

    我正在使用ruby​​1.9解析以下带有MacRoman字符的csv文件#encoding:ISO-8859-1#csv_parse.csvName,main-dialogue"Marceu","Giveittohimóhe,hiswife."我做了以下解析。require'csv'input_string=File.read("../csv_parse.rb").force_encoding("ISO-8859-1").encode("UTF-8")#=>"Name,main-dialogue\r\n\"Marceu\",\"Giveittohim\x97he,hiswife.\"\

  6. ruby-on-rails - 浏览 Ruby 源代码 - 2

    我的主要目标是能够完全理解我正在使用的库/gem。我尝试在Github上从头到尾阅读源代码,但这真的很难。我认为更有趣、更温和的踏脚石就是在使用时阅读每个库/gem方法的源代码。例如,我想知道RubyonRails中的redirect_to方法是如何工作的:如何查找redirect_to方法的源代码?我知道在pry中我可以执行类似show-methodmethod的操作,但我如何才能对Rails框架中的方法执行此操作?您对我如何更好地理解Gem及其API有什么建议吗?仅仅阅读源代码似乎真的很难,尤其是对于框架。谢谢! 最佳答案 Ru

  7. ruby - 模块嵌套代码风格偏好 - 2

    我的假设是moduleAmoduleBendend和moduleA::Bend是一样的。我能够从thisblog找到解决方案,thisSOthread和andthisSOthread.为什么以及什么时候应该更喜欢紧凑语法A::B而不是另一个,因为它显然有一个缺点?我有一种直觉,它可能与性能有关,因为在更多命名空间中查找常量需要更多计算。但是我无法通过对普通类进行基准测试来验证这一点。 最佳答案 这两种写作方法经常被混淆。首先要说的是,据我所知,没有可衡量的性能差异。(在下面的书面示例中不断查找)最明显的区别,可能也是最著名的,是你的

  8. ruby - 寻找通过阅读代码确定编程语言的ruby gem? - 2

    几个月前,我读了一篇关于ruby​​gem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:

  9. ruby - Net::HTTP 获取源代码和状态 - 2

    我目前正在使用以下方法获取页面的源代码:Net::HTTP.get(URI.parse(page.url))我还想获取HTTP状态,而无需发出第二个请求。有没有办法用另一种方法做到这一点?我一直在查看文档,但似乎找不到我要找的东西。 最佳答案 在我看来,除非您需要一些真正的低级访问或控制,否则最好使用Ruby的内置Open::URI模块:require'open-uri'io=open('http://www.example.org/')#=>#body=io.read[0,50]#=>"["200","OK"]io.base_ur

  10. ruby-on-rails - 我更新了 ruby​​ gems,现在到处都收到解析树错误和弃用警告! - 2

    简而言之错误:NOTE:Gem::SourceIndex#add_specisdeprecated,useSpecification.add_spec.Itwillberemovedonorafter2011-11-01.Gem::SourceIndex#add_speccalledfrom/opt/local/lib/ruby/site_ruby/1.8/rubygems/source_index.rb:91./opt/local/lib/ruby/gems/1.8/gems/rails-2.3.8/lib/rails/gem_dependency.rb:275:in`==':und

随机推荐