dramatiq-abort

When running dramatiq tasks, it can be useful to abort enqueued or even running actors.

The dramatiq-abort middleware lets any dramatiq client signal a task termination through a distributed event backend. At the moment, only a Redis backend exists, but the API provides a simple interface for adding more.

Installing dramatiq-abort

Install with pip:

pip install dramatiq-abort[redis]

Configuring dramatiq-abort

dramatiq-abort is configured by creating a backend object and adding the Abortable middleware to the dramatiq Broker instance.

from dramatiq import get_broker
from dramatiq_abort import Abortable, backends

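# RedisBackend requires a client; from_url builds one (adjust the URL for your Redis server)
event_backend = backends.RedisBackend.from_url("redis://localhost:6379/0")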
abortable = Abortable(backend=event_backend)
get_broker().add_middleware(abortable)

Abort a task

When an actor is sent to a worker, a message instance carrying the message id is returned. Keep this message id somewhere so it can later be used to abort the enqueued or running task. abort then signals the task termination for that message id in one of two modes:

  • cancel mode aborts the task only while it is still pending (enqueued but not yet running).

  • abort mode aborts the task whether it is pending or already running.

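import dramatiq
from dramatiq_abort import AbortMode, abort  # assumes AbortMode is exported at package level

@dramatiq.actor
def my_long_running_task(): ...

message = my_long_running_task.send()
message_id = message.message_id

# the default mode is 'AbortMode.ABORT': aborts a pending or running task
abort(message_id)
# cancel mode: aborts the task only if it has not started yet
abort(message_id, mode=AbortMode.CANCEL)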

Gracefully aborting tasks

When the abort mode is set to abort, an optional timeout can be provided to allow for tasks to finish before being aborted. Running tasks can check if an abort is requested by calling abort_requested, which returns the number of milliseconds until the task is aborted or None if no abort is requested.

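from time import sleep

import dramatiq
from dramatiq_abort import AbortMode, abort, abort_requested

@dramatiq.actor
def my_long_running_task():
    while True:
        sleep(0.1)  # do work
        # compare against None: a remaining time of 0 would be falsy
        if abort_requested() is not None:
            break  # stop working

message = my_long_running_task.send()

# signals the task that an abort is requested, allowing 2 seconds for it to finish
abort(message.message_id, mode=AbortMode.ABORT, abort_timeout=2000)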
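Because abort_requested returns the remaining time rather than a plain flag, a task can also use it to decide whether another unit of work still fits before the abort lands. The following is a minimal sketch of that idea, not part of the library: `process_until_abort`, `check_abort` and `cost_ms` are hypothetical names, and `check_abort` stands in for `abort_requested` so the example runs without a broker.

```python
from typing import Callable, Iterable, List, Optional


def process_until_abort(
    items: Iterable[str],
    check_abort: Callable[[], Optional[float]],
    cost_ms: float,
) -> List[str]:
    """Process items until an abort is requested and too little time is left."""
    done = []
    for item in items:
        remaining = check_abort()  # None means no abort was requested
        if remaining is not None and remaining < cost_ms:
            break  # not enough time for another item; exit cleanly
        done.append(item.upper())  # placeholder for real work
    return done


# Simulated answers: no abort, then 500 ms left, then 50 ms left.
answers = iter([None, 500.0, 50.0])
processed = process_until_abort(["a", "b", "c"], lambda: next(answers), cost_ms=100)
```

Inside a real actor, `abort_requested` would be passed as `check_abort`, and `cost_ms` would be an estimate of how long one unit of work takes.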

Abort a task using a custom abort_ttl value

By default, an abort request stays valid for 90,000 milliseconds: a worker skips a task only if the abort was requested within the last 90 seconds. When a task may wait longer than that before being processed, for example because it was sent with a delay, override this value with abort_ttl.

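import dramatiq
from dramatiq_abort import abort

@dramatiq.actor
def count_words(url): ...

# the task is delayed by one hour
message = count_words.send_with_options(args=("https://example.com",), delay=60*60*1000)
message_id = message.message_id

# keep the abort request alive for two hours so it outlives the delay
abort(message_id, abort_ttl=2*60*60*1000)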
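The arithmetic behind that choice can be sketched as follows. This is a simplified model that assumes an abort request simply expires abort_ttl milliseconds after it is sent; `abort_still_visible` is a hypothetical helper, not a library function, and the real expiry behavior depends on the event backend.

```python
DEFAULT_ABORT_TTL_MS = 90_000  # default documented for Abortable(abort_ttl=...)


def abort_still_visible(abort_sent_ms: int, picked_up_ms: int, abort_ttl_ms: int) -> bool:
    """Return True if the abort request has not expired when the worker picks up the task."""
    return picked_up_ms - abort_sent_ms <= abort_ttl_ms


one_hour_ms = 60 * 60 * 1000
# The abort is sent right after enqueueing; the worker picks the task up one hour later.
with_default = abort_still_visible(0, one_hour_ms, DEFAULT_ABORT_TTL_MS)
with_override = abort_still_visible(0, one_hour_ms, 2 * one_hour_ms)
```

Under this model, the default TTL lets the request expire long before the delayed task is picked up, while the two-hour TTL keeps it visible.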

Abort a task running a subprocess

When a worker is waiting on a subprocess, the Abort exception is raised only after the subprocess has completed. The following example is one way to deal with this:

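import signal
import subprocess

import dramatiq
from dramatiq_abort import Abort

@dramatiq.actor
def my_subprocess():
    # args and kwargs stand for whatever arguments your Popen call needs
    with subprocess.Popen(*args, **kwargs) as process:
        ret = None
        try:
            while ret is None:
                try:
                    # wake up every second so a pending Abort can be raised
                    ret = process.wait(timeout=1)  # Note: same principle for `process.communicate()`
                except subprocess.TimeoutExpired:
                    pass
        except Abort:
            process.send_signal(signal.SIGINT)  # or on Windows: `os.kill(process.pid, signal.SIGINT)`
            process.wait()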

API

dramatiq_abort.abort(message_id, middleware=None, abort_ttl=None, mode=AbortMode.ABORT, abort_timeout=0)

Abort a pending or running message given its message_id.

Parameters:
  • message_id (str) – Message to abort. Take the .message_id attribute of the value returned by actor.send or actor.send_with_options.

  • middleware (Abortable) – Abortable middleware used by the workers and broker used to signal termination. If set to None, use the default broker from dramatiq.get_broker() and retrieve the configured Abortable middleware. If no Abortable middleware is set on the broker and middleware is None, raises a RuntimeError.

  • abort_ttl (Optional[int]) – Overrides the default abort TTL. If set to None, the default value from Abortable is used.

  • mode (AbortMode) – “AbortMode.ABORT” or “AbortMode.CANCEL”. In “cancel” mode, only pending messages are aborted; in “abort” mode, running messages are aborted as well.

  • abort_timeout (int) – Only applicable when mode is “AbortMode.ABORT”. If set, signals the running message that an abort is requested and waits for the given number of milliseconds for it to finish before aborting it. Messages can check if an abort is requested by calling abort_requested().

Return type:

None

dramatiq_abort.abort_requested(message_id=None, middleware=None)

Check if there is an abort request for the current message. Returns the number of milliseconds until the message is aborted via an exception or None if no abort is requested.

Parameters:
  • message_id (Optional[str]) – If provided, checks for an abort request for the given message_id instead of the current message.

  • middleware (Optional[Abortable]) – As in abort().

Return type:

Optional[float]

class dramatiq_abort.Abortable(*, backend, abortable=True, abort_ttl=90000)

Middleware that interrupts actors whose job has been signaled for termination. Currently, this is only available on CPython.

This middleware also adds an abortable option that can be set on a dramatiq actor and in send_with_options. The value is resolved in priority order: send_with_options first, then the actor, then this Abortable.
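The priority order for the abortable option can be pictured with a small sketch. It only models the documented order with plain dictionaries; `resolve_abortable` and its arguments are hypothetical and do not reflect how the middleware is actually written.

```python
from typing import Any, Dict


def resolve_abortable(
    message_options: Dict[str, Any],
    actor_options: Dict[str, Any],
    middleware_default: bool,
) -> bool:
    """Pick the abortable value: send_with_options, then actor, then middleware."""
    if "abortable" in message_options:
        return bool(message_options["abortable"])
    if "abortable" in actor_options:
        return bool(actor_options["abortable"])
    return middleware_default


# The per-message option wins over the actor option and the middleware default.
per_message = resolve_abortable({"abortable": False}, {"abortable": True}, True)
per_actor = resolve_abortable({}, {"abortable": False}, True)
fallback = resolve_abortable({}, {}, True)
```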

Note: This works by setting an async exception in the worker thread that runs the actor. This means that the exception will only be raised the next time that thread acquires the GIL. Concretely, this means that this middleware can’t cancel system calls.

Parameters:
  • backend (EventBackend) – Event backend used to signal termination from a broker to the workers. See RedisBackend.

  • abortable (bool) – Set the default value for every actor abortable option.

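  • abort_ttl (int) – Default time, in milliseconds, during which an abort request remains valid. Can be overridden per call with the abort_ttl argument of abort().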
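To make the note about async exceptions concrete, here is a minimal, CPython-only sketch of the general mechanism, using the `PyThreadState_SetAsyncExc` C API through ctypes. The `Abort` class and `raise_in_thread` helper are defined locally for illustration; the library's own implementation may differ.

```python
import ctypes
import threading
import time


class Abort(Exception):
    """Local stand-in for dramatiq_abort.Abort, used only in this sketch."""


def raise_in_thread(thread_id: int, exc_type: type) -> bool:
    """Ask CPython to raise exc_type asynchronously in the given thread."""
    affected = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread_id), ctypes.py_object(exc_type)
    )
    return affected == 1  # exactly one thread state should be modified


result = {}


def worker() -> None:
    try:
        while True:
            # Short sleeps: the exception surfaces only once the sleep call
            # returns and the thread runs Python bytecode again.
            time.sleep(0.01)
    except Abort:
        result["aborted"] = True


t = threading.Thread(target=worker)
t.start()
time.sleep(0.05)  # let the worker enter its loop
raise_in_thread(t.ident, Abort)
t.join(timeout=5)
```

If the worker slept for ten seconds in a single call instead, the exception would be delivered only after that call returned, which is the limitation the note describes.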

class dramatiq_abort.Abort

Exception used to interrupt worker threads when their worker processes have been signaled to abort.

class dramatiq_abort.Event(key: str, params: Dict[str, Any])

Events are composed of an identifying key and associated parameters.

class dramatiq_abort.EventBackend

ABC for event backends.

abstract notify(events, ttl)

Signal events.

Once notified, a call to poll or wait_many with this event should result in a positive result.

Parameters:
  • events (Iterable[Event]) – Events to signal.

  • ttl (int) – Time for the signal to live. The value should be large enough to give time for workers to poll the value, but small enough that the backend doesn’t end up with too many outdated keys not being garbage collected.

Return type:

None

abstract poll(key)

Check if an event has been signaled.

This function should not block and wait for an event to signal. Returns the event if it was signaled, None otherwise. A backend might not be idempotent and once a key has signaled, subsequent calls might return None.

Parameters:
  • key (str) – Event to check for signal.

Return type:

Optional[Event]

abstract wait_many(keys, timeout)

Wait for any one of the events in keys to be signaled or for timeout milliseconds to elapse.

Returns the event that signaled or None if no event was signaled. A backend might not be idempotent and once a key has signaled, subsequent calls might wait indefinitely.

Parameters:
  • keys (Iterable[str]) – Events to wait for.

  • timeout (int) – Maximum amount of milliseconds to wait.

Return type:

Optional[Event]
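As an illustration of the three methods above, here is a sketch of a single-process, in-memory backend. It is hypothetical and not shipped with the library: `Event` is redefined locally so the example runs on its own, `InMemoryBackend` mirrors the documented interface without subclassing EventBackend (a real backend would subclass it), and ttl is assumed to be in milliseconds like abort_ttl.

```python
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Optional, Tuple


@dataclass
class Event:
    """Local stand-in mirroring dramatiq_abort.Event(key, params)."""
    key: str
    params: Dict[str, Any] = field(default_factory=dict)


class InMemoryBackend:
    """Hypothetical backend; events are consumed on first read."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._events: Dict[str, Tuple[Event, float]] = {}  # key -> (event, expiry)

    def notify(self, events: Iterable[Event], ttl: int) -> None:
        expiry = time.monotonic() + ttl / 1000  # assumes ttl is in milliseconds
        with self._cond:
            for event in events:
                self._events[event.key] = (event, expiry)
            self._cond.notify_all()  # wake up any wait_many callers

    def _pop_live(self, key: str) -> Optional[Event]:
        # Remove the entry; return it only if it has not expired.
        entry = self._events.pop(key, None)
        if entry is None:
            return None
        event, expiry = entry
        return event if time.monotonic() < expiry else None

    def poll(self, key: str) -> Optional[Event]:
        with self._cond:
            return self._pop_live(key)  # never blocks waiting for a signal

    def wait_many(self, keys: Iterable[str], timeout: int) -> Optional[Event]:
        keys = list(keys)
        deadline = time.monotonic() + timeout / 1000
        with self._cond:
            while True:
                for key in keys:
                    event = self._pop_live(key)
                    if event is not None:
                        return event
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return None
                self._cond.wait(remaining)


backend = InMemoryBackend()
backend.notify([Event("msg-1", {"mode": "abort"})], ttl=1000)
first = backend.poll("msg-1")    # the signaled event
second = backend.poll("msg-1")   # None: this backend is not idempotent
timed_out = backend.wait_many(["msg-2"], timeout=50)  # None after about 50 ms
```

Such a backend only works when the client and the workers share one process, so it is suited to tests rather than to a distributed deployment.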

class dramatiq_abort.backends.RedisBackend(*, client, namespace='dramatiq:aborts:')

An event backend for Redis.

Parameters:
  • client (Any) – The Redis client instance.

  • namespace (str) – Namespace to prefix keys with.

classmethod from_url(url, *args, **kwargs)

Initialize the backend using a URL to the Redis server. Any extra parameters are passed on to the default constructor.

Parameters:
  • url (str) – Redis server URL.

  • args (Any)

  • kwargs (Any)

Return type:

RedisBackend