MIT 6.5840 Lab 1 MapReduce

一开始被长篇的instructions吓到,有些无从下手。Hints里给出了starting point:

• One way to get started is to modify mr/worker.go’s Worker() to send an RPC to the coordinator asking for a task. Then modify the coordinator to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.

Phase 1

Phase 1实现的是三个部分:

  1. Coordinator initializes map tasks
  2. Coordinator与Worker间通信
  3. Workers do map tasks

Challenge: 在实现过程中,困住我的一点是要如何让worker区分task是Map还是Reduce
Solution: 在Task结构体加上标识

Challenge: How to deal with concurrency?
Solution: Use channels in Go

Problem: rpc: gob error encoding body: gob: type mr.Response has no exported fields
Cause: In Go, fields and variables that start with an Uppercase letter are “Exported”, and are visible to other packages. Fields that start with a lowercase letter are “unexported”, and are only visible inside their own package.
Solution: need to capitalize all fields in structs

把DoMapTask功能实现后,跑了一下coordinator和worker,call succeeded!并且输出了一系列intermediate files。打开其中一个文件,可以看到key-value pairs是按照hashed key分到不同文件的,说明同一个key只会存在于一个文件里。但一个文件里,相同的key并没有group到一起。

MapReduce-1
MapReduce-2

Phase 2

Phase 2实现的是

  1. Coordinator initializes reduce tasks
  2. Workers do reduce tasks

有了Phase 1的基础,这部分就好做很多。

Coordinator makes reduce tasks是需要读取我们在phase 1中输出的中间文件的。网上搜了下读取文件的方法,我用到的是ioutil.ReadDir()读取指定目录下的所有文件,filepath.Match(pattern, file.Name())筛选出符合pattern的文件。

Worker does reduce tasks的实现直接参考mrsequential.go

❓How to combine keys across different reduce workers? Say reduce worker 1 has <key 1, value 1> and reduce worker 2 has <key 1, value 2>, how to make sure two intermediate pairs with the same key1 go to one output file?
Answer: Sort by key, and use two pointers to check the range of pairs with the same key. See mrsequential.go reduce part.

下面是输出文件:
MapReduce-3

Phase 3

Phase 3实现的是

  1. Add Map and Reduce together
  2. Coordinator should know the current processing state

这个部分是我耗时最多的,也是涉及到Concurrency,比较重要的一部分。

明确状态

一共有几个state?

  1. There are map tasks left to be assigned
  2. All map tasks are being assigned to workers, and need to wait all map tasks to finish to start reduce tasks
  3. All map tasks are finished, and can start reduce tasks
  4. There are reduce tasks left to be assigned
  5. All reduce tasks are being assigned to workers, and need to wait all reduce tasks to finish
  6. All reduce tasks are finished, and can exit the program

Coordinator:State转换,并准备对应state下的task

  1. 传给worker map task
  2. 告知worker等待,这里我是通过传给worker一个TaskType为Waiting的task来实现的
  3. 转换到下一个state,也就是Reduce state,以initialize reduce tasks
  4. 传给worker reduce task
  5. 告知worker等待,同2
  6. 转换到下一个state,也就是Finished state

Worker:在不同state下做不同的事

  1. worker gets a map task and does map task
  2. worker should wait,这里是通过time.Sleep()实现的
  3. 跳过,worker不会知道这个state
  4. worker gets a reduce task and does reduce task
  5. worker should wait,同2
  6. worker exits the program

❓Coordinator如何知道worker做完了task?
Worker需要告知Coordinator Task已完成。这里需要给Task结构体添加一个State,State可能的取值有Unassigned, Processing, Finished。当task已完成时,coordinator需要将这个task的state从Processing变为Finished。这个State会在判断tasks是否全部完成的时候用到。

下面附上Coordinator这部分的代码。
MapReduce-6

Crash处理

❓Coordinator如何知道worker is dead/timeout?如何实现?
在一个新thread里每隔一段时间查看正在处理中的task是否超时。创建新thread用到的是gorountine。判断超时是通过给Task结构体加一个StartTime,在分配给worker的时候,给StartTime附上当前的时间戳。若当前时间 - StartTime > 10 seconds时,说明task超时,需要将其重新分配给新的worker。

❓因为有dead worker存在的可能性,所以map worker在写入intermediate文件和reduce worker在写入最终输出文件的中途可能crash,从而导致文件无效。如何处理这种情况?
Instructions中的Hints给出了解决方法。一开始先用os.CreateTemp写进temporary文件,当确定文件完整正确的时候,再用os.Rename将temporary文件重命名成期望的文件名。

Data race

运行bash test-mr.sh之前,要把文件里的RACE=-race uncomment以开启race detector。

犯错点1
一开始没弄清是write加锁还是read & write都需要加锁,所以在这里也花了不少时间debug。

答案是每次read & write shared resources时都需要先加锁,比如在查看是否有超时task时,读取coordinator当前的状态和coordinator所拥有全部tasks前需要加锁。如果一不小心有代码忘加了锁也没关系,后来才发现这部分还是很好debug的,直接看error output,在第一个代码出错的地方加上锁就好了。比如下图为detect到的某个data race。

MapReduce-4

通过error output发现race发生在coordinator.go的第77行代码处。在77行处加上锁就可以了。

犯错点2
在加锁的试验过程中,有几次运行时发现代码一直在跑不退出,worker也没有收到coordinator发送的任务。一开始我以为coordinator.go的GetTask()里不能加锁,后来发现其实是有地方加了锁但是没有正确解锁导致的。

最终结果

MapReduce-5

总结

做这个实验的时候意识到实践是非常必要的,在学习MapReduce理论知识的时候,以为自己理解了,但在做实验的过程中还是出现了因为认知不到位、理解不透彻的缘故,导致写代码没思路,好多细节不知道如何处理,以及最后read & write加锁的问题。以后要多动手、多动手!

总耗时:两周😭