Format python code with Black (#12663)

Jellyfrog 2021-03-28 18:02:33 +02:00 committed by GitHub
parent f9b25ccdbc
commit 9946fe8b15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1295 additions and 586 deletions

View File

@@ -21,6 +21,8 @@ jobs:
VALIDATE_BASH: true
VALIDATE_PHP_BUILTIN: true
VALIDATE_PYTHON_PYLINT: true
VALIDATE_PYTHON_BLACK: true
VALIDATE_ALL_CODEBASE: false
DEFAULT_BRANCH: master
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
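
The new VALIDATE_PYTHON_BLACK flag tells Super-Linter to fail the job for any Python file that Black would still reformat. A minimal sketch (not part of this commit) of what that check enforces, using Black's Python API and assuming black is installed from PyPI:

import black

# Black normalizes quote style, trailing commas, and line wrapping.
src = "base = ('/usr/bin/env', 'php')\n"
formatted = black.format_str(src, mode=black.Mode())
print(formatted, end="")  # base = ("/usr/bin/env", "php")
# A file passes the check only when formatting it again is a no-op:
assert black.format_str(formatted, mode=black.Mode()) == formatted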

View File

@@ -10,8 +10,16 @@ from queue import Queue
from time import time
from .service import Service, ServiceConfig
from .queuemanager import QueueManager, TimedQueueManager, BillingQueueManager, PingQueueManager, ServicesQueueManager, \
AlertQueueManager, PollerQueueManager, DiscoveryQueueManager
from .queuemanager import (
QueueManager,
TimedQueueManager,
BillingQueueManager,
PingQueueManager,
ServicesQueueManager,
AlertQueueManager,
PollerQueueManager,
DiscoveryQueueManager,
)
def normalize_wait(seconds):
@@ -26,9 +34,9 @@ def call_script(script, args=()):
:param args: a tuple of arguments to send to the command
:returns the output of the command
"""
if script.endswith('.php'):
if script.endswith(".php"):
# saves calling the sh process
base = ('/usr/bin/env', 'php')
base = ("/usr/bin/env", "php")
else:
base = ()
@@ -36,7 +44,13 @@ def call_script(script, args=()):
cmd = base + ("{}/{}".format(base_dir, script),) + tuple(map(str, args))
debug("Running {}".format(cmd))
# preexec_fn=os.setsid here keeps process signals from propagating (close_fds=True is default)
return subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT, preexec_fn=os.setsid, close_fds=True)
return subprocess.check_call(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
close_fds=True,
)
class DB:
@@ -54,6 +68,7 @@ class DB:
def connect(self):
try:
import pymysql
pymysql.install_as_MySQLdb()
info("Using pure python SQL client")
except ImportError:
@@ -63,19 +78,21 @@ class DB:
import MySQLdb
except ImportError:
critical("ERROR: missing a mysql python module")
critical("Install either 'PyMySQL' or 'mysqlclient' from your OS software repository or from PyPI")
critical(
"Install either 'PyMySQL' or 'mysqlclient' from your OS software repository or from PyPI"
)
raise
try:
args = {
'host': self.config.db_host,
'port': self.config.db_port,
'user': self.config.db_user,
'passwd': self.config.db_pass,
'db': self.config.db_name
"host": self.config.db_host,
"port": self.config.db_port,
"user": self.config.db_user,
"passwd": self.config.db_pass,
"db": self.config.db_name,
}
if self.config.db_socket:
args['unix_socket'] = self.config.db_socket
args["unix_socket"] = self.config.db_socket
conn = MySQLdb.connect(**args)
conn.autocommit(True)
@@ -164,9 +181,11 @@ class Lock:
:param expiration: int in seconds
"""
if (
(name not in self._locks) or # lock doesn't exist
(allow_owner_relock and self._locks.get(name, [None])[0] == owner) or # owner has permission
time() > self._locks[name][1] # lock has expired
(name not in self._locks)  # lock doesn't exist
or (
allow_owner_relock and self._locks.get(name, [None])[0] == owner
)  # owner has permission
or time() > self._locks[name][1]  # lock has expired
):
self._locks[name] = (owner, expiration + time())
return self._locks[name][0] == owner
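
To make the semantics concrete: lock() returns True whenever the caller ends up owning the entry, whether it created it, relocked it as the existing owner, or took over an expired one. A hedged illustration (not in the diff), assuming ThreadingLock keeps the base Lock signature of (name, owner, expiration, allow_owner_relock):

import LibreNMS  # assumes the repo package is importable

lm = LibreNMS.ThreadingLock()
assert lm.lock("device.1", "node-a", 60) is True  # fresh lock, held for 60s
assert lm.lock("device.1", "node-b", 60) is False  # still held by node-a
assert lm.lock("device.1", "node-a", 120, True) is True  # owner relock extends expiry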
@@ -229,21 +248,32 @@ class ThreadingLock(Lock):
class RedisLock(Lock):
def __init__(self, namespace='lock', **redis_kwargs):
def __init__(self, namespace="lock", **redis_kwargs):
import redis
from redis.sentinel import Sentinel
redis_kwargs['decode_responses'] = True
if redis_kwargs.get('sentinel') and redis_kwargs.get('sentinel_service'):
sentinels = [tuple(l.split(':')) for l in redis_kwargs.pop('sentinel').split(',')]
sentinel_service = redis_kwargs.pop('sentinel_service')
kwargs = {k: v for k, v in redis_kwargs.items() if k in ["decode_responses", "password", "db", "socket_timeout"]}
redis_kwargs["decode_responses"] = True
if redis_kwargs.get("sentinel") and redis_kwargs.get("sentinel_service"):
sentinels = [
tuple(l.split(":")) for l in redis_kwargs.pop("sentinel").split(",")
]
sentinel_service = redis_kwargs.pop("sentinel_service")
kwargs = {
k: v
for k, v in redis_kwargs.items()
if k in ["decode_responses", "password", "db", "socket_timeout"]
}
self._redis = Sentinel(sentinels, **kwargs).master_for(sentinel_service)
else:
kwargs = {k: v for k, v in redis_kwargs.items() if "sentinel" not in k}
self._redis = redis.Redis(**kwargs)
self._redis.ping()
self._namespace = namespace
info("Created redis lock manager with socket_timeout of {}s".format(redis_kwargs['socket_timeout']))
info(
"Created redis lock manager with socket_timeout of {}s".format(
redis_kwargs["socket_timeout"]
)
)
def __key(self, name):
return "{}:{}".format(self._namespace, name)
@@ -266,8 +296,13 @@ class RedisLock(Lock):
non_existing = not (allow_owner_relock and self._redis.get(key) == owner)
return self._redis.set(key, owner, ex=int(expiration), nx=non_existing)
except redis.exceptions.ResponseError as e:
exception("Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s",
name, owner, expiration, allow_owner_relock)
exception(
"Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s",
name,
owner,
expiration,
allow_owner_relock,
)
def unlock(self, name, owner):
"""
@@ -285,34 +320,49 @@ class RedisLock(Lock):
return self._redis.get(self.__key(name)) is not None
def print_locks(self):
keys = self._redis.keys(self.__key('*'))
keys = self._redis.keys(self.__key("*"))
for key in keys:
print("{} locked by {}, expires in {} seconds".format(key, self._redis.get(key), self._redis.ttl(key)))
print(
"{} locked by {}, expires in {} seconds".format(
key, self._redis.get(key), self._redis.ttl(key)
)
)
class RedisUniqueQueue(object):
def __init__(self, name, namespace='queue', **redis_kwargs):
def __init__(self, name, namespace="queue", **redis_kwargs):
import redis
from redis.sentinel import Sentinel
redis_kwargs['decode_responses'] = True
if redis_kwargs.get('sentinel') and redis_kwargs.get('sentinel_service'):
sentinels = [tuple(l.split(':')) for l in redis_kwargs.pop('sentinel').split(',')]
sentinel_service = redis_kwargs.pop('sentinel_service')
kwargs = {k: v for k, v in redis_kwargs.items() if k in ["decode_responses", "password", "db", "socket_timeout"]}
redis_kwargs["decode_responses"] = True
if redis_kwargs.get("sentinel") and redis_kwargs.get("sentinel_service"):
sentinels = [
tuple(l.split(":")) for l in redis_kwargs.pop("sentinel").split(",")
]
sentinel_service = redis_kwargs.pop("sentinel_service")
kwargs = {
k: v
for k, v in redis_kwargs.items()
if k in ["decode_responses", "password", "db", "socket_timeout"]
}
self._redis = Sentinel(sentinels, **kwargs).master_for(sentinel_service)
else:
kwargs = {k: v for k, v in redis_kwargs.items() if "sentinel" not in k}
self._redis = redis.Redis(**kwargs)
self._redis.ping()
self.key = "{}:{}".format(namespace, name)
info("Created redis queue with socket_timeout of {}s".format(redis_kwargs['socket_timeout']))
info(
"Created redis queue with socket_timeout of {}s".format(
redis_kwargs["socket_timeout"]
)
)
# clean up from previous implementations
if self._redis.type(self.key) != 'zset':
if self._redis.type(self.key) != "zset":
self._redis.delete(self.key)
def qsize(self):
return self._redis.zcount(self.key, '-inf', '+inf')
return self._redis.zcount(self.key, "-inf", "+inf")
def empty(self):
return self.qsize() == 0
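
RedisUniqueQueue stores pending work in a Redis sorted set (hence the zset type check and zcount above), which is what makes the queue "unique": re-posting an already-queued device collapses into the existing entry. A rough sketch of that idea with redis-py (none of this is in the diff), assuming a reachable local Redis 5+ for ZPOPMIN:

import time
import redis

r = redis.Redis(decode_responses=True)
key = "librenms.queue:poller:0"
r.zadd(key, {"device-42": time.time()}, nx=True)  # enqueue, insertion time as score
r.zadd(key, {"device-42": time.time()}, nx=True)  # duplicate: nx keeps the original score
print(r.zpopmin(key))  # pop the oldest entry: [('device-42', <score>)]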

View File

@@ -15,32 +15,33 @@ try:
except ImportError:
try:
import pymysql
pymysql.install_as_MySQLdb()
import MySQLdb
except ImportError as exc:
print('ERROR: missing the mysql python module please run:')
print('pip install -r requirements.txt')
print('ERROR: %s' % exc)
print("ERROR: missing the mysql python module please run:")
print("pip install -r requirements.txt")
print("ERROR: %s" % exc)
sys.exit(2)
logger = logging.getLogger(__name__)
# Logging functions ########################################################
FORMATTER = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s')
FORMATTER = logging.Formatter("%(asctime)s :: %(levelname)s :: %(message)s")
def logger_get_console_handler():
try:
console_handler = logging.StreamHandler(sys.stdout)
except OSError as exc:
print('Cannot log to stdout, trying stderr. Message %s' % exc)
print("Cannot log to stdout, trying stderr. Message %s" % exc)
try:
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setFormatter(FORMATTER)
return console_handler
except OSError as exc:
print('Cannot log to stderr either. Message %s' % exc)
print("Cannot log to stderr either. Message %s" % exc)
return False
else:
console_handler.setFormatter(FORMATTER)
@@ -50,20 +51,33 @@ def logger_get_console_handler():
def logger_get_file_handler(log_file):
err_output = None
try:
file_handler = RotatingFileHandler(log_file, mode='a', encoding='utf-8', maxBytes=1024000, backupCount=3)
file_handler = RotatingFileHandler(
log_file, mode="a", encoding="utf-8", maxBytes=1024000, backupCount=3
)
except OSError as exc:
try:
print('Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s' % exc)
print(
"Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s"
% exc
)
err_output = str(exc)
temp_log_file = tempfile.gettempdir() + os.sep + __name__ + '.log'
print('Trying temporary log file in ' + temp_log_file)
file_handler = RotatingFileHandler(temp_log_file, mode='a', encoding='utf-8', maxBytes=1000000,
backupCount=1)
temp_log_file = tempfile.gettempdir() + os.sep + __name__ + ".log"
print("Trying temporary log file in " + temp_log_file)
file_handler = RotatingFileHandler(
temp_log_file,
mode="a",
encoding="utf-8",
maxBytes=1000000,
backupCount=1,
)
file_handler.setFormatter(FORMATTER)
err_output += '\nUsing [%s]' % temp_log_file
err_output += "\nUsing [%s]" % temp_log_file
return file_handler, err_output
except OSError as exc:
print('Cannot create temporary log file either. Will not log to file. Message: %s' % exc)
print(
"Cannot create temporary log file either. Will not log to file. Message: %s"
% exc
)
return False
else:
file_handler.setFormatter(FORMATTER)
@@ -87,59 +101,74 @@ def logger_get_logger(log_file=None, temp_log_file=None, debug=False):
_logger.propagate = False
if err_output is not None:
print(err_output)
_logger.warning('Failed to use log file [%s], %s.', log_file, err_output)
_logger.warning(
"Failed to use log file [%s], %s.", log_file, err_output
)
if temp_log_file is not None:
if os.path.isfile(temp_log_file):
try:
os.remove(temp_log_file)
except OSError:
logger.warning('Cannot remove temp log file [%s].' % temp_log_file)
logger.warning("Cannot remove temp log file [%s]." % temp_log_file)
file_handler, err_output = logger_get_file_handler(temp_log_file)
if file_handler:
_logger.addHandler(file_handler)
_logger.propagate = False
if err_output is not None:
print(err_output)
_logger.warning('Failed to use log file [%s], %s.', log_file, err_output)
_logger.warning(
"Failed to use log file [%s], %s.", log_file, err_output
)
return _logger
# Generic functions ########################################################
def check_for_file(file):
try:
with open(file) as f:
pass
except IOError as exc:
logger.error('Oh dear... %s does not seem readable' % file)
logger.debug('ERROR:', exc_info=True)
logger.error("Oh dear... %s does not seem readable" % file)
logger.debug("ERROR:", exc_info=True)
sys.exit(2)
# Config functions #########################################################
def get_config_data(install_dir):
config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir]
config_cmd = ["/usr/bin/env", "php", "%s/config_to_json.php" % install_dir]
try:
proc = subprocess.Popen(config_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
proc = subprocess.Popen(
config_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE
)
return proc.communicate()[0].decode()
except Exception as e:
print("ERROR: Could not execute: %s" % config_cmd)
print(e)
sys.exit(2)
# Database functions #######################################################
def db_open(db_socket, db_server, db_port, db_username, db_password, db_dbname):
try:
options = dict(host=db_server, port=int(db_port), user=db_username, passwd=db_password, db=db_dbname)
options = dict(
host=db_server,
port=int(db_port),
user=db_username,
passwd=db_password,
db=db_dbname,
)
if db_socket:
options['unix_socket'] = db_socket
options["unix_socket"] = db_socket
return MySQLdb.connect(**options)
except Exception as dbexc:
print('ERROR: Could not connect to MySQL database!')
print('ERROR: %s' % dbexc)
print("ERROR: Could not connect to MySQL database!")
print("ERROR: %s" % dbexc)
sys.exit(2)
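
The helpers in this file chain fallbacks — stdout then stderr for the console handler, the requested path then a temp file for the file handler — and logger_get_logger (signature in the hunk header above) wires them together. A hypothetical call, with an assumed module path and log file not confirmed by this diff:

from LibreNMS.library import logger_get_logger  # assumed import path

logger = logger_get_logger(log_file="logs/librenms.log", debug=True)
logger.info("dispatcher started")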

View File

@@ -10,7 +10,9 @@ import LibreNMS
class QueueManager:
def __init__(self, config, lock_manager, type_desc, uses_groups=False, auto_start=True):
def __init__(
self, config, lock_manager, type_desc, uses_groups=False, auto_start=True
):
"""
This class manages a queue of jobs and can be used to submit jobs to the queue with post_work()
and process jobs in that queue in worker threads using the work_function
@@ -38,8 +40,13 @@ class QueueManager:
self._stop_event = threading.Event()
info("Groups: {}".format(self.config.group))
info("{} QueueManager created: {} workers, {}s frequency"
.format(self.type.title(), self.get_poller_config().workers, self.get_poller_config().frequency))
info(
"{} QueueManager created: {} workers, {}s frequency".format(
self.type.title(),
self.get_poller_config().workers,
self.get_poller_config().frequency,
)
)
if auto_start:
self.start()
@@ -47,31 +54,51 @@ class QueueManager:
def _service_worker(self, queue_id):
debug("Worker started {}".format(threading.current_thread().getName()))
while not self._stop_event.is_set():
debug("Worker {} checking queue {} ({}) for work".format(threading.current_thread().getName(), queue_id,
self.get_queue(queue_id).qsize()))
debug(
"Worker {} checking queue {} ({}) for work".format(
threading.current_thread().getName(),
queue_id,
self.get_queue(queue_id).qsize(),
)
)
try:
# cannot break blocking request with redis-py, so timeout :(
device_id = self.get_queue(queue_id).get(True, 10)
if device_id is not None: # None returned by redis after timeout when empty
if (
device_id is not None
): # None returned by redis after timeout when empty
debug(
"Worker {} ({}) got work {} ".format(threading.current_thread().getName(), queue_id, device_id))
"Worker {} ({}) got work {} ".format(
threading.current_thread().getName(), queue_id, device_id
)
)
with LibreNMS.TimeitContext.start() as t:
debug("Queues: {}".format(self._queues))
target_desc = "{} ({})".format(device_id if device_id else '',
queue_id) if queue_id else device_id
target_desc = (
"{} ({})".format(device_id if device_id else "", queue_id)
if queue_id
else device_id
)
self.do_work(device_id, queue_id)
runtime = t.delta()
info("Completed {} run for {} in {:.2f}s".format(self.type, target_desc, runtime))
info(
"Completed {} run for {} in {:.2f}s".format(
self.type, target_desc, runtime
)
)
self.performance.add(runtime)
except Empty:
pass # ignore empty queue exception from subprocess.Queue
except CalledProcessError as e:
error('{} poller script error! {} returned {}: {}'
.format(self.type.title(), e.cmd, e.returncode, e.output))
error(
"{} poller script error! {} returned {}: {}".format(
self.type.title(), e.cmd, e.returncode, e.output
)
)
except Exception as e:
error('{} poller exception! {}'.format(self.type.title(), e))
error("{} poller exception! {}".format(self.type.title(), e))
traceback.print_exc()
def post_work(self, payload, queue_id):
@@ -81,15 +108,22 @@ class QueueManager:
:param queue_id: which queue to post to, 0 is the default
"""
self.get_queue(queue_id).put(payload)
debug("Posted work for {} to {}:{} queue size: {}"
.format(payload, self.type, queue_id, self.get_queue(queue_id).qsize()))
debug(
"Posted work for {} to {}:{} queue size: {}".format(
payload, self.type, queue_id, self.get_queue(queue_id).qsize()
)
)
def start(self):
"""
Start worker threads
"""
workers = self.get_poller_config().workers
groups = self.config.group if hasattr(self.config.group, "__iter__") else [self.config.group]
groups = (
self.config.group
if hasattr(self.config.group, "__iter__")
else [self.config.group]
)
if self.uses_groups:
for group in groups:
group_workers = max(int(workers / len(groups)), 1)
@@ -97,7 +131,11 @@ class QueueManager:
thread_name = "{}_{}-{}".format(self.type.title(), group, i + 1)
self.spawn_worker(thread_name, group)
debug("Started {} {} threads for group {}".format(group_workers, self.type, group))
debug(
"Started {} {} threads for group {}".format(
group_workers, self.type, group
)
)
else:
self.spawn_worker(self.type.title(), 0)
@@ -105,8 +143,9 @@ class QueueManager:
pass
def spawn_worker(self, thread_name, group):
pt = threading.Thread(target=self._service_worker, name=thread_name,
args=(group,))
pt = threading.Thread(
target=self._service_worker, name=thread_name, args=(group,)
)
pt.daemon = True
self._threads.append(pt)
pt.start()
@@ -159,21 +198,25 @@ class QueueManager:
"""
info("Creating queue {}".format(self.queue_name(queue_type, group)))
try:
return LibreNMS.RedisUniqueQueue(self.queue_name(queue_type, group),
namespace='librenms.queue',
host=self.config.redis_host,
port=self.config.redis_port,
db=self.config.redis_db,
password=self.config.redis_pass,
unix_socket_path=self.config.redis_socket,
sentinel=self.config.redis_sentinel,
sentinel_service=self.config.redis_sentinel_service,
socket_timeout=self.config.redis_timeout)
return LibreNMS.RedisUniqueQueue(
self.queue_name(queue_type, group),
namespace="librenms.queue",
host=self.config.redis_host,
port=self.config.redis_port,
db=self.config.redis_db,
password=self.config.redis_pass,
unix_socket_path=self.config.redis_socket,
sentinel=self.config.redis_sentinel,
sentinel_service=self.config.redis_sentinel_service,
socket_timeout=self.config.redis_timeout,
)
except ImportError:
if self.config.distributed:
critical("ERROR: Redis connection required for distributed polling")
critical("Please install redis-py, either through your os software repository or from PyPI")
critical(
"Please install redis-py, either through your os software repository or from PyPI"
)
exit(2)
except Exception as e:
if self.config.distributed:
@@ -188,30 +231,41 @@ class QueueManager:
if queue_type and type(group) == int:
return "{}:{}".format(queue_type, group)
else:
raise ValueError("Refusing to create improperly scoped queue - parameters were invalid or not set")
raise ValueError(
"Refusing to create improperly scoped queue - parameters were invalid or not set"
)
def record_runtime(self, duration):
self.performance.add(duration)
# ------ Locking Helpers ------
def lock(self, context, context_name='device', allow_relock=False, timeout=0):
return self._lm.lock(self._gen_lock_name(context, context_name), self._gen_lock_owner(), timeout, allow_relock)
def lock(self, context, context_name="device", allow_relock=False, timeout=0):
return self._lm.lock(
self._gen_lock_name(context, context_name),
self._gen_lock_owner(),
timeout,
allow_relock,
)
def unlock(self, context, context_name='device'):
return self._lm.unlock(self._gen_lock_name(context, context_name), self._gen_lock_owner())
def unlock(self, context, context_name="device"):
return self._lm.unlock(
self._gen_lock_name(context, context_name), self._gen_lock_owner()
)
def is_locked(self, context, context_name='device'):
def is_locked(self, context, context_name="device"):
return self._lm.check_lock(self._gen_lock_name(context, context_name))
def _gen_lock_name(self, context, context_name):
return '{}.{}.{}'.format(self.type, context_name, context)
return "{}.{}.{}".format(self.type, context_name, context)
def _gen_lock_owner(self):
return "{}-{}".format(self.config.unique_name, threading.current_thread().name)
class TimedQueueManager(QueueManager):
def __init__(self, config, lock_manager, type_desc, uses_groups=False, auto_start=True):
def __init__(
self, config, lock_manager, type_desc, uses_groups=False, auto_start=True
):
"""
A queue manager that periodically dispatches work to the queue
The times are normalized like they started at 0:00
@@ -220,8 +274,12 @@ class TimedQueueManager(QueueManager):
:param uses_groups: If this queue respects assigned groups or there is only one group
:param auto_start: automatically start worker threads
"""
QueueManager.__init__(self, config, lock_manager, type_desc, uses_groups, auto_start)
self.timer = LibreNMS.RecurringTimer(self.get_poller_config().frequency, self.do_dispatch)
QueueManager.__init__(
self, config, lock_manager, type_desc, uses_groups, auto_start
)
self.timer = LibreNMS.RecurringTimer(
self.get_poller_config().frequency, self.do_dispatch
)
def start_dispatch(self):
"""
@@ -254,9 +312,12 @@ class BillingQueueManager(TimedQueueManager):
:param config: LibreNMS.ServiceConfig reference to the service config object
:param lock_manager: the single instance of lock manager
"""
TimedQueueManager.__init__(self, config, lock_manager, 'billing')
self.calculate_timer = LibreNMS.RecurringTimer(self.get_poller_config().calculate,
self.dispatch_calculate_billing, 'calculate_billing_timer')
TimedQueueManager.__init__(self, config, lock_manager, "billing")
self.calculate_timer = LibreNMS.RecurringTimer(
self.get_poller_config().calculate,
self.dispatch_calculate_billing,
"calculate_billing_timer",
)
def start_dispatch(self):
"""
@@ -273,18 +334,18 @@ class BillingQueueManager(TimedQueueManager):
TimedQueueManager.stop_dispatch(self)
def dispatch_calculate_billing(self):
self.post_work('calculate', 0)
self.post_work("calculate", 0)
def do_dispatch(self):
self.post_work('poll', 0)
self.post_work("poll", 0)
def do_work(self, run_type, group):
if run_type == 'poll':
if run_type == "poll":
info("Polling billing")
LibreNMS.call_script('poll-billing.php')
LibreNMS.call_script("poll-billing.php")
else: # run_type == 'calculate'
info("Calculating billing")
LibreNMS.call_script('billing-calculate.php')
LibreNMS.call_script("billing-calculate.php")
class PingQueueManager(TimedQueueManager):
@@ -295,24 +356,24 @@ class PingQueueManager(TimedQueueManager):
:param config: LibreNMS.ServiceConfig reference to the service config object
:param lock_manager: the single instance of lock manager
"""
TimedQueueManager.__init__(self, config, lock_manager, 'ping', True)
TimedQueueManager.__init__(self, config, lock_manager, "ping", True)
self._db = LibreNMS.DB(self.config)
def do_dispatch(self):
try:
groups = self._db.query("SELECT DISTINCT (`poller_group`) FROM `devices`")
for group in groups:
self.post_work('', group[0])
self.post_work("", group[0])
except pymysql.err.Error as e:
critical("DB Exception ({})".format(e))
def do_work(self, context, group):
if self.lock(group, 'group', timeout=self.config.ping.frequency):
if self.lock(group, "group", timeout=self.config.ping.frequency):
try:
info("Running fast ping")
LibreNMS.call_script('ping.php', ('-g', group))
LibreNMS.call_script("ping.php", ("-g", group))
finally:
self.unlock(group, 'group')
self.unlock(group, "group")
class ServicesQueueManager(TimedQueueManager):
@@ -323,13 +384,15 @@ class ServicesQueueManager(TimedQueueManager):
:param config: LibreNMS.ServiceConfig reference to the service config object
:param lock_manager: the single instance of lock manager
"""
TimedQueueManager.__init__(self, config, lock_manager, 'services', True)
TimedQueueManager.__init__(self, config, lock_manager, "services", True)
self._db = LibreNMS.DB(self.config)
def do_dispatch(self):
try:
devices = self._db.query("SELECT DISTINCT(`device_id`), `poller_group` FROM `services`"
" LEFT JOIN `devices` USING (`device_id`) WHERE `disabled`=0")
devices = self._db.query(
"SELECT DISTINCT(`device_id`), `poller_group` FROM `services`"
" LEFT JOIN `devices` USING (`device_id`) WHERE `disabled`=0"
)
for device in devices:
self.post_work(device[0], device[1])
except pymysql.err.Error as e:
@@ -339,12 +402,17 @@ class ServicesQueueManager(TimedQueueManager):
if self.lock(device_id, timeout=self.config.services.frequency):
try:
info("Checking services on device {}".format(device_id))
LibreNMS.call_script('check-services.php', ('-h', device_id))
LibreNMS.call_script("check-services.php", ("-h", device_id))
except subprocess.CalledProcessError as e:
if e.returncode == 5:
info("Device {} is down, cannot poll service, waiting {}s for retry"
.format(device_id, self.config.down_retry))
self.lock(device_id, allow_relock=True, timeout=self.config.down_retry)
info(
"Device {} is down, cannot poll service, waiting {}s for retry".format(
device_id, self.config.down_retry
)
)
self.lock(
device_id, allow_relock=True, timeout=self.config.down_retry
)
else:
self.unlock(device_id)
@@ -357,16 +425,16 @@ class AlertQueueManager(TimedQueueManager):
:param config: LibreNMS.ServiceConfig reference to the service config object
:param lock_manager: the single instance of lock manager
"""
TimedQueueManager.__init__(self, config, lock_manager, 'alerting')
TimedQueueManager.__init__(self, config, lock_manager, "alerting")
self._db = LibreNMS.DB(self.config)
def do_dispatch(self):
self.post_work('alerts', 0)
self.post_work("alerts", 0)
def do_work(self, device_id, group):
try:
info("Checking alerts")
LibreNMS.call_script('alerts.php')
LibreNMS.call_script("alerts.php")
except subprocess.CalledProcessError as e:
if e.returncode == 1:
warning("There was an error issuing alerts: {}".format(e.output))
@@ -382,27 +450,32 @@ class PollerQueueManager(QueueManager):
:param config: LibreNMS.ServiceConfig reference to the service config object
:param lock_manager: the single instance of lock manager
"""
QueueManager.__init__(self, config, lock_manager, 'poller', True)
QueueManager.__init__(self, config, lock_manager, "poller", True)
def do_work(self, device_id, group):
if self.lock(device_id, timeout=self.config.poller.frequency):
info('Polling device {}'.format(device_id))
info("Polling device {}".format(device_id))
try:
LibreNMS.call_script('poller.php', ('-h', device_id))
LibreNMS.call_script("poller.php", ("-h", device_id))
except subprocess.CalledProcessError as e:
if e.returncode == 6:
warning('Polling device {} unreachable, waiting {}s for retry'.format(device_id,
self.config.down_retry))
warning(
"Polling device {} unreachable, waiting {}s for retry".format(
device_id, self.config.down_retry
)
)
# re-lock to set retry timer
self.lock(device_id, allow_relock=True, timeout=self.config.down_retry)
self.lock(
device_id, allow_relock=True, timeout=self.config.down_retry
)
else:
error('Polling device {} failed! {}'.format(device_id, e))
error("Polling device {} failed! {}".format(device_id, e))
self.unlock(device_id)
else:
self.unlock(device_id)
else:
debug('Tried to poll {}, but it is locked'.format(device_id))
debug("Tried to poll {}, but it is locked".format(device_id))
class DiscoveryQueueManager(TimedQueueManager):
@@ -413,27 +486,36 @@ class DiscoveryQueueManager(TimedQueueManager):
:param config: LibreNMS.ServiceConfig reference to the service config object
:param lock_manager: the single instance of lock manager
"""
TimedQueueManager.__init__(self, config, lock_manager, 'discovery', True)
TimedQueueManager.__init__(self, config, lock_manager, "discovery", True)
self._db = LibreNMS.DB(self.config)
def do_dispatch(self):
try:
devices = self._db.query("SELECT `device_id`, `poller_group` FROM `devices` WHERE `disabled`=0")
devices = self._db.query(
"SELECT `device_id`, `poller_group` FROM `devices` WHERE `disabled`=0"
)
for device in devices:
self.post_work(device[0], device[1])
except pymysql.err.Error as e:
critical("DB Exception ({})".format(e))
def do_work(self, device_id, group):
if self.lock(device_id, timeout=LibreNMS.normalize_wait(self.config.discovery.frequency)):
if self.lock(
device_id, timeout=LibreNMS.normalize_wait(self.config.discovery.frequency)
):
try:
info("Discovering device {}".format(device_id))
LibreNMS.call_script('discovery.php', ('-h', device_id))
LibreNMS.call_script("discovery.php", ("-h", device_id))
except subprocess.CalledProcessError as e:
if e.returncode == 5:
info("Device {} is down, cannot discover, waiting {}s for retry"
.format(device_id, self.config.down_retry))
self.lock(device_id, allow_relock=True, timeout=self.config.down_retry)
info(
"Device {} is down, cannot discover, waiting {}s for retry".format(
device_id, self.config.down_retry
)
)
self.lock(
device_id, allow_relock=True, timeout=self.config.down_retry
)
else:
self.unlock(device_id)
else:
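
One pattern repeats across the do_work implementations above: the per-device lock doubles as a retry backoff. On a "device down" exit code the worker re-locks the device with timeout=down_retry instead of unlocking it, so every worker skips that device until the retry window expires. Schematically (hypothetical names; QueueManager.lock/unlock are the real calls shown above):

if qm.lock(device_id, timeout=frequency):  # normal polling window
    try:
        run_job(device_id)  # hypothetical job runner
        qm.unlock(device_id)  # success: release immediately
    except DeviceDown:  # hypothetical exception for the down exit codes (5/6)
        qm.lock(device_id, allow_relock=True, timeout=down_retry)  # hold as backoff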

View File

@@ -29,7 +29,6 @@ except ImportError:
pass
class ServiceConfig:
def __init__(self):
"""
@@ -52,7 +51,9 @@ class ServiceConfig:
self.calculate = calculate
# config variables with defaults
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
BASE_DIR = os.path.abspath(
os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)
)
node_id = None
name = None
@@ -78,7 +79,7 @@ class ServiceConfig:
master_resolution = 1
master_timeout = 10
redis_host = 'localhost'
redis_host = "localhost"
redis_port = 6379
redis_db = 0
redis_pass = None
@@ -87,74 +88,142 @@ class ServiceConfig:
redis_sentinel_service = None
redis_timeout = 60
db_host = 'localhost'
db_host = "localhost"
db_port = 0
db_socket = None
db_user = 'librenms'
db_pass = ''
db_name = 'librenms'
db_user = "librenms"
db_pass = ""
db_name = "librenms"
watchdog_enabled = False
watchdog_logfile = 'logs/librenms.log'
watchdog_logfile = "logs/librenms.log"
def populate(self):
config = self._get_config_data()
# populate config variables
self.node_id = os.getenv('NODE_ID')
self.set_name(config.get('distributed_poller_name', None))
self.distributed = config.get('distributed_poller', ServiceConfig.distributed)
self.group = ServiceConfig.parse_group(config.get('distributed_poller_group', ServiceConfig.group))
self.node_id = os.getenv("NODE_ID")
self.set_name(config.get("distributed_poller_name", None))
self.distributed = config.get("distributed_poller", ServiceConfig.distributed)
self.group = ServiceConfig.parse_group(
config.get("distributed_poller_group", ServiceConfig.group)
)
# backward compatible options
self.poller.workers = config.get('poller_service_workers', ServiceConfig.poller.workers)
self.poller.frequency = config.get('poller_service_poll_frequency', ServiceConfig.poller.frequency)
self.discovery.frequency = config.get('poller_service_discover_frequency', ServiceConfig.discovery.frequency)
self.down_retry = config.get('poller_service_down_retry', ServiceConfig.down_retry)
self.log_level = config.get('poller_service_loglevel', ServiceConfig.log_level)
self.poller.workers = config.get(
"poller_service_workers", ServiceConfig.poller.workers
)
self.poller.frequency = config.get(
"poller_service_poll_frequency", ServiceConfig.poller.frequency
)
self.discovery.frequency = config.get(
"poller_service_discover_frequency", ServiceConfig.discovery.frequency
)
self.down_retry = config.get(
"poller_service_down_retry", ServiceConfig.down_retry
)
self.log_level = config.get("poller_service_loglevel", ServiceConfig.log_level)
# new options
self.poller.enabled = config.get('service_poller_enabled', True) # unused
self.poller.workers = config.get('service_poller_workers', ServiceConfig.poller.workers)
self.poller.frequency = config.get('service_poller_frequency', ServiceConfig.poller.frequency)
self.discovery.enabled = config.get('service_discovery_enabled', True) # unused
self.discovery.workers = config.get('service_discovery_workers', ServiceConfig.discovery.workers)
self.discovery.frequency = config.get('service_discovery_frequency', ServiceConfig.discovery.frequency)
self.services.enabled = config.get('service_services_enabled', True)
self.services.workers = config.get('service_services_workers', ServiceConfig.services.workers)
self.services.frequency = config.get('service_services_frequency', ServiceConfig.services.frequency)
self.billing.enabled = config.get('service_billing_enabled', True)
self.billing.frequency = config.get('service_billing_frequency', ServiceConfig.billing.frequency)
self.billing.calculate = config.get('service_billing_calculate_frequency', ServiceConfig.billing.calculate)
self.alerting.enabled = config.get('service_alerting_enabled', True)
self.alerting.frequency = config.get('service_alerting_frequency', ServiceConfig.alerting.frequency)
self.ping.enabled = config.get('service_ping_enabled', False)
self.ping.frequency = config.get('ping_rrd_step', ServiceConfig.ping.frequency)
self.down_retry = config.get('service_poller_down_retry', ServiceConfig.down_retry)
self.log_level = config.get('service_loglevel', ServiceConfig.log_level)
self.update_enabled = config.get('service_update_enabled', ServiceConfig.update_enabled)
self.update_frequency = config.get('service_update_frequency', ServiceConfig.update_frequency)
self.poller.enabled = config.get("service_poller_enabled", True) # unused
self.poller.workers = config.get(
"service_poller_workers", ServiceConfig.poller.workers
)
self.poller.frequency = config.get(
"service_poller_frequency", ServiceConfig.poller.frequency
)
self.discovery.enabled = config.get("service_discovery_enabled", True) # unused
self.discovery.workers = config.get(
"service_discovery_workers", ServiceConfig.discovery.workers
)
self.discovery.frequency = config.get(
"service_discovery_frequency", ServiceConfig.discovery.frequency
)
self.services.enabled = config.get("service_services_enabled", True)
self.services.workers = config.get(
"service_services_workers", ServiceConfig.services.workers
)
self.services.frequency = config.get(
"service_services_frequency", ServiceConfig.services.frequency
)
self.billing.enabled = config.get("service_billing_enabled", True)
self.billing.frequency = config.get(
"service_billing_frequency", ServiceConfig.billing.frequency
)
self.billing.calculate = config.get(
"service_billing_calculate_frequency", ServiceConfig.billing.calculate
)
self.alerting.enabled = config.get("service_alerting_enabled", True)
self.alerting.frequency = config.get(
"service_alerting_frequency", ServiceConfig.alerting.frequency
)
self.ping.enabled = config.get("service_ping_enabled", False)
self.ping.frequency = config.get("ping_rrd_step", ServiceConfig.ping.frequency)
self.down_retry = config.get(
"service_poller_down_retry", ServiceConfig.down_retry
)
self.log_level = config.get("service_loglevel", ServiceConfig.log_level)
self.update_enabled = config.get(
"service_update_enabled", ServiceConfig.update_enabled
)
self.update_frequency = config.get(
"service_update_frequency", ServiceConfig.update_frequency
)
self.redis_host = os.getenv('REDIS_HOST', config.get('redis_host', ServiceConfig.redis_host))
self.redis_db = os.getenv('REDIS_DB', config.get('redis_db', ServiceConfig.redis_db))
self.redis_pass = os.getenv('REDIS_PASSWORD', config.get('redis_pass', ServiceConfig.redis_pass))
self.redis_port = int(os.getenv('REDIS_PORT', config.get('redis_port', ServiceConfig.redis_port)))
self.redis_socket = os.getenv('REDIS_SOCKET', config.get('redis_socket', ServiceConfig.redis_socket))
self.redis_sentinel = os.getenv('REDIS_SENTINEL', config.get('redis_sentinel', ServiceConfig.redis_sentinel))
self.redis_sentinel_service = os.getenv('REDIS_SENTINEL_SERVICE',
config.get('redis_sentinel_service',
ServiceConfig.redis_sentinel_service))
self.redis_timeout = int(os.getenv('REDIS_TIMEOUT', self.alerting.frequency if self.alerting.frequency != 0 else self.redis_timeout))
self.redis_host = os.getenv(
"REDIS_HOST", config.get("redis_host", ServiceConfig.redis_host)
)
self.redis_db = os.getenv(
"REDIS_DB", config.get("redis_db", ServiceConfig.redis_db)
)
self.redis_pass = os.getenv(
"REDIS_PASSWORD", config.get("redis_pass", ServiceConfig.redis_pass)
)
self.redis_port = int(
os.getenv("REDIS_PORT", config.get("redis_port", ServiceConfig.redis_port))
)
self.redis_socket = os.getenv(
"REDIS_SOCKET", config.get("redis_socket", ServiceConfig.redis_socket)
)
self.redis_sentinel = os.getenv(
"REDIS_SENTINEL", config.get("redis_sentinel", ServiceConfig.redis_sentinel)
)
self.redis_sentinel_service = os.getenv(
"REDIS_SENTINEL_SERVICE",
config.get("redis_sentinel_service", ServiceConfig.redis_sentinel_service),
)
self.redis_timeout = int(
os.getenv(
"REDIS_TIMEOUT",
self.alerting.frequency
if self.alerting.frequency != 0
else self.redis_timeout,
)
)
self.db_host = os.getenv('DB_HOST', config.get('db_host', ServiceConfig.db_host))
self.db_name = os.getenv('DB_DATABASE', config.get('db_name', ServiceConfig.db_name))
self.db_pass = os.getenv('DB_PASSWORD', config.get('db_pass', ServiceConfig.db_pass))
self.db_port = int(os.getenv('DB_PORT', config.get('db_port', ServiceConfig.db_port)))
self.db_socket = os.getenv('DB_SOCKET', config.get('db_socket', ServiceConfig.db_socket))
self.db_user = os.getenv('DB_USERNAME', config.get('db_user', ServiceConfig.db_user))
self.db_host = os.getenv(
"DB_HOST", config.get("db_host", ServiceConfig.db_host)
)
self.db_name = os.getenv(
"DB_DATABASE", config.get("db_name", ServiceConfig.db_name)
)
self.db_pass = os.getenv(
"DB_PASSWORD", config.get("db_pass", ServiceConfig.db_pass)
)
self.db_port = int(
os.getenv("DB_PORT", config.get("db_port", ServiceConfig.db_port))
)
self.db_socket = os.getenv(
"DB_SOCKET", config.get("db_socket", ServiceConfig.db_socket)
)
self.db_user = os.getenv(
"DB_USERNAME", config.get("db_user", ServiceConfig.db_user)
)
self.watchdog_enabled = config.get('service_watchdog_enabled', ServiceConfig.watchdog_enabled)
self.watchdog_logfile = config.get('log_file', ServiceConfig.watchdog_logfile)
self.watchdog_enabled = config.get(
"service_watchdog_enabled", ServiceConfig.watchdog_enabled
)
self.watchdog_logfile = config.get("log_file", ServiceConfig.watchdog_logfile)
# set convenient debug variable
self.debug = logging.getLogger().isEnabledFor(logging.DEBUG)
@@ -163,13 +232,19 @@ class ServiceConfig:
try:
logging.getLogger().setLevel(self.log_level)
except ValueError:
error("Unknown log level {}, must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'".format(self.log_level))
error(
"Unknown log level {}, must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'".format(
self.log_level
)
)
logging.getLogger().setLevel(logging.INFO)
def load_poller_config(self, db):
try:
settings = {}
cursor = db.query('SELECT * FROM `poller_cluster` WHERE `node_id`=%s', self.node_id)
cursor = db.query(
"SELECT * FROM `poller_cluster` WHERE `node_id`=%s", self.node_id
)
if cursor.rowcount == 0:
return
@@ -177,77 +252,88 @@ class ServiceConfig:
name = cursor.description[index][0]
settings[name] = setting
if settings['poller_name'] is not None:
self.set_name(settings['poller_name'])
if settings['poller_groups'] is not None:
self.group = ServiceConfig.parse_group(settings['poller_groups'])
if settings['poller_enabled'] is not None:
self.poller.enabled = settings['poller_enabled']
if settings['poller_frequency'] is not None:
self.poller.frequency = settings['poller_frequency']
if settings['poller_workers'] is not None:
self.poller.workers = settings['poller_workers']
if settings['poller_down_retry'] is not None:
self.down_retry = settings['poller_down_retry']
if settings['discovery_enabled'] is not None:
self.discovery.enabled = settings['discovery_enabled']
if settings['discovery_frequency'] is not None:
self.discovery.frequency = settings['discovery_frequency']
if settings['discovery_workers'] is not None:
self.discovery.workers = settings['discovery_workers']
if settings['services_enabled'] is not None:
self.services.enabled = settings['services_enabled']
if settings['services_frequency'] is not None:
self.services.frequency = settings['services_frequency']
if settings['services_workers'] is not None:
self.services.workers = settings['services_workers']
if settings['billing_enabled'] is not None:
self.billing.enabled = settings['billing_enabled']
if settings['billing_frequency'] is not None:
self.billing.frequency = settings['billing_frequency']
if settings['billing_calculate_frequency'] is not None:
self.billing.calculate = settings['billing_calculate_frequency']
if settings['alerting_enabled'] is not None:
self.alerting.enabled = settings['alerting_enabled']
if settings['alerting_frequency'] is not None:
self.alerting.frequency = settings['alerting_frequency']
if settings['ping_enabled'] is not None:
self.ping.enabled = settings['ping_enabled']
if settings['ping_frequency'] is not None:
self.ping.frequency = settings['ping_frequency']
if settings['update_enabled'] is not None:
self.update_enabled = settings['update_enabled']
if settings['update_frequency'] is not None:
self.update_frequency = settings['update_frequency']
if settings['loglevel'] is not None:
self.log_level = settings['loglevel']
if settings['watchdog_enabled'] is not None:
self.watchdog_enabled = settings['watchdog_enabled']
if settings['watchdog_log'] is not None:
self.watchdog_logfile = settings['watchdog_log']
if settings["poller_name"] is not None:
self.set_name(settings["poller_name"])
if settings["poller_groups"] is not None:
self.group = ServiceConfig.parse_group(settings["poller_groups"])
if settings["poller_enabled"] is not None:
self.poller.enabled = settings["poller_enabled"]
if settings["poller_frequency"] is not None:
self.poller.frequency = settings["poller_frequency"]
if settings["poller_workers"] is not None:
self.poller.workers = settings["poller_workers"]
if settings["poller_down_retry"] is not None:
self.down_retry = settings["poller_down_retry"]
if settings["discovery_enabled"] is not None:
self.discovery.enabled = settings["discovery_enabled"]
if settings["discovery_frequency"] is not None:
self.discovery.frequency = settings["discovery_frequency"]
if settings["discovery_workers"] is not None:
self.discovery.workers = settings["discovery_workers"]
if settings["services_enabled"] is not None:
self.services.enabled = settings["services_enabled"]
if settings["services_frequency"] is not None:
self.services.frequency = settings["services_frequency"]
if settings["services_workers"] is not None:
self.services.workers = settings["services_workers"]
if settings["billing_enabled"] is not None:
self.billing.enabled = settings["billing_enabled"]
if settings["billing_frequency"] is not None:
self.billing.frequency = settings["billing_frequency"]
if settings["billing_calculate_frequency"] is not None:
self.billing.calculate = settings["billing_calculate_frequency"]
if settings["alerting_enabled"] is not None:
self.alerting.enabled = settings["alerting_enabled"]
if settings["alerting_frequency"] is not None:
self.alerting.frequency = settings["alerting_frequency"]
if settings["ping_enabled"] is not None:
self.ping.enabled = settings["ping_enabled"]
if settings["ping_frequency"] is not None:
self.ping.frequency = settings["ping_frequency"]
if settings["update_enabled"] is not None:
self.update_enabled = settings["update_enabled"]
if settings["update_frequency"] is not None:
self.update_frequency = settings["update_frequency"]
if settings["loglevel"] is not None:
self.log_level = settings["loglevel"]
if settings["watchdog_enabled"] is not None:
self.watchdog_enabled = settings["watchdog_enabled"]
if settings["watchdog_log"] is not None:
self.watchdog_logfile = settings["watchdog_log"]
except pymysql.err.Error:
warning('Unable to load poller (%s) config', self.node_id)
warning("Unable to load poller (%s) config", self.node_id)
def _get_config_data(self):
try:
import dotenv
env_path = "{}/.env".format(self.BASE_DIR)
info("Attempting to load .env from '%s'", env_path)
dotenv.load_dotenv(dotenv_path=env_path, verbose=True)
if not os.getenv('NODE_ID'):
if not os.getenv("NODE_ID"):
raise ImportError(".env does not contain a valid NODE_ID setting.")
except ImportError as e:
exception("Could not import .env - check that the poller user can read the file, and that composer install has been run recently")
exception(
"Could not import .env - check that the poller user can read the file, and that composer install has been run recently"
)
sys.exit(3)
config_cmd = ['/usr/bin/env', 'php', '{}/config_to_json.php'.format(self.BASE_DIR), '2>&1']
config_cmd = [
"/usr/bin/env",
"php",
"{}/config_to_json.php".format(self.BASE_DIR),
"2>&1",
]
try:
return json.loads(subprocess.check_output(config_cmd).decode())
except subprocess.CalledProcessError as e:
error("ERROR: Could not load or parse configuration! {}: {}"
.format(subprocess.list2cmdline(e.cmd), e.output.decode()))
error(
"ERROR: Could not load or parse configuration! {}: {}".format(
subprocess.list2cmdline(e.cmd), e.output.decode()
)
)
@staticmethod
def parse_group(g):
@@ -257,7 +343,7 @@ class ServiceConfig:
return [g]
elif type(g) is str:
try:
return [int(x) for x in set(g.split(','))]
return [int(x) for x in set(g.split(","))]
except ValueError:
pass
@@ -289,14 +375,26 @@ class Service:
self.attach_signals()
self._lm = self.create_lock_manager()
self.daily_timer = LibreNMS.RecurringTimer(self.config.update_frequency, self.run_maintenance, 'maintenance')
self.stats_timer = LibreNMS.RecurringTimer(self.config.poller.frequency, self.log_performance_stats, 'performance')
self.daily_timer = LibreNMS.RecurringTimer(
self.config.update_frequency, self.run_maintenance, "maintenance"
)
self.stats_timer = LibreNMS.RecurringTimer(
self.config.poller.frequency, self.log_performance_stats, "performance"
)
if self.config.watchdog_enabled:
info("Starting watchdog timer for log file: {}".format(self.config.watchdog_logfile))
self.watchdog_timer = LibreNMS.RecurringTimer(self.config.poller.frequency, self.logfile_watchdog, 'watchdog')
info(
"Starting watchdog timer for log file: {}".format(
self.config.watchdog_logfile
)
)
self.watchdog_timer = LibreNMS.RecurringTimer(
self.config.poller.frequency, self.logfile_watchdog, "watchdog"
)
else:
info("Watchdog is disabled.")
self.systemd_watchdog_timer = LibreNMS.RecurringTimer(10, self.systemd_watchdog, 'systemd-watchdog')
self.systemd_watchdog_timer = LibreNMS.RecurringTimer(
10, self.systemd_watchdog, "systemd-watchdog"
)
self.is_master = False
def service_age(self):
@@ -309,7 +407,7 @@ class Service:
signal(SIGINT, self.terminate) # capture sigint and exit gracefully
signal(SIGHUP, self.reload) # capture sighup and restart gracefully
if 'psutil' not in sys.modules:
if "psutil" not in sys.modules:
warning("psutil is not available, polling gap possible")
else:
signal(SIGCHLD, self.reap) # capture sigchld and reap the process
@@ -321,13 +419,21 @@ class Service:
# Speed things up by only looking at direct zombie children
for p in psutil.Process().children(recursive=False):
try:
cmd = p.cmdline() # cmdline is uncached, so needs to go here to avoid NoSuchProcess
cmd = (
p.cmdline()
) # cmdline is uncached, so needs to go here to avoid NoSuchProcess
status = p.status()
if status == psutil.STATUS_ZOMBIE:
pid = p.pid
r = os.waitpid(p.pid, os.WNOHANG)
warning('Reaped long running job "%s" in state %s with PID %d - job returned %d', cmd, status, r[0], r[1])
warning(
'Reaped long running job "%s" in state %s with PID %d - job returned %d',
cmd,
status,
r[0],
r[1],
)
except (OSError, psutil.NoSuchProcess):
# process was already reaped
continue
@@ -346,17 +452,25 @@ class Service:
# initialize and start the worker pools
self.poller_manager = LibreNMS.PollerQueueManager(self.config, self._lm)
self.queue_managers['poller'] = self.poller_manager
self.queue_managers["poller"] = self.poller_manager
self.discovery_manager = LibreNMS.DiscoveryQueueManager(self.config, self._lm)
self.queue_managers['discovery'] = self.discovery_manager
self.queue_managers["discovery"] = self.discovery_manager
if self.config.alerting.enabled:
self.queue_managers['alerting'] = LibreNMS.AlertQueueManager(self.config, self._lm)
self.queue_managers["alerting"] = LibreNMS.AlertQueueManager(
self.config, self._lm
)
if self.config.services.enabled:
self.queue_managers['services'] = LibreNMS.ServicesQueueManager(self.config, self._lm)
self.queue_managers["services"] = LibreNMS.ServicesQueueManager(
self.config, self._lm
)
if self.config.billing.enabled:
self.queue_managers['billing'] = LibreNMS.BillingQueueManager(self.config, self._lm)
self.queue_managers["billing"] = LibreNMS.BillingQueueManager(
self.config, self._lm
)
if self.config.ping.enabled:
self.queue_managers['ping'] = LibreNMS.PingQueueManager(self.config, self._lm)
self.queue_managers["ping"] = LibreNMS.PingQueueManager(
self.config, self._lm
)
if self.config.update_enabled:
self.daily_timer.start()
self.stats_timer.start()
@@ -365,11 +479,19 @@ class Service:
self.watchdog_timer.start()
info("LibreNMS Service: {} started!".format(self.config.unique_name))
info("Poller group {}. Using Python {} and {} locks and queues"
.format('0 (default)' if self.config.group == [0] else self.config.group, python_version(),
'redis' if isinstance(self._lm, LibreNMS.RedisLock) else 'internal'))
info(
"Poller group {}. Using Python {} and {} locks and queues".format(
"0 (default)" if self.config.group == [0] else self.config.group,
python_version(),
"redis" if isinstance(self._lm, LibreNMS.RedisLock) else "internal",
)
)
if self.config.update_enabled:
info("Maintenance tasks will be run every {}".format(timedelta(seconds=self.config.update_frequency)))
info(
"Maintenance tasks will be run every {}".format(
timedelta(seconds=self.config.update_frequency)
)
)
else:
warning("Maintenance tasks are disabled.")
@@ -403,7 +525,11 @@ class Service:
self.dispatch_immediate_discovery(device_id, group)
else:
if self.is_master:
info("{} is no longer the master dispatcher".format(self.config.name))
info(
"{} is no longer the master dispatcher".format(
self.config.name
)
)
self.stop_dispatch_timers()
self.is_master = False # no longer master
sleep(self.config.master_resolution)
@@ -414,10 +540,12 @@ class Service:
self.shutdown()
def _acquire_master(self):
return self._lm.lock('dispatch.master', self.config.unique_name, self.config.master_timeout, True)
return self._lm.lock(
"dispatch.master", self.config.unique_name, self.config.master_timeout, True
)
def _release_master(self):
self._lm.unlock('dispatch.master', self.config.unique_name)
self._lm.unlock("dispatch.master", self.config.unique_name)
# ------------ Discovery ------------
def dispatch_immediate_discovery(self, device_id, group):
@@ -434,16 +562,22 @@ class Service:
elapsed = cur_time - self.last_poll.get(device_id, cur_time)
self.last_poll[device_id] = cur_time
# arbitrary limit to reduce spam
if elapsed > (self.config.poller.frequency - self.config.master_resolution):
debug("Dispatching polling for device {}, time since last poll {:.2f}s"
.format(device_id, elapsed))
if elapsed > (
self.config.poller.frequency - self.config.master_resolution
):
debug(
"Dispatching polling for device {}, time since last poll {:.2f}s".format(
device_id, elapsed
)
)
def fetch_immediate_device_list(self):
try:
poller_find_time = self.config.poller.frequency - 1
discovery_find_time = self.config.discovery.frequency - 1
result = self._db.query('''SELECT `device_id`,
result = self._db.query(
"""SELECT `device_id`,
`poller_group`,
COALESCE(`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND), 1) AS `poll`,
IF(snmp_disable=1 OR status=0, 0, IF (%s < `last_discovered_timetaken` * 1.25, 0, COALESCE(`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND), 1))) AS `discover`
@@ -454,15 +588,29 @@ class Service:
`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND) OR
`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND)
)
ORDER BY `last_polled_timetaken` DESC''', (poller_find_time, self.service_age(), discovery_find_time, poller_find_time, discovery_find_time))
ORDER BY `last_polled_timetaken` DESC""",
(
poller_find_time,
self.service_age(),
discovery_find_time,
poller_find_time,
discovery_find_time,
),
)
self.db_failures = 0
return result
except pymysql.err.Error:
self.db_failures += 1
if self.db_failures > self.config.max_db_failures:
warning("Too many DB failures ({}), attempting to release master".format(self.db_failures))
warning(
"Too many DB failures ({}), attempting to release master".format(
self.db_failures
)
)
self._release_master()
sleep(self.config.master_resolution) # sleep to give another node a chance to acquire
sleep(
self.config.master_resolution
) # sleep to give another node a chance to acquire
return []
def run_maintenance(self):
@@ -475,21 +623,24 @@ class Service:
max_runtime = 86100
max_tries = int(max_runtime / wait)
info("Waiting for schema lock")
while not self._lm.lock('schema-update', self.config.unique_name, max_runtime):
while not self._lm.lock("schema-update", self.config.unique_name, max_runtime):
attempt += 1
if attempt >= max_tries: # don't get stuck indefinitely
warning('Reached max wait for other pollers to update, updating now')
warning("Reached max wait for other pollers to update, updating now")
break
sleep(wait)
info("Running maintenance tasks")
try:
output = LibreNMS.call_script('daily.sh')
output = LibreNMS.call_script("daily.sh")
info("Maintenance tasks complete\n{}".format(output))
except subprocess.CalledProcessError as e:
error("Error in daily.sh:\n" + (e.output.decode() if e.output is not None else 'No output'))
error(
"Error in daily.sh:\n"
+ (e.output.decode() if e.output is not None else "No output")
)
self._lm.unlock('schema-update', self.config.unique_name)
self._lm.unlock("schema-update", self.config.unique_name)
self.restart()
@@ -501,19 +652,23 @@ class Service:
:return: Instance of LockManager
"""
try:
return LibreNMS.RedisLock(namespace='librenms.lock',
host=self.config.redis_host,
port=self.config.redis_port,
db=self.config.redis_db,
password=self.config.redis_pass,
unix_socket_path=self.config.redis_socket,
sentinel=self.config.redis_sentinel,
sentinel_service=self.config.redis_sentinel_service,
socket_timeout=self.config.redis_timeout)
return LibreNMS.RedisLock(
namespace="librenms.lock",
host=self.config.redis_host,
port=self.config.redis_port,
db=self.config.redis_db,
password=self.config.redis_pass,
unix_socket_path=self.config.redis_socket,
sentinel=self.config.redis_sentinel,
sentinel_service=self.config.redis_sentinel_service,
socket_timeout=self.config.redis_timeout,
)
except ImportError:
if self.config.distributed:
critical("ERROR: Redis connection required for distributed polling")
critical("Please install redis-py, either through your os software repository or from PyPI")
critical(
"Please install redis-py, either through your os software repository or from PyPI"
)
self.exit(2)
except Exception as e:
if self.config.distributed:
@@ -533,9 +688,9 @@ class Service:
warning("Please restart manually")
return
info('Restarting service... ')
info("Restarting service... ")
if 'psutil' not in sys.modules:
if "psutil" not in sys.modules:
warning("psutil is not available, polling gap possible")
self._stop_managers_and_wait()
else:
@@ -578,7 +733,7 @@ class Service:
:param signalnum: UNIX signal number
:param flag: Flags accompanying signal
"""
info('Shutting down, waiting for running jobs to complete...')
info("Shutting down, waiting for running jobs to complete...")
self.stop_dispatch_timers()
self._release_master()
@@ -592,7 +747,7 @@ class Service:
self._stop_managers_and_wait()
# try to release master lock
info('Shutdown of %s/%s complete', os.getpid(), threading.current_thread().name)
info("Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name)
self.exit(0)
def start_dispatch_timers(self):
@@ -636,10 +791,13 @@ class Service:
We do this by creating a file in the base directory (.lock.service) if it doesn't exist and
obtaining an exclusive lock on that file.
"""
lock_file = "{}/{}".format(self.config.BASE_DIR, '.lock.service')
lock_file = "{}/{}".format(self.config.BASE_DIR, ".lock.service")
import fcntl
self._fp = open(lock_file, 'w') # keep a reference so the file handle isn't garbage collected
self._fp = open(
lock_file, "w"
) # keep a reference so the file handle isn't garbage collected
self._fp.flush()
try:
fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
@@ -652,46 +810,72 @@ class Service:
try:
# Report on the poller instance as a whole
self._db.query('INSERT INTO poller_cluster(node_id, poller_name, poller_version, poller_groups, last_report, master) '
'values("{0}", "{1}", "{2}", "{3}", NOW(), {4}) '
'ON DUPLICATE KEY UPDATE poller_version="{2}", poller_groups="{3}", last_report=NOW(), master={4}; '
.format(self.config.node_id, self.config.name, "librenms-service", ','.join(str(g) for g in self.config.group), 1 if self.is_master else 0))
self._db.query(
"INSERT INTO poller_cluster(node_id, poller_name, poller_version, poller_groups, last_report, master) "
'values("{0}", "{1}", "{2}", "{3}", NOW(), {4}) '
'ON DUPLICATE KEY UPDATE poller_version="{2}", poller_groups="{3}", last_report=NOW(), master={4}; '.format(
self.config.node_id,
self.config.name,
"librenms-service",
",".join(str(g) for g in self.config.group),
1 if self.is_master else 0,
)
)
# Find our ID
self._db.query('SELECT id INTO @parent_poller_id FROM poller_cluster WHERE node_id="{0}"; '.format(self.config.node_id))
self._db.query(
'SELECT id INTO @parent_poller_id FROM poller_cluster WHERE node_id="{0}"; '.format(
self.config.node_id
)
)
for worker_type, manager in self.queue_managers.items():
worker_seconds, devices = manager.performance.reset()
# Record the queue state
self._db.query('INSERT INTO poller_cluster_stats(parent_poller, poller_type, depth, devices, worker_seconds, workers, frequency) '
'values(@parent_poller_id, "{0}", {1}, {2}, {3}, {4}, {5}) '
'ON DUPLICATE KEY UPDATE depth={1}, devices={2}, worker_seconds={3}, workers={4}, frequency={5}; '
.format(worker_type,
sum([manager.get_queue(group).qsize() for group in self.config.group]),
devices,
worker_seconds,
getattr(self.config, worker_type).workers,
getattr(self.config, worker_type).frequency)
)
self._db.query(
"INSERT INTO poller_cluster_stats(parent_poller, poller_type, depth, devices, worker_seconds, workers, frequency) "
'values(@parent_poller_id, "{0}", {1}, {2}, {3}, {4}, {5}) '
"ON DUPLICATE KEY UPDATE depth={1}, devices={2}, worker_seconds={3}, workers={4}, frequency={5}; ".format(
worker_type,
sum(
[
manager.get_queue(group).qsize()
for group in self.config.group
]
),
devices,
worker_seconds,
getattr(self.config, worker_type).workers,
getattr(self.config, worker_type).frequency,
)
)
except pymysql.err.Error:
exception("Unable to log performance statistics - is the database still online?")
exception(
"Unable to log performance statistics - is the database still online?"
)
def systemd_watchdog(self):
if 'systemd.daemon' in sys.modules:
if "systemd.daemon" in sys.modules:
notify("WATCHDOG=1")
def logfile_watchdog(self):
try:
# check that the logfile has been written to within the last poll period
logfile_mdiff = datetime.now().timestamp() - os.path.getmtime(self.config.watchdog_logfile)
logfile_mdiff = datetime.now().timestamp() - os.path.getmtime(
self.config.watchdog_logfile
)
except FileNotFoundError as e:
error("Log file not found! {}".format(e))
return
if logfile_mdiff > self.config.poller.frequency:
critical("BARK! Log file older than {}s, restarting service!".format(self.config.poller.frequency))
critical(
"BARK! Log file older than {}s, restarting service!".format(
self.config.poller.frequency
)
)
self.restart()
else:
info("Log file updated {}s ago".format(int(logfile_mdiff)))

View File

@ -51,9 +51,9 @@ try:
from optparse import OptionParser
except ImportError as exc:
print('ERROR: missing one or more of the following python modules:')
print('threading, queue, sys, subprocess, time, os, json')
print('ERROR: %s' % exc)
print("ERROR: missing one or more of the following python modules:")
print("threading, queue, sys, subprocess, time, os, json")
print("ERROR: %s" % exc)
sys.exit(2)
APP_NAME = "discovery_wrapper"
@ -68,9 +68,9 @@ def memc_alive():
try:
global memc
key = str(uuid.uuid4())
memc.set('discovery.ping.' + key, key, 60)
if memc.get('discovery.ping.' + key) == key:
memc.delete('discovery.ping.' + key)
memc.set("discovery.ping." + key, key, 60)
if memc.get("discovery.ping." + key) == key:
memc.delete("discovery.ping." + key)
return True
else:
return False
@ -106,17 +106,19 @@ def printworker():
global distdisco
if distdisco:
if not IsNode:
memc_touch('discovery.master', 30)
nodes = memc.get('discovery.nodes')
memc_touch("discovery.master", 30)
nodes = memc.get("discovery.nodes")
if nodes is None and not memc_alive():
print("WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly.")
print(
"WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly."
)
distdisco = False
nodes = nodeso
if nodes is not nodeso:
print("INFO: %s Node(s) Total" % (nodes))
nodeso = nodes
else:
memc_touch('discovery.nodes', 30)
memc_touch("discovery.nodes", 30)
try:
worker_id, device_id, elapsed_time = print_queue.get(False)
except:
@ -136,9 +138,15 @@ def printworker():
per_device_duration[device_id] = elapsed_time
discovered_devices += 1
if elapsed_time < 300:
print("INFO: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time))
print(
"INFO: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
else:
print("WARNING: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time))
print(
"WARNING: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
print_queue.task_done()
@ -152,28 +160,48 @@ def poll_worker():
while True:
device_id = poll_queue.get()
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5
if not distdisco or memc.get('discovery.device.' + str(device_id)) is None:
if not distdisco or memc.get("discovery.device." + str(device_id)) is None:
if distdisco:
result = memc.add('discovery.device.' + str(device_id), config['distributed_poller_name'], 300)
result = memc.add(
"discovery.device." + str(device_id),
config["distributed_poller_name"],
300,
)
if not result:
print("This device (%s) appears to be being discovered by another discovery node" % (device_id))
print(
"This device (%s) appears to be being discovered by another discovery node"
% (device_id)
)
poll_queue.task_done()
continue
if not memc_alive() and IsNode:
print("Lost Memcached, Not discovering Device %s as Node. Master will discover it." % device_id)
print(
"Lost Memcached, Not discovering Device %s as Node. Master will discover it."
% device_id
)
poll_queue.task_done()
continue
# EOC5
try:
start_time = time.time()
output = "-d >> %s/discover_device_%s.log" % (log_dir, device_id) if debug else ">> /dev/null"
command = "/usr/bin/env php %s -h %s %s 2>&1" % (discovery_path, device_id, output)
output = (
"-d >> %s/discover_device_%s.log" % (log_dir, device_id)
if debug
else ">> /dev/null"
)
command = "/usr/bin/env php %s -h %s %s 2>&1" % (
discovery_path,
device_id,
output,
)
# TODO: Replace with command_runner
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
print_queue.put([threading.current_thread().name, device_id, elapsed_time])
print_queue.put(
[threading.current_thread().name, device_id, elapsed_time]
)
except (KeyboardInterrupt, SystemExit):
raise
except:
@ -181,48 +209,60 @@ def poll_worker():
poll_queue.task_done()
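The memc.add() call in poll_worker() doubles as a distributed lock: add() is atomic and fails if the key already exists, so only one node claims each device. A minimal sketch of that claim/release cycle, assuming a local memcached and the python-memcache client (host, key, and value are illustrative):

import memcache

memc = memcache.Client(["127.0.0.1:11211"])  # assumed local memcached instance
device_id = 42  # hypothetical device

# add() succeeds only for the first node to create the key; 300 is the TTL in seconds
if memc.add("discovery.device.%s" % device_id, "this-node", 300):
    try:
        pass  # ... run discovery for the device ...
    finally:
        memc.delete("discovery.device.%s" % device_id)  # release the claim explicitly
else:
    print("device %s is claimed by another node" % device_id)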
if __name__ == '__main__':
if __name__ == "__main__":
logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG)
install_dir = os.path.dirname(os.path.realpath(__file__))
LNMS.check_for_file(install_dir + '/.env')
LNMS.check_for_file(install_dir + "/.env")
config = json.loads(LNMS.get_config_data(install_dir))
discovery_path = config['install_dir'] + '/discovery.php'
log_dir = config['log_dir']
discovery_path = config["install_dir"] + "/discovery.php"
log_dir = config["log_dir"]
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
if 'distributed_poller_group' in config:
discovery_group = str(config['distributed_poller_group'])
if "distributed_poller_group" in config:
discovery_group = str(config["distributed_poller_group"])
else:
discovery_group = False
if ('distributed_poller' in config and
'distributed_poller_memcached_host' in config and
'distributed_poller_memcached_port' in config and
config['distributed_poller']):
if (
"distributed_poller" in config
and "distributed_poller_memcached_host" in config
and "distributed_poller_memcached_port" in config
and config["distributed_poller"]
):
try:
import memcache
import uuid
memc = memcache.Client([config['distributed_poller_memcached_host'] + ':' +
str(config['distributed_poller_memcached_port'])])
if str(memc.get("discovery.master")) == config['distributed_poller_name']:
memc = memcache.Client(
[
config["distributed_poller_memcached_host"]
+ ":"
+ str(config["distributed_poller_memcached_port"])
]
)
if str(memc.get("discovery.master")) == config["distributed_poller_name"]:
print("This system is already joined as the discovery master.")
sys.exit(2)
if memc_alive():
if memc.get("discovery.master") is None:
print("Registered as Master")
memc.set("discovery.master", config['distributed_poller_name'], 30)
memc.set("discovery.master", config["distributed_poller_name"], 30)
memc.set("discovery.nodes", 0, 3600)
IsNode = False
else:
print("Registered as Node joining Master %s" % memc.get("discovery.master"))
print(
"Registered as Node joining Master %s"
% memc.get("discovery.master")
)
IsNode = True
memc.incr("discovery.nodes")
distdisco = True
else:
print("Could not connect to memcached, disabling distributed discovery.")
print(
"Could not connect to memcached, disabling distributed discovery."
)
distdisco = False
IsNode = False
except SystemExit:
@ -249,8 +289,13 @@ if __name__ == '__main__':
    usage = "usage: %prog [options] <workers> (Default: 1, do not set too high)"
description = "Spawn multiple discovery.php processes in parallel."
parser = OptionParser(usage=usage, description=description)
parser.add_option('-d', '--debug', action='store_true', default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.")
parser.add_option(
"-d",
"--debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
(options, args) = parser.parse_args()
debug = options.debug
@ -269,12 +314,23 @@ if __name__ == '__main__':
"""
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC2
if discovery_group is not False:
query = "select device_id from devices where poller_group IN(" + discovery_group + ") and disabled = 0 order by last_polled_timetaken desc"
query = (
"select device_id from devices where poller_group IN("
+ discovery_group
+ ") and disabled = 0 order by last_polled_timetaken desc"
)
else:
query = "select device_id from devices where disabled = 0 order by last_polled_timetaken desc"
# EOC2
db = LNMS.db_open(config['db_socket'], config['db_host'], int(config['db_port']), config['db_user'], config['db_pass'], config['db_name'])
db = LNMS.db_open(
config["db_socket"],
config["db_host"],
int(config["db_port"]),
config["db_user"],
config["db_pass"],
config["db_name"],
)
cursor = db.cursor()
cursor.execute(query)
devices = cursor.fetchall()
@ -293,9 +349,10 @@ if __name__ == '__main__':
poll_queue = queue.Queue()
print_queue = queue.Queue()
print("INFO: starting the discovery at %s with %s threads, slowest devices first" % (
time.strftime("%Y-%m-%d %H:%M:%S"),
amount_of_workers))
print(
"INFO: starting the discovery at %s with %s threads, slowest devices first"
% (time.strftime("%Y-%m-%d %H:%M:%S"), amount_of_workers)
)
for device_id in devices_list:
poll_queue.put(device_id)
@ -317,13 +374,15 @@ if __name__ == '__main__':
total_time = int(time.time() - s_time)
print("INFO: discovery-wrapper polled %s devices in %s seconds with %s workers" % (
discovered_devices, total_time, amount_of_workers))
print(
"INFO: discovery-wrapper polled %s devices in %s seconds with %s workers"
% (discovered_devices, total_time, amount_of_workers)
)
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6
if distdisco or memc_alive():
master = memc.get("discovery.master")
if master == config['distributed_poller_name'] and not IsNode:
if master == config["distributed_poller_name"] and not IsNode:
print("Wait for all discovery-nodes to finish")
nodes = memc.get("discovery.nodes")
while nodes is not None and nodes > 0:
@ -335,7 +394,7 @@ if __name__ == '__main__':
print("Clearing Locks")
x = minlocks
while x <= maxlocks:
memc.delete('discovery.device.' + str(x))
memc.delete("discovery.device." + str(x))
x = x + 1
print("%s Locks Cleared" % x)
print("Clearing Nodes")
@ -349,17 +408,29 @@ if __name__ == '__main__':
show_stopper = False
if total_time > 21600:
print("WARNING: the process took more than 6 hours to finish, you need faster hardware or more threads")
print("INFO: in sequential style discovery the elapsed time would have been: %s seconds" % real_duration)
print(
"WARNING: the process took more than 6 hours to finish, you need faster hardware or more threads"
)
print(
"INFO: in sequential style discovery the elapsed time would have been: %s seconds"
% real_duration
)
for device in per_device_duration:
if per_device_duration[device] > 3600:
print("WARNING: device %s is taking too long: %s seconds" % (device, per_device_duration[device]))
print(
"WARNING: device %s is taking too long: %s seconds"
% (device, per_device_duration[device])
)
show_stopper = True
if show_stopper:
print("ERROR: Some devices are taking more than 3600 seconds, the script cannot recommend you what to do.")
print(
"ERROR: Some devices are taking more than 3600 seconds, the script cannot recommend you what to do."
)
else:
recommend = int(total_time / 300.0 * amount_of_workers + 1)
print(
"WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" % recommend)
"WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)"
% recommend
)
sys.exit(2)
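The recommendation above is simple proportional scaling: it asks how many workers would have been needed to finish the observed workload inside one 300-second window. A quick worked example with illustrative values:

total_time = 600          # observed wall time in seconds (example)
amount_of_workers = 4
recommend = int(total_time / 300.0 * amount_of_workers + 1)
print(recommend)          # -> 9: twice the work per window means roughly twice the threads, plus one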

View File

@ -10,20 +10,36 @@ import LibreNMS
from logging import info
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='LibreNMS Service - manages polling and other periodic processes')
parser.add_argument('-g', '--group', type=int, help="Set the poller group for this poller")
parser.add_argument('-v', '--verbose', action='count', help="Show verbose output.")
parser.add_argument('-d', '--debug', action="store_true", help="Show debug output.")
parser.add_argument('-m', '--multiple', action="store_true", help="Allow multiple instances of the service.")
parser.add_argument('-t', '--timestamps', action="store_true", help="Include timestamps in the logs (not normally needed for syslog/journald")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="LibreNMS Service - manages polling and other periodic processes"
)
parser.add_argument(
"-g", "--group", type=int, help="Set the poller group for this poller"
)
parser.add_argument("-v", "--verbose", action="count", help="Show verbose output.")
parser.add_argument("-d", "--debug", action="store_true", help="Show debug output.")
parser.add_argument(
"-m",
"--multiple",
action="store_true",
help="Allow multiple instances of the service.",
)
parser.add_argument(
"-t",
"--timestamps",
action="store_true",
help="Include timestamps in the logs (not normally needed for syslog/journald",
)
args = parser.parse_args()
if args.timestamps:
logging.basicConfig(format='%(asctime)s %(threadName)s(%(levelname)s):%(message)s')
logging.basicConfig(
format="%(asctime)s %(threadName)s(%(levelname)s):%(message)s"
)
else:
logging.basicConfig(format='%(threadName)s(%(levelname)s):%(message)s')
logging.basicConfig(format="%(threadName)s(%(levelname)s):%(message)s")
if args.verbose:
logging.getLogger().setLevel(logging.INFO)
@ -42,7 +58,11 @@ if __name__ == '__main__':
service.config.single_instance = args.multiple
if args.group:
service.config.group = [ args.group ]
service.config.group = [args.group]
info('Entering main LibreNMS service loop on {}/{}...'.format(os.getpid(), threading.current_thread().name))
info(
"Entering main LibreNMS service loop on {}/{}...".format(
os.getpid(), threading.current_thread().name
)
)
service.start()

View File

@ -39,9 +39,9 @@ try:
from optparse import OptionParser
except ImportError as exc:
print('ERROR: missing one or more of the following python modules:')
print('threading, queue, sys, subprocess, time, os, json')
print('ERROR: %s' % exc)
print("ERROR: missing one or more of the following python modules:")
print("threading, queue, sys, subprocess, time, os, json")
print("ERROR: %s" % exc)
sys.exit(2)
@ -60,9 +60,9 @@ def memc_alive():
try:
global memc
key = str(uuid.uuid4())
memc.set('poller.ping.' + key, key, 60)
if memc.get('poller.ping.' + key) == key:
memc.delete('poller.ping.' + key)
memc.set("poller.ping." + key, key, 60)
if memc.get("poller.ping." + key) == key:
memc.delete("poller.ping." + key)
return True
else:
return False
@ -82,7 +82,9 @@ def memc_touch(key, time):
def get_time_tag(step):
ts = int(time.time())
return ts - ts % step
#EOC0
# EOC0
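get_time_tag() floors the current timestamp to the nearest step boundary, so every wrapper that runs within the same polling period derives the same tag for its memcached keys. For example:

ts = 1617000123        # example epoch timestamp
step = 300             # 5-minute polling period
print(ts - ts % step)  # -> 1617000000, identical for every run inside this period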
"""
A separate queue and a single worker for printing information to the screen prevents
@ -91,6 +93,8 @@ def get_time_tag(step):
Some people, when confronted with a problem, think,
"I know, I'll use threads," and then two they hav erpoblesms.
"""
def printworker():
nodeso = 0
while True:
@ -102,7 +106,9 @@ def printworker():
memc_touch(master_tag, 10)
nodes = memc.get(nodes_tag)
if nodes is None and not memc_alive():
print("WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly.")
print(
"WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly."
)
distpoll = False
nodes = nodeso
if nodes is not nodeso:
@ -129,9 +135,15 @@ def printworker():
per_device_duration[device_id] = elapsed_time
polled_devices += 1
if elapsed_time < step:
print("INFO: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time))
print(
"INFO: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
else:
print("WARNING: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time))
print(
"WARNING: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
print_queue.task_done()
@ -139,33 +151,57 @@ def printworker():
This class will fork off single instances of the poller.php process, record
how long it takes, and push the resulting reports to the printer queue
"""
def poll_worker():
while True:
device_id = poll_queue.get()
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5
if not distpoll or memc.get('poller.device.%s.%s' % (device_id, time_tag)) is None:
if (
not distpoll
or memc.get("poller.device.%s.%s" % (device_id, time_tag)) is None
):
if distpoll:
result = memc.add('poller.device.%s.%s' % (device_id, time_tag), config['distributed_poller_name'],
step)
result = memc.add(
"poller.device.%s.%s" % (device_id, time_tag),
config["distributed_poller_name"],
step,
)
if not result:
print("This device (%s) appears to be being polled by another poller" % (device_id))
print(
"This device (%s) appears to be being polled by another poller"
% (device_id)
)
poll_queue.task_done()
continue
if not memc_alive() and IsNode:
print("Lost Memcached, Not polling Device %s as Node. Master will poll it." % device_id)
print(
"Lost Memcached, Not polling Device %s as Node. Master will poll it."
% device_id
)
poll_queue.task_done()
continue
# EOC5
try:
start_time = time.time()
output = "-d >> %s/poll_device_%s.log" % (log_dir, device_id) if debug else ">> /dev/null"
command = "/usr/bin/env php %s -h %s %s 2>&1" % (poller_path, device_id, output)
output = (
"-d >> %s/poll_device_%s.log" % (log_dir, device_id)
if debug
else ">> /dev/null"
)
command = "/usr/bin/env php %s -h %s %s 2>&1" % (
poller_path,
device_id,
output,
)
# TODO: replace with command_runner
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
print_queue.put([threading.current_thread().name, device_id, elapsed_time])
print_queue.put(
[threading.current_thread().name, device_id, elapsed_time]
)
except (KeyboardInterrupt, SystemExit):
raise
except:
@ -173,32 +209,33 @@ def poll_worker():
poll_queue.task_done()
if __name__ == '__main__':
if __name__ == "__main__":
logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG)
install_dir = os.path.dirname(os.path.realpath(__file__))
LNMS.check_for_file(install_dir + '/.env')
LNMS.check_for_file(install_dir + "/.env")
config = json.loads(LNMS.get_config_data(install_dir))
poller_path = config['install_dir'] + '/poller.php'
log_dir = config['log_dir']
poller_path = config["install_dir"] + "/poller.php"
log_dir = config["log_dir"]
if 'rrd' in config and 'step' in config['rrd']:
step = config['rrd']['step']
if "rrd" in config and "step" in config["rrd"]:
step = config["rrd"]["step"]
else:
step = 300
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
if 'distributed_poller_group' in config:
poller_group = str(config['distributed_poller_group'])
if "distributed_poller_group" in config:
poller_group = str(config["distributed_poller_group"])
else:
poller_group = False
if ('distributed_poller' in config and
'distributed_poller_memcached_host' in config and
'distributed_poller_memcached_port' in config and
config['distributed_poller']):
if (
"distributed_poller" in config
and "distributed_poller_memcached_host" in config
and "distributed_poller_memcached_port" in config
and config["distributed_poller"]
):
time_tag = str(get_time_tag(step))
master_tag = "poller.master." + time_tag
@ -208,15 +245,20 @@ if __name__ == '__main__':
import memcache
import uuid
memc = memcache.Client([config['distributed_poller_memcached_host'] + ':' +
str(config['distributed_poller_memcached_port'])])
if str(memc.get(master_tag)) == config['distributed_poller_name']:
memc = memcache.Client(
[
config["distributed_poller_memcached_host"]
+ ":"
+ str(config["distributed_poller_memcached_port"])
]
)
if str(memc.get(master_tag)) == config["distributed_poller_name"]:
print("This system is already joined as the poller master.")
sys.exit(2)
if memc_alive():
if memc.get(master_tag) is None:
print("Registered as Master")
memc.set(master_tag, config['distributed_poller_name'], 10)
memc.set(master_tag, config["distributed_poller_name"], 10)
memc.set(nodes_tag, 0, step)
IsNode = False
else:
@ -252,8 +294,13 @@ if __name__ == '__main__':
    usage = "usage: %prog [options] <workers> (Default: 16, do not set too high)"
description = "Spawn multiple poller.php processes in parallel."
parser = OptionParser(usage=usage, description=description)
parser.add_option('-d', '--debug', action='store_true', default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.")
parser.add_option(
"-d",
"--debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
(options, args) = parser.parse_args()
debug = options.debug
@ -272,13 +319,23 @@ if __name__ == '__main__':
"""
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC2
if poller_group is not False:
query = 'select device_id from devices where poller_group IN(' + poller_group + \
') and disabled = 0 order by last_polled_timetaken desc'
query = (
"select device_id from devices where poller_group IN("
+ poller_group
+ ") and disabled = 0 order by last_polled_timetaken desc"
)
else:
query = 'select device_id from devices where disabled = 0 order by last_polled_timetaken desc'
query = "select device_id from devices where disabled = 0 order by last_polled_timetaken desc"
# EOC2
db = LNMS.db_open(config['db_socket'], config['db_host'], config['db_port'], config['db_user'], config['db_pass'], config['db_name'])
db = LNMS.db_open(
config["db_socket"],
config["db_host"],
config["db_port"],
config["db_user"],
config["db_pass"],
config["db_name"],
)
cursor = db.cursor()
cursor.execute(query)
devices = cursor.fetchall()
@ -298,8 +355,9 @@ if __name__ == '__main__':
print_queue = queue.Queue()
print(
"INFO: starting the poller at %s with %s threads, slowest devices first" % (time.strftime("%Y-%m-%d %H:%M:%S"),
amount_of_workers))
"INFO: starting the poller at %s with %s threads, slowest devices first"
% (time.strftime("%Y-%m-%d %H:%M:%S"), amount_of_workers)
)
for device_id in devices_list:
poll_queue.put(device_id)
@ -321,13 +379,15 @@ if __name__ == '__main__':
total_time = int(time.time() - s_time)
print("INFO: poller-wrapper polled %s devices in %s seconds with %s workers" % (
polled_devices, total_time, amount_of_workers))
print(
"INFO: poller-wrapper polled %s devices in %s seconds with %s workers"
% (polled_devices, total_time, amount_of_workers)
)
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6
if distpoll or memc_alive():
master = memc.get(master_tag)
if master == config['distributed_poller_name'] and not IsNode:
if master == config["distributed_poller_name"] and not IsNode:
print("Wait for all poller-nodes to finish")
nodes = memc.get(nodes_tag)
while nodes is not None and nodes > 0:
@ -339,7 +399,7 @@ if __name__ == '__main__':
print("Clearing Locks for %s" % time_tag)
x = minlocks
while x <= maxlocks:
res = memc.delete('poller.device.%s.%s' % (x, time_tag))
res = memc.delete("poller.device.%s.%s" % (x, time_tag))
x += 1
print("%s Locks Cleared" % x)
print("Clearing Nodes")
@ -352,36 +412,57 @@ if __name__ == '__main__':
show_stopper = False
db = LNMS.db_open(config['db_socket'], config['db_host'], config['db_port'], config['db_user'], config['db_pass'], config['db_name'])
db = LNMS.db_open(
config["db_socket"],
config["db_host"],
config["db_port"],
config["db_user"],
config["db_pass"],
config["db_name"],
)
cursor = db.cursor()
query = "update pollers set last_polled=NOW(), devices='%d', time_taken='%d' where poller_name='%s'" % (
polled_devices,
total_time,
config['distributed_poller_name'])
query = (
"update pollers set last_polled=NOW(), devices='%d', time_taken='%d' where poller_name='%s'"
% (polled_devices, total_time, config["distributed_poller_name"])
)
response = cursor.execute(query)
if response == 1:
db.commit()
else:
query = "insert into pollers set poller_name='%s', last_polled=NOW(), devices='%d', time_taken='%d'" % (
config['distributed_poller_name'], polled_devices, total_time)
query = (
"insert into pollers set poller_name='%s', last_polled=NOW(), devices='%d', time_taken='%d'"
% (config["distributed_poller_name"], polled_devices, total_time)
)
cursor.execute(query)
db.commit()
db.close()
if total_time > step:
print(
"WARNING: the process took more than %s seconds to finish, you need faster hardware or more threads" % step)
print("INFO: in sequential style polling the elapsed time would have been: %s seconds" % real_duration)
"WARNING: the process took more than %s seconds to finish, you need faster hardware or more threads"
% step
)
print(
"INFO: in sequential style polling the elapsed time would have been: %s seconds"
% real_duration
)
for device in per_device_duration:
if per_device_duration[device] > step:
print("WARNING: device %s is taking too long: %s seconds" % (device, per_device_duration[device]))
print(
"WARNING: device %s is taking too long: %s seconds"
% (device, per_device_duration[device])
)
show_stopper = True
if show_stopper:
print(
"ERROR: Some devices are taking more than %s seconds, the script cannot recommend you what to do." % step)
"ERROR: Some devices are taking more than %s seconds, the script cannot recommend you what to do."
% step
)
else:
recommend = int(total_time / step * amount_of_workers + 1)
print(
"WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" % recommend)
"WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)"
% recommend
)
sys.exit(2)

View File

@ -6,11 +6,9 @@ from pkg_resources import DistributionNotFound, VersionConflict
args = sys.argv
# verbose flag
verbose = '-v' in args
verbose = "-v" in args
requirements = [
'PyMySQL'
]
requirements = ["PyMySQL"]
try:
pkg_resources.require(requirements)

View File

@ -51,9 +51,9 @@ try:
from optparse import OptionParser
except ImportError as exc:
print('ERROR: missing one or more of the following python modules:')
print('threading, queue, sys, subprocess, time, os, json')
print('ERROR: %s' % exc)
print("ERROR: missing one or more of the following python modules:")
print("threading, queue, sys, subprocess, time, os, json")
print("ERROR: %s" % exc)
sys.exit(2)
@ -72,9 +72,9 @@ def memc_alive():
try:
global memc
key = str(uuid.uuid4())
memc.set('poller.ping.' + key, key, 60)
if memc.get('poller.ping.' + key) == key:
memc.delete('poller.ping.' + key)
memc.set("poller.ping." + key, key, 60)
if memc.get("poller.ping." + key) == key:
memc.delete("poller.ping." + key)
return True
else:
return False
@ -94,7 +94,9 @@ def memc_touch(key, time):
def get_time_tag(step):
ts = int(time.time())
return ts - ts % step
#EOC0
# EOC0
"""
@ -114,17 +116,19 @@ def printworker():
global servicedisco
if servicedisco:
if not IsNode:
memc_touch('service.master', 10)
nodes = memc.get('service.nodes')
memc_touch("service.master", 10)
nodes = memc.get("service.nodes")
if nodes is None and not memc_alive():
print("WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly.")
print(
"WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly."
)
servicedisco = False
nodes = nodeso
if nodes is not nodeso:
print("INFO: %s Node(s) Total" % (nodes))
nodeso = nodes
else:
memc_touch('service.nodes', 10)
memc_touch("service.nodes", 10)
try:
worker_id, device_id, elapsed_time = print_queue.get(False)
except:
@ -144,11 +148,18 @@ def printworker():
per_device_duration[device_id] = elapsed_time
service_devices += 1
if elapsed_time < 300:
print("INFO: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time))
print(
"INFO: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
else:
print("WARNING: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time))
print(
"WARNING: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
print_queue.task_done()
"""
This class will fork off single instances of the check-services.php process, record
how long it takes, and push the resulting reports to the printer queue
@ -159,28 +170,48 @@ def poll_worker():
while True:
device_id = poll_queue.get()
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5
if not servicedisco or memc.get('service.device.' + str(device_id)) is None:
if not servicedisco or memc.get("service.device." + str(device_id)) is None:
if servicedisco:
result = memc.add('service.device.' + str(device_id), config['distributed_poller_name'], 300)
result = memc.add(
"service.device." + str(device_id),
config["distributed_poller_name"],
300,
)
if not result:
print("This device (%s) appears to be being service checked by another service node" % (device_id))
print(
"This device (%s) appears to be being service checked by another service node"
% (device_id)
)
poll_queue.task_done()
continue
if not memc_alive() and IsNode:
print("Lost Memcached, Not service checking Device %s as Node. Master will check it." % device_id)
print(
"Lost Memcached, Not service checking Device %s as Node. Master will check it."
% device_id
)
poll_queue.task_done()
continue
# EOC5
try:
start_time = time.time()
output = "-d >> %s/services_device_%s.log" % (log_dir, device_id) if debug else ">> /dev/null"
output = (
"-d >> %s/services_device_%s.log" % (log_dir, device_id)
if debug
else ">> /dev/null"
)
# TODO replace with command_runner
command = "/usr/bin/env php %s -h %s %s 2>&1" % (service_path, device_id, output)
command = "/usr/bin/env php %s -h %s %s 2>&1" % (
service_path,
device_id,
output,
)
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
print_queue.put([threading.current_thread().name, device_id, elapsed_time])
print_queue.put(
[threading.current_thread().name, device_id, elapsed_time]
)
except (KeyboardInterrupt, SystemExit):
raise
except:
@ -188,48 +219,60 @@ def poll_worker():
poll_queue.task_done()
if __name__ == '__main__':
if __name__ == "__main__":
logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG)
install_dir = os.path.dirname(os.path.realpath(__file__))
LNMS.check_for_file(install_dir + '/.env')
LNMS.check_for_file(install_dir + "/.env")
config = json.loads(LNMS.get_config_data(install_dir))
service_path = config['install_dir'] + '/check-services.php'
log_dir = config['log_dir']
service_path = config["install_dir"] + "/check-services.php"
log_dir = config["log_dir"]
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
if 'distributed_poller_group' in config:
service_group = str(config['distributed_poller_group'])
if "distributed_poller_group" in config:
service_group = str(config["distributed_poller_group"])
else:
service_group = False
if ('distributed_poller' in config and
'distributed_poller_memcached_host' in config and
'distributed_poller_memcached_port' in config and
config['distributed_poller']):
if (
"distributed_poller" in config
and "distributed_poller_memcached_host" in config
and "distributed_poller_memcached_port" in config
and config["distributed_poller"]
):
try:
import memcache
import uuid
memc = memcache.Client([config['distributed_poller_memcached_host'] + ':' +
str(config['distributed_poller_memcached_port'])])
if str(memc.get("service.master")) == config['distributed_poller_name']:
memc = memcache.Client(
[
config["distributed_poller_memcached_host"]
+ ":"
+ str(config["distributed_poller_memcached_port"])
]
)
if str(memc.get("service.master")) == config["distributed_poller_name"]:
print("This system is already joined as the service master.")
sys.exit(2)
if memc_alive():
if memc.get("service.master") is None:
print("Registered as Master")
memc.set("service.master", config['distributed_poller_name'], 10)
memc.set("service.master", config["distributed_poller_name"], 10)
memc.set("service.nodes", 0, 300)
IsNode = False
else:
print("Registered as Node joining Master %s" % memc.get("service.master"))
print(
"Registered as Node joining Master %s"
% memc.get("service.master")
)
IsNode = True
memc.incr("service.nodes")
servicedisco = True
else:
print("Could not connect to memcached, disabling distributed service checks.")
print(
"Could not connect to memcached, disabling distributed service checks."
)
servicedisco = False
IsNode = False
except SystemExit:
@ -256,8 +299,13 @@ if __name__ == '__main__':
    usage = "usage: %prog [options] <workers> (Default: 1, do not set too high)"
description = "Spawn multiple check-services.php processes in parallel."
parser = OptionParser(usage=usage, description=description)
parser.add_option('-d', '--debug', action='store_true', default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.")
parser.add_option(
"-d",
"--debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
(options, args) = parser.parse_args()
debug = options.debug
@ -270,12 +318,23 @@ if __name__ == '__main__':
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC2
if service_group is not False:
query = "SELECT DISTINCT(`services`.`device_id`) FROM `services` LEFT JOIN `devices` ON `services`.`device_id` = `devices`.`device_id` WHERE `devices`.`poller_group` IN(" + service_group + ") AND `devices`.`disabled` = 0"
query = (
"SELECT DISTINCT(`services`.`device_id`) FROM `services` LEFT JOIN `devices` ON `services`.`device_id` = `devices`.`device_id` WHERE `devices`.`poller_group` IN("
+ service_group
+ ") AND `devices`.`disabled` = 0"
)
else:
query = "SELECT DISTINCT(`services`.`device_id`) FROM `services` LEFT JOIN `devices` ON `services`.`device_id` = `devices`.`device_id` WHERE `devices`.`disabled` = 0"
# EOC2
db = LNMS.db_open(config['db_socket'], config['db_host'], config['db_port'], config['db_user'], config['db_pass'], config['db_name'])
db = LNMS.db_open(
config["db_socket"],
config["db_host"],
config["db_port"],
config["db_user"],
config["db_pass"],
config["db_name"],
)
cursor = db.cursor()
cursor.execute(query)
devices = cursor.fetchall()
@ -294,8 +353,10 @@ if __name__ == '__main__':
poll_queue = queue.Queue()
print_queue = queue.Queue()
print("INFO: starting the service check at %s with %s threads" % (time.strftime("%Y-%m-%d %H:%M:%S"),
amount_of_workers))
print(
"INFO: starting the service check at %s with %s threads"
% (time.strftime("%Y-%m-%d %H:%M:%S"), amount_of_workers)
)
for device_id in devices_list:
poll_queue.put(device_id)
@ -317,12 +378,15 @@ if __name__ == '__main__':
total_time = int(time.time() - s_time)
print("INFO: services-wrapper checked %s devices in %s seconds with %s workers" % (service_devices, total_time, amount_of_workers))
print(
"INFO: services-wrapper checked %s devices in %s seconds with %s workers"
% (service_devices, total_time, amount_of_workers)
)
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6
if servicedisco or memc_alive():
master = memc.get("service.master")
if master == config['distributed_poller_name'] and not IsNode:
if master == config["distributed_poller_name"] and not IsNode:
print("Wait for all service-nodes to finish")
nodes = memc.get("service.nodes")
while nodes is not None and nodes > 0:
@ -334,7 +398,7 @@ if __name__ == '__main__':
print("Clearing Locks")
x = minlocks
while x <= maxlocks:
memc.delete('service.device.' + str(x))
memc.delete("service.device." + str(x))
x = x + 1
print("%s Locks Cleared" % x)
print("Clearing Nodes")
@ -348,17 +412,29 @@ if __name__ == '__main__':
show_stopper = False
if total_time > 300:
print("WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads")
print("INFO: in sequential style service checks the elapsed time would have been: %s seconds" % real_duration)
print(
"WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads"
)
print(
"INFO: in sequential style service checks the elapsed time would have been: %s seconds"
% real_duration
)
for device in per_device_duration:
if per_device_duration[device] > 300:
print("WARNING: device %s is taking too long: %s seconds" % (device, per_device_duration[device]))
print(
"WARNING: device %s is taking too long: %s seconds"
% (device, per_device_duration[device])
)
show_stopper = True
if show_stopper:
print("ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do.")
print(
"ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do."
)
else:
recommend = int(total_time / 300.0 * amount_of_workers + 1)
print(
"WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" % recommend)
"WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)"
% recommend
)
sys.exit(2)

View File

@ -32,7 +32,7 @@ from subprocess import check_output, CalledProcessError
from sys import stdout
from time import time
Result = namedtuple('Result', ['ip', 'hostname', 'outcome', 'output'])
Result = namedtuple("Result", ["ip", "hostname", "outcome", "output"])
class Outcome:
@ -45,20 +45,20 @@ class Outcome:
TERMINATED = 6
POLLER_GROUP = '0'
POLLER_GROUP = "0"
VERBOSE_LEVEL = 0
THREADS = 32
CONFIG = {}
EXCLUDED_NETS = []
start_time = time()
stats = {
'count': 0,
"count": 0,
Outcome.ADDED: 0,
Outcome.UNPINGABLE: 0,
Outcome.KNOWN: 0,
Outcome.FAILED: 0,
Outcome.EXCLUDED: 0,
Outcome.TERMINATED: 0
Outcome.TERMINATED: 0,
}
@ -69,31 +69,44 @@ def debug(message, level=2):
def get_outcome_symbol(outcome):
return {
Outcome.UNDEFINED: '?', # should not occur
Outcome.ADDED: '+',
Outcome.UNPINGABLE: '.',
Outcome.KNOWN: '*',
Outcome.FAILED: '-',
Outcome.TERMINATED: ''
Outcome.UNDEFINED: "?", # should not occur
Outcome.ADDED: "+",
Outcome.UNPINGABLE: ".",
Outcome.KNOWN: "*",
Outcome.FAILED: "-",
Outcome.TERMINATED: "",
}[outcome]
def handle_result(data):
if VERBOSE_LEVEL > 0:
print('Scanned \033[1m{}\033[0m {}'.format(
("{} ({})".format(data.hostname, data.ip) if data.hostname else data.ip), data.output))
print(
"Scanned \033[1m{}\033[0m {}".format(
(
"{} ({})".format(data.hostname, data.ip)
if data.hostname
else data.ip
),
data.output,
)
)
else:
print(get_outcome_symbol(data.outcome), end='')
print(get_outcome_symbol(data.outcome), end="")
stdout.flush()
stats['count'] += 0 if data.outcome == Outcome.TERMINATED else 1
stats["count"] += 0 if data.outcome == Outcome.TERMINATED else 1
stats[data.outcome] += 1
def check_ip_excluded(check_ip):
for network_check in EXCLUDED_NETS:
if check_ip in network_check:
debug("\033[91m{} excluded by autodiscovery.nets-exclude\033[0m".format(check_ip), 1)
debug(
"\033[91m{} excluded by autodiscovery.nets-exclude\033[0m".format(
check_ip
),
1,
)
stats[Outcome.EXCLUDED] += 1
return True
return False
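check_ip_excluded() leans on Python's ipaddress module, where membership of an address in a network is a direct "in" test:

from ipaddress import ip_address, ip_network

net = ip_network("192.168.0.0/24")        # example nets-exclude entry
print(ip_address("192.168.0.42") in net)  # -> True: excluded from the scan
print(ip_address("10.0.0.1") in net)      # -> False: would be scanned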
@ -113,7 +126,14 @@ def scan_host(scan_ip):
try:
arguments = ['/usr/bin/env', 'php', 'addhost.php', '-g', POLLER_GROUP, hostname or scan_ip]
arguments = [
"/usr/bin/env",
"php",
"addhost.php",
"-g",
POLLER_GROUP,
hostname or scan_ip,
]
if args.ping:
arguments.insert(5, args.ping)
add_output = check_output(arguments)
@ -121,45 +141,79 @@ def scan_host(scan_ip):
except CalledProcessError as err:
output = err.output.decode().rstrip()
if err.returncode == 2:
if 'Could not ping' in output:
if "Could not ping" in output:
return Result(scan_ip, hostname, Outcome.UNPINGABLE, output)
else:
return Result(scan_ip, hostname, Outcome.FAILED, output)
elif err.returncode == 3:
return Result(scan_ip, hostname, Outcome.KNOWN, output)
except KeyboardInterrupt:
return Result(scan_ip, hostname, Outcome.TERMINATED, 'Terminated')
return Result(scan_ip, hostname, Outcome.TERMINATED, "Terminated")
return Result(scan_ip, hostname, Outcome.UNDEFINED, output)
if __name__ == '__main__':
if __name__ == "__main__":
###################
# Parse arguments #
###################
parser = argparse.ArgumentParser(description='Scan network for snmp hosts and add them to LibreNMS.', formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('network', action='append', nargs='*', type=str, help="""CIDR noted IP-Range to scan. Can be specified multiple times
parser = argparse.ArgumentParser(
description="Scan network for snmp hosts and add them to LibreNMS.",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"network",
action="append",
nargs="*",
type=str,
help="""CIDR noted IP-Range to scan. Can be specified multiple times
This argument is only required if 'nets' config is not set
Example: 192.168.0.0/24
Example: 192.168.0.0/31 will be treated as an RFC3021 p-t-p network with two addresses, 192.168.0.0 and 192.168.0.1
Example: 192.168.0.1/32 will be treated as a single host address""")
parser.add_argument('-P', '--ping', action='store_const', const="-b", default="", help="""Add the device as an ICMP only device if it replies to ping but not SNMP.
Example: """ + __file__ + """ -P 192.168.0.0/24""")
parser.add_argument('-t', dest='threads', type=int,
help="How many IPs to scan at a time. More will increase the scan speed," +
" but could overload your system. Default: {}".format(THREADS))
parser.add_argument('-g', dest='group', type=str,
help="The poller group all scanned devices will be added to."
" Default: The first group listed in 'distributed_poller_group', or {} if not specificed".format(POLLER_GROUP))
parser.add_argument('-l', '--legend', action='store_true', help="Print the legend.")
parser.add_argument('-v', '--verbose', action='count',
help="Show debug output. Specifying multiple times increases the verbosity.")
Example: 192.168.0.1/32 will be treated as a single host address""",
)
parser.add_argument(
"-P",
"--ping",
action="store_const",
const="-b",
default="",
help="""Add the device as an ICMP only device if it replies to ping but not SNMP.
Example: """
+ __file__
+ """ -P 192.168.0.0/24""",
)
parser.add_argument(
"-t",
dest="threads",
type=int,
help="How many IPs to scan at a time. More will increase the scan speed,"
+ " but could overload your system. Default: {}".format(THREADS),
)
parser.add_argument(
"-g",
dest="group",
type=str,
help="The poller group all scanned devices will be added to."
" Default: The first group listed in 'distributed_poller_group', or {} if not specificed".format(
POLLER_GROUP
),
)
parser.add_argument("-l", "--legend", action="store_true", help="Print the legend.")
parser.add_argument(
"-v",
"--verbose",
action="count",
help="Show debug output. Specifying multiple times increases the verbosity.",
)
# compatibility arguments
parser.add_argument('-r', dest='network', action='append', help=argparse.SUPPRESS)
parser.add_argument('-d', '-i', dest='verbose', action='count', help=argparse.SUPPRESS)
parser.add_argument('-n', action='store_true', help=argparse.SUPPRESS)
parser.add_argument('-b', action='store_true', help=argparse.SUPPRESS)
parser.add_argument("-r", dest="network", action="append", help=argparse.SUPPRESS)
parser.add_argument(
"-d", "-i", dest="verbose", action="count", help=argparse.SUPPRESS
)
parser.add_argument("-n", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("-b", action="store_true", help=argparse.SUPPRESS)
args = parser.parse_args()
@ -170,12 +224,20 @@ Example: """ + __file__ + """ -P 192.168.0.0/24""")
install_dir = path.dirname(path.realpath(__file__))
chdir(install_dir)
try:
CONFIG = json.loads(check_output(['/usr/bin/env', 'php', 'config_to_json.php']).decode())
CONFIG = json.loads(
check_output(["/usr/bin/env", "php", "config_to_json.php"]).decode()
)
except CalledProcessError as e:
parser.error("Could not execute: {}\n{}".format(' '.join(e.cmd), e.output.decode().rstrip()))
parser.error(
"Could not execute: {}\n{}".format(
" ".join(e.cmd), e.output.decode().rstrip()
)
)
exit(2)
POLLER_GROUP = args.group or str(CONFIG.get('distributed_poller_group')).split(',')[0]
POLLER_GROUP = (
args.group or str(CONFIG.get("distributed_poller_group")).split(",")[0]
)
#######################
# Build network lists #
@ -190,34 +252,40 @@ Example: """ + __file__ + """ -P 192.168.0.0/24""")
netargs.append(a)
# make sure we have something to scan
if not CONFIG.get('nets', []) and not netargs:
parser.error('\'nets\' is not set in your LibreNMS config, you must specify a network to scan')
if not CONFIG.get("nets", []) and not netargs:
parser.error(
"'nets' is not set in your LibreNMS config, you must specify a network to scan"
)
# check for valid networks
networks = []
for net in (netargs if netargs else CONFIG.get('nets', [])):
for net in netargs if netargs else CONFIG.get("nets", []):
try:
networks.append(ip_network(u'%s' % net, True))
debug('Network parsed: {}'.format(net), 2)
networks.append(ip_network(u"%s" % net, True))
debug("Network parsed: {}".format(net), 2)
except ValueError as e:
parser.error('Invalid network format {}'.format(e))
parser.error("Invalid network format {}".format(e))
for net in CONFIG.get('autodiscovery', {}).get('nets-exclude', {}):
for net in CONFIG.get("autodiscovery", {}).get("nets-exclude", {}):
try:
EXCLUDED_NETS.append(ip_network(net, True))
debug('Excluded network: {}'.format(net), 2)
debug("Excluded network: {}".format(net), 2)
except ValueError as e:
parser.error('Invalid excluded network format {}, check your config.php'.format(e))
parser.error(
"Invalid excluded network format {}, check your config.php".format(e)
)
#################
# Scan networks #
#################
debug('SNMP settings from config.php: {}'.format(CONFIG.get('snmp', {})), 2)
debug("SNMP settings from config.php: {}".format(CONFIG.get("snmp", {})), 2)
if args.legend and not VERBOSE_LEVEL:
print('Legend:\n+ Added device\n* Known device\n- Failed to add device\n. Ping failed\n')
print(
"Legend:\n+ Added device\n* Known device\n- Failed to add device\n. Ping failed\n"
)
print('Scanning IPs:')
print("Scanning IPs:")
pool = Pool(processes=THREADS)
@ -240,9 +308,16 @@ Example: """ + __file__ + """ -P 192.168.0.0/24""")
if VERBOSE_LEVEL == 0:
print("\n")
base = 'Scanned {} IPs: {} known devices, added {} devices, failed to add {} devices'
summary = base.format(stats['count'], stats[Outcome.KNOWN], stats[Outcome.ADDED], stats[Outcome.FAILED])
base = (
"Scanned {} IPs: {} known devices, added {} devices, failed to add {} devices"
)
summary = base.format(
stats["count"],
stats[Outcome.KNOWN],
stats[Outcome.ADDED],
stats[Outcome.FAILED],
)
if stats[Outcome.EXCLUDED]:
summary += ', {} ips excluded by config'.format(stats[Outcome.EXCLUDED])
summary += ", {} ips excluded by config".format(stats[Outcome.EXCLUDED])
print(summary)
print('Runtime: {:.2f} seconds'.format(time() - start_time))
print("Runtime: {:.2f} seconds".format(time() - start_time))

View File

@ -21,73 +21,111 @@ class TestLocks(unittest.TestCase):
@staticmethod
def lock_thread(manager, lock_name, expiration, unlock_sleep=0):
manager.lock(lock_name, 'lock_thread', expiration)
manager.lock(lock_name, "lock_thread", expiration)
if unlock_sleep:
sleep(unlock_sleep)
manager.unlock(lock_name, 'lock_thread')
manager.unlock(lock_name, "lock_thread")
def test_threading_lock(self):
lm = LibreNMS.ThreadingLock()
thread = threading.Thread(target=self.lock_thread, args=(lm, 'first.lock', 2, 1))
thread = threading.Thread(
target=self.lock_thread, args=(lm, "first.lock", 2, 1)
)
thread.daemon = True
thread.start()
sleep(0.05)
self.assertFalse(lm.lock('first.lock', 'main_thread', 0), "Acquired lock when it is held by thread")
self.assertFalse(lm.unlock('first.lock', 'main_thread'), "Unlocked lock main doesn't own")
self.assertFalse(
lm.lock("first.lock", "main_thread", 0),
"Acquired lock when it is held by thread",
)
self.assertFalse(
lm.unlock("first.lock", "main_thread"), "Unlocked lock main doesn't own"
)
sleep(1.1)
self.assertTrue(lm.lock('first.lock', 'main_thread', 1),
"Could not acquire lock previously held by thread")
self.assertFalse(lm.lock('first.lock', 'main_thread', 1, False), "Was able to re-lock a lock main owns")
self.assertTrue(lm.lock('first.lock', 'main_thread', 1, True), "Could not re-lock a lock main owns")
self.assertTrue(lm.check_lock('first.lock'))
self.assertTrue(lm.unlock('first.lock', 'main_thread'), "Could not unlock lock main holds")
self.assertFalse(lm.unlock('first.lock', 'main_thread'), "Unlocked an unlocked lock?")
self.assertFalse(lm.check_lock('first.lock'))
self.assertTrue(
lm.lock("first.lock", "main_thread", 1),
"Could not acquire lock previously held by thread",
)
self.assertFalse(
lm.lock("first.lock", "main_thread", 1, False),
"Was able to re-lock a lock main owns",
)
self.assertTrue(
lm.lock("first.lock", "main_thread", 1, True),
"Could not re-lock a lock main owns",
)
self.assertTrue(lm.check_lock("first.lock"))
self.assertTrue(
lm.unlock("first.lock", "main_thread"), "Could not unlock lock main holds"
)
self.assertFalse(
lm.unlock("first.lock", "main_thread"), "Unlocked an unlocked lock?"
)
self.assertFalse(lm.check_lock("first.lock"))
def test_redis_lock(self):
if 'redis' not in sys.modules:
self.assertTrue(True, 'Skipped Redis tests')
if "redis" not in sys.modules:
self.assertTrue(True, "Skipped Redis tests")
else:
rc = redis.Redis()
rc.delete('lock:redis.lock') # make sure no previous data exists
rc.delete("lock:redis.lock") # make sure no previous data exists
lm = LibreNMS.RedisLock(namespace='lock')
thread = threading.Thread(target=self.lock_thread, args=(lm, 'redis.lock', 2, 1))
lm = LibreNMS.RedisLock(namespace="lock")
thread = threading.Thread(
target=self.lock_thread, args=(lm, "redis.lock", 2, 1)
)
thread.daemon = True
thread.start()
sleep(0.05)
self.assertFalse(lm.lock('redis.lock', 'main_thread', 1), "Acquired lock when it is held by thread")
self.assertFalse(lm.unlock('redis.lock', 'main_thread'), "Unlocked lock main doesn't own")
self.assertFalse(
lm.lock("redis.lock", "main_thread", 1),
"Acquired lock when it is held by thread",
)
self.assertFalse(
lm.unlock("redis.lock", "main_thread"), "Unlocked lock main doesn't own"
)
sleep(1.1)
self.assertTrue(lm.lock('redis.lock', 'main_thread', 1),
"Could not acquire lock previously held by thread")
self.assertFalse(lm.lock('redis.lock', 'main_thread', 1), "Relocked an existing lock")
self.assertTrue(lm.lock('redis.lock', 'main_thread', 1, True), "Could not re-lock a lock main owns")
self.assertTrue(lm.unlock('redis.lock', 'main_thread'), "Could not unlock lock main holds")
self.assertFalse(lm.unlock('redis.lock', 'main_thread'), "Unlocked an unlocked lock?")
self.assertTrue(
lm.lock("redis.lock", "main_thread", 1),
"Could not acquire lock previously held by thread",
)
self.assertFalse(
lm.lock("redis.lock", "main_thread", 1), "Relocked an existing lock"
)
self.assertTrue(
lm.lock("redis.lock", "main_thread", 1, True),
"Could not re-lock a lock main owns",
)
self.assertTrue(
lm.unlock("redis.lock", "main_thread"),
"Could not unlock lock main holds",
)
self.assertFalse(
lm.unlock("redis.lock", "main_thread"), "Unlocked an unlocked lock?"
)
def queue_thread(self, manager, expect, wait=True):
self.assertEqual(expect, manager.get(wait), 'Got unexpected data in thread')
self.assertEqual(expect, manager.get(wait), "Got unexpected data in thread")
def test_redis_queue(self):
if 'redis' not in sys.modules:
self.assertTrue(True, 'Skipped Redis tests')
if "redis" not in sys.modules:
self.assertTrue(True, "Skipped Redis tests")
else:
rc = redis.Redis()
rc.delete('queue:testing') # make sure no previous data exists
qm = LibreNMS.RedisUniqueQueue('testing', namespace='queue')
rc.delete("queue:testing") # make sure no previous data exists
qm = LibreNMS.RedisUniqueQueue("testing", namespace="queue")
thread = threading.Thread(target=self.queue_thread, args=(qm, None, False))
thread.daemon = True
thread.start()
thread = threading.Thread(target=self.queue_thread, args=(qm, '2'))
thread = threading.Thread(target=self.queue_thread, args=(qm, "2"))
thread.daemon = True
thread.start()
qm.put(2)
@ -96,9 +134,11 @@ class TestLocks(unittest.TestCase):
qm.put(4)
sleep(0.05)
self.assertEqual(2, qm.qsize())
self.assertEqual('3', qm.get())
self.assertEqual('4', qm.get(), "Did not get second item in queue")
self.assertEqual(None, qm.get_nowait(), "Did not get None when queue should be empty")
self.assertEqual("3", qm.get())
self.assertEqual("4", qm.get(), "Did not get second item in queue")
self.assertEqual(
None, qm.get_nowait(), "Did not get None when queue should be empty"
)
self.assertTrue(qm.empty(), "Queue should be empty")
@ -128,5 +168,6 @@ class TestTimer(unittest.TestCase):
self.assertEqual(3, self.counter)
timer.stop()
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()