# LibreNMS/LibreNMS/service.py

import logging
import os
import sys
import threading
import time
import pymysql # pylint: disable=import-error
import LibreNMS
from LibreNMS.config import DBConfig
try:
import psutil
except ImportError:
pass
from datetime import timedelta
from datetime import datetime
from platform import python_version
from time import sleep
from socket import gethostname
from signal import signal, SIGTERM, SIGQUIT, SIGINT, SIGHUP, SIGCHLD
from uuid import uuid1
try:
from systemd.daemon import notify
except ImportError:
pass
try:
from redis.exceptions import ConnectionError as RedisConnectionError
except ImportError:
class RedisConnectionError(Exception):
pass
logger = logging.getLogger(__name__)
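# ServiceConfig starts from the class-level defaults below and is refined by populate()
# (config.php + environment variables) and load_poller_config() (per-node poller_cluster rows).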
class ServiceConfig(DBConfig):
def __init__(self):
"""
Stores all of the configuration variables for the LibreNMS service in a common object
Starts with defaults, but can be populated with variables from config.php by calling populate()
"""
self._uuid = str(uuid1())
self.set_name(gethostname())
def set_name(self, name):
if name:
self.name = name.strip()
self.unique_name = "{}-{}".format(self.name, self._uuid)
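# PollerConfig bundles the per-subsystem settings: worker pool size, dispatch frequency,
# and an optional secondary "calculate" interval (used by billing).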
class PollerConfig:
def __init__(self, workers, frequency, calculate=None):
self.enabled = True
self.workers = workers
self.frequency = frequency
self.calculate = calculate
# config variables with defaults
BASE_DIR = os.path.abspath(
os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)
)
node_id = None
name = None
unique_name = None
single_instance = True
distributed = False
group = 0
debug = False
log_level = 20
max_db_failures = 5
alerting = PollerConfig(1, 60)
poller = PollerConfig(24, 300)
services = PollerConfig(8, 300)
discovery = PollerConfig(16, 21600)
billing = PollerConfig(2, 300, 60)
ping = PollerConfig(1, 60)
down_retry = 60
update_enabled = True
update_frequency = 86400
master_resolution = 1
master_timeout = 10
redis_host = "localhost"
redis_port = 6379
redis_db = 0
redis_user = None
redis_pass = None
redis_socket = None
redis_sentinel = None
redis_sentinel_user = None
redis_sentinel_pass = None
redis_sentinel_service = None
redis_timeout = 60
log_output = False
logdir = "logs"
watchdog_enabled = False
watchdog_logfile = "logs/librenms.log"
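# populate() reads config.php via LibreNMS.get_config_data(); for the Redis and DB settings an
# environment variable (e.g. REDIS_HOST, DB_HOST) takes precedence over the config.php value,
# which in turn takes precedence over the class default.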
def populate(self):
config = LibreNMS.get_config_data(self.BASE_DIR)
# populate config variables
self.node_id = os.getenv("NODE_ID")
self.set_name(config.get("distributed_poller_name", None))
self.distributed = config.get("distributed_poller", ServiceConfig.distributed)
self.group = ServiceConfig.parse_group(
config.get("distributed_poller_group", ServiceConfig.group)
)
# backward compatible options
self.master_timeout = config.get(
"service_master_timeout", ServiceConfig.master_timeout
)
self.poller.workers = config.get(
"poller_service_workers", ServiceConfig.poller.workers
)
self.poller.frequency = config.get(
"poller_service_poll_frequency", ServiceConfig.poller.frequency
)
self.discovery.frequency = config.get(
"poller_service_discover_frequency", ServiceConfig.discovery.frequency
)
self.down_retry = config.get(
"poller_service_down_retry", ServiceConfig.down_retry
)
self.log_level = config.get("poller_service_loglevel", ServiceConfig.log_level)
# new options
self.poller.enabled = config.get("service_poller_enabled", True) # unused
self.poller.workers = config.get(
"service_poller_workers", ServiceConfig.poller.workers
)
self.poller.frequency = config.get(
"service_poller_frequency", ServiceConfig.poller.frequency
)
self.discovery.enabled = config.get("service_discovery_enabled", True) # unused
self.discovery.workers = config.get(
"service_discovery_workers", ServiceConfig.discovery.workers
)
self.discovery.frequency = config.get(
"service_discovery_frequency", ServiceConfig.discovery.frequency
)
self.services.enabled = config.get("service_services_enabled", True)
self.services.workers = config.get(
"service_services_workers", ServiceConfig.services.workers
)
self.services.frequency = config.get(
"service_services_frequency", ServiceConfig.services.frequency
)
self.billing.enabled = config.get("service_billing_enabled", True)
self.billing.frequency = config.get(
"service_billing_frequency", ServiceConfig.billing.frequency
)
self.billing.calculate = config.get(
"service_billing_calculate_frequency", ServiceConfig.billing.calculate
)
self.alerting.enabled = config.get("service_alerting_enabled", True)
self.alerting.frequency = config.get(
"service_alerting_frequency", ServiceConfig.alerting.frequency
)
self.ping.enabled = config.get("service_ping_enabled", False)
self.ping.frequency = config.get("ping_rrd_step", ServiceConfig.ping.frequency)
self.down_retry = config.get(
"service_poller_down_retry", ServiceConfig.down_retry
)
self.log_level = config.get("service_loglevel", ServiceConfig.log_level)
self.update_enabled = config.get(
"service_update_enabled", ServiceConfig.update_enabled
)
self.update_frequency = config.get(
"service_update_frequency", ServiceConfig.update_frequency
)
self.redis_host = os.getenv(
"REDIS_HOST", config.get("redis_host", ServiceConfig.redis_host)
)
self.redis_db = os.getenv(
"REDIS_DB", config.get("redis_db", ServiceConfig.redis_db)
)
self.redis_user = os.getenv(
"REDIS_USERNAME", config.get("redis_user", ServiceConfig.redis_user)
)
self.redis_pass = os.getenv(
"REDIS_PASSWORD", config.get("redis_pass", ServiceConfig.redis_pass)
)
self.redis_port = int(
os.getenv("REDIS_PORT", config.get("redis_port", ServiceConfig.redis_port))
)
self.redis_socket = os.getenv(
"REDIS_SOCKET", config.get("redis_socket", ServiceConfig.redis_socket)
)
self.redis_sentinel = os.getenv(
"REDIS_SENTINEL", config.get("redis_sentinel", ServiceConfig.redis_sentinel)
)
self.redis_sentinel_user = os.getenv(
"REDIS_SENTINEL_USERNAME",
config.get("redis_sentinel_user", ServiceConfig.redis_sentinel_user),
)
self.redis_sentinel_pass = os.getenv(
"REDIS_SENTINEL_PASSWORD",
config.get("redis_sentinel_pass", ServiceConfig.redis_sentinel_pass),
)
self.redis_sentinel_service = os.getenv(
"REDIS_SENTINEL_SERVICE",
config.get("redis_sentinel_service", ServiceConfig.redis_sentinel_service),
)
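# REDIS_TIMEOUT from the environment wins; otherwise the alerting frequency is reused as the
# socket timeout, falling back to the class default when that frequency is 0.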
self.redis_timeout = int(
os.getenv(
"REDIS_TIMEOUT",
self.alerting.frequency
if self.alerting.frequency != 0
else self.redis_timeout,
)
)
self.db_host = os.getenv(
"DB_HOST", config.get("db_host", ServiceConfig.db_host)
)
self.db_name = os.getenv(
"DB_DATABASE", config.get("db_name", ServiceConfig.db_name)
)
self.db_pass = os.getenv(
"DB_PASSWORD", config.get("db_pass", ServiceConfig.db_pass)
)
self.db_port = int(
os.getenv("DB_PORT", config.get("db_port", ServiceConfig.db_port))
)
self.db_socket = os.getenv(
"DB_SOCKET", config.get("db_socket", ServiceConfig.db_socket)
)
self.db_user = os.getenv(
"DB_USERNAME", config.get("db_user", ServiceConfig.db_user)
)
self.db_sslmode = os.getenv(
"DB_SSLMODE", config.get("db_sslmode", ServiceConfig.db_sslmode)
)
self.db_ssl_ca = os.getenv(
"MYSQL_ATTR_SSL_CA", config.get("db_ssl_ca", ServiceConfig.db_ssl_ca)
)
self.watchdog_enabled = config.get(
"service_watchdog_enabled", ServiceConfig.watchdog_enabled
)
self.logdir = config.get("log_dir", ServiceConfig.BASE_DIR + "/logs")
self.watchdog_logfile = config.get("log_file", self.logdir + "/librenms.log")
# set convenient debug variable
self.debug = logging.getLogger().isEnabledFor(logging.DEBUG)
if not self.debug and self.log_level:
try:
logging.getLogger().setLevel(self.log_level)
except ValueError:
logger.error(
"Unknown log level {}, must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'".format(
self.log_level
)
)
logging.getLogger().setLevel(logging.INFO)
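# Non-NULL per-node settings from the poller_cluster table override whatever populate() loaded.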
def load_poller_config(self, db):
try:
settings = {}
cursor = db.query(
"SELECT * FROM `poller_cluster` WHERE `node_id`=%s", self.node_id
)
if cursor.rowcount == 0:
return
for index, setting in enumerate(cursor.fetchone()):
name = cursor.description[index][0]
settings[name] = setting
if settings["poller_name"] is not None:
self.set_name(settings["poller_name"])
if settings["poller_groups"] is not None:
self.group = ServiceConfig.parse_group(settings["poller_groups"])
if settings["poller_enabled"] is not None:
self.poller.enabled = settings["poller_enabled"]
if settings["poller_frequency"] is not None:
self.poller.frequency = settings["poller_frequency"]
if settings["poller_workers"] is not None:
self.poller.workers = settings["poller_workers"]
if settings["poller_down_retry"] is not None:
self.down_retry = settings["poller_down_retry"]
if settings["discovery_enabled"] is not None:
self.discovery.enabled = settings["discovery_enabled"]
if settings["discovery_frequency"] is not None:
self.discovery.frequency = settings["discovery_frequency"]
if settings["discovery_workers"] is not None:
self.discovery.workers = settings["discovery_workers"]
if settings["services_enabled"] is not None:
self.services.enabled = settings["services_enabled"]
if settings["services_frequency"] is not None:
self.services.frequency = settings["services_frequency"]
if settings["services_workers"] is not None:
self.services.workers = settings["services_workers"]
if settings["billing_enabled"] is not None:
self.billing.enabled = settings["billing_enabled"]
if settings["billing_frequency"] is not None:
self.billing.frequency = settings["billing_frequency"]
if settings["billing_calculate_frequency"] is not None:
self.billing.calculate = settings["billing_calculate_frequency"]
if settings["alerting_enabled"] is not None:
self.alerting.enabled = settings["alerting_enabled"]
if settings["alerting_frequency"] is not None:
self.alerting.frequency = settings["alerting_frequency"]
if settings["ping_enabled"] is not None:
self.ping.enabled = settings["ping_enabled"]
if settings["ping_frequency"] is not None:
self.ping.frequency = settings["ping_frequency"]
if settings["update_enabled"] is not None:
self.update_enabled = settings["update_enabled"]
if settings["update_frequency"] is not None:
self.update_frequency = settings["update_frequency"]
if settings["loglevel"] is not None:
self.log_level = settings["loglevel"]
if settings["watchdog_enabled"] is not None:
self.watchdog_enabled = settings["watchdog_enabled"]
if settings["watchdog_log"] is not None:
self.watchdog_logfile = settings["watchdog_log"]
except pymysql.err.Error:
logger.warning("Unable to load poller (%s) config", self.node_id)
@staticmethod
def parse_group(g):
if g is None:
return [0]
elif type(g) is int:
return [g]
elif type(g) is str:
try:
return [int(x) for x in set(g.split(","))]
except ValueError:
pass
logger.error("Could not parse group string, defaulting to 0")
return [0]
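# Service is the long-lived dispatcher process: it elects a master through the lock manager,
# feeds work to the queue managers, and runs the maintenance, stats and watchdog timers.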
class Service:
config = ServiceConfig()
_fp = False
_started = False
start_time = 0
queue_managers = {}
poller_manager = None
discovery_manager = None
last_poll = {}
reap_flag = False
terminate_flag = False
reload_flag = False
db_failures = 0
def __init__(self):
self.start_time = time.time()
self.config.populate()
self._db = LibreNMS.DB(self.config)
self.config.load_poller_config(self._db)
threading.current_thread().name = self.config.name # rename main thread
self.attach_signals()
self._lm = self.create_lock_manager()
self.daily_timer = LibreNMS.RecurringTimer(
self.config.update_frequency, self.run_maintenance, "maintenance"
)
self.stats_timer = LibreNMS.RecurringTimer(
self.config.poller.frequency, self.log_performance_stats, "performance"
)
if self.config.watchdog_enabled:
logger.info(
"Starting watchdog timer for log file: {}".format(
self.config.watchdog_logfile
)
)
self.watchdog_timer = LibreNMS.RecurringTimer(
self.config.poller.frequency, self.logfile_watchdog, "watchdog"
)
else:
logger.info("Watchdog is disabled.")
self.systemd_watchdog_timer = LibreNMS.RecurringTimer(
10, self.systemd_watchdog, "systemd-watchdog"
)
self.is_master = False
def service_age(self):
return time.time() - self.start_time
def attach_signals(self):
logger.debug(
"Attaching signal handlers on thread %s", threading.current_thread().name
)
signal(SIGTERM, self.terminate) # capture sigterm and exit gracefully
signal(SIGQUIT, self.terminate) # capture sigquit and exit gracefully
signal(SIGINT, self.terminate) # capture sigint and exit gracefully
signal(SIGHUP, self.reload) # capture sighup and restart gracefully
if "psutil" not in sys.modules:
logger.warning("psutil is not available, polling gap possible")
else:
signal(SIGCHLD, self.reap) # capture sigchld and reap the process
def reap_psutil(self):
"""
Reap zombie child processes left over from previous job invocations so they do not linger
"""
# Speed things up by only looking at direct zombie children
for p in psutil.Process().children(recursive=False):
try:
cmd = (
p.cmdline()
) # cmdline is uncached, so needs to go here to avoid NoSuchProcess
status = p.status()
if status == psutil.STATUS_ZOMBIE:
pid = p.pid
r = os.waitpid(p.pid, os.WNOHANG)
logger.warning(
'Reaped long running job "%s" in state %s with PID %d - job returned %d',
cmd,
status,
r[0],
r[1],
)
except (OSError, psutil.NoSuchProcess):
# process was already reaped
continue
def start(self):
logger.debug("Performing startup checks...")
if self.config.single_instance:
self.check_single_instance() # don't allow more than one service at a time
if self._started:
raise RuntimeWarning("Not allowed to start Poller twice")
self._started = True
logger.debug("Starting up queue managers...")
# initialize and start the worker pools
self.poller_manager = LibreNMS.PollerQueueManager(self.config, self._lm)
self.queue_managers["poller"] = self.poller_manager
self.discovery_manager = LibreNMS.DiscoveryQueueManager(self.config, self._lm)
self.queue_managers["discovery"] = self.discovery_manager
self.queue_managers["alerting"] = LibreNMS.AlertQueueManager(
self.config, self._lm
)
self.queue_managers["services"] = LibreNMS.ServicesQueueManager(
self.config, self._lm
)
self.queue_managers["billing"] = LibreNMS.BillingQueueManager(
self.config, self._lm
)
self.queue_managers["ping"] = LibreNMS.PingQueueManager(self.config, self._lm)
if self.config.update_enabled:
self.daily_timer.start()
self.stats_timer.start()
self.systemd_watchdog_timer.start()
if self.config.watchdog_enabled:
self.watchdog_timer.start()
logger.info("LibreNMS Service: {} started!".format(self.config.unique_name))
logger.info(
"Poller group {}. Using Python {} and {} locks and queues".format(
"0 (default)" if self.config.group == [0] else self.config.group,
python_version(),
"redis" if isinstance(self._lm, LibreNMS.RedisLock) else "internal",
)
)
logger.info(
"Queue Workers: Discovery={} Poller={} Services={} Alerting={} Billing={} Ping={}".format(
self.config.discovery.workers
if self.config.discovery.enabled
else "disabled",
self.config.poller.workers
if self.config.poller.enabled
else "disabled",
self.config.services.workers
if self.config.services.enabled
else "disabled",
"enabled" if self.config.alerting.enabled else "disabled",
"enabled" if self.config.billing.enabled else "disabled",
"enabled" if self.config.ping.enabled else "disabled",
)
)
if self.config.update_enabled:
logger.info(
"Maintenance tasks will be run every {}".format(
timedelta(seconds=self.config.update_frequency)
)
)
else:
logger.warning("Maintenance tasks are disabled.")
# Main dispatcher loop
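# Only the node currently holding dispatch.master pushes immediate poll/discovery work;
# other nodes simply sleep master_resolution seconds and retry the lock.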
try:
while not self.terminate_flag:
if self.reload_flag:
logger.info("Picked up reload flag, calling the reload process")
self.restart()
if self.reap_flag:
self.reap_flag = False
self.reap_psutil()
master_lock = self._acquire_master()
if master_lock:
if not self.is_master:
logger.info(
"{} is now the master dispatcher".format(self.config.name)
)
self.is_master = True
self.start_dispatch_timers()
devices = self.fetch_immediate_device_list()
for device in devices:
device_id = device[0]
group = device[1]
if device[2]: # polling
self.dispatch_immediate_polling(device_id, group)
if device[3]: # discovery
self.dispatch_immediate_discovery(device_id, group)
else:
if self.is_master:
logger.info(
"{} is no longer the master dispatcher".format(
self.config.name
)
)
self.stop_dispatch_timers()
self.is_master = False # no longer master
sleep(self.config.master_resolution)
except KeyboardInterrupt:
pass
logger.info("Dispatch loop terminated")
self.shutdown()
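# The dispatch.master lock is re-taken on every loop iteration; master_timeout keeps it from
# being held forever if this node dies, and the trailing True appears to let the current owner
# re-acquire (renew) its own lock.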
def _acquire_master(self):
return self._lm.lock(
"dispatch.master", self.config.unique_name, self.config.master_timeout, True
)
def _release_master(self):
self._lm.unlock("dispatch.master", self.config.unique_name)
# ------------ Discovery ------------
def dispatch_immediate_discovery(self, device_id, group):
if not self.discovery_manager.is_locked(device_id):
self.discovery_manager.post_work(device_id, group)
# ------------ Polling ------------
def dispatch_immediate_polling(self, device_id, group):
if not self.poller_manager.is_locked(device_id):
self.poller_manager.post_work(device_id, group)
if self.config.debug:
cur_time = time.time()
elapsed = cur_time - self.last_poll.get(device_id, cur_time)
self.last_poll[device_id] = cur_time
# arbitrary limit to reduce spam
if elapsed > (
self.config.poller.frequency - self.config.master_resolution
):
logger.debug(
"Dispatching polling for device {}, time since last poll {:.2f}s".format(
device_id, elapsed
)
)
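# Returns (device_id, poller_group, poll_due, discover_due) rows for enabled devices whose last
# poll or discovery is older than the configured frequency, compensated by how long the previous
# run took. Discovery is additionally suppressed for down devices and while the service has been
# running for less than 1.25x the last discovery duration.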
def fetch_immediate_device_list(self):
try:
poller_find_time = self.config.poller.frequency - 1
discovery_find_time = self.config.discovery.frequency - 1
result = self._db.query(
"""SELECT `device_id`,
`poller_group`,
COALESCE(`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL COALESCE(`last_polled_timetaken`, 0) SECOND), 1) AS `poll`,
IF(status=0, 0, IF (%s < `last_discovered_timetaken` * 1.25, 0, COALESCE(`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL COALESCE(`last_discovered_timetaken`, 0) SECOND), 1))) AS `discover`
FROM `devices`
WHERE `disabled` = 0 AND (
`last_polled` IS NULL OR
`last_discovered` IS NULL OR
`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL COALESCE(`last_polled_timetaken`, 0) SECOND) OR
`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL COALESCE(`last_discovered_timetaken`, 0) SECOND)
)
ORDER BY `last_polled_timetaken` DESC""",
(
poller_find_time,
self.service_age(),
discovery_find_time,
poller_find_time,
discovery_find_time,
),
)
self.db_failures = 0
return result
except pymysql.err.Error:
self.db_failures += 1
if self.db_failures > self.config.max_db_failures:
logger.warning(
"Too many DB failures ({}), attempting to release master".format(
self.db_failures
)
)
self._release_master()
sleep(
self.config.master_resolution
) # sleep to give another node a chance to acquire
return []
def run_maintenance(self):
"""
Runs update and cleanup tasks by calling daily.sh. Reloads the python script after the update.
Sets a schema-update lock so no distributed pollers will update until the schema has been updated.
"""
attempt = 0
wait = 5
max_runtime = 86100
max_tries = int(max_runtime / wait)
logger.info("Waiting for schema lock")
while not self._lm.lock("schema-update", self.config.unique_name, max_runtime):
attempt += 1
if attempt >= max_tries: # don't get stuck indefinitely
logger.warning(
"Reached max wait for other pollers to update, updating now"
)
break
sleep(wait)
logger.info("Running maintenance tasks")
exit_code, output = LibreNMS.call_script("daily.sh")
if exit_code == 0:
logger.info("Maintenance tasks complete\n{}".format(output))
else:
logger.error("Error {} in daily.sh:\n{}".format(exit_code, output))
self._lm.unlock("schema-update", self.config.unique_name)
self.restart()
def create_lock_manager(self):
"""
Create a new LockManager. Tries to create a Redis LockManager, but falls
back to Python's internal threading lock implementation.
Exits if distributed polling is enabled and a Redis LockManager cannot be created.
:return: Instance of LockManager
"""
try:
return LibreNMS.RedisLock(
sentinel_kwargs={
"username": self.config.redis_sentinel_user,
"password": self.config.redis_sentinel_pass,
"socket_timeout": self.config.redis_timeout,
"unix_socket_path": self.config.redis_socket,
},
namespace="librenms.lock",
host=self.config.redis_host,
port=self.config.redis_port,
db=self.config.redis_db,
username=self.config.redis_user,
password=self.config.redis_pass,
unix_socket_path=self.config.redis_socket,
sentinel=self.config.redis_sentinel,
sentinel_service=self.config.redis_sentinel_service,
socket_timeout=self.config.redis_timeout,
)
except ImportError:
if self.config.distributed:
logger.critical(
"ERROR: Redis connection required for distributed polling"
)
logger.critical(
"Please install redis-py, either through your os software repository or from PyPI"
)
self.exit(2)
except Exception as e:
if self.config.distributed:
logger.critical(
"ERROR: Redis connection required for distributed polling"
)
logger.critical(
"Lock manager could not connect to Redis. {}: {}".format(
type(e).__name__, e
)
)
self.exit(2)
return LibreNMS.ThreadingLock()
def restart(self):
"""
Stop then recreate this entire process by re-calling the original script.
Has the effect of reloading the python files from disk.
"""
if sys.version_info < (3, 4, 0):
logger.warning(
"Skipping restart as running under an incompatible interpreter"
)
logger.warning("Please restart manually")
return
logger.info("Restarting service... ")
if "psutil" not in sys.modules:
logger.warning("psutil is not available, polling gap possible")
self._stop_managers_and_wait()
else:
self._stop_managers()
self._release_master()
python = sys.executable
sys.stdout.flush()
os.execl(python, python, *sys.argv)
def reap(self, signalnum=None, flag=None):
"""
Set the reap flag so the dispatch loop reaps finished child processes
:param signalnum: UNIX signal number
:param flag: Flags accompanying signal
"""
self.reap_flag = True
def reload(self, signalnum=None, flag=None):
"""
Set the reload flag to begin a clean restart
:param signalnum: UNIX signal number
:param flag: Flags accompanying signal
"""
logger.info(
"Received signal on thread %s, handling", threading.current_thread().name
)
self.reload_flag = True
def terminate(self, signalnum=None, flag=None):
"""
Set the terminate flag to begin a clean shutdown
:param signalnum: UNIX signal number
:param flag: Flags accompanying signal
"""
logger.info(
"Received signal on thread %s, handling", threading.current_thread().name
)
self.terminate_flag = True
def shutdown(self, signalnum=None, flag=None):
"""
Stop and exit, waiting for all child processes to exit.
:param signalnum: UNIX signal number
:param flag: Flags accompanying signal
"""
logger.info("Shutting down, waiting for running jobs to complete...")
self.stop_dispatch_timers()
self._release_master()
self.daily_timer.stop()
self.stats_timer.stop()
self.systemd_watchdog_timer.stop()
if self.config.watchdog_enabled:
self.watchdog_timer.stop()
self._stop_managers_and_wait()
# master lock was already released above
logger.info(
"Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name
)
self.exit(0)
def start_dispatch_timers(self):
"""
Start all dispatch timers and begin pushing events into queues.
This should only be started when we are the master dispatcher.
"""
for manager in self.queue_managers.values():
try:
manager.start_dispatch()
except AttributeError:
pass
def stop_dispatch_timers(self):
"""
Stop all dispatch timers, this should be called when we are no longer the master dispatcher.
"""
for manager in self.queue_managers.values():
try:
manager.stop_dispatch()
except AttributeError:
pass
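# Ask every queue manager to stop; does not wait for worker threads (see _stop_managers_and_wait).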
def _stop_managers(self):
for manager in self.queue_managers.values():
manager.stop()
def _stop_managers_and_wait(self):
"""
Stop all QueueManagers, and wait for their processing threads to complete.
We send the stop signal to all QueueManagers first, then wait for them to finish.
"""
self._stop_managers()
for manager in self.queue_managers.values():
manager.stop_and_wait()
def check_single_instance(self):
"""
Check that there is only one instance of the service running on this computer.
We do this by creating a file in the base directory (.lock.service) and
obtaining an exclusive, non-blocking lock on that file.
"""
lock_file = "{}/{}".format(self.config.BASE_DIR, ".lock.service")
import fcntl
self._fp = open(
lock_file, "w"
) # keep a reference so the file handle isn't garbage collected
self._fp.flush()
try:
fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
logger.warning("Another instance is already running, quitting.")
self.exit(2)
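# Runs on the stats timer (every poller frequency): upserts this node's heartbeat row in
# poller_cluster and per-queue depth/throughput rows in poller_cluster_stats.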
def log_performance_stats(self):
logger.info("Counting up time spent polling")
try:
# Report on the poller instance as a whole
self._db.query(
"INSERT INTO poller_cluster(node_id, poller_name, poller_version, poller_groups, last_report, master) "
'values("{0}", "{1}", "{2}", "{3}", NOW(), {4}) '
'ON DUPLICATE KEY UPDATE poller_version="{2}", poller_groups="{3}", last_report=NOW(), master={4}; '.format(
self.config.node_id,
self.config.name,
"librenms-service",
",".join(str(i) for i in self.config.group),
1 if self.is_master else 0,
)
)
# Find our ID
self._db.query(
'SELECT id INTO @parent_poller_id FROM poller_cluster WHERE node_id="{0}"; '.format(
self.config.node_id
)
)
for worker_type, manager in self.queue_managers.items():
worker_seconds, devices = manager.performance.reset()
# Record the queue state
self._db.query(
"INSERT INTO poller_cluster_stats(parent_poller, poller_type, depth, devices, worker_seconds, workers, frequency) "
'values(@parent_poller_id, "{0}", {1}, {2}, {3}, {4}, {5}) '
"ON DUPLICATE KEY UPDATE depth={1}, devices={2}, worker_seconds={3}, workers={4}, frequency={5}; ".format(
worker_type,
sum(
[
manager.get_queue(group).qsize()
for group in self.config.group
]
),
devices,
worker_seconds,
getattr(self.config, worker_type).workers,
getattr(self.config, worker_type).frequency,
)
)
except (pymysql.err.Error, ConnectionResetError, RedisConnectionError):
logger.critical(
"Unable to log performance statistics - is the database still online?",
exc_info=True,
)
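# Pet the systemd watchdog, but only if the optional systemd.daemon module was importable.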
def systemd_watchdog(self):
if "systemd.daemon" in sys.modules:
notify("WATCHDOG=1")
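# Restarts the whole service if the watchdog log file has not been modified within one poll period.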
def logfile_watchdog(self):
try:
# check that the logfile has been written to within the last poll period
logfile_mdiff = datetime.now().timestamp() - os.path.getmtime(
self.config.watchdog_logfile
)
except FileNotFoundError as e:
logger.error("Log file not found! {}".format(e))
return
if logfile_mdiff > self.config.poller.frequency:
logger.critical(
"BARK! Log file older than {}s, restarting service!".format(
self.config.poller.frequency
),
exc_info=True,
)
self.restart()
else:
logger.info("Log file updated {}s ago".format(int(logfile_mdiff)))
def exit(self, code=0):
sys.stdout.flush()
sys.exit(code)