Tuesday, October 25, 2011

MapReduce Questions and Answers

With all the hype and buzz surrounding NoSQL, I decided to have a look at it. I quickly found that there is not one NoSQL I could learn. Rather, there are various different solutions with different purposes and trade-offs. Those various solutions tend to have one thing in common: processing of data in NoSQL storage is usually done using the MapReduce framework.

A search on MapReduce turned up various scattered blog posts, some university course pages and one book that seems to contain almost everything the other sources did.

This post contains MapReduce questions and answers based on the book. Basically, if I were a student, these are the notes I would have made to prepare for a test. If I were a teacher, this is what I would ask on the exam.

The first chapter gives credit where the credit is due; the rest contains questions. The last chapter contains hands-on coding exercises.

The Book

The book is named Data-Intensive Text Processing with MapReduce. If you are unsure whether you want to buy it or not, a pre-production manuscript is available for free.

Do not be fooled by the title. The book is more about MapReduce itself and less about text processing. The first half of the book describes general-purpose tricks (design patterns) useful for any task. The second half contains chapters on text processing, graph algorithms and expectation maximization.

The book contains almost everything I found on various blogs and university course pages, and much more.

Questions

Questions are split by book chapters. With one minor exception (counters), the questions are based on the first half of the book. The second half contains concrete algorithms, and I wanted to focus on general-purpose knowledge.

That does not mean that learning them is not useful. The Graph Algorithms chapter in particular contains easily generalizable ideas.


2 MapReduce Basics
2.2 Mappers and Reducers
Describe the general MapReduce algorithm. Split it into phases. For each phase include:
  • who is responsible (framework/programmer/customizable),
  • what it does,
  • phase input,
  • phase output.
The answer is:
MapReduce has four phases:
  • map,
  • combine,
  • shuffle and sort,
  • reduce.

The map phase is done by mappers. Mappers run on unsorted input key/value pairs, on the same physical nodes that keep the input data. Each mapper emits zero, one or multiple output key/value pairs for each input key/value pair. Output key/value pairs are called intermediate key/value pairs. Their type is usually different from the input key/value pair type. The mapper must be supplied by the programmer.

The combine phase is done by combiners. The combiner should combine key/value pairs with the same key together. Each combiner may run zero, one or multiple times. The framework decides whether and how many times to run the combiner; the programmer has no control over it. The combiner's output key/value pair type must be the same as its input key/value pair type.

The shuffle and sort phase is done by the framework. Data from all mappers are grouped by the key, split among reducers and sorted by the key. Each reducer obtains all values associated with the same key. The programmer may supply a custom compare function for sorting and a partitioner for the data split. All key/value pairs going to the same reducer are sorted by the key, but there is no global sorting.

The reducer obtains key/[values list] pairs, sorted by the key. The values list contains all values with the same key produced by mappers. Each reducer emits zero, one or multiple output key/value pairs for each input key/[values list] pair. The output key/value pair type is usually different from the input type. The reducer must be supplied by the programmer.

If the algorithm requires multiple MapReduce iterations, mappers or reducers may increment a global counter. The driver program reads the counter after the reduce phase and decides whether the next iteration is needed.

Note: chapter 2 does not mention counters. They are explained later, in chapter 5.
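The four phases above can be illustrated with a toy, single-process word count in Python. This is only a sketch of the data flow, not a real framework API; all names here are made up.

```python
from itertools import groupby

def mapper(docid, doc):            # map: supplied by the programmer
    for word in doc.split():
        yield word, 1

def combiner(word, counts):        # combine: input and output types match
    yield word, sum(counts)

def reducer(word, counts):         # reduce: supplied by the programmer
    yield word, sum(counts)

def group(pairs):
    """Group (key, value) pairs by key, sorted by key."""
    pairs = sorted(pairs, key=lambda kv: kv[0])
    for key, kvs in groupby(pairs, key=lambda kv: kv[0]):
        yield key, [v for _, v in kvs]

def run_job(records):
    # map phase (a single map task, for brevity)
    intermediate = [kv for k, v in records for kv in mapper(k, v)]
    # combine phase: a real framework may run this zero or more times
    combined = [kv for key, values in group(intermediate)
                for kv in combiner(key, values)]
    # shuffle and sort phase (done by the framework), then reduce phase
    return dict(kv for key, values in group(combined)
                for kv in reducer(key, values))

print(run_job([(1, "a b a"), (2, "b c")]))  # {'a': 2, 'b': 2, 'c': 1}
```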
Decide if the statement is true or false: All MapReduce implementations implement exactly same algorithm.
The answer is:
False. For example, Google's implementation does not allow changing the key in the reducer, but provides sorting for values. Hadoop does not provide value sorting, but the reducer can change the key.
True or false: Each mapper must generate the same number of key/value pairs as its input had.
The answer is:
False. Mapper may generate any number of key/value pairs (including zero).
True or false: Mappers input key/value pairs are sorted by the key.
The answer is:
False. Mapper's input is not sorted in any way.
True or false: Mappers output key/value must be of the same type as its input.
The answer is:
False. Mapper may produce key/value pairs of any type.
True or false: Reducer is applied to all values associated with the same key.
The answer is:
True. Reducer is applied to all values associated with the same key.
True or false: Reducers input key/value pairs are sorted by the key.
The answer is:
True. A reducer's input key/value pairs are sorted by the key.
True or false: Each reducer must generate the same number of key/value pairs as its input had.
The answer is:
False. Reducer may generate any number of key/value pairs (including zero).
True or false: Reducers output key/value pair must be of the same type as its input.
The answer is:
False. The statement is false in Hadoop and true in Google's implementation.
2.3 The Execution Framework
What happens in case of hardware/software failure?
The answer is:
MapReduce framework must be able to recover from both hardware (disk failures, RAM errors) and software (bugs, unexpected exceptions) errors. Both are common and expected.
Is it possible to start reducers while some mappers still run? Why?
The answer is:
No. A reducer's input is grouped by the key. The last running mapper could theoretically produce a key already consumed by a running reducer.
Define a straggler.
The answer is:
A straggler is a map or reduce task that takes an unusually long time to complete.
What is speculative execution (also called backup tasks)? What problem does it solve?
The answer is:
An identical copy of the same task is executed on multiple nodes. The output of the fastest task is used.

Speculative execution helps if the task is slow because of hardware problem. It does not help if the distribution of values over keys is skewed.
2.4 Partitioners and Combiners
What does partitioner do?
The answer is:
The partitioner divides key/value pairs produced by map tasks among reducers.
What does combiner do?
The answer is:
The combiner does local aggregation of key/value pairs produced by the mapper before or during the shuffle and sort phase. In general, it reduces the amount of data to be transferred between nodes.

The framework decides how many times to run it. A combiner may run zero, one or multiple times on the same input.
Decide if the statement is true or false: Each combiner runs exactly once.
The answer is:
False. The framework decides whether the combiner runs zero, one or multiple times.
2.5 The Distributed File System
Briefly describe HDFS architecture.
The answer is:
HDFS has one namenode and a lot of datanodes. The namenode is the master; it coordinates file operations, ensures integrity of the system and keeps the namespace (metadata, directory structure, file-to-block mapping etc.).

Data are stored in big blocks on datanodes. Each block is stored on multiple datanodes, three by default. The namenode checks whether datanodes work correctly and manages data replication.

The client contacts the namenode, which answers with a data block id and a datanode id. The datanode then sends data directly to the client.
Decide if the statement is true or false: HDFS is good at managing large number of small files.
The answer is:
False. HDFS is designed to manage large files, not large numbers of small files.
2.6 Hadoop Cluster Architecture
Explain the difference between the jobtracker and a tasktracker.
The answer is:
The client submits jobs to the jobtracker. The jobtracker runs on the master node. It monitors MapReduce jobs and coordinates mappers and reducers.

The tasktracker runs both user code and the datanode daemon on slave nodes. It is never contacted by the client.
Explain mapper lifecycle.
The answer is:
The initialization method is called before any other method. It has no parameters and no output.

The map method is called separately for each key/value pair. It processes input key/value pairs and emits intermediate key/value pairs.

The close method runs after all input key/value pairs have been processed. The method should close all open resources. It may also emit key/value pairs.
Explain reducer lifecycle.
The answer is:
The initialization method is called before any other method. It has no parameters and no output.

The reduce method is called separately for each key/[values list] pair. It processes intermediate key/value pairs and emits final key/value pairs. Its input is a key and an iterator over all intermediate values associated with that key.

The close method runs after all input key/value pairs have been processed. The method should close all open resources. It may also emit key/value pairs.
3 MapReduce Algorithm Design
3.1 Local Aggregation
What is local aggregation and why is it used?
The answer is:
Either the combiner or the mapper itself combines key/value pairs with the same key together. They may also do some additional preprocessing of the combined values. Only key/value pairs produced by the same mapper are combined.

Key/value pairs created by map tasks are transferred between nodes during the shuffle and sort phase. Local aggregation reduces the amount of data to be transferred.

If the distribution of values over keys is skewed, data preprocessing in the combiner helps to eliminate reduce-side stragglers.
What is in-mapper combining? State advantages and disadvantages over writing custom combiner.
The answer is:
Local aggregation (combining of key/value pairs) done inside the mapper.

The map method does not emit key/value pairs; it only updates an internal data structure. The close method combines and preprocesses all stored data and emits final key/value pairs. The internal data structure is initialized in the init method.

Advantages:
  • It will run exactly once. A combiner may run multiple times or not at all.
  • We are sure it will run during the map phase. A combiner may run either after the map phase or before the reduce phase. The latter case provides no reduction in transferred data.
  • In-mapper combining is typically more efficient. A combiner does not reduce the amount of data produced by mappers; it only groups generated data together. That causes unnecessary object creation, destruction, serialization and deserialization.

Disadvantages:
  • Scalability bottleneck: the technique depends on having enough memory to store all partial results. We have to flush partial results regularly to avoid it. Using a combiner produces no scalability bottleneck.
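In Python, the technique might look like the sketch below, assuming a Hadoop-like mapper with setup/map/cleanup hooks; `emit` stands in for the framework's output call, and the flush threshold is the workaround for the memory bottleneck.

```python
from collections import defaultdict

class InMapperCombiningWordCount:
    FLUSH_THRESHOLD = 100_000      # flush partial results to bound memory

    def setup(self):               # init method: create the data structure
        self.counts = defaultdict(int)

    def map(self, docid, doc, emit):
        # buffer counts instead of emitting one pair per word
        for word in doc.split():
            self.counts[word] += 1
        if len(self.counts) > self.FLUSH_THRESHOLD:
            self._flush(emit)

    def cleanup(self, emit):       # close method: emit everything buffered
        self._flush(emit)

    def _flush(self, emit):
        for word, count in self.counts.items():
            emit(word, count)
        self.counts.clear()
```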
3.2 Pairs and Stripes
Explain the pairs design pattern on a co-occurrence example. Include advantages/disadvantages against the stripes approach, possible optimizations and their efficacy.
The answer is:
The mapper generates keys composed from pairs of words that occurred together. The value contains the number 1. The framework groups key/value pairs with the same word pair together, and the reducer simply sums the values for each incoming key.

Each final pair encodes a cell in co-occurrence matrix. Local aggregation, e.g. combiner or in-mapper combining, can be used.

Advantages:
  • Simple values, less serialization/deserialization overhead.
  • Simpler memory management. No scalability bottleneck (unless in-mapper combining is used).

Disadvantages:
  • Huge amount of intermediate key/value pairs. Shuffle and sort phase is slower.
  • Local aggregation is less effective - too many distinct keys.
Explain the stripes design pattern on a co-occurrence example. Include advantages/disadvantages against the pairs approach, possible optimizations and their efficacy.
The answer is:
The mapper generates a distinct key from each encountered word. The associated value contains a map of all co-occurring words as map keys and numbers of co-occurrences as map values. The framework groups same words together and the reducer merges the value maps.

Each final pair encodes a row in co-occurrence matrix. Combiner or in-mapper combining can be used.

Advantages:
  • Small amount of intermediate key/value pairs. Shuffle and sort phase is faster.
  • Intermediate keys are smaller.
  • Effective local aggregation - smaller number of distinct keys.

Disadvantages:
  • Complex values, more serialization/deserialization overhead.
  • More complex memory management. As value maps may grow too big, the approach has potential for scalability bottleneck.
Explain scalability bottleneck caused by stripes approach.
The answer is:
The stripes solution keeps a map of co-occurring words in memory. As the number of co-occurring words is unbounded, the map size is unbounded too. A huge map does not fit into memory and causes paging or out-of-memory errors.
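The two mappers can be contrasted in a short Python sketch. For brevity, every other word in the same document counts as a neighbor, and `emit` stands in for the framework's output call.

```python
from collections import defaultdict
from itertools import permutations

def pairs_mapper(docid, doc, emit):
    # pairs: one intermediate pair per co-occurring word pair
    words = doc.split()
    for w, u in permutations(words, 2):
        emit((w, u), 1)

def stripes_mapper(docid, doc, emit):
    # stripes: one intermediate pair per word, carrying a map of neighbors
    words = doc.split()
    for w in words:
        stripe = defaultdict(int)
        for u in words:
            if u != w:
                stripe[u] += 1
        emit(w, dict(stripe))
```

The pairs reducer only sums counts, while the stripes reducer must merge whole maps, which is where the memory pressure comes from.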
3.3 Computing Relative Frequencies
Relative frequencies of co-occurrences problem:
Input: text documents
    key: document id
  value: text document

Output: key/value pairs where
    key: pair(word1, word2)
  value: #co-occurrences(word1, word2)/#co-occurrences(word1, any word)

Fix following solution to relative frequencies of co-occurrences problem:
class MAPPER
  method INITIALIZE
    H = new hash map    

  method MAP(docid a, doc d)
    for all term w in doc d do
      for all term u in neighbors(w) do
        H(w) = H(w) + 1
        emit(pair(u, w), count 1)

  method CLOSE 
    for all term w in H
      emit(pair(w, *), H(w))    

class REDUCER
  variable total_occurrences = 0

  method REDUCE(pair (p, u), counts[c1, c2, ..., cn])
    s = 0
    for all c in counts[c1, c2, ..., cn] do 
      s = s + c

    if u = * 
      total_occurrences = s
    else
      emit(pair p, s/total_occurrences)

class SORTING_COMPARATOR
  method compare(key (p1, u1), key (p2, u2))
    if p1 = p2 AND u1 = *
      return key1 is lower

    if p1 = p2 AND u2 = *
      return key2 is lower

    return compare(p1, p2)
The answer is:
The partitioner is missing; the framework could send the key/value pairs with totals to a different reducer than the key/value pairs with word pairs.
class PARTITIONING_COMPARATOR
  method compare(key (p1, u1), key (p2, u2))
    if p1 = p2 
      return keys are equal

    return keys are different

Describe order inversion design pattern.
The answer is:
Order inversion is used if the algorithm requires two passes through mapper-generated key/value pairs with the same key. The first pass generates an overall statistic which is then applied to the data during the second pass. Without it, the reducer would need to buffer the data in memory just to be able to pass through them twice.

The first pass result is calculated by the mappers and stored in some internal data structure. The mapper emits the result in the close method, after all the usual intermediate key/value pairs.

The pattern requires custom partitioning and sorting. The first pass result must come to the reducer before the usual key/value pairs. Of course, it must come to the same reducer.
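The custom partitioner and sort this pattern needs can be sketched in Python, assuming composite keys (word, neighbor) where the special neighbor '*' carries the first pass total:

```python
def partition(key, num_reducers):
    # partition on the left word only, so the (word, '*') total reaches
    # the same reducer as all of that word's pairs
    word, _neighbor = key
    return hash(word) % num_reducers

def sort_key(key):
    # order primarily by word, and put the '*' total before the pairs
    word, neighbor = key
    return (word, neighbor != "*", neighbor)
```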
3.4 Secondary Sorting
Describe value-to-key design pattern.
The answer is:
The Hadoop implementation does not sort the grouped values in the reducer's input. Value-to-key is used as a workaround.

Part of the value is added to the key. A custom sort then sorts primarily by the key and secondarily by the added value. A custom partitioner must move all data with the same original key to the same reducer.
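A minimal Python sketch of the idea, using made-up (user, timestamp) records: the timestamp moves from the value into a composite key, and the partitioner looks only at the original key part.

```python
def composite_sort(records):
    # records: (user, timestamp) pairs; the composite key is the whole
    # tuple, so sorting is primary by user, secondary by timestamp
    return sorted(records)

def partition(composite_key, num_reducers):
    # partition on the original key (user) only, so one reducer sees
    # all of a user's records, already in timestamp order
    user, _timestamp = composite_key
    return hash(user) % num_reducers
```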
3.5 Relational Joins
Describe a reduce-side join between tables with a one-to-one relationship.
The answer is:
The mapper produces key/value pairs with join ids as keys and rows as values. Corresponding rows from both tables are grouped together by the framework during the shuffle and sort phase.

The reduce method obtains a join id and two values, each representing a row from one table. The reducer joins the data.
Describe a reduce-side join between tables with a one-to-many relationship.
The answer is:
We assume that the join key is the primary key in a table called S. The second table is called T. In other words, the table S is on the 'one' side of the relationship and the table T is on the 'many' side.

We have to implement mapper, custom sorter, partitioner and reducer.

The mapper produces a key composed from the join id and a table flag. The partitioner splits the data in such a way that all key/value pairs with the same join id go to the same reducer. A custom sort puts the key/value pair generated from the table S right before the key/value pairs with the same join id from the table T.

The reducer's input looks like this:
((JoinId1, s), row)
((JoinId1, t), [rows])
((JoinId2, s), row)
((JoinId2, t), [rows])
...
((JoinIdn, s), row)
((JoinIdn, t), [rows])

The reducer joins all rows from s pair with all rows from following t pair.
Describe a reduce-side join between tables with a many-to-many relationship.
The answer is:
We assume that data are stored in tables called S and T. The table S is smaller. We have to implement mapper, custom sorter, partitioner and reducer.

The mapper produces a key composed from the join id and a table flag. The partitioner splits the data in such a way that all key/value pairs with the same join id go to the same reducer. A custom sort puts the key/value pairs generated from the table S right before all key/value pairs with the data from the table T.

The reducer's input looks like this:
((JoinId1, s), [rows])
((JoinId1, t), [rows])
((JoinId2, s), [rows])
((JoinId2, t), [rows])
...
((JoinIdn, s), [rows])
((JoinIdn, t), [rows])

The reducer buffers all rows with the same JoinId from the table S into memory and joins them with the following T table rows.

All data from the smaller table must fit into memory, so the algorithm has a scalability bottleneck.
Describe a map-side join between two database tables.
The answer is:
A map-side join works only if the following assumptions hold:
  • both datasets are sorted by the join key,
  • both datasets are partitioned the same way.

The mapper maps over the larger dataset and reads the corresponding part of the smaller dataset inside the mapper. As the smaller set is partitioned the same way as the bigger one, only one map task accesses the same data. As the data are sorted by the join key, we can perform a merge join in O(n).
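The O(n) merge join mentioned above can be sketched like this (unique join keys on both sides are assumed for brevity):

```python
def merge_join(left, right):
    # left, right: lists of (key, row) pairs, both sorted by key
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key == right_key:
            out.append((left_key, left[i][1], right[j][1]))
            i += 1
            j += 1
        elif left_key < right_key:
            i += 1        # no match for this left row
        else:
            j += 1        # no match for this right row
    return out
```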
Describe a memory-backed join.
The answer is:
The smaller dataset is loaded into memory in every mapper. Mappers loop over the larger dataset and join it with the data in memory. If the smaller set is too big to fit into memory, the dataset is loaded into memcached or some other caching solution.
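A minimal sketch of the in-memory variant, assuming the smaller side has unique join keys:

```python
def memory_backed_join(small, large):
    # the smaller dataset becomes an in-memory hash table in each mapper
    lookup = dict(small)
    # the mapper streams the larger dataset past the table
    return [(key, row, lookup[key])
            for key, row in large if key in lookup]
```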
Which one is faster? Map side join or reduce side join?
The answer is:
Map side join is faster.
4 Inverted Indexing for Text Retrieval
The chapter contains a lot of details about integer number encoding and compression. Since these topics are not directly about MapReduce, I made no questions about them.
4.4 Inverted Indexing: Revised Implementation
Explain the inverted indexing algorithm. You may assume that each document fits into memory. Assume also that there is a huge number of documents. Which part should be optimized by integer encoding and compression?
Input: text documents
    key: document id
  value: text document

Output: key/value pairs where
    key: word 
  value: list[documentId, numberOfOccurences]
         list elements must be sorted by numberOfOccurences
The answer is:
The mapper counts the number of occurrences of each word in the document. As the whole document fits into memory, we can hold the partial results in a map.
Intermediate key/values: 
    key: word, numberOfOccurences
  value: documentId

A custom partitioner groups intermediate key/values by word. A custom sort sorts them primarily by word and secondarily by the number of occurrences.

The reducer uses the initialize method to initialize a list of postings. The reduce method handles two cases:
  • current word equal to previous word - add documentId and numberOfOccurences to the posting list.
  • current word not equal to previous word - emit the previous word and its posting list; initialize a new posting list.

The posting list in the reducer should be compressed.
class MAPPER
  method MAP(docid, doc d)
    H = new hash map
    for all term w in doc d do
      H(w) = H(w) + 1

    for all term w in H do
      emit(pair(w, H(w)), docid)

class REDUCER
  variable previous_word = 0
  variable PL = new list of postings

  method REDUCE(pair (w, #occurrences), docids[d1, d2, ..., dn])
    if w <> previous_word && previous_word <> 0 do
      emit(previous_word, PL)
      PL = new list of postings

    for all docid in docids do
      PL.add(pair(#occurrences, docid))
    previous_word = w

  method CLOSE
    emit(previous_word, PL)

class PARTITIONING_COMPARATOR
  method compare(key (w1, o1), key (w2, o2))
    if w1 = w2
      return keys are equal

    return keys are different

class SORTING_COMPARATOR
  method compare(key (w1, o1), key (w2, o2))
    if w1 = w2 do
      return compare(o1, o2)
         
    return compare(w1, w2)

5 Graph Algorithms
The chapter contains two algorithms: shortest path in the graph and page ranking algorithm. The questions are straightforward.
5.2 Parallel Breadth-First Search
Find the shortest path from one origin node to all other nodes. Each edge has an associated weight. Input key/value pairs have already been preprocessed into a convenient form.
Input: graph
    key: node id
  value: distance to origin, list[adjacent node, edge length]

Output: key/value pairs where
    key: node id
  value: distance to origin, list[adjacent node, edge length]
The answer is:
The algorithm requires multiple iterations. It stops when an iteration does not change any 'distance to origin'. In the worst case, there will be O(n) iterations, where n is the number of nodes in the graph.

The mapper passes the original graph to the next iteration as it is. In addition, it generates a key/value pair for each adjacent node. The value contains the minimum known distance from the origin if the route goes through the current node.
class MAPPER
  method MAP(node, pair(dist, adjacencylist))
    emit(node, pair(dist, adjacencylist))
    for all (closenode, nodedist) in adjacencylist do
      emit(closenode, pair(dist + nodedist, empty))

The reducer finds the minimum known distance to each node. It passes the distance along with the original graph to the next iteration. It also increments a global counter whenever the minimum known distance to any node changes.
class REDUCER
  method REDUCE(node, list(dist, adjacencylist))
    minimum = infinity
    previous_iteration_solution = infinity
    original_graph = empty
    for all (dist, adjacencylist) in list do
      if adjacencylist not empty do 
        original_graph = adjacencylist
        previous_iteration_solution = dist
      if minimum > dist
        minimum = dist
    
    if previous_iteration_solution <> minimum
      increment global counter
    emit(node, pair(minimum, original_graph)) 

If the global counter is 0, the algorithm stops. Otherwise another iteration is needed.
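One iteration of the algorithm can be sketched as a single-process Python function; the graph representation and the driver loop here are illustrative, and every node is assumed to appear as a key in the graph.

```python
import math
from collections import defaultdict

def bfs_iteration(graph):
    # graph: node -> (distance_to_origin, [(neighbor, edge_length), ...]);
    # unknown distances start at math.inf
    inbox = defaultdict(list)
    for node, (dist, adj) in graph.items():          # map phase
        inbox[node].append((dist, adj))              # pass the graph through
        for neighbor, edge in adj:
            inbox[neighbor].append((dist + edge, None))
    changed = 0                                      # the global counter
    result = {}
    for node, messages in inbox.items():             # reduce phase
        adj = next(a for d, a in messages if a is not None)
        old = next(d for d, a in messages if a is not None)
        best = min(d for d, _ in messages)
        if best != old:
            changed += 1
        result[node] = (best, adj)
    return result, changed

graph = {"a": (0, [("b", 1), ("c", 4)]),
         "b": (math.inf, [("c", 1)]),
         "c": (math.inf, [])}
changed = 1
while changed:                                       # the driver program
    graph, changed = bfs_iteration(graph)
print(graph["c"][0])  # 2 (via b), not 4 (direct edge)
```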
Explain page rank algorithm, assume alpha = 0.
The answer is:
The page rank P(n) of a page n is calculated from the page ranks of all pages linking to it:
P(n) = sum_m (P(m)/C(m))
The sum goes over all pages m linking to the page n. C(m) is the number of outgoing links of the page m.

The page rank algorithm runs in iterations. The mapper passes the page rank contribution of each page to the adjacent pages. The reducer updates the page rank of each node. The algorithm stops when the page ranks no longer move.
class MAPPER
  method MAP(page, (page_rank, adjacency_list))
    emit(page, (0, adjacency_list))
    contribution = page_rank/adjacency_list.length
    for all node in adjacency_list do
      emit(node, (contribution, empty))

class REDUCER
  method REDUCE(page, contributions[c1, c2, ..., cn])
    rank = 0
    adjacency_list = new list
    for all c in contributions do
      adjacency_list.addAll(c.adjacency_list)
      rank = rank + c.contribution 

    emit(page, (rank, adjacency_list))
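
One iteration can be sketched the same way (single-process Python, illustrative data); every page is assumed to have at least one outgoing link.

```python
from collections import defaultdict

def pagerank_iteration(graph):
    # graph: page -> (rank, [outgoing links])
    contributions = defaultdict(float)
    adjacency = {}
    for page, (rank, links) in graph.items():        # map phase
        adjacency[page] = links                      # pass structure through
        for target in links:
            contributions[target] += rank / len(links)
    return {page: (contributions[page], adjacency[page])   # reduce phase
            for page in graph}

graph = {"a": (0.5, ["b"]), "b": (0.5, ["a"])}
print(pagerank_iteration(graph))   # already a fixed point: ranks stay 0.5
```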

6 EM Algorithms For Text Processing
I made no questions out of this chapter.

Exercises

This chapter contains hands-on exercises for MapReduce. Some of them require multiple iterations.

Warm Up
Count the number of occurrences of every word in a text collection.
Input:
    key: document id,
  value: text document.

Output:
    key: word,
  value: number of occurrences.
The answer is:
Intermediate pairs:
    key: word
  value: integer - how many times was the word seen in the input.
class MAPPER
  method MAP(docid a, doc d)
    for all term w in doc d do
      emit(w, 1)

class COMBINER
  method COMBINE(word w, counts[c1, c2, ..., cn])
    s = 0
    for all c in counts[c1, c2, ..., cn] do 
      s = s + c

    emit(word w, s)

class REDUCER
  method REDUCE(word w, counts[c1, c2, ..., cn])
    s = 0
    for all c in counts[c1, c2, ..., cn] do 
      s = s + c

    emit(word w, s)

An alternative solution would use in-mapper combining.
Web Store
A website user log contains user ids and the length of each session. The website has a modest number of registered users. Compute the average session length for each user.
Input:
    key: user id,
  value: session length.

Output:
    key: user id,
  value: average session length.
The answer is:
As the number of registered users is modest, we can use in-mapper combining.
class MAPPER
  variable total_time = new hash map 
  variable sessions_number = new hash map 

  method MAP(user_id, session_length)
    total_time(user_id) = total_time(user_id) + session_length
    sessions_number(user_id) = sessions_number(user_id) + 1

  method CLOSE 
    for all user_id in total_time
      tt = total_time(user_id)
      sn = sessions_number(user_id)
      emit(user_id, pair(tt, sn))    

class REDUCER
  method REDUCE(user_id, [pairs(time, sessions_number)])
    total_time = 0
    total_sessions = 0
    for all pairs in [pairs(time, sessions_number)] do 
      total_time = total_time + time
      total_sessions = total_sessions + sessions_number

    emit(user_id, total_time/total_sessions)

A web store log contains a user id and the bought item for each sale. You need to implement "buyers of item also bought" functionality. Whenever an item is shown, the store will suggest the five items most often bought by the item's buyers.
Input:
    key: user id,
  value: bought item.

Output:
    key: item,
  value: list of the five most common "buyers of item also bought" items.
The answer is:
Our solution has two iterations. The first iteration generates lists of all items bought by the same user. Grouping is done by the framework; both mapper and reducer perform the identity function.
Input:
    key: user id,
  value: bought item.

Output:
    key: user id,
  value: list of all bought items.
class MAPPER
  method MAP(user_id, item)
    emit(user_id, item)

class REDUCER
  method REDUCE(user_id, items[i1, i2, ..., in])
    emit(user_id, items)

The second iteration solves the co-occurrences problem on the list items. It uses the stripes approach. The only difference from the standard solution is that we emit only the five most common co-occurrences.
Input:
    key: user id,
  value: list of all bought items.

Output:
    key: item,
  value: list of the five most common co-occurrences.
class MAPPER
  method MAP(user_id, items[i1, i2, ..., in])
    for all item i in items do
      H = new hash map
      for all item j in items do
        if i != j
          H(j) = H(j) + 1
      emit(i, H)

class REDUCER
  method REDUCE(item, stripes[H1, H2, ..., Hn])
    T = new hash map
    for all H in stripes do
      for all (key/value) in H do
        T(key) = T(key) + value
    emit(item, max_five(T))

A web store log contains a user id, timestamp, item and the number of bought pieces for each sale. The store is looking for items whose sales rise or decline at the same time. Find the 20 item couples with the maximum number of such months.
Input:
    key: user id,
  value: timestamp, bought item, count.

Output:
    key: item, item
  value: number of months when both items' sales rose or declined.
      #: the output contains the 20 key/value pairs with maximum value
The answer is:
Our solution requires multiple MapReduce iterations. We have to:
  • calculate whether an item's sales for any given month went up or down,
  • create lists of items with the same sales change during the same month,
  • find the number of co-occurrences in those lists,
  • choose the items with maximum co-occurrences.

The first iteration calculates sales changes for any given month. We have to supply a mapper, partitioner, custom sort and reducer. The mapper generates one intermediate key/value pair for each input key/value pair. The key is composed of the sold item and the sales month. The value contains the number of sold pieces.

The partitioner sends all key/value pairs with the same item to the same reducer. The custom sort sorts them by month. Finally, the reducer calculates the sales changes.
Input:
    key: user id,
  value: timestamp, item, count.

Intermediate key/values:
    key: item, month 
  value: count.

Output:
    key: month, up/down/equal
  value: item.
class MAPPER
  method MAP(user_id, (timestamp, item, count))
    month = get_month(timestamp) 
    emit((item, month), count)

class PARTITIONING_COMPARATOR
  method compare(key (item1, month1), key (item2, month2))
    if item1 = item2 
      return keys are equal

    return keys are different

class SORTING_COMPARATOR
  method compare(key (item1, month1), key (item2, month2))
    if item1 = item2 do
      return compare(month1, month2)
         
    return compare(item1, item2)

class REDUCER
  method REDUCE((item, month), counts[c1, c2, ..., cn])
    c = sum([c1, c2, ..., cn])
    if last_item = item
      if last_month + 1 = month
        //emit correct up/down/equal flags
        if last_count < c
          emit((item, month), up)
        if last_count > c
          emit((item, month), down)
        if last_count = c
          emit((item, month), equal)
      else
        //no sales during some months
        emit((item, last_month + 1), down)
        emit((item, month), up)
    else 
      // new item
      emit((last_item, last_month + 1), down)
      emit((item, month), up)

    last_item = item
    last_count = c
    last_month = month

The second iteration groups the first iteration's results by key. It generates lists of items with the same sales changes during the same month. The framework does all the work; both mapper and reducer perform the identity function.
Input:
    key: month, up/down/equal
  value: item.

Output:
    key: month, up/down/equal
  value: [items].
The third iteration performs the standard 'co-occurrences by pairs' algorithm.
Input:
    key: month, up/down/equal
  value: [items].

Intermediate key/values:
    key: item, item
  value: partial number of co-occurrences.

Output:
    key: item, item
  value: number of months when both items' sales rose or declined.
      #: the output contains all item couples
class MAPPER
  method MAP((month, change), items[i1, i2, ..., in])
    for each i in items do 
      for each j in items do
        if i != j 
          emit((i, j), 1) 

class COMBINER
  method COMBINE((item1, item2), co-occurrences[c1, c2, ..., cn])
    s = 0
    for all c in co-occurrences[c1, c2, ..., cn] do 
      s = s + c

    emit((item1, item2), s)

class REDUCER
  method REDUCE((item1, item2), co-occurrences[c1, c2, ..., cn])
    s = 0
    for all c in co-occurrences[c1, c2, ..., cn] do 
      s = s + c

    emit((item1, item2), s)

Finally, we have to choose the 20 key/value pairs with maximum value. Each mapper selects its 20 key/value pairs with maximum value and emits them all with the same key. There will be only one reducer, which selects the final 20 key/value pairs.
Input:
    key: item, item
  value: number of months when both items' sales rose or declined.
      #: the input contains all item couples

Intermediate key/values:
    key: 1
  value: item, item, number of months when both items' sales rose or declined.
      #: contains the 20 key/value pairs with maximum value from each mapper

Output:
    key: item, item
  value: number of months when both items' sales rose or declined.
      #: the output contains the 20 key/value pairs with maximum value
The code is very simple but long, so it is omitted.
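The selection step can be sketched in Python with heapq (an in-memory simulation; k is lowered from 20 to 2 so the sample data stays small):

```python
import heapq

def mapper_top_k(pairs, k):
    # Each mapper keeps only its k pairs with the largest counts and
    # emits them under the constant key 1, so a single reducer sees them all.
    for pair, count in heapq.nlargest(k, pairs, key=lambda kv: kv[1]):
        yield 1, (pair, count)

def reducer_top_k(values, k):
    # The single reducer picks the final global top k.
    return heapq.nlargest(k, values, key=lambda kv: kv[1])

# Hypothetical per-mapper inputs: ((item, item), co-occurrence count).
mapper1 = [(("a", "b"), 9), (("a", "c"), 2)]
mapper2 = [(("b", "c"), 7), (("c", "d"), 1)]
shuffled = [v for _, v in list(mapper_top_k(mapper1, 2)) + list(mapper_top_k(mapper2, 2))]
top = reducer_top_k(shuffled, 2)
# top == [(("a", "b"), 9), (("b", "c"), 7)]
```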

Criminal Agency
Inputs to all exercises in this chapter use the same data structure.

A criminal agency stole Facebook's friendships database and wants to analyze the new data. Friendships are stored in the form of key/value pairs; each friendship corresponds to two key/value pairs:
Friends:
    key: first friend name
  value: second friend name

    key: second friend name
  value: first friend name

The agency also owns criminal records of all citizens:
Criminal record:
    key: citizen name
  value: when, where, accomplices, description
Find at-risk youths. A person is considered an at-risk youth if more than half of his friends have a criminal record.
The answer is:
Our solution has two iterations. The first iteration joins the two data sets and flags each 'value friend' with a has_record/law_abiding flag.
Output:
    key: first friend  
    value: second friend, has_record/law_abiding
The mapper flags each key with the data set name. The partitioner groups data according to the names in keys, and the sorter puts criminal records before friendships. We could use local aggregation to remove multiple criminal records for the same person.
class MAPPER
  method MAP(name, value)
    if value is a name do
      emit(pair(name, friendship), value)
    else
      emit(pair(name, criminal), value)

class PARTITIONING_COMPARATOR
  method compare(key (name1, dataset1), key (name2, dataset2))
    if name1 = name2
      return keys are equal
 
    return keys are different
 
class SORTING_COMPARATOR
  method compare(key (name1, dataset1), key (name2, dataset2))
    if name1 = name2 AND dataset1 is criminal
      return key1 is lower

    if name1 = name2 AND dataset2 is criminal
      return key2 is lower

    return compare(name1, name2)

class REDUCER
  variable previous_name

  method REDUCE(pair(name, flag), items[i1, i2, ..., in])
    if flag is criminal do 
      previous_name = name
      has_record = criminal
      return 

    if previous_name <> name do 
      has_record = law_abiding
    else 
      has_record = criminal

    previous_name = name
    for all i in items do
      emit(i, pair(name, has_record))
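The partition/sort/reduce interplay of this first iteration can be simulated in plain Python; encoding the data set as 0 (criminal) or 1 (friendship) inside the key reproduces the sorter's 'criminal records first' order. The sample names are made up:

```python
# Composite keys: (name, dataset) with dataset 0 = criminal record and
# 1 = friendship, so sorting puts a person's criminal records first.
friendships = [("alice", "bob"), ("bob", "alice"),
               ("carol", "bob"), ("bob", "carol")]
criminal_records = [("bob", "theft 2010")]

records = [((name, 0), detail) for name, detail in criminal_records]
records += [((name, 1), friend) for name, friend in friendships]
records.sort(key=lambda kv: kv[0])

output = []
previous, has_record = None, False
for (name, dataset), value in records:
    if name != previous:
        previous, has_record = name, False
    if dataset == 0:      # criminal record: remember the flag
        has_record = True
    else:                 # friendship: flag the key person for the friend
        output.append((value, (name, "has_record" if has_record else "law_abiding")))
# e.g. ("alice", ("bob", "has_record")) is in output
```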

The second iteration counts both the total number of friends and the number of friends with a criminal record. The reducer emits key/value pairs only for at-risk youths. This iteration could also use some kind of local aggregation.
Intermediate key/value:
    key: name  
    value: total friends, total friend criminals
    # totals are partial, computed only over each mapper's subset of the data

Output:
    key: name  
    value: empty
    # only at risk youths
class MAPPER
  method MAP(name, pair(friend, has_record))
    if has_record is law_abiding do
      emit(name, pair(1, 0))
    else
      emit(name, pair(1, 1))

class REDUCER
  method REDUCE(name, items[pair(total, criminals)])
    total = 0
    criminals = 0
    for all i in items do
      total = total + i.total
      criminals = criminals + i.criminals

    if criminals / total > 0.5 do
      emit(name, empty) 
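Both classes of the second iteration collapse into a few lines of plain Python (an in-memory sketch; the sample stream and flag strings are made up for illustration):

```python
from collections import defaultdict

def reduce_at_risk(flagged_friends):
    # flagged_friends: (name, (friend, flag)) pairs from the first
    # iteration; return names whose criminal-friend ratio exceeds 1/2.
    totals = defaultdict(lambda: [0, 0])      # name -> [total, criminals]
    for name, (_friend, flag) in flagged_friends:
        totals[name][0] += 1
        if flag == "has_record":
            totals[name][1] += 1
    return [name for name, (total, criminals) in totals.items()
            if criminals / total > 0.5]

stream = [("alice", ("bob", "has_record")),
          ("alice", ("carol", "law_abiding")),
          ("dave", ("bob", "has_record"))]
# alice: 1 of 2 friends criminal (exactly half, not at risk);
# dave: 1 of 1 (at risk)
print(reduce_at_risk(stream))   # ['dave']
```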

Find gangs. A gang is a group of people that:
  • has exactly 5 members,
  • each member is friend with all other members,
  • each two members committed at least 3 crimes together.
The answer is:
This time we need three iterations. The idea is to first clean the graph of all useless edges, so that only criminal contacts remain. Then we split the graph into smaller, manageable sub-graphs: we attach all criminal contacts, and the edges between them, to each person:
Last iteration reducers input:
    key: person
    values: all his criminal contacts and relationships between them.
The final reducer takes the smaller graph represented by the value of each key/value pair and finds complete sub-graphs with 4 vertices in it. Add the person from the key, and you have found a complete sub-graph with 5 vertices. The reducer may use any polynomial algorithm.

The first iteration uses the pairs approach to clean the graph. We omit both local aggregation and removal of duplicates; both would make the algorithm more efficient.
Intermediate key/values:
    key: first friend, second friend, friendship/accomplice
    value: 1

Output:
    key: first friend, second friend
    value: empty
    # only friends with common criminal record

class MAPPER
  method MAP(name, value)
    if value is a name do
      emit(triple(name, value, friendship), empty)
    else
      for all crime_accomplice in value.accomplices do
        emit(triple(name, crime_accomplice, accomplice), 1)

class PARTITIONING_COMPARATOR
  method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2))
    if name1 = name2 AND accomplice1 = accomplice2
      return keys are equal
 
    return keys are different
 
class SORTING_COMPARATOR
  method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2))
    if name1 = name2 AND accomplice1 = accomplice2 AND flag1 is friendship
      return key1 is lower

    if name1 = name2 AND accomplice1 = accomplice2 AND flag2 is friendship
      return key2 is lower

    return compare(pair(name1, accomplice1), pair(name2, accomplice2))

class REDUCER
  variable previous_name
  variable previous_accomplice

  method sameAsPrevious(name, accomplice) 
    if previous_name <> name
      return false

    if previous_accomplice <> accomplice
      return false

    return true

  method REDUCE(triple(name, accomplice, flag), items[i1, i2, ..., in])
    if sameAsPrevious(name, accomplice) do 
      if items.length > 2 do 
        emit(pair(name, accomplice), empty)
      return

    if flag is friendship do 
      previous_name = name
      previous_accomplice = accomplice

Second iteration attaches lists of all 'second degree' friends to edges:
Input
    key: first friend, second friend
    value: empty

Intermediate key/values:
    key: first friend
    value: first friend, second friend

    key: second friend
    value: first friend, second friend

Output:
    key: first friend, second friend
    value: all friends of second friend 

    key: second friend, first friend
    value: all friends of first friend 
class MAPPER
  method MAP((first friend, second friend), empty)
    emit(first friend, (first friend, second friend))
    emit(second friend, (first friend, second friend))

class REDUCER
  method REDUCE(name, edges[e1, e2, ..., en])
    friends = new Set
    friends.add(name)

    for all edge in edges do
      friends.add(edge.v1, edge.v2)

    for all edge in edges do
       emit(edge, friends)      

Finally, the mapper and the shuffle and sort phase together generate the list of all friends of any given person and the relationships between them.
Input
    key: friend 1, friend 2
    value: all friends of friend 2

Intermediate key/values:
    key: friend 1
    value: friend 2, all friends of friend 2

Reducers input (after shuffle and sort):
    key: person
    values: all his friends and relationships between them.

Output:
    key: first friend, second friend, third friend, fourth friend, fifth friend 
    value: gang
class MAPPER
  method MAP((friend 1, friend 2), all friends of friend 2)
    emit(friend 1, (friend 2, all friends of friend 2))

class REDUCER
  method REDUCE(name, graph attached to it)
    any polynomial algorithm will work
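For completeness, one such polynomial algorithm sketched in Python: a brute-force search for complete 4-vertex sub-graphs among a person's criminal contacts (fine because the per-person sub-graphs are small; the sample data is made up):

```python
from itertools import combinations

def find_gangs(person, contacts, edges):
    # Look for complete sub-graphs on 4 of the person's criminal
    # contacts; together with the person they form a 5-member gang.
    edge_set = {frozenset(e) for e in edges}
    gangs = []
    for four in combinations(sorted(contacts), 4):
        if all(frozenset(p) in edge_set for p in combinations(four, 2)):
            gangs.append(tuple(sorted((person,) + four)))
    return gangs

contacts = ["b", "c", "d", "e", "f"]
edges = [("b", "c"), ("b", "d"), ("b", "e"),
         ("c", "d"), ("c", "e"), ("d", "e")]
print(find_gangs("a", contacts, edges))   # [('a', 'b', 'c', 'd', 'e')]
```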

6 comments:

Anonymous said...

Nice!

PS: Link to "Data-Intensive Text Processing with MapReduce" book is broken

Meri said...

@Anonymous Thank you, I fixed the link. They have it hosted on Github now.

Anonymous said...

This covers most topics on hadoop, thanks

Kumar said...

Thanks a lot. It gives more idea about MapReduce

Anonymous said...

excellent!!!

Eqbal Murad said...

Thanks Meri .. nice article
