diff --git a/concurrency-mapreduce/README.md b/concurrency-mapreduce/README.md index f937508..b1082a3 100644 --- a/concurrency-mapreduce/README.md +++ b/concurrency-mapreduce/README.md @@ -98,19 +98,23 @@ header file that specifies exactly what you must build in your MapReduce library #ifndef __mapreduce_h__ #define __mapreduce_h__ -// Various function pointers -typedef char *(*Getter)(char *key); +// Different function pointer types used by MR +typedef char *(*Getter)(char *key, int partition_number); typedef void (*Mapper)(char *file_name); typedef void (*Reducer)(char *key, Getter get_func, int partition_number); -typedef unsigned long (*Partitioner)(char *key, int num_buckets); +typedef unsigned long (*Partitioner)(char *key, int num_partitions); -// Key functions exported by MapReduce +// External functions: these are what you must define void MR_Emit(char *key, char *value); -unsigned long MR_DefaultHashPartition(char *key, int num_buckets); -void MR_Run(int argc, char *argv[], - Mapper map, int num_mappers, - Reducer reduce, int num_reducers, - Partitioner partition); + +unsigned long MR_DefaultHashPartition(char *key, int num_partitions); + +void MR_Run(int argc, char *argv[], + Mapper map, int num_mappers, + Reducer reduce, int num_reducers, + Partitioner partition); + +#endif // __mapreduce_h__ ``` The most important function is `MR_Run`, which takes the command line @@ -161,7 +165,7 @@ void Map(char *file_name) { void Reduce(char *key, Getter get_next, int partition_number) { int count = 0; char *value; - while ((value = get_next(key)) != NULL) + while ((value = get_next(key, partition_number)) != NULL) count++; printf("%s %d\n", key, count); } @@ -195,8 +199,9 @@ invoked once per key, and is passed the key along with a function that enables iteration over all of the values that produced that same key. To iterate, the code just calls `get_next()` repeatedly until a NULL value is returned; `get_next` returns a pointer to the value passed in by the `MR_Emit()` -function above. The output, in the example, is just a count of how many times -a given word has appeared. +function above, or NULL when the key's values have been processed. The output, +in the example, is just a count of how many times a given word has appeared, +and is just printed to standard output. All of this computation is started off by a call to `MR_Run()` in the `main()` routine of the user program. This function is passed the `argv` array, and @@ -209,26 +214,26 @@ partitioning function. In most cases, programs will use the default function its implementation: ``` -unsigned long MR_DefaultHashPartition(char *key, int num_buckets) { +unsigned long MR_DefaultHashPartition(char *key, int num_partitions) { unsigned long hash = 5381; int c; while ((c = *key++) != '\0') hash = hash * 33 + c; - return hash % num_buckets; + return hash % num_partitions; } ``` The function's role is to take a given `key` and map it to a number, from `0` -to `num_buckets - 1`. Its use is internal to the MapReduce library, but +to `num_partitions - 1`. Its use is internal to the MapReduce library, but critical. Specifically, your MR library should use this function to decide -which Reduce thread gets a particular key/list of values to process. For some -applications, which Reducer thread processes a particular key is not -important (and thus the default function above should be passed in to -`MR_Run()`); for others, it is, and this is why the user can even pass in -their own partitioning function as need be. +which partition (and hence, which reducer thread) gets a particular key/list +of values to process. For some applications, which reducer thread processes a +particular key is not important (and thus the default function above should be +passed in to `MR_Run()`); for others, it is, and this is why the user can pass +in their own partitioning function as need be. One last requirement: For each partition, keys (and the value list associated -with said keys) should be *sorted* in ascending key order; thus, when a +with said keys) should be **sorted** in ascending key order; thus, when a particular reducer thread (and its associated partition) are working, the `Reduce()` function should be called on each key in order for that partition. diff --git a/concurrency-mapreduce/mapreduce.h b/concurrency-mapreduce/mapreduce.h index ab867cd..24e1aec 100644 --- a/concurrency-mapreduce/mapreduce.h +++ b/concurrency-mapreduce/mapreduce.h @@ -1,15 +1,16 @@ #ifndef __mapreduce_h__ #define __mapreduce_h__ -typedef char *(*Getter)(char *key); - +// Different function pointer types used by MR +typedef char *(*Getter)(char *key, int partition_number); typedef void (*Mapper)(char *file_name); typedef void (*Reducer)(char *key, Getter get_func, int partition_number); -typedef unsigned long (*Partitioner)(char *key, int num_buckets); +typedef unsigned long (*Partitioner)(char *key, int num_partitions); +// External functions: these are what you must define void MR_Emit(char *key, char *value); -unsigned long MR_DefaultHashPartition(char *key, int num_buckets); +unsigned long MR_DefaultHashPartition(char *key, int num_partitions); void MR_Run(int argc, char *argv[], Mapper map, int num_mappers,