Как построить и развернуть ML-пайплайн в Яндекс Облаке. Практическое руководство

В новой статье CleverData мы расскажем о проектировании ML-пайплайна предсказания целевого действия с помощью Yandex Cloud. Пайплайн необходим для автоматического обмена данными с CDP CleverData Join - использования информации с платформы для обучения ML-моделей и формирования прогнозов поведения каждого пользователя. На примерах рассмотрим использование API сервисов Yandex Cloud, коснемся алгоритмов обработки данных и обучения ML-модели, а также расскажем о возникших проблемах. Под катом делимся кодом.


Что такое CDP и зачем нужны предиктивные атрибуты

Платформа управления клиентскими данными (Customer Data Platform) - это инструмент, помогающий компаниям собирать, анализировать и использовать информацию о своих клиентах. CDP объединяет данные из таких источников, как сайты, мобильные приложения, CRM-системы и т.п., чтобы сформировать полное представление о портрете клиента и истории его действий.

CDP CleverData Join содержит два основных типа данных: клиентские профили и события, привязанные к этим профилям. В состав профиля входят идентификаторы клиента и атрибуты, содержащие информацию о пользователе (его имя, демографические характеристики, предпочтения и многое другое). Атрибуты событий представляют собой информацию о конкретных действиях клиентов. Например, посещения страниц сайта, факты покупки товаров, реакции на маркетинговые кампании. Но CDP - это не только база данных. Это инструмент, который помогает делать полезные выводы о клиентах и повышать эффективность маркетинга.

Основная часть данных собирается из внешних систем, интегрированных с CDP. Однако при наличии такого массива данных возникает желание получить новую, предиктивную характеристику профиля, сформированную методом машинного обучения на собранных сведениях. Знать не то, что человек уже сделал, а предугадывать его следующие шаги.

В CDP предиктивные атрибуты, как правило, используются для прогнозирования поведения клиентов, определения их потребностей и предсказания вероятности совершения определенных действий. Например, для маркетологов может быть полезно иметь прогноз того, что клиент возьмет кредит или оформит подписку. Это можно вычислить при помощи ML-модели, обученной на исторических данных о поведении клиента. Предиктивные атрибуты играют важную роль в персонализации маркетинговых кампаний, улучшении обслуживания клиентов.

Конечно же, нужно понимать, что для моделирования предиктивных атрибутов требуется:

  • иметь качественные, структурированные данные для моделирования,

  • обеспечить достаточный объем таких данных,

  • обладать качественными алгоритмами предобработки и обучения.

Не менее важным вопросом является обслуживание моделей - их надо регулярно обновлять и дообучать на свежих данных, иначе качество прогнозирования будет неизбежно падать. Построению пайплайна, который обеспечивает автоматизированное взаимодействие с CDP, и посвящена эта статья.

MVP и его цели

Мы решили построить MVP пайплайна, реализующего передачу данных для обучения, само обучение и передачу предсказаний обратно в CDP CleverData Join, используя готовые инструменты, предоставляемые Yandex Cloud.

В качестве бизнес-задачи мы для себя определили предсказание вероятности совершения клиентом целевого действия на основании данных о переходах по страницам сайта - кликстриме. Это типовая задача бинарной классификации (классификация по определенным признакам в две группы) массива клиентов. Под целевым действием подразумевается посещение нужного нам URL (например, страницы оформления покупки или кредита). Поскольку кликстрим является цепочкой переходов пользователей по ссылкам, в качестве архитектуры модели была выбрана рекуррентная нейронная сеть (вид нейронных сетей, где связи между элементами образуют направленную последовательность).

Целью проекта стало построение полностью автоматизированного пайплайна, который приносил бы ценность заказчику в течение длительного времени. Для этого нам нужно было реализовать следующие цепочки операций.

  • Предобработка новых данных для обучения и инференса (непрерывной работы нейронной сети на устройствах конечного пользователя) и преобразование их из формата хранения в CDP в формат, требуемый для ML-модели.

  • Первичное обучение модели под задачу конкретного заказчика на кликстриме его клиентов.

  • Периодическое дообучение модели на новых данных, поступающих в CDP. Дообучение на новых данных крайне важно, поскольку с течением времени модель может потерять свою актуальность, а, значит, точность прогнозов значительно уменьшится.

  • Периодический инференс (обеспечение непрерывной работы нейронной сети на устройствах конечного пользователя) для клиентов с обновлениями в кликстриме и отгрузку результатов обратно в CDP.

Архитектура компонентов MVP в Yandex Cloud

Итак, нам потребовалось разработать и развернуть в Yandex Cloud сервисы, реализующие все необходимые для построения MVP цепочки операций. После анализа требований была предложена следующая архитектура решения:

Схема взаимодействия компонентов решения
Схема взаимодействия компонентов решения

На стороне CleverData Join находятся сервисы Model Manager, отвечающие за автоматизированное создание инфраструктуры под пайплайн в Yandex Cloud, и Data Processor, обеспечивающий периодическую первичную обработку новых данных и загрузку их в бакет S3. По сути Model Manager является Control Plane, который автоматизирует операции по развертыванию компонентов в Yandex Cloud при появлении потребности создать новый предиктивный атрибут. Остальные компоненты находятся в инфраструктуре Yandex Cloud.

  • В бакете Yandex Object Storage накапливаются данные, представляющие собой первично обработанный кликстрим. Там же должны располагаться артефакты и метаданные, полученные при обучении моделей, а также результаты инференса моделей.

  • В сервисе Yandex DataSphere автоматически создаётся проект, содержащий ноутбуки для обучения и инференса. При поступлении новых данных в Yandex Object Storage должен запускаться необходимый ноутбук. В ноутбуке обучения реализованы вторичная обработка данных на кластере Yandex DataProc, обучение модели непосредственно в сервисе, загрузка артефактов модели на S3. В ноутбуке инференса - получение предсказаний по новым данным, отгруженным на S3.

  • В сервисе Yandex Cloud Functions располагаются функции, необходимые для запуска ноутбуков, и триггеры, которые привязаны к обновлению данных в Yandex Object Storage по определенным при их создании префиксам. Таким образом, после отгрузки новых данных на бакет производится, в зависимости от расположения данных, обучение/дообучение модели, инференс модели и отгрузка предсказаний в REST API CleverData Join.

Так как важно обеспечить одновременную работу нескольких моделей совершения целевого действия для разных тенантов в системе CD Join, для каждой задачи нужен свой отдельный проект в DataSphere со своими шаблонными ноутбуками, облачными функциями и триггерами. Чтобы процесс был максимально автоматизирован, необходимо, чтобы каждый проект был преднастроен:

  • в него были загружены шаблонные ноутбуки,

  • был активирован готовый Docker-образ для проекта DataSphere.

Забегая вперёд, отметим, что с автоматизацией преднастройки проекта возникли некоторые затруднения, но об этом мы расскажем позднее.

Немного о предобработке данных

Data Processor запускается по расписанию и производит первичную обработку данных и записывает результат в хранилище Yandex Object Storage.

События кликстрима хранятся в CleverData Join в формате AVRO. Предобработка происходит в два этапа.

  • На первом этапе из событий кликстрима извлекаются идентификатор клиента, метка времени и URL перехода. Ссылки очищаются от технических доменов и обрезаются до верхнеуровневых доменов. Данные сохраняются в формате Parquet.

  • Далее для каждого идентификатора клиента формируется цепочка из посещенных URL в хронологическом порядке и дельт, то есть временных интервалов, прошедших между посещениями каждого URL.

Схема предобработки данных
Схема предобработки данных

Процесс, организованный в два этапа, позволяет избегать повторной первичной обработки старых событий. Старые события хранятся в формате Parquet в S3 бакете и периодически дополняются первично обработанными событиями за новые периоды. После этого происходит пересборка цепочек для клиентов, по которым есть обновления.

Автоматизированное создание инфраструктуры

У сервисов Yandex Cloud есть API, позволяющий взаимодействовать с ними без использования UI, что критически важно для реализации управления процессом в Model Manager. В этом разделе мы пройдём по всем автоматизированным через API операциям, а именно:

  • созданию сервисного аккаунта;

  • созданию и настройке сети VPC для кластера Data Proc;

  • созданию проекта в DataSphere;

  • созданию S3 бакета и получению статического ключа доступа;

  • созданию и настройке Cloud Functions и их триггеров.

Сначала создадим сервисный аккаунт, от имени которого будет происходит взаимодействие с ресурсами Yandex Cloud:

def create_service_account(folder_id: str,
                           name: str,
                           iam_token: str,
                           description: str = ''):
  url = 'https://iam.api.cloud.yandex.net/iam/v1/serviceAccounts'
  headers = {"Authorization": f"Bearer {iam_token}"}
  data = {
           "folderId": folder_id,
           "name": name,
           "description": description,
  response = requests.post(url, headers=headers, json=data)
  return response
sa_response = create_service_account(folder_id = folder_id,
                                     name = 'ta-service-account',
                                     iam_token = iam,
                                     description = 'ta-service-account')

Теперь, чтобы у сервисного аккаунта хватало прав на взаимодействие с необходимыми сервисами Yandex Cloud (DataSphere, Object Storage, Data Proc, Cloud Functions, VPC), выдадим ему необходимые роли. Для каталога это:

  • editor - нужна для чтения, записи и управления бакетом S3;

  • vpc.user, vpc.admin - необходимы для настройки доступа кластеру в интернет;

  • dataproc.agent - для использования кластера Data Proc;

  • dataproc.admin - для создания и удаления кластера Data Proc.

def update_access_bindings(resource_id : str,
                           folder_id: str,
                           roles: list = ['iam.serviceAccounts.user',
                           iam_token: str = iam):
   url = f'https://resource-manager.api.cloud.yandex.net/resource-manager/v1/folders/{folder_id}:updateAccessBindings'
   headers = {"Authorization": f"Bearer {iam_token}"}
   access_binding_deltas = [
           "action": "ADD",
           "accessBinding": {
               "roleId": role,
               "subject": {
                   "id": resource_id,
                   "type": "serviceAccount"
       } for role in roles
   payload = {
       "accessBindingDeltas": access_binding_deltas
   response = requests.post(url, json=payload, headers=headers)
   return response
response_update_access_bindings = update_access_bindings(

Также необходимо назначить роль сервисному аккаунту, чтобы можно было им управлять. Роль должна быть назначена как на ресурс:

def update_access_bindings_1(resource_id: str,
roles: list = ['datasphere.community-projects.editor',
iam_token: str = iam):
url = f"https://iam.api.cloud.yandex.net/iam/v1/serviceAccounts/{resource_id}:updateAccessBindings"
headers = {"Authorization": f"Bearer {iam_token}"}
access_binding_deltas = [
"action": "ADD",
"accessBinding": {
"roleId": role,
"subject": {
"id": resource_id,
"type": "serviceAccount"
} for role in roles
payload = {
"accessBindingDeltas": access_binding_deltas
response = requests.post(url, json=payload, headers=headers)
return response
update_access_response_1 = update_access_bindings_1(resource_id=service_account_id)

Для того, чтобы кластер Data Proc мог общаться с внешним миром, нам нужно создать сеть VPC, указать шлюз и сформировать таблицу маршрутизации. В этой сети необходимо выделить подсеть для настройки доступа в интернет при помощи созданного шлюза.

Итак, создаём сеть:

def create_network(folder_id: str,
                   network_name: str,
                   description: str,
                   iam_token: str = iam,
  url = 'https://vpc.api.cloud.yandex.net/vpc/v1/networks'
  headers = {"Authorization": f"Bearer {iam_token}"}
  data = {
    "folderId": folder_id,
    "name": network_name,
    "description": description,
  response = requests.post(url, headers=headers, json=data)
  return response
create_network_response = create_network(folder_id = folder_id,
                                         network_name = 'network-1',
                                         description = ''

…, шлюз:

def create_gateway(folder_id: str,
                   gateway_name: str,
                   description: str,
                   iam_token: str = iam,
  url = 'https://vpc.api.cloud.yandex.net/vpc/v1/gateways'
  headers = {"Authorization": f"Bearer {iam_token}"}
  data = {
    "folderId": folder_id,
    "name": gateway_name,
    "description": description,
    "sharedEgressGatewaySpec": {}
  response = requests.post(url, headers=headers, json=data)
  return response
response_create_gateway = create_gateway(folder_id = folder_id,
                                         gateway_name = 'gateway',
                                         description = ''

…, таблицу маршрутизации:

def create_route_table(folder_id: str,
                         route_table_name: str,
                         description: str,
                         gateway_id: str,
                         network_id: str,
                         destination_prefix: str = '',
                         iam_token: str = iam,
  url = 'https://vpc.api.cloud.yandex.net/vpc/v1/routeTables'
  headers = {"Authorization": f"Bearer {iam_token}"}
  data = {
    "folderId": folder_id,
    "name": route_table_name,
    "description": description,
    "networkId": network_id,
    "staticRoutes": [
        "destinationPrefix": destination_prefix,
        "gatewayId": gateway_id,
  response = requests.post(url, headers=headers, json=data)
  return response
response_create_route_table = create_route_table(folder_id = folder_id,
                                           route_table_name = 'route-table',
                                           description = '',
                                           gateway_id = gateway_id,
                                           network_id = network_id,
                                           destination_prefix = ''

…, и подсеть с доступом в интернет:

def create_subnet(folder_id: str,
                  subnet_name: str,
                  network_id: str,
                  route_table_id: str,
                  description: str,
                  ip_range: str,
                  zone_id: str,
                  dhcp_options: dict = {},
                  iam_token: str = iam,
  url = 'https://vpc.api.cloud.yandex.net/vpc/v1/subnets'
  headers = {"Authorization": f"Bearer {iam_token}"}
  data = {
    "folderId": folder_id,
    "name": subnet_name,
    "description": description,
    "networkId": network_id,
    "zoneId": zone_id,
    "v4CidrBlocks": [
    "routeTableId": route_table_id,
    "dhcpOptions": dhcp_options
  response = requests.post(url, headers=headers, json=data)
  return response
response_create_subnet_a = create_subnet(folder_id = folder_id,
                subnet_name = 'subnet-ru-central1-a',
                network_id = network_id,
                route_table_id = route_table_id,
                description = 'default subnet for zone ru-central1-a',
                ip_range = '',
                zone_id = 'ru-central1-a',
                dhcp_options = {},

В процессе работы мы будем создавать кластер Data Proc, на котором производится вторичная обработка данных.

При создании кластера важно не забыть передать идентификатор подсети, для которой мы ранее настраивали доступ в интернет. Нам нужно создать объект конфигурации с информацией о подкластерах, их ролях, типе и размере диска, количестве хостов. Этот объект будет передан в функцию создания кластера.

def create_cluster(folder_id: str,
                   cluster_name: str,
                   description: str,
                   config_spec: dict,
                   service_account_id: str,
                   zone_id: str,
                   bucket: str,
                   ui_proxy: bool,
                   security_group_ids: str,
                   host_group_ids: str,
                   deletion_protection: bool,
                   log_group_id: str,
                   iam_token: str = iam,
  url = 'https://dataproc.api.cloud.yandex.net/dataproc/v1/clusters'
  headers = {"Authorization": "Bearer {}".format(iam_token)}
  data = {
  "folderId": folder_id,
  "name": cluster_name,
  "description": description,
  "configSpec": config_spec,
  "zoneId": zone_id,
  "serviceAccountId": service_account_id,
  "uiProxy": ui_proxy,
  "deletionProtection": deletion_protection,
  response = requests.post(url, headers=headers, json=data)
  return response
config_spec = {
  "versionId": "2.0",
  "hadoop": {
  "services": [
  "sshPublicKeys": [
  "subclustersSpec": [
      "name": "master",
      "role": "MASTERNODE",
      "resources": {
      "resourcePresetId": "s2.micro",
      "diskTypeId": "network-ssd",
      "diskSize": "137438953472"
      "subnetId": a_subnet_id,
      "hostsCount": "1",
      "assignPublicIp": False,
      "name": "data",
      "role": "DATANODE",
      "resources": {
      "resourcePresetId": "s2.small",
      "diskTypeId": "network-hdd",
      "diskSize": "274877906944"
      "subnetId": a_subnet_id,
      "hostsCount": "1",
      "assignPublicIp": False,
response_create_cluster = create_cluster(folder_id = folder_id,
                                         cluster_name = 'xs-proc',
                                         description = '',
                                         config_spec = config_spec,
                                         service_account_id = service_account_id,
                                         zone_id = zone_id,
                                         bucket = '',
                                         ui_proxy = False,
                                         security_group_ids = '',
                                         host_group_ids = '',
                                         deletion_protection = False,
                                         log_group_id = '')
cluster_id = response_create_cluster.json()['metadata']['clusterId']

Так как кластер будет считывать данные с s3, нужно предоставить ему Credentials для доступа к S3:

#!spark --cluster=xs-proc --session preparing_session --variables key --variables sct
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "")
sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

Теперь мы готовы создать проект в DataSphere, в нем будут происходить все операции, связанные с обучением и инференсом ML-моделей:

def create_project(project_name: str,
                   community_id: str,
                   service_account_id: str,
                   description: str,
                   subnet_id: str,
                   commit_mode: str,
                   default_folder_id: str,
                   vm_inactivity_timeout: str,
                   default_dedicated_spec: str,
                   data_proc_cluster_id: str = '',
                   security_group_ids: list = [],
                   limits: dict = {},
                   early_access: bool = False,
                   ide: str = 'JUPYTER_LAB',
                   iam_token: str = iam):
 url = 'https://datasphere.api.cloud.yandex.net/datasphere/v2/projects'
 headers = {"Authorization": f"Bearer {iam_token}"}
 data = {
   "communityId": community_id,
   "name": project_name,
   "description": description,
   "settings": {
     "serviceAccountId": service_account_id,
     "subnetId": subnet_id,
     "dataProcClusterId": data_proc_cluster_id,
     "commitMode": commit_mode,
     "securityGroupIds": security_group_ids,
     "earlyAccess": early_access,
     "ide": ide,
     "defaultFolderId": default_folder_id,
     "vmInactivityTimeout": vm_inactivity_timeout,
     "defaultDedicatedSpec": default_dedicated_spec
   "limits": limits
 response = requests.post(url, headers=headers, json=data)
 return response
response_create_project = create_project(project_name = 'ta-service-project',
                                         community_id = community_id,
                                         service_account_id = service_account_id,
                                         subnet_id = a_subnet_id,
                                         description = 'ta model project',
                                         commit_mode = 'AUTO',
                                         default_folder_id = folder_id,
                                         vm_inactivity_timeout = '1800s',
                                         default_dedicated_spec = 'c1.4')

Сервисный аккаунт, который из Cloud Functions будет запускать ноутбук, должен быть в участниках проекта DataSphere. Выдадим ему соответствующую роль:

def update_access_bindings_2(resource_id : str,
                            project_id: str,
                            roles: list = ['datasphere.community-projects.editor'],
                            iam_token: str = iam):
   url = f"https://datasphere.api.cloud.yandex.net/datasphere/v2/projects/{project_id}:updateAccessBindings"
   headers = {"Authorization": f"Bearer {iam_token}"}
   access_binding_deltas = [
           "action": "ADD",
           "accessBinding": {
               "roleId": role,
               "subject": {
                   "id": resource_id,
                   "type": "serviceAccount"
       } for role in roles
   payload = {
       "accessBindingDeltas": access_binding_deltas
   response = requests.patch(url, json=payload, headers=headers)
   return response
response_update_access_bindings_2 = update_access_bindings_2(resource_id=service_account_id,

Вы можете использовать для обмена данными уже существующий бакет S3. Но, если его нет, то его создание также не представляет сложности:

def create_bucket(bucket_name: str,
                  folder_id: str,
                  default_storage_class: str,
                  max_size: str,
                  anonymous_access_flags: dict,
                  acl: dict,
                  versioning: str,
                  tags: list,
                  iam_token: str = iam,
  url = 'https://storage.api.cloud.yandex.net/storage/v1/buckets'
  headers = {"Authorization": f"Bearer {iam_token}"}
  data = {
    "name": bucket_name,
    "folderId": folder_id,
    "defaultStorageClass": default_storage_class,
    "maxSize": max_size,
    "anonymousAccessFlags": anonymous_access_flags,
    "versioning": versioning,
    "acl": acl,
    "tags": tags
  response = requests.post(url, headers=headers, json=data)
  return response
bucket_name = 'test-storage-042'
response_create_bucket = create_bucket(bucket_name = bucket_name,
                           folder_id = folder_id,
                           default_storage_class = 'STANDARD',
                           max_size = '3221225472',
                           anonymous_access_flags = {'read': False, 'list': False},
                           acl = {},
                           versioning = 'VERSIONING_DISABLED',
                           tags = [])

Однако крайне важно создать статический ключ доступа к бакету. Ключ позволяет читать данные с S3 как кластеру Data Proc, так и при помощи SDK boto3. Конечно же, у сервисного аккаунта service_account_id, указанного в функции, должна быть роль в каталоге с правами чтения и записи S3 бакета, в нашем случае это editor.

def create_static_key(service_account_id: str,
                      description: str,
                      iam_token: str = iam,
   url = 'https://iam.api.cloud.yandex.net/iam/aws-compatibility/v1/accessKeys'
   headers = {"Authorization": f"Bearer {iam_token}"}
   data = {
     "serviceAccountId": service_account_id,
     "description": description
   response = requests.post(url, headers=headers, json=data)
   return response
response_create_static_key = create_static_key(
                               service_account_id = service_account_id,
                               description = '')
key = response_create_static_key.json()['accessKey']['keyId']
secret = response_create_static_key.json()['secret']

Обязательно сохраняем идентификатор статического ключа и секретный ключ в защищенном месте. Мы передадим их через параметр запроса InputVariables метода execute при запуске ноутбуков.

Создаем функцию в Cloud Functions, которая будет запускать обучение:

def create_cloud_function(folder_id: str,
                          function_name: str,
                          description: str,
                          iam_token: str = iam):
   url = 'https://serverless-functions.api.cloud.yandex.net/functions/v1/functions'
   headers = {"Authorization": f"Bearer {iam_token}"}
   data = {
     "folderId": folder_id,
     "name": function_name,
     "description": description,
   response = requests.post(url, headers=headers, json=data)
   return response
response_create_cloud_function = create_cloud_function(folder_id = folder_id,
                                                       function_name = 'fit',
                                                       description = 'start fit')

Далее нужно собрать версию функции, в которую передадим сам код скрипта в виде строки. Чтобы получить эту строку, нужно заархивировать код функции в zip-архив и закодировать в base64. Строка передается в переменную content:

def create_cloud_function_version(function_id: str,
                                 resources: dict,
                                 runtime: str,
                                 entrypoint: str,
                                 log_options: dict,
                                 description: str,
                                 execution_timeout: str,
                                 service_account_id: str,
                                 content: str,
                                 iam_token: str = iam
   url = 'https://serverless-functions.api.cloud.yandex.net/functions/v1/versions'
   headers = {"Authorization": f"Bearer {iam_token}"}
   data = {
     "functionId": function_id,
     "runtime": runtime,
     "description": description,
     "entrypoint": entrypoint,
     "resources": resources,
     "executionTimeout": execution_timeout,
     "serviceAccountId": service_account_id,
     "logOptions": log_options,
     "content": content,
   response = requests.post(url, headers=headers, json=data)
   return response
response_create_cloud_function_version = create_cloud_function_version(
function_id = fit_cf_id,
resources = {'memory': '134217728'},
runtime = 'python312',
entrypoint = 'index.handler',
log_options = {'disabled': False, 'folderId': folder_id},
description = 'the function will execute fit notebook',      
execution_timeout = '1s',                                                                    
service_account_id = service_account_id,
content = str(encoded)[2:-1],
iam_token = iam

Функция и ее версия для запуска инференса добавляется полностью аналогичным образом.

Осталось создать триггеры, которые после появления новых данных в S3 запустят функции, которые в свою очередь запустят обучение, инференс или же отправку предсказаний в CleverData Join.

Мы будем отслеживать появление нового объекта на S3 с заданным префиксом, поэтому используем соответствующий event_type и передадим нужный prefix:

def create_trigger(folder_id: str,
                  trigger_name: str,
                  description: str,
                  event_type: str,
                  bucket_id: str,
                  prefix: str,
                  cloud_function_id: str,
                  service_account_id: str,
                  retry_attempts: str,
                  interval: str,
                  function_tag: str = '$latest',
                  iam_token: str = iam
  url = 'https://serverless-triggers.api.cloud.yandex.net/triggers/v1/triggers'
  headers = {"Authorization": f"Bearer {iam_token}"}
  data = {
    "folderId": folder_id,
    "name": trigger_name,
    "description": description,
    "rule": {
      "objectStorage": {
        "eventType": [
        "bucketId": bucket_id,
        "prefix": prefix,
        "invokeFunction": {
          "functionId": cloud_function_id,
          "functionTag": function_tag,
          "serviceAccountId": service_account_id,
          "retrySettings": {
            "retryAttempts": retry_attempts,
            "interval": interval
  response = requests.post(url, headers=headers, json=data)
  return response
response_create_fit_trigger = create_trigger(
                         folder_id = folder_id,
                         trigger_name = 'fit-test-trigger',
                         description = '',
                         event_type = 'OBJECT_STORAGE_EVENT_TYPE_CREATE_OBJECT',
                         bucket_id = bucket_id,
                         prefix = fitting_trigger_prefix,
                         cloud_function_id = fit_cf_id,
                         service_account_id = service_account_id,
                         retry_attempts = '5',
                         interval = '20s',
                         function_tag = '$latest')

Поговорим немного о возврате данных в CDP. В CleverData Join имеются готовые адаптеры на пакетную загрузку данных. При этом, все операции взаимодействия с CDP также превосходно автоматизируются через API.

Мы создаем экземпляр входящего адаптера для того, чтобы передавать и записывать в атрибут клиентского профиля CDP предсказания модели:

def create_adapter_instance_input(adapter_instance_id: str,
                                  source: str,
                                  cid: str,
                                  access_token: str,
                                  value: dict={}):
   headers = {"Authorization": "Bearer {}".format(access_token),
              "x-dmpkit-onbehalf-of": cid}
   url = f'https://domain_name.cleverdata.ru/integration-manager/v1/adapters/1/instances/{adapter_instance_id}/inputs'
   data = {
   response = requests.post(url, json=data, headers=headers)
   return response
response = create_adapter_instance_input(adapter_instance_id=adapter_instance_id,

Все, что останется сделать, - реализовать в теле функции загрузку данных в файловый менеджер CleverData Join. Это тоже можно реализовать через API. Загруженные данные будут обработаны ранее созданным адаптером.

Обучение и дообучение модели

После запуска ноутбука обучения или инференса в проекте DataSphere происходит вторичная обработка с использованием кластера Dataproc. Новые сведения о событиях кликстрима добавляются в цепочки действий клиента. Далее на полученных цепочках происходит обучение модели.

Код вторичной обработки и код обучения во время первичного и повторного запуска ноутбуков немного отличается. Один и тот же код в зависимости от состояния объектов в бакете обрабатывает следующие несколько ситуаций. Рассмотрим их по отдельности.

Первичное обучение и первичный инференс модели
Первичное обучение и первичный инференс модели

В ситуации, когда в бакете находится только файл с событиями за текущий период, полученный в результате первичной обработки, мы имеем дело с первичным обучением модели. Следовательно, после обработки данных ноутбук выполнит последовательность первичного обучения модели. Для ноутбука инференса реализована та же логика. Если это первые данные для инференса в бакете, то модель вычислит предсказания для всех профилей.

Дообучение модели
Дообучение модели

В случае, если в бакете уже находились данные за предыдущие периоды, произойдет объединение строк и построение новых цепочек. Далее - дообучение модели.


Схема для повторного инференса выглядит схожим образом. Отличие заключается в том, что в данных за предыдущие периоды будут оставлены только события клиентов текущего периода. За счёт этого цепочки будут перестроены только для новых клиентов - именно тех, по которым значения предиктивных атрибутов ещё не сформированы.

В процессе дообучения создается новая модель-кандидат. Для того, чтобы определить, лучше ли эта модель, чем предыдущая, сравнивается AUC-ROC score новой модели на тестовой выборке с аналогичным показателем предыдущей модели, который сохраняется в бакете S3. При отсутствии увеличения метрики несколько итераций дообучения подряд - процесс дообучения заканчивается. Если же после дообучения модель показала лучшее качество, чем предыдущая лучшая модель, то происходит обновление лучшей модели (а также токенизатора, значения порога предсказания, файла с AUC-ROC score).

Ограничения в MVP версии проекта

В процессе реализации решения обнаружили некоторые ограничения, проработка которых может помочь расширить функциональность этого сервиса в будущем.

Например, в настоящее время отсутствует метод для загрузки ноутбуков в проект DataSphere. Поэтому при реализации мы изменили логику и ввели концепцию сервисного ноутбука, который однократно создается в DataSphere вручную. Это -разовая операция, нет необходимости выполнять ее для каждой новой задачи предсказания.

При заведении новой задачи выполнения целевого действия Model Manager размещает рабочие ноутбуки в S3 бакете. Сервисный ноутбук скачивает их в файловую систему DataSphere и последовательно запускает, если для задачи, соответствующей ноутбуку, есть новые первично обработанные данные.

Переход к данной концепции позволил ускорить и удешевить вторичную обработку. Теперь она сопровождается единоразовым поднятием кластера Data Proc и единой вторичной обработкой файлов всех задач клиента, под которые есть свежие данные.

Подводя итог, важно отметить, что в совокупности с концепцией сервисного ноутбука добавление метода загрузки ноутбуков в API позволит добиться еще более эффективной автоматизации развертывания пайплайна в Yandex Cloud, исключив необходимость в использовании UI.

Отключение режима запуска ноутбуков в Serverless в DataSphere

MVP изначально проектировалось в Serverless-парадигме - планировалось выделять ресурсы только в момент запуска вычислительных задач. Разумеется, мы с огромным удовольствием воспользовались Serverless-режимом запуска ноутбуков в DataSphere - это позволяло распределять рабочую нагрузку на CPU- и GPU-ресурсы в зависимости от характера нагрузки, хотя и приходилось взамен жертвовать временем на сериализацию, десериализацию переменных в процессе перехода между инстансами. Однако данный режим более не поддерживается, и, следовательно, нами был осуществлен переход к Dedicated.

Cтоит подчеркнуть, что переход от режима Serverless к Dedicated не повлиял на увеличение расходов на ресурсы вследствие простаивания инстансов, поскольку выделенная ВМ на запуск ноутбука через API после завершения выполнения инструкций моментально удаляется.

В рамках доработки текущего ограничения мы должны будем подобрать наиболее оптимальную конфигурацию ВМ при запуске в режиме Dedicated.

Ручная активация docker-образа в проекте

Пока что нет возможности взаимодействовать с ресурсом Docker-образ проекта DataSphere посредством API. Однако в ближайшем будущем этот функционал будет добавлен в API, что также обеспечит большую гибкость при развертывании пайплайна.

Для повышения функциональности ML-сервиса в CDP все перечисленные ограничения будут проработаны при разработке его продакшн-версии, в том числе некоторые из них будут усовершенствованы в сотрудничестве с Yandex Cloud.

Итоги MVP

Подводя итоги, можно с уверенностью сказать, что цели MVP были достигнуты.

  • Мы проверили гипотезу о возможности практически полностью автоматизированного развертывания инфраструктуры сервиса в облаке.

  • Построили эффективный конвейер предобработки данных.

  • Научились запускать код обучения и инференса по внешним триггерам.

  • Организовали процесс дообучения модели.

Использование ML-сервисов Yandex Cloud позволяет значительно упростить разработку решений по предиктивному анализу данных. Разработанный функционал является отличным дополнением к уже существующим широким возможностям платформы CleverData Join в части накопления знаний о едином профиле пользователя и построении его клиентского пути и позволяет обогащать данные о клиентской базе и открывает возможности использования предиктивных атрибутов для сегментации аудитории и организации адресных взаимодействий с клиентом.


