for presentation on 5/28
Evaluating MapReduce for Multi-core and Multiprocessor Systems (HPCA 2007)
[ Abstract ]
This paper evaluates the suitability of the MapReduce model for multi-core and multi-processor systems.
MapReduce allows programmers to write functional-style code that is automatically parallelized and scheduled in a distributed system.
* Phoenix: an implementation of MapReduce for shared-memory systems that includes a programming API and an efficient runtime system.
What they did:
1. Study Phoenix with multi-core and symmetric multiprocessor systems.
2. Compare MapReduce code to code written in lower-level APIs such as P-threads.
[ 1 Introduction ]
Traditional parallel programming techniques (e.g., message passing and shared-memory threads)
require programmers to:
Manage concurrency explicitly by creating threads and synchronizing them through messages or locks.
Manage data locality manually.
-> Very difficult to write correct and scalable parallel code for non-trivial algorithms.
-> Must re-tune the code when the application is ported to a different or larger-scale system.
To simplify parallel coding:
1. Practical programming model
2. Efficient runtime system
Phoenix: a programming API and runtime system based on Google's MapReduce model.
Map: processes the input data & generates a set of intermediate key/value pairs.
Reduce: properly merges the intermediate pairs which have the same key.
The Phoenix implementation is based on the same principles but targets shared-memory systems such as multi-core chips and symmetric multiprocessors.
Compare the performance of Phoenix code to tuned parallel code written directly with P-threads.
[ 2 MapReduce Overview ]
==== 2.1 Programming Model
MapReduce is inspired by functional languages and targets data-intensive computations.
Input data format is application specific, and is specified by the user.
Output is a set of <key, value> pairs.
Map is applied to the input data and produces a list of intermediate <key, value> pairs.
It can be executed in parallel on non-overlapping portions of the input data.
Reduce is applied to all intermediate pairs with the same key.
It can be executed in parallel on each set of intermediate pairs with the same key.
Main benefit: Simplicity!
The programmer provides a simple description of the algorithm that focuses on functionality and not on parallelism. The actual parallelization and the details of concurrency management are left to the runtime system.
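The Map/Reduce contract in this subsection can be sketched as a sequential reference driver. This is only an illustration of the programming model, not Phoenix's API: the names `map_reduce`, `count_words`, and `sum_counts` are hypothetical, and a real runtime would run the Map and Reduce calls in parallel.

```python
from collections import defaultdict

def map_reduce(chunks, map_fn, reduce_fn):
    """Sequential reference: Map each chunk, group pairs by key, Reduce each group."""
    groups = defaultdict(list)
    for chunk in chunks:                      # each chunk is a non-overlapping input portion
        for key, value in map_fn(chunk):      # Map emits intermediate <key, value> pairs
            groups[key].append(value)
    # Reduce sees all values that share a key; output is a set of <key, value> pairs.
    return sorted((key, reduce_fn(key, values)) for key, values in groups.items())

def count_words(chunk):
    """User-supplied Map (hypothetical example): emit <word, 1> for every word."""
    for word in chunk.split():
        yield word.lower(), 1

def sum_counts(key, values):
    """User-supplied Reduce (hypothetical example): add up the values for one key."""
    return sum(values)

result = map_reduce(["a b a", "b c"], count_words, sum_counts)
print(result)  # [('a', 2), ('b', 2), ('c', 1)]
```

Note that the user code contains no threads or locks; all of that would live inside the driver.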
==== 2.2 Runtime System
The MapReduce runtime is responsible for parallelization and concurrency control.
To parallelize the Map, it splits the input pairs into units that are processed concurrently on multiple nodes. Next, it partitions the intermediate pairs using a scheme that keeps pairs with the same key in the same unit (the units are processed in parallel by Reduce tasks). Finally, it must merge and sort the output pairs from all Reduce tasks.
Factors: the size of units, the number of nodes, how units are assigned to nodes dynamically, how buffer space is allocated.
Optimization:
Reduce function-call overheads by increasing the granularity of Map or Reduce tasks.
Reduce load imbalance by adjusting task granularity or the number of nodes used.
Optimize locality -> each node can prefetch pairs for its current Map or Reduce task and prefetch the input for its next Map or Reduce task.
Fault tolerance: when the runtime detects that a node has failed, it re-assigns the Map or Reduce task the node was processing to another node. To avoid interference, the replicated task uses separate output buffers.
Runtime can dynamically adjust the number of nodes.
There can be non-trivial overheads due to key management, data copying, data sorting, or memory allocation between execution steps.
[ 3 The Phoenix System ]
MapReduce for shared-memory systems.
Efficient execution on multiple cores without burdening the programmer with concurrency management.
Simple API: visible to application programmers
& Efficient Runtime: handles parallelization, resource management, and fault recovery.
==== 3.1 The Phoenix API
==== 3.2 The Phoenix Runtime
======== 3.2.1 Basic Operation and Control Flow
The runtime is controlled by the scheduler, which is initiated by user code.
The scheduler:
Creates and manages the threads that run Map and Reduce tasks.
Manages the buffers used for task communication.
Determines the number of cores to use for this computation.
For each core, spawns a worker thread that is dynamically assigned some number of Map and Reduce tasks.
Map Stage>
Splitter: divides input pairs into equally sized units to be processed by the Map tasks.
Map tasks are allocated dynamically to workers and each one emits <key, value> pairs.
Partition: splits the intermediate pairs into units for the Reduce tasks.
All values of the same key go to the same unit.
The scheduler must wait for all Map tasks to complete before initiating the Reduce stage.
Reduce Stage>
Reduce tasks are also assigned to workers dynamically.
Each task must process all values for the same key -> may exhibit higher load imbalance across workers, so dynamic scheduling is more important.
Final output from all tasks is merged into a single buffer, sorted by keys.
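The control flow above (split, Map, partition, barrier, Reduce, merge) can be sketched with a thread pool. This is a rough illustration under assumptions, not the Phoenix runtime: `split`, `run`, and the parameters `unit_size`, `num_workers`, and `num_partitions` are hypothetical names, and Python threads only model the scheduling structure, not real multi-core speedup.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

def split(data, unit_size):
    """Splitter: cut the input into equally sized units (the last may be shorter)."""
    return [data[i:i + unit_size] for i in range(0, len(data), unit_size)]

def run(data, map_fn, reduce_fn, unit_size=2, num_workers=4, num_partitions=4):
    units = split(data, unit_size)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # Map stage: idle workers pull the next unit (dynamic assignment).
        # list() blocks until every Map task is done, which acts as the barrier.
        map_outputs = list(pool.map(map_fn, units))

        # Partition: all values of the same key go to the same unit.
        partitions = [defaultdict(list) for _ in range(num_partitions)]
        for pairs in map_outputs:
            for key, value in pairs:
                partitions[hash(key) % num_partitions][key].append(value)

        # Reduce stage: one task per partition (an assumption of this sketch),
        # so every key is handled by exactly one task.
        def reduce_partition(part):
            return [(key, reduce_fn(key, values)) for key, values in part.items()]

        reduce_outputs = list(pool.map(reduce_partition, partitions))

    # Merge: combine all Reduce outputs into a single buffer sorted by key.
    return sorted(pair for out in reduce_outputs for pair in out)

def square_parity(unit):
    """Hypothetical Map: emit <parity, n squared> for each number in the unit."""
    return [(n % 2, n * n) for n in unit]

def total(key, values):
    """Hypothetical Reduce: sum all values for one key."""
    return sum(values)

print(run(list(range(6)), square_parity, total))  # [(0, 20), (1, 35)]
```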
======== 3.2.2 Buffer Management
All buffers are allocated in shared memory.
Map-Reduce buffers: store the intermediate output pairs. Each worker has its own set of buffers.
Reduce-Merge buffers: store the outputs of Reduce tasks before they are sorted. After sorting, the final output is available in the user allocated Output_data buffer.
======== 3.2.3 Fault Recovery
Phoenix detects faults through timeouts.
The execution time of similar tasks on other workers is used as a yardstick for the timeout interval.
Once a fault is detected or at least suspected, the runtime attempts to re-execute the failed task. Since the original task may still be running, separate output buffers are allocated for the new task to avoid conflict and data corruption.
The current Phoenix code does not provide fault recovery for the scheduler itself.
Google's MapReduce -> once only a few tasks remain, it always spawns redundant executions of the remaining tasks, because it proactively assumes that some workers have performance or failure issues.
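The timeout-based detection and re-execution described in this subsection might look roughly like the sketch below. The notes do not say how the timeout is derived from other workers' task times, so the median and the `slack` multiplier are assumptions, as are all function names.

```python
from statistics import median

def timeout_for(completed_durations, slack=3.0, floor=0.05):
    """Timeout based on how long similar tasks took on other workers.
    The median and the slack factor are assumptions of this sketch."""
    if not completed_durations:
        return None  # no yardstick yet
    return max(floor, slack * median(completed_durations))

def suspected_failures(running, completed_durations, now):
    """Return ids of running tasks whose elapsed time exceeds the timeout.
    `running` maps task id -> start time."""
    limit = timeout_for(completed_durations)
    if limit is None:
        return []
    return sorted(task for task, start in running.items() if now - start > limit)

def reexecute(task_id, task_fn, buffers):
    """Re-run a suspected task into a fresh output buffer, so a still-running
    original attempt cannot corrupt the new attempt's output."""
    attempt = sum(1 for existing_id, _ in buffers if existing_id == task_id)
    buffers[(task_id, attempt)] = []
    task_fn(buffers[(task_id, attempt)])
    return (task_id, attempt)

completed = [1.0, 1.2, 0.9]              # durations of finished Map tasks
running = {"m7": 0.0, "m8": 4.0}         # start times of tasks still in flight
print(suspected_failures(running, completed, now=5.0))  # ['m7']
```

Keying buffers by (task, attempt) is one simple way to keep the original and the replica apart; the runtime can then keep whichever attempt finishes first.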
======== 3.2.4 Concurrency and Locality Management
Three scheduling approaches
1. Default policy for the specific system
2. Dynamically determine the best policy for each decision
3. Allow the programmer to provide application specific policies.
* Number of Cores and Workers/Core:
Spawn workers to all available cores.
In systems with multithreaded cores, we spawn one worker per hardware thread.
* Task Assignment:
To achieve load balance, we always assign Map and Reduce tasks to workers dynamically.
* Task size:
Each Map task processes a unit of the input data.
Adjust the unit size so that the input and output data for a Map task fit in the L1 data cache.
Tradeoff -> lower overheads (fewer, larger units) vs. better load balance (more, smaller units)
* Partition Function:
Determines the distribution of intermediate data.
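The task-size and partition decisions above can be made concrete with a little arithmetic. This is a sketch under assumptions: the 8 KiB cache size and 16-byte record sizes are illustrative figures, the function names are hypothetical, and CRC32 stands in for whatever hash a real partition function would use.

```python
import math
import zlib

def map_unit_records(l1_bytes, in_bytes_per_record, out_bytes_per_record):
    """Largest record count whose input plus output fits in the L1 data cache."""
    return max(1, l1_bytes // (in_bytes_per_record + out_bytes_per_record))

def num_map_tasks(total_records, unit_records):
    """Smaller units mean more Map tasks: better balance, more per-task overhead."""
    return math.ceil(total_records / unit_records)

def partition(key, num_reduce_tasks):
    """Hash-based partition (an assumption): equal keys always map to the
    same Reduce unit. `key` is a bytes object."""
    return zlib.crc32(key) % num_reduce_tasks

unit = map_unit_records(l1_bytes=8 * 1024, in_bytes_per_record=16, out_bytes_per_record=16)
print(unit)                              # 256
print(num_map_tasks(1_000_000, unit))    # 3907
```

A programmer-supplied partition function could replace `partition` when the key distribution is known to be skewed.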
[ 4 Methodology ]
==== 4.1 Shared Memory Systems
CMP: UltraSparc T1 multi-core chip with 8 multithreaded cores sharing the L2 cache.
SMP: symmetric multiprocessor with 24 chips.
Promise: the same program should run as efficiently as possible on any type of shared-memory system without any involvement by the user.
==== 4.2 Applications
From domains>
Enterprise computing: WordCount, ReverseIndex, StringMatch
Scientific computing: Matrix Multiply
Artificial intelligence: Kmeans, PCA, Linear Regression
Image processing: Histogram
MapReduce version using Phoenix
& conventional parallel version using P-threads.
* WordCount: determine frequency of words in a file.
Map: process different sections of the input files -> <word, 1> (if the word was found)
Reduce: add up the values for each word
* MatrixMultiply: dense integer matrix multiplication.
Map: computes the results for a set of rows of the output matrix-> <(x,y), value>
Reduce: Identity function
* Reverse Index: Build reverse index for links in HTML files.
Map: parses a collection of HTML files -> <link_key, file_info>
Reduce: combines all files referencing the same link into a single linked list.
* Kmeans: Iterative clustering algorithm to classify 3D data points into groups.
Phoenix scheduler is called multiple times until it converges.
Map: finds distance between each point and each mean and assigns the point to the closest cluster.
emits <cluster_id, data_vector>
Reduce: gathers all points with the same cluster_id, finds their centroid (mean vector)
emits <cluster_id, mean vector>
* String Match: Search file with keys for an encrypted word.
Map: each task processes the "encrypt" file and a portion of the "keys" file, and returns
<a word in the "keys" file, a flag indicating whether it was a match>
Reduce: Identity function.
* PCA: Principal components analysis on a matrix.
Finds the mean vector and the covariance matrix of a set of data points. Uses 2 MapReduce iterations.
Map 1: computes the mean for a set of rows -> <row_number, mean>
Map 2: is given the data needed for a few elements of the covariance matrix and computes them -> <(row, column), covariance>
Reduce: Identity function
* Histogram: Determine frequency of each RGB component in a set of images.
Map: processes different portions of the image -> <image, frequency of component occurrence>
Reduce: sum up numbers.
* Linear Regression: Compute the best fit line for a set of points.
Map: processes different portions of the file, computes certain summary statistics like sum of squares.
Reduce: finally determine the best fit line.
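Three of the applications listed above (WordCount, Histogram, Linear Regression) can be sketched as Map functions plugged into one tiny sequential driver. This is an illustration only, not the benchmark code: the driver, the histogram key layout `(channel, intensity)`, and the choice to sum partial statistics in Reduce and fit the line in a final step (`fit_line`) are all assumptions.

```python
from collections import defaultdict

def map_reduce(chunks, map_fn, reduce_fn):
    """Minimal sequential driver: Map each chunk, group by key, Reduce each group."""
    groups = defaultdict(list)
    for chunk in chunks:
        for key, value in map_fn(chunk):
            groups[key].append(value)
    return {key: reduce_fn(values) for key, values in groups.items()}

# WordCount: Map emits <word, 1>; Reduce adds up the values for each word.
def wc_map(text):
    return [(word, 1) for word in text.split()]

# Histogram: Map emits <(channel, intensity), 1> per pixel component; Reduce sums.
def hist_map(pixels):
    return [((channel, value), 1) for pixel in pixels for channel, value in zip("RGB", pixel)]

# Linear Regression: Map emits partial summary statistics for its portion of points.
def lr_map(points):
    out = []
    for x, y in points:
        out += [("n", 1), ("sx", x), ("sy", y), ("sxx", x * x), ("sxy", x * y)]
    return out

def fit_line(s):
    """Least-squares slope and intercept from the reduced sums."""
    slope = (s["n"] * s["sxy"] - s["sx"] * s["sy"]) / (s["n"] * s["sxx"] - s["sx"] ** 2)
    intercept = (s["sy"] - slope * s["sx"]) / s["n"]
    return slope, intercept

print(map_reduce(["a b", "b"], wc_map, sum))                   # {'a': 1, 'b': 2}
print(map_reduce([[(255, 0, 0)], [(255, 0, 7)]], hist_map, sum)[("R", 255)])  # 2
sums = map_reduce([[(0, 1), (1, 3)], [(2, 5), (3, 7)]], lr_map, sum)
print(fit_line(sums))                                          # (2.0, 1.0)
```

All three reuse the same Reduce (`sum`), which hints at why these workloads fit the model naturally.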
[ 5 Evaluation ]
Evaluation results for Phoenix using the CMP and SMP shared-memory systems.
==== 5.1 Basic Performance Evaluation
==== 5.2 Dependency to Dataset Size
Increasing the dataset leads to higher speedups over the sequential version for most applications.
Larger dataset allows the Phoenix runtime to better amortize its overheads.
Caching effects are more significant when processing large datasets, and load imbalance is rarer.
==== 5.3 Dependency to Unit Size
The unit size determines the number of Map tasks, their memory footprint, and how well their overhead is amortized.
Applications with short-term temporal locality in their access patterns perform better with smaller units.
Large units can lead to better performance for applications that spend a significant portion of their time spawning tasks and merging their outputs.
==== 5.4 Comparison to Pthreads
For the applications that fit naturally into the MapReduce model, Phoenix leads to similar or slightly better speedups.
For Kmeans, PCA, and Histogram, P-threads outperforms Phoenix significantly. For these applications, the MapReduce program structure is not an efficient fit.
==== 5.5 Fault Recovery