Cleaned up the actors style a bit. Pulled a send function.
This commit is contained in:
@@ -15,23 +15,25 @@ class ActiveWFObject(Thread):
|
|||||||
def run(self):
|
def run(self):
|
||||||
while not self._stop:
|
while not self._stop:
|
||||||
message = self.queue.get()
|
message = self.queue.get()
|
||||||
self.dispatch(message)
|
self._dispatch(message)
|
||||||
if message[0] == 'die':
|
if message[0] == 'die':
|
||||||
self._stop = True
|
self._stop = True
|
||||||
|
|
||||||
|
def send(receiver, message):
|
||||||
|
receiver.queue.put(message)
|
||||||
|
|
||||||
class DataStorageManager(ActiveWFObject):
|
class DataStorageManager(ActiveWFObject):
|
||||||
_queue = Queue()
|
|
||||||
""" Models the contents of the file """
|
""" Models the contents of the file """
|
||||||
_data = ''
|
_data = ''
|
||||||
|
|
||||||
def dispatch(self, message):
|
def _dispatch(self, message):
|
||||||
if message[0] == 'init':
|
if message[0] == 'init':
|
||||||
self._init(message[1:])
|
self._init(message[1:])
|
||||||
elif message[0] == 'send_word_freqs':
|
elif message[0] == 'send_word_freqs':
|
||||||
self._process_words(message[1:])
|
self._process_words(message[1:])
|
||||||
else:
|
else:
|
||||||
# forward
|
# forward
|
||||||
self._stop_word_manager.queue.put(message)
|
send(self._stop_word_manager, message)
|
||||||
|
|
||||||
def _init(self, message):
|
def _init(self, message):
|
||||||
path_to_file = message[0]
|
path_to_file = message[0]
|
||||||
@@ -46,22 +48,21 @@ class DataStorageManager(ActiveWFObject):
|
|||||||
data_str = ''.join(self._data)
|
data_str = ''.join(self._data)
|
||||||
words = data_str.split()
|
words = data_str.split()
|
||||||
for w in words:
|
for w in words:
|
||||||
self._stop_word_manager.queue.put(['filter', w])
|
send(self._stop_word_manager, ['filter', w])
|
||||||
self._stop_word_manager.queue.put(['top25', recipient])
|
send(self._stop_word_manager, ['top25', recipient])
|
||||||
|
|
||||||
|
|
||||||
class StopWordManager(ActiveWFObject):
|
class StopWordManager(ActiveWFObject):
|
||||||
""" Models the stop word filter """
|
""" Models the stop word filter """
|
||||||
_stop_words = []
|
_stop_words = []
|
||||||
|
|
||||||
def dispatch(self, message):
|
def _dispatch(self, message):
|
||||||
if message[0] == 'init':
|
if message[0] == 'init':
|
||||||
self._init(message[1:])
|
self._init(message[1:])
|
||||||
elif message[0] == 'filter':
|
elif message[0] == 'filter':
|
||||||
return self._filter(message[1:])
|
return self._filter(message[1:])
|
||||||
else:
|
else:
|
||||||
# forward
|
# forward
|
||||||
self._word_freqs_manager.queue.put(message)
|
send(self._word_freqs_manager, message)
|
||||||
|
|
||||||
def _init(self, message):
|
def _init(self, message):
|
||||||
with open('../stop_words.txt') as f:
|
with open('../stop_words.txt') as f:
|
||||||
@@ -72,13 +73,13 @@ class StopWordManager(ActiveWFObject):
|
|||||||
def _filter(self, message):
|
def _filter(self, message):
|
||||||
word = message[0]
|
word = message[0]
|
||||||
if word not in self._stop_words:
|
if word not in self._stop_words:
|
||||||
self._word_freqs_manager.queue.put(['word', word])
|
send(self._word_freqs_manager, ['word', word])
|
||||||
|
|
||||||
class WordFrequencyManager(ActiveWFObject):
|
class WordFrequencyManager(ActiveWFObject):
|
||||||
""" Keeps the word frequency data """
|
""" Keeps the word frequency data """
|
||||||
_word_freqs = {}
|
_word_freqs = {}
|
||||||
|
|
||||||
def dispatch(self, message):
|
def _dispatch(self, message):
|
||||||
if message[0] == 'word':
|
if message[0] == 'word':
|
||||||
self._increment_count(message[1:])
|
self._increment_count(message[1:])
|
||||||
elif message[0] == 'top25':
|
elif message[0] == 'top25':
|
||||||
@@ -94,11 +95,11 @@ class WordFrequencyManager(ActiveWFObject):
|
|||||||
def _top25(self, message):
|
def _top25(self, message):
|
||||||
recipient = message[0]
|
recipient = message[0]
|
||||||
freqs_sorted = sorted(self._word_freqs.iteritems(), key=operator.itemgetter(1), reverse=True)
|
freqs_sorted = sorted(self._word_freqs.iteritems(), key=operator.itemgetter(1), reverse=True)
|
||||||
recipient.queue.put(['top25', freqs_sorted])
|
send(recipient, ['top25', freqs_sorted])
|
||||||
|
|
||||||
class WordFrequencyController(ActiveWFObject):
|
class WordFrequencyController(ActiveWFObject):
|
||||||
|
|
||||||
def dispatch(self, message):
|
def _dispatch(self, message):
|
||||||
if message[0] == 'run':
|
if message[0] == 'run':
|
||||||
self._run(message[1:])
|
self._run(message[1:])
|
||||||
elif message[0] == 'top25':
|
elif message[0] == 'top25':
|
||||||
@@ -108,29 +109,28 @@ class WordFrequencyController(ActiveWFObject):
|
|||||||
|
|
||||||
def _run(self, message):
|
def _run(self, message):
|
||||||
self._storage_manager = message[0]
|
self._storage_manager = message[0]
|
||||||
self._storage_manager.queue.put(['send_word_freqs', self])
|
send(self._storage_manager, ['send_word_freqs', self])
|
||||||
|
|
||||||
def _display(self, message):
|
def _display(self, message):
|
||||||
word_freqs = message[0]
|
word_freqs = message[0]
|
||||||
for (w, f) in word_freqs[0:25]:
|
for (w, f) in word_freqs[0:25]:
|
||||||
print w, ' - ', f
|
print w, ' - ', f
|
||||||
self._storage_manager.queue.put(['die'])
|
send(self._storage_manager, ['die'])
|
||||||
self._stop = True
|
self._stop = True
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# The main function
|
# The main function
|
||||||
#
|
#
|
||||||
word_freq_manager = WordFrequencyManager()
|
word_freq_manager = WordFrequencyManager()
|
||||||
|
|
||||||
stop_word_manager = StopWordManager()
|
stop_word_manager = StopWordManager()
|
||||||
stop_word_manager.queue.put(['init', word_freq_manager])
|
send(stop_word_manager, ['init', word_freq_manager])
|
||||||
|
|
||||||
storage_manager = DataStorageManager()
|
storage_manager = DataStorageManager()
|
||||||
storage_manager.queue.put(['init', sys.argv[1], stop_word_manager])
|
send(storage_manager, ['init', sys.argv[1], stop_word_manager])
|
||||||
|
|
||||||
wfcontroller = WordFrequencyController()
|
wfcontroller = WordFrequencyController()
|
||||||
wfcontroller.dispatch(['run', storage_manager])
|
send(wfcontroller, ['run', storage_manager])
|
||||||
|
|
||||||
# Wait for the active objects to finish
|
# Wait for the active objects to finish
|
||||||
[t.join() for t in [word_freq_manager, stop_word_manager, storage_manager, wfcontroller]]
|
[t.join() for t in [word_freq_manager, stop_word_manager, storage_manager, wfcontroller]]
|
||||||
|
|||||||
Reference in New Issue
Block a user