dramatiq-abort

When running dramatiq tasks, it can be useful to abort enqueued or even running actors.

The dramatiq-abort middleware lets any dramatiq client signal task termination through a distributed event backend. At the moment only a Redis backend exists, but the API provides a simple interface for adding more.

Installing dramatiq-abort

Install with pip:

pip install dramatiq-abort[redis]

Configuring dramatiq-abort

dramatiq-abort is configured by creating a backend object and adding the Abortable middleware to the dramatiq Broker instance.

from dramatiq import get_broker
from dramatiq_abort import Abortable, backends

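# RedisBackend requires a Redis client; from_url builds one (example URL, adjust as needed).
event_backend = backends.RedisBackend.from_url("redis://localhost:6379/0")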
abortable = Abortable(backend=event_backend)
get_broker().add_middleware(abortable)

Abort a task

When an actor is sent to a worker, a message instance is returned that carries the message id. Keep this message id somewhere so that it can later be passed to abort, which signals termination of the enqueued or running task.

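import dramatiq
from dramatiq_abort import abort

@dramatiq.actor
def my_long_running_task(): ...

# send() returns the message; keep its id to abort the task later.
message = my_long_running_task.send()
message_id = message.message_id

abort(message_id)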

API

dramatiq_abort.abort(message_id, middleware=None)

Abort a pending or running message given its message_id.

Parameters
  • message_id (str) – Id of the message to abort. Take it from the .message_id attribute of the message returned by actor.send or actor.send_with_options.

  • middleware (Abortable) – The Abortable middleware, shared by the workers and the broker, that is used to signal termination. If None, the default broker from dramatiq.get_broker() is used and its configured Abortable middleware is retrieved. Raises a RuntimeError if middleware is None and no Abortable middleware is set on the broker.

Return type

None

class dramatiq_abort.Abortable(*, backend, abortable=True)

Middleware that interrupts actors whose job has been signaled for termination. Currently, this is only available on CPython.

This middleware also adds an abortable option that can be set on a dramatiq actor and in send_with_options. The value is taken, in order of priority, from send_with_options, then from the actor, then from this Abortable instance.

Note: This works by setting an async exception in the worker thread that runs the actor. This means that the exception will only be raised the next time that thread acquires the GIL. Concretely, this means that this middleware can’t cancel system calls.
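The mechanism described in the note can be illustrated with a minimal, CPython-only sketch built on the interpreter's PyThreadState_SetAsyncExc C API call. It is not the library's own code: the Abort class here is a local stand-in, and raise_in_thread and busy_worker are hypothetical names chosen for this example.

```python
import ctypes
import threading
import time


class Abort(Exception):
    """Local stand-in for dramatiq_abort.Abort so the sketch runs on its own."""


def raise_in_thread(thread_id: int, exc_type: type) -> bool:
    """Ask CPython to raise exc_type asynchronously in the given thread.

    Hypothetical helper: returns True if exactly one thread was targeted.
    """
    affected = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread_id), ctypes.py_object(exc_type)
    )
    return affected == 1


started = threading.Event()
outcome = {}


def busy_worker():
    try:
        started.set()
        while True:
            # While the thread is blocked inside sleep, nothing happens; the
            # pending exception surfaces once Python bytecode runs again.
            time.sleep(0.01)
    except Abort:
        outcome["aborted"] = True


thread = threading.Thread(target=busy_worker)
thread.start()
started.wait()

requested = raise_in_thread(thread.ident, Abort)
thread.join(timeout=5)
```

Because the exception is only delivered between bytecode instructions, a thread stuck in one long blocking call would not see it until that call returns, which is the limitation the note describes.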

Parameters
  • backend (EventBackend) – Event backend used to signal termination from a broker to the workers. See RedisBackend.

  • abortable (bool) – Default value of the abortable option for every actor.
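The precedence of the abortable option (send_with_options first, then the actor, then the middleware default) can be sketched as a small lookup. The function name resolve_abortable and the plain dictionaries are assumptions made for illustration; the library's actual lookup may differ in detail.

```python
from typing import Any, Mapping


def resolve_abortable(
    message_options: Mapping[str, Any],
    actor_options: Mapping[str, Any],
    middleware_default: bool,
) -> bool:
    """Hypothetical helper: pick the first explicitly set abortable value."""
    # Options passed to send_with_options win over options set on the actor.
    for options in (message_options, actor_options):
        value = options.get("abortable")
        if value is not None:
            return bool(value)
    # Nothing set explicitly: fall back to the middleware-wide default.
    return middleware_default


# The message-level option overrides the actor-level one.
print(resolve_abortable({"abortable": False}, {"abortable": True}, True))  # False
# With no message-level option, the actor-level option applies.
print(resolve_abortable({}, {"abortable": False}, True))  # False
# With neither set, the middleware default applies.
print(resolve_abortable({}, {}, True))  # True
```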

class dramatiq_abort.Abort

Exception used to interrupt worker threads when their worker processes have been signaled to abort.

class dramatiq_abort.EventBackend

ABC for event backends.

abstract notify(key, ttl)

Signal an event.

Once notified, a call to poll or wait_many with this event should return a positive result.

Parameters
  • key (bytes) – Event to signal.

  • ttl (int) – Time for the signal to live. The value should be large enough to give time for workers to poll the value, but small enough that the backend doesn’t end up with too many outdated keys not being garbage collected.

Return type

None

abstract poll(key)

Check if an event has been signaled.

This function should not block waiting for an event to be signaled. Returns True if the event was signaled, False otherwise. A backend might not be idempotent: once a key has signaled, subsequent calls may return False.

Parameters

key (bytes) – Event to check for signal.

Return type

bool

abstract wait_many(keys, timeout)

Wait for either one of the events in keys to be signaled or timeout milliseconds to elapse.

Returns the event that signaled or None if no event was signaled. A backend might not be idempotent and once a key has signaled, subsequent calls might wait indefinitely.

Parameters
  • keys (List[bytes]) – List of events to wait for.

  • timeout (int) – Maximum amount of milliseconds to wait.

Return type

Optional[bytes]
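To make the notify, poll and wait_many contract concrete, here is a minimal in-memory sketch using only the standard library. It is an illustration, not part of dramatiq-abort: the class name InMemoryBackend is hypothetical, it does not subclass EventBackend so that it runs without the package installed, and it assumes ttl is expressed in milliseconds like timeout. It is only visible within one process, so it would suit tests rather than real distributed workers.

```python
import threading
import time
from typing import Dict, List, Optional


class InMemoryBackend:
    """Hypothetical process-local backend mirroring the EventBackend methods."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        # Maps each signaled key to its expiry time (monotonic seconds).
        self._events: Dict[bytes, float] = {}

    def notify(self, key: bytes, ttl: int) -> None:
        # Assumption: ttl is in milliseconds.
        with self._cond:
            self._events[key] = time.monotonic() + ttl / 1000
            self._cond.notify_all()

    def _take(self, key: bytes) -> bool:
        # Consume the event: this backend is deliberately not idempotent.
        expiry = self._events.pop(key, None)
        return expiry is not None and expiry > time.monotonic()

    def poll(self, key: bytes) -> bool:
        # Never blocks beyond taking the lock.
        with self._cond:
            return self._take(key)

    def wait_many(self, keys: List[bytes], timeout: int) -> Optional[bytes]:
        deadline = time.monotonic() + timeout / 1000
        with self._cond:
            while True:
                for key in keys:
                    if self._take(key):
                        return key
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return None
                # Sleep until a notify arrives or the timeout runs out.
                self._cond.wait(remaining)


backend = InMemoryBackend()
backend.notify(b"abort:1", ttl=1000)
first_poll = backend.poll(b"abort:1")    # the event was signaled
second_poll = backend.poll(b"abort:1")   # already consumed by the first poll
```

Consuming the event on read shows the non-idempotent behavior the method descriptions warn about: the second poll of the same key no longer reports the signal.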

class dramatiq_abort.backends.RedisBackend(*, client)

An event backend for Redis.

Parameters

client (Any) – The Redis client instance.

classmethod from_url(url)

Initialize the backend using a URL to the Redis server.

Parameters

url (str) – Redis server URL.

Return type

RedisBackend