From 41ed0537b417a13d5ba47edeee7c87c1b61e2476 Mon Sep 17 00:00:00 2001 From: Adam Bishop Date: Wed, 30 Sep 2020 05:50:40 +0100 Subject: [PATCH] Fix midnight poller data loss (#11582) * Handle more signals * Flush buffers before exiting process This ensures log messages aren't lost * Restart process before jobs have finished If there is a very log running job it can cause service restart to take over 5 minutes. We tweak the order of things to make sure that running processes continue, but nothing more is scheduled. The worst case impact is that a pollling/discovery job gets scheduled twice, but this should not be a big issue - this should only occur at most once per day. * Remove python 3.8 feature * Ensure that processes from the previous invocation are reaped * Correct typo's * Attach subprocess descriptors to /dev/null Occasionally, PHP would throw a fit and crash when its stdout went away. To avoid this, we attach stdout to devnull. This means we lost output of daily.sh - but this is already recorded in $LOGDIR/daily.log * Don't immediately schedule long running jobs To avoid the situation where the maintenance reload happens or a sighup, then a second long running job is immediately started, we wait (`last_[poll/discovery]_timetaken` * 1.25) seconds before scheduling any jobs. * Add `psutil` to requirements * Add support for "systemctl reload" to the unit files * Add a fallback for systems that don't have psutil * Reduce CPU load when psutil is not installed * Don't avoid double polling by extending the timeout This shouldn't happen due to locks * Remove fallback option * Remove extra variable * Fix issue introduced during rebase * Fix issue introduced when fixing issue introduced during rebase * Make psutil optional --- LibreNMS/__init__.py | 2 +- LibreNMS/service.py | 116 +++++++++++++++++++++++---- doc/Extensions/Dispatcher-Service.md | 3 +- misc/librenms.service | 1 + misc/librenms.service.scl | 1 + requirements.txt | 1 + 6 files changed, 105 insertions(+), 19 deletions(-) diff --git a/LibreNMS/__init__.py b/LibreNMS/__init__.py index 4099cb8a90..fe9610a8bf 100644 --- a/LibreNMS/__init__.py +++ b/LibreNMS/__init__.py @@ -36,7 +36,7 @@ def call_script(script, args=()): cmd = base + ("{}/{}".format(base_dir, script),) + tuple(map(str, args)) debug("Running {}".format(cmd)) # preexec_fn=os.setsid here keeps process signals from propagating (close_fds=True is default) - return subprocess.check_output(cmd, stderr=subprocess.STDOUT, preexec_fn=os.setsid).decode() + return subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT, preexec_fn=os.setsid, close_fds=True) class DB: diff --git a/LibreNMS/service.py b/LibreNMS/service.py index d695dae447..042af21404 100644 --- a/LibreNMS/service.py +++ b/LibreNMS/service.py @@ -9,13 +9,18 @@ import threading import sys import time +try: + import psutil +except ImportError: + pass + from datetime import timedelta from datetime import datetime from logging import debug, info, warning, error, critical, exception from platform import python_version from time import sleep from socket import gethostname -from signal import signal, SIGTERM +from signal import signal, SIGTERM, SIGQUIT, SIGINT, SIGHUP, SIGCHLD, SIG_DFL from uuid import uuid1 @@ -258,14 +263,18 @@ class Service: config = ServiceConfig() _fp = False _started = False + start_time = 0 queue_managers = {} poller_manager = None discovery_manager = None last_poll = {} + reap_flag = False terminate_flag = False + reload_flag = False db_failures = 0 def __init__(self): + self.start_time = time.time() self.config.populate() self._db = LibreNMS.DB(self.config) self.config.load_poller_config(self._db) @@ -283,9 +292,38 @@ class Service: info("Watchdog is disabled.") self.is_master = False + def service_age(self): + return time.time() - self.start_time + def attach_signals(self): info("Attaching signal handlers on thread %s", threading.current_thread().name) signal(SIGTERM, self.terminate) # capture sigterm and exit gracefully + signal(SIGQUIT, self.terminate) # capture sigquit and exit gracefully + signal(SIGINT, self.terminate) # capture sigint and exit gracefully + signal(SIGHUP, self.reload) # capture sighup and restart gracefully + + if 'psutil' not in sys.modules: + warning("psutil is not available, polling gap possible") + else: + signal(SIGCHLD, self.reap) # capture sigchld and reap the process + + def reap_psutil(self): + """ + A process from a previous invocation is trying to report its status + """ + # Speed things up by only looking at direct zombie children + for p in psutil.Process().children(recursive=False): + try: + cmd = p.cmdline() # cmdline is uncached, so needs to go here to avoid NoSuchProcess + status = p.status() + + if status == psutil.STATUS_ZOMBIE: + pid = p.pid + r = os.waitpid(p.pid, os.WNOHANG) + warning('Reaped long running job "%s" in state %s with PID %d - job returned %d', cmd, status, r[0], r[1]) + except (OSError, psutil.NoSuchProcess): + # process was already reaped + continue def start(self): debug("Performing startup checks...") @@ -330,6 +368,17 @@ class Service: # Main dispatcher loop try: while not self.terminate_flag: + if self.reload_flag: + info("Picked up reload flag, calling the reload process") + self.restart() + + if self.reap_flag: + self.reap_psutil() + + # Re-arm the signal handler + signal(SIGCHLD, self.reap) + self.reap_flag = False + master_lock = self._acquire_master() if master_lock: if not self.is_master: @@ -392,7 +441,7 @@ class Service: result = self._db.query('''SELECT `device_id`, `poller_group`, COALESCE(`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND), 1) AS `poll`, - IF(snmp_disable=1 OR status=0, 0, COALESCE(`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND), 1)) AS `discover` + IF(snmp_disable=1 OR status=0, 0, IF (%s < `last_discovered_timetaken` * 1.25, 0, COALESCE(`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND), 1))) AS `discover` FROM `devices` WHERE `disabled` = 0 AND ( `last_polled` IS NULL OR @@ -400,7 +449,7 @@ class Service: `last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND) OR `last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND) ) - ORDER BY `last_polled_timetaken` DESC''', (poller_find_time, discovery_find_time, poller_find_time, discovery_find_time)) + ORDER BY `last_polled_timetaken` DESC''', (poller_find_time, self.service_age(), discovery_find_time, poller_find_time, discovery_find_time)) self.db_failures = 0 return result except pymysql.err.Error: @@ -460,12 +509,12 @@ class Service: if self.config.distributed: critical("ERROR: Redis connection required for distributed polling") critical("Please install redis-py, either through your os software repository or from PyPI") - sys.exit(2) + self.exit(2) except Exception as e: if self.config.distributed: critical("ERROR: Redis connection required for distributed polling") critical("Could not connect to Redis. {}".format(e)) - sys.exit(2) + self.exit(2) return LibreNMS.ThreadingLock() @@ -480,26 +529,53 @@ class Service: return info('Restarting service... ') - self._stop_managers_and_wait() + + if 'psutil' not in sys.modules: + warning("psutil is not available, polling gap possible") + self._stop_managers_and_wait() + else: + self._stop_managers() self._release_master() python = sys.executable + sys.stdout.flush() os.execl(python, python, *sys.argv) - def terminate(self, _unused=None, _=None): + def reap(self, signalnum=None, flag=None): + """ + Handle a set the reload flag to begin a clean restart + :param signalnum: UNIX signal number + :param flag: Flags accompanying signal + """ + if (signal(SIGCHLD, SIG_DFL) == SIG_DFL): + # signal is already being handled, bail out as this handler is not reentrant - the kernel will re-raise the signal later + return + + self.reap_flag = True + + def reload(self, signalnum=None, flag=None): + """ + Handle a set the reload flag to begin a clean restart + :param signalnum: UNIX signal number + :param flag: Flags accompanying signal + """ + info("Received signal on thread %s, handling", threading.current_thread().name) + self.reload_flag = True + + def terminate(self, signalnum=None, flag=None): """ Handle a set the terminate flag to begin a clean shutdown - :param _unused: - :param _: + :param signalnum: UNIX signal number + :param flag: Flags accompanying signal """ - info("Received SIGTERM on thead %s, handling", threading.current_thread().name) + info("Received signal on thread %s, handling", threading.current_thread().name) self.terminate_flag = True - def shutdown(self, _unused=None, _=None): + def shutdown(self, signalnum=None, flag=None): """ Stop and exit, waiting for all child processes to exit. - :param _unused: - :param _: + :param signalnum: UNIX signal number + :param flag: Flags accompanying signal """ info('Shutting down, waiting for running jobs to complete...') @@ -515,7 +591,7 @@ class Service: # try to release master lock info('Shutdown of %s/%s complete', os.getpid(), threading.current_thread().name) - sys.exit(0) + self.exit(0) def start_dispatch_timers(self): """ @@ -538,13 +614,16 @@ class Service: except AttributeError: pass + def _stop_managers(self): + for manager in self.queue_managers.values(): + manager.stop() + def _stop_managers_and_wait(self): """ Stop all QueueManagers, and wait for their processing threads to complete. We send the stop signal to all QueueManagers first, then wait for them to finish. """ - for manager in self.queue_managers.values(): - manager.stop() + self._stop_managers() for manager in self.queue_managers.values(): manager.stop_and_wait() @@ -564,7 +643,7 @@ class Service: fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: warning("Another instance is already running, quitting.") - exit(2) + self.exit(2) def log_performance_stats(self): info("Counting up time spent polling") @@ -611,3 +690,6 @@ class Service: else: info("Log file updated {}s ago".format(int(logfile_mdiff))) + def exit(self, code=0): + sys.stdout.flush() + sys.exit(code) diff --git a/doc/Extensions/Dispatcher-Service.md b/doc/Extensions/Dispatcher-Service.md index cc8ef8b8c2..f778fe6d01 100644 --- a/doc/Extensions/Dispatcher-Service.md +++ b/doc/Extensions/Dispatcher-Service.md @@ -22,6 +22,7 @@ behaviour only found in Python3.4+. install. MySQLclient can also be used, but does require compilation. - python-dotenv .env loader - redis-py 3.0+ and Redis 5.0+ server (if using distributed polling) +- psutil These can be obtained from your OS package manager, or from PyPI with the below commands. @@ -196,7 +197,7 @@ First, enable SCL's on your system: Then install and configure the runtime and service: ``` -# yum install rh-python36 epel-release +# yum install gcc rh-python36 rh-python36-python-devel epel-release # yum --enablerepo=remi install redis # vi /opt/librenms/config.php # vi /etc/redis.conf diff --git a/misc/librenms.service b/misc/librenms.service index e0363d1d41..e2644bb6bf 100644 --- a/misc/librenms.service +++ b/misc/librenms.service @@ -4,6 +4,7 @@ After=network.target [Service] ExecStart=/opt/librenms/librenms-service.py -v +ExecReload=/bin/kill -HUP $MAINPID WorkingDirectory=/opt/librenms User=librenms Group=librenms diff --git a/misc/librenms.service.scl b/misc/librenms.service.scl index da870be3ae..cf230771ea 100644 --- a/misc/librenms.service.scl +++ b/misc/librenms.service.scl @@ -4,6 +4,7 @@ After=network.target [Service] ExecStart=/usr/bin/scl enable rh-python36 -- /opt/librenms/librenms-service.py -v +ExecReload=/bin/kill -HUP $MAINPID WorkingDirectory=/opt/librenms User=librenms Group=librenms diff --git a/requirements.txt b/requirements.txt index 750ba58d62..499fee0055 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ PyMySQL python-dotenv redis>=3.0 setuptools +psutil \ No newline at end of file