Background: MapReduce is a model for processing massive datasets in parallel, proposed by Google in 2004. MapReduce, GFS, and Bigtable are known as the "troika" of Google's distributed systems, and together they ushered in the industrial era of big data. Although Google published the MapReduce paper, some of its figures and implementation details remain vague, and there are few articles explaining how to reproduce it in detail. This article aims to give a thorough explanation of MapReduce together with a reproduction of the algorithm. After reading it, you should be able to complete MIT 6.824 (6.5840) Lab 1 on your own. Ideally you will have already read the MapReduce paper and MIT 6.824 (6.5840) Lab 1, which will make everything easier to follow.
A High-Level Overview of MapReduce
According to the abstract of the MapReduce paper, MapReduce is a programming model for processing and generating large datasets. For the input data, the user specifies a Map function that produces a list of key-value pairs (called the intermediate); this step is the Map phase. For the intermediate, the user specifies a Reduce function that merges and groups the intermediate pairs by key; this step is the Reduce phase.
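In the paper's notation, map is (k1, v1) -> list(k2, v2) and reduce is (k2, list(v2)) -> list(v2). In the Go lab these become plain function types; here is a minimal sketch (the names mirror the MapFunc/ReduceFunc fields that appear in worker.go later in this post):
package mr
// KeyValue is one intermediate pair emitted by Map (the same type is defined in rpc.go below).
type KeyValue struct {
Key string
Value string
}
// MapFunc turns one input file (name plus contents) into a list of intermediate pairs.
type MapFunc func(filename string, contents string) []KeyValue
// ReduceFunc merges all values collected for a single key into one output value.
type ReduceFunc func(key string, values []string) string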
Programs written to the MapReduce model can naturally run in parallel on different machines (the paper calls this a cluster of commodity servers). MapReduce proposes solutions to a series of key problems in the parallel processing of large datasets: how to handle the input, how to split one overall job into subtasks, how to distribute tasks to machines in the cluster, how to tolerate machine failures, and so on. Even programmers with no prior experience in distributed systems can harness a cluster for large-scale data processing simply by implementing the MapReduce model.
A MapReduce Example
The most common and simplest example application of MapReduce is word count: given a set of text files, count the occurrences of every word. For the sake of explaining things as concretely as possible, I will go straight to a diagram of the distributed implementation rather than a single-machine version. Put simply, implementing MIT 6.824 Lab 1 amounts to translating this diagram into code.
An idealized, complete execution of a word-count job with MapReduce proceeds in the following order:
- Initialize the coordinator (also called the master) and read the input files
- The master turns the input files into map tasks
- The master hands the prepared map tasks to the mappers (upon receiving a mapper's request for a map task)
- Each mapper splits the text of its input file into key-value pairs, where the key is a word and the value is always 1; this list of pairs is called the intermediate
- The mapper partitions the intermediate into NReduce (the number of reducers) buckets, by hashing the key and taking it modulo NReduce
- The mapper writes the partitioned intermediate to a temporary path and reports that path to the master, signaling that the map task is done
- The master validates the mapper's request; if it is legitimate, the files under the temporary path are written to the final path
- Once the master confirms that all map tasks are done, it generates reduce tasks from the final intermediate paths
- The master hands the prepared reduce tasks to the reducers (upon receiving a reducer's request for a reduce task)
- Upon receiving a reduce task, a reducer first collects the intermediate produced by the various mappers
- The reducer then sorts the collected intermediate by key
- The reducer runs the reduce operation over the sorted intermediate, i.e. it counts the pairs sharing the same key and uses that count as the new value
- The reducer writes the reduce result to a temporary path and reports that path to the master, signaling that the reduce task is done
- The master validates the reducer's request; if it is legitimate, the file under the reduce task's temporary path is written to the final path
- Once the master confirms that all reduce tasks are done, the program terminates
This differs slightly from the figure in the paper. The main difference is that the paper focuses on the high-level MapReduce idea: generate intermediate from the input, mappers tell the master where the intermediate is, the master tells the reducers, and the reducers go process the intermediate themselves. My diagram above is specific to the word-count job; it omits some unimportant details such as the user program (which initializes the workers and the master, specifies the job type, and so on) and adds quite a few details, chiefly the following:
- Intermediate and reduce result files are handled in two stages, with a temporary path and a final path, analogous to git add and git commit. Only once the master judges that a task has really finished are the intermediate and reduce result files written to the final path.
- It makes explicit that the intermediate is partitioned by hash(key) mod NReduce (the number of reducer instances), the goal being to spread pairs across the reducers as randomly and evenly as possible. Any deterministic function of the key works here; what matters is that it depends on the key, so that all pairs with the same key land at the same reducer.
- It makes explicit that one input file corresponds to one mapper, to keep input handling simple. If you don't mind the extra work, you could instead read every word out of all input files, pool them, and then split the pool evenly and randomly into N (the number of mapper instances) parts.
- It makes explicit what the map operation is: split the text into intermediate pairs, with the word as key and a constant 1 as value, i.e. one key-value pair per word occurrence. It also makes explicit the reduce operation: for intermediate pairs sharing a key, count them and use the count as the value. If there is only one reducer, that reducer's output is the word-frequency table of all the input files; a sketch of such a Map/Reduce pair follows this list.
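To make the two operations concrete, here is a word-count Map/Reduce pair, essentially what the lab ships as mrapps/wc.go (it is compiled with go build -buildmode=plugin, which is why package main needs no main function). The 6.824/mr import path follows the lab skeleton:
package main
import (
"strconv"
"strings"
"unicode"
"6.824/mr" // defines mr.KeyValue
)
// Map splits the file contents into words and emits one {word, "1"} pair per occurrence.
func Map(filename string, contents string) []mr.KeyValue {
// a word is a maximal run of letters; everything else separates words
isNotLetter := func(r rune) bool { return !unicode.IsLetter(r) }
words := strings.FieldsFunc(contents, isNotLetter)
kva := make([]mr.KeyValue, 0, len(words))
for _, w := range words {
kva = append(kva, mr.KeyValue{Key: w, Value: "1"})
}
return kva
}
// Reduce receives one word plus every "1" collected for it, so the count is simply len(values).
func Reduce(key string, values []string) string {
return strconv.Itoa(len(values))
}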
For brevity, the diagram above only shows the communication among mapper 1, reducer 1, and the master. mapper 2 and reducer 2 behave identically, with the same ReqTask, MapTask Done, and ReduceTask Done exchanges; they are omitted simply because the picture would otherwise get cluttered.
Key Implementation Issues
Why did I call the steps above an idealized sequence? Because they ignore a number of critical issues, which I group into the following categories.
How to tolerate faults
- If a worker (mapper or reducer) hits an error partway through a step, what should it do: retry the same task, or abandon the current task and request a fresh one to execute? Should it proactively notify the master? And what happens to any intermediate files produced along the way?
- What should the master do when it notices that a task has timed out?
- What should the master do when it notices that a task has been executed more than once?
How to control worker parallelism
- In the diagram above I drew mappers and reducers as separate roles, which matches intuition. On a real machine, though, this is inefficient. Say the diagram has two mappers and three reducers: do I really have to start two processes (or two machines) to execute map tasks and three processes (or three machines) to execute reduce tasks? What if the job is reduce-heavy and I want to scale up to four reducers: must I manually add four processes or four machines? That is clearly unacceptable.
Data races
- In a MapReduce deployment the master stands in a one-to-many relationship with its workers (mappers and reducers). When several workers hit the master at the same time, reads and writes to the master's internal data may conflict. How is that handled?
Implementation Approach
Here is my approach to each of these key issues.
Fault tolerance
- As long as a worker has not received the all-tasks-done signal, it loops forever, repeatedly requesting a task from the master and executing it. If the worker catches an error during execution, it does not need to report it to the master; it simply moves on to the next iteration and requests a new task. A worker could of course report errors proactively, but that costs an extra request, and since the master monitors task status anyway (see below), it would be redundant.
- The master continuously monitors task execution; if it finds that a task has timed out, it reassigns the task to a worker. So there should be a task maintainer that tracks task state (task type, start time, task id, and so on).
- To keep a task from being completed more than once, each task can be stamped with a taskId, an increasing counter that is bumped every time the task is handed to a worker. The taskId is recorded in the master's task maintainer and also sent to the worker. When a worker asks the master to mark a task done, the master validates the taskId the worker sends back: if it cannot be found in the task maintainer, the task must have timed out and been reassigned (reassignment increments the taskId, so the stale id no longer matches), and the master rejects the completion request as invalid. On receiving the rejection, the worker does nothing special: it simply enters the next iteration, requests a task, executes it, and reports completion to the master again. A minimal sketch of this check follows.
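Below is a self-contained sketch of this stale-id check (taskMeta, coordSketch, and the string statuses are hypothetical names used only for illustration; the real logic lives in AssignTask and MapTaskDone/ReduceTaskDone later in this post):
package main
import (
"fmt"
"time"
)
// taskMeta is a hypothetical task record; the real Task struct appears in rpc.go below.
type taskMeta struct {
id int // bumped on every (re)assignment
status string // "Ready", "Running", or "Finished"
begin time.Time // when the task was last handed out
}
type coordSketch struct {
counter int // monotonically increasing task id
}
// assign stamps the task with a fresh id before handing it to a worker.
func (c *coordSketch) assign(t *taskMeta) int {
c.counter++
t.id = c.counter
t.status = "Running"
t.begin = time.Now()
return t.id
}
// complete accepts a completion report only if the reported id still matches a
// Running task; a stale id means the task expired and was handed out again.
func (c *coordSketch) complete(t *taskMeta, reportedId int) error {
if t.status != "Running" || t.id != reportedId {
return fmt.Errorf("stale completion report for task id %d", reportedId)
}
t.status = "Finished"
return nil
}
func main() {
c := &coordSketch{}
t := &taskMeta{status: "Ready"}
staleId := c.assign(t) // first hand-out
freshId := c.assign(t) // task expired and was re-assigned
fmt.Println(c.complete(t, staleId)) // rejected: stale id
fmt.Println(c.complete(t, freshId)) // accepted: prints <nil>
}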
Worker parallelism
- Do not hard-wire the distinction between mappers and reducers; let the current task decide what the worker is. If the master returns a map task, the worker is a mapper; if it returns a reduce task, the worker is a reducer. The master only generates reduce tasks after all map tasks have finished. This is also the approach MIT 6.824 Lab 1 recommends: its test-mr.sh simply starts several worker processes, without separating mappers from reducers.
- The number of mappers and reducers is fixed by the user program (in the paper's terminology), while the parallelism actually achieved is determined by the physical machine. You might specify 10 mappers and 10 reducers in the user program and indeed start 10 processes, but if your machine has only 4 CPU cores, at most 4 workers can make progress at any instant even at full utilization; the rest sit waiting. See the sketch below.
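As a sketch of why the configured count is only logical, the snippet below starts ten identical worker loops as goroutines (test-mr.sh starts separate worker processes instead; goroutines are used here purely for brevity). The 6.824/mr import path and the mr.Worker signature follow the lab skeleton; the stand-in map/reduce functions are hypothetical. On a 4-core machine the Go runtime and the OS decide how many of the ten loops actually run in parallel.
package main
import (
"strconv"
"sync"
"6.824/mr" // module path as used by the lab skeleton
)
// runWorkers starts n worker loops; actual parallelism is capped by the CPU, not by n.
// Each worker decides per task whether it currently acts as a mapper or a reducer.
func runWorkers(n int, mapf func(string, string) []mr.KeyValue, reducef func(string, []string) string) {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mr.Worker(mapf, reducef) // returns once the all-tasks-done signal arrives
}()
}
wg.Wait()
}
func main() {
// trivial stand-in functions, only to keep the sketch self-contained
mapf := func(_, contents string) []mr.KeyValue { return []mr.KeyValue{{Key: contents, Value: "1"}} }
reducef := func(_ string, values []string) string { return strconv.Itoa(len(values)) }
runWorkers(10, mapf, reducef)
}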
Data races
- Guard all of the master's member variables with a lock; release the lock promptly after each access, and keep critical sections as small as possible so that no operation holds the lock for long (more on this later). A sketch of this discipline follows.
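A minimal sketch of that lock discipline (taskTable and markRunning are hypothetical names): touch the shared state under the lock, release it, and only then do slow work such as file IO, RPC replies, or sleeps.
package main
import (
"sync"
"time"
)
// taskTable is a hypothetical stand-in for the master's shared state.
type taskTable struct {
mu sync.Mutex
status map[int]string
}
func (t *taskTable) markRunning(id int) {
t.mu.Lock()
t.status[id] = "Running" // only the shared-map write happens under the lock
t.mu.Unlock()
time.Sleep(10 * time.Millisecond) // stands in for slow work: never do it while holding mu
}
func main() {
t := &taskTable{status: make(map[int]string)}
t.markRunning(1)
}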
Implementation Code
rpc.go: code and comments
rpc.go defines some constants shared by the master (the lab calls it the coordinator, but I will stick with master) and the workers, mostly related to task status. Note that the three directories TmpMapFilePath, TmpReduceFilePath, and FinalMapFilePath must be created in advance, or the program will fail at runtime. FinalReduceFilePath must be located under 6.824/src/main/mr-tmp, as required by the test script test-mr.sh.
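Since the program assumes those directories exist, a small helper can create them up front. Here is a sketch (setupDirs is not part of the lab skeleton), meant to sit in package mr next to the constants defined below; os.MkdirAll is a no-op when the directory already exists:
package mr
import (
"log"
"os"
)
// setupDirs creates the temporary and final directories used by this implementation.
func setupDirs() {
for _, dir := range []string{TmpMapFilePath, TmpReduceFilePath, FinalMapFilePath} {
if err := os.MkdirAll(dir, 0755); err != nil {
log.Fatalf("setupDirs: cannot create %v: %v", dir, err)
}
}
}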
Source code:
package mr
import "time"
type TaskType string
type TaskStatus string
type AssignPhase string
const (
// Define task type
Map TaskType = "MapTask"
Reduce TaskType = "ReduceTask"
// Define AssignPhase
MapPhase AssignPhase = "MapPhase"
ReducePhase AssignPhase = "ReducePhase"
// Define tmp files and final files directories
TmpMapFilePath = "/tmp/tmp_map_file/"
TmpReduceFilePath = "/tmp/tmp_reduce_file/"
FinalMapFilePath = "/tmp/final_map_file/"
FinalReduceFilePath = "/home/6.824/src/main/mr-tmp/"
// Define task expired time
TaskExpiredTime = 10
// Define task status
Ready TaskStatus = "Ready"
Running TaskStatus = "Running"
Finished TaskStatus = "Finished"
)
type Task struct {
TaskType TaskType // the task type, map or reduce
MapWorkerId int // worker id if in map phase, given by master
ReduceWorkerId int // worker id if in reduce phase, given by master
InputFile string // if in map phase it should be a single file, if in reduce phase it should be a file pattern
BeginTime time.Time // task begin time, given by master
TaskStatus TaskStatus // task status, ready, running or finished, for worker it should always be running
}
// worker request master for task
type TaskArgs struct {
WorkerId int
}
// master replies to the worker with a task (the task may be nil if none is available)
type TaskReply struct {
Task *Task
ReducerNum int // the number of reducers, so the mapper can separate intermediate for different reducers
AllDone bool // true if all tasks are done; the worker then exits instead of looping for more tasks
}
// mapper reports to master that map task should be done
type MapTaskDoneArgs struct {
MapWorkerId int
Files []string // the intermediate files for the different reducers
}
// master reply for mapper's map task done request
type MapTaskDoneReply struct {
Err error // nil if the task done is confirmed by master
}
// reducer reports to master that reduce task should be done
type ReduceTaskDoneArgs struct {
ReduceWorkerId int
File string
}
// master reply for reducer's reduce task done request
type ReduceTaskDoneReply struct {
Err error
}
// key-val pair for intermediate
type KeyValue struct {
Key string
Value string
}
// intermediate
type ByKey []KeyValue
// for sorting the intermediate
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
coordinator.go: code and comments
The roles of the main functions are explained below:
- MakeCoordinator initializes the master. It uses two channels to maintain the map tasks and the reduce tasks. The number of mappers equals the number of input files; the number of reducers is specified by the user program, i.e. 6.824/src/main/mrcoordinator.go (hard-coded to 10). It generates the map tasks and starts a goroutine dedicated to monitoring task status.
- periodicallyRmExpTasks keeps pulling tasks from the map and reduce channels to check whether they have expired; an expired task is reset to Ready so it can be handed out again. Since sweeping a channel holds the lock the whole time, each loop iteration sleeps for a while to give the lock back.
- AssignTask responds to workers' task requests. Each time a task is handed out, the master's workerId counter is incremented, so even if the same task is handed out several times (say, the first attempt timed out and it had to be reassigned), the master never confuses them. When a worker reports task completion, matching the workerId in the request against the workerId stored in the master's channel gives a one-to-one correspondence.
- MapTaskDone and ReduceTaskDone respond to mappers' and reducers' task-completion requests: they sweep the tasks in the channel looking for one that qualifies to be marked Finished, checking for expired tasks along the way.
- genMapTasks and genReduceTasks generate the map tasks and reduce tasks respectively. Note that reduce tasks are generated only after all map tasks are done; a reduce task's input file is a file pattern, in reality the collection of files produced by several mappers.
- genMapFinalFiles and genReduceFinalFiles confirm the files produced by mappers and reducers and write them to a new path, the equivalent of git commit.
- rmMapTmpFiles and rmReduceTmpFiles remove the files under the temporary paths once all map tasks and all reduce tasks, respectively, are done.
- ifAllMapDone and ifAllReduceDone are helpers reporting whether all map tasks and all reduce tasks are finished.
- Done() returns whether all tasks are finished; the user program uses it to decide when to exit.
Source code:
package mr
import (
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/rpc"
"os"
"path/filepath"
"strconv"
"sync"
"time"
)
type Coordinator struct {
MapChannel chan *Task // the channel with buffer to maintain map tasks
ReduceChannel chan *Task // the channel with buffer to maintain reduce tasks
Files []string // the input file list, each file corresponds to a map worker
MapperNum int // the number of map workers, determined by the length of the input file list
ReducerNum int // the number of reduce workers, determined by the main function
AssignPhase AssignPhase // the variable that indicates the working phase: MapPhase or ReducePhase
MapWorkerId int // the counter for map tasks, starting from 0
ReduceWorkerId int // the counter for reduce tasks, starting from 0
AllMapDone bool // the variable that indicates if all map tasks are done
AllReduceDone bool // the variable that indicates if all reduce tasks are done
Lock sync.Mutex // the lock guarding the coordinator's member variables (MapChannel, ReduceChannel, etc.) against read-write conflicts
}
// Initialize the coordinator
func MakeCoordinator(files []string, nReduce int) *Coordinator {
log.Println("Make MakeCoordinato")
c := Coordinator{
MapChannel: make(chan *Task, len(files)), // buffer size is determined by the length of Files
ReduceChannel: make(chan *Task, nReduce), // buffer size is determined by the main function
Files: files, // input file list
MapperNum: len(files), // the number of map worker, determined by the length of input file list
ReducerNum: nReduce, // the number of reduce worker, determined by the user program
AssignPhase: MapPhase, // start with map phase
MapWorkerId: 0, // starts from 0
ReduceWorkerId: 0, // starts from 0
}
// generate map task given input file list
c.genMapTasks(files)
// periodically reset expired tasks back to Ready
go c.periodicallyRmExpTasks()
// listen to rpc call
err := c.server()
if err != nil {
log.Println("MakeCoordinator.c.server() err = ", err)
return nil
}
return &c
}
// Generate map tasks and send them to the map channel
func (c *Coordinator) genMapTasks(files []string) {
c.Lock.Lock()
for _, file := range files {
task := Task{
TaskType: Map,
InputFile: file,
TaskStatus: Ready,
}
c.MapChannel <- &task
log.Println("Finish generating map task : ", task)
}
c.Lock.Unlock()
log.Println("Finish generating all map tasks")
}
// Generate reduce tasks and send them to the reduce channel
func (c *Coordinator) genReduceTasks() {
c.Lock.Lock()
for i := 0; i < c.ReducerNum; i++ {
task := Task{
TaskType: Reduce,
InputFile: fmt.Sprintf("%vmr-*-%v", FinalMapFilePath, i),
TaskStatus: Ready,
}
log.Println("Finish generating map task : ", task)
c.ReduceChannel <- &task
}
c.Lock.Unlock()
log.Println("Finish generating all reduce tasks")
}
// Periodically remove expired tasks and reset them into ready status
func (c *Coordinator) periodicallyRmExpTasks() {
for !c.Done() {
time.Sleep(time.Second)
c.Lock.Lock()
if c.AssignPhase == MapPhase {
for i := 0; i < c.MapperNum; i++ {
task := <-c.MapChannel
c.MapChannel <- task
if task.TaskStatus == Running && (time.Now().Sub(task.BeginTime) > (TaskExpiredTime)*time.Second) {
task.TaskStatus = Ready
log.Printf("Task with MapWorker id = %d is expired", task.MapWorkerId)
}
}
} else {
for i := 0; i < c.ReducerNum; i++ {
task := <-c.ReduceChannel
c.ReduceChannel <- task
if task.TaskStatus == Running && (time.Now().Sub(task.BeginTime) > (TaskExpiredTime)*time.Second) {
task.TaskStatus = Ready
log.Printf("Task with ReduceWorker id = %d is expired", task.ReduceWorkerId)
}
}
}
c.Lock.Unlock()
}
}
// Assign available task from map channel or reduce channel to worker by rpc call
func (c *Coordinator) AssignTask(args *TaskArgs, reply *TaskReply) error {
c.Lock.Lock()
if c.AllMapDone && c.AllReduceDone {
reply.AllDone = true
c.Lock.Unlock()
return nil
}
if c.AssignPhase == MapPhase {
for i := 0; i < c.MapperNum; i++ {
task := <-c.MapChannel
c.MapChannel <- task
if task.TaskStatus == Ready {
task.MapWorkerId = c.MapWorkerId
c.MapWorkerId++
task.TaskStatus = Running
task.BeginTime = time.Now()
reply.Task = task
reply.ReducerNum = c.ReducerNum
reply.AllDone = false
log.Println("Distribute map task, task = ", task)
c.Lock.Unlock()
return nil
}
}
c.Lock.Unlock()
if c.ifAllMapDone() {
c.genReduceTasks()
c.Lock.Lock()
c.AssignPhase = ReducePhase
c.Lock.Unlock()
err := c.rmMapTmpFiles()
if err != nil {
log.Println("AssignTask.rmMapTmpFiles err = ", err)
}
}
log.Println("No map task available")
return errors.New("No map task available")
} else {
for i := 0; i < c.ReducerNum; i++ {
task := <-c.ReduceChannel
c.ReduceChannel <- task
if task.TaskStatus == Ready {
task.ReduceWorkerId = c.ReduceWorkerId
c.ReduceWorkerId++
task.TaskStatus = Running
task.TaskType = Reduce
task.BeginTime = time.Now()
reply.Task = task
reply.AllDone = false
log.Println("Distribute reduce task = ", task)
c.Lock.Unlock()
return nil
}
}
c.Lock.Unlock()
if c.ifAllReduceDone() {
reply.AllDone = true
err := c.rmMapFinalFiles()
if err != nil {
log.Println("AssignTask.rmMapFinalFiles err = ", err)
}
err = c.rmReduceTmpFiles()
if err != nil {
log.Println("AssignTask.rmMapFinalFiles err = ", err)
}
return nil
}
log.Println("No reduce task available")
return errors.New("No reduce task available")
}
}
// Response to Map task done request from worker
func (c *Coordinator) MapTaskDone(args *MapTaskDoneArgs, reply *MapTaskDoneReply) error {
c.Lock.Lock()
if c.AllMapDone {
c.Lock.Unlock()
return errors.New("All map done")
}
for i := 0; i < c.MapperNum; i++ {
task := <-c.MapChannel
c.MapChannel <- task
if task.TaskStatus == Running && time.Now().Sub(task.BeginTime) > (TaskExpiredTime)*time.Second {
task.TaskStatus = Ready
log.Printf("Map task with MapWorkerId = %d expired", task.MapWorkerId)
continue
}
if args.MapWorkerId == task.MapWorkerId && task.TaskStatus == Running && time.Now().Sub(task.BeginTime) <= TaskExpiredTime*time.Second {
task.TaskStatus = Finished
err := c.genMapFinalFiles(args.Files)
if err != nil {
task.TaskStatus = Ready
reply.Err = err
log.Println("MapTaskDone.genMapFinalFiles err = ", err)
c.Lock.Unlock()
return err
}
log.Printf("Map task with MapWorkerId = %d is finished in time", task.MapWorkerId)
c.Lock.Unlock()
return nil
}
}
c.Lock.Unlock()
reply.Err = errors.New(fmt.Sprintf("Map task with MapWorkerId = %d cannot be done", args.MapWorkerId))
log.Println(fmt.Sprintf("Map task with MapWorkerId = %d cannot be done", args.MapWorkerId))
return errors.New(fmt.Sprintf("Map task with MapWorkerId = %d cannot be done", args.MapWorkerId))
}
// Generate Map task final file
func (c *Coordinator) genMapFinalFiles(files []string) error {
for _, file := range files {
tmp_file, err := os.Open(file)
if err != nil {
log.Println("genMapFinalFiles err = ", err)
return err
}
defer tmp_file.Close()
tmp_file_name := filepath.Base(file)
final_file_path := FinalMapFilePath + tmp_file_name
final_file, err := os.Create(final_file_path)
if err != nil {
log.Println("genMapFinalFiles.os.Create err = ", err)
return err
}
defer final_file.Close()
_, err = io.Copy(final_file, tmp_file)
if err != nil {
log.Println("genMapFinalFiles.io.Copy err = ", err)
return err
}
}
return nil
}
// Response to reduce task done request from worker
func (c *Coordinator) ReduceTaskDone(args *ReduceTaskDoneArgs, reply *ReduceTaskDoneReply) error {
c.Lock.Lock()
if c.AllReduceDone {
log.Println("All reduce task done")
c.Lock.Unlock()
return errors.New("All reduce task done")
}
for i := 0; i < c.ReducerNum; i++ {
task := <-c.ReduceChannel
c.ReduceChannel <- task
if task.TaskStatus == Running && time.Now().Sub(task.BeginTime) > (TaskExpiredTime)*time.Second {
task.TaskStatus = Ready
log.Printf("Reduce task with ReduceWorkerId = %d expired", task.ReduceWorkerId)
continue
}
if args.ReduceWorkerId == task.ReduceWorkerId && task.TaskStatus == Running && time.Now().Sub(task.BeginTime) <= (TaskExpiredTime)*time.Second {
task.TaskStatus = Finished
err := c.genReduceFinalFile(args.File)
if err != nil {
log.Println("ReduceTaskDone.genReduceFinalFile err = ", err)
task.TaskStatus = Ready
reply.Err = err
c.Lock.Unlock()
return err
}
log.Printf("Reduce task with ReduceWorkerId = %d is finished in time", task.ReduceWorkerId)
c.Lock.Unlock()
return nil
}
}
c.Lock.Unlock()
reply.Err = errors.New(fmt.Sprintf("Reduce task with ReduceWorkerId = %d cannot be done", args.ReduceWorkerId))
log.Println(fmt.Sprintf("Reduce task with ReduceWorkerId = %d cannot be done", args.ReduceWorkerId))
return errors.New(fmt.Sprintf("Reduce task with ReduceWorkerId = %d cannot be done", args.ReduceWorkerId))
}
// Generate Reduce task final file
func (c *Coordinator) genReduceFinalFile(file string) error {
tmp_file, err := os.Open(file)
if err != nil {
log.Println("genReduceFinalFile.os.Open err = ", err)
return err
}
defer tmp_file.Close()
tmp_file_name := filepath.Base(file)
final_file_path := FinalReduceFilePath + tmp_file_name
final_file, err := os.Create(final_file_path)
if err != nil {
log.Println("genReduceFinalFile.os.Create err = ", err)
return err
}
defer final_file.Close()
_, err = io.Copy(final_file, tmp_file)
if err != nil {
log.Println("genReduceFinalFile.os.Copy err = ", err)
return err
}
return nil
}
// Remove map task's temporary files
func (c *Coordinator) rmMapTmpFiles() error {
d, err := os.Open(TmpMapFilePath)
if err != nil {
log.Println("rmMapTmpFiles os.Open err = ", err)
return err
}
defer d.Close()
names, err := d.Readdirnames(-1)
if err != nil {
log.Println("rmMapTmpFiles.d.Readdirnames err = ", err)
return err
}
for _, name := range names {
err = os.RemoveAll(TmpMapFilePath + name)
if err != nil {
log.Println("rmMapTmpFiles.os.RemoveAll err = ", err)
return err
}
}
return nil
}
// Remove map task's final files
func (c *Coordinator) rmMapFinalFiles() error {
d, err := os.Open(FinalMapFilePath)
if err != nil {
log.Println("rmMapFinalFiles.os.Open err = ", err)
return err
}
defer d.Close()
names, err := d.Readdirnames(-1)
if err != nil {
log.Println("rmMapFinalFiles.d.Readdirnames err = ", err)
return err
}
for _, name := range names {
err = os.RemoveAll(FinalMapFilePath + name)
if err != nil {
log.Println("rmMapFinalFiles.os.RemoveAll err = ", err)
return err
}
}
return nil
}
// Remove reduce task's temporary files
func (c *Coordinator) rmReduceTmpFiles() error {
d, err := os.Open(TmpReduceFilePath)
if err != nil {
log.Println("rmReduceTmpFiles.os.Open err = ", err)
return err
}
defer d.Close()
names, err := d.Readdirnames(-1)
if err != nil {
log.Println("rmReduceTmpFiles.d.Readdirnames err = ", err)
return err
}
for _, name := range names {
err = os.RemoveAll(TmpReduceFilePath + name)
if err != nil {
log.Println("rmReduceTmpFiles.os.RemoveAll err = ", err)
return err
}
}
return nil
}
// Visit every task in the map channel and check whether all map tasks are done
func (c *Coordinator) ifAllMapDone() bool {
c.Lock.Lock()
for i := 0; i < c.MapperNum; i++ {
task := <-c.MapChannel
if task.TaskStatus != Finished {
c.MapChannel <- task
c.Lock.Unlock()
return false
}
c.MapChannel <- task
}
c.AllMapDone = true
c.Lock.Unlock()
return true
}
// Visit every task in the reduce channel and check whether all reduce tasks are done
func (c *Coordinator) ifAllReduceDone() bool {
c.Lock.Lock()
for i := 0; i < c.ReducerNum; i++ {
task := <-c.ReduceChannel
c.ReduceChannel <- task
if task.TaskStatus != Finished {
c.Lock.Unlock()
return false
}
}
c.AllReduceDone = true
c.Lock.Unlock()
return true
}
// Report whether all tasks are done
func (c *Coordinator) Done() bool {
c.Lock.Lock()
if c.AllMapDone && c.AllReduceDone {
c.Lock.Unlock()
log.Println("All map tasks and reduce task done!")
time.Sleep(3 * time.Second) // Sleep 3s to let all workers know all task done and it's time to exit before main routine
return true
}
c.Lock.Unlock()
return false
}
func (c *Coordinator) server() error {
rpc.Register(c)
rpc.HandleHTTP()
sockname := "/var/tmp/824-mr-"
sockname += strconv.Itoa(os.Getuid())
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
return nil
}
worker.go: code and comments
The roles of the main functions are explained below:
- Worker initializes the worker; while the worker holds no task, its workerId is -1.
- work keeps requesting and executing tasks as long as the all-tasks-done signal has not arrived. Notably, when a task request returns an error (for instance, no task is currently available), the worker sleeps for one second so that it does not flood the master with requests and knock it over; after all, workers vastly outnumber the master.
- doMap executes a map task: read the input file and generate the intermediate (key = word, value = 1), split the intermediate into different temporary files (one per reducer), then report task completion to the master.
- doReduce executes a reduce task: collect the intermediate produced by the different mappers, sort it, count the pairs sharing each key and use the count as the value, write the result to a temporary file, and finally report task completion to the master.
- call issues requests to the master. Note that the sockname here must not be changed; the test script test-mr.sh relies on it.
package mr
import (
"encoding/json"
"fmt"
"hash/fnv"
"io"
"log"
"net/rpc"
"os"
"path/filepath"
"sort"
"strconv"
"time"
)
type MRWorker struct {
WorkerId int // worker id, initialized from task.MapWorkerId or task.ReduceWorkerId
MapFunc func(string, string) []KeyValue // Map function, initialized by main function
ReduceFunc func(string, []string) string // Reduce function, initialized by main function
Task *Task // Task, given by coordinator.
NReduce int // the number of reduce workers, given by coordinator
IsDone bool // the variable indicates if all task were done
}
// Initialize the worker
func Worker(map_func func(string, string) []KeyValue, reduce_func func(string, []string) string) {
worker := MRWorker{
WorkerId: -1, // -1 while the worker isn't handling a task
MapFunc: map_func,
ReduceFunc: reduce_func,
IsDone: false,
}
log.Println("Initial worker done")
worker.work()
log.Println("Worker with WorkerId = ", worker.WorkerId, " finished all works")
}
// Keep working as long as not all tasks are finished
func (worker *MRWorker) work() {
for !worker.IsDone {
task, err := worker.reqTask()
if err != nil {
log.Println("work.reqTask err = ", err)
continue
}
if task == nil {
log.Printf("Worker with WorkerId = %d received all tasks done", worker.WorkerId)
worker.IsDone = true
break
}
log.Printf("Worker with WorkerId = %d received task = %v", worker.WorkerId, task)
worker.Task = task
if task.TaskType == Map {
worker.WorkerId = task.MapWorkerId
err := worker.doMap()
if err != nil {
log.Println("worker.doMap err = ", err)
time.Sleep(time.Second)
continue
}
log.Println("Map task done, task = ", task)
} else {
worker.WorkerId = task.ReduceWorkerId
err := worker.doReduce()
if err != nil {
log.Println("worker.doReduce err = ", err)
continue
}
log.Println("Reduce task done, task = ", task)
}
}
}
// Request a task from the coordinator
func (worker *MRWorker) reqTask() (*Task, error) {
args := TaskArgs{}
reply := TaskReply{}
err := call("Coordinator.AssignTask", &args, &reply)
if err != nil {
log.Println("reqTask.call err = ", err)
return nil, err
}
worker.Task = reply.Task
if reply.AllDone {
worker.IsDone = true
return nil, nil
}
worker.NReduce = reply.ReducerNum
return reply.Task, nil
}
// Execute a map task
func (worker *MRWorker) doMap() error {
task := worker.Task
intermediate, err := worker.genIntermediate(task.InputFile)
if err != nil {
log.Println("doMap.genIntermediate err = ", err)
return err
}
tmp_files, err := worker.writeIntermediateToTmpFiles(intermediate)
if err != nil {
log.Println("doMap.writeIntermediateToTmpFiles :", err)
return err
}
err = worker.mapTaskDone(tmp_files)
if err != nil {
log.Println("doMap.mapTaskDone err = ", err)
for _, file := range tmp_files {
err := os.Remove(file)
if err != nil {
log.Println("worker.mapTaskDone.os.Remove err = ", err)
}
}
return err
}
return nil
}
// Generate intermediate key-val list
func (worker *MRWorker) genIntermediate(filename string) ([]KeyValue, error) {
intermediate := make([]KeyValue, 0)
file, err := os.Open(filename)
if err != nil {
log.Println("genIntermediate.os.Open", err)
return nil, err
}
defer file.Close() // register the close before ReadAll so the fd is not leaked on a read error
content, err := io.ReadAll(file)
if err != nil {
log.Println("genIntermediate.io.ReadAll", err)
return nil, err
}
kva := worker.MapFunc(filename, string(content))
intermediate = append(intermediate, kva...)
return intermediate, nil
}
// Write intermediate to map task's temporary files
func (worker *MRWorker) writeIntermediateToTmpFiles(intermediate []KeyValue) ([]string, error) {
tmp_files := []string{}
hashed_intermediate := make([][]KeyValue, worker.NReduce)
for _, kv := range intermediate {
hash_val := ihash(kv.Key) % worker.NReduce
hashed_intermediate[hash_val] = append(hashed_intermediate[hash_val], kv)
}
for i := 0; i < worker.NReduce; i++ {
tmp_file, err := os.CreateTemp(TmpMapFilePath, "mr-*.txt")
if err != nil {
log.Println("writeIntermediateToTmpFiles.os.CreateTemp err = ", err)
return nil, err
}
defer os.Remove(tmp_file.Name())
defer tmp_file.Close()
enc := json.NewEncoder(tmp_file)
for _, kv := range hashed_intermediate[i] {
err := enc.Encode(&kv)
if err != nil {
log.Println("writeIntermediateToTmpFiles.enc.Encode", err)
return nil, err
}
}
file_path := fmt.Sprintf("mr-%v-%v", worker.WorkerId, i)
err = os.Rename(tmp_file.Name(), TmpMapFilePath+file_path)
if err != nil {
log.Println("writeIntermediateToTmpFiles os.Rename: ", err)
return nil, err
}
tmp_files = append(tmp_files, TmpMapFilePath+file_path)
}
return tmp_files, nil
}
// Report map task done to coordinator
func (worker *MRWorker) mapTaskDone(files []string) error {
args := MapTaskDoneArgs{
MapWorkerId: worker.WorkerId,
Files: files,
}
reply := MapTaskDoneReply{}
err := call("Coordinator.MapTaskDone", &args, &reply)
if err != nil {
log.Println("mapTaskDone.call err = ", err)
return err
}
if reply.Err != nil {
log.Println("mapTaskDone.reply.Err = ", reply.Err)
return reply.Err
}
return nil
}
// Execute a reduce task
func (worker *MRWorker) doReduce() error {
intermediate, err := worker.collectIntermediate(worker.Task.InputFile)
if err != nil {
log.Println("doReduce.collectIntermediate err = ", err)
return err
}
sort.Sort(ByKey(intermediate))
res := worker.genReduceRes(intermediate)
tmp_file, err := worker.writeReduceResToTmpFile(res)
if err != nil {
log.Println("doReduce.writeReduceResToTmpFile err = ", err)
return err
}
err = worker.reduceTaskDone(tmp_file)
if err != nil {
log.Println("doReduce.reduceTaskDone err = ", err)
err := os.Remove(tmp_file)
if err != nil {
log.Println("doReduce.os.Remove err = ", err)
}
return err
}
return nil
}
// Collect intermediate from different map workers' result files
func (worker *MRWorker) collectIntermediate(file_pattern string) ([]KeyValue, error) {
intermediate := make([]KeyValue, 0)
files, err := filepath.Glob(file_pattern)
if err != nil {
log.Println("collectIntermediate.filepath.Glob err = ", err)
return nil, err
}
for _, file_path := range files {
file, err := os.Open(file_path)
if err != nil {
log.Println("collectIntermediateos.Open err = ", err)
return nil, err
}
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
intermediate = append(intermediate, kv)
}
}
return intermediate, nil
}
// Gen reduce result
func (worker *MRWorker) genReduceRes(intermediate []KeyValue) []KeyValue {
res := make([]KeyValue, 0)
i := 0
for i < len(intermediate) {
// the keys in intermediate[i..j-1] are all equal since intermediate is sorted
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
// collect the values of intermediate[i..j-1] and hand them to ReduceFunc
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := worker.ReduceFunc(intermediate[i].Key, values)
kv := KeyValue{Key: intermediate[i].Key, Value: output}
res = append(res, kv)
i = j
}
return res
}
// write reduce task's result to temporary file
func (worker *MRWorker) writeReduceResToTmpFile(res []KeyValue) (string, error) {
tempFile, err := os.CreateTemp(TmpReduceFilePath, "mr-") ///home/distributed_system/tmp_res_file/mr-xxxxx(随机字符串)
if err != nil {
log.Println("writeReduceResToTmpFile.os.CreateTemp err = ", err)
return "", err
}
// write key-val pair into tmp file
for _, kv := range res {
fmt.Fprintf(tempFile, "%s %s\n", kv.Key, kv.Value)
}
temp_name := TmpReduceFilePath + "mr-out-" + strconv.Itoa(worker.WorkerId) + ".txt"
err = os.Rename(tempFile.Name(), temp_name)
if err != nil {
log.Println("writeReduceResToTmpFile.os.Rename err = ", err)
return "", err
}
return temp_name, nil
}
// Report reduce task done to coordinator
func (worker *MRWorker) reduceTaskDone(file string) error {
args := ReduceTaskDoneArgs{
ReduceWorkerId: worker.WorkerId,
File: file,
}
reply := ReduceTaskDoneReply{}
err := call("Coordinator.ReduceTaskDone", &args, &reply)
if err != nil {
log.Println("reduceTaskDone.call ", err)
return err
}
if reply.Err != nil {
log.Println("reduceTaskDone.reply.Err = ", reply.Err)
return reply.Err
}
return nil
}
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}
// rpc call function that send request to coordinator
func call(fun_name string, args interface{}, reply interface{}) error {
sockname := "/var/tmp/824-mr-"
sockname += strconv.Itoa(os.Getuid())
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
log.Fatal("dialing:", err)
return err
}
defer c.Close()
err = c.Call(fun_name, args, reply)
if err != nil {
log.Println("call.call", err)
return err
}
return nil
}
Other Notes
This is only my way of implementing it; other implementations are broadly similar. A few key points are common to all of them and deserve attention:
- Keep the work done while holding a lock to a minimum: the performance bottleneck is the master, and the master's bottleneck is its lock, so release it as soon as you are done with the shared state. Make sure every function releases the lock before it returns, and release it before calling other functions that need it, or they will block waiting for it.
- Mark every task each time it is handed out, e.g. with an increasing workerId as I did, so that multiple hand-outs of the same task can always be told apart.
- test-mr.sh is slightly fragile: on some systems certain commands fail and the tests cannot pass. For example, on CentOS 7, in the early exit test of test-mr.sh you need to replace
wait -n
with
wait
Standing on the Shoulders of Giants
- static.googleusercontent.com/media/resea…
- pdos.csail.mit.edu/6.824/labs/…
- /post/693944…
- github.com/asiaWu3/mit…
- www.bilibili.com/video/BV1rS…