September 5, 2008

Three different implementations of MapReduce

So far as I can see, there are three implementations of MapReduce that matter for enterprise analytic use – Hadoop, Greenplum’s, and Aster Data’s.* Hadoop has of course been available for a while, and used for a number of different things, while Greenplum’s and Aster Data’s versions of MapReduce – both in late-stage beta – have far fewer users.

*Perhaps Nokia’s Disco or another implementation will at some point join the list.

Earlier this evening I posted some Mike Stonebraker criticisms of MapReduce. It turns out that they aren’t all accurate across all MapReduce implementations. So this seems like a good time for me to stop stalling and put up a few notes about specific features of different MapReduce implementations. Here goes.

Aster Data has actually posted quite a bit about MapReduce, including in its excellent blog, which has joined Vertica’s more academic Database Column as being among the best vendor blogs in the business. There’s also an excellent Aster Data MapReduce white paper, if you can stand to register for it, which among other virtues illustrates Aster’s MapReduce syntax.

Greenplum’s MapReduce marketing materials, while sparser, aren’t bad either. The syntax isn’t published yet, however, so here’s a documentation excerpt Greenplum helpfully emailed to me.

# The following map reduce script takes as input two tables:

# – documents (doc_id integer, url text, data text)

# – keywords (keyword_id integer, keyword text)

#

# The keywords are “searched for” in the documents data and the results

# of url, data and keyword found are printed to the screen. A given

# keyword can actually be composed of multiple terms:

# ie “computer software” is a keyword that contains two terms.

%YAML 1.1

VERSION: 1.0.0.1

DATABASE: webdata

USER: jsmith

DEFINE:

# First we declare both of the input tables.

# Alternative input formats include accessing external files

# or executing arbitrary SQL.

- INPUT:

NAME: doc

QUERY: select * from documents

- INPUT:

NAME: kw

QUERY: select * from keywords

# Next we create MAP functions to extract “terms” from both documents

and from

# keywords. This implementation simply splits on whitespace, in

general it would be

# possible to make use of a python library like nltk (the natural

language toolkit) to

# perform more complex tokenization and word stemming.

- MAP:

NAME: doc_map

LANGUAGE: python

FUNCTION: |

i = 0 # “i” tracks the index of a word within the

document

terms = {} # a hash of terms with their indices within the

document

# Lower-case and split the text string on space

for term in data.lower().split():

i = i + 1 # Increment i

# Check for the term in the terms list:

#

# – If stem word already exists, append the i value to the

array entry

# corresponding to the term. This will allow for multiple

times for a

# word’s occurance to show up.

#

# – If stem word does not already exist in the terms list for

the line of

# text currently being processed, the stem word is added to

the dictionary

# along with the position i.

#

# Example:

# data: “a computer is a machine that manipulates data”

# “a” [1, 4]

# “computer” [2]

# “machine” [3]

# …

if term in terms:

terms[term] += ‘,’+str(i)

else:

terms[term] = str(i)

# Return multiple lines for each document. Each line consists of

the doc_id, a

# term and the positions in the data where the term appeared.

#

# Example:

# (doc_id => 100, term => “a”, [1,4]

# (doc_id => 100, term => “computer”, [2]

# …

for term in terms:

yield([doc_id, term, terms[term]])

OPTIMIZE: STRICT IMMUTABLE

MODE: MULTI

PARAMETERS:

- doc_id integer

- data text

RETURNS:

- doc_id integer

- term text

- positions text

# The function for keywords is almost identical to the one for

documents,

# but we also want a count of the number of terms in the keyword.

- MAP:

NAME: kw_map

LANGUAGE: python

FUNCTION: |

i = 0

terms = {}

for term in keyword.lower().split():

i = i + 1

if term in terms:

terms[term] += ‘,’+str(i)

else:

terms[term] = str(i)

# note we output 4 values including “i” the total count

for term in terms:

yield([keyword_id, i, term, terms[term]])

MODE: MULTI

OPTIMIZE: STRICT IMMUTABLE

PARAMETERS:

- keyword_id integer

- keyword text

RETURNS:

- keyword_id integer

- nterms integer

- term text

- positions text

# A “TASK” is an object that defines an entire “input/map/reduce” stage

within a map

# reduce pipeline, it is like an execution except that it is not

immediately executed

# and can be used as inputs to further proccessing stages.

#

# Identify a task called doc_prep which takes in the “doc” source (INPUT

defined above)

# This task uses the mapper called doc_map that returns doc_id, term,

[term_position] records

- TASK:

NAME: doc_prep

SOURCE: doc

MAPPER: doc_map

# Identify a task called kw_prep which takes in the “kw” source (INPUT

defined above)

# This task uses the mapped called kw_map that returns kw_id, term,

[term_position] records

- TASK:

NAME: kw_prep

SOURCE: kw

MAPPER: kw_map

# One of the advantages of Greenplum MapReduce is that it can be freely

mixed with SQL

# operations either as an input for MapReduce stages or as processing

over MapReduce

# stages.

#

# Identify an input source that will be created from a sql join of the

doc_prep output

# and the kw_prep output where terms are the same. This creates a list

of the “candidate”

# keywords in a document, ie any keyword that shares at least one term

with the document.

- INPUT:

NAME: term_join

QUERY: |

SELECT doc.doc_id, kw.keyword_id, kw.term, kw.nterms,

doc.positions as doc_positions,

kw.positions as kw_positions

FROM doc_prep doc INNER JOIN kw_prep kw ON (doc.term =

kw.term)

# In Greenplum MapReduce a REDUCER is defined with one or more

functions.

#

# A reducer has an initial ‘state’ variable defined for each grouping

key that is

# adjusted by every value in that key grouping by means of a TRANSITION

function.

#

# If present an optional (name pending) function can be defined that

combines multiple

# ‘state’ variables. This allows the TRANSITION function to be executed

locally on the

# mapping segments and only ship the accumulated ‘state’ over the

network thus reducing

# network bandwidth.

#

# If present an optional FINALIZER function can be used to perform final

computation on a

# state and emit one or more rows of output from the state.

#

# – Create a reducer function that is called term_reducer, with

transition function

# called term_transition and finalizer called term_finalizer

- REDUCER:

NAME: term_reducer

TRANSITION: term_transition

FINALIZER: term_finalizer

- TRANSITION:

NAME: term_transition

LANGUAGE: python

PARAMETERS:

- state text

- term text

- nterms integer

- doc_positions text

- kw_positions text

FUNCTION: |

# STATE

# – initial value: ”

# – is a colon delimited set of keyword positions

# – keyword positions are a comma delimited set of integers

#

# an example STATE could be ‘1,3,2:4:’

#

# If we have an existing state split it into the set of keyword

positions

# Otherwise construct an set of nterms keyword positions – all

empty

if state:

kw_split = state.split(‘:’)

else:

kw_split = []

for i in range(0,nterms):

kw_split.append(”)

# kw_positions is a comma delimited field of integers indicating

what

# positions a single term occur within a given keyword.

Splitting based on ‘,’

# converts the string into a python list.

# add doc_positions for the current term

for kw_p in kw_positions.split(‘,’):

kw_split[int(kw_p)-1] = doc_positions

# reconstruct the delimited state

# This section takes each element in the kw_split array and

strings them together

# placing a “:” in between each element from the array.

#

# Example:

# For the keyword “computer software computer hardware”, the

kw_split array

# matched up to the document data “in the business of computer

software software

# engineers” would look like:

# ['5', '6,7', '5', '']

# and the outstate would look like:

# 5:6,7:5:

outstate = kw_split[0]

for s in kw_split[1:]:

outstate = outstate + ‘:’ + s

return outstate

- FINALIZER:

NAME: term_finalizer

LANGUAGE: python

RETURNS:

- count integer

FUNCTION: |

if not state:

return 0

kw_split = state.split(‘:’)

# We adjust each document position list based on the offset of

the term in the

# keyword and then intersect all resulting lists

# This function does the following:

# 1) Splits kw_split which looks like 1,5,7:2,8 on “:”

# which creates ‘1,5,7’ and ‘2,8’

# 2) For each group of positions in kw_split, splits the set on

‘,’ to create

# ['1','5','7'] from Set 0: 1,5,7 and eventually ['2', '8']

from Set 1: 2,8

# 3) Checks for empty strings

# 4) Adjusts the split sets by subtracting the position of the

set in the

# kw_split array

# ['1','5','7'] – 0 from each element = ['1','5','7']

# ['2', '8'] – 1 from each element = ['1', '7']

# 5) Resulting arrays after subtracting the offset in step 4

will be intersected

# and their overlaping values kept:

['1','5','7'].intersect['1', '7'] = [1,7]

# 6) Determine the length of the intersection and this will be

the number of

# times that an entire keyword (with all its pieces) matches

in the document

# data.

#

# Further extensions could involve more in the way of fuzzy

matching.

previous = None

for i in range(0,len(kw_split)):

isplit = kw_split[i].split(‘,’)

if any(map(lambda(x): x == ”, isplit)):

return 0

adjusted = set(map(lambda(x): int(x)-i, isplit))

if (previous):

previous = adjusted.intersection(previous)

else:

previous = adjusted

# return the final count

if previous:

return len(previous)

return 0

- TASK:

NAME: term_match

SOURCE: term_join

REDUCER: term_reducer

- INPUT:

NAME: final_output

QUERY: |

SELECT doc.*, kw.*, tm.count

FROM documents doc, keywords kw, term_match tm

WHERE doc.doc_id = tm.doc_id

AND kw.keyword_id = tm.keyword_id

AND tm.count > 0

EXECUTE:

- RUN:

SOURCE: final_output

TARGET: STDOUT

Comments

3 Responses to “Three different implementations of MapReduce”

  1. Steve Wooledge on September 5th, 2008 3:16 pm

    Phew! It took me a while to scroll down here. :-)

    Great discussion on this topic. Glad to see the community correcting misperceptions on MapReduce. Just a few points on Aster:

    Re: “SQL analytics”, Aster can:
    – Implement SQL analytics-like functionality using In-Database MapReduce
    – Feed MapReduce results into SQL analytic functions
    – Pass results from MapReduce to analytic functions

    Re: “materializing intermediate result sets” and “pipelining”

    Aster can avoid materializing intermediate results on disk – this is consistent with Aster’s philosophy for general SQL, as well. Similarly, In-Database MapReduce supports full pipelining of analytics/MR to avoid materialization of intermediate results on disk by passing on data from one phase to the next (we avoid multiple passes of data).

    Re: “Aster support of different languages for Map and Reduce steps”

    We’ll write more about this separately but it’s worth noting that Aster is not using Postgres UDFs at all in its MapReduce implementation. Instead, we can run in an arbitrary language runtime, and would rather not force the developer to choose one from PostgreSQL’s collection. This lets Aster (and the developer) support/utilize all the popular languages (Java/C/C++/Python/Perl…). It is NOT PL/R, PL/Python, etc…

  2. Amazon Elastic MapReduce | DBMS2 -- DataBase Management System Services on April 3rd, 2009 4:57 am

    [...] see if you like it. But for serious use, I don’t know why you wouldn’t prefer MapReduce more closely integrated into a DBMS. Share: These icons link to social bookmarking sites where readers can share and discover new web [...]

  3. EMC/Greenplum notes | DBMS 2 : DataBase Management System Services on April 8th, 2011 12:04 am

    [...] has had integrated MapReduce for quite a [...]

Leave a Reply




Feed: DBMS (database management system), DW (data warehousing), BI (business intelligence), and analytics technology Subscribe to the Monash Research feed via RSS or email:

Login

Search our blogs and white papers

Monash Research blogs

User consulting

Building a short list? Refining your strategic plan? We can help.

Vendor advisory

We tell vendors what's happening -- and, more important, what they should do about it.

Monash Research highlights

Learn about white papers, webcasts, and blog highlights, by RSS or email.