
В этой статье пойдет речь об одной из самых сложных и интересных архитектур — трансформере, лежащей в основе современных моделей от OpenAI и Google DeepMind. И это не научпоп для обывателя с наивным уровнем объяснения, а полноценный учебный материал, который поможет вам понять работу трансформера на фундаментальном уровне без черных ящиков типа TensorFlow и Pytorch.
А для того чтобы лучше вникнуть, давайте напишем настоящий мини-трансформер на процедурном Python и обучим его!
Коротко об автореЯ энтузиаст-исследователь, который интересуется узкоспециализированными вещами, чтобы рассказать о них понятным языком другим людям.
Почему я вообще говорю о трансформере?
Мною был написан полноценный мини-фреймворк, который представляет из себя энкодер-декодерный трансформер со своей реализацией объекта Тензор, топологическим автоградом и динамическим графом вычислений. А также множеством слоев, функций и гибкими настройками параметров.
Он является моей исследовательской платформой для практической проверки перспективных и интересных технологий, например Латентного внимания. И именно на его основе, я создал этот упрощенный учебный код.
В будущем я планирую выложить его в открытый доступ и написать большую статью-инструкцию о его устройстве, эволюции и проблемах, решенных на этом пути.
Но пока это лишь планы.
Данный материал можно изучать в разных режимах:
Как объяснение архитектуры для общего представления;
Как полноценный гайд с чтением кода и самостоятельной практикой;
Как основу для собственных экспериментов.
Вы сами можете выбрать тот режим, который нужен для ваших целей на данный момент. Статья написана циклично, от поверхностного разбора до конкретной реализации. Однако сразу оговорюсь, что для максимальной пользы, читателю нужен базовый минимум:
Иметь представление о работе нейросетей;
Линейные слои; активация; веса; ошибка и подобное.
Желателен опыт с языковыми моделями;
Векторное представление текста; Сверточные или рекуррентные сети.
Понимать работу СГС метода;
Градиент; Граф вычислений; Прямой и обратный проход.
Знать линейную алгебру на начальном уровне;
Линейный оператор; Матричное представление нейросетей.
Немного программировать.
Python: функции, условия и циклы; Numpy: работа с массивами; Среда: PyCharm, Colab или что-то еще.
Желание разобраться в сложной теме.
Действительно ли вам это нужно?
То есть если вы немного знаете линейку, немного программируйте и знакомы с нейросетями на уровне bootcamp-адепта, то этого уже должно хватить для погружения на уровне реализации. А даже если и нет, то вы все равно можете прочитать статью для общего ознакомления.
В отличии от других архитектур, траснформеры проблематичны именно в понимании. Для их изучения просто необходимы аналогии и примеры. Что-то может показаться очевидным, а что-то непонятным. Поэтому важно просто двигаться дальше, к более подробному рассмотрению всех моментов, и понимание придет само.
Для тех, кто все же хочет освоить код, скажу следующее:
Наш трансформер будет довольно простым: со статическим графом и одноблочными энкодером и декодером. Сам код написан в парадигме процедурного программирования (за исключением некоторых модулей) и может быть прочитан на любом уровне и без знания ООП. И все же это будет полноценный обучаемый трансформер с мультиголовым вниманием, батчами данных, параллельным вычислением и множеством параметров.
Код будет содержаться в спойлерах в конце глав. Для этого откройте редактор кода или ноутбук. Разверните раздел "практика", скопируйте его и запустите в своей среде. Шаг за шагом, глава за главой, модель будет прирастать кодом, пока мы не запустим ее в отдельном разделе "Запуск модели".
Для закрепления материала, выполните домашнее задание, которое ждет вас в конце статьи.
ПрактикаНа текущем этапе мы просто создадим универсальный импорт, который будет работать независимо от того, имеется ли у вас (или на удаленной машине) GPU.
try:
!nvidia-smi
!pip install cupy-cuda12x
import cupy as np
if not np.cuda.runtime.getDeviceCount():
raise ImportError
print("Using GPU")
except:
import numpy as np
print("Using CPU")
import random
from typing import Union, OptionalДля вычислений на GPU вам понадобится библиотека CuPy, которая является полным аналогом NumPy и почти идентична ему в плане интерфейса.
И, разумеется, нужно установить CUDA Toolkit аналогичной версии, если его нет.
# Пример установки библиотеки для нужной версии CUDA:
pip install cupy-cuda12xОбратите внимание!
В тексте я буду использовать скобки для представления размера тензоров. Например [B, H, T, D]. Они нужны для понимания метаморфоз, происходящих с тензорами во время многочисленных умножений, транспонирований, решейпов и подобного.
Нужно запомнить следующие условные обозначения:
B (Batch) — размер батча;
H (Head) — количество голов внимания;
T (Token) — количество токенов в последовательности;
D (D_model) — ширина модели (длина эмбеддингов токенов);
P (Part) — размер партии для каждой головы внимания;
V (Vocab) — размер вокабуляра модели;
MHA (Multi-head attention) — мультиголовое внимание;
MHCA (Multi-head cross-attention) — перекрестное внимание;
FF (Feedforward) — прямой проход;
Вокабуляр — это словарный запас модели, где текстовые единицы представлены в сыром виде под номерами. Его ширина V — это крайне важная характеристика, от которой зависят размеры некоторых обучаемых параметров.
Токены — это представления текстовых единиц в виде номера. Они используются в датасетах для укрупнения информации. Посредством токена можно индексировать вокабуляр.
Эмбеддинги — это векторные представления текстовых единиц, которые непосредственно работают в модели.
Тензоры — это пакеты данных с возможностью параллельной обработки. Их размерности и конкретные размеры, а так же порядок осей напрямую влияют на их обработку моделью: операции, оси суммирования, интерпретация. В данном коде они будут представлены numpy-массивами.
Курсивом будут обозначены примечания к тексту. Они должны ответить на возможные вопросы заранее.
Ну и в конце, без ложной скромности, скажу, что написать подобную статью — это тяжелый труд, который никто не оплатит. Я старался создать исчерпывающий материал, который мог бы использоваться как самостоятельное учебное пособие. У меня ушло 4 месяца на изучение темы и написание собственного фреймворка и еще несколько недель на написание этой статьи. Буду рад вашей поддержке!
Ниже изображена всем известная схема трансформера. Давайте глобально пройдемся по ней для общего представления. На этом этапе мы не будем углубляться в детали: он нужен лишь как оглавление к статье.

Коротко разберем алгоритм работы:
Все начинается с того, как данные (сообщение пользователя, запрос поисковика, обучающий батч) попадают в начало энкодера (которого может и не быть) для извлечения контекста запроса.
Сообщение имеет форму [B T], где B - это размер батча, а T - количество токенов в сообщениях.
Они все еще представлены в сыром виде и прежде чем идти дальше, слой Get_embedding должен подменить номера позиций токенов в вокабуляре их векторными представлениями, которые обучены заранее или будут обучены (дообучены) вместе с трансформером.
Данные, обзаведенные эмбеддингами, теперь имеют форму [B T D], где D - это длина эмбеддингов, например 512.
После мы должны зашифровать информацию о позициях токенов в сообщении, так как трансформеры обрабатывают их в параллельном режиме. Подробнее об этом мы поговорим в разделе "Позиционное кодирование".

В энкодере нас ждут слои мультивнимания и прямого прохода со слоем активации, которые представляют из себя единый блок. Блоки могут повторяться. Чем больше блоков, тем больше параметров у трансформера, тем больше признаков он может запомнить и тем более сложные паттерны извлечь.
Обратите внимание, что в блоках имеются остаточные связи (Residual), которые минуют слои и складываются с их выходами в конце. Этот момент мы разберем в отдельной главе "Стабилизация".
При этом форма данных будет меняться: разбиваться на части для анализа мультиголового внимания [B H T P] и снова собираться в [B T D] перед подачей в Feedforward.
Обработав данные, энкодер возвращает некое абстрактное представление информации из сообщения, которую он смог извлечь, называемое контекстом. Его принято обозначать буквой Z. Эта итоговая "выжимка" из полученного запроса будет использоваться для последующей генерации ответа.
Она все так же имеет стандартную форму [B T D] и будет нужна в декодере.

Декодер не взаимодействует напрямую с данными энкодера. Вместо этого он начинает генерировать свое собственное сообщение по одному токену, начиная с технического Start.
Для этого создается стартовое сообщение с токеном Start для каждого набора в батче.
Форма на этом этапе — [B 1].
Как и в энкодере, генерируемый батч должен получить эмбеддинги и пройти позиционное кодирование. Этот этап происходит при каждой итерации.
После этого батч проходит связку:
Маскированного мультивнимания, которое ко всему прочему запрещает токенам видеть те, что стоят пред ними в последовательностях;
Перекрёстного внимания, которое связывает информацию из генерируемого сообщения и контекста, пришедшего из энкодера (Новое сообщение как бы строится на основе запроса пользователя.);
Прямого прохода, который завершает блок, как и в энкодере.
После прохождения блока(ов) декодера, данные поступают в выходной слой, который проецирует их в пространство с размерностью равной длине словаря [B T V], где V - размер вокабуляра. Таким образом мы получаем логиты нужной формы.
Полученные логиты проходят через Softmax (он еще не раз нам встретится), чтобы их можно было интерпретировать как вероятности. Позиция с самым высоким значением соответствует наиболее вероятному кандидату на следующий токен.
Мы присоединяем номер токена к генерируемому сообщению [B T+1] и снова отправляем его на вход декодера, где процесс повторяется: получение эмбеддингов, кодирование и блоки. Это продолжается пока цикл генерации не создаст максимальное количество токенов или, что наиболее вероятно, все сообщения в батче не получат токен остановки Stop.
Когда это произойдет, генерация оборвется раньше времени. До тех пор, последовательности, которые получили Stop раньше других, будут заполнятся паддингами — это еще один вид технических токенов, нужных для работы сети.
Все это короткое описание работы трансформера, которое может быть слишком нагроможденным и непонятным. Это нормально. Далее мы разберем его составные части и их работу очень подробно.
Должно быть вы уже слышали, что основная идея трансформеров заключается в самовнимании токенов? Начнем с него.
ПрактикаДавайте реализуем прямой проход модели.
Это верхнеуровневая функция последовательного вызова слоев. Она словно скелет модели проводит входные данные через энкодер и декодер, автоматически определяет режим работы (генерация или обучение) и кэширует промежуточные состояния тензоров для обратного прохода.
def call(X: np.ndarray, Y: np.ndarray = None) -> np.ndarray:
"""Проход по блокам"""
# Энкодер:
training = True if Y is not None else False
X = np.take(vocab, X, axis=0) # Получение эмбеддингов (B, T, D)
Xr = positional_encoding(X) # Позиционный энкодинг
X, cache_Norm1 = normalize(Xr)
X, cache_MHAe = MHA(X, index=0)
Xr = Xr + X
X, cache_Norm2 = normalize(Xr)
X, cache_FFe = FF(X, index=0)
Z = Xr + X
# Декодер:
if training: # Режим обучения
X = add_start_token(Y) # Присоединение стартового набора (B, 1+T)
X = np.take(
vocab,
X,
axis=0
).astype(np.float32) # Получение эмбеддингов (B, 1+T, D)
Xr = positional_encoding(X) # Позиционное кодирование
X, cache_Norm3 = normalize(Xr)
X, cache_MHAd = MHA(X, index=1, is_decoder=True)
Xr = Xr + X
X, cache_Norm4 = normalize(Xr)
X, cache_MHCA = MHCA(X, Z)
Xr = Xr + X
X, cache_Norm5 = normalize(Xr)
X, cache_FFd = FF(X, index=1)
Xr = Xr + X
X, cache_Norm6 = normalize(Xr)
X, cache_out = output(X)
cache = {
"Norm1": cache_Norm1, "MHA-E": cache_MHAe,
"Norm2": cache_Norm2, "FF-E": cache_FFe,
"Norm3": cache_Norm3, "MHA-D": cache_MHAd,
"Norm4": cache_Norm4, "MHCA": cache_MHCA,
"Norm5": cache_Norm5, "FF-D": cache_FFd,
"Norm6": cache_Norm6, "Out": cache_out
}
return X, cache
else: # Режим генерации
seq = np.tile(
Start, (Z.shape[0], 1)
).astype(np.int32) # Стартовый набор (B, 1)
for _ in range(max_tokens): # Цикл токенов
X = np.take(vocab, seq, axis=0) # Получение эмбеддингов (B, 1, D)
Xr = positional_encoding(X) # Позиционное кодирование
X, _ = normalize(Xr)
X, _ = MHA(X, index=1, is_decoder=True)
Xr = Xr + X
X, _ = normalize(Xr)
X, _ = MHCA(X, Z)
Xr = Xr + X
X, _ = normalize(Xr)
X, _ = FF(X, index=1)
X = Xr + X
X, _ = output(X)
X = softmax(X) # Вероятности
indexes = np.argmax(
X[:, -1, :], axis=-1
)[..., None] # Получение индексов новых токенов
seq = np.concatenate(
(seq, indexes)
, axis=1
) # Добавление токенов
return seqКод реализует изображенную выше схему.
Кроме того, нам понадобятся вспомогательные функции, использующиеся в слоях и не только:
def softmax(X: np.ndarray) -> np.ndarray:
"""Softmax по последней оси"""
shifted = X - np.max(X, axis=-1, keepdims=True)
exp = np.exp(shifted)
return exp / np.sum(exp, axis=-1, keepdims=True)
def leakyrelu(X: np.ndarray, rate: float = .1) -> np.ndarray:
"""Leaky ReLU"""
return np.where(X > 0, X, rate * X).astype(np.float32)
def add_start_token(data: np.ndarray) -> np.ndarray:
"""Присоединение стартового токена в начало последовательности"""
size = (data.shape[0], 1)
start = np.tile(Start, size).astype(np.int32) # Стартовый токен (B, 1)
return np.concatenate((start, data), axis=1) # Стартовый набор (B, 1+T)
def add_finish_token(data: np.ndarray) -> np.ndarray:
"""Присоединение финишного токена в конец последовательности"""
size = (data.shape[0], 1)
finish = np.tile(Finish, size).astype(np.int32) # Финишный токен (B, 1)
return np.concatenate((data, finish), axis=1) # Финишный набор (B, T+1)
def predict(data: np.ndarray, dtype=np.int32) -> np.ndarray:
"""Выдает предсказание для данных"""
predictions = []
for batch in data:
predictions.append(call(batch))
return np.array(predictions, dtype=dtype)Их изучение не принципиально.
В обычных языковых моделях, мы представляли данные в виде n-грамм, присваивали им эмбеддинги - векторные представления текста, и отправляли их, например, в сверточную сеть для извлечения признаков.

Тогда данные начинали сжиматься в пространственном измерении и расти в канальном, пока на выходе не получалась "выжимка" из признаков, которую можно было развернуть в новую последовательность или вытянуть/усреднить/максимизировать и направить в полносвязный слой.
Или в рекуррентную сеть...

Тогда данные по одному токену (эмбеддингу токена) отправлялись в рекуррентные слои, которые подавали свой выход обратно на вход (рекуррентно) вместе со следующим токеном.
На самом деле все сложнее. Поступающая в нейрон/слой информация обновляет скрытое состояния с отдельными обучаемыми параметрами и только потом к ней применяются основные параметры. Таким образом каждый поступающий эмбеддинг будет сталкиваться с информацией (памятью) предыдущих и работать в локальном контексте.
Далее выход(ы) сети можно подать в классификатор или декодер, который на его основе начнет генерацию последовательности.
В трансформерах все иначе!
Идея заключается в том, чтобы создать матрицу отношений между языковыми единицами и прояснить сложные взаимосвязи между ними. Ее можно представить в виде таблицы, где у каждой такой единицы будет целая строка со значениями, соответствующими другим токенам, размера TxT.
Предположим, у нас есть некий текст, который мы подаем в сеть: "Я иду в магазин.". Тогда на выходе получится подобная матрица внимания:
T1 Я | T2 Иду | T3 В | T4 Магазин | |
Я | .5 | .4 | .05 | .25 |
Иду | .2 | .5 | .05 | .25 |
В | 0 | .15 | .7 | .15 |
Магазин | .2 | .2 | .1 | .5 |
Здесь каждый токен имеет представление о любом другом, выраженное в конкретных цифрах. Это и называется самовниманием токенов.
В нашем синтетическом примере, подлежащие и сказуемое имеют сильную связь, когда как предлог несет меньше смысловой нагрузки и слабо связан с другими токенами.
Но как реализовать подобное на практике?
На ум приходит простой алгоритм, дающий нужную форму. Если умножить X на его транспонированный вариант, то как раз получится тензор [B T T], то есть таблица отношений для каждого набора в батче.
A [B T T] = X [B T D] * Xt [B D T]
Но получение нужной формы не означает получение нужной сути. Да и как мы будем учить нашу модель, если у нее нет обучаемых параметров? Вот почему нам нужны не сырые данные, а их проекции.
Ниже представлена полная схема работы внимания:

Не так уж и много действий, не правда ли?

Вместо единого представления данных и последовательного взаимодействия с обучаемыми матрицами весов, мы создаем целых три проекции, которые будут взаимодействовать друг с другом. Делается это путем умножения на три обучаемые матрицы:
Матрица запросов Q;
Матрица ключей K;
Матрица значений V.
На самом деле, подобные названия — это лишь абстракция, которая облегчает понимание происходящего.
Теперь, входные данные предстают в трех ипостасях:
Xq - запросы токенов. Этакое "Что я ищу?".
Xk - ключи. Представления являющиеся ключами к нашему "замку" — запросу.
Xv - значения. Внутреннее содержание токенов.

И только после этого нужно перемножить две такие проекции, чтобы получить матрицу нужной формы. Каждый токен как бы "опрашивает" другой, насколько тот для него семантически важен. Таким образом и создается матрица внимания, где любой токен имеет представление о другом в конкретной последовательности. Нужно лишь пропустить ее через Softmax для нормирования значений и представления их как вероятностей.
Сырые значения становятся вероятностными распределениями, которые в сумме равны 1.
Так что в итоге это дает и как привести данные обратно к стандартной форме?

После создания матрицы внимания, мы отвечаем на самый главный вопрос о важности токенов друг для друга. Это значит, что если мы умножим такую матрицу на проекцию значений, мы получим итоговое (взвешенное) представление данных о самих себе: что в них важно, а что второстепенно по смыслу; какие части являются ключевыми, а какие можно отбросить; как токены связаны друг с другом. Все это инкорпорировано в итоговый Z тензор.
Кроме того, в техническом смысле, мы возвращаем данные к стандартной форме, которая необходима для работы в других слоях:
Z [B T D] = A [B T T] * Xv [B T D]
Обратите внимание, что перед нормированием и взвешиванием, мы упустили еще одно важно действие — масштабирование. Прежде чем подать логиты в Softmax, нужно разделить их на корень длины эмбеддинга/партии. Этот момент мы подробно разберем в главе "Стабилизация".
На этом моменте, суть алгоритма классического самовнимания раскрыта полностью. Но это еще не все, что нужно знать для полноценной реализации слоя внимания трансформеров.
ПрактикаДавайте создадим наши обучаемые матрицы (тензоры).
Для этого зададим глобальные параметры, которые будут иметь решающее значение для работы модели.
# Глобальные параметры:
batch = 64
std = .1
heads = 4
d_model = 32
part_size = int(d_model / heads)
ff_multiply = 4
vocab_size = 12 # Включая токены Start и Finish
max_tokens = 16Дефолтный размер входных данных у нас будет не очень большой [64, 16] — 64 примера по 16 токенов. Длинна эмбеддингов = 32. Этого хватит для обучения последовательностей. В будущем параметры можно изменить, но эти значения станут отправной точкой (потому что они работают).
# Параметры внимания (для энкодера и декодера):
size1 = (2, heads, part_size, part_size)
size2 = (2, d_model, d_model)
MWQ = np.random.normal(size=size1, scale=std).astype(np.float32)
MWK = np.random.normal(size=size1, scale=std).astype(np.float32)
MWV = np.random.normal(size=size1, scale=std).astype(np.float32)
MWO = np.random.normal(size=size2, scale=std).astype(np.float32)
# Параметры перекрестного внимания:
size1 = (heads, part_size, part_size)
size2 = (d_model, d_model)
CWQ = np.random.normal(size=size1, scale=std).astype(np.float32)
CWK = np.random.normal(size=size1, scale=std).astype(np.float32)
CWV = np.random.normal(size=size1, scale=std).astype(np.float32)
CWO = np.random.normal(size=size2, scale=std).astype(np.float32)
# Параметры прямого прохода:
size1 = (2, d_model, d_model * ff_multiply)
size2 = (2, d_model * ff_multiply, d_model)
size3 = (2, d_model * ff_multiply)
size4 = (2, d_model)
FW1 = np.random.normal(size=size1, scale=std).astype(np.float32)
FW2 = np.random.normal(size=size2, scale=std).astype(np.float32)
FB1 = np.zeros(size3).astype(np.float32)
FB2 = np.zeros(size4).astype(np.float32)
# Параметры выходного слоя:
size1 = (d_model, vocab_size)
size2 = (vocab_size)
WO = np.random.normal(size=size1, scale=std).astype(np.float32)
BO = np.zeros(size2).astype(np.float32)
# Обучаемые параметры (для оптимизатора):
parameters = [
MWQ[0], MWK[0], MWV[0], MWO[0],
FW1[0], FB1[0], FW2[0], FB2[0],
MWQ[1], MWK[1], MWV[1], MWO[1],
CWQ, CWK, CWV, CWO,
FW1[1], FB1[1], FW2[1], FB2[1],
WO, BO,
]Стоит сказать, что во "взрослых" фреймворках используются продвинутые методы инициализации начальных параметров, которые учитывают размеры матриц и распределение. Это нужно для предотвращения быстрого роста значений при многократном умножении больших матриц и, как следствие, переполнение выделенной памяти для используемых типов. А использовать принято Float32, так как он значительно легче, менее затратен при вычислениях и его точности достаточно для машинного обучения.
Но все это — большая тема для отдельной статьи, которая лишь усложнит построение нашего игрушечного трансформера. Поэтому, в качестве выхода из положения, мы просто зададим малое значение разброса данных в пределах сотых значений.
Также сгенерируем фиксированные эмбеддинги для токенов:
# Работа с вокабуляром:
def build_fixed_embeddings(vocab_size: int, random_state: int = 13) -> np.ndarray:
"""Ортогональные эмбеддинги"""
rng = np.random.default_rng(random_state)
mat = rng.standard_normal(size=(d_model, d_model))
Q, _ = np.linalg.qr(mat)
emb = Q[:vocab_size]
return emb.astype(np.float32)
vocab = build_fixed_embeddings(vocab_size)
Start = 10
Finish = 11Несмотря на то, что они не являются обучаемыми, ортогонализация делает их крайне легкими в заучивании моделью, которая хорошо различает подобные особенности.
Обратите на параметр vocab_size. Его вы также можете регулировать, чтобы создать вокабуляр иного размера для своих исследований!
Мультиголовое внимание представляет из себя слой внимания с разбиением входных данных на части по последнему измерению D. Это делается для снижения количества параметров и возможности "раздельного внимания". Таким образом, каждая голова получает возможность запомнить свои уникальные признаки, что в итоге улучшает качество.
Представьте, что в сеть вошел крупный батч, состоящий из множества примеров, в каждом из которых огромное количество токенов. Каждый токен представлен в виде эмбеддинга = 512 значениям. В итоге, данные будут представлять из себя тензор размера [B, T, 512].
Для того чтобы сделать проекцию QKV, нам потребуются матрицы размера 512х512, имеющие приблизительно 2e5 значений. Очевидно, что это очень много и... не нужно. Учитывая, что у нас имеется множество слоев и блоков, в реальности нам хватит гораздо меньшего количества параметров. Так почему бы нам просто не разделить каждый эмбеддинг на партии и не подать их в отдельные головы?
X [B H D] -> X [B H T P], где P — это размер партии (участка эмбеддинга).
Таким образом, данные будут иметь иную размерность, например [Batch Head Token 32] при 16 головах. Соответственно у нас уже будет 16 обучаемых матриц 32х32. При несложных подсчетах мы получим 16 384 параметров, что на порядок меньше, чем раньше!
Рассмотрим модернизированный вариант внимания:

Обратите внимание, что суммирование делается уже по оси партии P. Поэтому нам обязательно нужно поменять ось токенов T и голов H местами, чтобы получить матрицы TxP для каждой головы, по которым мы будем вычислять внимание.

Далее выполняется стандартный алгоритм, но только теперь матриц внимания, каждая из которых была вычислена только на части данных (эмбеддингов), будет множество, и все они будут иметь полный размер по токенам TхT. Это значительно увеличивает гибкость такого механизма, так как интерпретаций отношения между языковыми единицами будет множество. Каждая из них выделит свои уникальные признаки и зависимости, и все это при уменьшенном количестве параметров!
Далее, после взвешивания (умножения на Xv), мы могли бы провести обратное преобразование нашего Z тензора и вернуть его к стандартной форме:
Z [B H T P] -> transpose -> Z [B T H P] -> reshape -> Z [B T D]
Однако, при таком подходе возникает новая проблема: разобщенность признаков. Каждая голова, словно DepthWise свертка в CNN, запоминает только свои признаки и не обменивается информацией с остальными. Мы не можем просто соединить показания каждой головы из конца в конец, не создав семантическую путаницу. Это тоже самое, что сгенерировать картинку путем решейпа последовательности: информация о локальных связях будет отсутствовать, и высокого качества мы никогда не получим.
Тут на помощь приходит еще один обучаемый параметр WO — общая/обобщающая матрица. Умножение на нее позволит объединить все признаки заново собранного тензора Z, извлеченные отдельными головами слоя.

Механизм мультивнимания — это тот случай, когда оптимизация и усиление не противоречат друг другу. Именно поэтому его разбор не является излишеством. А еще важно реализовать именно эту пусть и усложненную версию механизма внимания, дающую столько преимуществ, что мы и сделаем дальше.
ПрактикаДавайте реализуем слой Multi-Head Attention в виде обычной функции Python!
def MHA(X: np.ndarray, is_decoder: bool = False, index: int = 0, neg_inf: float = -1e9) -> tuple[np.ndarray, dict]:
"""Многоголовое внимание"""
T = X.shape[1]
# Разделение входной матрицы для мультивнимания:
X = X.reshape(X.shape[0], T, heads, part_size) # (B T H P)
# Матрицы проекций:
Q = np.einsum('bthd, hdp -> bhtp', X, MWQ[index]) # (B T H P)(H P P) (B H T P)
K = np.einsum('bthd, hdp -> bhtp', X, MWK[index]) # (B T H P)(H P P) (B H T P)
V = np.einsum('bthd, hdp -> bhtp', X, MWV[index]) # (B T H P)(H P P) (B H T P)
# Матрица внимания:
A = np.einsum('bhtp, bhfp -> bhtf', Q, K) # (B H T P)(B H T P) (B H T T)
A = A / np.sqrt(part_size, dtype=np.float32) # Масштабирование
# Маскирование внимания:
if is_decoder:
size = (X.shape[0], heads, T, T)
mask = np.triu(
np.full(size, neg_inf),
k=1
).astype(np.float32)
A = A + mask
# Матрица взвешенны значений:
A_softmax = softmax(A)
Z = np.einsum('bhtf, bhfp -> bhtp', A_softmax, V) # (B H T T)(B H T P) (B H T P)
# Объединение голов:
shape = (X.shape[0], T, d_model)
Zo = Z.transpose(0, 2, 1, 3) # (B H T P) (B T H P)
Zo = Zo.reshape(shape) # (B T H P) (B T D)
Zo = np.einsum('btd, df -> btf', Zo, MWO[index]) # (B T D)(D D) (B T D)
cache = {
"X": X, "Q": Q, "K": K, "V": V,
"A": A, "attn": A_softmax, "Z": Z,
"index": index, "mask": mask if is_decoder else None,
}
return Zo, cacheАлгоритм следующий:
Разделение входных данных на головы [B, T, D] -> [B, H, T, P];
В самом коде явная перестановка осей отсутствует, так как Einsum позволяет избежать ее.
Создание проекций запросов, ключей и значений;
Создание матрицы внимания: Xq * Xk;
Не забываем масштабировать: / sqrt(P);
Здесь используем размер партии, так как данные все еще поделены на головы.
Перевод логитов в вероятность с помощью Softmax;
Получение матрицы взвешенных значений A * Xv;
Соединение данных transpose-reshape;
Меняем местами оси H и T и делаем решейп, избавляясь от измерения голов.
Обобщаем выходы голов: Z * WO.
Обратите внимание на формы тензоров в комментариях к коду. Они помогут понять метаморфозы, которые влечет каждая операция.
*Еще тут есть возможность маскировки внимания, но о ней будет расказано в другом разделе статьи — "Параллельное обучение".
И все же есть нечто, что MHA не под силу — это самостоятельно определить позиции токенов в последовательности. Если в языковых моделях прошлых типов, информация о позиции сохранялась сама по себе, то в трансформере данные обрабатываются параллельно.
Из курса линейной алгебры, мы знаем, что матрицы — это объекты, в которых количество строк и столбцов отвечает за количество векторов, записанных внутри, и измерений пространства. Дополнительные измерения многомерного тензора с формой [B T D] или [B H T P] говорят нам о порядке хранения пакетов данных и их обработке. Но само взаимодействие с линейными операторами происходит лишь по последней оси D — векторных представлений конкретных токенов.

Это означает, что MHA не "видит" то, в каком порядке они расположены друг от друга. Можно сравнить это с чтением учебника по одному слову с каждой страницы за раз. Информация есть, но смысла нет, так как наше внимание не может воспринимать ее таким образом. А значит и качество обучения будет низким. К счастью, мы можем внедрить информацию о позиции в сам токен!
Позиционное кодирование — это группа методов, с помощью которых мы сообщаем слоям внимания о положении токенов в последовательностях. Существует множество вариантов кодирования, среди которых:
Синусоидальное кодирование (Sinusoidal position encoding).
Наложение частот на каждый эмбеддинг в зависимости от положения токена. Самый ранний метод, который был в оригинальной статье.
Относительное кодирование (Relative position encoding).
Таблица фиксированных сдвигов, применяемая к матрице внимания токенов.
Координатное кодирование (Coordinate position encoding).
Вплетение информации о координатах, путем сложения данных и отдельной обучаемой матрицы координат. Иногда даже с нелинейностью.
Это отдельная тема, которая требует хорошего понимания базы. И несмотря на то, что наш трансформер может заучить все комбинации в последовательностях малой длинны и выдать приемлемое качество, не включать реализацию этого слоя было бы большим упущением.
В этой статье мы ограничимся синусоидами. Однако, не стоит думать, что этот метод несерьезен. Это один из самых простых и действенных методов, которые применяются до сих пор!
Он легок для понимания и интерпретации;
Он не затратен по ресурсам;
Он легко экстраполируется на большие длины.
Есть разные объяснения его работы. Для себя я нашел аналогию с радиоволнами, на которые мы накладываем полезный сигнал, но наоборот. В нашем случае, мы уже имеем полезный "сигнал" — векторные представления токенов, на которые мы накладываем синусоиду, таким образом, что первый токен в последовательности получит самую высокую частоту, а последний самую низкую.

Трансформер будет замечать вкрапление этих частот в данные во время обучения, и постепенно заметит связь между позицией любого токена и частотой волны, которая идет с ним в комплекте.
Еще один интересный факт, что данные кодирование является абсолютным. То есть сколько бы данных не содержала последовательность, ее первый токен (эмбеддинг токена) получит одну и ту же волну, что и первый токен в любом другом наборе. Соответственно, если токенов больше, то их волны на одинаковых позициях будут также одинаковыми. Таким образом, запомнив принцип трансформер будет уверено ориентироваться во входных данных, даже если никогда не получал последовательности такой длины.
ПрактикаДавайте убедимся, что мы на одной волне! Код ниже.
def positional_encoding(X: np.ndarray) -> tuple[np.ndarray, dict]:
"""Позиционное кодирование токенов (наложение частот)"""
B, T, D = X.shape
# Номера позиций токенов:
positions = np.arange(T, dtype=np.float32)[:, None] # (T, 1)
# Матрица частот:
i = np.arange(D // 2, dtype=np.float32) # Индексы эмбеддингов
angles = positions * 10000 ** (-2 * i / D) # (1, D)
pe = np.zeros((T, D), dtype=np.float32) # (T, D)
pe[:, 0::2] = np.sin(angles) # Для чётных индексов
pe[:, 1::2] = np.cos(angles) # Для нечётных
pe = np.tile(pe, (B, 1, 1)).astype(np.float32) # (B, T, D)
return X + pe # Наложение частотМы создаем последовательность по формуле positions 10000 * (-2 * i / D), а за тем применяем синусы и косинусы поочередно. После растягиваем на весь батч и добавляем к данным. Все.
Должно быть, внимательный читатель уже заметил, что несмотря на множество шагов и многообразие обучаемых параметров в MHA, мы так и не выполнили ни одной нелинейной операции за исключением Softmax. Но ведь для нормальной работы сети, они необходимы! Простое перемножение параметров никогда не позволит сети выучить сложные зависимости и адекватно преобразовать входные данные. Именно поэтому в трансформерах существует отдельный блок, являющийся аналогом активации в полносвязных сетях.

FeedForward pass представляет из себя полносвязный слой с активацией и масштабированием. Обычно он содержит два линейных слоя со смещением для разных задач и активацию между ними.
Первый линейный слой увеличивает пространство признаков путем умножения на матрицу с расширением по измерению D (обычно в 4 раза):
X [B, T, D] x W [D, Dx4] + B [Dx4,] -> X [B, T, Dx4];
Далее идет активация (простейший вариант — ReLU);
После нужен еще один слой масштабирования, который возвращает размер выхода к стандартному:
X [B, T, Dx4] x W [Dx4, D] + B [D,] -> X [B, T, D].
Таким образом трансформер получает не только слой активации, но и дополнительные параметры для ее выполнения и возможность их регулировать.
ПрактикаДавайте же реализуем и этот блок.
def FF(X: np.ndarray, index: int = 0) -> tuple[np.ndarray, dict]:
"""Полносвязный блок с расширением"""
X1 = np.einsum('BTD, DP -> BTP', X, FW1[index]) # (B T D)(D D) (B T D*ff)
X2 = X1 + FB1[index] # (D*ff,) (B T D*ff)
X3 = leakyrelu(X2)
X4 = np.einsum('BTD, DP -> BTP', X3, FW2[index]) # (B T D)(D*ff D) (B T D)
X5 = X4 + FB2[index] # (D,) (B T D)
return X5, {
"X": X, "W1": X1, "B1": X2,
"act": X3, "W2": X4, "index": index
}Все размерности указаны в комментариях.
Итак, мы дошли до того момента, когда энкодер извлек из данных контекст, и нам предстоит сгенерировать ответное сообщение на его основе. Происходит это в цикле по одному токену за раз с помощью все тех же блоков со слоями Multi-head attention и Feedforward. Подробно этот процесс будет рассмотрен в разделе "Выходной слой и Автогенерация". А сейчас давайте ответим на вопрос: "Как именно генератор понимает контекст запроса?".
Встраивание информации о запросе, происходит с помощью слоя Multi-head cross-attention или Перекрестного внимания. Этот слой представляет из себя аналог MHA и отличается лишь методом проецирования данных через QKV.
Проекция запросов делается по входным данным декодера — сгенерированным токенам, начиная с технического Start. Когда как проекция ключей и значений делается по контексту из энкодера.

Таким образом генерируемые данные как бы "опрашивают" наш контекст, и все сообщение создается на его основе. Результат работы трансформера будет связан по смыслу с запросом пользователя. Чем мощнее энкодер, тем сильнее будет эта связь, вплоть до полного соответствия, которое необходимо в переводчиках или поисковиках. В случае с декодерными моделями, такими как ChatGPT, связь с запросом также будет, но без явного соответствия. Отсутствие энкодера делает их более "вольными", что хорошо для творческих задач.
ПрактикаДанный слой является полным аналогом MHA, но принимает на вход не только текущие данные, но и контекст из декодера.
def MHCA(X: np.ndarray, context: np.ndarray) -> tuple[np.ndarray, dict]:
"""Многоголовое перекрестное внимание"""
# Разделение входной матрицы для мультивнимания:
new_shape_X = (X.shape[0], X.shape[1], heads, part_size)
new_shape_Z = (context.shape[0], context.shape[1], heads, part_size)
X = X.reshape(new_shape_X) # (B Tx H P)
context = context.reshape(new_shape_Z) # (B Tz H P)
# Матрицы проекций:
Q = np.einsum('bthd, hdp -> bhtp', X, CWQ) # (B Tx H P)(H P P) (B H Tx P)
K = np.einsum('bthd, hdp -> bhtp', context, CWK) # (B T H P)(H P P) (B H Tz P)
V = np.einsum('bthd, hdp -> bhtp', context, CWV) # (B T H P)(H P P) (B H Tz P)
# Матрица внимания:
A = np.einsum('bhxp, bhtp -> bhxt', Q, K) # (B H Tx P)(B H Tz P) (B H Tx Tz)
A = A / np.sqrt(part_size, dtype=np.float32)
A_softmax = softmax(A)
# Матрица взвешенных значений:
Z = np.einsum('bhxt, bhtp -> bhxp', A_softmax, V) # (B H Tx Tz)(B H Tz P) (B H Tx P)
# Объединение голов:
Zo = Z.transpose(0, 2, 1, 3) # (B H Tx P) (B Tx H P)
shape = (X.shape[0], X.shape[1], d_model)
Zo = Zo.reshape(shape) # (B Tx H P) (B Tx D)
Zo = np.einsum('btd, df -> btf', Zo, CWO) # (B Tx D)(D D) (B Tx D)
cache = {
"X": X, "context": context, "Q": Q, "K": K, "V": V,
"A": A, "attn": A_softmax, "Z": Z,
}
return Zo, cacheМы точно также применяем формулу внимания. Но проекция ключей K и значений V делается уже по контексту. Таким образом, входные данные из предыдущего слоя (маскированного MHA) учатся делать верный запрос к контексту из энкодера. И только после перекрестного внимания данные поступают в полносвязный слой FF.
По сути весь блок MHA -> MHCA -> FF выступает как единое целое. Если блоков несколько, то один и тот же контекст подается и в них. Таким образом генерируемая последовательность X протекает от начала декодера к выходному слою, а контекст подается из энкодера во все слои MHCA.
Как уже было сказано, трансформер создает ответ в режиме автогенерации — цикле, где предыдущее сообщение служит основой для новой его части. Запуск генерации осуществляется после обработки запроса энкодером, (подготовки контекста) с подачей стартового токена.
Итак, декодер получил на вход токен Start, провел его через цепочку MHA -> MHCA -> FF и выдал тензор размера [B, Tx, D]. Что дальше?
Теперь нужно сгенерировать следующий токен на основе этих данных и присоединить его к текущему сообщению (Start, новый токен и так далее). Для того чтобы определить, какой токен будет следующим, нам нужен "выходной слой", выдающий вероятности размера [B 1 V].

Таким образом, с помощью линейного преобразования, тензор размера [B T D] становится равным по ширине нашему вокабуляру [B T V], где V — это размер вокабуляра, то есть все возможные токены, которые только могут быть. Это позволяет нам получить по нему вероятностное распределение, но прежде нужно отбросить все предыдущие токены по измерению T, оставив только последние в каждом сообщении батча.
Out [B, T, V] -> Out [B, 1, V]
На первый взгляд, это покажется странным. Зачем терять столько информации ради приведения тензора в нужную форму? Но на самом деле, последние токены [B, 1, V] содержат в себе все необходимое, так как:
Они являются конечными логитами выхода декодера, а не сырыми эмбеддингами, поступившими в начало сети;
Они прошли вместе большой путь через слои декодера и обменялись информацией через внимание;
Они являются последними, а значит "всевидящими" токенами в плане маскировки. Иными словами, им доступен обзор и внимание ко всем токенам в их последовательности.
Это означает, что у нас достаточно информации, чтобы сделать предсказание. Так что нормализовав тензор с помощью Softmax, мы получим вероятности следующих токенов для каждой последовательности.
Out [B, 1, V] -> Sotfmax -> Probs [B, 1, V] -> Argmax -> New tokens [B, 1]
Функция Argmax позволяет получить индекс наиболее вероятного токена для каждой последовательности батча. После, он присоединяется к итоговому сообщению в виде обычного индекса. При этом сами эмбеддинги с сообщении не хранятся. Если цикл генерации продолжается, то дополненное сообщение заново отправляется в декодер, получает эмбеддинги и проходит дальнейшие этапы.
Это продолжается пока цикл генерации не создаст максимальное количество токенов или, что наиболее вероятно, все сообщения в батче не получат токен остановки Stop.

Когда это произойдет, генерация оборвется раньше времени. До тех пор, последовательности, которые получили Stop раньше, будут заполнятся паддингами — это еще один вид технических токенов, нужных для работы сети.
ПрактикаКод выходного слоя поразительно прост: это всего лишь единственный линейный слой, только с выходом другой формы.
Из курса линейной алгебры мы знаем, что матрицы на выходе являются результатом работы линейного оператора, который расширяет/сжимает пространство признаков. То есть, была матрица формы [N K], где K - количество измерений (координат), а N - количество объектов, а стала [N M], где M - это новое число осей (измененное пространство признаков).
def output(X: np.ndarray) -> tuple[np.ndarray, dict]:
"""Конечный линейный слой проекции словаря"""
Y = np.einsum('BTD, DV -> BTV', X, WO) # (B T D)((D V) (B T V)
Y = Y + BO # (B T V)
return Y, {"X": X}Таким образом мы можем связать наши данные со словарем, сделав проекцию нужной формы [B, T, V]. Пропустив выход через Softmax мы получим вероятности токенов-кандидатов. Индекс с самой высокой вероятностью будет соответствовать индексу нового токена в вокабуляре.
Vocab[ argmax( out ) ] - > [B, 1] — новый токен для каждого набора в батче.
На первый взгляд может показаться странным, как вероятность и нужный индекс слоя соответствует индексам расположенных в вокабулярии токенов. Но в этом и есть суть машинного обучения: мы подстроили все формы так, чтобы они могли работать без ошибок, а корректировка весов сделает все остальное.
Итак, мы разобрались какой путь проделывает сообщение внутри трансформера. Поговорили о работе его слоев, о взаимодействии контекста с генератором и о том, как выходные логиты трансформируется в конкретные токены. Осталось разобраться, как все это обучить, ведь трансформер имеет довольно сложную структуру и принцип работы, особенно в декодере.
Зададим неправильные вопросы:
Каким образом генерируемое сообщение с переменной длиной по токенам, будет стыковаться с целевыми данными, представленными уже готовыми сообщениями?
Каким образом генератор, создающий собственное сообщение, будет учиться на сообщениях запроса пользователя?
Каким образом градиент будет протекать через цикл автогенерации модели?
Ответ: никаким.
И дело даже не в том, что это сложно и не рационально, а том что это попросту невозможно, так как функция Argmax, использующаяся в генераторе, не является непрерывной. Иными словами, у нее нет производной, в следствии чего граф обрывается, и градиенты уже не могут попасть в предшествующие слои.
Трансформеры полны удивительных сюрпризов. Помните как мультивнимание улучшило качество и одновременно ускорило работу модели? Так и здесь, на помощь приходит потрясающая вещь как параллельное обучение, позволяющее пропускать через трансформер сразу все сообщение целиком без циклов и отдельного предсказания токенов!
Этот подход корректирует все необходимые параметры модели и при этом вообще не задействует автогенерацию. Но чтобы понять работу такого обучения на фундаментальном уровне, давайте вернемся к истокам ML и вспомним, как вообще нейросети учатся на данных.
Сразу скажу, что это крайне важно, если хотите разобраться в трансформерах. Далее будет очень подробная логическая цепочка, которая приведет к ясному пониманию принципов процесса и их истоков.
Допустим у нас есть таблица с данными для бинарной классификации:
Feature 1 | Feature 2 | *** | Feature P | Target | |
1 | .452 | .179 | *** | .256 | 1 |
2 | .123 | .018 | *** | .235 | 0 |
3 | .014 | .653 | *** | .201 | 1 |
*** | *** | *** | *** | *** | *** |
N | .706 | .481 | *** | .199 | 1 |
Есть признаки, которые поступают на вход. Есть выход (предсказания) модели. И, наконец, имеется целевой признак по которому мы будем вычислять ошибку наших предсказаний. Так и в текстовых моделях есть: сообщение запроса и ожидаемый ответ.
Как решить нестыковку переменной длины сообщений с конечным вариантом?
Мы уже говорили о том, что трансформер может принимать на вход гибкое количество токенов — это связано с механикой внимания. Более того, такая особенность является необходимой, так как текстовые последовательности обычно не фиксированы по длине. Декодер в режиме генерации, также вынужден постепенно наращивать сообщение по оси T. Это означает, что одна и та же информация проходит через него множество раз. Так почему бы нам сразу не взаимодействовать с конечным вариантом сообщений, то есть с целевым ответом в его законченном виде? Это отвечает на вопрос о том, почему нам не нужны промежуточные варианты target-ов.
Какие данные подаются в декодер в качестве обучающих?
Мы уже знаем, что будем сравнивать с таргетом только конечные варианты выходных сообщений. Также мы знаем, что сообщение всегда начинается с технического токена Start, нужного для запуска цикла. Однако, что именно нужно подать в Декодер, чтобы получить этот самый конечный вариант, содержащий ожидаемое (и никак иначе) количество токенов, которое должно совпасть с таргетом для вычисления ошибки?
Если в режиме генерации мы можем позволить себе вольности: гибкую длину сообщений, пустой вывод, зацикливание, ленивый вывод и тому подобное, то в режиме обучения сообщения всегда должны совпадать по всем измерениям: Prediction size = target size!
Чтобы ответить на этот вопрос, нужно задать еще один... Что предшествовало конечному сообщению декодера? То есть, каким было сообщение декодера, до того как мы получили конечный его вариант? Ответ: оно было таким же, но на один токен меньше.
Предпоследняя итерация -> Out [B T-1 D] -> Последняя итерация -> Out [B T D]

Это означает, что сообщение декодера было идентичным прошлому, за исключением последнего (добавленного на последней итерации) токена. Учитывая, что мы вынуждены всегда добавлять токен Stop к сообщению в конце, чтобы научить модель останавливаться, то мы полностью знаем это сообщение! Это тот же самый target, но без токена Stop, которого нет на предпоследней итерации (модель его как бы еще не предсказала).
Получается, что на вход декодера в режиме обучения мы подаем Start + trarget (предпоследний шаг), а сравнивать Predictions модели мы будем с trarget + Stop (последний шаг) — конечное сообщение, которое должна предсказать модель.
Номера токенов | Обучающее сообщение | Предсказание декодера | Целевое сообщение |
1 | Start | Я | Я |
2 | Я | Что-то | Иду |
3 | Иду | Выдаю | В |
4 | В | Параллельно | Магазин |
5 | Магазин | Обучаясь | Stop |
Start предскажет токен "Я". Токен "Я" предскажет токен "Иду" и так далее, пока токен "Магазин" не предскажет Stop. Вот что такое параллельное обучение: токены предсказывают сами себя по целевым данным. То есть таргет используется и там и там: и в качестве обучающего сообщение и в качестве целевого!
Внимательный читатель, разумеется, задумается об очевидном следствии данного процесса — переобучении. Если мы используем одно и тоже сообщение для обучения, то каким образом модель вообще сможет обобщать данные?
С первого взгляда кажется, что это как подать целевой признак вместе с обучающим: модель быстро приспособиться к нему и выдаст идеальное качество на train, не имея никакой обобщающей способности. Этот страх не является безосновательным.
Ответ: никаким...
Но только если мы не запретим токенам видеть будущее. Вспомним нашу таблицу внимания из начала статьи:
T1 Start | T2 Я | T3 Иду | T4 В | T5 Магазин | |
Start | .5 | 0 | 0 | 0 | 0 |
Я | .25 | .5 | 0 | 0 | 0 |
Иду | .2 | .2 | .5 | 0 | 0 |
В | 0 | 0 | .15 | .7 | 0 |
Магазин | .05 | .2 | .2 | .1 | .5 |
Похоже, что в ней что-то изменилось: все значения внимания токенов, стоящих после текущего, обнулились, словно какая-то неведомая сила не дает им заглянуть в будущее и узнать, какой токен станет следующим. А ведь они и не должны этого знать!
Этот прием называется маскированием внимания.

Любой токен, идущий следом, не доступен для внимания предыдущего. Таким образом, значения таблицы внимания могут быть только у самого токена и у всех, что стоят до него. Все остальные значения получают -inf (на деле очень большое отрицательное число, например -1e9) для того чтобы Softmax обнулил их следующим шагом.
И того мы имеем следующую схему обучения:

Данные запроса поступают на вход энкодера. Токен Start + "целевое сообщение" поступает в декодер. Формируется ответ, имеющий сразу же все токены, которые мы от него ожидаем, который сравнивается с "целевым сообщением" + токен Stop. Специфичные для цикла автогенерации блоки (argmax и concat) даже не задействуются, так как не имеют обучаемых параметров.
ПрактикаДля обучения нам понадобится сразу несколько методов.
Для начала мы должны вычислить ошибку модели. Будем использовать разряженную Перекрестную энтропию. Ее особенность в том, что она принимает номера токенов, а не векторные представления вероятностей типа [0 0 1 0 ... 0].
def SparseCrossEntropy(prediction: np.ndarray, target: np.ndarray, eps: float = 1e-8) -> tuple[int, np.ndarray]:
"""
prediction: [B, T, V] — логиты
target: [B, T] — индексы правильных токенов
"""
probs = softmax(prediction)
B, T, V = probs.shape
correct_probs = probs[np.arange(B)[:, None], np.arange(T)[None, :], target]
loss_value = -np.sum(np.log(correct_probs + eps), dtype=np.float32)
loss_value = loss_value / (B * T)
loss_value = np.array(loss_value, dtype=np.float32) # ensure scalar-array of correct dtype
grad_prediction = probs.copy().astype(np.float32)
grad_prediction[np.arange(B)[:, None], np.arange(T)[None, :], target] -= 1.0
grad_prediction /= (B * T)
return loss_value, grad_predictionСледующей ступенью обучения будет обратный проход.
В отличии от динамически строящегося графа, где каждая операция (сложение, суммирование, усреднение и прочая математика) кэширует всю необходимую информацию и порождает новый объект класса Тензор со ссылками на родителей, статичный граф нуждается в обратных функциях, аналогичных слоям. Они необходимы чтобы вычислить градиент для каждого узла, имеющего обучаемые параметры по методу производной сложной функции:
(F(G(x)))' = F'(G(x)) * G'(x)
Модель словно луковица разворачивается через умножение входящего градиента на производную каждой ветви графа.
Данные методы стоит прочитать для общего понимания, но не более, ибо так уже давно никто не делает. Я выбрал этот алгоритм (из-за чего пришлось переписывать половину кода), поскольку динамический граф требует от читателя обязательного знания ООП. Кроме того, он менее прозрачен, так как вся работа происходит в скрытых объектах класса Тензор. Когда как обратные функции представляют из себя полный аналог прямых, но более длинный. И все же по ним можно легко понять, что именно происходит с входным объектом.
Код обратных функций для всех слоев:
def backward_normalize(dY: np.ndarray, cache: dict) -> tuple[np.ndarray, dict]:
"""
Форма dY: (B, T, D)
Возврат:
dX: (B, T, D)
"""
X = cache["X"]
rms = cache["rms"]
D = X.shape[-1]
return (dY / rms - X * np.sum(dY * X, axis=-1, keepdims=True) / (D * rms**3))
def backward_MHA(dZ: np.ndarray, cache: dict) -> tuple[np.ndarray, dict]:
"""
Форма dZo: (B, Tx, D)
Возврат:
dX: (B, T, D)
grads: словарь градиентов
"""
X = cache["X"] # (B T H P)
Q = cache["Q"] # (B H T P)
K = cache["K"] # (B H T P)
V = cache["V"] # (B H T P)
A_softmax = cache["attn"] # (B H T T)
Z = cache["Z"] # (B H T P)
index = cache["index"]
B, T, _, _ = X.shape
# Производная общей матрицы WO:
dMWO = np.einsum(
'btd, btf -> df',
Z.transpose(0, 2, 1, 3).reshape(B, T, d_model),
dZ
) # (B T D1)(B T D2) (D1 D2)
dZ = np.einsum(
'btd, fd -> btf',
dZ,
MWO[index]
).reshape(B, T, heads, part_size).transpose(0, 2, 1, 3) # (B T D)(D1 D2) (B T D1) (B H T P)
# Производная для проекции значений V:
dA = np.einsum('bhtp, bhfp -> bhtf', dZ, V) # (B H T P)(B H T P) (B H T T)
dV = np.einsum('bhit, bhtp -> bhip', A_softmax, dZ) # (B H T1 T2)(B H T P) (B H T1 P)
# Обнуление градиентов (если есть маска):
if (mask := cache["mask"]) is not None:
dA = np.where(mask == 0, dA, 0.0).astype(np.float32)
# Производная софтмакса:
tmp = np.sum(dA * A_softmax, axis=-1, keepdims=True)
dA = A_softmax * (dA - tmp)
dA = dA / np.sqrt(part_size, dtype=np.float32)
# Производная внимания:
dQ = np.einsum('bhtf, bhtp -> bhfp', dA, K) # (B H T1 T2)(B H T1 P) (B H T2 P)
dK = np.einsum('bhtf, bhfp -> bhtp', dA, Q) # (B H T1 T2)(B H T2 P) (B H T1 P)
# Производная по проекции входных данных:
dMWQ = np.einsum('bthp, bhtf -> hfp', X, dQ) # (B T H P)(B H T P) (H P P)
dMWK = np.einsum('bthp, bhtf -> hfp', X, dK) # (B T H P)(B H T P) (H P P)
dMWV = np.einsum('bthp, bhtf -> hfp', X, dV) # (B T H P)(B H T P) (H P P)
# Производная по входу:
dX = (
np.einsum('bhtp, hfp -> bthf', dQ, MWQ[index]) + # (B T H P)(H P1 P2) (B T H P1)
np.einsum('bhtp, hfp -> bthf', dK, MWK[index]) + # (B T H P)(H P1 P2) (B T H P1)
np.einsum('bhtp, hfp -> bthf', dV, MWV[index]) # (B T H P)(H P1 P2) (B T H P1)
).reshape(B, T, d_model)
return dX, {
"MWQ": dMWQ,
"MWK": dMWK,
"MWV": dMWV,
"MWO": dMWO
}
def backward_MHCA(dZ: np.ndarray, cache: dict) -> tuple[np.ndarray, dict]:
"""
Форма dZo: (B, Tx, D)
Возврат:
dX: (B, Tx, D)
dContext: (B, Tz, D)
grads: словарь градиентов
"""
X = cache["X"] # (B Tx H P)
context = cache["context"] # (B Tz H P)
Q = cache["Q"] # (B H Tx P)
K = cache["K"] # (B H Tz P)
V = cache["V"] # (B H Tz P)
A_softmax = cache["attn"] # (B H Tx Tz)
Z = cache["Z"] # (B H Tx P)
B, Tx, H, P = X.shape
Tz = context.shape[1]
# Производная общей матрицы WO:
dCWO = np.einsum(
'btd, btf -> df',
Z.transpose(0, 2, 1, 3).reshape(B, Tx, d_model),
dZ
) # (B T D1)(B T D2) (D1 D2)
dZ = np.einsum(
'btd, fd -> btf',
dZ,
CWO
).reshape(B, Tx, H, P).transpose(0, 2, 1, 3) # (B Tx D)(D1 D2) (B Tx D1) (B H Tx P)
# Производная для проекции значений V:
dA = np.einsum('bhtp, bhfp -> bhtf', dZ, V) # (B H Tx P)(B H Tz P) (B H Tx Tz)
dV = np.einsum('bhtf, bhtp -> bhfp', A_softmax, dZ) # (B H Tx Tz)(B H Tx P) (B H Tz P)
# Производная софтмакса:
tmp = np.sum(dA * A_softmax, axis=-1, keepdims=True)
dA = A_softmax * (dA - tmp)
dA = dA / np.sqrt(part_size, dtype=np.float32)
# Производная внимания:
dQ = np.einsum('bhtf, bhfp -> bhtp', dA, K) # (B H Tx Tz)(B H Tz P) (B H Tx P)
dK = np.einsum('bhtf, bhtp -> bhfp', dA, Q) # (B H Tx Tz)(B H Tx P) (B H Tz P)
# Производная по проекциям входных данных:
dCWQ = np.einsum('bthp, bhtf -> hpf', X, dQ) # (B Tx H P)(B H Tx P) (H P P)
dCWK = np.einsum('bthp, bhtf -> hpf', context, dK) # (B Tx H P)(B H Tz P) (H P P)
dCWV = np.einsum('bthp, bhtf -> hpf', context, dV) # (B Tx H P)(B H Tz P) (H P P)
# Производная по входу (X и Z):
dX = np.einsum(
'bhtp, hfp -> bthf', dQ, CWQ
).reshape(B, Tx, d_model) # (B Tx H P)(H P1 P2) (B Tx H P1)
dZ = (
np.einsum('bhtp, hfp -> bthf', dK, CWK) + # (B Tz H P)(H P1 P2) (B Tz H P1)
np.einsum('bhtp, hfp -> bthf', dV, CWV) # (B Tz H P)(H P1 P2) (B Tz H P1)
).reshape(B, Tz, d_model)
grads = {
"CWQ": dCWQ,
"CWK": dCWK,
"CWV": dCWV,
"CWO": dCWO
}
return dX, dZ, grads
def backward_FF(dX5: np.ndarray, cache: dict) -> tuple[np.ndarray, dict]:
"""
Форма dX5: (B, T, D)
Возврат:
dX: (B, T, D)
grads: словарь градиентов
"""
X = cache["X"] # (B T D)
X1 = cache["W1"] # (B T D*ff)
X2 = cache["B1"] # (D*ff,)
X3 = cache["act"] # (B T D*ff)
index = cache["index"] # (i)
# Градиент для Байес 2:
dFB2 = np.sum(dX5, axis=(0, 1)) # (B T D) (D,)
# Производная линейного слоя 2:
dFW2 = np.einsum('btd, btp -> dp', X3, dX5) # (B T D*ff)(B T D) (D*ff D)
dX3 = np.einsum('btp, dp -> btd', dX5, FW2[index]) # (B T D)(D*ff D) (B T D*ff)
# Производная нелинейного слоя:
dX2 = dX3 * np.where(X2 > 0, 1.0, 0.1).astype(np.float32) # (B T D*ff)
# Градиент для Байес 1:
dFB1 = np.sum(dX2, axis=(0, 1)) # (D*ff,)
# Производная линейного слоя 1:
dFW1 = np.einsum('btd, btp -> dp', X, dX2) # (B T D)(D D*ff) (D, D*ff)
dX = np.einsum('btp, dp -> btd', dX2, FW1[index]) # (B T D*ff)(D D*ff) (B T D)
grads = {
"FW1": dFW1,
"FB1": dFB1,
"FW2": dFW2,
"FB2": dFB2
}
return dX, grads
def backward_output(dY: np.ndarray, cache: dict) -> tuple[np.ndarray, dict]:
"""
Форма dY: (B, T, V)
Возврат:
dX: (B, T, D)
grads: словарь градиентов
"""
X = cache["X"]
# Производная сдвига:
dBO = np.sum(dY, axis=(0, 1)) # (V,)
# Производная по весам:
dWO = np.einsum('btd, btv -> dv', X, dY) # (B T D)(B T V) (D V)
# Производная по входу:
dX = np.einsum('btv, dv -> btd', dY, WO) # (B T V)(D V) (B T D)
grads = {
"WO": dWO,
"BO": dBO
}
return dX, gradsОперации можно разделить на два вида: промежуточные и локальные. Одни нужны для передачи градиента дальше, другие непосредственно для корректировки весов. Промежуточные результаты записываются в специальный кэш, нужный алгоритму для оптимизации, и выводятся вместе с конечным значением.
Наконец, нам нужна верхнеуровневневая функция обратного вызова, являющаяся аналогом Call. Она запускает все нужное в обратном порядке и выводит результаты для всех обучаемых параметров.
def backward(loss: np.ndarray, cache) -> tuple[dict, ...]:
"""
Обратный проход.
Возвращает наборы градиентов для каждого слоя.
"""
# Декодер:
dX, grads1 = backward_output(loss, cache['Out'])
dXr = backward_normalize(dX, cache['Norm6'])
dX, grads2 = backward_FF(dXr, cache['FF-D'])
dX = backward_normalize(dX, cache['Norm5'])
dXr = dXr + dX
dX, dZ, grads3 = backward_MHCA(dXr, cache['MHCA'])
dX = backward_normalize(dX, cache['Norm4'])
dXr = dXr + dX
dX, grads4 = backward_MHA(dXr, cache['MHA-D'])
# dX = backward_normalize(dX, cache['Norm3'])
# dXr = dXr + dX
# Энкодер:
dX, grads5 = backward_FF(dZ, cache['FF-E'])
dX = backward_normalize(dX, cache['Norm2'])
dXr = dX + dZ
dX, grads6 = backward_MHA(dXr, cache['MHA-E'])
# dX = backward_normalize(dX, cache['Norm1'])
# dXr = dXr + dX
return grads6, grads5, grads4, grads3, grads2, grads1Я закомментировал некоторые строки, поскольку дальнейшая трансляция градиента не требуется. В серьезных фреймворках, как и в моем, используется параметр request_grad, отсекающий ветви графа, если на них больше не осталось обучаемых тензоров. Зачем вести градиент по ветви, если на ней нет потребителей? Это важный элемент оптимизации в больших моделях.
Эта бонусная глава для патологически глубокого понимания трансформеров. В ней я расскажу о нормализации, остаточных связях и масштабировании. Если вы пока неуверенно чувствуйте себя в математике нейросетей, то можете ее пропустить.
Как я сказал ранее, главной проблемой трансформеров является их интуитивное понимание. Если эта статья помогла ее решить, то пол дела сделано! На очереди следующая уже техническая задача, свойственная для deeplearning в целом: стабилизация обучения. Мало написать алгоритм, сгенерировать веса и подать данные, даже для минимального результата, ибо глубокие сети крайне капризны и так и норовят застрять в локальном минимуме, затухнуть или взорваться. И никакой оптимизатор тут не поможет, если не знать все ловушки, которые поджидают задолго до того, как он вступит в работу. Об этом далее.
В этой главе мы не будем рассматривать общие для всех сетей проблемы, такие как застревание в ЛМ, мертвые нейроны, переобучение, неравноправное обновление весов и другие. Достаточно сказать, что с большей частью справляется Адам и простейшая регуляризация. Здесь я хочу рассмотреть более специфичные для трансформеров препятствия и меры. Однако, если вам все же интересно о них узнать, то возможно позже я напишу полноценную статью на эту тему.
Взрыв градиента.
Представим, что мы запускаем трансформер, параметры которого только что были инициализированы случайными значениями. Ожидаемо, что ошибка на первой эпохе будет очень большой, так как модель еще не обучена. И проблема даже не в самом значении ошибки, а в его следствии — неэффективном обучении. Дело в том, что градиент будет пытаться компенсировать ее путем огромного шага в сторону наискорейшего убывания, то есть высокими конечными значениями для обучаемых параметров. И даже если в самом оптимизаторе мы ограничили его путем выставления скорости обучения или клиппингом (Gradient clipping), это не отменяет того факта, что по пути он дорастет до запредельных значений. То есть, возникнет переполнение довольно ограниченного типа float32 промежуточных тензоров, которое, прежде всего, убивает сам смысл графа вычислений. Это как вставить в некое прикладное инженерное выражение пару бесконечностей: оно утратит практический смысл. Эта проблема называется "взрывом градиента".

Затухание градиента.
Обратной стороной неконтролируемого роста градиента является его затухание. Если значения начальных тензоров находятся в приемлемом диапазоне, то многократное умножение таких матриц в слоях приведет к стремительному затуханию. Если это произойдет, то нормальное обновление весов, зависящее от самого градиента, будет уже невозможно, так как ошибка модели не изменится.

Нет корректировки весов, нет падения ошибки и нет полезного градиента — это замкнутый круг, который означает прекращение обучения.
Изменение значений тензоров на прямом проходе.
Эта проблема похожа на две предыдущие. Рост и падение значений на прямом проходе также имеет место при многократном умножении матриц.
В обычных сетях эта проблема не такая явная. Но слои трансформера серьезно отличаются от слоев других типов нейросетей, поскольку содержат огромное количество последовательных действий. Например, свертки обрабатывают изображения в параллельном режиме. Линейные слои содержат одну обучаемую матрицу и байес. А слой MHA трансформеров содержит несколько непараллельных этапов: проекция, внимание, взвешивание, обобщение. Это эквивалентно четырем линейным слоям!
Давайте для примера возьмем матрицы с разным порядком значений и умножим одну на другую:

Содержимое итоговых матриц изменились на порядок. Если мы продолжим это дело, то увидим ту же тенденцию. Причина тому — механика умножения матриц с произведением и суммированием элементов. Очевидно, если умножить матрицы больших размеров, то проблема только усугубиться.
Искажение данных.
Даже если отбросить сильное отклонение, само по себе прохождение данных через многочисленные слои меняют их не в лучшую сторону. Обученный слой делает прошедший батч абстрактным представлением вошедших данных. Дальнейшее прохождение по глубокой сети только усугубляет эту проблему. Что уж говорить о необученной модели, если после первого же слоя данные настолько искажены, что оригинальная информация, идущая к последующим слоям, просто теряется.

Это серьезно замедляет сходимость, так как приходится ждать пока начальные слои будут скорректированы градиентом, идущим с конца.
Как видно, пренебрежение этими особенностями сделает обучение трансформера хаотичным и нестабильным. Даже если простота архитектуры и случайное везение позволят обучать голую модель, то вряд ли она будет сходиться быстро и достигнет наилучшего результата.
Давайте рассмотрим некоторые меры, которые должны помочь нам выстроить безопасный алгоритм.
Нормализация.
Зайдем с козырей.
Нормализация — это комплекс мер по преобразованию данных с целью препятствия негативным эффектам линейных операций.
Обычно под ней подразумевается центрирование, нормирование в диапазоне и масштабирование значений (в том числе обучаемое). Оно позволяет не только компенсировать набегающие в слоях отклонения, но и помочь их предотвратить! Это одна из самых эффективных мер, реализацией которой не стоит пренебрегать, что мы и сделаем в практической части.
Рассмотрим изображение операций с матрицами, прошедшими грамотную нормализацию:

Центрирование и нормирование данных в диапазоне (-1, 1) приводит к существенной устойчивости признаков к различным операциям, даже если обучаемые параметры изначально имеют некачественные значения. Наряду с грамотной инициализацией обучаемых матриц, о чем я упоминал в практической части главы о самовнимании, нормализация ускорят обучение и делает его более стабильным.
Масштабирование.
Но все же одной нормализацией не обойтись.

Предполагаю, что деление в формуле внимания, оставшееся за скобками, для многих до сих пор держало интригу. Что это за загадочный корень и зачем он нужен? И как ни странно, связано это со статистикой.
Я написал небольшую функцию, которая генерирует случайные матрицы разных размеров с фиксированным СКО, центрирует их и перемножает. После, мы вычисляем модуль среднего (поскольку нас интересует сама величина) и стандартное отклонение. Для сбора статистики, мы проводим множество таких экспериментов (например 1к) и заносим их в отдельный список для каждого размера матриц и усредняем.
def statistic_matrix(
sizes: list, experiments: int,
round_num: int = 9,
scale: int = 1, scaling: bool = False
) -> None:
sizes.extend(sizes)
sizes_pair = sorted(sizes)
statistics_std = ([], [], [], [])
statistics_avg = ([], [], [], [])
for _ in range(experiments):
# Генерация матриц:
matrices = []
matrix = None
for i in sizes_pair:
matrix = np.random.normal(size=(i, i), scale=scale)
matrices.append(matrix)
# Центрирование матриц:
matrices = [matrix - matrix.mean() for matrix in matrices]
# Сбор статистики:
pairs = zip(matrices[:-1:2], matrices[1::2])
for i, (matrix_1, matrix_2) in enumerate(pairs):
# m1 = round(matrix_1.mean(), round_num)
# m2 = round(matrix_2.mean(), round_num)
result = matrix_1 @ matrix_2
result /= np.sqrt(len(result)) if scaling else 1
avg = round(abs(result.mean()), round_num)
std = round(result.std(), round_num)
statistics_avg[i].append(avg)
statistics_std[i].append(std)
# Вывод:
for i, (avg, std) in enumerate(zip(statistics_avg, statistics_std)):
avg = round(np.mean(avg), round_num)
std = round(np.mean(std), round_num)
print(f"Матрицы размера {sizes[i]}, СКО = {std}, Среднее = {avg}")Запускаем первый эксперимент:
experiments = 1000 # Количество экспериментов
sizes = [3, 16, 64, 512] # Размеры матриц
round_num = 5 # Округление
scale = 1 # Стандартное отклонение
scaling = False # Масштабирование
statistic_matrix(sizes, experiments, round_num, scale, scaling)Матрицы размера 3, СКО = 1.37415, Среднее = 0.34911
Матрицы размера 16, СКО = 3.96705, Среднее = 0.19906
Матрицы размера 64, СКО = 7.9948, Среднее = 0.10101
Матрицы размера 512, СКО = 22.62789, Среднее = 0.03494
Теперь мы видим, что отклонение от центра при произведении уменьшается с увеличением размера матриц. Большие матрицы более стабильны, что явно нам на руку. Однако, стандартное отклонение неуклонно растет!
С повышением размерности векторов (матриц/тензоров), растет и количество вариантов конкретных значений. Мы не знаем заранее, какими они будут, так как СГС и инициализация являются стохастическим процессом. Если увеличиваются данные, то увеличивается количество вариантов, а значит и сама вариативность. Как следствие, дисперсия — мера разброса данных, начинает увеличиваться. Возникает все больше выбросов и значений, стоящих на достаточном удалении от центра.
Пусть средние значение результата почти не изменилось, сами эти значения начали выходить за предпочтительный диапазон. А это прямой путь к переполнению (рис23).
Теперь становится ясно, зачем нам нужно масштабирование в формуле внимания.

Запустим код еще раз, но на этот раз выполним scaling.
experiments = 1000 # Количество экспериментов
sizes = [3, 16, 64, 512] # Размеры матриц
round_num = 5 # Округление
scale = 1 # Стандартное отклонение
scaling = True # Масштабирование
statistic_matrix(sizes, experiments, round_num, scale, scaling)Матрицы размера 3, СКО = 0.77202, Среднее = 0.18582
Матрицы размера 16, СКО = 0.99305, Среднее = 0.04576
Матрицы размера 64, СКО = 0.99852, Среднее = 0.01214
Матрицы размера 512, СКО = 0.99998, Среднее = 0.00157
Теперь мы видим, что отклонение стремиться к 1 — значению, которое было у родительских матриц. Вы сами можете провести эксперименты с различными параметрами. Код будет в моем учебном репозитории.
Остаточные связи.
Вспомним общую схему трансформера (рис1).

При ее описании я упомянул, что данные, поступающие на вход слоя, складываются с его выходом подобно skip connections в сверточных сетях, где они препятствовали затуханию градиента. Также, это позволяет первоначальной информации проходить глубоко в сеть, минуя полную модификацию внутри слоев и сохраняя оригинальную семантику. Это упрощает обучение и ускоряет сходимость.

Теперь проблема искажения, описанная выше, уже не страшна.
Прогрев модели и планировка скорости.
Как я упомянул ранее, необученная модель обладает огромной ошибкой. Все параметры со случайными значениями, будут получат большой градиент в течении первых шагов. Подобное обновление будет лишь швырять значения весов из стороны в сторону и, в лучшем случае, замедлит процесс обучения. Этого нужно избежать!

Почему бы нам не ограничить скорость на первых порах? Почему бы не начать с 0 и не набирать скорость постепенно до "стартовой", с которой и начнется основное обучение? Данный процесс называется прогревом.
Кроме того, вовсе не обязательно и даже не желательно оставаться на заданной скорости все обучение. В начале лучше сделать несколько крупных шагов для быстрого прогресса и преодоления локальных минимумов. В дальнейшем скорость лучше снизить до приемлемых значений. И у же ближе к концу, когда абсолютный минимум уже найден, аккуратно отъюстировать веса на малых скоростях. Этот принцип называется планировкой скорости.
Это далеко не все приемы, позволяющие стабилизировать обучение, но каждый из них будет задействован в нашем трансформере. Кроме того, один метод из категории регуляризации, вы сможете реализовать сами в Домашнем задании.
ПрактикаФункция простой необучаемой нормализации:
def normalize(X: np.ndarray, eps: float = 1e-8):
"""Простая нормализация без масштабирования и обучаемых параметров"""
rms = np.sqrt(np.mean(X**2, axis=-1, keepdims=True) + eps) # (B T 1)
cache = {
"X": X,
"rms": rms
}
return X / rms, cacheКак и обещал, мы воспользуемся продвинутой версией СГС со стабилизацией и сохранением момента (экспоненциальное скользящее среднее) скоростей. Это известный всем Адам, который наверняка заслуживает отдельной статьи про оптимизаторы.
def optimizer(
parameters: list, gradients: list,
step: int, state: dict,
beta1=0.9, beta2=0.999, eps=1e-8
) -> dict:
"""Оптимизатор Адам"""
if 'm' not in state:
state['m'] = [np.zeros_like(p) for p in parameters]
state['v'] = [np.zeros_like(p) for p in parameters]
gradients = [elem for values in gradients for elem in values.values()]
for i, (p, g) in enumerate(zip(parameters, gradients)):
# Моменты:
state['m'][i] = beta1 * state['m'][i] + (1 - beta1) * g
state['v'][i] = beta2 * state['v'][i] + (1 - beta2) * (g * g)
# Байес:
m_hat = state['m'][i] / (1 - beta1 ** step)
v_hat = state['v'][i] / (1 - beta2 ** step)
# Обновление параметра:
p -= sheduler(step) * m_hat / (np.sqrt(v_hat) + eps)
return stateВот и настало время запуска нашего трансформера!
Если вы не планируйте изучать код, то пропустите этот раздел.
Перед тем как пройти далее, я советую запустить следующий скрипт. Он представляет из себя проверку выводимых форм для составных частей модели и не является обязательным. Советую воспользоваться им при первом запуске для того чтобы понять, что все функции и импорты на месте, а выводы верные. Это поможет сразу исключить возможные ошибки на этом этапе.
size = (batch, max_tokens, d_model)
data = np.random.random(size).astype(np.float32)
# Проверка основных слоев:
X, cache_MHA = MHA(data, index=0)
assert X.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("MHA", X.shape)
X, cache_MHCA = MHCA(data, X)
assert X.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("MHCA", X.shape)
X, cache_FF = FF(X, index=0)
assert X.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("FF", X.shape)
X, cache_out = output(X)
assert (size[0], size[1], vocab_size) == X.shape, "Формы данных входа и выхода не совпали!"
print("Out", X.shape, '\n')
# Проверка обратных слоев:
dX, grads = backward_output(X, cache_out)
assert dX.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("dOut", dX.shape)
dX, cache = backward_FF(dX, cache_FF)
assert dX.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("dFF", dX.shape)
dX, dZ, cache = backward_MHCA(dX, cache_MHCA)
assert dX.shape == data.shape, "Формы данных входа и выхода не совпали!"
assert dZ.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("dMHCA", dX.shape, dZ.shape)
dX, grads = backward_MHA(dX, cache_MHA)
assert dX.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("dMHA", dX.shape, '\n')
# Проверка верхнеуровневых функций:
size = (batch, max_tokens)
data = np.random.randint(0, 9, size=size, dtype=np.int32)
# Инференс:
out = call(data)
assert out.shape == (batch, max_tokens + 1), f"Выходной размер не соответствует ожидаемому!"
print("Call generate", out.shape)
# Режим обучения:
out, cache = call(data, data)
assert out.shape == (batch, max_tokens + 1, vocab_size), f"Выходной размер не соответствует ожидаемому!"
print("Call training", out.shape)
# Проверка Loss-функции:
target = np.argmax(softmax(out), axis=-1)
print(target.shape)
loss, grad = SparseCrossEntropy(out, target)
# assert round(float(loss)) == round(np.log(vocab_size)), f"Лосс не совпал с ожидаемым!"
print("Loss ", loss)
# Обратный проход:
grads = backward(out, cache)
assert len(grads) == 6, f"Не все слои получили градиенты!"
print("Backward grads:")
for g in grads:
print(g.keys())
В будущем его можно закомментировать, так как при изменении кода, некоторые assert могут не сработать.
Подготовка модели к запуску.
Для того чтобы обучить модель, нам понадобится код, который будет управлять: прямым и обратным проходами, вычислением ошибки, оптимизацией и выводом результатов. Он называется fit и в данной версии представляет из себя функцию, которая организует все, что связано с обучением модели.
Внимательно изучите код:
def fit(
data_train: np.ndarray, data_valid: np.ndarray,
epoch: int, shuffle: bool = False, step_per_epoch: int = None
) -> None:
""""""
# Проверка количества шагов:
if not step_per_epoch:
step_per_epoch = data_train.shape[0]
# Цикл обучения:
state = dict()
for i in range(1, epoch + 1):
# Метрики:
general_loss_train = 0
general_loss_valid = 0
# Тасование батчей:
if shuffle:
idx = np.random.permutation(len(data_train)) # Случайный порядок индексов
data_train = data_train[idx]
# Обучающая часть:
for step, (batch, target) in enumerate(data_train[:step_per_epoch], start=1):
prediction, cache = call(batch, Y=target)
loss, grad = SparseCrossEntropy(
prediction,
add_finish_token(target)
)
grad_parameters = backward(grad, cache)
state = optimizer(parameters, grad_parameters, step, state) # Оптимизация
general_loss_train += float(loss)
print('=', end='')
# Валидационная часть:
for batch, target in data_valid:
prediction, cache = call(batch, Y=target)
loss, grad = SparseCrossEntropy(prediction, add_finish_token(target))
general_loss_valid += float(loss)
# Вывод ошибки:
general_loss_train /= step_per_epoch
general_loss_valid /= data_valid.shape[0]
print(f"\nЭпоха: {i}")
print(f"Ошибка модели на обучении: {general_loss_train}")
print(f"Ошибка модели на тесте: {general_loss_valid}")
Данная функция является упрощенной версией метода из моего базового фреймворка, и все же она обладает рядом интересных возможностей:
Параметр step_per_epoch в сочетании с shuffle позволяет ограничить данные, подаваемые за одну эпоху. А значит нам не обязательно ждать пока весь датасет пройдет через модель, прежде чем получить промежуточную информацию об ошибке. Это дает гибкие возможности работы с ним.
Планировщик позволяет регулировать скорость на каждом шаге через параметры start_rate и finish_rate.
Параметр warmup_steps регулирует количество шагов "разогрева" модели перед выходом на стартовую (пиковую) скорость планировщика.
Хранение состояний между батчами нужно для корректной работы оптимизатора.
Суть кода проста. Он тасует датасет и берет меньшую его часть для текущей эпохи. Запускается цикл подачи в верхнеуровневый метод call. Прямой вызов возвращает логиты и кэш промежуточных состояний всех слоев. Логиты нужны для вычисления ошибки. Та в свою очередь возвращает значение, нужное лишь для явного вывода результата и градиент по самой себе. Градиент и кэш из forward-а идут в backward для вычисления всех нужных значений для корректировки. Собранные градиенты и список параметров передаются в оптимизатор, который делает шаг спуска. После чего подается валидная часть датасета для измерения ошибки.
Подготовка синтетических данных.
А чему мы будем нашу модель учить?
Для этого я подготовил специальный класс, который содержит в себе набор тестов для создания синтетических данных. Если вы не знаете ООП, это не помешает вам его использовать. В будущем вы сможете написать собственные тесты на его основе.
class Generator:
def __init__(
self, batch_size: int,
number_tokens: int, number_batch: int,
shuffle: bool = True, test_size: float = .2,
dtype=np.int32
) -> None:
self.batch_size = batch_size
self.number_tokens = number_tokens
self.number_batch = number_batch
self.shuffle = shuffle
self.test_size = test_size
self.dtype = dtype
def palindrome(self) -> Union[np.ndarray, tuple[np.ndarray, np.ndarray]]:
"""Создание последовательностий с переворотом второй половины"""
data, target = list(), list()
self.number_tokens //= 2
for _ in range(self.number_batch):
data_batch, target_batch = list(), list()
for num in range(self.batch_size):
half = str(random.randint(10 ** (self.number_tokens - 1), 10 ** self.number_tokens - 1))
reverse = list(half)
reverse.reverse()
data_batch.append([int(i) for i in "".join((half, half))])
target_batch.append([int(i) for i in "".join((half, "".join(reverse)))])
data.append(data_batch)
target.append(target_batch)
data = np.array((data, target), dtype=self.dtype).transpose(1, 0, 2, 3)
return self.train_test_split(data, self.test_size) if self.shuffle else data
def pointer_index(self) -> Union[np.ndarray, tuple[np.ndarray, np.ndarray]]:
"""
Индексация собственной последовательности:
y_i = x_{x_i}
"""
data, target = list(), list()
seq_len = self.number_tokens
for _ in range(self.number_batch):
data_batch, target_batch = list(), list()
for _ in range(self.batch_size):
x = np.random.randint(0, 10, size=seq_len)
if seq_len < 10:
raise ValueError("seq_len должен быть >= 10")
y = x[x]
data_batch.append(x.tolist())
target_batch.append(y.tolist())
data.append(data_batch)
target.append(target_batch)
data = np.array((data, target), dtype=self.dtype).transpose(1, 0, 2, 3)
return self.train_test_split(data, self.test_size) if self.shuffle else data
def train_test_split(self, data: np.ndarray, test_size: float = .2) -> tuple[np.ndarray, np.ndarray]:
"""Разбиение данных"""
idx = np.random.permutation(len(data)) # Случайный порядок индексов
data = data[idx]
index = int(len(data) * (1 - test_size))
return data[:index], data[index:]
Стартовый тест включает в себя последовательность с двумя одинаковыми половинами. Задача трансформера перевернуть вторую часть, чтобы сделать палиндром.
1 2 3 4 5 6 7 8 1 2 3 4 5 6 7 8 -> 1 2 3 4 5 6 7 8 8 7 6 5 4 3 2 1
0 1 0 2 0 3 0 4 0 1 0 2 0 3 0 4 -> 0 1 0 2 0 3 0 4 4 0 3 0 2 0 1 0
Все очень просто, но все же не настолько, насколько может показаться.
Создаем датасет:
gen = Generator(
batch_size=batch,
number_tokens=max_tokens,
number_batch=256,
test_size=.33
)
train, valid = gen.palindrome() # palindrome, pointer_index, castom_test
print("Train shape: ", train.shape)
print("Valid shape: ", valid.shape)Обучение модели.
Зададим параметры обучения.
Всего у нас будет 10 эпох. За каждую из них модель получит по 64 батча. Начальная скорость 1e-2 будет постепенно снижаться до 1e-5, но перед этим в течении 128 батчей модель будет разогреваться.
epoch = 10
steps_per_epoch = 64 # Количество подаваемых батчей за одну эпоху
start_rate = 1e-2
finish_rate = 1e-5
warmup_steps = 128 # Количество "шагов прогрева"
total_steps = epoch * steps_per_epoch - warmup_steps # Общее количество шаговНе забудем про планировщик:
def sheduler(step, warmup_steps=128):
"""Вычисляет скорость на нужно шаге"""
if step <= warmup_steps:
return start_rate * step / warmup_steps
else:
return (start_rate - finish_rate) * (total_steps - step) / total_steps
return lrТеперь запустим цикл обучения.
fit(
data_train=train, data_valid=valid,
shuffle=True, step_per_epoch=steps_per_epoch,
epoch=epoch
)Если вы видите что-то подобное:
================================================================
Эпоха: 1
Ошибка модели на обучении: 2.267000511288643
Ошибка модели на тесте: 2.12314416660982
================================================================
Эпоха: 2
Ошибка модели на обучении: 1.6724637486040592
Ошибка модели на тесте: 0.30215326687868904
Значит все получилось!
Выводы.
Для ручной проверки результата, я создал функцию predict. Она поможет наглядно увидеть полученный результат:
print("\nВывод: ")
request = np.array([[
[1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8],
[8, 7, 6, 5, 4, 3, 2, 1, 8, 7, 6, 5, 4, 3, 2, 1],
[0, 1, 0, 2, 0, 3, 0, 4, 0, 1, 0, 2, 0, 3, 0, 4],
[1, 1, 2, 3, 1, 1, 4, 5, 1, 1, 2, 3, 1, 1, 4, 5],
[0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1],
[5, 5, 4, 4, 3, 3, 2, 2, 5, 5, 4, 4, 3, 3, 2, 2],
[7, 7, 7, 7, 8, 8, 9, 9, 7, 7, 7, 7, 8, 8, 9, 9],
[1, 5, 3, 7, 9, 2, 4, 6, 1, 5, 3, 7, 9, 2, 4, 6],
[3, 9, 2, 6, 1, 2, 5, 6, 3, 9, 2, 6, 1, 2, 5, 6],
]])
prediction = predict(request)
for r, p in zip(request[0], prediction[0]):
Выводы. print(f"{r} -> {p}")Вывод:
[1 2 3 4 5 6 7 8 1 2 3 4 5 6 7 8] -> [10 1 2 3 4 5 6 7 8 8 7 6 5 4 3 2 1]
[8 7 6 5 4 3 2 1 8 7 6 5 4 3 2 1] -> [10 8 7 6 5 4 3 2 1 1 2 3 4 5 6 7 8]
[0 1 0 2 0 3 0 4 0 1 0 2 0 3 0 4] -> [10 0 1 0 2 0 3 0 4 4 0 3 0 2 0 1 0]
[1 1 2 3 1 1 4 5 1 1 2 3 1 1 4 5] -> [10 1 1 2 3 1 1 4 5 5 4 1 1 3 2 1 1]
[0 0 0 0 1 1 1 1 0 0 0 0 1 1 1 1] -> [10 0 0 0 0 1 1 1 1 1 1 1 1 0 0 0 0]
[5 5 4 4 3 3 2 2 5 5 4 4 3 3 2 2] -> [10 5 5 4 4 3 3 2 2 2 2 3 3 4 4 5 5]
[7 7 7 7 8 8 9 9 7 7 7 7 8 8 9 9] -> [10 7 7 7 7 8 8 9 9 9 9 8 8 7 7 7 7]
[1 5 3 7 9 2 4 6 1 5 3 7 9 2 4 6] -> [10 1 5 3 7 9 2 4 6 6 4 2 9 7 3 5 1]
[3 9 2 6 1 2 5 6 3 9 2 6 1 2 5 6] -> [10 3 9 2 6 1 2 5 6 6 5 2 1 6 2 9 3]
Тривиальный тест пройден на отлично.
Первый токен в каждой последовательности —10 соответствует техническому токену Start с которого начинается генерация.
Проведем еще один эксперимент. Увеличим длину последовательностей в два раза: max_tokens=32. Увеличим батч и d_model и добьемся лучших показателей ошибки. А за тем в качестве тестовых данных подадим все те же последовательности в 16 токенов:
Вывод:
[1 2 3 4 5 6 7 8 1 2 3 4 5 6 7 8] -> [10 1 2 3 4 5 6 7 8 1 2 3 4 5 6 7 7 7 7 6 5 4 3 2 1 8 7 6 5 4 3 2 1]
[8 7 6 5 4 3 2 1 8 7 6 5 4 3 2 1] -> [10 8 7 6 5 4 3 2 1 8 7 6 5 4 3 2 2 2 2 3 4 5 6 7 8 1 2 3 4 5 6 7 8]
[0 1 0 2 0 3 0 4 0 1 0 2 0 3 0 4] -> [10 0 1 0 2 0 3 0 4 0 1 0 2 0 3 0 0 0 0 3 0 2 0 1 0 4 0 3 0 2 0 1 0]
[1 1 2 3 1 1 4 5 1 1 2 3 1 1 4 5] -> [10 1 1 2 3 1 1 4 5 1 1 2 3 1 1 4 4 4 4 1 1 3 2 1 1 5 4 1 1 3 2 1 1]
[0 0 0 0 1 1 1 1 0 0 0 0 1 1 1 1] -> [10 0 0 0 0 1 1 1 1 0 0 0 0 1 1 1 1 1 1 1 1 0 0 0 0 1 1 1 1 0 0 0 0]
[5 5 4 4 3 3 2 2 5 5 4 4 3 3 2 2] -> [10 5 5 4 4 3 3 2 2 5 5 4 4 3 3 2 2 2 2 3 3 4 4 5 5 2 2 3 3 4 4 5 5]
[7 7 7 7 8 8 9 9 7 7 7 7 8 8 9 9] -> [10 7 7 7 7 8 8 9 9 7 7 7 7 8 8 9 9 9 9 8 8 7 7 7 7 9 9 8 8 7 7 7 7]
[1 5 3 7 9 2 4 6 1 5 3 7 9 2 4 6] -> [10 1 5 3 7 9 2 4 6 1 5 3 7 9 2 4 4 4 4 2 9 7 3 5 1 6 4 2 9 7 3 5 1]
[3 9 2 6 1 2 5 6 3 9 2 6 1 2 5 6] -> [10 3 9 2 6 1 2 5 6 3 9 2 6 1 2 5 5 5 5 2 1 6 2 9 3 6 5 2 1 6 2 9 3]
Удивительно, но трансформер нашел выход из положения, продублировав каждую половину, а за тем перевернул вторую часть, как должно! И это при том, что у нас нет механизма ранней остановки и спец токена Pad. Данный пример раскрывает ту самую обобщающую способность, которую мы ждем от модели в виде умения находить выход из нестандартных ситуаций и работать с данными разной длины.
Разумеется это еще не все. Почему бы нам не задействовать более сложные тесты для того чтобы выжать максимум из данной модели? И почему бы читателю не сделать это самостоятельно в домашнем задании?
Домашнее заданиеДля закрепления материала, вы можете выполнить несколько задач.
Задание 1. Дропнутые тензоры.
Реализуйте метод регуляризации Дропаут своими силами.
Пояснение.
Технически вам нужно сделать случайное обнуление значений тензора формы [B T D] по измерению D. Также необходимо кэширование значений и обратная функция, которые должны встроиться в статичный граф (функции call и backward).
Для удобства, вставьте один dropout в call, например перед выходным слоем. Вставьте backward_dropout соответственно после выходного слоя в backward.
Примерная сигнатура:
def dropout(X: np.ndarray, p: float) -> tuple[np.ndarray, dict]:
""""""
pass
def backward_dropout(Y: np.ndarray, cache: dict) -> np.ndarray:
""""""
passЗадание 2. Личное тестирование.
Придумайте свой тест для трансформера. Для этого напишите и встройте новый метод в генератор данных.
Пояснение.
Создайте данные сообщений запросов и таргетов, связанных интересной логикой. Она должен быть достаточно проста и хорошо интерпретируема для проверки результата.
Метод должен возвращать два массива (train и valid) стандартной формы: [Строки, Тип сообщения, Размер батча, Длина последовательности], например:
Train.shape: (171, 2, 64, 16)
Valid shape: (85, 2, 64, 16)
Внутри генератора уже имеется метод train_test_split для разделение и тасования данных.
Шаблон метода:
def castom_test(self) -> Union[np.ndarray, tuple[np.ndarray, np.ndarray]]:
"""Короткое описание смысла теста"""
# --- Ваш код ---
size = (self.number_batch, self.batch_size, self.number_tokens)
data = np.ones(size, dtype=self.dtype)
target = np.ones(size, dtype=self.dtype)
# ----------------------------------------------------
data = np.array((data, target), dtype=self.dtype).transpose(1, 0, 2, 3)
return self.train_test_split(data, self.test_size) if self.shuffle else dataСейчас в шаблоне задаются два массива с единицами, правильной формы: data и target. Они объединяются в общий массив с перестановкой оси строк на первое место.
Задание 3. Самоиндексация.
В строке с созданием синтетических данных переключите метод генерации с простого palindrome на трансформер-специфичный pointer_index.
Отрегулируйте глобальные параметры и параметры обучения таким образом, чтобы ошибка на тесте упала ниже 5e-1, при параметре max_tokens >= 16 (это единственное ограничение).
Пояснение.
Данный тест создает последовательность, индексируя входную собственными номерами токенов:
Yi = X[Xi]
0 1 2 3 4 5 6 7 8 9 9 9 9 1 2 3 -> 0 1 2 3 4 5 6 7 8 9 9 9 9 1 2 3
5 5 5 5 5 0 9 9 9 1 9 9 5 9 9 6 -> 0 0 0 0 0 5 1 1 1 5 1 1 0 1 1 9
1 0 0 0 0 0 0 0 1 2 3 4 5 6 7 8 -> 0 1 1 1 1 1 1 1 0 0 0 0 0 0 0 1
Это сложный тест, который практически неподъемен для нашего маленького трансформера. Но если применить полученные знания и как следует поработать с параметрами, то задачу можно и нужно выполнить без изменения архитектуры с текущим уровнем глубины в 1 блок.
Совет: используйте GPU.
Мы проделали огромный путь от наивного понимания трансформеров и поверхностного рассмотрения схем к глубокой теории и непосредственной реализации. Теперь вам раскрыта не только идея и логика трансформеров, но и технические нюансы построения архитектуры, стабилизации обучения, а так же генерации сообщений.
По своему опыту могу сказать, что люди обладающие истинным пониманием никогда не прячутся за сложностью формулировок, дипломами и громкими званиями, а показывают свои знания на практике, выражаясь наиболее понятным языком. Чем глубже познание человека, тем легче ему провести деконструкцию и объяснить сложную тему "на пальцах". Надеюсь и у меня это получилось.
Если данная статья будет востребованной, то я напишу уже продвинутый и гораздо более глубокий материал, посвященный своему собственному фреймворку. В нем мы рассмотрим технические нюансы построения динамического графа, реализацию автоградиента, систему остановки генерации сообщений и много чего еще.
Также в этой статье я упомянул множество технологий, которые сами по себе являются интересными темами и заслуживают отдельных статей, например: оптимизаторы и математика СГС, продвинутые методы позиционного кодирования, латентное внимание и тому подобное.
Еще раз скажу, что написать подобную статью — это очень тяжелый и неблагодарный труд. Наверняка, в ней есть неточности, ошибки в тексте и "своеобразный стиль". Давайте исправим их вместе.
Если есть конкретные вопросы по теории и коду, то я отвечу на них.
Ну и в конце ссылка на учебный репозиторий. В нем вы найдете весь код уже собранного трансформера, приложение с дополнительным кодом, а так же шаблоны для задач из домашней работы.
Ссылка на статью создателей трансформера: https://arxiv.org/abs/1706.03762;
Учебный репозиторий: https://github.com/MaxKuzmenko/my-little-transformer/blob/main/Article. DIY Transformer. MaxKuz13.ipynb.
* Все изображения подготовлены мною в GoogleSlides.