Fix midnight poller data loss (#11582)
* Handle more signals * Flush buffers before exiting process This ensures log messages aren't lost * Restart process before jobs have finished If there is a very log running job it can cause service restart to take over 5 minutes. We tweak the order of things to make sure that running processes continue, but nothing more is scheduled. The worst case impact is that a pollling/discovery job gets scheduled twice, but this should not be a big issue - this should only occur at most once per day. * Remove python 3.8 feature * Ensure that processes from the previous invocation are reaped * Correct typo's * Attach subprocess descriptors to /dev/null Occasionally, PHP would throw a fit and crash when its stdout went away. To avoid this, we attach stdout to devnull. This means we lost output of daily.sh - but this is already recorded in $LOGDIR/daily.log * Don't immediately schedule long running jobs To avoid the situation where the maintenance reload happens or a sighup, then a second long running job is immediately started, we wait (`last_[poll/discovery]_timetaken` * 1.25) seconds before scheduling any jobs. * Add `psutil` to requirements * Add support for "systemctl reload" to the unit files * Add a fallback for systems that don't have psutil * Reduce CPU load when psutil is not installed * Don't avoid double polling by extending the timeout This shouldn't happen due to locks * Remove fallback option * Remove extra variable * Fix issue introduced during rebase * Fix issue introduced when fixing issue introduced during rebase * Make psutil optional
This commit is contained in:
parent
a8ca361acd
commit
41ed0537b4
|
@ -36,7 +36,7 @@ def call_script(script, args=()):
|
||||||
cmd = base + ("{}/{}".format(base_dir, script),) + tuple(map(str, args))
|
cmd = base + ("{}/{}".format(base_dir, script),) + tuple(map(str, args))
|
||||||
debug("Running {}".format(cmd))
|
debug("Running {}".format(cmd))
|
||||||
# preexec_fn=os.setsid here keeps process signals from propagating (close_fds=True is default)
|
# preexec_fn=os.setsid here keeps process signals from propagating (close_fds=True is default)
|
||||||
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, preexec_fn=os.setsid).decode()
|
return subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT, preexec_fn=os.setsid, close_fds=True)
|
||||||
|
|
||||||
|
|
||||||
class DB:
|
class DB:
|
||||||
|
|
|
@ -9,13 +9,18 @@ import threading
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
try:
|
||||||
|
import psutil
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from logging import debug, info, warning, error, critical, exception
|
from logging import debug, info, warning, error, critical, exception
|
||||||
from platform import python_version
|
from platform import python_version
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from socket import gethostname
|
from socket import gethostname
|
||||||
from signal import signal, SIGTERM
|
from signal import signal, SIGTERM, SIGQUIT, SIGINT, SIGHUP, SIGCHLD, SIG_DFL
|
||||||
from uuid import uuid1
|
from uuid import uuid1
|
||||||
|
|
||||||
|
|
||||||
|
@ -258,14 +263,18 @@ class Service:
|
||||||
config = ServiceConfig()
|
config = ServiceConfig()
|
||||||
_fp = False
|
_fp = False
|
||||||
_started = False
|
_started = False
|
||||||
|
start_time = 0
|
||||||
queue_managers = {}
|
queue_managers = {}
|
||||||
poller_manager = None
|
poller_manager = None
|
||||||
discovery_manager = None
|
discovery_manager = None
|
||||||
last_poll = {}
|
last_poll = {}
|
||||||
|
reap_flag = False
|
||||||
terminate_flag = False
|
terminate_flag = False
|
||||||
|
reload_flag = False
|
||||||
db_failures = 0
|
db_failures = 0
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
self.start_time = time.time()
|
||||||
self.config.populate()
|
self.config.populate()
|
||||||
self._db = LibreNMS.DB(self.config)
|
self._db = LibreNMS.DB(self.config)
|
||||||
self.config.load_poller_config(self._db)
|
self.config.load_poller_config(self._db)
|
||||||
|
@ -283,9 +292,38 @@ class Service:
|
||||||
info("Watchdog is disabled.")
|
info("Watchdog is disabled.")
|
||||||
self.is_master = False
|
self.is_master = False
|
||||||
|
|
||||||
|
def service_age(self):
|
||||||
|
return time.time() - self.start_time
|
||||||
|
|
||||||
def attach_signals(self):
|
def attach_signals(self):
|
||||||
info("Attaching signal handlers on thread %s", threading.current_thread().name)
|
info("Attaching signal handlers on thread %s", threading.current_thread().name)
|
||||||
signal(SIGTERM, self.terminate) # capture sigterm and exit gracefully
|
signal(SIGTERM, self.terminate) # capture sigterm and exit gracefully
|
||||||
|
signal(SIGQUIT, self.terminate) # capture sigquit and exit gracefully
|
||||||
|
signal(SIGINT, self.terminate) # capture sigint and exit gracefully
|
||||||
|
signal(SIGHUP, self.reload) # capture sighup and restart gracefully
|
||||||
|
|
||||||
|
if 'psutil' not in sys.modules:
|
||||||
|
warning("psutil is not available, polling gap possible")
|
||||||
|
else:
|
||||||
|
signal(SIGCHLD, self.reap) # capture sigchld and reap the process
|
||||||
|
|
||||||
|
def reap_psutil(self):
|
||||||
|
"""
|
||||||
|
A process from a previous invocation is trying to report its status
|
||||||
|
"""
|
||||||
|
# Speed things up by only looking at direct zombie children
|
||||||
|
for p in psutil.Process().children(recursive=False):
|
||||||
|
try:
|
||||||
|
cmd = p.cmdline() # cmdline is uncached, so needs to go here to avoid NoSuchProcess
|
||||||
|
status = p.status()
|
||||||
|
|
||||||
|
if status == psutil.STATUS_ZOMBIE:
|
||||||
|
pid = p.pid
|
||||||
|
r = os.waitpid(p.pid, os.WNOHANG)
|
||||||
|
warning('Reaped long running job "%s" in state %s with PID %d - job returned %d', cmd, status, r[0], r[1])
|
||||||
|
except (OSError, psutil.NoSuchProcess):
|
||||||
|
# process was already reaped
|
||||||
|
continue
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
debug("Performing startup checks...")
|
debug("Performing startup checks...")
|
||||||
|
@ -330,6 +368,17 @@ class Service:
|
||||||
# Main dispatcher loop
|
# Main dispatcher loop
|
||||||
try:
|
try:
|
||||||
while not self.terminate_flag:
|
while not self.terminate_flag:
|
||||||
|
if self.reload_flag:
|
||||||
|
info("Picked up reload flag, calling the reload process")
|
||||||
|
self.restart()
|
||||||
|
|
||||||
|
if self.reap_flag:
|
||||||
|
self.reap_psutil()
|
||||||
|
|
||||||
|
# Re-arm the signal handler
|
||||||
|
signal(SIGCHLD, self.reap)
|
||||||
|
self.reap_flag = False
|
||||||
|
|
||||||
master_lock = self._acquire_master()
|
master_lock = self._acquire_master()
|
||||||
if master_lock:
|
if master_lock:
|
||||||
if not self.is_master:
|
if not self.is_master:
|
||||||
|
@ -392,7 +441,7 @@ class Service:
|
||||||
result = self._db.query('''SELECT `device_id`,
|
result = self._db.query('''SELECT `device_id`,
|
||||||
`poller_group`,
|
`poller_group`,
|
||||||
COALESCE(`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND), 1) AS `poll`,
|
COALESCE(`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND), 1) AS `poll`,
|
||||||
IF(snmp_disable=1 OR status=0, 0, COALESCE(`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND), 1)) AS `discover`
|
IF(snmp_disable=1 OR status=0, 0, IF (%s < `last_discovered_timetaken` * 1.25, 0, COALESCE(`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND), 1))) AS `discover`
|
||||||
FROM `devices`
|
FROM `devices`
|
||||||
WHERE `disabled` = 0 AND (
|
WHERE `disabled` = 0 AND (
|
||||||
`last_polled` IS NULL OR
|
`last_polled` IS NULL OR
|
||||||
|
@ -400,7 +449,7 @@ class Service:
|
||||||
`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND) OR
|
`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND) OR
|
||||||
`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND)
|
`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND)
|
||||||
)
|
)
|
||||||
ORDER BY `last_polled_timetaken` DESC''', (poller_find_time, discovery_find_time, poller_find_time, discovery_find_time))
|
ORDER BY `last_polled_timetaken` DESC''', (poller_find_time, self.service_age(), discovery_find_time, poller_find_time, discovery_find_time))
|
||||||
self.db_failures = 0
|
self.db_failures = 0
|
||||||
return result
|
return result
|
||||||
except pymysql.err.Error:
|
except pymysql.err.Error:
|
||||||
|
@ -460,12 +509,12 @@ class Service:
|
||||||
if self.config.distributed:
|
if self.config.distributed:
|
||||||
critical("ERROR: Redis connection required for distributed polling")
|
critical("ERROR: Redis connection required for distributed polling")
|
||||||
critical("Please install redis-py, either through your os software repository or from PyPI")
|
critical("Please install redis-py, either through your os software repository or from PyPI")
|
||||||
sys.exit(2)
|
self.exit(2)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if self.config.distributed:
|
if self.config.distributed:
|
||||||
critical("ERROR: Redis connection required for distributed polling")
|
critical("ERROR: Redis connection required for distributed polling")
|
||||||
critical("Could not connect to Redis. {}".format(e))
|
critical("Could not connect to Redis. {}".format(e))
|
||||||
sys.exit(2)
|
self.exit(2)
|
||||||
|
|
||||||
return LibreNMS.ThreadingLock()
|
return LibreNMS.ThreadingLock()
|
||||||
|
|
||||||
|
@ -480,26 +529,53 @@ class Service:
|
||||||
return
|
return
|
||||||
|
|
||||||
info('Restarting service... ')
|
info('Restarting service... ')
|
||||||
self._stop_managers_and_wait()
|
|
||||||
|
if 'psutil' not in sys.modules:
|
||||||
|
warning("psutil is not available, polling gap possible")
|
||||||
|
self._stop_managers_and_wait()
|
||||||
|
else:
|
||||||
|
self._stop_managers()
|
||||||
self._release_master()
|
self._release_master()
|
||||||
|
|
||||||
python = sys.executable
|
python = sys.executable
|
||||||
|
sys.stdout.flush()
|
||||||
os.execl(python, python, *sys.argv)
|
os.execl(python, python, *sys.argv)
|
||||||
|
|
||||||
def terminate(self, _unused=None, _=None):
|
def reap(self, signalnum=None, flag=None):
|
||||||
|
"""
|
||||||
|
Handle a set the reload flag to begin a clean restart
|
||||||
|
:param signalnum: UNIX signal number
|
||||||
|
:param flag: Flags accompanying signal
|
||||||
|
"""
|
||||||
|
if (signal(SIGCHLD, SIG_DFL) == SIG_DFL):
|
||||||
|
# signal is already being handled, bail out as this handler is not reentrant - the kernel will re-raise the signal later
|
||||||
|
return
|
||||||
|
|
||||||
|
self.reap_flag = True
|
||||||
|
|
||||||
|
def reload(self, signalnum=None, flag=None):
|
||||||
|
"""
|
||||||
|
Handle a set the reload flag to begin a clean restart
|
||||||
|
:param signalnum: UNIX signal number
|
||||||
|
:param flag: Flags accompanying signal
|
||||||
|
"""
|
||||||
|
info("Received signal on thread %s, handling", threading.current_thread().name)
|
||||||
|
self.reload_flag = True
|
||||||
|
|
||||||
|
def terminate(self, signalnum=None, flag=None):
|
||||||
"""
|
"""
|
||||||
Handle a set the terminate flag to begin a clean shutdown
|
Handle a set the terminate flag to begin a clean shutdown
|
||||||
:param _unused:
|
:param signalnum: UNIX signal number
|
||||||
:param _:
|
:param flag: Flags accompanying signal
|
||||||
"""
|
"""
|
||||||
info("Received SIGTERM on thead %s, handling", threading.current_thread().name)
|
info("Received signal on thread %s, handling", threading.current_thread().name)
|
||||||
self.terminate_flag = True
|
self.terminate_flag = True
|
||||||
|
|
||||||
def shutdown(self, _unused=None, _=None):
|
def shutdown(self, signalnum=None, flag=None):
|
||||||
"""
|
"""
|
||||||
Stop and exit, waiting for all child processes to exit.
|
Stop and exit, waiting for all child processes to exit.
|
||||||
:param _unused:
|
:param signalnum: UNIX signal number
|
||||||
:param _:
|
:param flag: Flags accompanying signal
|
||||||
"""
|
"""
|
||||||
info('Shutting down, waiting for running jobs to complete...')
|
info('Shutting down, waiting for running jobs to complete...')
|
||||||
|
|
||||||
|
@ -515,7 +591,7 @@ class Service:
|
||||||
|
|
||||||
# try to release master lock
|
# try to release master lock
|
||||||
info('Shutdown of %s/%s complete', os.getpid(), threading.current_thread().name)
|
info('Shutdown of %s/%s complete', os.getpid(), threading.current_thread().name)
|
||||||
sys.exit(0)
|
self.exit(0)
|
||||||
|
|
||||||
def start_dispatch_timers(self):
|
def start_dispatch_timers(self):
|
||||||
"""
|
"""
|
||||||
|
@ -538,13 +614,16 @@ class Service:
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def _stop_managers(self):
|
||||||
|
for manager in self.queue_managers.values():
|
||||||
|
manager.stop()
|
||||||
|
|
||||||
def _stop_managers_and_wait(self):
|
def _stop_managers_and_wait(self):
|
||||||
"""
|
"""
|
||||||
Stop all QueueManagers, and wait for their processing threads to complete.
|
Stop all QueueManagers, and wait for their processing threads to complete.
|
||||||
We send the stop signal to all QueueManagers first, then wait for them to finish.
|
We send the stop signal to all QueueManagers first, then wait for them to finish.
|
||||||
"""
|
"""
|
||||||
for manager in self.queue_managers.values():
|
self._stop_managers()
|
||||||
manager.stop()
|
|
||||||
|
|
||||||
for manager in self.queue_managers.values():
|
for manager in self.queue_managers.values():
|
||||||
manager.stop_and_wait()
|
manager.stop_and_wait()
|
||||||
|
@ -564,7 +643,7 @@ class Service:
|
||||||
fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||||
except IOError:
|
except IOError:
|
||||||
warning("Another instance is already running, quitting.")
|
warning("Another instance is already running, quitting.")
|
||||||
exit(2)
|
self.exit(2)
|
||||||
|
|
||||||
def log_performance_stats(self):
|
def log_performance_stats(self):
|
||||||
info("Counting up time spent polling")
|
info("Counting up time spent polling")
|
||||||
|
@ -611,3 +690,6 @@ class Service:
|
||||||
else:
|
else:
|
||||||
info("Log file updated {}s ago".format(int(logfile_mdiff)))
|
info("Log file updated {}s ago".format(int(logfile_mdiff)))
|
||||||
|
|
||||||
|
def exit(self, code=0):
|
||||||
|
sys.stdout.flush()
|
||||||
|
sys.exit(code)
|
||||||
|
|
|
@ -22,6 +22,7 @@ behaviour only found in Python3.4+.
|
||||||
install. MySQLclient can also be used, but does require compilation.
|
install. MySQLclient can also be used, but does require compilation.
|
||||||
- python-dotenv .env loader
|
- python-dotenv .env loader
|
||||||
- redis-py 3.0+ and Redis 5.0+ server (if using distributed polling)
|
- redis-py 3.0+ and Redis 5.0+ server (if using distributed polling)
|
||||||
|
- psutil
|
||||||
|
|
||||||
These can be obtained from your OS package manager, or from PyPI with the below commands.
|
These can be obtained from your OS package manager, or from PyPI with the below commands.
|
||||||
|
|
||||||
|
@ -196,7 +197,7 @@ First, enable SCL's on your system:
|
||||||
Then install and configure the runtime and service:
|
Then install and configure the runtime and service:
|
||||||
|
|
||||||
```
|
```
|
||||||
# yum install rh-python36 epel-release
|
# yum install gcc rh-python36 rh-python36-python-devel epel-release
|
||||||
# yum --enablerepo=remi install redis
|
# yum --enablerepo=remi install redis
|
||||||
# vi /opt/librenms/config.php
|
# vi /opt/librenms/config.php
|
||||||
# vi /etc/redis.conf
|
# vi /etc/redis.conf
|
||||||
|
|
|
@ -4,6 +4,7 @@ After=network.target
|
||||||
|
|
||||||
[Service]
|
[Service]
|
||||||
ExecStart=/opt/librenms/librenms-service.py -v
|
ExecStart=/opt/librenms/librenms-service.py -v
|
||||||
|
ExecReload=/bin/kill -HUP $MAINPID
|
||||||
WorkingDirectory=/opt/librenms
|
WorkingDirectory=/opt/librenms
|
||||||
User=librenms
|
User=librenms
|
||||||
Group=librenms
|
Group=librenms
|
||||||
|
|
|
@ -4,6 +4,7 @@ After=network.target
|
||||||
|
|
||||||
[Service]
|
[Service]
|
||||||
ExecStart=/usr/bin/scl enable rh-python36 -- /opt/librenms/librenms-service.py -v
|
ExecStart=/usr/bin/scl enable rh-python36 -- /opt/librenms/librenms-service.py -v
|
||||||
|
ExecReload=/bin/kill -HUP $MAINPID
|
||||||
WorkingDirectory=/opt/librenms
|
WorkingDirectory=/opt/librenms
|
||||||
User=librenms
|
User=librenms
|
||||||
Group=librenms
|
Group=librenms
|
||||||
|
|
|
@ -2,3 +2,4 @@ PyMySQL
|
||||||
python-dotenv
|
python-dotenv
|
||||||
redis>=3.0
|
redis>=3.0
|
||||||
setuptools
|
setuptools
|
||||||
|
psutil
|
Loading…
Reference in New Issue