Worker Reference

The ZeebeWorker class inherits from ZeebeTaskRouter class. This means that all methods that ZeebeTaskRouter has will also appear in ZeebeWorker.

class pyzeebe.ZeebeTaskRouter(before: List[Union[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job], Callable[[pyzeebe.job.job.Job], Awaitable[pyzeebe.job.job.Job]]]] = None, after: List[Union[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job], Callable[[pyzeebe.job.job.Job], Awaitable[pyzeebe.job.job.Job]]]] = None)
after(*decorators) → None

Add decorators to be performed after a job is run

Parameters:decorators (Iterable[TaskDecorator]) – The decorators to be performed after each job is run
before(*decorators) → None

Add decorators to be performed before a job is run

Parameters:decorators (Iterable[TaskDecorator]) – The decorators to be performed before each job is run
get_task(task_type: str) → pyzeebe.task.task.Task

Get a task by its type

Parameters:task_type (str) – The type of the wanted task
Returns:The wanted task
Return type:Task
Raises:TaskNotFoundError – If no task with specified type exists
remove_task(task_type: str) → pyzeebe.task.task.Task

Remove a task

Parameters:task_type (str) – The type of the wanted task
Returns:The task that was removed
Return type:Task
Raises:TaskNotFoundError – If no task with specified type exists
task(task_type: str, exception_handler: Callable[[Exception, pyzeebe.job.job.Job], Awaitable[T_co]] = <function default_exception_handler>, variables_to_fetch: Optional[List[str]] = None, timeout_ms: int = 10000, max_jobs_to_activate: int = 32, max_running_jobs: int = 32, before: List[Union[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job], Callable[[pyzeebe.job.job.Job], Awaitable[pyzeebe.job.job.Job]]]] = None, after: List[Union[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job], Callable[[pyzeebe.job.job.Job], Awaitable[pyzeebe.job.job.Job]]]] = None, single_value: bool = False, variable_name: str = None)

Decorator to create a task

Parameters:
  • task_type (str) – The task type
  • exception_handler (ExceptionHandler) – Handler that will be called when a job fails.
  • variables_to_fetch (Optional[List[str]]) – The variables to request from Zeebe when activating jobs.
  • timeout_ms (int) – Maximum duration of the task in milliseconds. If the timeout is surpassed Zeebe will give up on the worker and retry it. Default: 10000 (10 seconds).
  • max_jobs_to_activate (int) – Maximum amount of jobs the worker will activate in one request to the Zeebe gateway. Default: 32
  • max_running_jobs (int) – Maximum amount of jobs that will run simultaneously. Default: 32
  • before (List[TaskDecorator]) – All decorators which should be performed before the task.
  • after (List[TaskDecorator]) – All decorators which should be performed after the task.
  • single_value (bool) – If the function returns a single value (int, string, list) and not a dictionary set this to True. Default: False
  • variable_name (str) – If single_value then this will be the variable name given to zeebe: { <variable_name>: <function_return_value> }
Raises:
  • DuplicateTaskTypeError – If a task from the router already exists in the worker
  • NoVariableNameGivenError – When single_value is set, but no variable_name is given
class pyzeebe.ZeebeWorker(grpc_channel: grpc.aio._base_channel.Channel, name: Optional[str] = None, request_timeout: int = 0, before: List[Union[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job], Callable[[pyzeebe.job.job.Job], Awaitable[pyzeebe.job.job.Job]]]] = None, after: List[Union[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job], Callable[[pyzeebe.job.job.Job], Awaitable[pyzeebe.job.job.Job]]]] = None, max_connection_retries: int = 10, watcher_max_errors_factor: int = 3, poll_retry_delay: int = 5)

A zeebe worker that can connect to a zeebe instance and perform tasks.

include_router(*routers) → None

Adds all router’s tasks to the worker.

Raises:DuplicateTaskTypeError – If a task from the router already exists in the worker
stop() → None

Stop the worker. This will emit a signal asking tasks to complete the current task and stop polling for new.

work() → None

Start the worker. The worker will poll zeebe for jobs of each task in a different thread.

Raises:
  • ActivateJobsRequestInvalidError – If one of the worker’s task has invalid types
  • ZeebeBackPressureError – If Zeebe is currently in back pressure (too many requests)
  • ZeebeGatewayUnavailableError – If the Zeebe gateway is unavailable
  • ZeebeInternalError – If Zeebe experiences an internal error
  • UnkownGrpcStatusCodeError – If Zeebe returns an unexpected status code
class pyzeebe.Job(key: int, _type: str, process_instance_key: int, bpmn_process_id: str, process_definition_version: int, process_definition_key: int, element_id: str, element_instance_key: int, custom_headers: Dict[KT, VT], worker: str, retries: int, deadline: int, variables: Dict[KT, VT], status: pyzeebe.job.job_status.JobStatus = <JobStatus.Running: 'Running'>, zeebe_adapter=None)
set_error_status(message: str, error_code: str = '') → None

Error status means that the job could not be completed because of a business error and won’t ever be able to be completed. For example: a required parameter was not given An error code can be added to handle the error in the Zeebe process

Parameters:
  • message (str) – The error message
  • error_code (str) – The error code that Zeebe will receive
Raises:
  • NoZeebeAdapterError – If the job does not have a configured ZeebeAdapter
  • ZeebeBackPressureError – If Zeebe is currently in back pressure (too many requests)
  • ZeebeGatewayUnavailableError – If the Zeebe gateway is unavailable
  • ZeebeInternalError – If Zeebe experiences an internal error
set_failure_status(message: str) → None

Failure status means a technical error has occurred. If retried the job may succeed. For example: connection to DB lost

Parameters:

message (str) – The failure message that Zeebe will receive

Raises:
  • NoZeebeAdapterError – If the job does not have a configured ZeebeAdapter
  • ZeebeBackPressureError – If Zeebe is currently in back pressure (too many requests)
  • ZeebeGatewayUnavailableError – If the Zeebe gateway is unavailable
  • ZeebeInternalError – If Zeebe experiences an internal error
set_success_status() → None

Success status means that the job has been completed as intended.

Raises:
  • NoZeebeAdapterError – If the job does not have a configured ZeebeAdapter
  • ZeebeBackPressureError – If Zeebe is currently in back pressure (too many requests)
  • ZeebeGatewayUnavailableError – If the Zeebe gateway is unavailable
  • ZeebeInternalError – If Zeebe experiences an internal error
class pyzeebe.JobStatus

An enumeration.

Completed = 'Completed'
ErrorThrown = 'ErrorThrown'
Failed = 'Failed'
Running = 'Running'