dramatiq-abort¶
When running dramatiq tasks, it can be useful to abort enqueued or even running actors.
The dramatiq-abort middleware lets any dramatiq client signal a task's termination through a distributed event backend. At the moment only a Redis backend exists, but the API provides a simple interface for adding more.
Installing dramatiq-abort¶
Install with pip:
pip install dramatiq-abort[redis]
Configuring dramatiq-abort¶
dramatiq-abort is configured by creating a backend object and adding the Abortable middleware to the dramatiq Broker instance.
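import redis
from dramatiq import get_broker
from dramatiq_abort import Abortable, backends

# RedisBackend takes the Redis client as a keyword argument (see the API section);
# redis.Redis() with no arguments targets localhost:6379.
event_backend = backends.RedisBackend(client=redis.Redis())
abortable = Abortable(backend=event_backend)
get_broker().add_middleware(abortable)  # get_broker is a function and must be called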
Abort a task¶
When an actor is sent to a worker, a message instance is returned with the message id. This message id can be kept somewhere so it can be used later to abort the enqueued or running task. abort can then be used to signal the task termination using the message id, in one of two modes:
- cancel mode only aborts a task that is enqueued but still pending.
- abort mode aborts the task whether it is pending or running.
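import dramatiq
from dramatiq_abort import abort, AbortMode

@dramatiq.actor
def my_long_running_task(): ...

message = my_long_running_task.send()
message_id = message.message_id
# the default mode is 'AbortMode.ABORT': aborts the task whether pending or running
abort(message_id)
# cancel mode: aborts the task only if it has not started yet
abort(message_id, mode=AbortMode.CANCEL)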
Gracefully aborting tasks¶
When the abort mode is set to abort, an optional timeout can be provided to give tasks time to finish before being aborted.
Running tasks can check if an abort is requested by calling abort_requested, which returns the number of milliseconds until the task is aborted, or None if no abort is requested.
from time import sleep

import dramatiq
from dramatiq_abort import abort, abort_requested, AbortMode

@dramatiq.actor
def my_long_running_task():
    while True:
        sleep(0.1)  # do work
        if abort_requested() is not None:
            break  # stop working

message = my_long_running_task.send()
# signals the task that an abort is requested, allowing 2 seconds for it to finish
abort(message.message_id, mode=AbortMode.ABORT, abort_timeout=2000)
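Because abort_requested returns a number of milliseconds rather than a boolean, comparing against None is the safer check: a remaining time of 0 would be falsy even though an abort was requested. The sketch below illustrates this with stand-ins only. `make_fake_abort_requested` and `process_items` are hypothetical helpers driven by a local deadline, not part of dramatiq-abort, and whether the real function ever reports exactly 0 is an assumption.

```python
import time
from typing import Callable, Iterable, List, Optional


def make_fake_abort_requested(
    request_at: float, timeout_ms: int
) -> Callable[[], Optional[float]]:
    """Hypothetical stand-in for abort_requested().

    Returns None before `request_at` (a time.monotonic() value), then the
    milliseconds left until the abort deadline, never below zero.
    """
    deadline = request_at + timeout_ms / 1000

    def abort_requested() -> Optional[float]:
        now = time.monotonic()
        if now < request_at:
            return None
        return max(0.0, (deadline - now) * 1000)

    return abort_requested


def process_items(
    items: Iterable[int], abort_requested: Callable[[], Optional[float]]
) -> List[int]:
    """Process items one by one, stopping as soon as an abort is requested."""
    done: List[int] = []
    for item in items:
        # `is not None` also catches a remaining time of 0 ms.
        if abort_requested() is not None:
            break
        time.sleep(0.01)  # simulated unit of work
        done.append(item)
    return done
```

Inside a real actor the loop would call abort_requested() from dramatiq_abort directly instead of receiving a stand-in.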
Abort a task using a custom abort_ttl value¶
By default, an abort request stays valid for 90,000 milliseconds: a worker skips a task only if it was aborted within the last 90 seconds. When a task may wait longer than that before being processed (for example, a delayed message), the value can be overridden with abort_ttl.
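import dramatiq
from dramatiq_abort import abort

@dramatiq.actor
def count_words(url): ...

message = count_words.send_with_options(args=("https://example.com",), delay=60*60*1000)  # delayed by one hour
message_id = message.message_id
# keep the abort request alive for two hours so it outlives the delay
abort(message_id, abort_ttl=2*60*60*1000)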
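One way to pick an abort_ttl is to add a safety margin to the message delay, so the abort request is still present when a worker finally picks the message up. The helper below is a hypothetical sketch of that arithmetic, not something the library provides. Reusing the 90,000 ms default as the margin is an assumption.

```python
def abort_ttl_for(delay_ms: int, margin_ms: int = 90_000) -> int:
    """Return an abort TTL (in milliseconds) that outlives a delayed message.

    Hypothetical helper: the TTL is the message delay plus a margin that
    covers queueing time once the delay has elapsed.
    """
    if delay_ms < 0 or margin_ms < 0:
        raise ValueError("delay_ms and margin_ms must be non-negative")
    return delay_ms + margin_ms


one_hour_ms = 60 * 60 * 1000
print(abort_ttl_for(one_hour_ms))  # 3690000
```

The result can be passed as the abort_ttl argument of abort.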
Abort a task running a subprocess¶
When a worker is waiting on a subprocess, the Abort exception will only be raised AFTER the subprocess has completed. The following example is one way to deal with this:
import signal
import subprocess

import dramatiq
from dramatiq_abort import Abort

@dramatiq.actor
def my_subprocess(*args, **kwargs):
    with subprocess.Popen(*args, **kwargs) as process:
        ret = None
        try:
            # Wait in short slices so the Abort exception can be raised between waits.
            while ret is None:
                try:
                    ret = process.wait(timeout=1)  # Note: same principle for `process.communicate()`
                except subprocess.TimeoutExpired:
                    pass
        except Abort:
            process.send_signal(signal.SIGINT)  # or on Windows: `os.kill(process.pid, signal.SIGINT)`
            process.wait()
API¶
- dramatiq_abort.abort(message_id, middleware=None, abort_ttl=None, mode=AbortMode.ABORT, abort_timeout=0)¶
Abort a pending or running message given its message_id.
- Parameters:
message_id (str) – Message to abort. Use the return value of actor.send or actor.send_with_options and read its .message_id attribute.
middleware (Abortable) – Abortable middleware used by the workers and broker to signal termination. If set to None, the default broker from dramatiq.get_broker() is used and its configured Abortable middleware is retrieved. If no Abortable middleware is set on the broker and middleware is None, a RuntimeError is raised.
abort_ttl (Optional[int]) – Overrides the default abort TTL value. If set to None, the default value from Abortable is used.
mode (AbortMode) – AbortMode.ABORT or AbortMode.CANCEL. In cancel mode, only pending messages are aborted; in abort mode, running messages are aborted as well.
abort_timeout (int) – Only applicable when mode is AbortMode.ABORT. If set, signals the running message that an abort is requested and waits for the given number of milliseconds for it to finish before aborting it. Messages can check if an abort is requested by calling abort_requested().
- Return type:
None
- dramatiq_abort.abort_requested(message_id=None, middleware=None)¶
Check if there is an abort request for the current message. Returns the number of milliseconds until the message is aborted via an exception, or None if no abort is requested.
- class dramatiq_abort.Abortable(*, backend, abortable=True, abort_ttl=90000)¶
Middleware that interrupts actors whose job has been signaled for termination. Currently, this is only available on CPython.
This middleware also adds an abortable option that can be set on dramatiq actor and send_with_options. Value priority is, in order, send_with_options, actor and this Abortable.
Note: This works by setting an async exception in the worker thread that runs the actor. This means that the exception will only get raised the next time that thread acquires the GIL. Concretely, this means that this middleware can’t cancel system calls.
- Parameters:
backend (EventBackend) – Event backend used to signal termination from a broker to the workers. See RedisBackend.
abortable (bool) – Set the default value for every actor abortable option.
abort_ttl (int) –
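The note about async exceptions can be illustrated with CPython's PyThreadState_SetAsyncExc C API, reached here through ctypes. This is a sketch of the general technique only, not the middleware's actual code. `Interrupt`, `raise_in_thread` and `demo` are hypothetical names, and `Interrupt` merely stands in for dramatiq_abort.Abort.

```python
import ctypes
import threading
import time
from typing import List


class Interrupt(Exception):
    """Hypothetical stand-in for dramatiq_abort.Abort."""


def raise_in_thread(thread_id: int, exc_type: type) -> bool:
    """Ask CPython to raise `exc_type` in the thread with the given ident.

    Returns True if exactly one thread state was affected.
    """
    affected = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread_id), ctypes.py_object(exc_type)
    )
    return affected == 1


def demo() -> str:
    outcome: List[str] = []
    started = threading.Event()

    def worker() -> None:
        try:
            started.set()
            while True:
                # Short sleeps return to Python bytecode often, which is
                # where the pending exception gets delivered.
                time.sleep(0.01)
        except Interrupt:
            outcome.append("interrupted")

    thread = threading.Thread(target=worker)
    thread.start()
    started.wait()
    raise_in_thread(thread.ident, Interrupt)
    thread.join(timeout=5)
    return outcome[0] if outcome else "still running"
```

If the worker were blocked in one long system call instead of short sleeps, the exception would only surface once that call returned, which is the limitation the note describes.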
- class dramatiq_abort.Abort¶
Exception used to interrupt worker threads when their worker processes have been signaled to abort.
- class dramatiq_abort.Event(key: str, params: Dict[str, Any])¶
Events are composed of an identifying key and associated parameters.
- class dramatiq_abort.EventBackend¶
ABC for event backends.
- abstract notify(events, ttl)¶
Signal events.
Once notified, a call to poll or wait_many with this event should result in a positive result.
- Parameters:
events (Iterable[Event]) – Events to signal.
ttl (int) – Time for the signal to live. The value should be large enough to give time for workers to poll the value, but small enough that the backend doesn’t end up with too many outdated keys not being garbage collected.
- Return type:
None
- abstract poll(key)¶
Check if an event has been signaled.
This function should not block and wait for an event to signal. Returns the event if it was signaled, None otherwise. A backend might not be idempotent and once a key has signaled, subsequent calls might return None.
- Parameters:
key (str) – Event to check for signal.
- Return type:
Optional[Event]
- abstract wait_many(keys, timeout)¶
Wait for either one of the events in keys to be signaled or timeout milliseconds to elapse.
Returns the event that signaled or None if no event was signaled. A backend might not be idempotent and once a key has signaled, subsequent calls might wait indefinitely.
- Parameters:
keys (Iterable[str]) – List of events to wait for.
timeout (int) – Maximum number of milliseconds to wait.
- Return type:
Optional[Event]
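The notify, poll and wait_many contract above can be sketched with a single-process, in-memory backend. To stay self-contained, the sketch does not import dramatiq-abort. `Event` here is a local stand-in mirroring dramatiq_abort.Event, and `InMemoryBackend` is a hypothetical class that a real implementation would derive from EventBackend. Treating ttl as milliseconds and consuming an event once it is read are both assumptions.

```python
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Optional, Tuple


@dataclass
class Event:
    """Local stand-in mirroring dramatiq_abort.Event(key, params)."""
    key: str
    params: Dict[str, Any] = field(default_factory=dict)


class InMemoryBackend:
    """Hypothetical single-process backend following the EventBackend contract."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        # key -> (event, expiry as a time.monotonic() value)
        self._events: Dict[str, Tuple[Event, float]] = {}

    def notify(self, events: Iterable[Event], ttl: int) -> None:
        expires = time.monotonic() + ttl / 1000  # ttl assumed to be in ms
        with self._cond:
            for event in events:
                self._events[event.key] = (event, expires)
            self._cond.notify_all()

    def _take(self, key: str) -> Optional[Event]:
        """Remove and return a live event; the caller must hold the lock."""
        entry = self._events.pop(key, None)
        if entry is None:
            return None
        event, expires = entry
        return event if time.monotonic() < expires else None

    def poll(self, key: str) -> Optional[Event]:
        with self._cond:
            return self._take(key)

    def wait_many(self, keys: Iterable[str], timeout: int) -> Optional[Event]:
        wanted = list(keys)
        deadline = time.monotonic() + timeout / 1000
        with self._cond:
            while True:
                for key in wanted:
                    event = self._take(key)
                    if event is not None:
                        return event
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return None
                self._cond.wait(remaining)
```

Because a read consumes the event, a second poll for the same key returns None, which matches the non-idempotent behaviour the contract allows.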
- class dramatiq_abort.backends.RedisBackend(*, client, namespace='dramatiq:aborts:')¶
An event backend for Redis.
- Parameters:
client (Any) – The Redis client instance.
namespace (str) – Namespace to prefix keys with.
- classmethod from_url(url, *args, **kwargs)¶
Initialize the backend using a URL to the Redis server. Any extra parameters are passed on to the default constructor.
- Parameters:
url (str) – Redis server URL.
args (Any) –
kwargs (Any) –
- Return type: