mit 6.824 lab1 思路贴

前言

为遵守 mit 的约定,这个帖子不贴太多具体的代码,主要聊聊自己在码代码时的一些想法和遇到的问题。

这个实验需要我们去实现一个 map-reduce 的功能。实质上,这个实验分为两个大的板块,map 和 reduce 两个阶段,也就是这个实验的核心部分,两个阶段都包含若干小的子任务,然后用户通过编写 map 和 reduce 函数。这个实验里,我们的任务是,读取 main 文件夹下的八个 txt 文档,扫描其中的单词,并计数,将结果输出到若干个子文件中,最后的话,测试脚本会读取这八个文件,把里面的结果输出到另一个 txt 中并进行排序,比对给出的标准答案,来评判该实验是否通过。

做这个实验的前提是,已经读过这个实验配套的论文:mapreduce-osdi04.pdf (googleusercontent.com) 知道这个实验以及想要做这个实验的人多少都会有点手段上谷歌(当然,也可以去找国内转载的,看不懂的话就看中文的吧,实验文档也是。

Getting Started

当我们打开这个项目工程,我们阅读这个项目的所有文件,以及 lab1 中给出的提示,我们可以先试着运行以下部分代码,来看看我们最重要得到什么结果:

# 当前目录为 src/main
go build -race -buildmode=plugin ../mrapps/wc.go
rm -rf mr-out*
go run -race mrsequential.go wc.so pg*.txt
more mr-out-0

这个是一个单线程的 map-reduce,我们可以查看 mrsequential.go 的内容,大概了解下整个 map-reduce 的过程是怎样的。

然后,我们把目光聚集到以下文件中:

---main
	 |---mrworker.go
	 |---mrcoordinator.go
	 |---mrsequential.go
---mr
	 |---worker.go
	 |---coordinator.go
	 |---rpc.go
---mrapp
	 |---wc.go

后面我们在这个实验中很多内容都要参考这些文件的内容,其中包含一些函数的来源,其中,尤其 main/mrsequential.go 尤其重要。

实现 rpc 通信

如果说想要实现 map-reduce,那么第一步就是实现 worker 和 coordinator 的 rpc 通信,观察 mr 目录下的文件后,我们需要在 rpc.gocoordinator.go 中定义以下结构体 (目前仅实现 rpc 通信):

// coordinator.go
// 专门定义一个Task,用于coordinator向worker分发任务
type Task struct {
    FileName string
}

// 这里声明coordinator相关的结构体
type Coordinator struct {
    task Task
}

// rpc.go
// 这里参考了上面的两个Example
type TaskRequest struct {
    X int
}

type TaskReply struct {
    XTask Task     
}

下一步要做的是,需要让 coordinator 和 worker 之间能够进行 rpc 通信。

实现两者之间的通信是完成这个实验的基础。

worker接收消息

worker 调用 coordinator 的获取任务函数,获取要处理的文件名,然后执行打开操作。

在构建中间体 intermediate 时,可以留意到 mrsequential.go 有提示:

// a big difference from real MapReduce is that all the
// intermediate data is in one place, intermediate[],
// rather than being partitioned into NxM buckets.

根据这个思路,相当于提示我们,在构建桶存放中间体时,可能会用到二维 NxM 的数组。

然后经过 map 处理后的键值对切片,需要进一步经过 json 处理,并且将这个结果分成 nReduce 份,存放的文件命名规则是 mr-X-Y,其中 x 是 map 任务的序号,y 是 reduce 任务的序号。

在进行 reduce 任务时,读取结果也需要经过 json 处理,这里很多步骤都可以借鉴 mrsequential.go,包括读取文件等。

在创建目标文件时,可以使用 ioutil.TempFile 来创建临时文件,最后再重新命名。

此阶段的结构体声明如下:

// coordinator.go
type Coordinator struct {
    State            int // 0 map 1 reduce 2 none
    MapTask          Task
    ReduceTask       Task
    NumMapTask       int
    NumReduceTask    int
    MapTaskFinish    chan bool
    ReduceTaskFinish chan bool
}

type Task struct {
    FileName string
    IDMap    int
    IDReduce string
}

// rpc.go
type TaskRequest struct {
    X int
}

type TaskReply struct {
    XTask            Task
    NumMapTask       int
    NumReduceTask    int
    CurNumMapTask    int
    CurNumReduceTask int
}

实现 rpc 通信

根据文档的指引,我们首先要实现 coordinator 和 worker 之间的通信,我们看到 worker.go 中有 call 和 CallExample 两个函数,那也照葫芦画瓢,自己搞一个 CallGetTask,实现 rpc 通信。

Worker 申领 task

看着 Worker() 里的注释,有一行 CallExample(),是需要我们在这个函数里调用自定义的 CallGetTask 函数来获取 coordinator 分发的 task,在 call 之前,我们先要给 coordinator 的成员 MapTask 初始化,在 MakeCoordinator 中,我们可以看到 files 和 nReduce 这两个参数,那就从这两个入手,进行简单的初始化后,我们尝试在 Worker 中输出,能够输出文件名就是阶段性胜利。

照抄 mrsequential.go

文档中有提到,可以随意借鉴 mrsequential 中的函数,那么,走起。不过也要看看注释和文档,可以创建一个 NxM 的桶,和利用 encoder 和 decoder 来处理中间产物。

向 coordinator 的报告

每执行完一个任务,就向 coordinator 报告,方便 coordinator 记录,当所有任务都执行完时,修改 Done 中的条件,解除阻塞。

其实,走到这一步,可以说这个 lab 完成一半了,剩下就是各种断点打印 debug。

如果在执行过程提示无法打开文件,那说明,map 或 reduce 任务完成个数的条件没有控制好,mrsequential.go 中规定了一共会生成 3 个 workers,无法打开文件,只可能是,并发申请 task 时,已经快要到 task 的容量数,没分配到的 worker 自然也就没有分配到 FileName 和 MapID,所以需要设置好这些控制条件

解决 crash

做完上面,7 个 test 就可以 pass 6 个了,剩下一个 crash 的,需要用到锁或原子变量方面的知识。在进行 GetTask 时,我们传递的参数,需要确保其原子性,不然会出现 data race 现象;同时,也要对超过 10s 的任务进行舍弃处理,这里我们加一个时间戳,来记录任务的完成情况和开始时间。

又考虑到在记录任务完成情况时,是一个并发状态,这里考虑使用 sync.Map。在进行最后的 Done 之前,我们还要再定义一个检查函数,来遍历检查是否还有 crash 的任务。

参考链接

6.5840 Lab 1: MapReduce (mit.edu)

mit6.824分布式lab1-MapReduce(1)_哔哩哔哩_bilibili