Simplification of map-reduce, hadoop

This commit is contained in:
Crista Lopes
2013-12-27 14:52:04 -08:00
parent 84f1310591
commit f6bc66f298

View File

@@ -1,5 +1,4 @@
#!/usr/bin/env python #!/usr/bin/env python
import sys, re, operator, string import sys, re, operator, string
# #
@@ -7,7 +6,7 @@ import sys, re, operator, string
# #
def partition(data_str, nlines): def partition(data_str, nlines):
""" """
Generator function that partitions the input data_str (a big string) Partitions the input data_str (a big string)
into chunks of nlines. into chunks of nlines.
""" """
lines = data_str.split('\n') lines = data_str.split('\n')
@@ -16,55 +15,35 @@ def partition(data_str, nlines):
def split_words(data_str): def split_words(data_str):
""" """
Takes a string, filters non alphanumeric characters, normalizes to Takes a string, returns a list of pairs (word, 1),
lower case, scans for words, and filters the stop words. one for each word in the input, so
It returns a list of pairs (word, 1), one for each word in the input, so
[(w1, 1), (w2, 1), ..., (wn, 1)] [(w1, 1), (w2, 1), ..., (wn, 1)]
""" """
def _filter_chars(str_data):
"""
Takes a string and returns a copy with all nonalphanumeric chars
replaced by white space
"""
pattern = re.compile('[\W_]+')
return pattern.sub(' ', str_data)
def _normalize(str_data):
"""
Takes a string and returns a copy with all characters in lower case
"""
return str_data.lower()
def _scan(str_data): def _scan(str_data):
""" pattern = re.compile('[\W_]+')
Takes a string and scans for words, returning return pattern.sub(' ', str_data).lower().split()
a list of words.
"""
return str_data.split()
def _remove_stop_words(word_list): def _remove_stop_words(word_list):
f = open('../stop_words.txt') with open('../stop_words.txt') as f:
stop_words = f.read().split(',') stop_words = f.read().split(',')
f.close()
# add single-letter words
stop_words.extend(list(string.ascii_lowercase)) stop_words.extend(list(string.ascii_lowercase))
return [w for w in word_list if not w in stop_words] return [w for w in word_list if not w in stop_words]
# The actual work of the mapper # The actual work of the mapper
result = [] result = []
words = _remove_stop_words(_scan(_normalize(_filter_chars(data_str)))) words = _remove_stop_words(_scan(data_str))
for w in words: for w in words:
result.append((w, 1)) result.append((w, 1))
return result return result
def regroup(pairs_list): def regroup(pairs_list):
""" """
Takes a list of a list of pairs of the form Takes a list of lists of pairs of the form
[[(w1, 1), (w2, 1), ..., (wn, 1)], [[(w1, 1), (w2, 1), ..., (wn, 1)],
[(w1, 1), (w2, 1), ..., (wn, 1)], [(w1, 1), (w2, 1), ..., (wn, 1)],
...] ...]
and returns a dictionary mapping each unique word to the corresponding and returns a dictionary mapping each unique word to the
list of pairs, so corresponding list of pairs, so
{ w1 : [(w1, 1), (w1, 1)...], { w1 : [(w1, 1), (w1, 1)...],
w2 : [(w2, 1), (w2, 1)...], w2 : [(w2, 1), (w2, 1)...],
...} ...}
@@ -81,38 +60,25 @@ def regroup(pairs_list):
def count_words(mapping): def count_words(mapping):
""" """
Takes a mapping of the form (word, [(word, 1), (word, 1)...)]) Takes a mapping of the form (word, [(word, 1), (word, 1)...)])
and returns a pair (word, frequency), where frequency is the sum and returns a pair (word, frequency), where frequency is the
of all the reported occurrences sum of all the reported occurrences
""" """
def add(x, y): def add(x, y):
return x+y return x+y
return (mapping[0], reduce(add, (pair[1] for pair in mapping[1]))) return (mapping[0], reduce(add, (pair[1] for pair in mapping[1])))
# #
# Auxiliary functions # Auxiliary functions
# #
def read_file(path_to_file): def read_file(path_to_file):
""" with open(path_to_file) as f:
Takes a path to a file and returns the entire data = f.read()
contents of the file as a string
"""
f = open(path_to_file)
data = f.read()
f.close()
return data return data
def sort(word_freq): def sort(word_freq):
"""
Takes a collection of words and their frequencies
and returns a collection of pairs where the entries are
sorted by frequency
"""
return sorted(word_freq, key=operator.itemgetter(1), reverse=True) return sorted(word_freq, key=operator.itemgetter(1), reverse=True)
# #
# The main function # The main function
# #
@@ -120,6 +86,6 @@ splits = map(split_words, partition(read_file(sys.argv[1]), 200))
splits_per_word = regroup(splits) splits_per_word = regroup(splits)
word_freqs = sort(map(count_words, splits_per_word.items())) word_freqs = sort(map(count_words, splits_per_word.items()))
for tf in word_freqs[0:25]: for (w, c) in word_freqs[0:25]:
print tf[0], ' - ', tf[1] print w, ' - ', c