Posts Tagged ‘Add new tag’

Killing child processes of Celery tasks, on a timeout

Wednesday, September 7th, 2011

This is a quick monkeypatch for Celery that makes sure child processes spawned by a worker are killed too when the worker is killed because a task hit its hard time limit.

import os
import signal
import subprocess

import celery.worker.job
from celery import exceptions
from celery.worker import state

def kill_child_processes(parent_pid, sig=signal.SIGTERM, top=True):
    """Signal the descendants of ``parent_pid``, deepest processes first.

    Child PIDs are discovered with ``ps -o pid --ppid <pid> --noheaders``
    (procps syntax), and the function recurses into every child before
    signalling it, so leaves receive ``sig`` before their ancestors.

    Args:
        parent_pid: PID whose process tree is walked. ``parent_pid``
            itself is never signalled.
        sig: Signal number passed to ``os.kill``; defaults to ``SIGTERM``.
        top: ``True`` for the outermost call. Recursive calls pass
            ``False``.

    Returns:
        None. The function returns silently when ``ps`` cannot be run or
        exits with a non-zero status.

    NOTE(review): with ``top=True`` the *direct* children of
    ``parent_pid`` are walked but not signalled; only their descendants
    are. Pass ``top=False`` to signal direct children as well. This
    matches the original behaviour -- confirm it is intended.
    """
    try:
        # Argument list instead of a shell string: no shell is spawned
        # and nothing is subject to shell interpretation.
        ps_command = subprocess.Popen(
            ["ps", "-o", "pid", "--ppid", "%d" % parent_pid, "--noheaders"],
            stdout=subprocess.PIPE)
    except OSError:
        # ``ps`` could not be started, so no children can be discovered.
        return
    ps_output = ps_command.communicate()[0]
    if ps_command.returncode != 0:
        return
    # Splitting on any whitespace handles the newline-separated,
    # space-padded PID column and works for both bytes and text output.
    for pid_str in ps_output.split():
        child_pid = int(pid_str)
        kill_child_processes(child_pid, sig, top=False)
        if not top:
            try:
                os.kill(child_pid, sig)
            except OSError:
                # Best effort: the process may already have exited, or
                # we may not be permitted to signal it.
                pass


def on_timeout(self, soft, timeout):
    """Handle a task exceeding its soft or hard time limit.

    Marks the request as ready via ``state.task_ready``, logs the
    timeout, and, when ``self._store_errors`` is set, records the
    matching time-limit exception as the task's failure in the result
    backend. For a hard limit, descendants of ``self.worker_pid`` are
    signalled through ``kill_child_processes`` before logging.

    Args:
        soft: True for a soft time limit, false for a hard one.
        timeout: The limit that was exceeded; interpolated into the log
            message as seconds and passed to the exception.
    """
    state.task_ready(self)

    message = "%s time limit (%%ss) exceeded for %%s[%%s]"
    if not soft:
        # Hard limit: clean up the worker's process tree first.
        kill_child_processes(self.worker_pid)
        self.logger.error(
            (message % "Hard") % (timeout, self.task_name, self.task_id))
        exc = exceptions.TimeLimitExceeded(timeout)
    else:
        self.logger.warning(
            (message % "Soft") % (timeout, self.task_name, self.task_id))
        exc = exceptions.SoftTimeLimitExceeded(timeout)

    if not self._store_errors:
        return
    self.task.backend.mark_as_failure(self.task_id, exc)


# Monkeypatch: replace TaskRequest.on_timeout with the on_timeout function
# defined in this module, so it is used for every TaskRequest.
celery.worker.job.TaskRequest.on_timeout = on_timeout


Copyright © 2018 All Rights Reserved.
No computers were harmed in the 0.081 seconds it took to produce this page.

dmarkey.com