# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:[email protected]
#
# This file is part of logilab-common.
#
# logilab-common is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option) any
# later version.
#
# logilab-common is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with logilab-common. If not, see <http://www.gnu.org/licenses/>.
"""module providing:
* process information (linux specific: rely on /proc)
* a class for resource control (memory / time / cpu time)
This module doesn't work on windows platforms (only tested on linux)
:organization: Logilab
"""
__docformat__ = "restructuredtext en"
import os
import stat
from resource import getrlimit, setrlimit, RLIMIT_CPU, RLIMIT_AS
from signal import signal, SIGXCPU, SIGKILL, SIGUSR2, SIGUSR1
from threading import Timer, currentThread, Thread, Event
from time import time
from logilab.common.tree import Node
class NoSuchProcess(Exception): pass
def proc_exists(pid):
"""check the a pid is registered in /proc
raise NoSuchProcess exception if not
"""
if not os.path.exists('/proc/%s' % pid):
raise NoSuchProcess()
PPID = 3
UTIME = 13
STIME = 14
CUTIME = 15
CSTIME = 16
VSIZE = 22
class ProcInfo(Node):
"""provide access to process information found in /proc"""
def __init__(self, pid):
self.pid = int(pid)
Node.__init__(self, self.pid)
proc_exists(self.pid)
self.file = '/proc/%s/stat' % self.pid
self.ppid = int(self.status()[PPID])
def memory_usage(self):
"""return the memory usage of the process in Ko"""
try :
return int(self.status()[VSIZE])
except IOError:
return 0
def lineage_memory_usage(self):
return self.memory_usage() + sum([child.lineage_memory_usage()
for child in self.children])
def time(self, children=0):
"""return the number of jiffies that this process has been scheduled
in user and kernel mode"""
status = self.status()
time = int(status[UTIME]) + int(status[STIME])
if children:
time += int(status[CUTIME]) + int(status[CSTIME])
return time
def status(self):
"""return the list of fields found in /proc/<pid>/stat"""
return open(self.file).read().split()
def name(self):
"""return the process name found in /proc/<pid>/stat
"""
return self.status()[1].strip('()')
def age(self):
"""return the age of the process
"""
return os.stat(self.file)[stat.ST_MTIME]
class ProcInfoLoader:
"""manage process information"""
def __init__(self):
self._loaded = {}
def list_pids(self):
"""return a list of existent process ids"""
for subdir in os.listdir('/proc'):
if subdir.isdigit():
yield int(subdir)
def load(self, pid):
"""get a ProcInfo object for a given pid"""
pid = int(pid)
try:
return self._loaded[pid]
except KeyError:
procinfo = ProcInfo(pid)
procinfo.manager = self
self._loaded[pid] = procinfo
return procinfo
def load_all(self):
"""load all processes information"""
for pid in self.list_pids():
try:
procinfo = self.load(pid)
if procinfo.parent is None and procinfo.ppid:
pprocinfo = self.load(procinfo.ppid)
pprocinfo.append(procinfo)
except NoSuchProcess:
pass
try:
class ResourceError(BaseException):
"""Error raise when resource limit is reached"""
limit = "Unknown Resource Limit"
except NameError:
class ResourceError(Exception):
"""Error raise when resource limit is reached"""
limit = "Unknown Resource Limit"
class XCPUError(ResourceError):
"""Error raised when CPU Time limit is reached"""
limit = "CPU Time"
class LineageMemoryError(ResourceError):
"""Error raised when the total amount of memory used by a process and
it's child is reached"""
limit = "Lineage total Memory"
class TimeoutError(ResourceError):
"""Error raised when the process is running for to much time"""
limit = "Real Time"
# Can't use subclass because the StandardError MemoryError raised
RESOURCE_LIMIT_EXCEPTION = (ResourceError, MemoryError)
class MemorySentinel(Thread):
"""A class checking a process don't use too much memory in a separated
daemonic thread
"""
def __init__(self, interval, memory_limit, gpid=os.getpid()):
Thread.__init__(self, target=self._run, name="Test.Sentinel")
self.memory_limit = memory_limit
self._stop = Event()
self.interval = interval
self.setDaemon(True)
self.gpid = gpid
def stop(self):
"""stop ap"""
self._stop.set()
def _run(self):
pil = ProcInfoLoader()
while not self._stop.isSet():
if self.memory_limit <= pil.load(self.gpid).lineage_memory_usage():
os.killpg(self.gpid, SIGUSR1)
self._stop.wait(self.interval)
class ResourceController:
def __init__(self, max_cpu_time=None, max_time=None, max_memory=None,
max_reprieve=60):
if SIGXCPU == -1:
raise RuntimeError("Unsupported platform")
self.max_time = max_time
self.max_memory = max_memory
self.max_cpu_time = max_cpu_time
self._reprieve = max_reprieve
self._timer = None
self._msentinel = None
self._old_max_memory = None
self._old_usr1_hdlr = None
self._old_max_cpu_time = None
self._old_usr2_hdlr = None
self._old_sigxcpu_hdlr = None
self._limit_set = 0
self._abort_try = 0
self._start_time = None
self._elapse_time = 0
def _hangle_sig_timeout(self, sig, frame):
raise TimeoutError()
def _hangle_sig_memory(self, sig, frame):
if self._abort_try < self._reprieve:
self._abort_try += 1
raise LineageMemoryError("Memory limit reached")
else:
os.killpg(os.getpid(), SIGKILL)
def _handle_sigxcpu(self, sig, frame):
if self._abort_try < self._reprieve:
self._abort_try += 1
raise XCPUError("Soft CPU time limit reached")
else:
os.killpg(os.getpid(), SIGKILL)
def _time_out(self):
if self._abort_try < self._reprieve:
self._abort_try += 1
os.killpg(os.getpid(), SIGUSR2)
if self._limit_set > 0:
self._timer = Timer(1, self._time_out)
self._timer.start()
else:
os.killpg(os.getpid(), SIGKILL)
def setup_limit(self):
"""set up the process limit"""
assert currentThread().getName() == 'MainThread'
os.setpgrp()
if self._limit_set <= 0:
if self.max_time is not None:
self._old_usr2_hdlr = signal(SIGUSR2, self._hangle_sig_timeout)
self._timer = Timer(max(1, int(self.max_time) - self._elapse_time),
self._time_out)
self._start_time = int(time())
self._timer.start()
if self.max_cpu_time is not None:
self._old_max_cpu_time = getrlimit(RLIMIT_CPU)
cpu_limit = (int(self.max_cpu_time), self._old_max_cpu_time[1])
self._old_sigxcpu_hdlr = signal(SIGXCPU, self._handle_sigxcpu)
setrlimit(RLIMIT_CPU, cpu_limit)
if self.max_memory is not None:
self._msentinel = MemorySentinel(1, int(self.max_memory) )
self._old_max_memory = getrlimit(RLIMIT_AS)
self._old_usr1_hdlr = signal(SIGUSR1, self._hangle_sig_memory)
as_limit = (int(self.max_memory), self._old_max_memory[1])
setrlimit(RLIMIT_AS, as_limit)
self._msentinel.start()
self._limit_set += 1
def clean_limit(self):
"""reinstall the old process limit"""
if self._limit_set > 0:
if self.max_time is not None:
self._timer.cancel()
self._elapse_time += int(time())-self._start_time
self._timer = None
signal(SIGUSR2, self._old_usr2_hdlr)
if self.max_cpu_time is not None:
setrlimit(RLIMIT_CPU, self._old_max_cpu_time)
signal(SIGXCPU, self._old_sigxcpu_hdlr)
if self.max_memory is not None:
self._msentinel.stop()
self._msentinel = None
setrlimit(RLIMIT_AS, self._old_max_memory)
signal(SIGUSR1, self._old_usr1_hdlr)
self._limit_set -= 1