Text Generation using RNN
Long short-term memory models, or LSTMs, use gates to control the flow of information and so address the short-term memory problem of plain RNNs. The gates decide whether to keep or discard information, which lets the model retain important data over long spans. Because they can learn long-term dependencies, LSTMs are widely used in machine translation, speech recognition, handwriting recognition and generation, language modeling, speech synthesis, and many other deep learning applications.
Using LSTM, we can build a model and train it to generate text in the style of the text used to train the model.
To build, train and run the model, the following things need to be done.
 Data Preprocessing involves the following steps,
 Take a novel or any other form of literature in the form of a text file, and load it into the program in the form of a string. By assigning numbers to each character, we can represent the text in the form of numbers so that it can be passed through the model. This is what is done in tokenization.
 Make minibatches out of the preprocessed data for batch learning.
 Define the LSTM model that can be trained on the data, and then train the model for a desired number of epochs. Tune the hyperparameters to get the best result out of the model.
 Define a predict function that instantiates the model, takes in the argument string (the initial word that the user provides), and then predicts the desired number of characters succeeding the initial word.
All in all, the data is preprocessed to be represented in the form of numbers, then the model is trained on this data in the form of minibatches, and then a prompt word is sent to the model which then predicts the next character based on the data it has been trained on.
Import Resources
import numpy as np
import torch
from torch import nn
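import torch.nn.functional as F
# `train_on_gpu` is used by the code below; set it once here
train_on_gpu = torch.cuda.is_available()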
Load in Data
# open text file and read in data as `text`
with open('data/anna.txt', 'r') as f:
    text = f.read()
The novel Anna Karenina is used to train this character-wise RNN. The whole text file is read into a single string, text.
Tokenization
chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}
# encode the text
encoded = np.array([char2int[ch] for ch in text])
set(text) turns the string text into a set, which removes all duplicate characters, and the set is then cast into a tuple. enumerate(chars) pairs each character in the tuple with a number, which serves as that character's numerical equivalent when the text is sent through the model. Casting the result into a list would give a list of (number, character) tuples, so it is cast into a dictionary instead, which gives a mapping from numbers to characters. A dictionary comprehension then builds the reverse mapping from characters to numbers. With these two dictionaries, a character can be converted to its number and back.
 The characters in the text string are encoded by looking up each one in the character-to-integer dictionary, and the result is cast into a numpy array.
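The two mappings can be checked on a toy string. This is a minimal sketch, not part of the original notebook: the sample string is made up, and sorted() is an added assumption used only to make the character order reproducible (a plain set has arbitrary order).

```python
# Toy stand-in for the novel; any string works.
text = "hello world"

# sorted() is used here only to make the mapping reproducible;
# tuple(set(text)) alone gives an arbitrary order.
chars = tuple(sorted(set(text)))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}

# Encode the text as a list of integers, then decode it back.
encoded = [char2int[ch] for ch in text]
decoded = ''.join(int2char[ii] for ii in encoded)

print(chars)
print(encoded)
print(decoded == text)  # True: the two mappings are inverses
```

Because each character maps to exactly one integer and back, decoding the encoded sequence always recovers the original text.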
Preprocessing the data
To send the data through the model, each input needs to be one-hot encoded,
def one_hot_encode(arr, n_labels):
    # Initialize the encoded array
    one_hot = np.zeros((arr.size, n_labels), dtype=np.float32)
    # Fill the appropriate elements with ones
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.
    # Finally reshape it to get back to the original array
    one_hot = one_hot.reshape((*arr.shape, n_labels))
    return one_hot
 In one_hot = np.zeros((arr.size, n_labels), dtype=np.float32), a zero matrix is created with as many rows as there are elements in the encoded array and as many columns as there are labels. Each row will become the one-hot vector for the corresponding element of the encoded array.
 In one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1., one_hot.shape[0] gives the number of rows in the zero matrix created in the previous step, and np.arange() returns evenly spaced values in a given interval. For example, np.arange(3) gives [0, 1, 2]. Since the argument here is the number of rows, the result is the integers from 0 up to, but not including, that number, i.e. one index per row.
 arr.flatten() returns a copy of the array collapsed into one dimension, so all elements of the encoded array end up in a single row.
 Through indexing, the first index array selects each row, and the second selects, within that row, the column whose index equals the corresponding value in arr (the encoded array). Those elements are set to 1.
 In one_hot = one_hot.reshape((*arr.shape, n_labels)), the matrix of one-hot vectors is reshaped to the shape of the encoded array, with one extra trailing dimension of size n_labels.
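A short check of the shapes involved may help. The sketch below repeats the one-hot function so it runs on its own; the input array and the label count of 8 are made-up values for illustration.

```python
import numpy as np

def one_hot_encode(arr, n_labels):
    # Same logic as the function above, repeated so this snippet is self-contained.
    one_hot = np.zeros((arr.size, n_labels), dtype=np.float32)
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.
    return one_hot.reshape((*arr.shape, n_labels))

# A hypothetical batch of 2 sequences with 3 encoded characters each.
test_seq = np.array([[3, 5, 1],
                     [0, 2, 7]])
one_hot = one_hot_encode(test_seq, 8)

print(one_hot.shape)   # (2, 3, 8)
print(one_hot[0, 0])   # a 1 at index 3, zeros elsewhere
```

Taking the argmax over the last axis recovers the original integer codes, which is a quick way to confirm the encoding is correct.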
Making Training minibatches
def get_batches(arr, batch_size, seq_length):
    '''Create a generator that returns batches of size
       batch_size x seq_length from arr.

       Arguments
       ---------
       arr: Array you want to make batches from
       batch_size: Batch size, the number of sequences per batch
       seq_length: Number of encoded chars in a sequence
    '''
    batch_size_total = batch_size * seq_length
    # total number of batches we can make
    n_batches = len(arr)//batch_size_total
    # Keep only enough characters to make full batches
    arr = arr[:n_batches * batch_size_total]
    # Reshape into batch_size rows
    arr = arr.reshape((batch_size, -1))
    # iterate through the array, one sequence at a time
    for n in range(0, arr.shape[1], seq_length):
        # The features
        x = arr[:, n:n+seq_length]
        # The targets, shifted by one
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+seq_length]
        except IndexError:
            # last window: wrap the final target around to the first column
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y
 The first thing we need to do is discard some of the text so we only have full minibatches. batch_size_total, the number of elements in one batch, is calculated first as the product of the number of sequences in a batch and the number of elements in each sequence. n_batches is then the number of full batches that can be made from the data, obtained by integer division of the total size of the data by batch_size_total. The data is trimmed to exactly n_batches * batch_size_total elements, and the leftover elements at the end are dropped.
 After that, we need to split arr into N sequences. N is the number of sequences in each batch, given by the batch_size argument. It becomes the first dimension of the data: the array is reshaped into batch_size rows.
 Now that we have this array, we can iterate through it to get our minibatches. Since this is a generator function, it yields a single minibatch each time it is iterated. The range function is used to step through the data: a variable n runs from 0 to the number of elements in each row (each full sequence), in increments of the sequence length, seq_length. Each window of seq_length columns is one input batch x, and the target y is the same window shifted by one character. For the last window, the final target wraps around to the first column.
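To see the trimming and the one-step shift concretely, the generator can be run on a small array. This is a sketch under stated assumptions: the function body is a compact restatement of the batching logic described above, and the 25-element input with batch_size=2 and seq_length=4 is made up for illustration.

```python
import numpy as np

def get_batches(arr, batch_size, seq_length):
    # Compact restatement of the generator, so the snippet is self-contained.
    batch_size_total = batch_size * seq_length
    n_batches = len(arr) // batch_size_total
    arr = arr[:n_batches * batch_size_total].reshape((batch_size, -1))
    for n in range(0, arr.shape[1], seq_length):
        x = arr[:, n:n + seq_length]
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n + seq_length]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y

data = np.arange(25)  # 25 "characters"; the last one is dropped
batches = list(get_batches(data, batch_size=2, seq_length=4))
x, y = batches[0]

print(len(batches))  # 3 full batches of 2 x 4 elements
print(x)             # rows [0 1 2 3] and [12 13 14 15]
print(y)             # rows [1 2 3 4] and [13 14 15 16]
```

Each row of y is the matching row of x moved forward by one position, which is exactly the "predict the next character" target.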
Defining the network
class CharRNN(nn.Module):

    def __init__(self, tokens, n_hidden=256, n_layers=2,
                 drop_prob=0.5, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr

        # creating character dictionaries
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        ## define the LSTM
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers,
                            dropout=drop_prob, batch_first=True)

        ## define a dropout layer
        self.dropout = nn.Dropout(drop_prob)

        ## define the final, fully-connected output layer
        self.fc = nn.Linear(n_hidden, len(self.chars))

    def forward(self, x, hidden):
        ''' Forward pass through the network.
            These inputs are x, and the hidden/cell state `hidden`. '''

        ## Get the outputs and the new hidden state from the lstm
        r_output, hidden = self.lstm(x, hidden)

        ## pass through a dropout layer
        out = self.dropout(r_output)

        # Stack up LSTM outputs using view
        # you may need to use contiguous to reshape the output
        out = out.contiguous().view(-1, self.n_hidden)

        ## put x through the fully-connected layer
        out = self.fc(out)

        # return the final output and the hidden state
        return out, hidden

    def init_hidden(self, batch_size):
        ''' Initializes hidden state '''
        # Create two new tensors with sizes n_layers x batch_size x n_hidden,
        # initialized to zero, for hidden state and cell state of LSTM
        weight = next(self.parameters()).data

        if (train_on_gpu):
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
        else:
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_())

        return hidden
 In the __init__ method, the arguments are tokens, the unique characters of the text; n_hidden, the dimension of the hidden state; n_layers, the number of LSTM layers stacked on one another; drop_prob, the dropout probability used in the dropout layer; and lr, the learning rate used in training.
 super().__init__() calls the initializer of the parent class, nn.Module, so that its methods and attributes are set up.
 The character dictionaries are created to convert characters to integers and vice versa.
 The LSTM layer is defined, taking one-hot vectors of length len(self.chars) as input.
 The dropout layer is defined using the dropout probability.
 A fully connected layer, which is the last layer, is defined. Its input dimension is the hidden state dimension, which is the size of the LSTM output, and its output dimension is the number of characters, the same as the dimension of the inputs passed into the network.
 In the forward method, the input and the initial hidden state are passed through the LSTM layer, and the output and the final hidden state are obtained.
 The output is then passed through the dropout layer.
 The output is reshaped before it is passed through the fully connected layer. Because the LSTM is created with batch_first=True, its output has the dimension (batch_size, seq_length, self.n_hidden), and it is flattened to (batch_size * seq_length, self.n_hidden).
 The reshaped output is passed through the fully connected layer to obtain the final output.
 The final output and the final hidden state are returned from this method.
 In the init_hidden method, the hidden tuple, which consists of the hidden state and the cell state, is initialized to zero. Each of the two has the dimension (self.n_layers, batch_size, self.n_hidden).
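The tensor shapes in the forward pass can be verified with a bare nn.LSTM layer. This is a minimal sketch, not the model itself: the sizes (3 sequences, 7 steps, 5 characters, 4 hidden units, 2 layers) are arbitrary values chosen for illustration.

```python
import torch
from torch import nn

# Arbitrary small sizes, chosen only for illustration.
batch_size, seq_length, n_chars, n_hidden, n_layers = 3, 7, 5, 4, 2

lstm = nn.LSTM(n_chars, n_hidden, n_layers, batch_first=True)

# Input stands in for a one-hot encoded batch; hidden and cell state start at zero.
x = torch.zeros(batch_size, seq_length, n_chars)
hidden = (torch.zeros(n_layers, batch_size, n_hidden),
          torch.zeros(n_layers, batch_size, n_hidden))

r_output, (h, c) = lstm(x, hidden)

# Flatten batch and time into one dimension, as done before the linear layer.
flat = r_output.contiguous().view(-1, n_hidden)

print(r_output.shape)  # torch.Size([3, 7, 4])
print(h.shape)         # torch.Size([2, 3, 4])
print(flat.shape)      # torch.Size([21, 4])
```

Flattening to (batch_size * seq_length, n_hidden) lets the fully connected layer score every time step of every sequence in a single call.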
Training the model
def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    ''' Training a network

        Arguments
        ---------
        net: CharRNN network
        data: text data to train the network
        epochs: Number of epochs to train
        batch_size: Number of mini-sequences per mini-batch, aka batch size
        seq_length: Number of character steps per mini-batch
        lr: learning rate
        clip: gradient clipping
        val_frac: Fraction of data to hold out for validation
        print_every: Number of steps for printing training and validation loss
    '''
    net.train()
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    # create training and validation data
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]
    if(train_on_gpu):
        net.cuda()
    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):
        # initialize hidden state
        h = net.init_hidden(batch_size)
        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1
            # One-hot encode our data and make them Torch tensors
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            if(train_on_gpu):
                inputs, targets = inputs.cuda(), targets.cuda()
            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([each.data for each in h])
            # zero accumulated gradients
            net.zero_grad()
            # get the output from the model
            output, h = net(inputs, h)
            # calculate the loss and perform backprop
            loss = criterion(output, targets.view(batch_size*seq_length).long())
            loss.backward()
            # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()
            # loss stats
            if counter % print_every == 0:
                # Get validation loss
                val_h = net.init_hidden(batch_size)
                val_losses = []
                net.eval()
                for x, y in get_batches(val_data, batch_size, seq_length):
                    # One-hot encode our data and make them Torch tensors
                    x = one_hot_encode(x, n_chars)
                    x, y = torch.from_numpy(x), torch.from_numpy(y)
                    # Creating new variables for the hidden state, otherwise
                    # we'd backprop through the entire training history
                    val_h = tuple([each.data for each in val_h])
                    inputs, targets = x, y
                    if(train_on_gpu):
                        inputs, targets = inputs.cuda(), targets.cuda()
                    output, val_h = net(inputs, val_h)
                    val_loss = criterion(output, targets.view(batch_size*seq_length).long())
                    val_losses.append(val_loss.item())
                net.train() # reset to train mode after iterating through validation data
                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))
 The model is set to training mode using net.train(), which enables training-time behavior such as dropout.
 The optimizer and the loss function are defined, and the loss function is taken to be a cross-entropy loss.
 A fraction of the data, val_frac, is set aside to calculate the validation loss during training.
 Inside the for loop that iterates through the number of epochs,
 The initial hidden state is obtained from the init_hidden method, which sets all the elements to zero.
 Inside the for loop that iterates over all the batches of data (get_batches() is a generator function, so at each iteration the next batch is used),
 The input data is one-hot encoded using the function that has already been defined,
 The input and target arrays are cast into tensors to be sent through the PyTorch-defined network,
 The hidden variable is redefined to avoid backpropagating through the entire training history,
 The accumulated gradients are set to zero,
 The output and the next hidden state are obtained by passing the inputs and the previous hidden state through the model,
 The loss is calculated by calling the loss function with the outputs and the target tensors as arguments (as the outputs have the flattened shape from before, the targets are also flattened so that batch size and sequence length form one dimension),
 Backpropagation is done using the calculated loss,
 As RNNs have the problem of exploding gradients, the gradients are clipped when their norm goes past a certain value, defined as 5 here, using the clip_grad_norm_() function, which takes the network parameters and the clip value as arguments,
 The optimizer updates the weights when opt.step() is called,
 Whenever the counter reaches a multiple of print_every, the same steps are run on the validation data, except that backpropagation and the weight update are skipped. Note that the network needs to be set to evaluation mode with net.eval(), which disables dropout, and then back to training mode with net.train(). All the training details are then printed.
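The idea behind norm-based gradient clipping can be sketched in plain NumPy. This is an illustration of the principle only, not PyTorch's implementation: clip_by_global_norm is a hypothetical helper, and the gradient values are made up.

```python
import numpy as np

def clip_by_global_norm(grads, max_norm):
    # Hypothetical helper: rescale a list of gradient arrays so that
    # their combined L2 norm is at most max_norm.
    total_norm = np.sqrt(sum(np.sum(g ** 2) for g in grads))
    # Scale down only when the norm exceeds the limit; never scale up.
    scale = min(1.0, max_norm / (total_norm + 1e-6))
    return [g * scale for g in grads], total_norm

# Made-up gradients with combined norm sqrt(9 + 16 + 144) = 13.
grads = [np.array([3.0, 4.0]), np.array([12.0])]
clipped, norm_before = clip_by_global_norm(grads, max_norm=5)
norm_after = np.sqrt(sum(np.sum(g ** 2) for g in clipped))

print(norm_before)  # 13.0
print(norm_after)   # just under 5
```

All gradients are scaled by the same factor, so the direction of the update is preserved and only its length is limited.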
Instantiating the model
n_hidden=512
n_layers=2
net = CharRNN(chars, n_hidden, n_layers)
batch_size = 128
seq_length = 100
n_epochs = 5 # start smaller if you are just testing initial behavior
# train the model
train(net, encoded, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.001, print_every=10)
Making Predictions
def predict(net, char, h=None, top_k=None):
    ''' Given a character, predict the next character.
        Returns the predicted character and the hidden state.
    '''
    # tensor inputs
    x = np.array([[net.char2int[char]]])
    x = one_hot_encode(x, len(net.chars))
    inputs = torch.from_numpy(x)
    if(train_on_gpu):
        inputs = inputs.cuda()
    # detach hidden state from history
    h = tuple([each.data for each in h])
    # get the output of the model
    out, h = net(inputs, h)
    # get the character probabilities
    p = F.softmax(out, dim=1).data
    if(train_on_gpu):
        p = p.cpu() # move to cpu
    # get top characters
    if top_k is None:
        top_ch = np.arange(len(net.chars))
    else:
        p, top_ch = p.topk(top_k)
        top_ch = top_ch.numpy().squeeze()
    # select the likely next character with some element of randomness
    p = p.numpy().squeeze()
    char = np.random.choice(top_ch, p=p/p.sum())
    # return the predicted character and the hidden state
    return net.int2char[char], h
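The top-k sampling step used in predict can be looked at in isolation. The sketch below is a NumPy-only illustration under stated assumptions: sample_top_k is a hypothetical helper, the probability vector is made up, and np.argsort stands in for the tensor topk call.

```python
import numpy as np

def sample_top_k(p, top_k, rng):
    # Hypothetical helper: keep the top_k most probable indices,
    # renormalize their probabilities, and sample one of them.
    top_ch = np.argsort(p)[::-1][:top_k]
    top_p = p[top_ch]
    return rng.choice(top_ch, p=top_p / top_p.sum())

rng = np.random.default_rng(0)
# Made-up distribution over 5 characters; indices 1 and 3 are the two most likely.
p = np.array([0.05, 0.40, 0.10, 0.30, 0.15])
draws = [int(sample_top_k(p, top_k=2, rng=rng)) for _ in range(200)]

print(set(draws))  # only indices 1 and 3 can appear
```

Restricting the choice to the k most likely characters removes very unlikely picks while keeping some randomness, which tends to reduce gibberish in the generated text.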
Priming and Generating Text
def sample(net, size, prime='The', top_k=None):
    if(train_on_gpu):
        net.cuda()
    else:
        net.cpu()
    net.eval() # eval mode
    # First off, run through the prime characters
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)
    chars.append(char)
    # Now pass in the previous character and get a new one
    for ii in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)
    return ''.join(chars)

print(sample(net, 1000, prime='Anna', top_k=5))
Generated text,
As you can see, the generated text has some gibberish words, but overall it shows that an RNN can pick up the style of a text through sequential, character-by-character learning.