.. dramatiq-abort documentation master file, created by sphinx-quickstart on Thu Dec 26 14:09:29 2019. You can adapt this file completely to your liking, but it should at least contain the root `toctree` directive. .. highlight:: python dramatiq-abort ============== .. toctree:: Documentation Source .. module:: dramatiq-abort When running `dramatiq`_ task, it might be useful to abort enqueued or even running actors. The **dramatiq-abort** middleware allows to signal a task termination from any dramatiq client using a distributed event backend. At the moment, only a Redis backend exists, but the API provide a simple interface to add more. Installing dramatiq-abort ------------------------- Install with **pip**:: pip install dramatiq-abort[redis] Configuring dramatiq-abort -------------------------- **dramatiq-abort** is configured by creating a backend object and adding the ``Abortable`` middleware to the **dramatiq** ``Broker`` instance. .. code-block:: from dramatiq import get_broker from dramatiq_abort import Abortable, backends event_backend = backends.RedisBackend() abortable = Abortable(backend=event_backend) get_broker.add_middleware(abortable) Abort a task ------------ When an actor is sent to a worker, a message instance is returned with the message id. This message id can be kept somewhere so it can be used to abort the enqueued or running task. :any:`abort` can then be used to signal the task termination using the message id in two modes: - `cancel` mode only aborts the enqueued but pending task. - `abort` mode will abort the pending or running task. .. code-block:: @dramatiq.actor def my_long_running_task(): ... message = my_long_running_task.send() message_id = message.message_id # the default mode is 'AbortMode.ABORT' abort(message_id) abort(message_id, mode=AbortMode.CANCEL) Gracefully aborting tasks ------------------------- When the abort mode is set to :any:`abort`, an optional timeout can be provided to allow for tasks to finish before being aborted. Running tasks can check if an abort is requested by calling :any:`abort_requested`, which returns the number of milliseconds until the task is aborted or ``None`` if no abort is requested. .. code-block:: @dramatiq.actor def my_long_running_task(): while True: sleep(0.1) # do work if abort_requested(): break # stop working message = my_long_running_task.send() # signals the task that an abort is requested, allowing 2 seconds for it to finish abort(message.message_id, mode=AbortMode.ABORT, abort_timeout=2000) Abort a task using a custom abort_ttl value ------------------------------------------- By default, abort has a limited window of 90,000 milliseconds. This means a worker will skip a task only if the task was aborted up to 90 seconds ago. In case of longer delay in the task processing this value can be overridden. .. code-block:: @dramatiq.actor def count_words(url): ... message = count_words.send_with_options(args=("https://example.com",), delay=60*60*1000) message_id = message.message_id abort(message_id, abort_ttl=2*60*60*1000) Abort a task running a subprocess --------------------------------- When a worker is waiting on a subprocess, the :any:`Abort` exception will only be raised AFTER the subprocess has been completed. The following example is one way to deal with this: .. code-block:: @dramatiq.actor def my_subprocess(): with subprocess.Popen(*args, **kwargs) as process: ret = None try: while ret is None: try: ret = process.wait(timeout=1) # Note: same principle for `process.communicate()` except subprocess.TimeoutExpired: pass except Abort: process.signal(signal.SIGINT) # or on windows: `os.kill(process.pid, signal.SIGINT)` process.wait() API --- .. module:: dramatiq_abort .. autofunction:: abort .. autofunction:: abort_requested .. autoclass:: Abortable .. autoclass:: Abort .. autoclass:: Event .. autoclass:: EventBackend :members: .. module:: dramatiq_abort.backends .. autoclass:: RedisBackend :members: from_url .. _dramatiq: https://dramatiq.io .. _GitHub: https://github.com/Flared/dramatiq-abort