6.5840 Lab 1: MapReduce

Series - 6.5840

“In this lab you’ll build a MapReduce system. You’ll implement a worker process that calls application Map and Reduce functions and handles reading and writing files, and a coordinator process that hands out tasks to workers and copes with failed workers. You’ll be building something similar to the MapReduce paper.”

This lab implements a simplified version of the MapReduce system. The system consists of a coordinator and multiple workers. The coordinator assigns tasks to workers and handles failures. The workers execute the tasks and report back to the coordinator.

Some notable simplifications from the original MapReduce paper include:

  • The coordinator does not ping every worker periodically; instead, we assume each task takes at most 10 seconds to complete. If a task has not finished within 10 seconds, we reschedule it. Therefore we don’t even need to keep track of which worker is working on which task.
  • The coordinator does not need to handle stragglers. We can choose to implement this if we want to, but it is not required for the tests to pass.
  • We don’t really need to pass the locations of the intermediate files back to the coordinator if we use a consistent naming scheme, because we assume the workers and the coordinator share the same file system.
  • We simply load each intermediate file entirely into memory, rather than streaming it from disk through an iterator. We don’t care much about memory usage here.
  • The original paper overlaps the map and reduce phases: intermediate files can start flowing to reduce workers before all map tasks have completed, because data transfer is a huge bottleneck. We don’t need this feature; since we don’t transfer data at all, we simply wait for all map tasks to complete before starting the reduce phase.
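Concretely, the consistent naming scheme can just encode the task ids into the file names. A minimal sketch — the mr-X-Y convention follows the lab’s hints, and the helper names here are mine:

```go
import "fmt"

// intermediateName returns the path of the intermediate file produced by
// map task mapId for reduce partition reduceId, e.g. "mr-3-7". The exact
// convention is arbitrary; it only has to be agreed upon by the map side
// (writer) and the reduce side (reader).
func intermediateName(mapId, reduceId int) string {
	return fmt.Sprintf("mr-%d-%d", mapId, reduceId)
}

// outputName returns the final output file of reduce task reduceId.
func outputName(reduceId int) string {
	return fmt.Sprintf("mr-out-%d", reduceId)
}
```

Because both sides compute the same names, the coordinator never needs to learn where intermediate data lives.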

Go is a terrible language for data modeling; it throws away decades of progress in programming language design: it does not even have a sum type in 2024. Come on, even C++ got std::variant. (No, interface{} is not a solution.) Even worse, there is no enum or exhaustive switch statement; const and iota are a step back even from C. We will continue discussing the data model while disgustingly ignoring all the representable illegal states.

What does the coordinator need to keep track of?

  • The state of each map and reduce task: idle, in progress, completed.
  • The number of map and reduce tasks, nMap and nReduce.
  • The input file path of each map task.

What I want to write:

```haskell
data TaskStatus = Idle | InProgress | Completed

newtype MapTaskId = MapTaskId Int
newtype ReduceTaskId = ReduceTaskId Int

data MapTask = MapTask MapTaskId TaskStatus FilePath
data ReduceTask = ReduceTask ReduceTaskId TaskStatus
data Task = MapT MapTask | ReduceT ReduceTask | SleepT | HaltT

data Coordinator = Coordinator
  { mapTasks    :: [MapTask]
  , reduceTasks :: [ReduceTask]
  , nMap        :: Int
  , nReduce     :: Int
  }
```

What I have to write:

```go
type TaskType uint8

const (
	MapTask TaskType = iota
	ReduceTask
	HaltTask
	SleepTask
)

type TaskStatus uint8

const (
	TaskIdle TaskStatus = iota
	TaskInProgress
	TaskCompleted
)

type TaskId uint64

type Task struct {
	taskType   TaskType
	taskId     TaskId
	taskStatus TaskStatus
	filename   string // input file; only meaningful for map tasks
}

type Coordinator struct {
	mu          sync.Mutex // one big lock around all coordinator state
	mapTasks    map[TaskId]*Task
	reduceTasks map[TaskId]*Task
	nMap        int
	nReduce     int
}
```

Now it’s easy to define the Done method. We are done if and only if all map tasks and reduce tasks are in the TaskCompleted state.
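The check itself is just a scan over both task maps while holding the lock. A self-contained sketch — TaskStatus is restated from above so the snippet compiles on its own, and allCompleted is a helper name of my choosing:

```go
// TaskStatus as above, restated so this sketch is self-contained.
type TaskStatus uint8

const (
	TaskIdle TaskStatus = iota
	TaskInProgress
	TaskCompleted
)

// allCompleted reports whether every task in the map has reached
// TaskCompleted. Done is this check applied to both mapTasks and
// reduceTasks while holding the coordinator's mutex.
func allCompleted(statuses map[uint64]TaskStatus) bool {
	for _, s := range statuses {
		if s != TaskCompleted {
			return false
		}
	}
	return true
}
```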

We define two RPC methods: RequestTask, for workers to request new tasks from the coordinator, and CompleteTask, for workers to report task completion back to the coordinator.

When we request a new task:

  • If there exists an idle map task, we return it and mark it as in progress.
  • If there are no idle map tasks but there are in-progress map tasks, we return a sleep task.
  • If all map tasks are completed and there exists an idle reduce task, we return it and mark it as in progress.
  • If there are no idle reduce tasks but there are in-progress reduce tasks, we return a sleep task.
  • If all map and reduce tasks are completed, we return a halt task.
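The five rules collapse into a single switch. Here is the decision distilled into a pure function — TaskType is restated from above so the snippet stands alone, and the real RPC handler additionally marks the chosen task as in progress and spawns the timeout goroutine, all under the lock:

```go
// TaskType as above, restated so this sketch is self-contained.
type TaskType uint8

const (
	MapTask TaskType = iota
	ReduceTask
	HaltTask
	SleepTask
)

// nextTaskType encodes the five scheduling rules as a function of how
// many tasks are idle / in progress in each phase. Note that reduce
// tasks are only considered once no map task is idle or in progress,
// i.e. once the map phase has fully completed.
func nextTaskType(mapIdle, mapInProgress, reduceIdle, reduceInProgress int) TaskType {
	switch {
	case mapIdle > 0:
		return MapTask
	case mapInProgress > 0:
		return SleepTask
	case reduceIdle > 0:
		return ReduceTask
	case reduceInProgress > 0:
		return SleepTask
	default:
		return HaltTask
	}
}
```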

We can manage the 10-second timeout with a goroutine that sleeps for 10 seconds and then checks the task status. If the task is still in progress, we mark it idle again, and it will be picked up by the next worker that requests a task.

```go
// rescheduleTimeoutTask is spawned in its own goroutine whenever a task
// is handed out. After 10 seconds, if the task has not completed, it is
// marked idle again so the next RequestTask call can reassign it.
func (c *Coordinator) rescheduleTimeoutTask(task *Task) {
	time.Sleep(10 * time.Second)

	c.mu.Lock()
	defer c.mu.Unlock()

	if task.taskStatus != TaskCompleted {
		log.Printf("Timeout for task %v, rescheduling...\n", *task)
		task.taskStatus = TaskIdle
	}
}
```

When we complete a task, we simply mark its state as completed.

The worker spins in a loop, requesting tasks from the coordinator and completing them until it receives a halt task.
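The loop is a small state machine over the task types. A self-contained sketch — TaskType is restated from above, and requestTask and completeTask stand in for the actual RPC calls to the coordinator:

```go
import "time"

// TaskType as above, restated so this sketch is self-contained.
type TaskType uint8

const (
	MapTask TaskType = iota
	ReduceTask
	HaltTask
	SleepTask
)

// workerLoop drives one worker: request a task, act on it, report back.
// requestTask and completeTask stand in for the RPC calls to the
// coordinator; running the actual map/reduce function is elided.
func workerLoop(requestTask func() TaskType, completeTask func(TaskType)) {
	for {
		switch t := requestTask(); t {
		case MapTask, ReduceTask:
			// ... run the application Map or Reduce function here ...
			completeTask(t)
		case SleepTask:
			time.Sleep(time.Second) // back off, then ask again
		case HaltTask:
			return
		}
	}
}
```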

Unlike the original paper, we use a shared file system for both the intermediate files and the output files. Therefore, we need file writes to be atomic in both cases, so that a crashed or slow worker never leaves a partially written file visible under its final name. The standard trick is to write to a temporary file and rename it into place:

```go
// atomicRename lets f write into a temporary file, then renames it to
// its final name. Rename is atomic within a file system, so readers
// never observe a partially written file.
func atomicRename(f func(*os.File) error, dir string, pattern string, filename string) error {
	file, err := os.CreateTemp(dir, pattern)
	if err != nil {
		return fmt.Errorf("atomicRename: cannot create temp file: %w", err)
	}
	defer os.Remove(file.Name()) // clean up on failure; no-op after a successful rename
	if err := f(file); err != nil {
		file.Close()
		return fmt.Errorf("atomicRename: f failed: %w", err)
	}
	if err := file.Close(); err != nil {
		return fmt.Errorf("atomicRename: close failed: %w", err)
	}
	if err := os.Rename(file.Name(), filename); err != nil {
		return fmt.Errorf("atomicRename: rename failed: %w", err)
	}
	return nil
}
```

Err, error handling in Go is really noisy.

The coordinator, as an RPC server, serves requests concurrently, so the easiest way to synchronize access is one big lock around everything: the sync.Mutex in the Coordinator struct.

This concludes Lab 1.