MIT 6.5840 (6.824) Lab 1: MapReduce Design and Implementation

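1 Introduction

This lab implements a simplified version of MapReduce. You need to write a worker process and a coordinator process. The worker calls the Map and Reduce functions and handles reading and writing files. The coordinator hands out tasks and deals with tasks that fail. You will build something similar to what the MapReduce paper describes. (Note: this lab uses "coordinator" in place of the paper's "master".)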


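  • Read the MapReduce paper

  • Read the lab handout

  • Understand the MapReduce framework

  • Understand the provided framework code and work out which tasks must be completed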


  • Complete the basic requirements for MapReduce
  • Handle worker failures
  • Avoid data races; a single coarse-grained lock ensures safety
  • Pass the lab tests
  • Communicate over TCP/IP and read/write files using a shared file system

2 Analysis of the Provided Framework

  • src/mrapps/wc.go

    This is a word-count plugin for MapReduce. It provides the Map and Reduce functions that count how often each word occurs in the input text.

    func Map(filename string, contents string) []mr.KeyValue {
    	// function to detect word separators.
    	ff := func(r rune) bool { return !unicode.IsLetter(r) }

    	// split contents into an array of words.
    	words := strings.FieldsFunc(contents, ff)

    	kva := []mr.KeyValue{}
    	for _, w := range words {
    		kv := mr.KeyValue{w, "1"}
    		kva = append(kva, kv)
    	}
    	return kva
    }

    func Reduce(key string, values []string) string {
    	// return the number of occurrences of this word.
    	return strconv.Itoa(len(values))
    }

  • src/main/mrcoordinator.go

    mrcoordinator.go drives the main logic of the Coordinator. It calls MakeCoordinator to create a Coordinator instance c, and c.server() launches an HTTP server in a goroutine, go http.Serve(l, nil), to receive and handle RPC calls.

    func (c *Coordinator) server() {
    	rpc.Register(c)
    	rpc.HandleHTTP()
    	//l, e := net.Listen("tcp", ":1234")
    	sockname := coordinatorSock()
    	os.Remove(sockname)
    	l, e := net.Listen("unix", sockname)
    	if e != nil {
    		log.Fatal("listen error:", e)
    	}
    	go http.Serve(l, nil)
    }

    func MakeCoordinator(files []string, nReduce int) *Coordinator {
    	c := Coordinator{}
    	c.server()
    	return &c
    }

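    Note: when an HTTP server is started with http.Serve(l, nil) from Go's net/http package, the server handles each incoming connection in its own goroutine. RPC calls are therefore processed in separate goroutines and several requests can run concurrently, so the design needs locks or other synchronization primitives to protect shared state. In addition, the Coordinator never contacts a Worker on its own initiative (unless you implement that yourself); all work is driven by RPC calls made by the Workers. Finally, once all tasks are complete the Done method returns true, and the Coordinator then shuts down.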

  • src/main/mrworker.go

    mrworker.go runs the Worker function, so Worker has to request tasks, execute them, and report their status. It follows that Worker must poll the Coordinator in a loop and let each reply decide what the Worker does next.

  • src/main/mrsequential.go

    mrsequential.go implements a simple sequential MapReduce application: it reads the input files, runs the Map and Reduce operations, and writes the result to an output file.
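
To make the concurrency note under mrcoordinator.go more concrete, here is a minimal sketch of a Done method that is safe to call while RPC handlers are running. The stripped-down Coordinator type, the field names mu and allDone, and the helper markAllDone are assumptions made for illustration; they are not the lab's code.

```go
package main

import (
	"fmt"
	"sync"
)

// Coordinator is a stripped-down stand-in holding only what this sketch
// needs; the field names are assumptions for illustration.
type Coordinator struct {
	mu      sync.Mutex
	allDone bool // set once every reduce task has completed
}

// markAllDone is a hypothetical helper; in a real coordinator an RPC
// handler would flip this flag while holding the same lock.
func (c *Coordinator) markAllDone() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.allDone = true
}

// Done is polled by the main goroutine. It takes the lock because RPC
// handlers may write allDone concurrently from other goroutines.
func (c *Coordinator) Done() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.allDone
}

func main() {
	c := &Coordinator{}
	fmt.Println(c.Done()) // false: the job is still running

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // simulates an RPC handler goroutine
		defer wg.Done()
		c.markAllDone()
	}()
	wg.Wait()
	fmt.Println(c.Done()) // true: the coordinator may exit
}
```

Reading the flag under the same mutex that writers hold is what keeps the race detector quiet; an unlocked read would be a data race even if it appears to work.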


3 Design and Implementation

3.1 Task Analysis


In this lab the number of Map tasks equals the number of input files, and each Map task processes one .txt file; the number of Reduce tasks is nReduce.
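
As a rough illustration of how these two counts shape the intermediate data, the sketch below maps a key to its reduce bucket and derives the mr-X-Y file name used later in this post. ihash mirrors the hash helper in the lab's worker skeleton; intermediateName and the input file names are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash mirrors the hash helper shipped in the lab's worker skeleton.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// intermediateName is a hypothetical helper: it returns the file that map
// task mapID writes for the reduce task responsible for key ("mr-X-Y").
func intermediateName(mapID int, key string, nReduce int) string {
	return fmt.Sprintf("mr-%d-%d", mapID, ihash(key)%nReduce)
}

func main() {
	files := []string{"pg-a.txt", "pg-b.txt", "pg-c.txt"} // hypothetical inputs
	nReduce := 4
	fmt.Printf("%d map tasks, %d reduce tasks\n", len(files), nReduce)

	// The same key always lands in the same reduce bucket, whichever
	// map task produced it.
	for mapID := range files {
		fmt.Println(intermediateName(mapID, "apple", nReduce))
	}
}
```

With M input files and nReduce buckets, a full run produces up to M × nReduce intermediate files, and reduce task Y reads every mr-*-Y.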









3.2 RPC

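Before exchanging messages we first need to decide which message types exist. From the analysis above:

  • Messages sent by the Worker: the Worker reports to the Coordinator how a Map or Reduce task ended (success or failure).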

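    type TaskCompletedStatus int

    const (
    	MapTaskCompleted = iota
    	MapTaskFailed
    	ReduceTaskCompleted
    	ReduceTaskFailed
    )

  • Replies from the Coordinator: the Coordinator assigns a Map or Reduce task and states its type, or tells the Worker to sleep (no task is available right now) or to exit (all tasks have succeeded).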

    type TaskType int

    const (
    	MapTask = iota
    	ReduceTask
    	Wait
    	Exit
    )


type MessageSend struct {
	TaskID              int                 // task id
	TaskCompletedStatus TaskCompletedStatus // task completed status
}


type MessageReply struct {
	TaskID   int      // task id
	TaskType TaskType // task type: map, reduce, wait, or exit
	TaskFile string   // task file name
	NReduce  int      // number of reduce tasks
	NMap     int      // number of map tasks
}


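For transport, the framework already provides Unix-domain socket communication. If you like, RPC can be switched to TCP/IP instead of Unix sockets (see the commented-out line in Coordinator.server()), with files read and written through a shared file system.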
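
The TCP variant can be sketched with Go's standard net/rpc package as follows. This is a standalone demonstration rather than the lab's code: the Echo service, its Ping method, and the helpers serveTCP and ping are hypothetical names, and the server binds to a loopback port chosen by the OS.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/http"
	"net/rpc"
)

// Echo is a hypothetical RPC service used only for this demonstration.
type Echo struct{}

type EchoArgs struct{ Text string }
type EchoReply struct{ Text string }

// Ping has the shape net/rpc requires: exported, two arguments, error result.
func (e *Echo) Ping(args *EchoArgs, reply *EchoReply) error {
	reply.Text = "pong: " + args.Text
	return nil
}

// serveTCP listens on addr over TCP and serves RPC over HTTP. It returns
// the address actually bound, which matters when addr ends in ":0".
func serveTCP(addr string) (string, error) {
	srv := rpc.NewServer()
	if err := srv.Register(&Echo{}); err != nil {
		return "", err
	}
	mux := http.NewServeMux()
	mux.Handle(rpc.DefaultRPCPath, srv)

	l, err := net.Listen("tcp", addr) // rather than net.Listen("unix", sockname)
	if err != nil {
		return "", err
	}
	go http.Serve(l, mux)
	return l.Addr().String(), nil
}

// ping dials the server over TCP and performs one RPC round trip.
func ping(addr, text string) (string, error) {
	client, err := rpc.DialHTTP("tcp", addr) // rather than rpc.DialHTTP("unix", sockname)
	if err != nil {
		return "", err
	}
	defer client.Close()

	var reply EchoReply
	err = client.Call("Echo.Ping", &EchoArgs{Text: text}, &reply)
	return reply.Text, err
}

func main() {
	addr, err := serveTCP("127.0.0.1:0") // port 0: let the OS pick a free port
	if err != nil {
		log.Fatal(err)
	}
	out, err := ping(addr, "hello")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(out) // pong: hello
}
```

Only the network name and address change between the two transports; the RPC methods and message structs stay the same. Across machines, workers would also need a shared file system to see each other's intermediate files.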

3.2 Coordinator

3.2.1 Structure


type TaskStatus int

const (
	Unassigned = iota
	Assigned
	Completed
	Failed
)


type TaskInfo struct {
	TaskStatus TaskStatus // task status
	TaskFile   string     // task file
	TimeStamp  time.Time  // time the task was last assigned, used to detect timeouts
}


type Coordinator struct {
	NMap                   int        // number of map tasks
	NReduce                int        // number of reduce tasks
	MapTasks               []TaskInfo // map tasks
	ReduceTasks            []TaskInfo // reduce tasks
	AllMapTaskCompleted    bool       // whether all map tasks have been completed
	AllReduceTaskCompleted bool       // whether all reduce tasks have been completed
	Mutex                  sync.Mutex // mutex, used to protect the shared data
}

3.2.2 Initialization


func (c *Coordinator) InitTask(file []string) {
	for idx := range file {
		c.MapTasks[idx] = TaskInfo{
			TaskFile:   file[idx],
			TaskStatus: Unassigned,
			TimeStamp:  time.Now(),
		}
	}
	for idx := range c.ReduceTasks {
		c.ReduceTasks[idx] = TaskInfo{
			TaskStatus: Unassigned,
		}
	}
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		NReduce:                nReduce,
		NMap:                   len(files),
		MapTasks:               make([]TaskInfo, len(files)),
		ReduceTasks:            make([]TaskInfo, nReduce),
		AllMapTaskCompleted:    false,
		AllReduceTaskCompleted: false,
		Mutex:                  sync.Mutex{},
	}
	c.InitTask(files)
	c.server()
	return &c
}

3.2.3 The RequestTask Function


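  1. If there is a Map task that is unassigned, previously failed, or assigned but timed out (10 s), assign it;
  2. If no such Map task exists but not all Map tasks have completed, tell the Worker to wait;
  3. Once all Map tasks have completed, assign Reduce tasks following the same logic as steps 1 and 2;
  4. When every task has completed, tell the Worker to exit.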


func (c *Coordinator) RequestTask(args *MessageSend, reply *MessageReply) error {
	// lock
	c.Mutex.Lock()
	defer c.Mutex.Unlock()

	// assign map task
	if !c.AllMapTaskCompleted {
		// count the number of completed map tasks
		NMapTaskCompleted := 0
		for idx, taskInfo := range c.MapTasks {
			if taskInfo.TaskStatus == Unassigned || taskInfo.TaskStatus == Failed ||
				(taskInfo.TaskStatus == Assigned && time.Since(taskInfo.TimeStamp) > 10*time.Second) {
				reply.TaskFile = taskInfo.TaskFile
				reply.TaskID = idx
				reply.TaskType = MapTask
				reply.NReduce = c.NReduce
				reply.NMap = c.NMap
				c.MapTasks[idx].TaskStatus = Assigned  // mark the task as assigned
				c.MapTasks[idx].TimeStamp = time.Now() // update the time stamp
				return nil
			} else if taskInfo.TaskStatus == Completed {
				NMapTaskCompleted++
			}
		}
		// check if all map tasks have been completed
		if NMapTaskCompleted == len(c.MapTasks) {
			c.AllMapTaskCompleted = true
		} else {
			reply.TaskType = Wait
			return nil
		}
	}

	// assign reduce task
	if !c.AllReduceTaskCompleted {
		// count the number of completed reduce tasks
		NReduceTaskCompleted := 0
		for idx, taskInfo := range c.ReduceTasks {
			if taskInfo.TaskStatus == Unassigned || taskInfo.TaskStatus == Failed ||
				(taskInfo.TaskStatus == Assigned && time.Since(taskInfo.TimeStamp) > 10*time.Second) {
				reply.TaskID = idx
				reply.TaskType = ReduceTask
				reply.NReduce = c.NReduce
				reply.NMap = c.NMap
				c.ReduceTasks[idx].TaskStatus = Assigned  // mark the task as assigned
				c.ReduceTasks[idx].TimeStamp = time.Now() // update the time stamp
				return nil
			} else if taskInfo.TaskStatus == Completed {
				NReduceTaskCompleted++
			}
		}
		// check if all reduce tasks have been completed
		if NReduceTaskCompleted == len(c.ReduceTasks) {
			c.AllReduceTaskCompleted = true
		} else {
			reply.TaskType = Wait
			return nil
		}
	}

	// all tasks have been completed
	reply.TaskType = Exit
	return nil
}
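
The Map and Reduce branches of RequestTask apply the same selection rule, so the rule can be factored out. The sketch below is one possible way to do that, not the code used in this post: pickTask is a hypothetical helper, the types are trimmed copies of the ones above, and the current time is passed in so the timeout is easy to test.

```go
package main

import (
	"fmt"
	"time"
)

type TaskStatus int

const (
	Unassigned TaskStatus = iota
	Assigned
	Completed
	Failed
)

type TaskInfo struct {
	TaskStatus TaskStatus
	TimeStamp  time.Time // when the task was last handed out
}

// pickTask is a hypothetical helper. It returns the index of the first task
// that may be handed out (unassigned, failed, or assigned longer than
// timeout ago) and marks it Assigned. If none qualifies it returns -1, and
// allDone reports whether every task is already Completed.
func pickTask(tasks []TaskInfo, timeout time.Duration, now time.Time) (idx int, allDone bool) {
	completed := 0
	for i := range tasks {
		t := &tasks[i]
		switch {
		case t.TaskStatus == Completed:
			completed++
		case t.TaskStatus == Unassigned, t.TaskStatus == Failed,
			t.TaskStatus == Assigned && now.Sub(t.TimeStamp) > timeout:
			t.TaskStatus = Assigned
			t.TimeStamp = now
			return i, false
		}
	}
	return -1, completed == len(tasks)
}

func main() {
	now := time.Now()
	tasks := []TaskInfo{
		{TaskStatus: Completed},
		{TaskStatus: Assigned, TimeStamp: now.Add(-15 * time.Second)}, // timed out
		{TaskStatus: Assigned, TimeStamp: now},                        // still running
	}
	idx, done := pickTask(tasks, 10*time.Second, now)
	fmt.Println(idx, done) // 1 false: the timed-out task is reassigned
	idx, done = pickTask(tasks, 10*time.Second, now)
	fmt.Println(idx, done) // -1 false: nothing to hand out, the worker should wait
}
```

A caller would map the three outcomes onto the replies described above: an index means "run this task", (-1, false) means Wait, and (-1, true) means the phase is finished.
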
3.2.4 The ReportTask Function


func (c *Coordinator) ReportTask(args *MessageSend, reply *MessageReply) error {
	c.Mutex.Lock()
	defer c.Mutex.Unlock()

	// record the outcome reported by the worker
	switch args.TaskCompletedStatus {
	case MapTaskCompleted:
		c.MapTasks[args.TaskID].TaskStatus = Completed
	case MapTaskFailed:
		c.MapTasks[args.TaskID].TaskStatus = Failed
	case ReduceTaskCompleted:
		c.ReduceTasks[args.TaskID].TaskStatus = Completed
	case ReduceTaskFailed:
		c.ReduceTasks[args.TaskID].TaskStatus = Failed
	}
	return nil
}

3.3 Worker

3.3.1 Worker Polling Loop


func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		args := MessageSend{}
		reply := MessageReply{}
		call("Coordinator.RequestTask", &args, &reply)

		switch reply.TaskType {
		case MapTask:
			HandleMapTask(&reply, mapf)
		case ReduceTask:
			HandleReduceTask(&reply, reducef)
		case Wait:
			time.Sleep(1 * time.Second)
		case Exit:
			os.Exit(0)
		default:
			time.Sleep(1 * time.Second)
		}
	}
}
3.3.2 Handling Map Tasks


func HandleMapTask(reply *MessageReply, mapf func(string, string) []KeyValue) {
	// open the file
	file, err := os.Open(reply.TaskFile)
	if err != nil {
		log.Fatalf("cannot open %v", reply.TaskFile)
	}
	// read the file, get the content
	content, err := io.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", reply.TaskFile)
	}
	file.Close()

	// call the map function to get the key-value pairs
	kva := mapf(reply.TaskFile, string(content))

	// partition the key-value pairs into one bucket per reduce task
	intermediate := make([][]KeyValue, reply.NReduce)
	for _, kv := range kva {
		r := ihash(kv.Key) % reply.NReduce
		intermediate[r] = append(intermediate[r], kv)
	}

	// write the intermediate files
	for r, kva := range intermediate {
		oname := fmt.Sprintf("mr-%v-%v", reply.TaskID, r)
		// an empty dir argument places the temp file in os.TempDir()
		ofile, err := os.CreateTemp("", oname)
		if err != nil {
			log.Fatalf("cannot create tempfile %v", oname)
		}
		enc := json.NewEncoder(ofile)
		for _, kv := range kva {
			// write the key-value pairs to the intermediate file
			if err := enc.Encode(kv); err != nil {
				log.Fatalf("cannot encode %v", kv)
			}
		}
		ofile.Close()
		// atomic rename: move the temp file to its final intermediate name
		if err := os.Rename(ofile.Name(), oname); err != nil {
			log.Fatalf("cannot rename %v to %v", ofile.Name(), oname)
		}
	}

	// send the task completion message to the coordinator
	args := MessageSend{
		TaskID:              reply.TaskID,
		TaskCompletedStatus: MapTaskCompleted,
	}
	call("Coordinator.ReportTask", &args, &MessageReply{})
}
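
One caveat about the temp-file-then-rename step: os.Rename can fail when the temporary directory and the working directory sit on different file systems. A possible variation, sketched below under that assumption, creates the temporary file next to its final destination. writeAtomically and roundTrip are hypothetical helpers, and KeyValue is redeclared only to keep the example self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

type KeyValue struct {
	Key   string
	Value string
}

// writeAtomically is a hypothetical helper: it JSON-encodes kva into a
// temporary file created in the same directory as finalPath, then renames
// it into place so readers never observe a half-written file.
func writeAtomically(finalPath string, kva []KeyValue) error {
	dir := filepath.Dir(finalPath)
	tmp, err := os.CreateTemp(dir, filepath.Base(finalPath)+"-tmp-*")
	if err != nil {
		return err
	}
	// Cleans up after a failure; after a successful rename the name is gone
	// and the ignored error is harmless.
	defer os.Remove(tmp.Name())

	enc := json.NewEncoder(tmp)
	for _, kv := range kva {
		if err := enc.Encode(kv); err != nil {
			tmp.Close()
			return err
		}
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), finalPath)
}

// roundTrip writes kva into a fresh temporary directory and reads it back;
// it exists only to exercise writeAtomically in this sketch.
func roundTrip(kva []KeyValue) ([]KeyValue, error) {
	dir, err := os.MkdirTemp("", "mr-demo-")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "mr-0-1")
	if err := writeAtomically(path, kva); err != nil {
		return nil, err
	}
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var out []KeyValue
	dec := json.NewDecoder(f)
	for {
		var kv KeyValue
		if err := dec.Decode(&kv); err == io.EOF {
			break
		} else if err != nil {
			return nil, err
		}
		out = append(out, kv)
	}
	return out, nil
}

func main() {
	kva, err := roundTrip([]KeyValue{{"apple", "1"}, {"pear", "1"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(kva), kva[0].Key) // 2 apple
}
```

Because the temporary file and the final file share a directory, the rename stays within one file system, which is the condition under which it replaces the target atomically on POSIX systems.
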
3.3.3 Handling Reduce Tasks


// generate the names of the intermediate files read by reduce task r
func generateFileName(r int, NMap int) []string {
	var fileName []string
	for TaskID := 0; TaskID < NMap; TaskID++ {
		fileName = append(fileName, fmt.Sprintf("mr-%d-%d", TaskID, r))
	}
	return fileName
}

func HandleReduceTask(reply *MessageReply, reducef func(string, []string) string) {
	// load the intermediate files
	var intermediate []KeyValue
	// get the intermediate file names
	intermediateFiles := generateFileName(reply.TaskID, reply.NMap)
	for _, filename := range intermediateFiles {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		// decode the intermediate file; stop at EOF or on any decode error
		dec := json.NewDecoder(file)
		for {
			kv := KeyValue{}
			if err := dec.Decode(&kv); err != nil {
				break
			}
			intermediate = append(intermediate, kv)
		}
		file.Close()
	}

	// sort the intermediate key-value pairs by key
	sort.Slice(intermediate, func(i, j int) bool {
		return intermediate[i].Key < intermediate[j].Key
	})

	// create the output file
	oname := fmt.Sprintf("mr-out-%v", reply.TaskID)
	ofile, err := os.Create(oname)
	if err != nil {
		log.Fatalf("cannot create %v", oname)
	}
	for i := 0; i < len(intermediate); {
		// find the run [i, j) of pairs that share the same key
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		var values []string
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		// call the reduce function to get the output
		output := reducef(intermediate[i].Key, values)
		// write the key and its reduced value to the output file
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
		i = j
	}
	ofile.Close()
	// ofile was created directly under its final name, so this rename is a no-op
	os.Rename(ofile.Name(), oname)

	// send the task completion message to the coordinator
	args := MessageSend{
		TaskID:              reply.TaskID,
		TaskCompletedStatus: ReduceTaskCompleted,
	}
	call("Coordinator.ReportTask", &args, &MessageReply{})
}

4 Testing and Common Problems

test-mr.sh is the test script. The lab's test-mr-many.sh script can also be used to run the tests n times by passing n as its argument.

*** Starting wc test
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting job count test.
--- job count test: PASS
*** Starting early exit test.
--- early exit test: PASS
*** Starting crash test.
--- crash test: PASS


  1. The job count test fails

    *** Starting job count test.
    --- map jobs ran incorrect number of times (10 != 8)
    --- job count test: FAIL





