Python dispatcher service v2 (#10050)
* Refactor LibreNMS service add ping * services ported remote legacy stats collection * alerting * implement unique queues * update discovery queue manager * remove message * more cleanup * Don't shuffle queue * clean up imports * don't try to discover ping only devices * Fix for discovery not running timer * Update docs a bit and and add some additional config options. Intentionally undocumented. * Wait until the device is marked up by the poller before discovering * Handle loosing connection to db gracefully * Attempt to release master after 5 db failures * Sleep to give other nodes a chance to acquire * Update docs and rename the doc to Dispatcher Service to more accurately reflect its function. * add local notification
This commit is contained in:
parent
1e76915af3
commit
604a200891
|
@ -1,17 +1,44 @@
|
|||
import os
|
||||
import subprocess
|
||||
import threading
|
||||
import timeit
|
||||
from collections import deque
|
||||
|
||||
from logging import critical, info, debug, exception
|
||||
from math import ceil
|
||||
from queue import Queue
|
||||
from time import time
|
||||
|
||||
from .service import Service, ServiceConfig
|
||||
from .queuemanager import QueueManager, TimedQueueManager, BillingQueueManager
|
||||
from .queuemanager import QueueManager, TimedQueueManager, BillingQueueManager, PingQueueManager, ServicesQueueManager, \
|
||||
AlertQueueManager, PollerQueueManager, DiscoveryQueueManager
|
||||
|
||||
|
||||
def normalize_wait(seconds):
|
||||
return ceil(seconds - (time() % seconds))
|
||||
|
||||
|
||||
def call_script(script, args=()):
|
||||
"""
|
||||
Run a LibreNMS script. Captures all output and throws an exception if a non-zero
|
||||
status is returned. Blocks parent signals (like SIGINT and SIGTERM).
|
||||
:param script: the name of the executable relative to the base directory
|
||||
:param args: a tuple of arguments to send to the command
|
||||
:returns the output of the command
|
||||
"""
|
||||
if script.endswith('.php'):
|
||||
# save calling the sh process
|
||||
base = ('/usr/bin/env', 'php')
|
||||
else:
|
||||
base = ()
|
||||
|
||||
base_dir = os.path.realpath(os.path.dirname(__file__) + "/..")
|
||||
cmd = base + ("{}/{}".format(base_dir, script),) + tuple(map(str, args))
|
||||
debug("Running {}".format(cmd))
|
||||
# preexec_fn=os.setsid here keeps process signals from propagating
|
||||
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, preexec_fn=os.setsid, close_fds=True).decode()
|
||||
|
||||
|
||||
class DB:
|
||||
def __init__(self, config, auto_connect=True):
|
||||
"""
|
||||
|
@ -78,10 +105,23 @@ class DB:
|
|||
:param args:
|
||||
:return: the cursor with results
|
||||
"""
|
||||
cursor = self.db_conn().cursor()
|
||||
cursor.execute(query, args)
|
||||
cursor.close()
|
||||
return cursor
|
||||
try:
|
||||
cursor = self.db_conn().cursor()
|
||||
cursor.execute(query, args)
|
||||
cursor.close()
|
||||
return cursor
|
||||
except Exception as e:
|
||||
critical("DB Connection exception {}".format(e))
|
||||
self.close()
|
||||
raise
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Close the connection owned by this thread.
|
||||
"""
|
||||
conn = self._db.pop(threading.get_ident(), None)
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
|
||||
class RecurringTimer:
|
||||
|
@ -185,7 +225,7 @@ class ThreadingLock(Lock):
|
|||
return Lock.check_lock(self, name)
|
||||
|
||||
def print_locks(self):
|
||||
Lock.print_locks(self)
|
||||
Lock.print_locks(self)
|
||||
|
||||
|
||||
class RedisLock(Lock):
|
||||
|
@ -220,7 +260,6 @@ class RedisLock(Lock):
|
|||
exception("Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s",
|
||||
name, owner, expiration, allow_owner_relock)
|
||||
|
||||
|
||||
def unlock(self, name, owner):
|
||||
"""
|
||||
Release the named lock.
|
||||
|
@ -242,7 +281,7 @@ class RedisLock(Lock):
|
|||
print("{} locked by {}, expires in {} seconds".format(key, self._redis.get(key), self._redis.ttl(key)))
|
||||
|
||||
|
||||
class RedisQueue(object):
|
||||
class RedisUniqueQueue(object):
|
||||
def __init__(self, name, namespace='queue', **redis_kwargs):
|
||||
import redis
|
||||
redis_kwargs['decode_responses'] = True
|
||||
|
@ -250,25 +289,24 @@ class RedisQueue(object):
|
|||
self._redis.ping()
|
||||
self.key = "{}:{}".format(namespace, name)
|
||||
|
||||
# clean up from previous implementations
|
||||
if self._redis.type(self.key) != 'zset':
|
||||
self._redis.delete(self.key)
|
||||
|
||||
def qsize(self):
|
||||
return self._redis.llen(self.key)
|
||||
return self._redis.zcount(self.key, '-inf', '+inf')
|
||||
|
||||
def empty(self):
|
||||
return self.qsize() == 0
|
||||
|
||||
def put(self, item):
|
||||
# commented code allows unique entries, but shuffles the queue
|
||||
# p = self._redis.pipeline()
|
||||
# p.lrem(self.key, 1, item)
|
||||
# p.lpush(self.key, item)
|
||||
# p.execute()
|
||||
self._redis.rpush(self.key, item)
|
||||
self._redis.zadd(self.key, {item: time()}, nx=True)
|
||||
|
||||
def get(self, block=True, timeout=None):
|
||||
if block:
|
||||
item = self._redis.blpop(self.key, timeout=timeout)
|
||||
item = self._redis.bzpopmin(self.key, timeout=timeout)
|
||||
else:
|
||||
item = self._redis.lpop(self.key)
|
||||
item = self._redis.zpopmin(self.key)
|
||||
|
||||
if item:
|
||||
item = item[1]
|
||||
|
@ -276,3 +314,96 @@ class RedisQueue(object):
|
|||
|
||||
def get_nowait(self):
|
||||
return self.get(False)
|
||||
|
||||
|
||||
class UniqueQueue(Queue):
|
||||
def _init(self, maxsize):
|
||||
self.queue = deque()
|
||||
self.setqueue = set()
|
||||
|
||||
def _put(self, item):
|
||||
if item not in self.setqueue:
|
||||
self.setqueue.add(item)
|
||||
self.queue.append(item)
|
||||
|
||||
def _get(self):
|
||||
item = self.queue.popleft()
|
||||
self.setqueue.remove(item)
|
||||
return item
|
||||
|
||||
|
||||
class PerformanceCounter(object):
|
||||
"""
|
||||
This is a simple counter to record execution time and number of jobs. It's unique to each
|
||||
poller instance, so does not need to be globally syncronised, just locally.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._count = 0
|
||||
self._jobs = 0
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def add(self, n):
|
||||
"""
|
||||
Add n to the counter and increment the number of jobs by 1
|
||||
:param n: Number to increment by
|
||||
"""
|
||||
with self._lock:
|
||||
self._count += n
|
||||
self._jobs += 1
|
||||
|
||||
def split(self, precise=False):
|
||||
"""
|
||||
Return the current counter value and keep going
|
||||
:param precise: Whether floating point precision is desired
|
||||
:return: ((INT or FLOAT), INT)
|
||||
"""
|
||||
return (self._count if precise else int(self._count)), self._jobs
|
||||
|
||||
def reset(self, precise=False):
|
||||
"""
|
||||
Return the current counter value and then zero it.
|
||||
:param precise: Whether floating point precision is desired
|
||||
:return: ((INT or FLOAT), INT)
|
||||
"""
|
||||
with self._lock:
|
||||
c = self._count
|
||||
j = self._jobs
|
||||
self._count = 0
|
||||
self._jobs = 0
|
||||
|
||||
return (c if precise else int(c)), j
|
||||
|
||||
|
||||
class TimeitContext(object):
|
||||
"""
|
||||
Wrapper around timeit to allow the timing of larger blocks of code by wrapping them in "with"
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._t = timeit.default_timer()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
del self._t
|
||||
|
||||
def delta(self):
|
||||
"""
|
||||
Calculate the elapsed time since the context was initialised
|
||||
:return: FLOAT
|
||||
"""
|
||||
if not self._t:
|
||||
raise ArithmeticError("Timer has not been started, cannot return delta")
|
||||
|
||||
return timeit.default_timer() - self._t
|
||||
|
||||
@classmethod
|
||||
def start(cls):
|
||||
"""
|
||||
Factory method for TimeitContext
|
||||
:param cls:
|
||||
:return: TimeitContext
|
||||
"""
|
||||
return cls()
|
||||
|
|
|
@ -1,22 +1,16 @@
|
|||
import random
|
||||
import pymysql
|
||||
import subprocess
|
||||
import threading
|
||||
import traceback
|
||||
from logging import debug, info, error, critical
|
||||
from multiprocessing import Queue
|
||||
from logging import debug, info, error, critical, warning
|
||||
from queue import Empty
|
||||
from subprocess import CalledProcessError
|
||||
|
||||
import sys
|
||||
|
||||
import LibreNMS
|
||||
|
||||
if sys.version_info[0] < 3:
|
||||
from Queue import Empty
|
||||
else:
|
||||
from queue import Empty
|
||||
|
||||
|
||||
class QueueManager:
|
||||
def __init__(self, config, type_desc, work_function, auto_start=True):
|
||||
def __init__(self, config, lock_manager, type_desc, work_function, auto_start=True):
|
||||
"""
|
||||
This class manages a queue of jobs and can be used to submit jobs to the queue with post_work()
|
||||
and process jobs in that queue in worker threads using the work_function
|
||||
|
@ -26,16 +20,19 @@ class QueueManager:
|
|||
You can start or stop the worker threads with start(), stop(), and stop_and_wait()
|
||||
|
||||
:param config: LibreNMS.ServiceConfig reference to the service config object
|
||||
:param lock_manager: A LibreNMS.Lock instance to help with locks
|
||||
:param type_desc: description for this queue manager type
|
||||
:param work_function: function that will be called to perform the task
|
||||
:param auto_start: automatically start worker threads
|
||||
"""
|
||||
self.type = type_desc
|
||||
self.config = config
|
||||
self.performance = LibreNMS.PerformanceCounter()
|
||||
|
||||
self._threads = []
|
||||
self._queues = {}
|
||||
self._queue_create_lock = threading.Lock()
|
||||
self._lm = lock_manager
|
||||
|
||||
self._work_function = work_function
|
||||
self._stop_event = threading.Event()
|
||||
|
@ -48,14 +45,29 @@ class QueueManager:
|
|||
self.start()
|
||||
|
||||
def _service_worker(self, work_func, queue_id):
|
||||
debug("Worker started {}".format(threading.current_thread().getName()))
|
||||
while not self._stop_event.is_set():
|
||||
debug("Worker {} checking queue {} ({}) for work".format(threading.current_thread().getName(), queue_id,
|
||||
self.get_queue(queue_id).qsize()))
|
||||
try:
|
||||
# cannot break blocking request with redis-py, so timeout :(
|
||||
device_id = self.get_queue(queue_id).get(True, 3)
|
||||
device_id = self.get_queue(queue_id).get(True, 10)
|
||||
|
||||
if device_id: # None returned by redis after timeout when empty
|
||||
debug("Queues: {}".format(self._queues))
|
||||
work_func(device_id)
|
||||
if device_id is not None: # None returned by redis after timeout when empty
|
||||
debug(
|
||||
"Worker {} ({}) got work {} ".format(threading.current_thread().getName(), queue_id, device_id))
|
||||
with LibreNMS.TimeitContext.start() as t:
|
||||
debug("Queues: {}".format(self._queues))
|
||||
target_desc = "{} ({})".format(device_id if device_id else '',
|
||||
queue_id) if queue_id else device_id
|
||||
if work_func:
|
||||
work_func(device_id)
|
||||
else:
|
||||
self.do_work(device_id, queue_id)
|
||||
|
||||
runtime = t.delta()
|
||||
info("Completed {} run for {} in {:.2f}s".format(self.type, target_desc, runtime))
|
||||
self.performance.add(runtime)
|
||||
except Empty:
|
||||
pass # ignore empty queue exception from subprocess.Queue
|
||||
except CalledProcessError as e:
|
||||
|
@ -92,6 +104,9 @@ class QueueManager:
|
|||
else:
|
||||
self.spawn_worker(self.type.title(), 0)
|
||||
|
||||
def do_work(self, device_id, group):
|
||||
pass
|
||||
|
||||
def spawn_worker(self, thread_name, group):
|
||||
pt = threading.Thread(target=self._service_worker, name=thread_name,
|
||||
args=(self._work_function, group))
|
||||
|
@ -147,14 +162,14 @@ class QueueManager:
|
|||
"""
|
||||
info("Creating queue {}".format(self.queue_name(queue_type, group)))
|
||||
try:
|
||||
return LibreNMS.RedisQueue(self.queue_name(queue_type, group),
|
||||
namespace='librenms.queue',
|
||||
host=self.config.redis_host,
|
||||
port=self.config.redis_port,
|
||||
db=self.config.redis_db,
|
||||
password=self.config.redis_pass,
|
||||
unix_socket_path=self.config.redis_socket
|
||||
)
|
||||
return LibreNMS.RedisUniqueQueue(self.queue_name(queue_type, group),
|
||||
namespace='librenms.queue',
|
||||
host=self.config.redis_host,
|
||||
port=self.config.redis_port,
|
||||
db=self.config.redis_db,
|
||||
password=self.config.redis_pass,
|
||||
unix_socket_path=self.config.redis_socket
|
||||
)
|
||||
except ImportError:
|
||||
if self.config.distributed:
|
||||
critical("ERROR: Redis connection required for distributed polling")
|
||||
|
@ -166,7 +181,7 @@ class QueueManager:
|
|||
critical("Could not connect to Redis. {}".format(e))
|
||||
exit(2)
|
||||
|
||||
return Queue()
|
||||
return LibreNMS.UniqueQueue()
|
||||
|
||||
@staticmethod
|
||||
def queue_name(queue_type, group):
|
||||
|
@ -175,9 +190,28 @@ class QueueManager:
|
|||
else:
|
||||
raise ValueError("Refusing to create improperly scoped queue - parameters were invalid or not set")
|
||||
|
||||
def record_runtime(self, duration):
|
||||
self.performance.add(duration)
|
||||
|
||||
# ------ Locking Helpers ------
|
||||
def lock(self, context, context_name='device', allow_relock=False, timeout=0):
|
||||
return self._lm.lock(self._gen_lock_name(context, context_name), self._gen_lock_owner(), timeout, allow_relock)
|
||||
|
||||
def unlock(self, context, context_name='device'):
|
||||
return self._lm.unlock(self._gen_lock_name(context, context_name), self._gen_lock_owner())
|
||||
|
||||
def is_locked(self, context, context_name='device'):
|
||||
return self._lm.check_lock(self._gen_lock_name(context, context_name))
|
||||
|
||||
def _gen_lock_name(self, context, context_name):
|
||||
return '{}.{}.{}'.format(self.type, context_name, context)
|
||||
|
||||
def _gen_lock_owner(self):
|
||||
return "{}-{}".format(self.config.unique_name, threading.current_thread().name)
|
||||
|
||||
|
||||
class TimedQueueManager(QueueManager):
|
||||
def __init__(self, config, type_desc, work_function, dispatch_function, auto_start=True):
|
||||
def __init__(self, config, lock_manager, type_desc, work_function=None, dispatch_function=None, auto_start=True):
|
||||
"""
|
||||
A queue manager that periodically dispatches work to the queue
|
||||
The times are normalized like they started at 0:00
|
||||
|
@ -187,7 +221,8 @@ class TimedQueueManager(QueueManager):
|
|||
:param dispatch_function: function that will be called when the timer is up, should call post_work()
|
||||
:param auto_start: automatically start worker threads
|
||||
"""
|
||||
QueueManager.__init__(self, config, type_desc, work_function, auto_start)
|
||||
dispatch_function = dispatch_function if dispatch_function else self.do_dispatch
|
||||
QueueManager.__init__(self, config, lock_manager, type_desc, work_function, auto_start)
|
||||
self.timer = LibreNMS.RecurringTimer(self.get_poller_config().frequency, dispatch_function)
|
||||
|
||||
def start_dispatch(self):
|
||||
|
@ -209,21 +244,22 @@ class TimedQueueManager(QueueManager):
|
|||
self.stop_dispatch()
|
||||
QueueManager.stop(self)
|
||||
|
||||
def do_dispatch(self):
|
||||
pass
|
||||
|
||||
|
||||
class BillingQueueManager(TimedQueueManager):
|
||||
def __init__(self, config, work_function, poll_dispatch_function, calculate_dispatch_function,
|
||||
auto_start=True):
|
||||
def __init__(self, config, lock_manager, auto_start=True):
|
||||
"""
|
||||
A TimedQueueManager with two timers dispatching poll billing and calculate billing to the same work queue
|
||||
|
||||
:param config: LibreNMS.ServiceConfig reference to the service config object
|
||||
:param work_function: function that will be called to perform the task
|
||||
:param poll_dispatch_function: function that will be called when the timer is up, should call post_work()
|
||||
:param calculate_dispatch_function: function that will be called when the timer is up, should call post_work()
|
||||
:param lock_manager: the single instance of lock manager
|
||||
:param auto_start: automatically start worker threads
|
||||
"""
|
||||
TimedQueueManager.__init__(self, config, 'billing', work_function, poll_dispatch_function, auto_start)
|
||||
self.calculate_timer = LibreNMS.RecurringTimer(self.get_poller_config().calculate, calculate_dispatch_function, 'calculate_billing_timer')
|
||||
TimedQueueManager.__init__(self, config, lock_manager, 'billing', None, None, auto_start)
|
||||
self.calculate_timer = LibreNMS.RecurringTimer(self.get_poller_config().calculate,
|
||||
self.dispatch_calculate_billing, 'calculate_billing_timer')
|
||||
|
||||
def start_dispatch(self):
|
||||
"""
|
||||
|
@ -238,3 +274,176 @@ class BillingQueueManager(TimedQueueManager):
|
|||
"""
|
||||
self.calculate_timer.stop()
|
||||
TimedQueueManager.stop_dispatch(self)
|
||||
|
||||
def dispatch_calculate_billing(self):
|
||||
self.post_work('calculate', 0)
|
||||
|
||||
def do_dispatch(self):
|
||||
self.post_work('poll', 0)
|
||||
|
||||
def do_work(self, run_type, group):
|
||||
if run_type == 'poll':
|
||||
info("Polling billing")
|
||||
LibreNMS.call_script('poll-billing.php')
|
||||
else: # run_type == 'calculate'
|
||||
info("Calculating billing")
|
||||
LibreNMS.call_script('billing-calculate.php')
|
||||
|
||||
|
||||
class PingQueueManager(TimedQueueManager):
|
||||
def __init__(self, config, lock_manager, auto_start=True):
|
||||
"""
|
||||
A TimedQueueManager to manage dispatch and workers for Ping
|
||||
|
||||
:param config: LibreNMS.ServiceConfig reference to the service config object
|
||||
:param lock_manager: the single instance of lock manager
|
||||
:param auto_start: automatically start worker threads
|
||||
"""
|
||||
TimedQueueManager.__init__(self, config, lock_manager, 'ping', auto_start=auto_start)
|
||||
self._db = LibreNMS.DB(self.config)
|
||||
|
||||
def do_dispatch(self):
|
||||
try:
|
||||
groups = self._db.query("SELECT DISTINCT (`poller_group`) FROM `devices`")
|
||||
for group in groups:
|
||||
self.post_work('', group[0])
|
||||
except pymysql.err.Error:
|
||||
pass
|
||||
|
||||
def do_work(self, context, group):
|
||||
if self.lock(group, 'group', timeout=self.config.ping.frequency):
|
||||
try:
|
||||
info("Running fast ping")
|
||||
LibreNMS.call_script('ping.php', ('-g', group))
|
||||
finally:
|
||||
self.unlock(group, 'group')
|
||||
|
||||
|
||||
class ServicesQueueManager(TimedQueueManager):
|
||||
def __init__(self, config, lock_manager, auto_start=True):
|
||||
"""
|
||||
A TimedQueueManager to manage dispatch and workers for Services
|
||||
|
||||
:param config: LibreNMS.ServiceConfig reference to the service config object
|
||||
:param lock_manager: the single instance of lock manager
|
||||
:param auto_start: automatically start worker threads
|
||||
"""
|
||||
TimedQueueManager.__init__(self, config, lock_manager, 'services', auto_start=auto_start)
|
||||
self._db = LibreNMS.DB(self.config)
|
||||
|
||||
def do_dispatch(self):
|
||||
try:
|
||||
devices = self._db.query("SELECT DISTINCT(`device_id`), `poller_group` FROM `services`"
|
||||
" LEFT JOIN `devices` USING (`device_id`) WHERE `disabled`=0")
|
||||
for device in devices:
|
||||
self.post_work(device[0], device[1])
|
||||
except pymysql.err.Error:
|
||||
pass
|
||||
|
||||
def do_work(self, device_id, group):
|
||||
if self.lock(device_id, timeout=self.config.services.frequency):
|
||||
try:
|
||||
info("Checking services on device {}".format(device_id))
|
||||
LibreNMS.call_script('check-services.php', ('-h', device_id))
|
||||
except subprocess.CalledProcessError as e:
|
||||
if e.returncode == 5:
|
||||
info("Device {} is down, cannot poll service, waiting {}s for retry"
|
||||
.format(device_id, self.config.down_retry))
|
||||
self.lock(device_id, allow_relock=True, timeout=self.config.down_retry)
|
||||
else:
|
||||
self.unlock(device_id)
|
||||
|
||||
|
||||
class AlertQueueManager(TimedQueueManager):
|
||||
def __init__(self, config, lock_manager, auto_start=True):
|
||||
"""
|
||||
A TimedQueueManager to manage dispatch and workers for Alerts
|
||||
|
||||
:param config: LibreNMS.ServiceConfig reference to the service config object
|
||||
:param lock_manager: the single instance of lock manager
|
||||
:param auto_start: automatically start worker threads
|
||||
"""
|
||||
TimedQueueManager.__init__(self, config, lock_manager, 'alerting', auto_start=auto_start)
|
||||
self._db = LibreNMS.DB(self.config)
|
||||
|
||||
def do_dispatch(self):
|
||||
self.post_work('alerts', 0)
|
||||
|
||||
def do_work(self, device_id, group):
|
||||
try:
|
||||
info("Checking alerts")
|
||||
LibreNMS.call_script('alerts.php')
|
||||
except subprocess.CalledProcessError as e:
|
||||
if e.returncode == 1:
|
||||
warning("There was an error issuing alerts: {}".format(e.output))
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
class PollerQueueManager(QueueManager):
|
||||
def __init__(self, config, lock_manager, auto_start=True):
|
||||
"""
|
||||
A TimedQueueManager to manage dispatch and workers for Alerts
|
||||
|
||||
:param config: LibreNMS.ServiceConfig reference to the service config object
|
||||
:param lock_manager: the single instance of lock manager
|
||||
:param auto_start: automatically start worker threads
|
||||
"""
|
||||
QueueManager.__init__(self, config, lock_manager, 'poller', None, auto_start=auto_start)
|
||||
|
||||
def do_work(self, device_id, group):
|
||||
if self.lock(device_id, timeout=self.config.poller.frequency):
|
||||
info('Polling device {}'.format(device_id))
|
||||
|
||||
try:
|
||||
LibreNMS.call_script('poller.php', ('-h', device_id))
|
||||
except subprocess.CalledProcessError as e:
|
||||
if e.returncode == 6:
|
||||
warning('Polling device {} unreachable, waiting {}s for retry'.format(device_id,
|
||||
self.config.down_retry))
|
||||
# re-lock to set retry timer
|
||||
self.lock(device_id, allow_relock=True, timeout=self.config.down_retry)
|
||||
else:
|
||||
error('Polling device {} failed! {}'.format(device_id, e))
|
||||
self.unlock(device_id)
|
||||
else:
|
||||
info('Polling complete {}'.format(device_id))
|
||||
self.unlock(device_id)
|
||||
else:
|
||||
debug('Tried to poll {}, but it is locked'.format(device_id))
|
||||
|
||||
|
||||
class DiscoveryQueueManager(TimedQueueManager):
|
||||
def __init__(self, config, lock_manager, auto_start=True):
|
||||
"""
|
||||
A TimedQueueManager to manage dispatch and workers for Alerts
|
||||
|
||||
:param config: LibreNMS.ServiceConfig reference to the service config object
|
||||
:param lock_manager: the single instance of lock manager
|
||||
:param auto_start: automatically start worker threads
|
||||
"""
|
||||
TimedQueueManager.__init__(self, config, lock_manager, 'discovery', None, auto_start=auto_start)
|
||||
self._db = LibreNMS.DB(self.config)
|
||||
|
||||
def do_dispatch(self):
|
||||
try:
|
||||
devices = self._db.query("SELECT `device_id`, `poller_group` FROM `devices` WHERE `disabled`=0")
|
||||
for device in devices:
|
||||
self.post_work(device[0], device[1])
|
||||
except pymysql.err.Error:
|
||||
pass
|
||||
|
||||
def do_work(self, device_id, group):
|
||||
if self.lock(device_id, timeout=LibreNMS.normalize_wait(self.config.discovery.frequency)):
|
||||
try:
|
||||
info("Discovering device {}".format(device_id))
|
||||
LibreNMS.call_script('discovery.php', ('-h', device_id))
|
||||
except subprocess.CalledProcessError as e:
|
||||
if e.returncode == 5:
|
||||
info("Device {} is down, cannot discover, waiting {}s for retry"
|
||||
.format(device_id, self.config.down_retry))
|
||||
self.lock(device_id, allow_relock=True, timeout=self.config.down_retry)
|
||||
else:
|
||||
self.unlock(device_id)
|
||||
else:
|
||||
self.unlock(device_id)
|
||||
|
|
|
@ -3,11 +3,11 @@ import LibreNMS
|
|||
import json
|
||||
import logging
|
||||
import os
|
||||
import pymysql
|
||||
import subprocess
|
||||
import threading
|
||||
import sys
|
||||
import time
|
||||
import timeit
|
||||
|
||||
from datetime import timedelta
|
||||
from logging import debug, info, warning, error, critical, exception
|
||||
|
@ -18,83 +18,6 @@ from signal import signal, SIGTERM
|
|||
from uuid import uuid1
|
||||
|
||||
|
||||
class PerformanceCounter(object):
|
||||
"""
|
||||
This is a simple counter to record execution time and number of jobs. It's unique to each
|
||||
poller instance, so does not need to be globally syncronised, just locally.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._count = 0
|
||||
self._jobs = 0
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def add(self, n):
|
||||
"""
|
||||
Add n to the counter and increment the number of jobs by 1
|
||||
:param n: Number to increment by
|
||||
"""
|
||||
with self._lock:
|
||||
self._count += n
|
||||
self._jobs += 1
|
||||
|
||||
def split(self, precise=False):
|
||||
"""
|
||||
Return the current counter value and keep going
|
||||
:param precise: Whether floating point precision is desired
|
||||
:return: ((INT or FLOAT), INT)
|
||||
"""
|
||||
return (self._count if precise else int(self._count)), self._jobs
|
||||
|
||||
def reset(self, precise=False):
|
||||
"""
|
||||
Return the current counter value and then zero it.
|
||||
:param precise: Whether floating point precision is desired
|
||||
:return: ((INT or FLOAT), INT)
|
||||
"""
|
||||
with self._lock:
|
||||
c = self._count
|
||||
j = self._jobs
|
||||
self._count = 0
|
||||
self._jobs = 0
|
||||
|
||||
return (c if precise else int(c)), j
|
||||
|
||||
|
||||
class TimeitContext(object):
|
||||
"""
|
||||
Wrapper around timeit to allow the timing of larger blocks of code by wrapping them in "with"
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._t = timeit.default_timer()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
del self._t
|
||||
|
||||
def delta(self):
|
||||
"""
|
||||
Calculate the elapsed time since the context was initialised
|
||||
:return: FLOAT
|
||||
"""
|
||||
if not self._t:
|
||||
raise ArithmeticError("Timer has not been started, cannot return delta")
|
||||
|
||||
return timeit.default_timer() - self._t
|
||||
|
||||
@classmethod
|
||||
def start(cls):
|
||||
"""
|
||||
Factory method for TimeitContext
|
||||
:param cls:
|
||||
:return: TimeitContext
|
||||
"""
|
||||
return cls()
|
||||
|
||||
|
||||
class ServiceConfig:
|
||||
def __init__(self):
|
||||
"""
|
||||
|
@ -111,6 +34,7 @@ class ServiceConfig:
|
|||
|
||||
class PollerConfig:
|
||||
def __init__(self, workers, frequency, calculate=None):
|
||||
self.enabled = True
|
||||
self.workers = workers
|
||||
self.frequency = frequency
|
||||
self.calculate = calculate
|
||||
|
@ -127,13 +51,16 @@ class ServiceConfig:
|
|||
|
||||
debug = False
|
||||
log_level = 20
|
||||
max_db_failures = 5
|
||||
|
||||
alerting = PollerConfig(1, 60)
|
||||
poller = PollerConfig(24, 300)
|
||||
services = PollerConfig(8, 300)
|
||||
discovery = PollerConfig(16, 21600)
|
||||
billing = PollerConfig(2, 300, 60)
|
||||
ping = PollerConfig(1, 120)
|
||||
down_retry = 60
|
||||
update_enabled = True
|
||||
update_frequency = 86400
|
||||
|
||||
master_resolution = 1
|
||||
|
@ -169,16 +96,25 @@ class ServiceConfig:
|
|||
self.log_level = config.get('poller_service_loglevel', ServiceConfig.log_level)
|
||||
|
||||
# new options
|
||||
self.poller.enabled = config.get('service_poller_enabled', True) # unused
|
||||
self.poller.workers = config.get('service_poller_workers', ServiceConfig.poller.workers)
|
||||
self.poller.frequency = config.get('service_poller_frequency', ServiceConfig.poller.frequency)
|
||||
self.services.workers = config.get('service_services_workers', ServiceConfig.services.workers)
|
||||
self.services.frequency = config.get('service_services_frequency', ServiceConfig.services.frequency)
|
||||
self.discovery.enabled = config.get('service_discovery_enabled', True) # unused
|
||||
self.discovery.workers = config.get('service_discovery_workers', ServiceConfig.discovery.workers)
|
||||
self.discovery.frequency = config.get('service_discovery_frequency', ServiceConfig.discovery.frequency)
|
||||
self.services.enabled = config.get('service_services_enabled', True)
|
||||
self.services.workers = config.get('service_services_workers', ServiceConfig.services.workers)
|
||||
self.services.frequency = config.get('service_services_frequency', ServiceConfig.services.frequency)
|
||||
self.billing.enabled = config.get('service_billing_enabled', True)
|
||||
self.billing.frequency = config.get('service_billing_frequency', ServiceConfig.billing.frequency)
|
||||
self.billing.calculate = config.get('service_billing_calculate_frequency', ServiceConfig.billing.calculate)
|
||||
self.alerting.enabled = config.get('service_ping_enabled', True)
|
||||
self.alerting.frequency = config.get('service_billing_frequency', ServiceConfig.alerting.frequency)
|
||||
self.ping.enabled = config.get('service_ping_enabled', False)
|
||||
self.ping.frequency = config.get('ping_rrd_step', ServiceConfig.billing.calculate)
|
||||
self.down_retry = config.get('service_poller_down_retry', ServiceConfig.down_retry)
|
||||
self.log_level = config.get('service_loglevel', ServiceConfig.log_level)
|
||||
self.update_enabled = config.get('service_update_enabled', ServiceConfig.update_enabled)
|
||||
self.update_frequency = config.get('service_update_frequency', ServiceConfig.update_frequency)
|
||||
|
||||
self.redis_host = os.getenv('REDIS_HOST', config.get('redis_host', ServiceConfig.redis_host))
|
||||
|
@ -245,13 +181,12 @@ class Service:
|
|||
config = ServiceConfig()
|
||||
_fp = False
|
||||
_started = False
|
||||
alerting_manager = None
|
||||
queue_managers = {}
|
||||
poller_manager = None
|
||||
discovery_manager = None
|
||||
services_manager = None
|
||||
billing_manager = None
|
||||
last_poll = {}
|
||||
terminate_flag = False
|
||||
db_failures = 0
|
||||
|
||||
def __init__(self):
|
||||
self.config.populate()
|
||||
|
@ -259,18 +194,13 @@ class Service:
|
|||
|
||||
self.attach_signals()
|
||||
|
||||
# init database connections different ones for different threads
|
||||
self._db = LibreNMS.DB(self.config) # main
|
||||
self._services_db = LibreNMS.DB(self.config) # services dispatch
|
||||
self._discovery_db = LibreNMS.DB(self.config) # discovery dispatch
|
||||
self._db = LibreNMS.DB(self.config)
|
||||
|
||||
self._lm = self.create_lock_manager()
|
||||
self.daily_timer = LibreNMS.RecurringTimer(self.config.update_frequency, self.run_maintenance, 'maintenance')
|
||||
self.stats_timer = LibreNMS.RecurringTimer(self.config.poller.frequency, self.log_performance_stats, 'performance')
|
||||
self.is_master = False
|
||||
|
||||
self.performance_stats = {'poller': PerformanceCounter(), 'discovery': PerformanceCounter(), 'services': PerformanceCounter()}
|
||||
|
||||
def attach_signals(self):
|
||||
info("Attaching signal handlers on thread %s", threading.current_thread().name)
|
||||
signal(SIGTERM, self.terminate) # capture sigterm and exit gracefully
|
||||
|
@ -288,17 +218,20 @@ class Service:
|
|||
debug("Starting up queue managers...")
|
||||
|
||||
# initialize and start the worker pools
|
||||
self.poller_manager = LibreNMS.QueueManager(self.config, 'poller', self.poll_device)
|
||||
self.alerting_manager = LibreNMS.TimedQueueManager(self.config, 'alerting', self.poll_alerting,
|
||||
self.dispatch_alerting)
|
||||
self.services_manager = LibreNMS.TimedQueueManager(self.config, 'services', self.poll_services,
|
||||
self.dispatch_services)
|
||||
self.discovery_manager = LibreNMS.TimedQueueManager(self.config, 'discovery', self.discover_device,
|
||||
self.dispatch_discovery)
|
||||
self.billing_manager = LibreNMS.BillingQueueManager(self.config, self.poll_billing,
|
||||
self.dispatch_poll_billing, self.dispatch_calculate_billing)
|
||||
|
||||
self.daily_timer.start()
|
||||
self.poller_manager = LibreNMS.PollerQueueManager(self.config, self._lm)
|
||||
self.queue_managers['poller'] = self.poller_manager
|
||||
self.discovery_manager = LibreNMS.DiscoveryQueueManager(self.config, self._lm)
|
||||
self.queue_managers['discovery'] = self.discovery_manager
|
||||
if self.config.alerting.enabled:
|
||||
self.queue_managers['alerting'] = LibreNMS.AlertQueueManager(self.config, self._lm)
|
||||
if self.config.services.enabled:
|
||||
self.queue_managers['services'] = LibreNMS.ServicesQueueManager(self.config, self._lm)
|
||||
if self.config.billing.enabled:
|
||||
self.queue_managers['billing'] = LibreNMS.BillingQueueManager(self.config, self._lm)
|
||||
if self.config.ping.enabled:
|
||||
self.queue_managers['ping'] = LibreNMS.PingQueueManager(self.config, self._lm)
|
||||
if self.config.update_enabled:
|
||||
self.daily_timer.start()
|
||||
self.stats_timer.start()
|
||||
|
||||
info("LibreNMS Service: {} started!".format(self.config.unique_name))
|
||||
|
@ -310,7 +243,7 @@ class Service:
|
|||
# Main dispatcher loop
|
||||
try:
|
||||
while not self.terminate_flag:
|
||||
master_lock = self._lm.lock('dispatch.master', self.config.unique_name, self.config.master_timeout, True)
|
||||
master_lock = self._acquire_master()
|
||||
if master_lock:
|
||||
if not self.is_master:
|
||||
info("{} is now the master dispatcher".format(self.config.name))
|
||||
|
@ -339,148 +272,57 @@ class Service:
|
|||
info("Dispatch loop terminated")
|
||||
self.shutdown()
|
||||
|
||||
def _acquire_master(self):
|
||||
return self._lm.lock('dispatch.master', self.config.unique_name, self.config.master_timeout, True)
|
||||
|
||||
def _release_master(self):
|
||||
self._lm.unlock('dispatch.master', self.config.unique_name)
|
||||
|
||||
# ------------ Discovery ------------
|
||||
def dispatch_immediate_discovery(self, device_id, group):
|
||||
if self.discovery_manager.get_queue(group).empty() and not self.discovery_is_locked(device_id):
|
||||
if not self.discovery_manager.is_locked(device_id):
|
||||
self.discovery_manager.post_work(device_id, group)
|
||||
|
||||
def dispatch_discovery(self):
|
||||
devices = self.fetch_device_list()
|
||||
for device in devices:
|
||||
self.discovery_manager.post_work(device[0], device[1])
|
||||
|
||||
def discover_device(self, device_id):
|
||||
if self.lock_discovery(device_id):
|
||||
try:
|
||||
with TimeitContext.start() as t:
|
||||
info("Discovering device {}".format(device_id))
|
||||
self.call_script('discovery.php', ('-h', device_id))
|
||||
info('Discovery complete {}'.format(device_id))
|
||||
self.report_execution_time(t.delta(), 'discovery')
|
||||
except subprocess.CalledProcessError as e:
|
||||
if e.returncode == 5:
|
||||
info("Device {} is down, cannot discover, waiting {}s for retry"
|
||||
.format(device_id, self.config.down_retry))
|
||||
self.lock_discovery(device_id, True)
|
||||
else:
|
||||
self.unlock_discovery(device_id)
|
||||
else:
|
||||
self.unlock_discovery(device_id)
|
||||
|
||||
# ------------ Alerting ------------
|
||||
def dispatch_alerting(self):
|
||||
self.alerting_manager.post_work('alerts', 0)
|
||||
|
||||
def poll_alerting(self, _=None):
|
||||
try:
|
||||
info("Checking alerts")
|
||||
self.call_script('alerts.php')
|
||||
except subprocess.CalledProcessError as e:
|
||||
if e.returncode == 1:
|
||||
warning("There was an error issuing alerts: {}".format(e.output))
|
||||
else:
|
||||
raise
|
||||
|
||||
# ------------ Services ------------
|
||||
def dispatch_services(self):
|
||||
devices = self.fetch_services_device_list()
|
||||
for device in devices:
|
||||
self.services_manager.post_work(device[0], device[1])
|
||||
|
||||
def poll_services(self, device_id):
|
||||
if self.lock_services(device_id):
|
||||
try:
|
||||
with TimeitContext.start() as t:
|
||||
info("Checking services on device {}".format(device_id))
|
||||
self.call_script('check-services.php', ('-h', device_id))
|
||||
info('Services complete {}'.format(device_id))
|
||||
self.report_execution_time(t.delta(), 'services')
|
||||
except subprocess.CalledProcessError as e:
|
||||
if e.returncode == 5:
|
||||
info("Device {} is down, cannot poll service, waiting {}s for retry"
|
||||
.format(device_id, self.config.down_retry))
|
||||
self.lock_services(device_id, True)
|
||||
else:
|
||||
self.unlock_services(device_id)
|
||||
else:
|
||||
self.unlock_services(device_id)
|
||||
|
||||
# ------------ Billing ------------
|
||||
def dispatch_calculate_billing(self):
|
||||
self.billing_manager.post_work('calculate', 0)
|
||||
|
||||
def dispatch_poll_billing(self):
|
||||
self.billing_manager.post_work('poll', 0)
|
||||
|
||||
def poll_billing(self, run_type):
|
||||
if run_type == 'poll':
|
||||
info("Polling billing")
|
||||
self.call_script('poll-billing.php')
|
||||
info("Polling billing complete")
|
||||
else: # run_type == 'calculate'
|
||||
info("Calculating billing")
|
||||
self.call_script('billing-calculate.php')
|
||||
info("Calculating billing complete")
|
||||
|
||||
# ------------ Polling ------------
|
||||
def dispatch_immediate_polling(self, device_id, group):
|
||||
if self.poller_manager.get_queue(group).empty() and not self.polling_is_locked(device_id):
|
||||
if not self.poller_manager.is_locked(device_id):
|
||||
self.poller_manager.post_work(device_id, group)
|
||||
|
||||
if self.config.debug:
|
||||
cur_time = time.time()
|
||||
elapsed = cur_time - self.last_poll.get(device_id, cur_time)
|
||||
self.last_poll[device_id] = time.time()
|
||||
self.last_poll[device_id] = cur_time
|
||||
# arbitrary limit to reduce spam
|
||||
if elapsed > (self.config.poller.frequency - self.config.master_resolution):
|
||||
debug("Dispatching polling for device {}, time since last poll {:.2f}s"
|
||||
.format(device_id, elapsed))
|
||||
|
||||
def poll_device(self, device_id):
|
||||
if self.lock_polling(device_id):
|
||||
info('Polling device {}'.format(device_id))
|
||||
|
||||
try:
|
||||
with TimeitContext.start() as t:
|
||||
self.call_script('poller.php', ('-h', device_id))
|
||||
self.report_execution_time(t.delta(), 'poller')
|
||||
except subprocess.CalledProcessError as e:
|
||||
if e.returncode == 6:
|
||||
warning('Polling device {} unreachable, waiting {}s for retry'.format(device_id, self.config.down_retry))
|
||||
# re-lock to set retry timer
|
||||
self.lock_polling(device_id, True)
|
||||
else:
|
||||
error('Polling device {} failed! {}'.format(device_id, e))
|
||||
self.unlock_polling(device_id)
|
||||
else:
|
||||
info('Polling complete {}'.format(device_id))
|
||||
# self.polling_unlock(device_id)
|
||||
else:
|
||||
debug('Tried to poll {}, but it is locked'.format(device_id))
|
||||
|
||||
def fetch_services_device_list(self):
|
||||
return self._services_db.query("SELECT DISTINCT(`device_id`), `poller_group` FROM `services`"
|
||||
" LEFT JOIN `devices` USING (`device_id`) WHERE `disabled`=0")
|
||||
|
||||
def fetch_device_list(self):
|
||||
return self._discovery_db.query("SELECT `device_id`, `poller_group` FROM `devices` WHERE `disabled`=0")
|
||||
|
||||
def fetch_immediate_device_list(self):
|
||||
poller_find_time = self.config.poller.frequency - 1
|
||||
discovery_find_time = self.config.discovery.frequency - 1
|
||||
try:
|
||||
poller_find_time = self.config.poller.frequency - 1
|
||||
discovery_find_time = self.config.discovery.frequency - 1
|
||||
|
||||
return self._db.query('''SELECT `device_id`,
|
||||
`poller_group`,
|
||||
COALESCE(`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND), 1) AS `poll`,
|
||||
COALESCE(`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND), 1) AS `discover`
|
||||
FROM `devices`
|
||||
WHERE `disabled` = 0 AND (
|
||||
`last_polled` IS NULL OR
|
||||
`last_discovered` IS NULL OR
|
||||
`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND) OR
|
||||
`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND)
|
||||
)
|
||||
ORDER BY `last_polled_timetaken` DESC''', (poller_find_time, discovery_find_time, poller_find_time, discovery_find_time))
|
||||
result = self._db.query('''SELECT `device_id`,
|
||||
`poller_group`,
|
||||
COALESCE(`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND), 1) AS `poll`,
|
||||
IF(snmp_disable=1 OR status=0, 0, COALESCE(`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND), 1)) AS `discover`
|
||||
FROM `devices`
|
||||
WHERE `disabled` = 0 AND (
|
||||
`last_polled` IS NULL OR
|
||||
`last_discovered` IS NULL OR
|
||||
`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND) OR
|
||||
`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND)
|
||||
)
|
||||
ORDER BY `last_polled_timetaken` DESC''', (poller_find_time, discovery_find_time, poller_find_time, discovery_find_time))
|
||||
self.db_failures = 0
|
||||
return result
|
||||
except pymysql.err.Error:
|
||||
self.db_failures += 1
|
||||
if self.db_failures > self.config.max_db_failures:
|
||||
warning("Too many DB failures ({}), attempting to release master".format(self.db_failures))
|
||||
self._release_master()
|
||||
sleep(self.config.master_resolution) # sleep to give another node a chance to acquire
|
||||
return []
|
||||
|
||||
def run_maintenance(self):
|
||||
"""
|
||||
|
@ -500,76 +342,11 @@ class Service:
|
|||
sleep(wait)
|
||||
|
||||
info("Running maintenance tasks")
|
||||
output = self.call_script('daily.sh')
|
||||
output = LibreNMS.call_script('daily.sh')
|
||||
info("Maintenance tasks complete\n{}".format(output))
|
||||
|
||||
self.restart()
|
||||
|
||||
# Lock Helpers #
|
||||
def lock_discovery(self, device_id, retry=False):
|
||||
lock_name = self.gen_lock_name('discovery', device_id)
|
||||
timeout = self.config.down_retry if retry else LibreNMS.normalize_wait(self.config.discovery.frequency)
|
||||
return self._lm.lock(lock_name, self.gen_lock_owner(), timeout, retry)
|
||||
|
||||
def unlock_discovery(self, device_id):
|
||||
lock_name = self.gen_lock_name('discovery', device_id)
|
||||
return self._lm.unlock(lock_name, self.gen_lock_owner())
|
||||
|
||||
def discovery_is_locked(self, device_id):
|
||||
lock_name = self.gen_lock_name('discovery', device_id)
|
||||
return self._lm.check_lock(lock_name)
|
||||
|
||||
def lock_polling(self, device_id, retry=False):
|
||||
lock_name = self.gen_lock_name('polling', device_id)
|
||||
timeout = self.config.down_retry if retry else self.config.poller.frequency
|
||||
return self._lm.lock(lock_name, self.gen_lock_owner(), timeout, retry)
|
||||
|
||||
def unlock_polling(self, device_id):
|
||||
lock_name = self.gen_lock_name('polling', device_id)
|
||||
return self._lm.unlock(lock_name, self.gen_lock_owner())
|
||||
|
||||
def polling_is_locked(self, device_id):
|
||||
lock_name = self.gen_lock_name('polling', device_id)
|
||||
return self._lm.check_lock(lock_name)
|
||||
|
||||
def lock_services(self, device_id, retry=False):
|
||||
lock_name = self.gen_lock_name('services', device_id)
|
||||
timeout = self.config.down_retry if retry else self.config.services.frequency
|
||||
return self._lm.lock(lock_name, self.gen_lock_owner(), timeout, retry)
|
||||
|
||||
def unlock_services(self, device_id):
|
||||
lock_name = self.gen_lock_name('services', device_id)
|
||||
return self._lm.unlock(lock_name, self.gen_lock_owner())
|
||||
|
||||
def services_is_locked(self, device_id):
|
||||
lock_name = self.gen_lock_name('services', device_id)
|
||||
return self._lm.check_lock(lock_name)
|
||||
|
||||
@staticmethod
|
||||
def gen_lock_name(lock_class, device_id):
|
||||
return '{}.device.{}'.format(lock_class, device_id)
|
||||
|
||||
def gen_lock_owner(self):
|
||||
return "{}-{}".format(self.config.unique_name, threading.current_thread().name)
|
||||
|
||||
def call_script(self, script, args=()):
|
||||
"""
|
||||
Run a LibreNMS script. Captures all output and throws an exception if a non-zero
|
||||
status is returned. Blocks parent signals (like SIGINT and SIGTERM).
|
||||
:param script: the name of the executable relative to the base directory
|
||||
:param args: a tuple of arguments to send to the command
|
||||
:returns the output of the command
|
||||
"""
|
||||
if script.endswith('.php'):
|
||||
# save calling the sh process
|
||||
base = ('/usr/bin/env', 'php')
|
||||
else:
|
||||
base = ()
|
||||
|
||||
cmd = base + ("{}/{}".format(self.config.BASE_DIR, script),) + tuple(map(str, args))
|
||||
# preexec_fn=os.setsid here keeps process signals from propagating
|
||||
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, preexec_fn=os.setsid, close_fds=True).decode()
|
||||
|
||||
def create_lock_manager(self):
|
||||
"""
|
||||
Create a new LockManager. Tries to create a Redis LockManager, but falls
|
||||
|
@ -609,7 +386,7 @@ class Service:
|
|||
|
||||
info('Restarting service... ')
|
||||
self._stop_managers_and_wait()
|
||||
self._lm.unlock('dispatch.master', self.config.unique_name)
|
||||
self._release_master()
|
||||
|
||||
python = sys.executable
|
||||
os.execl(python, python, *sys.argv)
|
||||
|
@ -632,7 +409,7 @@ class Service:
|
|||
info('Shutting down, waiting for running jobs to complete...')
|
||||
|
||||
self.stop_dispatch_timers()
|
||||
self._lm.unlock('dispatch.master', self.config.unique_name)
|
||||
self._release_master()
|
||||
|
||||
self.daily_timer.stop()
|
||||
self.stats_timer.stop()
|
||||
|
@ -648,34 +425,32 @@ class Service:
|
|||
Start all dispatch timers and begin pushing events into queues.
|
||||
This should only be started when we are the master dispatcher.
|
||||
"""
|
||||
self.alerting_manager.start_dispatch()
|
||||
self.billing_manager.start_dispatch()
|
||||
self.services_manager.start_dispatch()
|
||||
self.discovery_manager.start_dispatch()
|
||||
for manager in self.queue_managers.values():
|
||||
try:
|
||||
manager.start_dispatch()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def stop_dispatch_timers(self):
|
||||
"""
|
||||
Stop all dispatch timers, this should be called when we are no longer the master dispatcher.
|
||||
"""
|
||||
self.alerting_manager.stop_dispatch()
|
||||
self.billing_manager.stop_dispatch()
|
||||
self.services_manager.stop_dispatch()
|
||||
self.discovery_manager.stop_dispatch()
|
||||
for manager in self.queue_managers.values():
|
||||
try:
|
||||
manager.stop_dispatch()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def _stop_managers_and_wait(self):
|
||||
"""
|
||||
Stop all QueueManagers, and wait for their processing threads to complete.
|
||||
We send the stop signal to all QueueManagers first, then wait for them to finish.
|
||||
"""
|
||||
self.discovery_manager.stop()
|
||||
self.poller_manager.stop()
|
||||
self.services_manager.stop()
|
||||
self.billing_manager.stop()
|
||||
for manager in self.queue_managers.values():
|
||||
manager.stop()
|
||||
|
||||
self.discovery_manager.stop_and_wait()
|
||||
self.poller_manager.stop_and_wait()
|
||||
self.services_manager.stop_and_wait()
|
||||
self.billing_manager.stop_and_wait()
|
||||
for manager in self.queue_managers.values():
|
||||
manager.stop_and_wait()
|
||||
|
||||
def check_single_instance(self):
|
||||
"""
|
||||
|
@ -694,9 +469,6 @@ class Service:
|
|||
warning("Another instance is already running, quitting.")
|
||||
exit(2)
|
||||
|
||||
def report_execution_time(self, time, activity):
|
||||
self.performance_stats[activity].add(time)
|
||||
|
||||
def log_performance_stats(self):
|
||||
info("Counting up time spent polling")
|
||||
|
||||
|
@ -710,19 +482,19 @@ class Service:
|
|||
# Find our ID
|
||||
self._db.query('SELECT id INTO @parent_poller_id FROM poller_cluster WHERE node_id="{0}"; '.format(self.config.node_id))
|
||||
|
||||
for worker_type, counter in self.performance_stats.items():
|
||||
worker_seconds, devices = counter.reset()
|
||||
for worker_type, manager in self.queue_managers.items():
|
||||
worker_seconds, devices = manager.performance.reset()
|
||||
|
||||
# Record the queue state
|
||||
self._db.query('INSERT INTO poller_cluster_stats(parent_poller, poller_type, depth, devices, worker_seconds, workers, frequency) '
|
||||
'values(@parent_poller_id, "{0}", {1}, {2}, {3}, {4}, {5}) '
|
||||
'ON DUPLICATE KEY UPDATE depth={1}, devices={2}, worker_seconds={3}, workers={4}, frequency={5}; '
|
||||
.format(worker_type,
|
||||
sum([getattr(self, ''.join([worker_type, '_manager'])).get_queue(group).qsize() for group in self.config.group]),
|
||||
sum([manager.get_queue(group).qsize() for group in self.config.group]),
|
||||
devices,
|
||||
worker_seconds,
|
||||
getattr(self.config, worker_type).workers,
|
||||
getattr(self.config, worker_type).frequency)
|
||||
)
|
||||
except Exception:
|
||||
except pymysql.err.Error:
|
||||
exception("Unable to log performance statistics - is the database still online?")
|
||||
|
|
|
@ -0,0 +1,161 @@
|
|||
source: Extensions/Dispatcher-Service.md
|
||||
path: blob/master/doc/
|
||||
|
||||
# Dispatcher Service
|
||||
|
||||
> Status: Release Candidate
|
||||
|
||||
The new LibreNMS dispatcher service (`librenms-service.py`) replaces the old poller service (`poller-service.py`), improving its reliability. It's mostly a drop in replacement for the old service, but testing is recommended before switching over.
|
||||
|
||||
If you are currently using the old poller service, it's strongly recommended that you migrate away - it has a serious defect under certain versions of mysql/mariadb, and may be inadvertently DoS'ing your devices. The new service does not have this issue,
|
||||
|
||||
Make sure you uninstall the old poller service before deploying the new service.
|
||||
|
||||
The dispatcher does not replace the php scripts, but the cron entries running them. It attempts to do a better job than simple time based scheduling.
|
||||
|
||||
## External Requirements
|
||||
#### A recent version of Python
|
||||
The LibreNMS service requires Python 3 and some features require behaviour only found in Python3.4+.
|
||||
|
||||
#### Python modules
|
||||
- PyMySQL is recommended as it requires no C compiler to install. MySQLclient can also be used, but does require compilation.
|
||||
- python-dotenv .env loader
|
||||
- redis-py 3.0+ and Redis 5.0+ (if using distributed polling)
|
||||
|
||||
These can be obtained from your OS package manager, or from PyPI with the below commands.
|
||||
```bash
|
||||
pip3 install -r requirements.txt
|
||||
```
|
||||
|
||||
#### Redis (distributed polling)
|
||||
If you want to use distributed polling, you'll need a Redis instance to coordinate the nodes.
|
||||
It's recommended that you do not share the Redis database with any other system - by default, Redis supports up to 16 databases (numbered 0-15).
|
||||
You can also use Redis on a single host if you want
|
||||
|
||||
It's strongly recommended that you deploy a resilient cluster of redis systems, and use redis-sentinel.
|
||||
|
||||
You should not rely on the password for the security of your system. See https://redis.io/topics/security
|
||||
|
||||
|
||||
#### MySQL
|
||||
You should already have this, but the pollers do need access to the SQL database. The LibreNMS service runs faster and more aggressively than the standard poller, so keep an eye on the number of open connections and other important health metrics.
|
||||
|
||||
## Configuration
|
||||
|
||||
Connection settings are required in `.env`. The `.env` file is generated after composer install and APP_KEY and NODE_ID are set.
|
||||
|
||||
```dotenv
|
||||
#APP_KEY= #Required, generated by composer install
|
||||
#NODE_ID= #Required, generated by composer install
|
||||
|
||||
DB_HOST=localhost
|
||||
DB_DATABASE=librenms
|
||||
DB_USERNAME=librenms
|
||||
DB_PASSWORD=
|
||||
```
|
||||
|
||||
### Distributed Polling Configuration
|
||||
|
||||
Once you have your Redis database set up, configure it in the .env file on each node.
|
||||
|
||||
```dotenv
|
||||
REDIS_HOST=127.0.0.1
|
||||
#REDIS_DB=0
|
||||
#REDIS_PASSWORD=
|
||||
#REDIS_PORT=6379
|
||||
```
|
||||
|
||||
### Basic Configuration
|
||||
|
||||
Additional configuration settings can be set in `config.php` or directly into the database.
|
||||
|
||||
The defaults are shown here - it's recommended that you at least tune the number of workers.
|
||||
|
||||
```php
|
||||
$config['service_poller_workers'] = 24; # Processes spawned for polling
|
||||
$config['service_services_workers'] = 8; # Processes spawned for service polling
|
||||
$config['service_discovery_workers'] = 16; # Processes spawned for discovery
|
||||
|
||||
|
||||
//Optional Settings
|
||||
$config['service_poller_frequency'] = 300; # Seconds between polling attempts
|
||||
$config['service_services_frequency'] = 300; # Seconds between service polling attempts
|
||||
$config['service_discovery_frequency'] = 21600; # Seconds between discovery runs
|
||||
$config['service_billing_frequency'] = 300; # Seconds between billing calculations
|
||||
$config['service_billing_calculate_frequency'] = 60; # Billing interval
|
||||
$config['service_poller_down_retry'] = 60; # Seconds between failed polling attempts
|
||||
$config['service_loglevel'] = 'INFO'; # Must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
|
||||
$config['service_update_frequency'] = 86400; # Seconds between LibreNMS update checks
|
||||
```
|
||||
|
||||
There are also some SQL options, but these should be inherited from your LibreNMS web UI configuration.
|
||||
|
||||
Logs are sent to the system logging service (usually `journald` or `rsyslog`) - see https://docs.python.org/3/library/logging.html#logging-levels for the options available.
|
||||
|
||||
|
||||
```php
|
||||
distributed_poller = true; # Set to true to enable distributed polling
|
||||
distributed_poller_name = null; # Uniquely identifies the poller instance
|
||||
distributed_poller_group = 0; # Which group to poll
|
||||
```
|
||||
|
||||
## Fast Ping
|
||||
The [fast ping](Fast-Ping-Check.md) scheduler is disabled by default. You can enable it by setting the following:
|
||||
```php
|
||||
$config['service_ping_enabled'] = true;
|
||||
```
|
||||
|
||||
## Cron Scripts
|
||||
Once the LibreNMS service is installed, the cron scripts used by LibreNMS are no longer required and must be removed.
|
||||
|
||||
## Service Installation
|
||||
A systemd unit file is provided - the sysv and upstart init scripts could also be used with a little modification.
|
||||
|
||||
### systemd
|
||||
A systemd unit file can be found in `misc/librenms.service`. To install run `cp /opt/librenms/misc/librenms.service /etc/systemd/system/librenms.service && systemctl enable --now librenms.service`
|
||||
|
||||
## OS-Specific Instructions
|
||||
|
||||
### RHEL/CentOS
|
||||
To get the LibreNMS service running under python3.4+ on RHEL-derivatives with minimal fuss, you can use the software collections build:
|
||||
|
||||
First, enable SCL's on your system:
|
||||
|
||||
#### CentOS 7
|
||||
```
|
||||
# yum install centos-release-scl
|
||||
```
|
||||
|
||||
#### RHEL 7
|
||||
```
|
||||
# subscription-manager repos --enable rhel-server-rhscl-7-rpms
|
||||
```
|
||||
|
||||
Then install and configure the runtime and service:
|
||||
|
||||
```
|
||||
# yum install rh-python36 epel-release
|
||||
# yum install redis
|
||||
# vi /opt/librenms/config.php
|
||||
# vi /etc/redis.conf
|
||||
# systemctl enable --now redis.service
|
||||
# scl enable rh-python36 bash
|
||||
# pip install pymysql redis
|
||||
# cp /opt/librenms/misc/librenms.service.scl /etc/systemd/system/librenms.service
|
||||
# systemctl enable --now librenms.service
|
||||
```
|
||||
|
||||
If you want to use another version of python 3, change `rh-python36` in the unit file and the commands above to match the name of the replacement scl.
|
||||
|
||||
### Debian/Ubuntu
|
||||
|
||||
#### Debian 9 (stretch)
|
||||
|
||||
install python3 and python-mysqldb. python-dotenv is not yes available, but the testing package is working fine, you can grab it on https://packages.debian.org/fr/buster/all/python3-dotenv/download (the package may be updated and have a new version number).
|
||||
|
||||
```
|
||||
apt install python3 python-mysqldb
|
||||
cd /tmp
|
||||
wget http://ftp.fr.debian.org/debian/pool/main/p/python-dotenv/python3-dotenv_0.9.1-1_all.deb
|
||||
dpkg -i python3-dotenv_0.9.1-1_all.deb
|
||||
```
|
|
@ -1,150 +1,3 @@
|
|||
source: Extensions/Poller-Service.md
|
||||
path: blob/master/doc/
|
||||
# Poller Service
|
||||
|
||||
> Status: BETA
|
||||
|
||||
The new LibreNMS service (`librenms-service.py`) replaces the old poller service (`poller-service.py`), improving its reliability. It's mostly a drop in replacement for the old service, but testing is recommended before switching over.
|
||||
|
||||
If you are currently using the old poller service, it's strongly recommended that you migrate away - it has a serious defect under certain versions of mysql/mariadb, and may be inadvertently DoS'ing your devices. The new service does not have this issue,
|
||||
|
||||
Make sure you uninstall the old poller service before deploying the new service.
|
||||
|
||||
## External Requirements
|
||||
#### A recent version of Python
|
||||
The LibreNMS service won't work under Python 2.7+; some features require behaviour only found in Python3.4+.
|
||||
|
||||
#### Python modules
|
||||
- PyMySQL is recommended as it requires no C compiler to install. MySQLclient can also be used, but does require compilation.
|
||||
- python-dotenv .env loader
|
||||
- redis-py (if using distributed polling)
|
||||
|
||||
These can be obtained from your OS package manager, or from PyPI with the below commands. (You ma)
|
||||
```bash
|
||||
pip3 install -r requirements.txt
|
||||
```
|
||||
|
||||
#### Redis (distributed polling only)
|
||||
If you want to use distributed polling, you'll need a redis instance to coordinate the nodes. It's recommeded that you do not share the redis database with any other system - by default, redis supports up to 16 databases (numbered 0-15).
|
||||
|
||||
It's strongly recommended that you deploy a resilient cluster of redis systems, and use redis-sentinel.
|
||||
|
||||
#### MySQL
|
||||
You should already have this, but the pollers do need access to the SQL database. The LibreNMS service runs much faster and more aggressively than the standard poller, so keep an eye on the number of open connections and other important health metrics.
|
||||
|
||||
## Configuration
|
||||
|
||||
Connection settings are required in `.env`. The `.env` file is generated after composer install and APP_KEY and NODE_ID are set.
|
||||
|
||||
```dotenv
|
||||
#APP_KEY= #Required, generated by composer install
|
||||
#NODE_ID= #Required, generated by composer install
|
||||
|
||||
DB_HOST=localhost
|
||||
DB_DATABASE=librenms
|
||||
DB_USERNAME=librenms
|
||||
DB_PASSWORD=
|
||||
```
|
||||
|
||||
### Distributed Polling Configuration
|
||||
|
||||
Once you have your redis database set up, configure it in the .env file on each node.
|
||||
|
||||
```dotenv
|
||||
REDIS_HOST=127.0.0.1
|
||||
#REDIS_DB=0
|
||||
#REDIS_PASSWORD=
|
||||
#REDIS_PORT=6379
|
||||
```
|
||||
|
||||
### Basic Configuration
|
||||
|
||||
Additional configuration settings can be set in `config.php` or directly into the database.
|
||||
|
||||
The defaults are shown here - it's recommended that you at least tune the number of workers.
|
||||
|
||||
```php
|
||||
$config['service_poller_workers'] = 24; # Processes spawned for polling
|
||||
$config['service_services_workers'] = 8; # Processes spawned for service polling
|
||||
$config['service_discovery_workers'] = 16; # Processes spawned for discovery
|
||||
|
||||
|
||||
//Optional Settings
|
||||
$config['service_poller_frequency'] = 300; # Seconds between polling attempts
|
||||
$config['service_services_frequency'] = 300; # Seconds between service polling attempts
|
||||
$config['service_discovery_frequency'] = 21600; # Seconds between discovery runs
|
||||
$config['service_billing_frequency'] = 300; # Seconds between billing calculations
|
||||
$config['service_billing_calculate_frequency'] = 60; # Billing interval
|
||||
$config['service_poller_down_retry'] = 60; # Seconds between failed polling attempts
|
||||
$config['service_loglevel'] = 'INFO'; # Must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
|
||||
$config['service_update_frequency'] = 86400; # Seconds between LibreNMS update checks
|
||||
```
|
||||
|
||||
There are also some SQL options, but these should be inherited from your LibreNMS web UI configuration.
|
||||
|
||||
Logs are sent to the system logging service (usually `journald` or `rsyslog`) - see https://docs.python.org/3/library/logging.html#logging-levels for the options available.
|
||||
|
||||
|
||||
|
||||
You should not rely on the password for the security of your system. See https://redis.io/topics/security
|
||||
|
||||
```php
|
||||
distributed_poller = true; # Set to true to enable distributed polling
|
||||
distributed_poller_name = null; # Uniquely identifies the poller instance
|
||||
distributed_poller_group = 0; # Which group to poll
|
||||
```
|
||||
|
||||
## Cron Scripts
|
||||
Once the LibreNMS service is installed, the cron scripts used by LibreNMS are no longer required and must be removed.
|
||||
|
||||
## Service Installation
|
||||
A systemd unit file is provided - the sysv and upstart init scripts could also be used with a little modification.
|
||||
|
||||
### systemd
|
||||
A systemd unit file can be found in `misc/librenms.service`. To install run `cp /opt/librenms/misc/librenms.service /etc/systemd/system/librenms.service && systemctl enable --now librenms.service`
|
||||
|
||||
## OS-Specific Instructions
|
||||
|
||||
### RHEL/CentOS
|
||||
To get the LibreNMS service running under python3.4+ on RHEL-derivatives with minimal fuss, you can use the software collections build:
|
||||
|
||||
First, enable SCL's on your system:
|
||||
|
||||
#### CentOS 7
|
||||
```
|
||||
# yum install centos-release-scl
|
||||
```
|
||||
|
||||
#### RHEL 7
|
||||
```
|
||||
# subscription-manager repos --enable rhel-server-rhscl-7-rpms
|
||||
```
|
||||
|
||||
Then install and configure the runtime and service:
|
||||
|
||||
```
|
||||
# yum install rh-python36 epel-release
|
||||
# yum install redis
|
||||
# vi /opt/librenms/config.php
|
||||
# vi /etc/redis.conf
|
||||
# systemctl enable --now redis.service
|
||||
# scl enable rh-python36 bash
|
||||
# pip install pymysql redis
|
||||
# cp /opt/librenms/misc/librenms.service.scl /etc/systemd/system/librenms.service
|
||||
# systemctl enable --now librenms.service
|
||||
```
|
||||
|
||||
If you want to use another version of python 3, change `rh-python36` in the unit file and the commands above to match the name of the replacement scl.
|
||||
|
||||
### Debian/Ubuntu
|
||||
|
||||
#### Debian 9 (stretch)
|
||||
|
||||
install python3 and python-mysqldb. python-dotenv is not yes available, but the testing package is working fine, you can grab it on https://packages.debian.org/fr/buster/all/python3-dotenv/download (the package may be updated and have a new version number).
|
||||
|
||||
```
|
||||
apt install python3 python-mysqldb
|
||||
cd /tmp
|
||||
wget http://ftp.fr.debian.org/debian/pool/main/p/python-dotenv/python3-dotenv_0.9.1-1_all.deb
|
||||
dpkg -i python3-dotenv_0.9.1-1_all.deb
|
||||
```
|
||||
<meta http-equiv="refresh" content="0; url=/Extensions/Dispatcher-Service/" />
|
||||
|
|
|
@ -53,10 +53,15 @@
|
|||
<description>As described previously, Legacy Alert Templates and Transports have been removed. You can find more info here: https://community.librenms.org/t/deprecation-notice-alerting-legacy-transports-and-templates/5915</description>
|
||||
<pubDate>Fri, 15 Feb 2017 23:00:00 +0000</pubDate>
|
||||
</item>
|
||||
<item>
|
||||
<title>Cisco Temperature Sensor Threshold Values Stored as High Limits</title>
|
||||
<description>During discovery, some temperature sensors discovered on Cisco devices had their highest temperature value stored as warning high limit. For new discovered devices, this value is now stored as high limit.</description>
|
||||
<pubDate>Sun, 17 Mar 2019 21:00:00 +0100</pubDate>
|
||||
</item>
|
||||
<item>
|
||||
<title>Cisco Temperature Sensor Threshold Values Stored as High Limits</title>
|
||||
<description>During discovery, some temperature sensors discovered on Cisco devices had their highest temperature value stored as warning high limit. For new discovered devices, this value is now stored as high limit.</description>
|
||||
<pubDate>Sun, 17 Mar 2019 21:00:00 +0100</pubDate>
|
||||
</item>
|
||||
<item>
|
||||
<title>LibreNMS Python Dispatcher Service V2 (breaking change)</title>
|
||||
<description>Version two of the LibreNMS python dispatcher service is merged. This code now requires Redis 5.x and redis-py 3.x to operate (if using distributed polling). If you are using the dispatcher service with Redis and have not updated, polling will stop functioning.</description>
|
||||
<pubDate>Mon, 20 May 2019 00:00:00 +0000</pubDate>
|
||||
</item>
|
||||
</channel>
|
||||
</rss>
|
||||
|
|
|
@ -70,7 +70,7 @@ nav:
|
|||
- Extensions/RRDCached.md
|
||||
- Sub-directory Support: Extensions/Sub-Directory.md
|
||||
- Extensions/Varnish.md
|
||||
- Poller Service (BETA): Extensions/Poller-Service.md
|
||||
- Dispatcher Service (RC): Extensions/Dispatcher-Service.md
|
||||
- Extensions/RRDTune.md
|
||||
- Extensions/IRC-Bot.md
|
||||
- Extensions/IRC-Bot-Extensions.md
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
PyMySQL
|
||||
python-dotenv
|
||||
redis
|
||||
redis>=3.0
|
||||
|
|
|
@ -81,7 +81,7 @@ class TestLocks(unittest.TestCase):
|
|||
else:
|
||||
rc = redis.Redis()
|
||||
rc.delete('queue:testing') # make sure no previous data exists
|
||||
qm = LibreNMS.RedisQueue('testing', namespace='queue')
|
||||
qm = LibreNMS.RedisUniqueQueue('testing', namespace='queue')
|
||||
|
||||
thread = threading.Thread(target=self.queue_thread, args=(qm, None, False))
|
||||
thread.daemon = True
|
||||
|
|
Loading…
Reference in New Issue