Tasks

Tasks are the building blocks of workflows.

Creating a Task

To create a task you must first create a ZeebeWorker or ZeebeTaskRouter instance.

# `worker` is an existing ZeebeWorker (or ZeebeTaskRouter) instance.
@worker.task(task_type="my_task")
def my_task():
    return {}

This task does nothing: it takes no parameters and returns no variables.

Note

While this task does return a Python dictionary, the dictionary is empty, so nothing is returned to Zeebe. To return variables to Zeebe, we have to fill the dictionary.
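As a minimal sketch of a filled dictionary, the function below returns one variable. The names build_greeting and greeting are illustrative only and do not come from pyzeebe. In a worker, the function would carry the @worker.task decorator shown above; it is left off here so the snippet runs on its own.

```python
def build_greeting(name: str) -> dict:
    # Each key in the returned dictionary is a variable name,
    # and each value is the value of that variable.
    return {"greeting": f"Hello, {name}!"}


print(build_greeting("Zeebe"))
```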

Task Exception Handler

An exception handler’s signature:

Callable[[Exception, Job], None]

In other words: an exception handler is a function that receives an Exception and a Job instance (Job is a pyzeebe class) and returns nothing.

The exception handler is called when the task has failed.

To add an exception handler to a task:

from pyzeebe import Job


def my_exception_handler(exception: Exception, job: Job) -> None:
    print(exception)
    job.set_failure_status(message=str(exception))


@worker.task(task_type="my_task", exception_handler=my_exception_handler)
def my_task():
    raise Exception()

Now every time my_task is called (and then fails), my_exception_handler is called.

What does job.set_failure_status do?

This tells Zeebe that the job failed. Zeebe will then retry the job if retries are configured in the workflow definition.
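The control flow described above can be sketched without a running Zeebe instance. This is an illustration under stated assumptions, not pyzeebe's implementation: FakeJob is a hypothetical stand-in for pyzeebe's Job that only records the failure message, and run_task is a hypothetical helper that mimics a worker calling the handler when the task raises.

```python
from typing import Callable, Optional


class FakeJob:
    """Hypothetical stand-in for pyzeebe's Job; it only records the failure message."""

    def __init__(self) -> None:
        self.failure_message: Optional[str] = None

    def set_failure_status(self, message: str) -> None:
        self.failure_message = message


# Same shape as the handler signature above, with FakeJob in place of Job.
ExceptionHandler = Callable[[Exception, FakeJob], None]


def run_task(task: Callable[[], dict], handler: ExceptionHandler, job: FakeJob) -> Optional[dict]:
    """Run the task; if it raises, pass the exception and the job to the handler."""
    try:
        return task()
    except Exception as exception:
        handler(exception, job)
        return None


def my_exception_handler(exception: Exception, job: FakeJob) -> None:
    job.set_failure_status(message=str(exception))


def failing_task() -> dict:
    raise ValueError("something went wrong")


job = FakeJob()
result = run_task(failing_task, my_exception_handler, job)
print(result, job.failure_message)
```

The handler is only reached when the task raises; a task that returns normally bypasses it and its dictionary is passed through unchanged.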

Task timeout

When creating a task one of the parameters we can specify is timeout.

@worker.task(task_type="my_task", timeout=20000)
def my_task(input: str):
    return {"output": f"Hello World, {input}!"}

Here we specify a timeout of 20000 milliseconds (20 seconds). If the job is not completed within this timeout, Zeebe will reactivate the job and another worker will take over.

The default value is 10000 milliseconds or 10 seconds.

Be sure to measure how long your task takes to run and adjust the timeout accordingly.
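One way to pick a timeout is to time the task function locally and add headroom. The sketch below uses only the standard library. The helpers measure_ms and suggest_timeout_ms, the safety factor of 3, and the choice to never go below the 10000 ms default are all assumptions made for illustration, not pyzeebe features.

```python
import math
import time


def measure_ms(func, *args, runs: int = 5, **kwargs) -> float:
    """Return the slowest observed run time of func, in milliseconds."""
    worst = 0.0
    for _ in range(runs):
        start = time.perf_counter()
        func(*args, **kwargs)
        worst = max(worst, (time.perf_counter() - start) * 1000)
    return worst


def suggest_timeout_ms(worst_ms: float, safety_factor: float = 3.0, floor_ms: int = 10000) -> int:
    """Scale the slowest run by a safety factor, never going below floor_ms."""
    return max(floor_ms, math.ceil(worst_ms * safety_factor))


def slow_task() -> dict:
    time.sleep(0.01)  # stands in for real work
    return {}


worst = measure_ms(slow_task)
print(f"slowest run: {worst:.1f} ms, suggested timeout: {suggest_timeout_ms(worst)} ms")
```

Local timings only approximate production behaviour, so treat the suggested value as a starting point.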

Tasks that don’t return a dictionary

Sometimes we want a task to return a singular JSON value (not a dictionary). To do this we can set the single_value parameter to True.

@worker.task(task_type="my_task", single_value=True, variable_name="y")
def my_task(x: int) -> int:
    return x + 1

This will create a task that receives the parameter x and returns an integer, which is stored in the variable y.

So the above task is in fact equivalent to:

@worker.task(task_type="my_task")
def my_task(x: int) -> dict:
    return {"y": x + 1}

This can be helpful when we don’t want to read return values from a dictionary each time we call the task (in tests for example).
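The equivalence, and why it helps in tests, can be sketched in plain Python. The helper as_variables below is hypothetical and only mimics the wrapping step described above; it is not how pyzeebe implements single_value.

```python
from typing import Any, Callable, Dict


def as_variables(func: Callable[..., Any], variable_name: str) -> Callable[..., Dict[str, Any]]:
    """Hypothetical helper: wrap func so its single return value is stored under variable_name."""

    def wrapped(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        return {variable_name: func(*args, **kwargs)}

    return wrapped


def add_one(x: int) -> int:
    return x + 1


# A test can assert on the plain value directly...
assert add_one(1) == 2

# ...while the wrapped form produces the dictionary shape.
assert as_variables(add_one, "y")(1) == {"y": 2}
```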

Note

The parameter variable_name must be supplied if single_value is True. If it is not given, a NoVariableNameGiven exception is raised.