dramatiq-abort¶
When running dramatiq task, it might be useful to abort enqueued or even running actors.
The dramatiq-abort middleware allows to signal a task termination from any dramatiq client using a distributed event backend. At the moment, only a Redis backend exists, but the API provide a simple interface to add more.
Installing dramatiq-abort¶
Install with pip:
pip install dramatiq-abort[redis]
Configuring dramatiq-abort¶
dramatiq-abort is configured by creating a backend object and adding the
Abortable middleware to the dramatiq Broker instance.
from dramatiq import get_broker
from dramatiq_abort import Abortable, backends
event_backend = backends.RedisBackend()
abortable = Abortable(backend=event_backend)
get_broker.add_middleware(abortable)
Abort a task¶
When an actor is sent to a worker, a message instance is returned with the
message id. This message id can be kept somewhere so it can be used to abort
the enqueued or running task. abort can then be used to signal the task
termination using the message id in two modes:
cancel mode only aborts the enqueued but pending task.
abort mode will abort the pending or running task.
@dramatiq.actor
def my_long_running_task(): ...
message = my_long_running_task.send()
message_id = message.message_id
# the default mode is 'AbortMode.ABORT'
abort(message_id)
abort(message_id, mode=AbortMode.CANCEL)
Gracefully aborting tasks¶
When the abort mode is set to abort, an optional timeout can be provided to allow
for tasks to finish before being aborted.
Running tasks can check if an abort is requested by calling abort_requested,
which returns the number of milliseconds until the task is aborted or None if no
abort is requested.
@dramatiq.actor
def my_long_running_task():
while True:
sleep(0.1) # do work
if abort_requested():
break # stop working
message = my_long_running_task.send()
# signals the task that an abort is requested, allowing 2 seconds for it to finish
abort(message.message_id, mode=AbortMode.ABORT, abort_timeout=2000)
Abort a task using a custom abort_ttl value¶
By default, abort has a limited window of 90,000 milliseconds. This means a worker will skip a task only if the task was aborted up to 90 seconds ago. In case of longer delay in the task processing this value can be overridden.
@dramatiq.actor
def count_words(url): ...
message = count_words.send_with_options(args=("https://example.com",), delay=60*60*1000)
message_id = message.message_id
abort(message_id, abort_ttl=2*60*60*1000)
Abort a task running a subprocess¶
When a worker is waiting on a subprocess, the Abort exception will only be raised
AFTER the subprocess has been completed. The following example is one way to deal with this:
@dramatiq.actor
def my_subprocess():
with subprocess.Popen(*args, **kwargs) as process:
ret = None
try:
while ret is None:
try:
ret = process.wait(timeout=1) # Note: same principle for `process.communicate()`
except subprocess.TimeoutExpired:
pass
except Abort:
process.signal(signal.SIGINT) # or on windows: `os.kill(process.pid, signal.SIGINT)`
process.wait()
API¶
- dramatiq_abort.abort(message_id, middleware=None, abort_ttl=None, mode=AbortMode.ABORT, abort_timeout=0)¶
Abort a pending or running message given its
message_id.- Parameters:
message_id (
str) – Message to abort. Use the return value ofactor.sendoractor.send_with_optionsto then use its.message_idattribute.middleware (
Abortable) –Abortablemiddleware used by the workers and broker used to signal termination. If set toNone, use the default broker fromdramatiq.get_broker()and retrieve the configuredAbortablemiddleware. If noAbortablemiddleware is set on the broker andmiddlewareisNone, raises aRuntimeError.abort_ttl (
Optional[int]) – Change default abort TTL value, optional argument. If set toNonedefault value fromAbortableis used.mode (
AbortMode) – “AbortMode.ABORT” or “AbortMode.CANCEL”. In “cancel” mode, only pending message will be aborted, running message will also be aborted additionally in “abort” mode.abort_timeout (
int) – Only applicable when mode is “AbortMode.ABORT”. If set, signals the running message that an abort is requested and waits for the given number of milliseconds for it to finish before aborting it. Messages can check if an abort is requested by callingabort_requested().
- Return type:
None
- dramatiq_abort.abort_requested(message_id=None, middleware=None)¶
Check if there is an abort request for the current message. Returns the number of milliseconds until the message is aborted via an exception or
Noneif no abort is requested.
- class dramatiq_abort.Abortable(*, backend, abortable=True, abort_ttl=90000)¶
Middleware that interrupts actors whose job has been signaled for termination. Currently, this is only available on CPython.
This middleware also adds an
abortableoption that can be set on dramatiqactorandsend_with_options. Value priority is respectivelysend_with_options,actorand thisAbortable.Note: This works by setting an async exception in the worker thread that runs the actor. This means that the exception will only get called the next time that thread acquires the GIL. Concretely, this means that this middleware can’t cancel system calls.
- Parameters:
backend (
EventBackend) – Event backend used to signal termination from a broker to the workers. SeeRedisBackend.abortable (
bool) – Set the default value for every actorabortableoption.abort_ttl (
int)
- class dramatiq_abort.Abort¶
Exception used to interrupt worker threads when their worker processes have been signaled to abort.
- class dramatiq_abort.Event(key: str, params: Dict[str, Any])¶
Events are composed of an identifying key and associated parameters.
- class dramatiq_abort.EventBackend¶
ABC for event backends.
- abstractmethod notify(events, ttl)¶
Signal events.
Once notified, a call to
pollorwait_manywith this event should result in a positive result.- Parameters:
keys – Events to signal.
ttl (
int) – Time for the signal to live. The value should be large enough to give time for workers to poll the value, but small enough that the backend doesn’t end up with too many outdated keys not being garbage collected.events (
Iterable[Event])
- Return type:
None
- abstractmethod poll(key)¶
Check if an event has been signaled.
This function should not block and wait for an event to signal. Returns the event if it was signaled,
Noneotherwise. A backend might not be idempotent and once a key has signaled, subsequent calls might returnNone.- Parameters:
key (
str) – Event to check for signal.- Return type:
Optional[Event]
- abstractmethod wait_many(keys, timeout)¶
Wait for either one of the events in
keysto be signaled ortimeoutmilliseconds to elapsed.Returns the event that signaled or
Noneif no event was signaled. A backend might not be idempotent and once a key has signaled, subsequent calls might wait indefinitely.- Parameters:
keys (
Iterable[str]) – List of event to wait for.timeout (
int) – Maximum amount of milliseconds to wait.
- Return type:
Optional[Event]
- class dramatiq_abort.backends.RedisBackend(*, client, namespace='dramatiq:aborts:')¶
An event backend for Redis.
- Parameters:
client (
Any) – The Redis client instance.namespace (
str) – Namespace to prefix keys with.
- classmethod from_url(url, *args, **kwargs)¶
Initialize the backend using an URL to the Redis server. Any extra parameters are passed on to the default constructor.
- Parameters:
url (
str) – Redis server URL.args (
Any)kwargs (
Any)
- Return type: