Worker Reference

ZeebeTaskHandler is the base class from which both ZeebeWorker and ZeebeTaskRouter inherit.

class pyzeebe.worker.task_handler.ZeebeTaskHandler(before: List[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job]] = None, after: List[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job]] = None)
get_task(task_type: str) → pyzeebe.task.task.Task

Get a task by its type

Parameters:task_type (str) – The type of the wanted task
Returns:The wanted task
Return type:Task
Raises:TaskNotFound – If no task with the specified type exists
remove_task(task_type: str) → pyzeebe.task.task.Task

Remove a task

Parameters:task_type (str) – The type of the task to remove
Returns:The task that was removed
Return type:Task
Raises:TaskNotFound – If no task with the specified type exists
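
The lookup and removal behaviour described for get_task and remove_task can be pictured with a small, self-contained sketch. It does not import pyzeebe: SketchRegistry, its add_task method and the local TaskNotFound class are hypothetical stand-ins that only mirror the documented contract (return the task, or raise TaskNotFound when the type is unknown).

```python
from typing import Callable, Dict


class TaskNotFound(Exception):
    """Local stand-in for the pyzeebe exception of the same name."""


class SketchRegistry:
    """Hypothetical task container keyed by task type."""

    def __init__(self) -> None:
        self._tasks: Dict[str, Callable] = {}

    def add_task(self, task_type: str, func: Callable) -> None:
        # Hypothetical helper; the real handler registers tasks via the task decorator.
        self._tasks[task_type] = func

    def get_task(self, task_type: str) -> Callable:
        try:
            return self._tasks[task_type]
        except KeyError:
            raise TaskNotFound(f"no task of type {task_type!r}") from None

    def remove_task(self, task_type: str) -> Callable:
        # Reuse get_task so an unknown type raises TaskNotFound here too.
        task = self.get_task(task_type)
        del self._tasks[task_type]
        return task


registry = SketchRegistry()
registry.add_task("send_email", lambda address: {"sent": True})

removed = registry.remove_task("send_email")
print(removed("a@example.com"))

try:
    registry.get_task("send_email")
except TaskNotFound as error:
    print(f"lookup failed: {error}")
```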
task(task_type: str, exception_handler: Callable[[Exception, pyzeebe.job.job.Job], None] = <function default_exception_handler>, variables_to_fetch: List[str] = None, timeout: int = 10000, max_jobs_to_activate: int = 32, before: List[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job]] = None, after: List[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job]] = None, single_value: bool = False, variable_name: str = None)

Decorator to create a task

Parameters:
  • before (List[TaskDecorator]) – All decorators which should be performed before the task.
  • after (List[TaskDecorator]) – All decorators which should be performed after the task.
  • timeout (int) – Maximum duration of the task in milliseconds. If the timeout is surpassed, Zeebe will give up on the job and retry it. Default: 10000
  • single_value (bool) – Set this to True if the function returns a single value (int, string, list) rather than a dictionary. Default: False
  • variable_name (str) – If single_value is True, this is the variable name given to Zeebe: { <variable_name>: <function_return_value> }
  • max_jobs_to_activate (int) – Maximum number of jobs of this task that the worker will execute in parallel. Default: 32
Raises:DuplicateTaskType – If a task with the same type already exists
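
How the before, after, single_value and variable_name parameters fit together can be illustrated with a minimal, self-contained sketch. It does not use pyzeebe: FakeJob, run_task and the two example decorators are hypothetical stand-ins. The order of operations, and the passing of job variables as keyword arguments, are assumptions based on the parameter descriptions above, not the library's implementation.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class FakeJob:
    """Hypothetical stand-in for pyzeebe.Job; it only carries variables."""
    variables: Dict[str, Any] = field(default_factory=dict)


TaskDecorator = Callable[[FakeJob], FakeJob]


def run_task(
    job: FakeJob,
    func: Callable[..., Any],
    before: Optional[List[TaskDecorator]] = None,
    after: Optional[List[TaskDecorator]] = None,
    single_value: bool = False,
    variable_name: Optional[str] = None,
) -> FakeJob:
    """Assumed flow: before decorators, then the task function, then after decorators."""
    for decorator in before or []:
        job = decorator(job)

    result = func(**job.variables)
    if single_value:
        # A single value is wrapped as {<variable_name>: <function_return_value>}.
        result = {variable_name: result}
    job.variables = result

    for decorator in after or []:
        job = decorator(job)
    return job


def add_default_currency(job: FakeJob) -> FakeJob:
    """Example "before" decorator: fill in a missing input variable."""
    job.variables.setdefault("currency", "EUR")
    return job


def mark_processed(job: FakeJob) -> FakeJob:
    """Example "after" decorator: annotate the task output."""
    job.variables["processed"] = True
    return job


def compute_total(price: float, quantity: int, currency: str) -> float:
    # Returns a single value, so the caller sets single_value=True.
    return price * quantity


job = run_task(
    FakeJob(variables={"price": 2.5, "quantity": 4}),
    compute_total,
    before=[add_default_currency],
    after=[mark_processed],
    single_value=True,
    variable_name="total",
)
print(job.variables)  # {'total': 10.0, 'processed': True}
```

Each decorator takes a job and returns a job, which matches the Callable[[Job], Job] type in the signature above and lets decorators be chained in list order.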

class pyzeebe.ZeebeWorker(name: str = None, request_timeout: int = 0, hostname: str = None, port: int = None, credentials: pyzeebe.credentials.base_credentials.BaseCredentials = None, secure_connection: bool = False, before: List[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job]] = None, after: List[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job]] = None, max_connection_retries: int = 10, watcher_max_errors_factor: int = 3)

A Zeebe worker that can connect to a Zeebe instance and perform tasks.

include_router(*routers) → None

Adds all tasks from the given routers to the worker.

Raises:DuplicateTaskType – If a task from the router already exists in the worker
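
The merge-with-duplicate-check behaviour of include_router can be sketched without pyzeebe. SketchHandler and the local DuplicateTaskType class below are hypothetical stand-ins. Storing tasks in a dictionary keyed by task type is an assumption made for illustration only.

```python
from typing import Callable, Dict


class DuplicateTaskType(Exception):
    """Local stand-in for the pyzeebe exception of the same name."""


class SketchHandler:
    """Hypothetical stand-in playing the role of both worker and router."""

    def __init__(self) -> None:
        self.tasks: Dict[str, Callable] = {}

    def task(self, task_type: str) -> Callable[[Callable], Callable]:
        def wrapper(func: Callable) -> Callable:
            if task_type in self.tasks:
                raise DuplicateTaskType(task_type)
            self.tasks[task_type] = func
            return func
        return wrapper

    def include_router(self, *routers: "SketchHandler") -> None:
        for router in routers:
            for task_type, func in router.tasks.items():
                # A task type may only be registered once per worker.
                if task_type in self.tasks:
                    raise DuplicateTaskType(task_type)
                self.tasks[task_type] = func


emails = SketchHandler()


@emails.task("send_email")
def send_email(address: str) -> dict:
    return {"sent": True}


worker = SketchHandler()


@worker.task("charge_card")
def charge_card(amount: int) -> dict:
    return {"charged": amount}


worker.include_router(emails)
print(sorted(worker.tasks))  # ['charge_card', 'send_email']

try:
    worker.include_router(emails)  # same router again: its task already exists
except DuplicateTaskType as error:
    print(f"duplicate task type: {error}")
```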
stop(wait: bool = False) → None

Stop the worker. This emits a signal asking each task thread to finish its current job and stop polling for new ones.

Parameters:wait (bool) – Wait for all tasks to complete
work(watch: bool = False) → None

Start the worker. The worker polls Zeebe for jobs of each task in a separate thread.

Parameters:watch (bool) – Start a watcher thread that restarts task threads on error

Raises:
  • ActivateJobsRequestInvalid – If one of the worker’s tasks has an invalid type
  • ZeebeBackPressure – If Zeebe is currently in back pressure (too many requests)
  • ZeebeGatewayUnavailable – If the Zeebe gateway is unavailable
  • ZeebeInternalError – If Zeebe experiences an internal error
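
The interplay of work and stop (one polling thread per task, a stop signal, and an optional wait) can be sketched with the standard library alone. SketchWorker is a hypothetical stand-in, not the pyzeebe implementation. In particular, the use of threading.Event as the stop signal and a work method that returns immediately are assumptions of this sketch, and the watcher thread is left out.

```python
import threading
import time
from typing import Dict, List


class SketchWorker:
    """Hypothetical worker that runs one polling thread per task type."""

    def __init__(self, task_types: List[str]) -> None:
        self.task_types = task_types
        self.stop_event = threading.Event()
        self.threads: Dict[str, threading.Thread] = {}
        self.polls: Dict[str, int] = {task_type: 0 for task_type in task_types}

    def _poll(self, task_type: str) -> None:
        # Keep polling until stop() sets the event.
        while not self.stop_event.is_set():
            self.polls[task_type] += 1  # stand-in for activating and running jobs
            time.sleep(0.01)

    def work(self) -> None:
        for task_type in self.task_types:
            thread = threading.Thread(
                target=self._poll, args=(task_type,), daemon=True
            )
            self.threads[task_type] = thread
            thread.start()

    def stop(self, wait: bool = False) -> None:
        # Ask every polling loop to exit after its current iteration.
        self.stop_event.set()
        if wait:
            for thread in self.threads.values():
                thread.join()


worker = SketchWorker(["charge_card", "send_email"])
worker.work()
time.sleep(0.05)        # let the polling threads run briefly
worker.stop(wait=True)  # returns only after both threads have finished
print({name: thread.is_alive() for name, thread in worker.threads.items()})
```

With wait=False the call would return as soon as the signal is set, and the threads would finish on their own shortly afterwards.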
class pyzeebe.ZeebeTaskRouter(before: List[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job]] = None, after: List[Callable[[pyzeebe.job.job.Job], pyzeebe.job.job.Job]] = None)
class pyzeebe.Job(key: int, _type: str, workflow_instance_key: int, bpmn_process_id: str, workflow_definition_version: int, workflow_key: int, element_id: str, element_instance_key: int, custom_headers: Dict[KT, VT], worker: str, retries: int, deadline: int, variables: Dict[KT, VT], status: pyzeebe.job.job_status.JobStatus = <JobStatus.Running: 'Running'>, zeebe_adapter=None)
set_error_status(message: str) → None

Error status means that the job could not be completed because of a business error and will never succeed, even if retried. For example: a required parameter was not given.

Parameters:message (str) – The error message that Zeebe will receive

Raises:
  • NoZeebeAdapter – If the job does not have a configured ZeebeAdapter
  • ZeebeBackPressure – If Zeebe is currently in back pressure (too many requests)
  • ZeebeGatewayUnavailable – If the Zeebe gateway is unavailable
  • ZeebeInternalError – If Zeebe experiences an internal error
set_failure_status(message: str) → None

Failure status means that a technical error has occurred. If retried, the job may succeed. For example: the connection to the database was lost.

Parameters:message (str) – The failure message that Zeebe will receive

Raises:
  • NoZeebeAdapter – If the job does not have a configured ZeebeAdapter
  • ZeebeBackPressure – If Zeebe is currently in back pressure (too many requests)
  • ZeebeGatewayUnavailable – If the Zeebe gateway is unavailable
  • ZeebeInternalError – If Zeebe experiences an internal error
set_success_status() → None

Success status means that the job has been completed as intended.

Raises:
  • NoZeebeAdapter – If the job does not have a configured ZeebeAdapter
  • ZeebeBackPressure – If Zeebe is currently in back pressure (too many requests)
  • ZeebeGatewayUnavailable – If the Zeebe gateway is unavailable
  • ZeebeInternalError – If Zeebe experiences an internal error
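
Choosing between the three status methods follows the distinction drawn above: business errors that retrying cannot fix, technical failures that a retry may fix, and success. The sketch below illustrates that decision with a hypothetical SketchJob that only records its status locally and never talks to Zeebe. The handle function and its fetch_customer parameter are invented for illustration, and the mapping of each method to a JobStatus member is an assumption.

```python
from enum import Enum
from typing import Any, Callable, Dict, Optional


class JobStatus(Enum):
    # Names and values as listed for pyzeebe.JobStatus in this reference.
    Running = "Running"
    Completed = "Completed"
    Failed = "Failed"
    ErrorThrown = "ErrorThrown"


class SketchJob:
    """Hypothetical stand-in for pyzeebe.Job; it records the status locally."""

    def __init__(self, variables: Dict[str, Any]) -> None:
        self.variables = variables
        self.status = JobStatus.Running
        self.message: Optional[str] = None

    def set_success_status(self) -> None:
        self.status = JobStatus.Completed

    def set_failure_status(self, message: str) -> None:
        self.status, self.message = JobStatus.Failed, message

    def set_error_status(self, message: str) -> None:
        self.status, self.message = JobStatus.ErrorThrown, message


def handle(job: SketchJob, fetch_customer: Callable[[Any], Any]) -> None:
    if "customer_id" not in job.variables:
        # Business error: retrying cannot supply the missing parameter.
        job.set_error_status("customer_id is required")
        return
    try:
        fetch_customer(job.variables["customer_id"])
    except ConnectionError as error:
        # Technical error: a retry may succeed once the connection is back.
        job.set_failure_status(f"database unavailable: {error}")
        return
    job.set_success_status()


def unreachable_database(customer_id: Any) -> Any:
    raise ConnectionError("connection lost")


missing = SketchJob({})
handle(missing, fetch_customer=lambda customer_id: None)
print(missing.status.name)  # ErrorThrown

flaky = SketchJob({"customer_id": 7})
handle(flaky, fetch_customer=unreachable_database)
print(flaky.status.name)  # Failed

done = SketchJob({"customer_id": 7})
handle(done, fetch_customer=lambda customer_id: {"id": customer_id})
print(done.status.name)  # Completed
```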
class pyzeebe.JobStatus

An enumeration.

Completed = 'Completed'
ErrorThrown = 'ErrorThrown'
Failed = 'Failed'
Running = 'Running'