Python Workflows в бета-тестировании: Создавайте сложные приложения на Cloudflare без лишних сложностей

Разработчики уже сейчас могут использовать Cloudflare Workflows для создания длительных многоэтапных приложений на Workers. Теперь появились Python Workflows, что означает возможность использовать предпочитаемый язык для оркестрации многоэтапных приложений.

С помощью Workflows вы можете автоматизировать последовательность идемпотентных шагов в вашем приложении со встроенной обработкой ошибок и поведением повтора. Но изначально Workflows поддерживались только в TypeScript. Поскольку Python является де-факто языком выбора для конвейеров данных, искусственного интеллекта/машинного обучения и автоматизации задач — всего, что сильно зависит от оркестрации — это создавало сложности для многих разработчиков.

За последние годы мы предоставляли разработчикам инструменты для создания таких приложений на Python в Cloudflare. В 2020 году мы добавили Python в Workers через Transcrypt перед прямой интеграцией Python в workerd в 2024 году. Ранее в этом году мы добавили поддержку CPython вместе с любыми пакетами, собранными в Pyodide, такими как matplotlib и pandas, в Workers. Теперь также поддерживаются Python Workflows, поэтому разработчики могут создавать надежные приложения, используя язык, который они лучше всего знают.

Зачем Python для Workflows?

Представьте, что вы обучаете большую языковую модель (LLM). Вам нужно разметить набор данных, подать данные, дождаться выполнения модели, оценить потери, скорректировать модель и повторить. Без автоматизации вам нужно было бы запускать каждый шаг, вручную отслеживать завершение и затем запускать следующий. Вместо этого вы можете использовать workflow для оркестрации обучения модели, запуская каждый шаг после завершения предыдущего. Для любых необходимых ручных корректировок, таких как оценка потерь и соответствующая настройка модели, вы можете реализовать шаг, который уведомляет вас и ожидает необходимого ввода.

Рассмотрим конвейеры данных — это основной вариант использования Python для приема и обработки данных. Автоматизируя конвейер данных через определенный набор идемпотентных шагов, разработчики могут развернуть workflow, который обрабатывает весь конвейер данных за них.

Возьмем другой пример: создание ИИ-агентов, например агента для управления вашими покупками продуктов. Каждую неделю вы вводите свой список рецептов, и агент (1) составляет список необходимых ингредиентов, (2) проверяет, какие ингредиенты у вас остались с предыдущих недель, и (3) заказывает разницу для самовывоза из вашего местного продуктового магазина. Используя Workflow, это может выглядеть так:

  1. await step.wait_for_event() пользователь вводит список покупок

  2. step.do() составить список необходимых ингредиентов

  3. step.do() проверить список необходимых ингредиентов против оставшихся

  4. step.do() выполнить API-вызов для размещения заказа

  5. step.do() продолжить с оплатой

Использование workflows как инструмента для создания агентов на Cloudflare может упростить архитектуру агентов и повысить их шансы на завершение благодаря повторным попыткам отдельных шагов и сохранению состояния. Поддержка Python Workflows означает, что создавать агентов на Python стало проще, чем когда-либо.

Как работают Python Workflows

Cloudflare Workflows использует базовую инфраструктуру, которую мы создали для устойчивого выполнения, предоставляя при этом идиоматичный способ для пользователей Python писать свои workflows. Кроме того, мы стремились к полной функциональной совместимости между jаvascript и Python SDK. Это возможно благодаря тому, что Cloudflare Workers поддерживают Python непосредственно в самой среде выполнения.

Создание Python Workflow

Cloudflare Workflows полностью построены на основе Workers и Durable Objects. Каждый элемент играет свою роль в хранении метаданных Workflow и информации на уровне экземпляра. Для более подробной информации о том, как работает платформа Workflows, ознакомьтесь с этим постом в блоге.

В самой основе управляющей плоскости Workflows находится пользовательский Worker, который является WorkflowEntrypoint. Когда экземпляр Workflow готов к запуску, механизм Workflow вызовет метод run пользовательского worker через RPC, который в данном случае будет Python Worker.

Это пример каркаса для объявления Workflow, предоставленный официальной документацией:

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // Шаги здесь
  }
}

Метод run, как показано выше, предоставляет параметр WorkflowStep, который реализует API устойчивого выполнения. Именно на это полагаются пользователи для выполнения не более одного раза. Эти API реализованы на jаvascript и должны быть доступны в контексте Python Worker.

WorkflowStep должен пересекать барьер RPC, что означает, что механизм (вызывающая сторона) предоставляет его как RpcTarget. Такая настройка позволяет пользовательскому Workflow (вызываемой стороне) заменить параметр заглушкой. Эта заглушка затем позволяет использовать API устойчивого выполнения для Workflows через RPC обратно к механизму. Чтобы узнать больше о сериализации RPC и о том, как функции могут передаваться между вызывающей и вызываемой сторонами, прочитайте документацию по удаленным вызовам процедур.

Все это верно как для Python, так и для jаvascript Workflows, поскольку мы фактически не меняем способ вызова пользовательского Worker со стороны Workflows. Однако в случае с Python существует еще один барьер — языковой мост между Python и jаvascript модулем. Когда RPC-запрос направляется на Python Worker, существует модуль точки входа на jаvascript, отвечающий за проксирование запроса для обработки скриптом Python с последующим возвратом вызывающей стороне. Этот процесс обычно включает преобразование типов до и после обработки запроса.

Преодоление языкового барьера

Python workers полагаются на Pyodide, который представляет собой порт CPython в WebAssembly. Pyodide предоставляет интерфейс внешних функций (FFI) к jаvascript, который позволяет вызывать методы jаvascript из Python. Это механизм, который позволяет другим привязкам и пакетам Python работать на платформе Workers. Поэтому мы используем этот слой FFI не только для того, чтобы разрешить прямое использование привязки Workflow, но и для предоставления методов WorkflowStep на Python. Другими словами, учитывая, что WorkflowEntrypoint является специальным классом для среды выполнения, метод run вручную оборачивается так, чтобы WorkflowStep предоставлялся как JsProxy вместо преобразования типов как другие объекты jаvascript. Более того, оборачивая API с точки зрения пользовательского Worker, мы позволяем себе вносить некоторые корректировки в общий опыт разработки, вместо простого предоставления jаvascript SDK другому языку с другой семантикой.

Создание Pythonic SDK для Workflows

Большая часть портирования Workflows на Python включает предоставление интерфейса, который будет знаком пользователям Python и не вызовет проблем в использовании, аналогично тому, что происходит с нашими jаvascript API. Давайте сделаем шаг назад и посмотрим на фрагмент определения Workflow (написанного на Typescript).

import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent} from 'cloudflare:workers';
 
export class MyWorkflow extends WorkflowEntrypoint {
    async run(event: WorkflowEvent<YourEventType>, step: WorkflowStep) {
        let state = step.do("мой первый шаг", async () => {
          // Доступ к свойствам через event.payload
          let userEmail = event.payload.userEmail
          let createdTimestamp = event.payload.createdTimestamp
          return {"userEmail": userEmail, "createdTimestamp": createdTimestamp}
	    })
 
        step.sleep("мой первый сон", "30 минут");
 
        await step.waitForEvent<EventType>("получить пример события", { type: "simple-event", timeout: "1 час" })
 
   	 const developerWeek = Date.parse("22 сент 2025 13:00:00 UTC");
        await step.sleepUntil("спать до X таймаута", developerWeek)
    }
}

Реализация API рабочих процессов на Python требует модификации метода do. В отличие от других языков, Python не поддерживает легко анонимные обратные вызовы. Такое поведение обычно достигается с помощью декораторов, которые в данном случае позволяют нам перехватывать метод и представлять его идиоматически. Другими словами, все параметры сохраняют свой исходный порядок, а декорированный метод служит обратным вызовом.

Методы waitForEvent, sleep и sleepUntil могут сохранить свои исходные сигнатуры, при условии что их имена будут преобразованы в snake case.

Вот соответствующая версия на Python для того же рабочего процесса, достигающая аналогичного поведения:

from workers import WorkflowEntrypoint
 
class MyWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
        @step.do("мой первый шаг")
        async def my_first_step():
            user_email = event["payload"]["userEmail"]
            created_timestamp = event["payload"]["createdTimestamp"]
            return {
                "userEmail": user_email,
                "createdTimestamp": created_timestamp,
            }
 
        await my_first_step()
 
        step.sleep("мой первый сон", "30 минут")
 
         await step.wait_for_event(
            "получить пример события",
            "simple-event",
            timeout="1 час",
        )
 
        developer_week = datetime(2024, 10, 24, 13, 0, 0, tzinfo=timezone.utc)
        await step.sleep_until("спать до X таймаута", developer_week)

DAG рабочие процессы

При проектировании рабочих процессов мы часто управляем зависимостями между шагами, даже когда некоторые из этих задач могут выполняться параллельно. Хотя мы об этом не задумываемся, многие рабочие процессы имеют направленный ациклический граф (DAG) выполнения. Параллельность достижима в первой итерации Python рабочих процессов (т.е. минимальном портировании на Python Workers), потому что Pyodide захватывает jаvascript thenables и проксирует их в Python awaitables.

Следовательно, asyncio.gather работает как аналог Promise.all. Хотя это совершенно нормально и готово к использованию в SDK, мы также поддерживаем декларативный подход.

Одним из преимуществ декорирования метода do является то, что мы можем по сути предоставить дальнейшие абстракции поверх исходного API и заставить их работать в обёртке точки входа. Вот пример Python API, использующего возможности DAG:

from workers import Response, WorkflowEntrypoint

class PythonWorkflowDAG(WorkflowEntrypoint):
    async def run(self, event, step):

        @step.do('зависимость 1')
        async def dep_1():
            # делает что-то
            print('выполнение dep1')

        @step.do('зависимость 2')
        async def dep_2():
            # делает что-то
            print('выполнение dep2')

        @step.do('демо do', depends=[dep_1, dep_2], concurrent=True)
        async def final_step(res1=None, res2=None):
            # делает что-то
            print('что-то')

        await final_step()

Такой подход делает объявление рабочего процесса намного чище, оставляя управление состоянием движку рабочих процессов data plane, а также обёртке рабочего процесса Python workers. Обратите внимание, что хотя несколько шагов могут выполняться с одним и тем же именем, движок будет слегка изменять имя каждого шага, чтобы обеспечить уникальность. В Python рабочих процессах зависимость считается разрешённой, как только начальный шаг с её участием был успешно завершён.

Попробуйте