Sequence to Sequence Model Implementation (Machine Translation)
All about Practical Implementation of Sequence to Sequence Model
What is Machine Translation?
- Machine translation is the process of automatically translating content from one language (the source) to another (the target) without any human input.
- Here, the input and output sequences have different lengths, and the entire input sequence is required before we can start predicting the target.
- This is an example of a Sequence to Sequence model. Let's see how to implement it.
- To understand the following code, you should have a good grasp of what RNNs, LSTMs, and Sequence to Sequence models are. If you don't, go through my previous post first.
Data
- In this project, we are going to translate English text to French text, and we are using this data (the English-French sentence pairs in fra.txt).
Data Preprocessing
- Vectorizing Data
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense
import tensorflow as tf
import numpy as np
batch_size = 64 # Batch size for training.
epochs = 130 # Number of epochs to train for.
latent_dim = 256 # Latent dimensionality of the encoding space.
num_samples = 10000 # Number of samples to train on.
#Path to the data txt file on disk.
data_path = './lan_data/fra.txt'
input_texts = []
target_texts = []
input_charecters = set()
target_charecters = set()
def vectorize_text(text_path):
    with open(text_path, 'r', encoding='utf-8') as f:
        lines = f.read().split('\n')
    for line in lines[: min(num_samples, len(lines) - 1)]:
        input_text, target_text, _ = line.split('\t')
        # We use "tab" as the "start sequence" character
        # for the targets, and "\n" as "end sequence" character.
        target_text = '\t' + target_text + '\n'
        input_texts.append(input_text)
        target_texts.append(target_text)
        for char in input_text:
            if char not in input_charecters:
                input_charecters.add(char)
        for char in target_text:
            if char not in target_charecters:
                target_charecters.add(char)
    return input_charecters, target_charecters
input_charecters, target_charecters = vectorize_text(data_path)
input_charecters = sorted(list(input_charecters))
target_charecters = sorted(list(target_charecters))
num_encoder_tokens = len(input_charecters)
num_decoder_tokens = len(target_charecters)
max_encoder_seq_length = max([len(txt) for txt in input_texts])
max_decoder_seq_length = max([len(txt) for txt in target_texts])
print("Number of samples:", len(input_texts))
print("Number of unique input tokens:", num_encoder_tokens)
print("Number of unique output tokens:", num_decoder_tokens)
print("Max sequence length for inputs:", max_encoder_seq_length)
print("Max sequence length for outputs:", max_decoder_seq_length)
input_token_index = dict([(char, i) for i, char in enumerate(input_charecters)])
target_token_index = dict([(char, i) for i, char in enumerate(target_charecters)])
Now turn the sentences into three NumPy arrays: encoder_input_data, decoder_input_data, and decoder_target_data.
- encoder_input_data is a 3D array of shape (num_pairs, max_english_sentence_length, num_english_characters) containing a one-hot vectorization of the English sentences.
- decoder_input_data is a 3D array of shape (num_pairs, max_french_sentence_length, num_french_characters) containing a one-hot vectorization of the French sentences.
- decoder_target_data is the same as decoder_input_data but offset by one timestep: decoder_target_data[:, t, :] will be the same as decoder_input_data[:, t + 1, :] (a toy illustration of this offset follows the list).
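To make the one-timestep offset concrete, here is a toy illustration (my own sketch, not part of the pipeline; the target string "\tVa !\n" is just a made-up example): at each step t the decoder is fed the character at position t and is trained to predict the character at position t + 1.
# Toy illustration of the teacher-forcing offset (hypothetical target "\tVa !\n").
toy_target = '\tVa !\n'
decoder_in = list(toy_target)       # ['\t', 'V', 'a', ' ', '!', '\n']
decoder_out = list(toy_target[1:])  # ['V', 'a', ' ', '!', '\n'] -- shifted left by one step
for t, (cin, cout) in enumerate(zip(decoder_in, decoder_out)):
    print(t, repr(cin), '->', repr(cout))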
encoder_input_data = np.zeros((len(input_texts), max_encoder_seq_length, num_encoder_tokens), dtype='float32')
decoder_input_data = np.zeros((len(input_texts), max_decoder_seq_length, num_decoder_tokens), dtype='float32')
decoder_target_data = np.zeros((len(input_texts), max_decoder_seq_length, num_decoder_tokens), dtype='float32')
The following code creates the one-hot encodings of the English and French sentences.
for i, (input_text, target_text) in enumerate(zip(input_texts, target_texts)):
    for t, char in enumerate(input_text):
        encoder_input_data[i, t, input_token_index[char]] = 1.
    for t, char in enumerate(target_text):
        # decoder_target_data is ahead of decoder_input_data by one timestep
        decoder_input_data[i, t, target_token_index[char]] = 1.
        if t > 0:
            # decoder_target_data will be ahead by one timestep
            # and will not include the start character.
            decoder_target_data[i, t - 1, target_token_index[char]] = 1.
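As an optional sanity check (a minimal sketch of my own, not part of the original pipeline), you can reconstruct a sample from the one-hot arrays and confirm it matches the raw text. It relies on the fact that padding positions are left as all-zero rows here.
# Decode a one-hot matrix back to a string; padding rows are all zeros, so stop there.
def one_hot_to_text(matrix, index_to_char):
    chars = []
    for row in matrix:
        if row.max() == 0:
            break
        chars.append(index_to_char[int(row.argmax())])
    return ''.join(chars)

rev_input = {i: c for c, i in input_token_index.items()}
rev_target = {i: c for c, i in target_token_index.items()}
print(repr(one_hot_to_text(encoder_input_data[0], rev_input)))   # should equal repr(input_texts[0])
print(repr(one_hot_to_text(decoder_input_data[0], rev_target)))  # should equal repr(target_texts[0])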
Building the Model
- The first step is to define an input sequence for the encoder.
- Because this is a character-level translation model, the input is fed into the encoder character by character.
- Next, you need the encoder's final internal states as the initial state of the decoder.
- So, for the encoder LSTM, set return_state=True. With this, you can get the encoder's internal state at the end of the input sequence: state_h denotes the hidden state and state_c denotes the cell state. A quick standalone shape check is sketched right after this list.
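The following illustrative check (my own snippet, run on random data only) shows that an LSTM with return_state=True returns three tensors: the last output plus the final hidden and cell states, each of shape (batch, latent_dim).
# Illustrative only: call an LSTM with return_state=True on random data and inspect shapes.
dummy = tf.random.uniform((2, 7, num_encoder_tokens))   # (batch, timesteps, features)
out, h, c = LSTM(latent_dim, return_state=True)(dummy)
print(out.shape, h.shape, c.shape)                      # (2, 256) (2, 256) (2, 256)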
def encoder(encoder_tokens):
    """This function returns the encoder inputs, output and final states."""
    encoder_inputs = Input(shape=(None, encoder_tokens))
    encoder_lstm = LSTM(latent_dim, return_state=True)
    encoder_output, state_h, state_c = encoder_lstm(encoder_inputs)
    encoder_states = [state_h, state_c]
    return encoder_inputs, encoder_output, encoder_states

def decoder(decoder_tokens, encoder_states):
    """This function returns the decoder inputs and output."""
    # Set up the decoder, using `encoder_states` as initial state.
    decoder_inputs = Input(shape=(None, decoder_tokens))
    # We set up our decoder to return full output sequences,
    # and to return internal states as well. We don't use the
    # return states in the training model, but we will use them in inference.
    decoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True)
    decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=encoder_states)
    decoder_dense = Dense(decoder_tokens, activation='softmax')
    decoder_outputs = decoder_dense(decoder_outputs)
    return decoder_inputs, decoder_outputs
encoder_inputs, encoder_outputs, encoder_states = encoder(num_encoder_tokens)
decoder_inputs, decoder_outputs = decoder(num_decoder_tokens, encoder_states)
# Define the model that will turn
# `encoder_input_data` & `decoder_input_data` into `decoder_target_data`
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='rmsprop', loss='categorical_crossentropy')
model.fit([encoder_input_data, decoder_input_data], decoder_target_data,
          batch_size=batch_size,
          epochs=epochs,
          validation_split=0.2)
model.save("./trained_data/E2F")
# Restore the model and construct the encoder and decoder.
model = tf.keras.models.load_model("./trained_data/E2F/")
encoder_inputs = model.input[0] # input_1
encoder_outputs, state_h_enc, state_c_enc = model.layers[2].output # lstm_1
encoder_states = [state_h_enc, state_c_enc]
encoder_model = Model(encoder_inputs, encoder_states)
decoder_inputs = model.input[1] # input_2
decoder_state_input_h = Input(shape=(latent_dim,))
decoder_state_input_c = Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
decoder_lstm = model.layers[3]
decoder_outputs, state_h_dec, state_c_dec = decoder_lstm(
    decoder_inputs, initial_state=decoder_states_inputs
)
decoder_states = [state_h_dec, state_c_dec]
decoder_dense = model.layers[4]
decoder_outputs = decoder_dense(decoder_outputs)
decoder_model = Model(
    [decoder_inputs] + decoder_states_inputs, [decoder_outputs] + decoder_states
)
Create two reverse-lookup token indexes to decode the predicted sequences back into readable text.
reverse_input_char_index = dict((i, char) for char, i in input_token_index.items())
reverse_target_char_index = dict((i, char) for char, i in target_token_index.items())
Next, create a prediction function named decode_sequence. The decoder needs to know when to start and stop generating text: it starts from a target sequence of length 1 that contains only the start character \t, and it stops either when it samples the stop character \n or when the maximum target sentence length is reached. At each step, the sampled character is appended to the decoded sentence, the length-1 target sequence is replaced with that character, and the decoder states are updated.
def decode_sequence(input_seq):
    """This function returns the decoded sequence"""
    # Encode the input sequence to get the initial decoder states.
    states_value = encoder_model.predict(input_seq)
    # Start with a target sequence of length 1 containing only the start character.
    target_seq = np.zeros((1, 1, num_decoder_tokens))
    target_seq[0, 0, target_token_index["\t"]] = 1.0
    stop_condition = False
    decoded_sentence = ""
    while not stop_condition:
        output_tokens, h, c = decoder_model.predict([target_seq] + states_value)
        # Sample the most likely next character.
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = reverse_target_char_index[sampled_token_index]
        decoded_sentence += sampled_char
        # Stop at the end-of-sequence character or when the sentence gets too long.
        if sampled_char == "\n" or len(decoded_sentence) > max_decoder_seq_length:
            stop_condition = True
        # Update the length-1 target sequence and the decoder states.
        target_seq = np.zeros((1, 1, num_decoder_tokens))
        target_seq[0, 0, sampled_token_index] = 1.0
        states_value = [h, c]
    return decoded_sentence
Running the cell below picks a random sentence from the training data and prints its translation. The sentences are basic, but learning a new foreign language is always a nice addition to your skills, and it will come in handy when you visit France.
i = np.random.choice(len(input_texts))
input_seq = encoder_input_data[i:i+1]
translation = decode_sequence(input_seq)
print('-')
print('Input:', input_texts[i])
print('Translation:', translation)
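If you want to translate a sentence that is not already in encoder_input_data, a small helper like the one below will do. This is a minimal sketch of my own: it assumes every character of the new sentence appears in the training input characters and that the sentence is no longer than max_encoder_seq_length; otherwise it will fail.
# Hypothetical helper: one-hot encode a new English sentence and decode it.
def translate(sentence):
    seq = np.zeros((1, max_encoder_seq_length, num_encoder_tokens), dtype='float32')
    for t, char in enumerate(sentence):
        seq[0, t, input_token_index[char]] = 1.0   # raises KeyError for unseen characters
    return decode_sequence(seq)

print(translate('Run!'))   # e.g. a short sentence whose characters were all seen in training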