Python Workflows в бета-тесте: Создавайте сложные приложения на Cloudflare без лишних сложностей

Разработчики уже сейчас могут использовать Cloudflare Workflows для создания долго выполняющихся многоэтапных приложений на Workers. Теперь доступны Python Workflows, что означает возможность использования языка по выбору для оркестрации многоэтапных приложений.

С помощью Workflows вы можете автоматизировать последовательность идемпотентных шагов в вашем приложении со встроенной обработкой ошибок и повторными попытками. Но изначально Workflows поддерживались только в TypeScript. Поскольку Python является де-факто языком выбора для конвейеров данных, искусственного интеллекта/машинного обучения и автоматизации задач — всех областей, которые сильно зависят от оркестрации — это создавало сложности для многих разработчиков.

За последние годы мы предоставляли разработчикам инструменты для создания таких приложений на Python в Cloudflare. В 2020 году мы добавили Python в Workers через Transcrypt перед прямой интеграцией Python в workerd в 2024 году. В начале этого года мы добавили поддержку CPython вместе с любыми пакетами, собранными в Pyodide, такими как matplotlib и pandas, в Workers. Теперь также поддерживаются Python Workflows, поэтому разработчики могут создавать надежные приложения, используя язык, который они лучше всего знают.

Зачем использовать Python для Workflows?

Представьте, что вы обучаете LLM. Вам нужно разметить набор данных, подать данные, дождаться выполнения модели, оценить потери, скорректировать модель и повторить. Без автоматизации вам нужно было бы запускать каждый шаг, вручную отслеживать завершение и затем запускать следующий. Вместо этого вы можете использовать workflow для оркестрации обучения модели, запуская каждый шаг после завершения предыдущего. Для любых необходимых ручных корректировок, таких как оценка потерь и соответствующая настройка модели, вы можете реализовать шаг, который уведомляет вас и ожидает необходимого ввода.

Рассмотрим конвейеры данных, которые являются основным случаем использования Python для приема и обработки данных. Автоматизируя конвейер данных через определенный набор идемпотентных шагов, разработчики могут развернуть workflow, который обрабатывает весь конвейер данных за них.

Возьмем другой пример: создание ИИ-агентов, например, агента для управления покупками продуктов. Каждую неделю вы вводите свой список рецептов, и агент (1) составляет список необходимых ингредиентов, (2) проверяет, какие ингредиенты у вас остались с предыдущих недель, и (3) заказывает разницу для самовывоза из местного продуктового магазина. Используя Workflow, это может выглядеть так:

  1. await step.wait_for_event() пользователь вводит список покупок

  2. step.do() составить список необходимых ингредиентов

  3. step.do() сравнить список необходимых ингредиентов с оставшимися

  4. step.do() выполнить API-вызов для размещения заказа

  5. step.do() провести оплату

Использование workflows как инструмента для создания агентов на Cloudflare может упростить архитектуру агентов и повысить вероятность их завершения благодаря повторным попыткам отдельных шагов и сохранению состояния. Поддержка Python Workflows означает, что создавать агентов на Python стало проще, чем когда-либо.

Как работают Python Workflows

Cloudflare Workflows использует базовую инфраструктуру, которую мы создали для устойчивого выполнения, предоставляя при этом идиоматический способ для пользователей Python писать свои workflows. Кроме того, мы стремились к полной функциональной совместимости между JavaScript и Python SDK. Это возможно благодаря тому, что Cloudflare Workers поддерживают Python непосредственно в самой среде выполнения.

Создание Python Workflow

Cloudflare Workflows полностью построены на основе Workers и Durable Objects. Каждый элемент играет свою роль в хранении метаданных Workflow и информации на уровне экземпляра. Для получения более подробной информации о том, как работает платформа Workflows, ознакомьтесь с этим постом в блоге.

В самом низу плоскости управления Workflows находится пользовательский Worker, который является WorkflowEntrypoint. Когда экземпляр Workflow готов к запуску, механизм Workflow вызовет метод run пользовательского worker через RPC, который в данном случае будет Python Worker.

Это пример скелета для объявления Workflow, предоставленный официальной документацией:

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // Steps here
  }
}

Метод run, как показано выше, предоставляет параметр WorkflowStep, который реализует API устойчивого выполнения. Именно на это полагаются пользователи для выполнения "не более одного раза". Эти API реализованы в JavaScript и должны быть доступны в контексте Python Worker.

WorkflowStep должен пересечь барьер RPC, что означает, что механизм (вызывающая сторона) предоставляет его как RpcTarget. Эта настройка позволяет пользовательскому Workflow (вызываемой стороне) заменить параметр заглушкой. Эта заглушка затем позволяет использовать API устойчивого выполнения для Workflows через RPC обратно к механизму. Чтобы узнать больше о сериализации RPC и о том, как функции могут передаваться между вызывающей и вызываемой сторонами, прочитайте документацию по удаленным вызовам процедур.

Все это справедливо как для Python, так и для JavaScript Workflows, поскольку мы фактически не меняем способ вызова пользовательского Worker со стороны Workflows. Однако в случае с Python существует еще один барьер — языковой мост между Python и JavaScript модулем. Когда RPC-запрос направлен на Python Worker, существует модуль точки входа на JavaScript, отвечающий за проксирование запроса для обработки скриптом Python и последующего возврата вызывающей стороне. Этот процесс обычно включает преобразование типов до и после обработки запроса.

Преодоление языкового барьера

Python workers полагаются на Pyodide, который представляет собой порт CPython в WebAssembly. Pyodide предоставляет интерфейс внешних функций (FFI) к JavaScript, который позволяет вызывать методы JavaScript из Python. Это механизм, который позволяет другим привязкам и пакетам Python работать на платформе Workers. Поэтому мы используем этот слой FFI не только для того, чтобы позволить напрямую использовать привязку Workflow, но и для предоставления методов WorkflowStep на Python. Другими словами, учитывая, что WorkflowEntrypoint является специальным классом для среды выполнения, метод run вручную оборачивается так, что WorkflowStep предоставляется как JsProxy вместо преобразования типов, как другие объекты JavaScript. Более того, оборачивая API с точки зрения пользовательского Worker, мы позволяем себе внести некоторые корректировки в общий опыт разработки, вместо простого предоставления JavaScript SDK другому языку с другой семантикой.

Создание Pythonic SDK для Workflows

Большая часть портирования Workflows на Python включает предоставление интерфейса, который будет знаком пользователям Python и не вызовет проблем при использовании, аналогично тому, что происходит с нашими JavaScript API. Давайте сделаем шаг назад и посмотрим на фрагмент определения Workflow (написанного на Typescript).

import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent} from 'cloudflare:workers';
 
export class MyWorkflow extends WorkflowEntrypoint {
    async run(event: WorkflowEvent<YourEventType>, step: WorkflowStep) {
        let state = step.do("мой первый шаг", async () => {
          // Доступ к свойствам через event.payload
          let userEmail = event.payload.userEmail
          let createdTimestamp = event.payload.createdTimestamp
          return {"userEmail": userEmail, "createdTimestamp": createdTimestamp}
	    })
 
        step.sleep("мой первый сон", "30 минут");
 
        await step.waitForEvent<EventType>("получить пример события", { type: "простое-событие", timeout: "1 час" })
 
   	 const developerWeek = Date.parse("22 сент 2025 13:00:00 UTC");
        await step.sleepUntil("сон до истечения времени X", developerWeek)
    }
}

Реализация API рабочих процессов на Python требует модификации метода do. В отличие от других языков, Python не поддерживает легко анонимные колбэки. Такое поведение обычно достигается с помощью декораторов, которые в данном случае позволяют нам перехватывать метод и представлять его идиоматически. Другими словами, все параметры сохраняют свой исходный порядок, а декорированный метод служит колбэком.

Методы waitForEvent, sleep и sleepUntil могут сохранять свои исходные сигнатуры, при условии что их имена преобразованы в snake_case.

Вот соответствующая версия на Python для того же рабочего процесса, достигающая аналогичного поведения:

from workers import WorkflowEntrypoint
 
class MyWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
        @step.do("мой первый шаг")
        async def my_first_step():
            user_email = event["payload"]["userEmail"]
            created_timestamp = event["payload"]["createdTimestamp"]
            return {
                "userEmail": user_email,
                "createdTimestamp": created_timestamp,
            }
 
        await my_first_step()
 
        step.sleep("мой первый сон", "30 минут")
 
         await step.wait_for_event(
            "получить пример события",
            "простое-событие",
            timeout="1 час",
        )
 
        developer_week = datetime(2024, 10, 24, 13, 0, 0, tzinfo=timezone.utc)
        await step.sleep_until("сон до истечения времени X", developer_week)

DAG рабочие процессы

При проектировании рабочих процессов мы часто управляем зависимостями между шагами, даже когда некоторые из этих задач могут выполняться параллельно. Хотя мы об этом не задумываемся, многие рабочие процессы имеют направленный ациклический граф (DAG) выполнения. Параллельность достижима в первой итерации Python рабочих процессов (т.е. минимальном портировании на Python Workers), потому что Pyodide захватывает Javascript thenables и проксирует их в Python awaitables.

Следовательно, asyncio.gather работает как аналог Promise.all. Хотя это совершенно нормально и готово к использованию в SDK, мы также поддерживаем декларативный подход.

Одним из преимуществ декорирования метода do является то, что мы можем по сути предоставлять дальнейшие абстракции поверх исходного API и заставить их работать в обёртке точки входа. Вот пример Python API, использующего возможности DAG:

from workers import Response, WorkflowEntrypoint

class PythonWorkflowDAG(WorkflowEntrypoint):
    async def run(self, event, step):

        @step.do('зависимость 1')
        async def dep_1():
            # делает что-то
            print('выполнение dep1')

        @step.do('зависимость 2')
        async def dep_2():
            # делает что-то
            print('выполнение dep2')

        @step.do('демо шаг', depends=[dep_1, dep_2], concurrent=True)
        async def final_step(res1=None, res2=None):
            # делает что-то
            print('что-то')

        await final_step()

Такой подход делает объявление рабочего процесса гораздо чище, оставляя управление состоянием механизму рабочих процессов data plane, а также обёртке рабочих процессов Python Workers. Обратите внимание, что хотя несколько шагов могут выполняться с одним и тем же именем, механизм будет слегка изменять имя каждого шага для обеспечения уникальности. В Python рабочих процессах зависимость считается разрешённой, когда начальный шаг, связанный с ней, был успешно завершён.

Попробуйте