Killing child processes of Celery tasks, on a timeout
Wednesday, September 7th, 2011This is a quick Monkeypatch to make sure that all child processes are killed when a worker is killed.
from celery.worker import state
import celery.worker.job
from celery import exceptions
def kill_child_processes(parent_pid, sig=signal.SIGTERM, top=True):
ps_command = subprocess.Popen("ps -o pid --ppid %d --noheaders" % parent_pid, shell=True, stdout=subprocess.PIPE)
ps_output = ps_command.stdout.read()
retcode = ps_command.wait()
if retcode != 0: return
for pid_str in ps_output.split("n")[:-1]:
try:
kill_child_processes(int(pid_str), sig, top=False)
if not top: os.kill(int(pid_str), sig)
except OSError: pass
def on_timeout(self, soft, timeout):
"""Handler called if the task times out."""
state.task_ready(self)
if soft:
self.logger.warning("Soft time limit (%ss) exceeded for %s[%s]" % (
timeout, self.task_name, self.task_id))
exc = exceptions.SoftTimeLimitExceeded(timeout)
else:
kill_child_processes(self.worker_pid)
self.logger.error("Hard time limit (%ss) exceeded for %s[%s]" % (
timeout, self.task_name, self.task_id))
exc = exceptions.TimeLimitExceeded(timeout)
if self._store_errors:
self.task.backend.mark_as_failure(self.task_id, exc)
celery.worker.job.TaskRequest.on_timeout = on_timeout
