%load_ext d2lbook.tab
tab.interact_select(['mxnet', 'pytorch', 'tensorflow', 'jax'])
Utility Functions and Classes
:label:`sec_utils`
This section contains the implementations of utility functions and classes used in this book.
%%tab mxnet
import inspect
import collections
import math
import random
import sys
from d2l import mxnet as d2l
from IPython import display
from mxnet import autograd, gluon, init, np, npx
from mxnet.gluon import nn
npx.set_np()
%%tab pytorch
import inspect
import collections
from d2l import torch as d2l
from IPython import display
# numpy, matplotlib, torch, and torchvision are used by the utilities below
import numpy as np
from matplotlib import pyplot as plt
import torch
import torchvision
from torch import nn
from torchvision import transforms
%%tab tensorflow
import inspect
from IPython import display
import collections
from d2l import tensorflow as d2l
import tensorflow as tf
%%tab jax
import inspect
from IPython import display
import collections
from d2l import jax as d2l
import jax
Hyperparameters.
%%tab all
@d2l.add_to_class(d2l.HyperParameters) #@save
def save_hyperparameters(self, ignore=[]):
"""Save function arguments into class attributes."""
frame = inspect.currentframe().f_back
_, _, _, local_vars = inspect.getargvalues(frame)
self.hparams = {k:v for k, v in local_vars.items()
if k not in set(ignore+['self']) and not k.startswith('_')}
for k, v in self.hparams.items():
setattr(self, k, v)
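A minimal usage sketch (illustrative; the class B below is hypothetical): a subclass of d2l.HyperParameters calls save_hyperparameters inside __init__ to turn the constructor arguments into attributes, skipping any names listed in ignore.
%%tab all
class B(d2l.HyperParameters):
    def __init__(self, a, b, c):
        self.save_hyperparameters(ignore=['c'])
        print('self.a =', self.a, 'self.b =', self.b)
        print('There is no self.c =', not hasattr(self, 'c'))

b = B(a=1, b=2, c=3)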
Progress bar.
%%tab all
@d2l.add_to_class(d2l.ProgressBoard) #@save
def draw(self, x, y, label, every_n=1):
Point = collections.namedtuple('Point', ['x', 'y'])
if not hasattr(self, 'raw_points'):
self.raw_points = collections.OrderedDict()
self.data = collections.OrderedDict()
if label not in self.raw_points:
self.raw_points[label] = []
self.data[label] = []
points = self.raw_points[label]
line = self.data[label]
points.append(Point(x, y))
if len(points) != every_n:
return
mean = lambda x: sum(x) / len(x)
line.append(Point(mean([p.x for p in points]),
mean([p.y for p in points])))
points.clear()
if not self.display:
return
d2l.use_svg_display()
if self.fig is None:
self.fig = d2l.plt.figure(figsize=self.figsize)
plt_lines, labels = [], []
for (k, v), ls, color in zip(self.data.items(), self.ls, self.colors):
plt_lines.append(d2l.plt.plot([p.x for p in v], [p.y for p in v],
linestyle=ls, color=color)[0])
labels.append(k)
axes = self.axes if self.axes else d2l.plt.gca()
if self.xlim: axes.set_xlim(self.xlim)
if self.ylim: axes.set_ylim(self.ylim)
if not self.xlabel: self.xlabel = self.x
axes.set_xlabel(self.xlabel)
axes.set_ylabel(self.ylabel)
axes.set_xscale(self.xscale)
axes.set_yscale(self.yscale)
axes.legend(plt_lines, labels)
display.display(self.fig)
display.clear_output(wait=True)
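For example, a sketch that draws a sine curve, averaging every ten raw points into one plotted point:
%%tab all
import math
board = d2l.ProgressBoard('x')
for i in range(100):
    board.draw(i / 10, math.sin(i / 10), 'sin', every_n=10)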
Add the FrozenLake environment.
%%tab pytorch
def frozen_lake(seed): #@save
    # See https://www.gymlibrary.dev/environments/toy_text/frozen_lake/
    # to learn more about this environment.
    # How to process env.P.items is adapted from
    # https://sites.google.com/view/deep-rl-bootcamp/labs
import gym
env = gym.make('FrozenLake-v1', is_slippery=False)
env.seed(seed)
env.action_space.np_random.seed(seed)
env.action_space.seed(seed)
env_info = {}
env_info['desc'] = env.desc # 2D array specifying what each grid item means
env_info['num_states'] = env.nS # Number of observations/states or obs/state dim
env_info['num_actions'] = env.nA # Number of actions or action dim
# Define indices for (transition probability, nextstate, reward, done) tuple
env_info['trans_prob_idx'] = 0 # Index of transition probability entry
env_info['nextstate_idx'] = 1 # Index of next state entry
env_info['reward_idx'] = 2 # Index of reward entry
env_info['done_idx'] = 3 # Index of done entry
env_info['mdp'] = {}
env_info['env'] = env
for (s, others) in env.P.items():
# others(s) = {a0: [ (p(s'|s,a0), s', reward, done),...], a1:[...], ...}
for (a, pxrds) in others.items():
# pxrds is [(p1,next1,r1,d1),(p2,next2,r2,d2),..].
# e.g. [(0.3, 0, 0, False), (0.3, 0, 0, False), (0.3, 4, 1, False)]
env_info['mdp'][(s,a)] = pxrds
return env_info
Create the environment.
%%tab pytorch
def make_env(name='', seed=0):  #@save
# Input parameters:
# name: specifies a gym environment.
# For Value iteration, only FrozenLake-v1 is supported.
if name == 'FrozenLake-v1':
return frozen_lake(seed)
else:
        raise ValueError(f"{name} env is not supported in this Notebook")
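Assuming the gym package is installed, a minimal usage sketch: the default 4x4 FrozenLake map has 16 states and 4 actions, and env_info['mdp'] maps each (state, action) pair to its list of (probability, next state, reward, done) tuples.
%%tab pytorch
env_info = make_env('FrozenLake-v1', seed=0)
print(env_info['num_states'], env_info['num_actions'])  # 16 4
# Transition tuples for taking action 0 (LEFT) in state 0
print(env_info['mdp'][(0, 0)])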
Show the value function.
%%tab pytorch
def show_value_function_progress(env_desc, V, pi): #@save
    # This function visualizes how the value function and policy change
    # over time.
    # V: [num_iters, num_states]
    # pi: [num_iters, num_states]
    # How to visualize the value function is adapted (but changed) from:
    # https://sites.google.com/view/deep-rl-bootcamp/labs
num_iters = V.shape[0]
fig, ax = plt.subplots(figsize=(15, 15))
for k in range(V.shape[0]):
plt.subplot(4, 4, k + 1)
plt.imshow(V[k].reshape(4,4), cmap="bone")
ax = plt.gca()
ax.set_xticks(np.arange(0, 5)-.5, minor=True)
ax.set_yticks(np.arange(0, 5)-.5, minor=True)
ax.grid(which="minor", color="w", linestyle='-', linewidth=3)
ax.tick_params(which="minor", bottom=False, left=False)
ax.set_xticks([])
ax.set_yticks([])
        # LEFT action: 0, DOWN action: 1
        # RIGHT action: 2, UP action: 3
        # (the y-axis of imshow points down, so UP is -y and DOWN is +y)
        action2dxdy = {0: (-.25, 0), 1: (0, .25),
                       2: (.25, 0), 3: (0, -.25)}
for y in range(4):
for x in range(4):
action = pi[k].reshape(4,4)[y, x]
dx, dy = action2dxdy[action]
if env_desc[y,x].decode() == 'H':
ax.text(x, y, str(env_desc[y,x].decode()),
ha="center", va="center", color="y",
size=20, fontweight='bold')
elif env_desc[y,x].decode() == 'G':
ax.text(x, y, str(env_desc[y,x].decode()),
ha="center", va="center", color="w",
size=20, fontweight='bold')
else:
ax.text(x, y, str(env_desc[y,x].decode()),
ha="center", va="center", color="g",
size=15, fontweight='bold')
# No arrow for cells with G and H labels
if env_desc[y,x].decode() != 'G' and env_desc[y,x].decode() != 'H':
ax.arrow(x, y, dx, dy, color='r', head_width=0.2, head_length=0.15)
ax.set_title("Step = " + str(k + 1), fontsize=20)
fig.tight_layout()
plt.show()
%%tab pytorch
def show_Q_function_progress(env_desc, V_all, pi_all): #@save
    # This function visualizes how the value function and policy change
    # over time.
    # V_all: [num_iters, num_states]
    # pi_all: [num_iters, num_states]
    # We only want to show a few of the iterations.
num_iters_all = V_all.shape[0]
num_iters = num_iters_all // 10
vis_indx = np.arange(0, num_iters_all, num_iters).tolist()
vis_indx.append(num_iters_all - 1)
V = np.zeros((len(vis_indx), V_all.shape[1]))
pi = np.zeros((len(vis_indx), V_all.shape[1]))
for c, i in enumerate(vis_indx):
V[c] = V_all[i]
pi[c] = pi_all[i]
num_iters = V.shape[0]
fig, ax = plt.subplots(figsize=(15, 15))
for k in range(V.shape[0]):
plt.subplot(4, 4, k + 1)
plt.imshow(V[k].reshape(4,4), cmap="bone")
ax = plt.gca()
ax.set_xticks(np.arange(0, 5)-.5, minor=True)
ax.set_yticks(np.arange(0, 5)-.5, minor=True)
ax.grid(which="minor", color="w", linestyle='-', linewidth=3)
ax.tick_params(which="minor", bottom=False, left=False)
ax.set_xticks([])
ax.set_yticks([])
        # LEFT action: 0, DOWN action: 1
        # RIGHT action: 2, UP action: 3
        # (the y-axis of imshow points down, so UP is -y and DOWN is +y)
        action2dxdy = {0: (-.25, 0), 1: (0, .25),
                       2: (.25, 0), 3: (0, -.25)}
for y in range(4):
for x in range(4):
action = pi[k].reshape(4,4)[y, x]
dx, dy = action2dxdy[action]
if env_desc[y,x].decode() == 'H':
ax.text(x, y, str(env_desc[y,x].decode()),
ha="center", va="center", color="y",
size=20, fontweight='bold')
elif env_desc[y,x].decode() == 'G':
ax.text(x, y, str(env_desc[y,x].decode()),
ha="center", va="center", color="w",
size=20, fontweight='bold')
else:
ax.text(x, y, str(env_desc[y,x].decode()),
ha="center", va="center", color="g",
size=15, fontweight='bold')
# No arrow for cells with G and H labels
if env_desc[y,x].decode() != 'G' and env_desc[y,x].decode() != 'H':
ax.arrow(x, y, dx, dy, color='r', head_width=0.2, head_length=0.15)
ax.set_title("Step = " + str(vis_indx[k] + 1), fontsize=20)
fig.tight_layout()
plt.show()
Trainer
A bunch of functions that will be deprecated:
%%tab mxnet
def load_array(data_arrays, batch_size, is_train=True): #@save
"""Construct a Gluon data iterator."""
dataset = gluon.data.ArrayDataset(*data_arrays)
return gluon.data.DataLoader(dataset, batch_size, shuffle=is_train)
def synthetic_data(w, b, num_examples): #@save
"""Generate y = Xw + b + noise."""
X = d2l.normal(0, 1, (num_examples, len(w)))
y = d2l.matmul(X, w) + b
y += d2l.normal(0, 0.01, y.shape)
return X, d2l.reshape(y, (-1, 1))
def sgd(params, lr, batch_size): #@save
"""Minibatch stochastic gradient descent."""
for param in params:
param[:] = param - lr * param.grad / batch_size
def get_dataloader_workers(): #@save
"""Use 4 processes to read the data except for Windows."""
return 0 if sys.platform.startswith('win') else 4
def load_data_fashion_mnist(batch_size, resize=None): #@save
"""Download the Fashion-MNIST dataset and then load it into memory."""
dataset = gluon.data.vision
trans = [dataset.transforms.ToTensor()]
if resize:
trans.insert(0, dataset.transforms.Resize(resize))
trans = dataset.transforms.Compose(trans)
mnist_train = dataset.FashionMNIST(train=True).transform_first(trans)
mnist_test = dataset.FashionMNIST(train=False).transform_first(trans)
return (gluon.data.DataLoader(mnist_train, batch_size, shuffle=True,
num_workers=get_dataloader_workers()),
gluon.data.DataLoader(mnist_test, batch_size, shuffle=False,
num_workers=get_dataloader_workers()))
def evaluate_accuracy_gpu(net, data_iter, device=None): #@save
"""Compute the accuracy for a model on a dataset using a GPU."""
if not device: # Query the first device where the first parameter is on
device = list(net.collect_params().values())[0].list_ctx()[0]
# No. of correct predictions, no. of predictions
metric = d2l.Accumulator(2)
for X, y in data_iter:
X, y = X.as_in_ctx(device), y.as_in_ctx(device)
metric.add(d2l.accuracy(net(X), y), d2l.size(y))
return metric[0] / metric[1]
#@save
def train_ch6(net, train_iter, test_iter, num_epochs, lr, device):
"""Train a model with a GPU (defined in Chapter 6)."""
net.initialize(force_reinit=True, ctx=device, init=init.Xavier())
loss = gluon.loss.SoftmaxCrossEntropyLoss()
trainer = gluon.Trainer(net.collect_params(),
'sgd', {'learning_rate': lr})
animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
legend=['train loss', 'train acc', 'test acc'])
timer, num_batches = d2l.Timer(), len(train_iter)
for epoch in range(num_epochs):
# Sum of training loss, sum of training accuracy, no. of examples
metric = d2l.Accumulator(3)
for i, (X, y) in enumerate(train_iter):
timer.start()
# Here is the major difference from `d2l.train_epoch_ch3`
X, y = X.as_in_ctx(device), y.as_in_ctx(device)
with autograd.record():
y_hat = net(X)
l = loss(y_hat, y)
l.backward()
trainer.step(X.shape[0])
metric.add(l.sum(), d2l.accuracy(y_hat, y), X.shape[0])
timer.stop()
train_l = metric[0] / metric[2]
train_acc = metric[1] / metric[2]
if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
animator.add(epoch + (i + 1) / num_batches,
(train_l, train_acc, None))
test_acc = evaluate_accuracy_gpu(net, test_iter)
animator.add(epoch + 1, (None, None, test_acc))
print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
f'test acc {test_acc:.3f}')
print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
f'on {str(device)}')
def grad_clipping(net, theta): #@save
"""Clip the gradient."""
if isinstance(net, gluon.Block):
params = [p.data() for p in net.collect_params().values()]
else:
params = net.params
norm = math.sqrt(sum((p.grad ** 2).sum() for p in params))
if norm > theta:
for param in params:
param.grad[:] *= theta / norm
%%tab pytorch
def load_array(data_arrays, batch_size, is_train=True): #@save
"""Construct a PyTorch data iterator."""
dataset = torch.utils.data.TensorDataset(*data_arrays)
return torch.utils.data.DataLoader(dataset, batch_size, shuffle=is_train)
def synthetic_data(w, b, num_examples): #@save
"""Generate y = Xw + b + noise."""
X = d2l.normal(0, 1, (num_examples, len(w)))
y = d2l.matmul(X, w) + b
y += d2l.normal(0, 0.01, y.shape)
return X, d2l.reshape(y, (-1, 1))
def sgd(params, lr, batch_size): #@save
"""Minibatch stochastic gradient descent."""
with torch.no_grad():
for param in params:
param -= lr * param.grad / batch_size
param.grad.zero_()
def get_dataloader_workers(): #@save
"""Use 4 processes to read the data."""
return 4
def load_data_fashion_mnist(batch_size, resize=None): #@save
"""Download the Fashion-MNIST dataset and then load it into memory."""
trans = [transforms.ToTensor()]
if resize:
trans.insert(0, transforms.Resize(resize))
trans = transforms.Compose(trans)
mnist_train = torchvision.datasets.FashionMNIST(
root="../data", train=True, transform=trans, download=True)
mnist_test = torchvision.datasets.FashionMNIST(
root="../data", train=False, transform=trans, download=True)
return (torch.utils.data.DataLoader(mnist_train, batch_size, shuffle=True,
num_workers=get_dataloader_workers()),
torch.utils.data.DataLoader(mnist_test, batch_size, shuffle=False,
num_workers=get_dataloader_workers()))
def evaluate_accuracy_gpu(net, data_iter, device=None): #@save
"""Compute the accuracy for a model on a dataset using a GPU."""
if isinstance(net, nn.Module):
net.eval() # Set the model to evaluation mode
if not device:
device = next(iter(net.parameters())).device
# No. of correct predictions, no. of predictions
metric = d2l.Accumulator(2)
with torch.no_grad():
for X, y in data_iter:
if isinstance(X, list):
# Required for BERT Fine-tuning (to be covered later)
X = [x.to(device) for x in X]
else:
X = X.to(device)
y = y.to(device)
metric.add(d2l.accuracy(net(X), y), d2l.size(y))
return metric[0] / metric[1]
#@save
def train_ch6(net, train_iter, test_iter, num_epochs, lr, device):
"""Train a model with a GPU (defined in Chapter 6)."""
def init_weights(m):
if type(m) == nn.Linear or type(m) == nn.Conv2d:
nn.init.xavier_uniform_(m.weight)
net.apply(init_weights)
print('training on', device)
net.to(device)
optimizer = torch.optim.SGD(net.parameters(), lr=lr)
loss = nn.CrossEntropyLoss()
animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
legend=['train loss', 'train acc', 'test acc'])
timer, num_batches = d2l.Timer(), len(train_iter)
for epoch in range(num_epochs):
# Sum of training loss, sum of training accuracy, no. of examples
metric = d2l.Accumulator(3)
net.train()
for i, (X, y) in enumerate(train_iter):
timer.start()
optimizer.zero_grad()
X, y = X.to(device), y.to(device)
y_hat = net(X)
l = loss(y_hat, y)
l.backward()
optimizer.step()
with torch.no_grad():
metric.add(l * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0])
timer.stop()
train_l = metric[0] / metric[2]
train_acc = metric[1] / metric[2]
if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
animator.add(epoch + (i + 1) / num_batches,
(train_l, train_acc, None))
test_acc = evaluate_accuracy_gpu(net, test_iter)
animator.add(epoch + 1, (None, None, test_acc))
print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
f'test acc {test_acc:.3f}')
print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
f'on {str(device)}')
%%tab tensorflow
def load_array(data_arrays, batch_size, is_train=True): #@save
"""Construct a TensorFlow data iterator."""
dataset = tf.data.Dataset.from_tensor_slices(data_arrays)
if is_train:
dataset = dataset.shuffle(buffer_size=1000)
dataset = dataset.batch(batch_size)
return dataset
def synthetic_data(w, b, num_examples): #@save
"""Generate y = Xw + b + noise."""
X = tf.zeros((num_examples, w.shape[0]))
X += tf.random.normal(shape=X.shape)
y = tf.matmul(X, tf.reshape(w, (-1, 1))) + b
y += tf.random.normal(shape=y.shape, stddev=0.01)
y = tf.reshape(y, (-1, 1))
return X, y
def sgd(params, grads, lr, batch_size): #@save
"""Minibatch stochastic gradient descent."""
for param, grad in zip(params, grads):
param.assign_sub(lr * grad / batch_size)
def load_data_fashion_mnist(batch_size, resize=None): #@save
"""Download the Fashion-MNIST dataset and then load it into memory."""
mnist_train, mnist_test = tf.keras.datasets.fashion_mnist.load_data()
# Divide all numbers by 255 so that all pixel values are between
# 0 and 1, add a batch dimension at the last. And cast label to int32
process = lambda X, y: (tf.expand_dims(X, axis=3) / 255,
tf.cast(y, dtype='int32'))
resize_fn = lambda X, y: (
tf.image.resize_with_pad(X, resize, resize) if resize else X, y)
return (
tf.data.Dataset.from_tensor_slices(process(*mnist_train)).batch(
batch_size).shuffle(len(mnist_train[0])).map(resize_fn),
tf.data.Dataset.from_tensor_slices(process(*mnist_test)).batch(
batch_size).map(resize_fn))
class TrainCallback(tf.keras.callbacks.Callback): #@save
"""A callback to visiualize the training progress."""
def __init__(self, net, train_iter, test_iter, num_epochs, device_name):
self.timer = d2l.Timer()
self.animator = d2l.Animator(
xlabel='epoch', xlim=[1, num_epochs], legend=[
'train loss', 'train acc', 'test acc'])
self.net = net
self.train_iter = train_iter
self.test_iter = test_iter
self.num_epochs = num_epochs
self.device_name = device_name
def on_epoch_begin(self, epoch, logs=None):
self.timer.start()
def on_epoch_end(self, epoch, logs):
self.timer.stop()
test_acc = self.net.evaluate(
self.test_iter, verbose=0, return_dict=True)['accuracy']
metrics = (logs['loss'], logs['accuracy'], test_acc)
self.animator.add(epoch + 1, metrics)
if epoch == self.num_epochs - 1:
batch_size = next(iter(self.train_iter))[0].shape[0]
num_examples = batch_size * tf.data.experimental.cardinality(
self.train_iter).numpy()
print(f'loss {metrics[0]:.3f}, train acc {metrics[1]:.3f}, '
f'test acc {metrics[2]:.3f}')
print(f'{num_examples / self.timer.avg():.1f} examples/sec on '
f'{str(self.device_name)}')
#@save
def train_ch6(net_fn, train_iter, test_iter, num_epochs, lr, device):
"""Train a model with a GPU (defined in Chapter 6)."""
device_name = device._device_name
strategy = tf.distribute.OneDeviceStrategy(device_name)
with strategy.scope():
optimizer = tf.keras.optimizers.SGD(learning_rate=lr)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
net = net_fn()
net.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
callback = TrainCallback(net, train_iter, test_iter, num_epochs,
device_name)
net.fit(train_iter, epochs=num_epochs, verbose=0, callbacks=[callback])
return net
%%tab mxnet, tensorflow
def evaluate_accuracy(net, data_iter): #@save
"""Compute the accuracy for a model on a dataset."""
metric = Accumulator(2) # No. of correct predictions, no. of predictions
for X, y in data_iter:
metric.add(accuracy(net(X), y), d2l.size(y))
return metric[0] / metric[1]
%%tab all
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5): #@save
"""Plot a list of images."""
figsize = (num_cols * scale, num_rows * scale)
_, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize)
axes = axes.flatten()
for i, (ax, img) in enumerate(zip(axes, imgs)):
        try:
            # Convert framework tensors to NumPy arrays for imshow
            img = d2l.numpy(img)
        except Exception:
            pass
ax.imshow(img)
ax.axes.get_xaxis().set_visible(False)
ax.axes.get_yaxis().set_visible(False)
if titles:
ax.set_title(titles[i])
return axes
%%tab pytorch, mxnet, tensorflow
def linreg(X, w, b): #@save
"""The linear regression model."""
return d2l.matmul(X, w) + b
def squared_loss(y_hat, y): #@save
"""Squared loss."""
return (y_hat - d2l.reshape(y, y_hat.shape)) ** 2 / 2
def get_fashion_mnist_labels(labels): #@save
"""Return text labels for the Fashion-MNIST dataset."""
text_labels = ['t-shirt', 'trouser', 'pullover', 'dress', 'coat',
'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']
return [text_labels[int(i)] for i in labels]
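For instance (a quick sanity check, not part of the saved module):
%%tab pytorch, mxnet, tensorflow
print(get_fashion_mnist_labels([0, 5, 9]))  # ['t-shirt', 'sandal', 'ankle boot']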
%%tab pytorch, mxnet, tensorflow
class Animator: #@save
"""For plotting data in animation."""
def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
ylim=None, xscale='linear', yscale='linear',
fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
figsize=(3.5, 2.5)):
# Incrementally plot multiple lines
if legend is None:
legend = []
d2l.use_svg_display()
self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize)
if nrows * ncols == 1:
self.axes = [self.axes, ]
# Use a lambda function to capture arguments
self.config_axes = lambda: d2l.set_axes(
self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
self.X, self.Y, self.fmts = None, None, fmts
def add(self, x, y):
# Add multiple data points into the figure
if not hasattr(y, "__len__"):
y = [y]
n = len(y)
if not hasattr(x, "__len__"):
x = [x] * n
if not self.X:
self.X = [[] for _ in range(n)]
if not self.Y:
self.Y = [[] for _ in range(n)]
for i, (a, b) in enumerate(zip(x, y)):
if a is not None and b is not None:
self.X[i].append(a)
self.Y[i].append(b)
self.axes[0].cla()
for x, y, fmt in zip(self.X, self.Y, self.fmts):
self.axes[0].plot(x, y, fmt)
self.config_axes()
display.display(self.fig)
display.clear_output(wait=True)
%%tab pytorch, mxnet, tensorflow
class Accumulator: #@save
"""For accumulating sums over `n` variables."""
def __init__(self, n):
self.data = [0.0] * n
def add(self, *args):
self.data = [a + float(b) for a, b in zip(self.data, args)]
def reset(self):
self.data = [0.0] * len(self.data)
def __getitem__(self, idx):
return self.data[idx]
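A minimal usage sketch: accumulate, say, correct predictions and total predictions, then read out their ratio.
%%tab pytorch, mxnet, tensorflow
metric = Accumulator(2)
metric.add(3, 4)  # e.g., 3 correct predictions out of 4
metric.add(1, 2)  # 1 more correct out of 2
print(metric[0] / metric[1])  # 4 / 6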
%%tab pytorch, mxnet, tensorflow
def accuracy(y_hat, y): #@save
"""Compute the number of correct predictions."""
if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
y_hat = d2l.argmax(y_hat, axis=1)
cmp = d2l.astype(y_hat, y.dtype) == y
return float(d2l.reduce_sum(d2l.astype(cmp, y.dtype)))
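As a toy check (the values are illustrative; d2l.tensor is assumed to be the framework-agnostic tensor constructor):
%%tab pytorch, mxnet, tensorflow
y_hat = d2l.tensor([[0.1, 0.9], [0.8, 0.2]])
y = d2l.tensor([1, 0])
print(accuracy(y_hat, y))  # both rows are predicted correctly -> 2.0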
%%tab all
import os
import requests
import zipfile
import tarfile
import hashlib
def download(url, folder='../data', sha1_hash=None): #@save
"""Download a file to folder and return the local filepath."""
if not url.startswith('http'):
        # For backward compatibility
url, sha1_hash = DATA_HUB[url]
os.makedirs(folder, exist_ok=True)
fname = os.path.join(folder, url.split('/')[-1])
# Check if hit cache
if os.path.exists(fname) and sha1_hash:
sha1 = hashlib.sha1()
with open(fname, 'rb') as f:
while True:
data = f.read(1048576)
if not data:
break
sha1.update(data)
if sha1.hexdigest() == sha1_hash:
return fname
# Download
print(f'Downloading {fname} from {url}...')
r = requests.get(url, stream=True, verify=True)
with open(fname, 'wb') as f:
f.write(r.content)
return fname
def extract(filename, folder=None): #@save
"""Extract a zip/tar file into folder."""
base_dir = os.path.dirname(filename)
_, ext = os.path.splitext(filename)
    assert ext in ('.zip', '.tar', '.gz'), 'Only zip/tar files are supported.'
if ext == '.zip':
fp = zipfile.ZipFile(filename, 'r')
else:
fp = tarfile.open(filename, 'r')
if folder is None:
folder = base_dir
fp.extractall(folder)
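A usage sketch, assuming network access; the URL and SHA-1 hash below match the fra-eng entry registered later in this section.
%%tab all
fname = download(d2l.DATA_URL + 'fra-eng.zip',
                 sha1_hash='94646ad1522d915e7b0f9296181140edcf86a4f5')
extract(fname)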
%%tab pytorch, mxnet, tensorflow
def download_extract(name, folder=None): #@save
"""Download and extract a zip/tar file."""
fname = download(name)
base_dir = os.path.dirname(fname)
data_dir, ext = os.path.splitext(fname)
if ext == '.zip':
fp = zipfile.ZipFile(fname, 'r')
elif ext in ('.tar', '.gz'):
fp = tarfile.open(fname, 'r')
else:
assert False, 'Only zip/tar files can be extracted.'
fp.extractall(base_dir)
return os.path.join(base_dir, folder) if folder else data_dir
def tokenize(lines, token='word'): #@save
"""Split text lines into word or character tokens."""
assert token in ('word', 'char'), 'Unknown token type: ' + token
return [line.split() if token == 'word' else list(line) for line in lines]
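For example:
%%tab pytorch, mxnet, tensorflow
print(tokenize(['this is a test', 'so is this'], token='word'))
# [['this', 'is', 'a', 'test'], ['so', 'is', 'this']]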
%%tab pytorch
def evaluate_loss(net, data_iter, loss): #@save
"""Evaluate the loss of a model on the given dataset."""
metric = d2l.Accumulator(2) # Sum of losses, no. of examples
for X, y in data_iter:
out = net(X)
y = d2l.reshape(y, out.shape)
l = loss(out, y)
metric.add(d2l.reduce_sum(l), d2l.size(l))
return metric[0] / metric[1]
%%tab mxnet, tensorflow
def evaluate_loss(net, data_iter, loss): #@save
"""Evaluate the loss of a model on the given dataset."""
metric = d2l.Accumulator(2) # Sum of losses, no. of examples
for X, y in data_iter:
l = loss(net(X), y)
metric.add(d2l.reduce_sum(l), d2l.size(l))
return metric[0] / metric[1]
%%tab pytorch
def grad_clipping(net, theta): #@save
"""Clip the gradient."""
if isinstance(net, nn.Module):
params = [p for p in net.parameters() if p.requires_grad]
else:
params = net.params
norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
if norm > theta:
for param in params:
param.grad[:] *= theta / norm
%%tab tensorflow
def grad_clipping(grads, theta): #@save
"""Clip the gradient."""
theta = tf.constant(theta, dtype=tf.float32)
new_grad = []
for grad in grads:
if isinstance(grad, tf.IndexedSlices):
new_grad.append(tf.convert_to_tensor(grad))
else:
new_grad.append(grad)
norm = tf.math.sqrt(sum((tf.reduce_sum(grad ** 2)).numpy()
for grad in new_grad))
norm = tf.cast(norm, tf.float32)
if tf.greater(norm, theta):
for i, grad in enumerate(new_grad):
new_grad[i] = grad * theta / norm
else:
new_grad = new_grad
return new_grad
More for the attention chapter.
%%tab pytorch, mxnet, tensorflow
#@save
d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
'94646ad1522d915e7b0f9296181140edcf86a4f5')
#@save
def read_data_nmt():
"""Load the English-French dataset."""
data_dir = d2l.download_extract('fra-eng')
with open(os.path.join(data_dir, 'fra.txt'), 'r', encoding='utf-8') as f:
return f.read()
#@save
def preprocess_nmt(text):
"""Preprocess the English-French dataset."""
def no_space(char, prev_char):
return char in set(',.!?') and prev_char != ' '
# Replace non-breaking space with space, and convert uppercase letters to
# lowercase ones
text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
# Insert space between words and punctuation marks
out = [' ' + char if i > 0 and no_space(char, text[i - 1]) else char
for i, char in enumerate(text)]
return ''.join(out)
#@save
def tokenize_nmt(text, num_examples=None):
"""Tokenize the English-French dataset."""
source, target = [], []
for i, line in enumerate(text.split('\n')):
if num_examples and i > num_examples:
break
parts = line.split('\t')
if len(parts) == 2:
source.append(parts[0].split(' '))
target.append(parts[1].split(' '))
return source, target
#@save
def truncate_pad(line, num_steps, padding_token):
"""Truncate or pad sequences."""
if len(line) > num_steps:
return line[:num_steps] # Truncate
return line + [padding_token] * (num_steps - len(line)) # Pad
#@save
def build_array_nmt(lines, vocab, num_steps):
"""Transform text sequences of machine translation into minibatches."""
lines = [vocab[l] for l in lines]
lines = [l + [vocab['<eos>']] for l in lines]
array = d2l.tensor([truncate_pad(
l, num_steps, vocab['<pad>']) for l in lines])
valid_len = d2l.reduce_sum(
d2l.astype(array != vocab['<pad>'], d2l.int32), 1)
return array, valid_len
#@save
def load_data_nmt(batch_size, num_steps, num_examples=600):
"""Return the iterator and the vocabularies of the translation dataset."""
text = preprocess_nmt(read_data_nmt())
source, target = tokenize_nmt(text, num_examples)
src_vocab = d2l.Vocab(source, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
tgt_vocab = d2l.Vocab(target, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
data_iter = d2l.load_array(data_arrays, batch_size)
return data_iter, src_vocab, tgt_vocab
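A quick smoke test (it downloads the dataset on first use; the minibatch shapes follow batch_size and num_steps):
%%tab pytorch, mxnet, tensorflow
data_iter, src_vocab, tgt_vocab = load_data_nmt(batch_size=2, num_steps=8)
for X, X_valid_len, Y, Y_valid_len in data_iter:
    print('X:', X.shape, 'X valid len:', X_valid_len)
    print('Y:', Y.shape, 'Y valid len:', Y_valid_len)
    break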
%%tab mxnet
#@save
class MaskedSoftmaxCELoss(gluon.loss.SoftmaxCELoss):
"""The softmax cross-entropy loss with masks."""
# `pred` shape: (`batch_size`, `num_steps`, `vocab_size`)
# `label` shape: (`batch_size`, `num_steps`)
# `valid_len` shape: (`batch_size`,)
def forward(self, pred, label, valid_len):
# `weights` shape: (`batch_size`, `num_steps`, 1)
weights = np.expand_dims(np.ones_like(label), axis=-1)
weights = npx.sequence_mask(weights, valid_len, True, axis=1)
return super(MaskedSoftmaxCELoss, self).forward(pred, label, weights)
#@save
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
"""Train a model for sequence to sequence."""
net.initialize(init.Xavier(), force_reinit=True, ctx=device)
trainer = gluon.Trainer(net.collect_params(), 'adam',
{'learning_rate': lr})
loss = MaskedSoftmaxCELoss()
animator = d2l.Animator(xlabel='epoch', ylabel='loss',
xlim=[10, num_epochs])
for epoch in range(num_epochs):
timer = d2l.Timer()
metric = d2l.Accumulator(2) # Sum of training loss, no. of tokens
for batch in data_iter:
X, X_valid_len, Y, Y_valid_len = [
x.as_in_ctx(device) for x in batch]
bos = np.array(
[tgt_vocab['<bos>']] * Y.shape[0], ctx=device).reshape(-1, 1)
dec_input = d2l.concat([bos, Y[:, :-1]], 1) # Teacher forcing
with autograd.record():
Y_hat, _ = net(X, dec_input, X_valid_len)
l = loss(Y_hat, Y, Y_valid_len)
l.backward()
d2l.grad_clipping(net, 1)
num_tokens = Y_valid_len.sum()
trainer.step(num_tokens)
metric.add(l.sum(), num_tokens)
if (epoch + 1) % 10 == 0:
animator.add(epoch + 1, (metric[0] / metric[1],))
print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
f'tokens/sec on {str(device)}')
#@save
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
device, save_attention_weights=False):
"""Predict for sequence to sequence."""
src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
src_vocab['<eos>']]
enc_valid_len = np.array([len(src_tokens)], ctx=device)
src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
# Add the batch axis
enc_X = np.expand_dims(np.array(src_tokens, ctx=device), axis=0)
enc_outputs = net.encoder(enc_X, enc_valid_len)
dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
# Add the batch axis
dec_X = np.expand_dims(np.array([tgt_vocab['<bos>']], ctx=device), axis=0)
output_seq, attention_weight_seq = [], []
for _ in range(num_steps):
Y, dec_state = net.decoder(dec_X, dec_state)
# We use the token with the highest prediction likelihood as input
# of the decoder at the next time step
dec_X = Y.argmax(axis=2)
pred = dec_X.squeeze(axis=0).astype('int32').item()
# Save attention weights (to be covered later)
if save_attention_weights:
attention_weight_seq.append(net.decoder.attention_weights)
# Once the end-of-sequence token is predicted, the generation of the
# output sequence is complete
if pred == tgt_vocab['<eos>']:
break
output_seq.append(pred)
return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq
%%tab pytorch
#@save
def sequence_mask(X, valid_len, value=0):
"""Mask irrelevant entries in sequences."""
maxlen = X.size(1)
mask = torch.arange((maxlen), dtype=torch.float32,
device=X.device)[None, :] < valid_len[:, None]
X[~mask] = value
return X
#@save
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
"""The softmax cross-entropy loss with masks."""
# `pred` shape: (`batch_size`, `num_steps`, `vocab_size`)
# `label` shape: (`batch_size`, `num_steps`)
# `valid_len` shape: (`batch_size`,)
def forward(self, pred, label, valid_len):
weights = torch.ones_like(label)
weights = sequence_mask(weights, valid_len)
self.reduction='none'
unweighted_loss = super(MaskedSoftmaxCELoss, self).forward(
pred.permute(0, 2, 1), label)
weighted_loss = (unweighted_loss * weights).mean(dim=1)
return weighted_loss
#@save
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
"""Train a model for sequence to sequence."""
def xavier_init_weights(m):
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight)
if type(m) == nn.GRU:
for param in m._flat_weights_names:
if "weight" in param:
nn.init.xavier_uniform_(m._parameters[param])
net.apply(xavier_init_weights)
net.to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
loss = MaskedSoftmaxCELoss()
net.train()
animator = d2l.Animator(xlabel='epoch', ylabel='loss',
xlim=[10, num_epochs])
for epoch in range(num_epochs):
timer = d2l.Timer()
metric = d2l.Accumulator(2) # Sum of training loss, no. of tokens
for batch in data_iter:
optimizer.zero_grad()
X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
device=device).reshape(-1, 1)
dec_input = d2l.concat([bos, Y[:, :-1]], 1) # Teacher forcing
Y_hat, _ = net(X, dec_input, X_valid_len)
l = loss(Y_hat, Y, Y_valid_len)
l.sum().backward() # Make the loss scalar for `backward`
d2l.grad_clipping(net, 1)
num_tokens = Y_valid_len.sum()
optimizer.step()
with torch.no_grad():
metric.add(l.sum(), num_tokens)
if (epoch + 1) % 10 == 0:
animator.add(epoch + 1, (metric[0] / metric[1],))
print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
f'tokens/sec on {str(device)}')
#@save
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
device, save_attention_weights=False):
"""Predict for sequence to sequence."""
# Set `net` to eval mode for inference
net.eval()
src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
src_vocab['<eos>']]
enc_valid_len = torch.tensor([len(src_tokens)], device=device)
src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
# Add the batch axis
enc_X = torch.unsqueeze(
torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
enc_outputs = net.encoder(enc_X, enc_valid_len)
dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
# Add the batch axis
dec_X = torch.unsqueeze(torch.tensor(
[tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)
output_seq, attention_weight_seq = [], []
for _ in range(num_steps):
Y, dec_state = net.decoder(dec_X, dec_state)
# We use the token with the highest prediction likelihood as input
# of the decoder at the next time step
dec_X = Y.argmax(dim=2)
pred = dec_X.squeeze(dim=0).type(torch.int32).item()
# Save attention weights (to be covered later)
if save_attention_weights:
attention_weight_seq.append(net.decoder.attention_weights)
# Once the end-of-sequence token is predicted, the generation of the
# output sequence is complete
if pred == tgt_vocab['<eos>']:
break
output_seq.append(pred)
return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq
%%tab tensorflow
#@save
def sequence_mask(X, valid_len, value=0):
"""Mask irrelevant entries in sequences."""
maxlen = X.shape[1]
mask = tf.range(start=0, limit=maxlen, dtype=tf.float32)[
None, :] < tf.cast(valid_len[:, None], dtype=tf.float32)
if len(X.shape) == 3:
return tf.where(tf.expand_dims(mask, axis=-1), X, value)
else:
return tf.where(mask, X, value)
#@save
class MaskedSoftmaxCELoss(tf.keras.losses.Loss):
"""The softmax cross-entropy loss with masks."""
def __init__(self, valid_len):
super().__init__(reduction='none')
self.valid_len = valid_len
# `pred` shape: (`batch_size`, `num_steps`, `vocab_size`)
# `label` shape: (`batch_size`, `num_steps`)
# `valid_len` shape: (`batch_size`,)
def call(self, label, pred):
weights = tf.ones_like(label, dtype=tf.float32)
weights = sequence_mask(weights, self.valid_len)
label_one_hot = tf.one_hot(label, depth=pred.shape[-1])
unweighted_loss = tf.keras.losses.CategoricalCrossentropy(
from_logits=True, reduction='none')(label_one_hot, pred)
weighted_loss = tf.reduce_mean((unweighted_loss*weights), axis=1)
return weighted_loss
#@save
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
"""Train a model for sequence to sequence."""
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
animator = d2l.Animator(xlabel="epoch", ylabel="loss",
xlim=[10, num_epochs])
for epoch in range(num_epochs):
timer = d2l.Timer()
metric = d2l.Accumulator(2) # Sum of training loss, no. of tokens
for batch in data_iter:
X, X_valid_len, Y, Y_valid_len = [x for x in batch]
bos = tf.reshape(tf.constant([tgt_vocab['<bos>']] * Y.shape[0]),
shape=(-1, 1))
dec_input = tf.concat([bos, Y[:, :-1]], 1) # Teacher forcing
with tf.GradientTape() as tape:
Y_hat, _ = net(X, dec_input, X_valid_len, training=True)
l = MaskedSoftmaxCELoss(Y_valid_len)(Y, Y_hat)
gradients = tape.gradient(l, net.trainable_variables)
gradients = d2l.grad_clipping(gradients, 1)
optimizer.apply_gradients(zip(gradients, net.trainable_variables))
num_tokens = tf.reduce_sum(Y_valid_len).numpy()
metric.add(tf.reduce_sum(l), num_tokens)
if (epoch + 1) % 10 == 0:
animator.add(epoch + 1, (metric[0] / metric[1],))
print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
f'tokens/sec on {str(device._device_name)}')
#@save
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
save_attention_weights=False):
"""Predict for sequence to sequence."""
src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
src_vocab['<eos>']]
enc_valid_len = tf.constant([len(src_tokens)])
src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
# Add the batch axis
enc_X = tf.expand_dims(src_tokens, axis=0)
enc_outputs = net.encoder(enc_X, enc_valid_len, training=False)
dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
# Add the batch axis
dec_X = tf.expand_dims(tf.constant([tgt_vocab['<bos>']]), axis=0)
output_seq, attention_weight_seq = [], []
for _ in range(num_steps):
Y, dec_state = net.decoder(dec_X, dec_state, training=False)
# We use the token with the highest prediction likelihood as input
# of the decoder at the next time step
dec_X = tf.argmax(Y, axis=2)
pred = tf.squeeze(dec_X, axis=0)
# Save attention weights
if save_attention_weights:
attention_weight_seq.append(net.decoder.attention_weights)
# Once the end-of-sequence token is predicted, the generation of the
# output sequence is complete
if pred == tgt_vocab['<eos>']:
break
output_seq.append(pred.numpy())
    return ' '.join(tgt_vocab.to_tokens(
        tf.reshape(output_seq, shape=-1).numpy().tolist())), attention_weight_seq