Worker Quickstart

Create and start a worker

Run using an event loop

import asyncio

from pyzeebe import ZeebeWorker, create_insecure_channel

channel = create_insecure_channel()
worker = ZeebeWorker(channel)


@worker.task(task_type="my_task")
async def my_task(x: int):
    return {"y": x + 1}

loop = asyncio.get_event_loop()
loop.run_until_complete(worker.work())

Warning

Calling worker.work() directly with asyncio.run(worker.work()) will not work if the channel was created beforehand. Creating an async gRPC channel automatically creates a new event loop, while asyncio.run always starts its own loop, which causes problems when running the worker (see: https://github.com/camunda-community-hub/pyzeebe/issues/198).

An easy workaround is to create the channel and the worker inside the coroutine that asyncio.run executes:

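import asyncio

from pyzeebe import ZeebeWorker, create_insecure_channel


async def main():
    # Create the channel inside the coroutine so it uses the loop started by asyncio.run
    channel = create_insecure_channel()
    worker = ZeebeWorker(channel)
    await worker.work()


asyncio.run(main())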

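Because the worker now exists only inside main(), module-level @worker.task decorators can no longer reach it, which makes it somewhat harder to add tasks to a worker. The recommended way to deal with this is to use a Task Router.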

Worker connection options

To change the number of connection retries:

worker = ZeebeWorker(channel, max_connection_retries=1)  # Tolerates one failure and disconnects on the second

With this setting the worker disconnects after two consecutive failures. The counter is reset each time the worker connects successfully.

Note

The default is 10 retries. For infinite retries, set max_connection_retries to -1.
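The counting rule can be modeled in a few lines of plain Python. This is only an illustrative model of the behavior described in this section, not pyzeebe's implementation; the function gives_up_at and its arguments are hypothetical.

```python
from typing import Optional, Sequence


def gives_up_at(outcomes: Sequence[bool], max_connection_retries: int) -> Optional[int]:
    """Return the index of the attempt on which the worker disconnects, or None.

    outcomes[i] is True if connection attempt i succeeded (hypothetical input).
    """
    consecutive_failures = 0
    for attempt, succeeded in enumerate(outcomes):
        if succeeded:
            consecutive_failures = 0  # a successful connection resets the counter
            continue
        consecutive_failures += 1
        # -1 means "retry forever", so the limit is never checked.
        if max_connection_retries != -1 and consecutive_failures > max_connection_retries:
            return attempt
    return None


# One failure is tolerated, the second consecutive failure disconnects.
print(gives_up_at([False, True, False, False], max_connection_retries=1))  # 3
# Failures separated by a success never reach the limit.
print(gives_up_at([False, True, False], max_connection_retries=1))  # None
# With -1 the worker never gives up.
print(gives_up_at([False] * 50, max_connection_retries=-1))  # None
```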

Add a task

To add a task to the worker:

@worker.task(task_type="my_task")
async def my_task(x: int):
    return {"y": x + 1}

# Or using a non-async function:

@worker.task(task_type="second_task")
def second_task(x: int):
    return {"y": x + 1}

Stopping a worker

To stop a running worker:

# Trigger this on some event (SIGTERM for example)
async def shutdown():
    await worker.stop()
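One possible way to wire the shutdown to SIGTERM is asyncio's loop.add_signal_handler, which is only available on Unix event loops. The sketch below uses a hypothetical StandInWorker in place of ZeebeWorker so that it runs without a Zeebe gateway; the helper work_until_signalled is likewise an illustrative name, not part of pyzeebe.

```python
import asyncio
import signal


class StandInWorker:
    """Hypothetical stand-in for ZeebeWorker: work() blocks until stop() is awaited."""

    def __init__(self) -> None:
        self._stop_requested = asyncio.Event()
        self.stopped = False

    async def work(self) -> None:
        await self._stop_requested.wait()

    async def stop(self) -> None:
        self.stopped = True
        self._stop_requested.set()


async def work_until_signalled(worker, signals=(signal.SIGTERM, signal.SIGINT)) -> None:
    """Run worker.work() and call worker.stop() when one of the signals arrives."""
    loop = asyncio.get_running_loop()
    stop_tasks = []  # keep references so the stop tasks are not garbage collected

    def request_stop() -> None:
        stop_tasks.append(loop.create_task(worker.stop()))

    for sig in signals:
        # add_signal_handler is only implemented on Unix event loops.
        loop.add_signal_handler(sig, request_stop)
    try:
        await worker.work()
    finally:
        for sig in signals:
            loop.remove_signal_handler(sig)
        # Wait for any stop() call that is still in progress.
        await asyncio.gather(*stop_tasks)
```

With a real worker, create the ZeebeWorker inside the coroutine passed to asyncio.run and hand it to work_until_signalled in place of the stand-in.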