265 lines
9.2 KiB
Python
265 lines
9.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
'''
|
|
# An implementation of deep learning for counting symbols
|
|
Input: [10, 12, 10, 11, 2, 2, 2, 1, 1]
|
|
Output: words=[2, 10, 1, 12, 11] counts=[3, 2, 2, 1, 1] (Not necessarily in this order)
|
|
'''
|
|
from __future__ import print_function
|
|
from keras.models import Sequential
|
|
from keras import layers, metrics
|
|
from keras import backend as K
|
|
from keras.utils import plot_model
|
|
from keras.utils import to_categorical
|
|
import numpy as np
|
|
from six.moves import range
|
|
import string, re, collections, os, sys
|
|
|
|
# Parameters for the model and dataset.
|
|
TRAINING_SIZE = 50000
|
|
VOCAB_SIZE = 1000
|
|
SAMPLE_SIZE = 100
|
|
TOP = 2
|
|
BATCH_SIZE = 50
|
|
|
|
data_folder = 'words_data'
|
|
if len(sys.argv) > 1:
|
|
data_folder = data_folder + '_' + sys.argv[1]
|
|
train_x = os.path.join(data_folder, 'train_x.txt')
|
|
train_y = os.path.join(data_folder, 'train_y.txt')
|
|
val_x = os.path.join(data_folder, 'val_x.txt')
|
|
val_y = os.path.join(data_folder, 'val_y.txt')
|
|
|
|
|
|
class WordTable(object):
|
|
"""Given a text file:
|
|
+ Encode the words to a one-hot integer representation
|
|
+ Decode the one-hot or integer representation to their character output
|
|
+ Decode a vector of probabilities to their character output
|
|
"""
|
|
def __init__(self):
|
|
"""Initialize words table.
|
|
|
|
# Arguments
|
|
filename: The file from which to map the words.
|
|
"""
|
|
global TRAINING_SIZE
|
|
global VOCAB_SIZE
|
|
global SAMPLE_SIZE
|
|
global BATCH_SIZE
|
|
|
|
self.words = set()
|
|
nlines = 0
|
|
max_words = 0
|
|
with open(train_x) as f:
|
|
for line in f:
|
|
words = line.split()
|
|
self.words.update(words)
|
|
|
|
nlines = nlines + 1
|
|
if max_words < len(words):
|
|
max_words = len(words)
|
|
|
|
self.words = list(self.words)
|
|
self.word_indices = dict((w, i) for i, w in enumerate(self.words))
|
|
self.indices_word = dict((i, w) for i, w in enumerate(self.words))
|
|
|
|
TRAINING_SIZE = nlines
|
|
VOCAB_SIZE = len(self.words)
|
|
SAMPLE_SIZE = max_words
|
|
BATCH_SIZE = 50
|
|
|
|
def words_to_indices(self, words):
|
|
return [self.word_indices[w] for w in words]
|
|
|
|
def indices_to_words(self, indices):
|
|
return [self.indices_word[i] for i in indices]
|
|
|
|
def encode_one_hot(self, W, forConv=False):
|
|
"""One-hot encode given word, or list of indices, W.
|
|
|
|
# Arguments
|
|
W: either a word or a list of indices, to be encoded.
|
|
"""
|
|
if type(W) is string:
|
|
x = np.zeros(VOCAB_SIZE)
|
|
x[self.word_indices[W]] = 1
|
|
return x
|
|
elif type(W) is list: # Example: [3, 9, 5]
|
|
x = np.zeros((SAMPLE_SIZE, VOCAB_SIZE)) if not forConv else np.zeros((SAMPLE_SIZE, VOCAB_SIZE, 1))
|
|
for i, w in enumerate(W):
|
|
if i >= SAMPLE_SIZE: break
|
|
if not forConv:
|
|
x[i, w] = 1
|
|
else:
|
|
x[i, w, 0] = 1
|
|
return x
|
|
else:
|
|
raise Exception("Bad type to encode")
|
|
|
|
|
|
def decode(self, x):
|
|
"""Decode the given vector or 1D array to their character output.
|
|
|
|
# Arguments
|
|
x: A vector or a 2D array of probabilities or one-hot representations;
|
|
or a vector of word indices (used with `calc_argmax=False`).
|
|
calc_argmax: Whether to find the word index with maximum
|
|
probability, defaults to `True`.
|
|
"""
|
|
if x.ndim == 1: # either a single word, one-hot encoded, or multiple words
|
|
#one_idxs = [i for i, v in enumerate(x) if v >= 0.5]
|
|
one_idxs = np.argpartition(x, -TOP)[-TOP:]
|
|
print(f'Top 2 indices are {one_idxs} and values are ', np.rint(x[one_idxs]))
|
|
return [self.indices_word[i] for i in one_idxs]
|
|
elif x.ndim == 2: # a list of words, each one-hot encoded
|
|
words = []
|
|
for w in x:
|
|
words.append(self.decode(w))
|
|
return words
|
|
else:
|
|
raise Exception("Bad type to decode")
|
|
|
|
|
|
ctable = WordTable()
|
|
print(f'Words table with training size {TRAINING_SIZE}, batch size {BATCH_SIZE}, vocab size {VOCAB_SIZE} and sample size {SAMPLE_SIZE}')
|
|
|
|
|
|
def line_x_to_indices(line):
|
|
words = line.split()
|
|
return ctable.words_to_indices(words)
|
|
|
|
def line_y_to_indices(line):
|
|
pairs = line.split(',')
|
|
if len(pairs[0]) < 2: # no counts here
|
|
return list(zip(ctable.words_to_indices(pairs), [1 for _ in range(len(pairs))]))
|
|
else:
|
|
words = [p.split()[0] for p in pairs]
|
|
counts = [int(p.split()[1]) for p in pairs]
|
|
w_indices = ctable.words_to_indices(words)
|
|
return w_indices, counts
|
|
|
|
def input_generator(nsamples, train=True, forConv=False):
|
|
print('Generating input for ', 'training' if train else 'validation')
|
|
f_x, f_y = (train_x, train_y) if train else (val_x, val_y)
|
|
with open(f_x) as fx, open(f_y) as fy:
|
|
j = 0
|
|
x = np.zeros((nsamples, SAMPLE_SIZE, VOCAB_SIZE), dtype=np.int) if not forConv else np.zeros((nsamples, SAMPLE_SIZE, VOCAB_SIZE, 1), dtype=np.int)
|
|
y = np.zeros((nsamples, VOCAB_SIZE), dtype=np.float64)
|
|
for line_x, line_y in zip(fx, fy):
|
|
question = line_x_to_indices(line_x)
|
|
expected_w, expected_c = line_y_to_indices(line_y)
|
|
x[j] = ctable.encode_one_hot(question, forConv)
|
|
y[j][expected_w] = expected_c
|
|
j = j + 1
|
|
if j % nsamples == 0:
|
|
yield x, y
|
|
j = 0
|
|
x = np.zeros((nsamples, SAMPLE_SIZE, VOCAB_SIZE), dtype=np.int) if not forConv else np.zeros((nsamples, SAMPLE_SIZE, VOCAB_SIZE, 1), dtype=np.int)
|
|
y = np.zeros((nsamples, VOCAB_SIZE), dtype=np.float64)
|
|
print("End of ", 'training' if train else 'validation')
|
|
return x, y
|
|
|
|
def topcats(y_true, y_pred):
|
|
return metrics.top_k_categorical_accuracy(y_true, y_pred, k=TOP)
|
|
|
|
def model_ff():
|
|
print('Build model...')
|
|
epochs = 50
|
|
model = Sequential()
|
|
model.add(layers.Dense(VOCAB_SIZE, input_shape=(SAMPLE_SIZE, VOCAB_SIZE)))
|
|
# model.add(layers.Dense(VOCAB_SIZE, activation='relu'))
|
|
# model.add(layers.Dropout(0.5))
|
|
# model.add(layers.Dense(150, activation='relu'))
|
|
model.add(layers.Flatten())
|
|
# model.add(layers.Dense(VOCAB_SIZE * 2, activation='relu'))
|
|
model.add(layers.Dense(VOCAB_SIZE, activation='sigmoid'))
|
|
model.compile(loss='binary_crossentropy',
|
|
optimizer='adam',
|
|
metrics=['acc', topcats])
|
|
return model, epochs, "words-ff2-{}b-{}ep".format(BATCH_SIZE, epochs)
|
|
|
|
def model_convnet1D():
|
|
print('Build model...')
|
|
epochs = 1
|
|
model = Sequential()
|
|
model.add(layers.Conv1D(32, 10, activation='relu',
|
|
input_shape=(SAMPLE_SIZE, VOCAB_SIZE)))
|
|
model.add(layers.MaxPooling1D(2))
|
|
model.add(layers.Conv1D(64, 10, activation='relu'))
|
|
model.add(layers.MaxPooling1D(2))
|
|
model.add(layers.Conv1D(64, 10, activation='relu'))
|
|
model.add(layers.GlobalMaxPooling1D())
|
|
model.add(layers.Dense(VOCAB_SIZE, activation='sigmoid'))
|
|
|
|
model.compile(loss='binary_crossentropy',
|
|
optimizer='adam',
|
|
metrics=['acc', topcats])
|
|
|
|
return model, epochs, "words-convnet1D-{}b-{}ep".format(BATCH_SIZE, epochs)
|
|
|
|
def SumPooling2D(x):
|
|
return K.sum(x, axis=1)
|
|
|
|
def model_convnet2D():
|
|
print('Build model...')
|
|
epochs = 150
|
|
model = Sequential()
|
|
model.add(layers.Conv2D(VOCAB_SIZE, (1, VOCAB_SIZE),
|
|
input_shape=(SAMPLE_SIZE, VOCAB_SIZE, 1)))
|
|
model.add(layers.Lambda(SumPooling2D))
|
|
model.add(layers.Reshape((VOCAB_SIZE,)))
|
|
# model.add(layers.Flatten())
|
|
# model.add(layers.Dense(VOCAB_SIZE))
|
|
|
|
model.compile(loss='mean_squared_error',
|
|
optimizer='adam',
|
|
metrics=['acc', topcats])
|
|
|
|
# model.compile(loss='binary_crossentropy',
|
|
# optimizer='adam',
|
|
# metrics=['acc', topcategories])
|
|
|
|
return model, epochs, "words-convnet2D-{}b-{}ep".format(BATCH_SIZE, epochs)
|
|
|
|
|
|
model, epochs, name = model_convnet2D()
|
|
model.summary()
|
|
plot_model(model, to_file=name + '.png', show_shapes=True)
|
|
|
|
# Train the model each generation and show predictions against the validation
|
|
# dataset.
|
|
val_gen_2 = input_generator(5, train=False, forConv=True)
|
|
for iteration in range(1, epochs):
|
|
print()
|
|
print('-' * 50)
|
|
print('Iteration', iteration)
|
|
input_gen = input_generator(BATCH_SIZE, forConv=True)
|
|
val_gen = input_generator(BATCH_SIZE, False, forConv=True)
|
|
model.fit_generator(input_gen,
|
|
epochs = 1,
|
|
steps_per_epoch = 20,
|
|
validation_data = val_gen,
|
|
validation_steps = 10, workers=1)
|
|
# Select 10 samples from the validation set at random so we can visualize
|
|
# errors.
|
|
# print(batch_y)
|
|
# print(preds)
|
|
batch_x, batch_y = next(val_gen_2)
|
|
for i in range(len(batch_x)):
|
|
preds = model.predict(batch_x)
|
|
query = batch_x[i]
|
|
expected = batch_y[i]
|
|
prediction = preds[i]
|
|
#print(preds)
|
|
# preds[preds>=0.5] = 1
|
|
# preds[preds<0.5] = 0
|
|
|
|
#q = ctable.decode(query)
|
|
correct = ctable.decode(expected)
|
|
guess = ctable.decode(prediction)
|
|
print('T', correct, ' G', guess)
|
|
|
|
model.summary()
|
|
model.save(name + '.h5')
|
|
|