From b8167cd2f59b0d5e666ce0683e7907ad9f7e90bc Mon Sep 17 00:00:00 2001
From: Crista Lopes
Date: Sun, 22 Sep 2013 12:22:57 -0700
Subject: [PATCH] Added style #13

---
 13-double-inverse-multiplexer/README.md |  22 +++++
 13-double-inverse-multiplexer/tf-13.py  | 123 ++++++++++++++++++++++++
 2 files changed, 145 insertions(+)
 create mode 100644 13-double-inverse-multiplexer/README.md
 create mode 100644 13-double-inverse-multiplexer/tf-13.py

diff --git a/13-double-inverse-multiplexer/README.md b/13-double-inverse-multiplexer/README.md
new file mode 100644
index 0000000..1315dcb
--- /dev/null
+++ b/13-double-inverse-multiplexer/README.md
@@ -0,0 +1,22 @@
+Style #13
+==============================
+
+Very similar to style #12, but with an additional twist
+
+Constraints:
+
+- Input data is divided in chunks, similar to what an inverse multiplexer does to input signals
+
+- A map function applies a given worker function to each chunk of data, potentially in parallel
+
+- The results of the many worker functions are reshuffled in a way
+  that allows for the reduce step to be also parallelized
+
+- The reshuffled chunks of data are given as input to a second map
+  function that takes a reducible function as input
+
+Possible names:
+
+- Double inverse multiplexer
+- Map-reduce
+- Hadoop style
diff --git a/13-double-inverse-multiplexer/tf-13.py b/13-double-inverse-multiplexer/tf-13.py
new file mode 100644
index 0000000..56fff56
--- /dev/null
+++ b/13-double-inverse-multiplexer/tf-13.py
@@ -0,0 +1,123 @@
+import sys, re, operator, string
+
+#
+# Functions for map reduce
+#
+def partition(data_str, nlines):
+    """
+    Generator function that partitions the input data_str (a big string)
+    into chunks of nlines.
+    """
+    lines = data_str.split('\n')
+    for i in xrange(0, len(lines), nlines):
+        yield '\n'.join(lines[i:i+nlines])
+
+def split_words(data_str):
+    """
+    Takes a string, filters non alphanumeric characters, normalizes to
+    lower case, scans for words, and filters the stop words.
+    It returns a list of pairs (word, 1), one for each word in the input, so
+    [(w1, 1), (w2, 1), ..., (wn, 1)]
+    """
+    def _filter_chars(str_data):
+        """
+        Takes a string and returns a copy with all nonalphanumeric chars
+        replaced by white space
+        """
+        pattern = re.compile('[\W_]+')
+        return pattern.sub(' ', str_data)
+
+    def _normalize(str_data):
+        """
+        Takes a string and returns a copy with all characters in lower case
+        """
+        return str_data.lower()
+
+    def _scan(str_data):
+        """
+        Takes a string and scans for words, returning
+        a list of words.
+        """
+        return str_data.split()
+
+    def _remove_stop_words(word_list):
+        f = open('../stop_words.txt')
+        stop_words = f.read().split(',')
+        f.close()
+        # add single-letter words
+        stop_words.extend(list(string.ascii_lowercase))
+        return [w for w in word_list if not w in stop_words]
+
+    # The actual work of the mapper
+    result = []
+    words = _remove_stop_words(_scan(_normalize(_filter_chars(data_str))))
+    for w in words:
+        result.append((w, 1))
+    return result
+
+def regroup(pairs_list):
+    """
+    Takes a list of a list of pairs of the form
+    [[(w1, 1), (w2, 1), ..., (wn, 1)],
+     [(w1, 1), (w2, 1), ..., (wn, 1)],
+    ...]
+    and returns a dictionary mapping each unique word to the corresponding
+    list of pairs, so
+    { w1 : [(w1, 1), (w1, 1)...],
+      w2 : [(w2, 1), (w2, 1)...],
+    ...}
+    """
+    mapping = {}
+    for pairs in pairs_list:
+        for p in pairs:
+            if p[0] in mapping:
+                mapping[p[0]].append(p)
+            else:
+                mapping[p[0]] = [p]
+    return mapping
+
+def count_words(mapping):
+    """
+    Takes a pair of the form (word, [(word, 1), (word, 1), ...])
+    and returns a pair (word, frequency), where frequency is the sum
+    of all the reported occurrences
+    """
+    def add(x, y):
+        return x+y
+
+    return (mapping[0], reduce(add, (pair[1] for pair in mapping[1])))
+
+
+#
+# Auxiliary functions
+#
+
+def read_file(path_to_file):
+    """
+    Takes a path to a file and returns the entire
+    contents of the file as a string
+    """
+    f = open(path_to_file)
+    data = f.read()
+    f.close()
+    return data
+
+def sort(word_freq):
+    """
+    Takes a collection of words and their frequencies
+    and returns a collection of pairs where the entries are
+    sorted by frequency
+    """
+    return sorted(word_freq, key=operator.itemgetter(1), reverse=True)
+
+
+#
+# The main function
+#
+splits = map(split_words, partition(read_file(sys.argv[1]), 200))
+splits_per_word = regroup(splits)
+word_freqs = sort(map(count_words, splits_per_word.items()))
+
+for tf in word_freqs[0:25]:
+    print tf[0], ' - ', tf[1]
+