一开始被长篇的instructions吓到,有些无从下手。Hints里给出了starting point:
• One way to get started is to modify mr/worker.go’s Worker() to send an RPC to the coordinator asking for a task. Then modify the coordinator to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.
Phase 1
Phase 1实现的是三个部分:
- Coordinator initializes map tasks
- Coordinator与Worker间通信
- Workers do map tasks
Challenge: 在实现过程中,困住我的一点是要如何让worker区分task是Map还是Reduce
Solution: 在Task结构体加上标识
Challenge: How to deal with concurrency?
Solution: Use channels in Go
Problem: rpc: gob error encoding body: gob: type mr.Response has no exported fields
Cause: In Go, fields and variables that start with an Uppercase letter are “Exported”, and are visible to other packages. Fields that start with a lowercase letter are “unexported”, and are only visible inside their own package.
Solution: need to capitalize all fields in structs
把DoMapTask功能实现后,跑了一下coordinator和worker,call succeeded!并且输出了一系列intermediate files。打开其中一个文件,可以看到key-value pairs是按照hashed key分到不同文件的,说明同一个key只会存在于一个文件里。但一个文件里,相同的key并没有group到一起。
Phase 2
Phase 2实现的是
- Coordinator initializes reduce tasks
- Workers do reduce tasks
有了Phase 1的基础,这部分就好做很多。
Coordinator makes reduce tasks是需要读取我们在phase 1中输出的中间文件的。网上搜了下读取文件的方法,我用到的是ioutil.ReadDir()
读取指定目录下的所有文件,filepath.Match(pattern, file.Name())
筛选出符合pattern的文件。
Worker does reduce tasks的实现直接参考mrsequential.go
❓How to combine keys across different reduce workers? Say reduce worker 1 has <key 1, value 1> and reduce worker 2 has <key 1, value 2>, how to make sure two intermediate pairs with the same key1 go to one output file?
Answer: Sort by key, and use two pointers to check the range of pairs with the same key. See mrsequential.go
reduce part.
下面是输出文件:
Phase 3
Phase 3实现的是
- Add Map and Reduce together
- Coordinator should know the current processing state
这个部分是我耗时最多的,也是涉及到Concurrency,比较重要的一部分。
明确状态
一共有几个state?
- There are map tasks left to be assigned
- All map tasks are being assigned to workers, and need to wait all map tasks to finish to start reduce tasks
- All map tasks are finished, and can start reduce tasks
- There are reduce tasks left to be assigned
- All reduce tasks are being assigned to workers, and need to wait all reduce tasks to finish
- All reduce tasks are finished, and can exit the program
Coordinator:State转换,并准备对应state下的task
- 传给worker map task
- 告知worker等待,这里我是通过传给worker一个TaskType为Waiting的task来实现的
- 转换到下一个state,也就是Reduce state,以initialize reduce tasks
- 传给worker reduce task
- 告知worker等待,同2
- 转换到下一个state,也就是Finished state
Worker:在不同state下做不同的事
- worker gets a map task and does map task
- worker should wait,这里是通过
time.Sleep()
实现的 - 跳过,worker不会知道这个state
- worker gets a reduce task and does reduce task
- worker should wait,同2
- worker exits the program
❓Coordinator如何知道worker做完了task?
Worker需要告知Coordinator Task已完成。这里需要给Task结构体添加一个State,State可能的取值有Unassigned, Processing, Finished。当task已完成时,coordinator需要将这个task的state从Processing变为Finished。这个State会在判断tasks是否全部完成的时候用到。
下面附上Coordinator这部分的代码。
Crash处理
❓Coordinator如何知道worker is dead/timeout?如何实现?
在一个新thread里每隔一段时间查看正在处理中的task是否超时。创建新thread用到的是gorountine
。判断超时是通过给Task结构体加一个StartTime,在分配给worker的时候,给StartTime附上当前的时间戳。若当前时间 - StartTime > 10 seconds时,说明task超时,需要将其重新分配给新的worker。
❓因为有dead worker存在的可能性,所以map worker在写入intermediate文件和reduce worker在写入最终输出文件的中途可能crash,从而导致文件无效。如何处理这种情况?
Instructions中的Hints给出了解决方法。一开始先用os.CreateTemp
写进temporary文件,当确定文件完整正确的时候,再用os.Rename
将temporary文件重命名成期望的文件名。
Data race
运行bash test-mr.sh
之前,要把文件里的RACE=-race
uncomment以开启race detector。
犯错点1
一开始没弄清是write加锁还是read & write都需要加锁,所以在这里也花了不少时间debug。
答案是每次read & write shared resources时都需要先加锁,比如在查看是否有超时task时,读取coordinator当前的状态和coordinator所拥有全部tasks前需要加锁。如果一不小心有代码忘加了锁也没关系,后来才发现这部分还是很好debug的,直接看error output,在第一个代码出错的地方加上锁就好了。比如下图为detect到的某个data race。
通过error output发现race发生在coordinator.go
的第77行代码处。在77行处加上锁就可以了。
犯错点2
在加锁的试验过程中,有几次运行时发现代码一直在跑不退出,worker也没有收到coordinator发送的任务。一开始我以为coordinator.go
的GetTask()里不能加锁,后来发现其实是有地方加了锁但是没有正确解锁导致的。
最终结果
总结
做这个实验的时候意识到实践是非常必要的,在学习MapReduce理论知识的时候,以为自己理解了,但在做实验的过程中还是出现了因为认知不到位、理解不透彻的缘故,导致写代码没思路,好多细节不知道如何处理,以及最后read & write加锁的问题。以后要多动手、多动手!
总耗时:两周😭