Full Python code fusion / refactor and hardening 2nd edition (#13188)

* New service/discovery/poller wrapper

* Convert old wrapper scripts to bootstrap loaders for wrapper.py

* Move wrapper.py to LibreNMS module directory

* Reformat files

* File reformatting

* bootstrap files reformatting

* Fuse service and wrapper database connections and get_config_data functions

* Moved subprocess calls to command_runner

* LibreNMS library and __init__ fusion

* Reformat files

* Normalize logging use

* Reformatting code

* Fix missing argument for error log

* Fix refactor typo in DBConfig class

* Add default timeout for config.php data fetching

* distributed discovery should finish with a timestamp instead of an epoch

* Fix docstring inside dict preventing the service key from working

* Fix poller insert statement

* Fix service wrapper typo

* Update docstring since we changed function behavior

* Normalize SQL statements

* Convert optparse to argparse

* Revert discovery thread number

* Handle debug logging

* Fix file option typo

* Reformat code

* Add credits to source package

* Rename logs depending on the wrapper type

* Cap max logfile size to 10MB

* Reformat code

* Add exception for Redis < 5.0

* Make sure we always log something from service

* Fix bogus description

* Add an error message on missing config file

* Improve error message when .env file cannot be loaded

* Improve wrapper logging

* Fix cron run may fail when environment path is not set

* Add missing -wrapper suffix for logs

* Conform to prior naming scheme

* Linter fix

* Add inline copy of command_runner

* Another linter fix

* Raise exception after logging

* Updated inline command_runner

* Add command_runner to requirements

* I guess I love linter fixes ;)

* Don't spawn more threads than devices

* Fix typo in log call

* Add exit codes to log on error, add command line to debug log

* Add thread name to error message

* Log errors in end message for easier debugging

* Typo fix

* In love of linting
Orsiris de Jong, 2021-09-27 21:24:25 +02:00 (committed by GitHub)
parent 9b5684da4e
commit bfa200f3f7
11 changed files with 1831 additions and 1669 deletions

LibreNMS/__init__.py

@ -1,15 +1,17 @@
import json
import logging
import os
import subprocess
import sys
import tempfile
import threading
import timeit
from collections import deque
from logging import critical, info, debug, exception
from logging.handlers import RotatingFileHandler
from math import ceil
from queue import Queue
from time import time
from .service import Service, ServiceConfig
from .command_runner import command_runner
from .queuemanager import (
QueueManager,
TimedQueueManager,
@ -20,6 +22,161 @@ from .queuemanager import (
PollerQueueManager,
DiscoveryQueueManager,
)
from .service import Service, ServiceConfig
# Hard limit script execution time so we don't get to "hang"
DEFAULT_SCRIPT_TIMEOUT = 3600
MAX_LOGFILE_SIZE = (1024 ** 2) * 10 # 10 Megabytes max log files
logger = logging.getLogger(__name__)
# Logging functions ########################################################
# Original logger functions from ofunctions.logger_utils package
FORMATTER = logging.Formatter("%(asctime)s :: %(levelname)s :: %(message)s")
def logger_get_console_handler():
try:
console_handler = logging.StreamHandler(sys.stdout)
except OSError as exc:
print("Cannot log to stdout, trying stderr. Message %s" % exc)
try:
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setFormatter(FORMATTER)
return console_handler
except OSError as exc:
print("Cannot log to stderr neither. Message %s" % exc)
return False
else:
console_handler.setFormatter(FORMATTER)
return console_handler
def logger_get_file_handler(log_file):
err_output = None
try:
file_handler = RotatingFileHandler(
log_file,
mode="a",
encoding="utf-8",
maxBytes=MAX_LOGFILE_SIZE,
backupCount=3,
)
except OSError as exc:
try:
print(
"Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s"
% exc
)
err_output = str(exc)
temp_log_file = tempfile.gettempdir() + os.sep + __name__ + ".log"
print("Trying temporary log file in " + temp_log_file)
file_handler = RotatingFileHandler(
temp_log_file,
mode="a",
encoding="utf-8",
maxBytes=MAX_LOGFILE_SIZE,
backupCount=1,
)
file_handler.setFormatter(FORMATTER)
err_output += "\nUsing [%s]" % temp_log_file
return file_handler, err_output
except OSError as exc:
print(
"Cannot create temporary log file either. Will not log to file. Message: %s"
% exc
)
return False
else:
file_handler.setFormatter(FORMATTER)
return file_handler, err_output
def logger_get_logger(log_file=None, temp_log_file=None, debug=False):
# If a name is given to getLogger, then modules can't log to the root logger
_logger = logging.getLogger()
if debug is True:
_logger.setLevel(logging.DEBUG)
else:
_logger.setLevel(logging.INFO)
console_handler = logger_get_console_handler()
if console_handler:
_logger.addHandler(console_handler)
if log_file is not None:
file_handler, err_output = logger_get_file_handler(log_file)
if file_handler:
_logger.addHandler(file_handler)
_logger.propagate = False
if err_output is not None:
print(err_output)
_logger.warning(
"Failed to use log file [%s], %s.", log_file, err_output
)
if temp_log_file is not None:
if os.path.isfile(temp_log_file):
try:
os.remove(temp_log_file)
except OSError:
logger.warning("Cannot remove temp log file [%s]." % temp_log_file)
file_handler, err_output = logger_get_file_handler(temp_log_file)
if file_handler:
_logger.addHandler(file_handler)
_logger.propagate = False
if err_output is not None:
print(err_output)
_logger.warning(
"Failed to use log file [%s], %s.", log_file, err_output
)
return _logger
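As an editor's illustration (not from the commit), a wrapper could bootstrap its logging through this helper; the log file path and flag values are assumptions:

# Hypothetical usage sketch of logger_get_logger(); the path is an assumption
root_logger = logger_get_logger(log_file="/tmp/librenms-discovery-wrapper.log", debug=False)
root_logger.info("Logging initialized")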
# Generic functions ########################################################
def check_for_file(file):
try:
with open(file) as f:  # use a distinct name so the 'file' argument isn't shadowed
pass
except IOError as exc:
logger.error("File '%s' is not readable" % file)
logger.debug("Traceback:", exc_info=True)
sys.exit(2)
# Config functions #########################################################
def get_config_data(base_dir):
check_for_file(os.path.join(base_dir, ".env"))
env_path = "{}/.env".format(base_dir)  # defined before the try so the except handler can reference it
try:
import dotenv
logger.info("Attempting to load .env from '%s'", env_path)
dotenv.load_dotenv(dotenv_path=env_path, verbose=True)
if not os.getenv("NODE_ID"):
logger.critical(".env does not contain a valid NODE_ID setting.")
except ImportError as exc:
logger.critical(
'Could not import "%s" - Please check that the poller user can read the file, and that composer install has been run recently\nAdditional info: %s'
% (env_path, exc)
)
logger.debug("Traceback:", exc_info=True)
config_cmd = ["/usr/bin/env", "php", "%s/config_to_json.php" % base_dir]
try:
exit_code, output = command_runner(config_cmd, timeout=300)
if exit_code == 0:
return json.loads(output)
raise EnvironmentError("config_to_json.php exited with code {}".format(exit_code))
except Exception as exc:
logger.critical("ERROR: Could not execute command [%s]: %s" % (config_cmd, exc))
logger.debug("Traceback:", exc_info=True)
return None
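A hedged consumption sketch (not from the commit; the install directory is an assumption). As the code above shows, get_config_data() returns the parsed config dict on success and None on failure:

# Hypothetical caller: stop early when config.php could not be read
config = get_config_data("/opt/librenms")  # assumed base_dir
if config is None:
    sys.exit(2)
distributed = config.get("distributed_poller", False)  # key presence is an assumption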
def normalize_wait(seconds):
@ -28,8 +185,9 @@ def normalize_wait(seconds):
def call_script(script, args=()):
"""
Run a LibreNMS script. Captures all output and throws an exception if a non-zero
status is returned. Blocks parent signals (like SIGINT and SIGTERM).
Run a LibreNMS script. Captures all output and returns the exit code.
Blocks parent signals (like SIGINT and SIGTERM).
Kills script if it takes too long
:param script: the name of the executable relative to the base directory
:param args: a tuple of arguments to send to the command
:returns: a tuple (exit_code, output) as returned by command_runner
@ -42,14 +200,10 @@ def call_script(script, args=()):
base_dir = os.path.realpath(os.path.dirname(__file__) + "/..")
cmd = base + ("{}/{}".format(base_dir, script),) + tuple(map(str, args))
debug("Running {}".format(cmd))
logger.debug("Running {}".format(cmd))
# preexec_fn=os.setsid here keeps process signals from propagating (close_fds=True is default)
return subprocess.check_call(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
close_fds=True,
return command_runner(
cmd, preexec_fn=os.setsid, close_fds=True, timeout=DEFAULT_SCRIPT_TIMEOUT
)
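call_script() now returns command_runner's (exit_code, output) tuple instead of raising CalledProcessError on failure, so callers check the code themselves. A minimal sketch of the new contract (device_id is an assumed variable):

# Hypothetical caller under the return-based contract
exit_code, output = call_script("poller.php", ("-h", device_id))
if exit_code != 0:
    logger.error("poller.php exited with code %s: %s", exit_code, output)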
@ -70,15 +224,15 @@ class DB:
import pymysql
pymysql.install_as_MySQLdb()
info("Using pure python SQL client")
logger.info("Using pure python SQL client")
except ImportError:
info("Using other SQL client")
logger.info("Using other SQL client")
try:
import MySQLdb
except ImportError:
critical("ERROR: missing a mysql python module")
critical(
logger.critical("ERROR: missing a mysql python module")
logger.critical(
"Install either 'PyMySQL' or 'mysqlclient' from your OS software repository or from PyPI"
)
raise
@ -99,7 +253,7 @@ class DB:
conn.ping(True)
self._db[threading.get_ident()] = conn
except Exception as e:
critical("ERROR: Could not connect to MySQL database! {}".format(e))
logger.critical("ERROR: Could not connect to MySQL database! {}".format(e))
raise
def db_conn(self):
@ -128,7 +282,7 @@ class DB:
cursor.close()
return cursor
except Exception as e:
critical("DB Connection exception {}".format(e))
logger.critical("DB Connection exception {}".format(e))
self.close()
raise
@ -167,7 +321,7 @@ class RecurringTimer:
class Lock:
""" Base lock class this is not thread safe"""
"""Base lock class this is not thread safe"""
def __init__(self):
self._locks = {} # store a tuple (owner, expiration)
@ -210,7 +364,7 @@ class Lock:
return False
def print_locks(self):
debug(self._locks)
logger.debug(self._locks)
class ThreadingLock(Lock):
@ -269,7 +423,7 @@ class RedisLock(Lock):
self._redis = redis.Redis(**kwargs)
self._redis.ping()
self._namespace = namespace
info(
logger.info(
"Created redis lock manager with socket_timeout of {}s".format(
redis_kwargs["socket_timeout"]
)
@ -296,7 +450,7 @@ class RedisLock(Lock):
non_existing = not (allow_owner_relock and self._redis.get(key) == owner)
return self._redis.set(key, owner, ex=int(expiration), nx=non_existing)
except redis.exceptions.ResponseError as e:
exception(
logger.critical(
"Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s",
name,
owner,
@ -351,7 +505,7 @@ class RedisUniqueQueue(object):
self._redis = redis.Redis(**kwargs)
self._redis.ping()
self.key = "{}:{}".format(namespace, name)
info(
logger.info(
"Created redis queue with socket_timeout of {}s".format(
redis_kwargs["socket_timeout"]
)
@ -371,10 +525,20 @@ class RedisUniqueQueue(object):
self._redis.zadd(self.key, {item: time()}, nx=True)
def get(self, block=True, timeout=None):
if block:
item = self._redis.bzpopmin(self.key, timeout=timeout)
else:
item = self._redis.zpopmin(self.key)
try:
if block:
item = self._redis.bzpopmin(self.key, timeout=timeout)
else:
item = self._redis.zpopmin(self.key)
# Unfortunately we cannot catch redis.exceptions.ResponseError here,
# since it would trigger another exception in queuemanager
except Exception as e:
logger.critical(
"BZPOPMIN/ZPOPMIN command failed: {}\nNote that redis >= 5.0 is required.".format(
e
)
)
raise
if item:
item = item[1]

LibreNMS/command_runner.py (new file, 640 added lines)

@ -0,0 +1,640 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of command_runner module
"""
command_runner is a quick tool to launch commands from Python, get exit code
and output, and handle most errors that may happen
Versioning semantics:
Major version: backward compatibility breaking changes
Minor version: New functionality
Patch version: Backwards compatible bug fixes
"""
__intname__ = "command_runner"
__author__ = "Orsiris de Jong"
__copyright__ = "Copyright (C) 2015-2021 Orsiris de Jong"
__licence__ = "BSD 3 Clause"
__version__ = "1.2.1"
__build__ = "2021090901"
import io
import os
import shlex
import subprocess
import sys
from datetime import datetime
from logging import getLogger
from time import sleep
try:
import psutil
except ImportError:
# Don't bother with an error since we need command_runner to work without dependencies
pass
try:
import signal
except ImportError:
pass
# Python 2.7 compat fixes (queue was Queue)
try:
import queue
except ImportError:
import Queue as queue
import threading
# Python 2.7 compat fixes (missing typing and FileNotFoundError)
try:
from typing import Union, Optional, List, Tuple, NoReturn, Any
except ImportError:
pass
try:
FileNotFoundError
except NameError:
# pylint: disable=W0622 (redefined-builtin)
FileNotFoundError = IOError
try:
TimeoutExpired = subprocess.TimeoutExpired
except AttributeError:
class TimeoutExpired(BaseException):
"""
Basic redeclaration when subprocess.TimeoutExpired does not exist, python <= 3.3
"""
def __init__(self, cmd, timeout, output=None, stderr=None):
self.cmd = cmd
self.timeout = timeout
self.output = output
self.stderr = stderr
def __str__(self):
return "Command '%s' timed out after %s seconds" % (self.cmd, self.timeout)
@property
def stdout(self):
return self.output
@stdout.setter
def stdout(self, value):
# There's no obvious reason to set this, but allow it anyway so
# .stdout is a transparent alias for .output
self.output = value
class KbdInterruptGetOutput(BaseException):
"""
Make sure we get the current output when KeyboardInterrupt is made
"""
def __init__(self, output):
self._output = output
@property
def output(self):
return self._output
logger = getLogger(__intname__)
PIPE = subprocess.PIPE
MIN_RESOLUTION = 0.05 # Minimal sleep time between polling, reduces CPU usage
def kill_childs_mod(
pid=None, # type: int
itself=False, # type: bool
soft_kill=False, # type: bool
):
# type: (...) -> bool
"""
Inline version of ofunctions.kill_childs that has no hard dependency on psutil
Kills all children of pid (current pid can be obtained with os.getpid())
If no pid is given, the current pid is taken
A good idea when using multiprocessing is to call atexit.register(ofunctions.kill_childs, os.getpid(),)
Beware: MS Windows does not maintain a process tree, so child dependencies are computed on the fly
Knowing this, orphaned processes (where parent process died) cannot be found and killed this way
Prefer process.send_signal() over process.kill() to avoid race conditions when a PID is reused too fast
:param pid: Which pid tree we'll kill
:param itself: Should parent be killed too ?
"""
sig = None
### BEGIN COMMAND_RUNNER MOD
if "psutil" not in sys.modules:
logger.error(
"No psutil module present. Can only kill direct pids, not child subtree."
)
if "signal" not in sys.modules:
logger.error(
"No signal module present. Using direct psutil kill API which might have race conditions when PID is reused too fast."
)
else:
"""
Extract from Python3 doc
On Windows, signal() can only be called with SIGABRT, SIGFPE, SIGILL, SIGINT, SIGSEGV, SIGTERM, or SIGBREAK.
A ValueError will be raised in any other case. Note that not all systems define the same set of signal names;
an AttributeError will be raised if a signal name is not defined as SIG* module level constant.
"""
try:
if not soft_kill and hasattr(signal, "SIGKILL"):
# Don't bother to make pylint go crazy on Windows
# pylint: disable=E1101
sig = signal.SIGKILL
else:
sig = signal.SIGTERM
except NameError:
sig = None
### END COMMAND_RUNNER MOD
def _process_killer(
process, # type: Union[subprocess.Popen, psutil.Process]
sig, # type: signal.valid_signals
soft_kill, # type: bool
):
# (...) -> None
"""
Simple abstract process killer that works with signals in order to avoid reused PID race conditions
and can prefer terminate over kill
"""
if sig:
try:
process.send_signal(sig)
# psutil.NoSuchProcess might not be available, let's be broad
# pylint: disable=W0703
except Exception:
pass
else:
if soft_kill:
process.terminate()
else:
process.kill()
try:
current_process = psutil.Process(pid if pid is not None else os.getpid())
# psutil.NoSuchProcess might not be available, let's be broad
# pylint: disable=W0703
except Exception:
if itself:
os.kill(
pid, 15
) # 15 being signal.SIGTERM or SIGKILL depending on the platform
return False
for child in current_process.children(recursive=True):
_process_killer(child, sig, soft_kill)
if itself:
_process_killer(current_process, sig, soft_kill)
return True
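A sketch of the atexit pattern the docstring suggests, using this inline variant (illustrative only, not from the commit):

import atexit
import os

# Soft-kill (SIGTERM) all children of the current process when the interpreter exits
atexit.register(kill_childs_mod, os.getpid(), itself=False, soft_kill=True)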
def command_runner(
command, # type: Union[str, List[str]]
valid_exit_codes=None, # type: Optional[List[int]]
timeout=3600, # type: Optional[int]
shell=False, # type: bool
encoding=None, # type: Optional[str]
stdout=None, # type: Union[int, str]
stderr=None, # type: Union[int, str]
windows_no_window=False, # type: bool
live_output=False, # type: bool
method="monitor", # type: str
**kwargs # type: Any
):
# type: (...) -> Tuple[Optional[int], str]
"""
Unix & Windows compatible subprocess wrapper that handles output encoding and timeouts
Newer Python check_output already handles encoding and timeouts, but this one is retro-compatible
It is still recommended to set cp437 for windows and utf-8 for unix
Also allows a list of various valid exit codes (ie no error when exit code = arbitrary int)
command should be a list of strings, eg ['ping', '127.0.0.1', '-c 2']
command can also be a single string, ex 'ping 127.0.0.1 -c 2' if shell=True or if os is Windows
Accepts all of subprocess.popen arguments
Whenever we can, we need to avoid shell=True in order to preserve better security
Avoiding shell=True involves passing absolute paths to executables since we don't have shell PATH environment
When no stdout option is given, we'll get output into the returned (exit_code, output) tuple
When stdout = filename or stderr = filename, we'll write output to the given file
live_output will poll the process for output and show it on screen (output may be non reliable, don't use it if
your program depends on the commands' stdout output)
windows_no_window will disable visible window (MS Windows platform only)
Returns a tuple (exit_code, output)
"""
# Choose default encoding when none set
# cp437 encoding assures we catch most special characters from cmd.exe
if not encoding:
encoding = "cp437" if os.name == "nt" else "utf-8"
# Fix when unix command was given as single string
# This is more secure than setting shell=True
if os.name == "posix" and shell is False and isinstance(command, str):
command = shlex.split(command)
# Set default values for kwargs
errors = kwargs.pop(
"errors", "backslashreplace"
) # Don't let encoding issues make you mad
universal_newlines = kwargs.pop("universal_newlines", False)
creationflags = kwargs.pop("creationflags", 0)
# subprocess.CREATE_NO_WINDOW was added in Python 3.7 for Windows OS only
if (
windows_no_window
and sys.version_info[0] >= 3
and sys.version_info[1] >= 7
and os.name == "nt"
):
# Disable the following pylint error since the code also runs on nt platform, but
# triggers an error on Unix
# pylint: disable=E1101
creationflags = creationflags | subprocess.CREATE_NO_WINDOW
close_fds = kwargs.pop("close_fds", "posix" in sys.builtin_module_names)
# Default buffer size. line buffer (1) is deprecated in Python 3.7+
bufsize = kwargs.pop("bufsize", 16384)
# Decide whether we write to output variable only (stdout=None), to output variable and stdout (stdout=PIPE)
# or to output variable and to file (stdout='path/to/file')
if stdout is None:
_stdout = PIPE
stdout_to_file = False
elif isinstance(stdout, str):
# We will send anything to file
_stdout = open(stdout, "wb")
stdout_to_file = True
else:
# We will send anything to given stdout pipe
_stdout = stdout
stdout_to_file = False
# The only situation where we don't add stderr to stdout is if a specific target file was given
if isinstance(stderr, str):
_stderr = open(stderr, "wb")
stderr_to_file = True
else:
_stderr = subprocess.STDOUT
stderr_to_file = False
def to_encoding(
process_output, # type: Union[str, bytes]
encoding, # type: str
errors, # type: str
):
# type: (...) -> str
"""
Convert bytes output to string and handles conversion errors
"""
# Compatibility for earlier Python versions where Popen has no 'encoding' nor 'errors' arguments
if isinstance(process_output, bytes):
try:
process_output = process_output.decode(encoding, errors=errors)
except TypeError:
try:
# handle TypeError: don't know how to handle UnicodeDecodeError in error callback
process_output = process_output.decode(encoding, errors="ignore")
except (ValueError, TypeError):
# What happens when str cannot be concatenated
logger.debug("Output cannot be captured {}".format(process_output))
return process_output
def _read_pipe(
stream, # type: io.StringIO
output_queue, # type: queue.Queue
):
# type: (...) -> None
"""
will read from subprocess.PIPE
Must be threaded since readline() might be blocking on Windows GUI apps
Partly based on https://stackoverflow.com/a/4896288/2635443
"""
# WARNING: Depending on the stream type (binary or text), the sentinel character
# needs to be of the same type, or the iterator won't have an end
# We also need to check that stream has readline, in case we're writing to files instead of PIPE
if hasattr(stream, "readline"):
sentinel_char = "" if hasattr(stream, "encoding") else b""
for line in iter(stream.readline, sentinel_char):
output_queue.put(line)
output_queue.put(None)
stream.close()
def _poll_process(
process, # type: Union[subprocess.Popen[str], subprocess.Popen]
timeout, # type: int
encoding, # type: str
errors, # type: str
):
# type: (...) -> Tuple[Optional[int], str]
"""
Process stdout/stderr output polling is only used in live output mode
since it takes more resources than using communicate()
Reads from process output pipe until:
- Timeout is reached, in which case we'll terminate the process
- Process ends by itself
Returns an encoded string of the pipe output
"""
begin_time = datetime.now()
output = ""
output_queue = queue.Queue()
def __check_timeout(
begin_time, # type: datetime.timestamp
timeout, # type: int
):
# type: (...) -> None
"""
Simple subfunction to check whether timeout is reached
Since we check this a lot, we put it into a function
"""
if timeout and (datetime.now() - begin_time).total_seconds() > timeout:
kill_childs_mod(process.pid, itself=True, soft_kill=False)
raise TimeoutExpired(process, timeout, output)
try:
read_thread = threading.Thread(
target=_read_pipe, args=(process.stdout, output_queue)
)
read_thread.daemon = True # thread dies with the program
read_thread.start()
while True:
try:
line = output_queue.get(timeout=MIN_RESOLUTION)
except queue.Empty:
__check_timeout(begin_time, timeout)
else:
if line is None:
break
else:
line = to_encoding(line, encoding, errors)
if live_output:
sys.stdout.write(line)
output += line
__check_timeout(begin_time, timeout)
# Make sure we wait for the process to terminate, even after
# output_queue has finished sending data, so we catch the exit code
while process.poll() is None:
__check_timeout(begin_time, timeout)
# Additional timeout check to make sure we don't return an exit code from processes
# that were killed because of timeout
__check_timeout(begin_time, timeout)
exit_code = process.poll()
return exit_code, output
except KeyboardInterrupt:
raise KbdInterruptGetOutput(output)
def _timeout_check_thread(
process, # type: Union[subprocess.Popen[str], subprocess.Popen]
timeout, # type: int
timeout_queue, # type: queue.Queue
):
# type: (...) -> None
"""
Since older Python versions don't have timeout, we need to manually check the timeout for a process
"""
begin_time = datetime.now()
while True:
if timeout and (datetime.now() - begin_time).total_seconds() > timeout:
kill_childs_mod(process.pid, itself=True, soft_kill=False)
timeout_queue.put(True)
break
if process.poll() is not None:
break
sleep(MIN_RESOLUTION)
def _monitor_process(
process, # type: Union[subprocess.Popen[str], subprocess.Popen]
timeout, # type: int
encoding, # type: str
errors, # type: str
):
# type: (...) -> Tuple[Optional[int], str]
"""
Create a thread in order to enforce timeout
Get stdout output and return it
"""
# Shared mutable objects have proven to have race conditions with PyPy 3.7 (mutable object
# is changed in thread, but outer monitor function has still old mutable object state)
# Strangely, this happened only sometimes on github actions/ubuntu 20.04.3 & pypy 3.7
# Let's create a queue to get the timeout thread response on a deterministic way
timeout_queue = queue.Queue()
is_timeout = False
thread = threading.Thread(
target=_timeout_check_thread,
args=(process, timeout, timeout_queue),
)
thread.setDaemon(True)
thread.start()
process_output = None
stdout = None
try:
# Don't use process.wait() since it may deadlock on old Python versions
# Also it won't allow communicate() to get incomplete output on timeouts
while process.poll() is None:
sleep(MIN_RESOLUTION)
try:
is_timeout = timeout_queue.get_nowait()
except queue.Empty:
pass
else:
break
# We still need to use process.communicate() in this loop so we don't get stuck
# with poll() is not None even after process is finished
try:
stdout, _ = process.communicate()
# ValueError is raised on closed IO file
except (TimeoutExpired, ValueError):
pass
exit_code = process.poll()
try:
stdout, _ = process.communicate()
except (TimeoutExpired, ValueError):
pass
process_output = to_encoding(stdout, encoding, errors)
# On PyPy 3.7 only, we can have a race condition where we try to read the queue before
# the thread could write to it, failing to register a timeout.
# This workaround prevents reading the queue while the thread is still alive
while thread.is_alive():
sleep(MIN_RESOLUTION)
try:
is_timeout = timeout_queue.get_nowait()
except queue.Empty:
pass
if is_timeout:
raise TimeoutExpired(process, timeout, process_output)
return exit_code, process_output
except KeyboardInterrupt:
raise KbdInterruptGetOutput(process_output)
try:
# Finally, we won't use encoding & errors arguments for Popen
# since it would defeat the idea of binary pipe reading in live mode
# Python >= 3.3 has SubProcessError(TimeoutExpired) class
# Python >= 3.6 has encoding & error arguments
# universal_newlines=True makes netstat command fail under windows
# timeout does not work under Python 2.7 with subprocess32 < 3.5
# decoder may be cp437 or unicode_escape for dos commands or utf-8 for powershell
# Disabling pylint error for the same reason as above
# pylint: disable=E1123
if sys.version_info >= (3, 6):
process = subprocess.Popen(
command,
stdout=_stdout,
stderr=_stderr,
shell=shell,
universal_newlines=universal_newlines,
encoding=encoding,
errors=errors,
creationflags=creationflags,
bufsize=bufsize, # 1 = line buffered
close_fds=close_fds,
**kwargs
)
else:
process = subprocess.Popen(
command,
stdout=_stdout,
stderr=_stderr,
shell=shell,
universal_newlines=universal_newlines,
creationflags=creationflags,
bufsize=bufsize,
close_fds=close_fds,
**kwargs
)
try:
if method == "poller" or live_output:
exit_code, output = _poll_process(process, timeout, encoding, errors)
else:
exit_code, output = _monitor_process(process, timeout, encoding, errors)
except KbdInterruptGetOutput as exc:
exit_code = -252
output = "KeyboardInterrupted. Partial output\n{}".format(exc.output)
try:
kill_childs_mod(process.pid, itself=True, soft_kill=False)
except AttributeError:
pass
if stdout_to_file:
_stdout.write(output.encode(encoding, errors=errors))
logger.debug(
'Command "{}" returned with exit code "{}". Command output was:'.format(
command, exit_code
)
)
except subprocess.CalledProcessError as exc:
exit_code = exc.returncode
try:
output = exc.output
except AttributeError:
output = "command_runner: Could not obtain output from command."
if exit_code in valid_exit_codes if valid_exit_codes is not None else [0]:
logger.debug(
'Command "{}" returned with exit code "{}". Command output was:'.format(
command, exit_code
)
)
logger.error(
'Command "{}" failed with exit code "{}". Command output was:'.format(
command, exc.returncode
)
)
logger.error(output)
except FileNotFoundError as exc:
logger.error('Command "{}" failed, file not found: {}'.format(command, exc))
exit_code, output = -253, exc.__str__()
# On python 2.7, OSError is also raised when file is not found (no FileNotFoundError)
# pylint: disable=W0705 (duplicate-except)
except (OSError, IOError) as exc:
logger.error('Command "{}" failed because of OS: {}'.format(command, exc))
exit_code, output = -253, exc.__str__()
except TimeoutExpired as exc:
message = 'Timeout {} seconds expired for command "{}" execution. Original output was: {}'.format(
timeout, command, exc.output
)
logger.error(message)
if stdout_to_file:
_stdout.write(message.encode(encoding, errors=errors))
exit_code, output = (
-254,
'Timeout of {} seconds expired for command "{}" execution. Original output was: {}'.format(
timeout, command, exc.output
),
)
# We need to be able to catch a broad exception
# pylint: disable=W0703
except Exception as exc:
logger.error(
'Command "{}" failed for unknown reasons: {}'.format(command, exc),
exc_info=True,
)
logger.debug("Error:", exc_info=True)
exit_code, output = -255, exc.__str__()
finally:
if stdout_to_file:
_stdout.close()
if stderr_to_file:
_stderr.close()
logger.debug(output)
return exit_code, output
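An illustrative call of the wrapper itself (not from the commit); the command and timeout are assumptions, while -254 is the timeout sentinel assigned in the TimeoutExpired handler above:

# Hypothetical usage: cap the command at 60s, then inspect the (exit_code, output) tuple
exit_code, output = command_runner(["ping", "127.0.0.1", "-c", "2"], timeout=60)
if exit_code == -254:
    print("Command timed out, partial output: %s" % output)
elif exit_code != 0:
    print("Command failed with exit code %s" % exit_code)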
def deferred_command(command, defer_time=300):
# type: (str, int) -> None
"""
This is basically an ugly hack to launch commands which are detached from parent process
Especially useful to launch an auto update/deletion of a running executable after a given amount of
seconds after it finished
"""
# Use ping as a standard timer in shell since it's present on virtually *any* system
if os.name == "nt":
deferrer = "ping 127.0.0.1 -n {} > NUL & ".format(defer_time)
else:
deferrer = "ping 127.0.0.1 -c {} > /dev/null && ".format(defer_time)
# We'll create an independent shell process that will not be attached to any stdio interface
# Our command shall be a single string since shell=True
subprocess.Popen(
deferrer + command,
shell=True,
stdin=None,
stdout=None,
stderr=None,
close_fds=True,
)
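For instance (path and delay are assumptions, illustrative only), a detached cleanup could be scheduled like this:

# Hypothetical: remove a temporary file roughly 60 seconds after this process exits
deferred_command("rm -f /tmp/librenms_update.tmp", defer_time=60)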

LibreNMS/library.py (deleted)

@ -1,174 +0,0 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import os
import logging
import tempfile
import subprocess
import threading
import time
from logging.handlers import RotatingFileHandler
try:
import MySQLdb
except ImportError:
try:
import pymysql
pymysql.install_as_MySQLdb()
import MySQLdb
except ImportError as exc:
print("ERROR: missing the mysql python module please run:")
print("pip install -r requirements.txt")
print("ERROR: %s" % exc)
sys.exit(2)
logger = logging.getLogger(__name__)
# Logging functions ########################################################
FORMATTER = logging.Formatter("%(asctime)s :: %(levelname)s :: %(message)s")
def logger_get_console_handler():
try:
console_handler = logging.StreamHandler(sys.stdout)
except OSError as exc:
print("Cannot log to stdout, trying stderr. Message %s" % exc)
try:
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setFormatter(FORMATTER)
return console_handler
except OSError as exc:
print("Cannot log to stderr neither. Message %s" % exc)
return False
else:
console_handler.setFormatter(FORMATTER)
return console_handler
def logger_get_file_handler(log_file):
err_output = None
try:
file_handler = RotatingFileHandler(
log_file, mode="a", encoding="utf-8", maxBytes=1024000, backupCount=3
)
except OSError as exc:
try:
print(
"Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s"
% exc
)
err_output = str(exc)
temp_log_file = tempfile.gettempdir() + os.sep + __name__ + ".log"
print("Trying temporary log file in " + temp_log_file)
file_handler = RotatingFileHandler(
temp_log_file,
mode="a",
encoding="utf-8",
maxBytes=1000000,
backupCount=1,
)
file_handler.setFormatter(FORMATTER)
err_output += "\nUsing [%s]" % temp_log_file
return file_handler, err_output
except OSError as exc:
print(
"Cannot create temporary log file either. Will not log to file. Message: %s"
% exc
)
return False
else:
file_handler.setFormatter(FORMATTER)
return file_handler, err_output
def logger_get_logger(log_file=None, temp_log_file=None, debug=False):
# If a name is given to getLogger, than modules can't log to the root logger
_logger = logging.getLogger()
if debug is True:
_logger.setLevel(logging.DEBUG)
else:
_logger.setLevel(logging.INFO)
console_handler = logger_get_console_handler()
if console_handler:
_logger.addHandler(console_handler)
if log_file is not None:
file_handler, err_output = logger_get_file_handler(log_file)
if file_handler:
_logger.addHandler(file_handler)
_logger.propagate = False
if err_output is not None:
print(err_output)
_logger.warning(
"Failed to use log file [%s], %s.", log_file, err_output
)
if temp_log_file is not None:
if os.path.isfile(temp_log_file):
try:
os.remove(temp_log_file)
except OSError:
logger.warning("Cannot remove temp log file [%s]." % temp_log_file)
file_handler, err_output = logger_get_file_handler(temp_log_file)
if file_handler:
_logger.addHandler(file_handler)
_logger.propagate = False
if err_output is not None:
print(err_output)
_logger.warning(
"Failed to use log file [%s], %s.", log_file, err_output
)
return _logger
# Generic functions ########################################################
def check_for_file(file):
try:
with open(file) as f:
pass
except IOError as exc:
logger.error("Oh dear... %s does not seem readable" % file)
logger.debug("ERROR:", exc_info=True)
sys.exit(2)
# Config functions #########################################################
def get_config_data(install_dir):
config_cmd = ["/usr/bin/env", "php", "%s/config_to_json.php" % install_dir]
try:
proc = subprocess.Popen(
config_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE
)
return proc.communicate()[0].decode()
except Exception as e:
print("ERROR: Could not execute: %s" % config_cmd)
print(e)
sys.exit(2)
# Database functions #######################################################
def db_open(db_socket, db_server, db_port, db_username, db_password, db_dbname):
try:
options = dict(
host=db_server,
port=int(db_port),
user=db_username,
passwd=db_password,
db=db_dbname,
)
if db_socket:
options["unix_socket"] = db_socket
return MySQLdb.connect(**options)
except Exception as dbexc:
print("ERROR: Could not connect to MySQL database!")
print("ERROR: %s" % dbexc)
sys.exit(2)

LibreNMS/queuemanager.py

@ -1,14 +1,16 @@
import logging
import pymysql
import subprocess
import threading
import traceback
from logging import debug, info, error, critical, warning
from queue import Empty
from subprocess import CalledProcessError
import LibreNMS
logger = logging.getLogger(__name__)
class QueueManager:
def __init__(
self, config, lock_manager, type_desc, uses_groups=False, auto_start=True
@ -39,8 +41,8 @@ class QueueManager:
self._stop_event = threading.Event()
info("Groups: {}".format(self.config.group))
info(
logger.info("Groups: {}".format(self.config.group))
logger.info(
"{} QueueManager created: {} workers, {}s frequency".format(
self.type.title(),
self.get_poller_config().workers,
@ -52,9 +54,9 @@ class QueueManager:
self.start()
def _service_worker(self, queue_id):
debug("Worker started {}".format(threading.current_thread().getName()))
logger.debug("Worker started {}".format(threading.current_thread().getName()))
while not self._stop_event.is_set():
debug(
logger.debug(
"Worker {} checking queue {} ({}) for work".format(
threading.current_thread().getName(),
queue_id,
@ -68,13 +70,13 @@ class QueueManager:
if (
device_id is not None
): # None returned by redis after timeout when empty
debug(
logger.debug(
"Worker {} ({}) got work {} ".format(
threading.current_thread().getName(), queue_id, device_id
)
)
with LibreNMS.TimeitContext.start() as t:
debug("Queues: {}".format(self._queues))
logger.debug("Queues: {}".format(self._queues))
target_desc = (
"{} ({})".format(device_id if device_id else "", queue_id)
if queue_id
@ -83,7 +85,7 @@ class QueueManager:
self.do_work(device_id, queue_id)
runtime = t.delta()
info(
logger.info(
"Completed {} run for {} in {:.2f}s".format(
self.type, target_desc, runtime
)
@ -92,13 +94,13 @@ class QueueManager:
except Empty:
pass # ignore empty queue exception from subprocess.Queue
except CalledProcessError as e:
error(
logger.error(
"{} poller script error! {} returned {}: {}".format(
self.type.title(), e.cmd, e.returncode, e.output
)
)
except Exception as e:
error("{} poller exception! {}".format(self.type.title(), e))
logger.error("{} poller exception! {}".format(self.type.title(), e))
traceback.print_exc()
def post_work(self, payload, queue_id):
@ -108,7 +110,7 @@ class QueueManager:
:param queue_id: which queue to post to, 0 is the default
"""
self.get_queue(queue_id).put(payload)
debug(
logger.debug(
"Posted work for {} to {}:{} queue size: {}".format(
payload, self.type, queue_id, self.get_queue(queue_id).qsize()
)
@ -131,7 +133,7 @@ class QueueManager:
thread_name = "{}_{}-{}".format(self.type.title(), group, i + 1)
self.spawn_worker(thread_name, group)
debug(
logger.debug(
"Started {} {} threads for group {}".format(
group_workers, self.type, group
)
@ -196,7 +198,7 @@ class QueueManager:
:param group:
:return:
"""
info("Creating queue {}".format(self.queue_name(queue_type, group)))
logger.info("Creating queue {}".format(self.queue_name(queue_type, group)))
try:
return LibreNMS.RedisUniqueQueue(
self.queue_name(queue_type, group),
@ -213,15 +215,19 @@ class QueueManager:
except ImportError:
if self.config.distributed:
critical("ERROR: Redis connection required for distributed polling")
critical(
logger.critical(
"ERROR: Redis connection required for distributed polling"
)
logger.critical(
"Please install redis-py, either through your os software repository or from PyPI"
)
exit(2)
except Exception as e:
if self.config.distributed:
critical("ERROR: Redis connection required for distributed polling")
critical("Could not connect to Redis. {}".format(e))
logger.critical(
"ERROR: Redis connection required for distributed polling"
)
logger.critical("Could not connect to Redis. {}".format(e))
exit(2)
return LibreNMS.UniqueQueue()
@ -341,11 +347,19 @@ class BillingQueueManager(TimedQueueManager):
def do_work(self, run_type, group):
if run_type == "poll":
info("Polling billing")
LibreNMS.call_script("poll-billing.php")
logger.info("Polling billing")
exit_code, output = LibreNMS.call_script("poll-billing.php")
if exit_code != 0:
logger.warning(
"Error {} in Polling billing:\n{}".format(exit_code, output)
)
else: # run_type == 'calculate'
info("Calculating billing")
LibreNMS.call_script("billing-calculate.php")
logger.info("Calculating billing")
exit_code, output = LibreNMS.call_script("billing-calculate.php")
if exit_code != 0:
logger.warning(
"Error {} in Calculating billing:\n{}".format(exit_code, output)
)
class PingQueueManager(TimedQueueManager):
@ -365,13 +379,19 @@ class PingQueueManager(TimedQueueManager):
for group in groups:
self.post_work("", group[0])
except pymysql.err.Error as e:
critical("DB Exception ({})".format(e))
logger.critical("DB Exception ({})".format(e))
def do_work(self, context, group):
if self.lock(group, "group", timeout=self.config.ping.frequency):
try:
info("Running fast ping")
LibreNMS.call_script("ping.php", ("-g", group))
logger.info("Running fast ping")
exit_code, output = LibreNMS.call_script("ping.php", ("-g", group))
if exit_code != 0:
logger.warning(
"Running fast ping for {} failed with error code {}: {}".format(
group, exit_code, output
)
)
finally:
self.unlock(group, "group")
@ -396,16 +416,19 @@ class ServicesQueueManager(TimedQueueManager):
for device in devices:
self.post_work(device[0], device[1])
except pymysql.err.Error as e:
critical("DB Exception ({})".format(e))
logger.critical("DB Exception ({})".format(e))
def do_work(self, device_id, group):
if self.lock(device_id, timeout=self.config.services.frequency):
try:
info("Checking services on device {}".format(device_id))
LibreNMS.call_script("check-services.php", ("-h", device_id))
except subprocess.CalledProcessError as e:
if e.returncode == 5:
info(
logger.info("Checking services on device {}".format(device_id))
exit_code, output = LibreNMS.call_script(
"check-services.php", ("-h", device_id)
)
if exit_code == 0:
self.unlock(device_id)
else:
if exit_code == 5:
logger.info(
"Device {} is down, cannot poll service, waiting {}s for retry".format(
device_id, self.config.down_retry
)
@ -413,8 +436,12 @@ class ServicesQueueManager(TimedQueueManager):
self.lock(
device_id, allow_relock=True, timeout=self.config.down_retry
)
else:
self.unlock(device_id)
else:
logger.warning(
"Unknown error while checking services on device {} with exit code {}: {}".format(
device_id, exit_code, output
)
)
class AlertQueueManager(TimedQueueManager):
@ -432,14 +459,13 @@ class AlertQueueManager(TimedQueueManager):
self.post_work("alerts", 0)
def do_work(self, device_id, group):
try:
info("Checking alerts")
LibreNMS.call_script("alerts.php")
except subprocess.CalledProcessError as e:
if e.returncode == 1:
warning("There was an error issuing alerts: {}".format(e.output))
logger.info("Checking alerts")
exit_code, output = LibreNMS.call_script("alerts.php")
if exit_code != 0:
if exit_code == 1:
logger.warning("There was an error issuing alerts: {}".format(output))
else:
raise
raise CalledProcessError(exit_code, "alerts.php", output)
class PollerQueueManager(QueueManager):
@ -454,13 +480,14 @@ class PollerQueueManager(QueueManager):
def do_work(self, device_id, group):
if self.lock(device_id, timeout=self.config.poller.frequency):
info("Polling device {}".format(device_id))
logger.info("Polling device {}".format(device_id))
try:
LibreNMS.call_script("poller.php", ("-h", device_id))
except subprocess.CalledProcessError as e:
if e.returncode == 6:
warning(
exit_code, output = LibreNMS.call_script("poller.php", ("-h", device_id))
if exit_code == 0:
self.unlock(device_id)
else:
if exit_code == 6:
logger.warning(
"Polling device {} unreachable, waiting {}s for retry".format(
device_id, self.config.down_retry
)
@ -470,12 +497,14 @@ class PollerQueueManager(QueueManager):
device_id, allow_relock=True, timeout=self.config.down_retry
)
else:
error("Polling device {} failed! {}".format(device_id, e))
logger.error(
"Polling device {} failed with exit code {}: {}".format(
device_id, exit_code, output
)
)
self.unlock(device_id)
else:
self.unlock(device_id)
else:
debug("Tried to poll {}, but it is locked".format(device_id))
logger.debug("Tried to poll {}, but it is locked".format(device_id))
class DiscoveryQueueManager(TimedQueueManager):
@ -497,18 +526,19 @@ class DiscoveryQueueManager(TimedQueueManager):
for device in devices:
self.post_work(device[0], device[1])
except pymysql.err.Error as e:
critical("DB Exception ({})".format(e))
logger.critical("DB Exception ({})".format(e))
def do_work(self, device_id, group):
if self.lock(
device_id, timeout=LibreNMS.normalize_wait(self.config.discovery.frequency)
):
try:
info("Discovering device {}".format(device_id))
LibreNMS.call_script("discovery.php", ("-h", device_id))
except subprocess.CalledProcessError as e:
if e.returncode == 5:
info(
logger.info("Discovering device {}".format(device_id))
exit_code, output = LibreNMS.call_script("discovery.php", ("-h", device_id))
if exit_code == 0:
self.unlock(device_id)
else:
if exit_code == 5:
logger.info(
"Device {} is down, cannot discover, waiting {}s for retry".format(
device_id, self.config.down_retry
)
@ -517,6 +547,9 @@ class DiscoveryQueueManager(TimedQueueManager):
device_id, allow_relock=True, timeout=self.config.down_retry
)
else:
logger.error(
"Discovering device {} failed with exit code {}: {}".format(
device_id, exit_code, output
)
)
self.unlock(device_id)
else:
self.unlock(device_id)

LibreNMS/service.py

@ -1,10 +1,7 @@
import LibreNMS
import json
import logging
import os
import pymysql
import subprocess
import threading
import sys
import time
@ -16,7 +13,6 @@ except ImportError:
from datetime import timedelta
from datetime import datetime
from logging import debug, info, warning, error, critical, exception
from platform import python_version
from time import sleep
from socket import gethostname
@ -28,6 +24,8 @@ try:
except ImportError:
pass
logger = logging.getLogger(__name__)
class ServiceConfig:
def __init__(self):
@ -99,7 +97,7 @@ class ServiceConfig:
watchdog_logfile = "logs/librenms.log"
def populate(self):
config = self._get_config_data()
config = LibreNMS.get_config_data(self.BASE_DIR)
# populate config variables
self.node_id = os.getenv("NODE_ID")
@ -232,7 +230,7 @@ class ServiceConfig:
try:
logging.getLogger().setLevel(self.log_level)
except ValueError:
error(
logger.error(
"Unknown log level {}, must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'".format(
self.log_level
)
@ -301,39 +299,7 @@ class ServiceConfig:
if settings["watchdog_log"] is not None:
self.watchdog_logfile = settings["watchdog_log"]
except pymysql.err.Error:
warning("Unable to load poller (%s) config", self.node_id)
def _get_config_data(self):
try:
import dotenv
env_path = "{}/.env".format(self.BASE_DIR)
info("Attempting to load .env from '%s'", env_path)
dotenv.load_dotenv(dotenv_path=env_path, verbose=True)
if not os.getenv("NODE_ID"):
raise ImportError(".env does not contain a valid NODE_ID setting.")
except ImportError as e:
exception(
"Could not import .env - check that the poller user can read the file, and that composer install has been run recently"
)
sys.exit(3)
config_cmd = [
"/usr/bin/env",
"php",
"{}/config_to_json.php".format(self.BASE_DIR),
"2>&1",
]
try:
return json.loads(subprocess.check_output(config_cmd).decode())
except subprocess.CalledProcessError as e:
error(
"ERROR: Could not load or parse configuration! {}: {}".format(
subprocess.list2cmdline(e.cmd), e.output.decode()
)
)
logger.warning("Unable to load poller (%s) config", self.node_id)
@staticmethod
def parse_group(g):
@ -347,7 +313,7 @@ class ServiceConfig:
except ValueError:
pass
error("Could not parse group string, defaulting to 0")
logger.error("Could not parse group string, defaulting to 0")
return [0]
@ -382,7 +348,7 @@ class Service:
self.config.poller.frequency, self.log_performance_stats, "performance"
)
if self.config.watchdog_enabled:
info(
logger.info(
"Starting watchdog timer for log file: {}".format(
self.config.watchdog_logfile
)
@ -391,7 +357,7 @@ class Service:
self.config.poller.frequency, self.logfile_watchdog, "watchdog"
)
else:
info("Watchdog is disabled.")
logger.info("Watchdog is disabled.")
self.systemd_watchdog_timer = LibreNMS.RecurringTimer(
10, self.systemd_watchdog, "systemd-watchdog"
)
@ -401,14 +367,16 @@ class Service:
return time.time() - self.start_time
def attach_signals(self):
info("Attaching signal handlers on thread %s", threading.current_thread().name)
logger.info(
"Attaching signal handlers on thread %s", threading.current_thread().name
)
signal(SIGTERM, self.terminate) # capture sigterm and exit gracefully
signal(SIGQUIT, self.terminate) # capture sigquit and exit gracefully
signal(SIGINT, self.terminate) # capture sigint and exit gracefully
signal(SIGHUP, self.reload) # capture sighup and restart gracefully
if "psutil" not in sys.modules:
warning("psutil is not available, polling gap possible")
logger.warning("psutil is not available, polling gap possible")
else:
signal(SIGCHLD, self.reap) # capture sigchld and reap the process
@ -427,7 +395,7 @@ class Service:
if status == psutil.STATUS_ZOMBIE:
pid = p.pid
r = os.waitpid(p.pid, os.WNOHANG)
warning(
logger.warning(
'Reaped long running job "%s" in state %s with PID %d - job returned %d',
cmd,
status,
@ -439,7 +407,7 @@ class Service:
continue
def start(self):
debug("Performing startup checks...")
logger.debug("Performing startup checks...")
if self.config.single_instance:
self.check_single_instance() # don't allow more than one service at a time
@ -448,7 +416,7 @@ class Service:
raise RuntimeWarning("Not allowed to start Poller twice")
self._started = True
debug("Starting up queue managers...")
logger.debug("Starting up queue managers...")
# initialize and start the worker pools
self.poller_manager = LibreNMS.PollerQueueManager(self.config, self._lm)
@ -478,8 +446,8 @@ class Service:
if self.config.watchdog_enabled:
self.watchdog_timer.start()
info("LibreNMS Service: {} started!".format(self.config.unique_name))
info(
logger.info("LibreNMS Service: {} started!".format(self.config.unique_name))
logger.info(
"Poller group {}. Using Python {} and {} locks and queues".format(
"0 (default)" if self.config.group == [0] else self.config.group,
python_version(),
@ -487,19 +455,19 @@ class Service:
)
)
if self.config.update_enabled:
info(
logger.info(
"Maintenance tasks will be run every {}".format(
timedelta(seconds=self.config.update_frequency)
)
)
else:
warning("Maintenance tasks are disabled.")
logger.warning("Maintenance tasks are disabled.")
# Main dispatcher loop
try:
while not self.terminate_flag:
if self.reload_flag:
info("Picked up reload flag, calling the reload process")
logger.info("Picked up reload flag, calling the reload process")
self.restart()
if self.reap_flag:
@ -509,7 +477,9 @@ class Service:
master_lock = self._acquire_master()
if master_lock:
if not self.is_master:
info("{} is now the master dispatcher".format(self.config.name))
logger.info(
"{} is now the master dispatcher".format(self.config.name)
)
self.is_master = True
self.start_dispatch_timers()
@ -525,7 +495,7 @@ class Service:
self.dispatch_immediate_discovery(device_id, group)
else:
if self.is_master:
info(
logger.info(
"{} is no longer the master dispatcher".format(
self.config.name
)
@ -536,7 +506,7 @@ class Service:
except KeyboardInterrupt:
pass
info("Dispatch loop terminated")
logger.info("Dispatch loop terminated")
self.shutdown()
def _acquire_master(self):
@ -565,7 +535,7 @@ class Service:
if elapsed > (
self.config.poller.frequency - self.config.master_resolution
):
debug(
logger.debug(
"Dispatching polling for device {}, time since last poll {:.2f}s".format(
device_id, elapsed
)
@ -602,7 +572,7 @@ class Service:
except pymysql.err.Error:
self.db_failures += 1
if self.db_failures > self.config.max_db_failures:
warning(
logger.warning(
"Too many DB failures ({}), attempting to release master".format(
self.db_failures
)
@ -622,23 +592,22 @@ class Service:
wait = 5
max_runtime = 86100
max_tries = int(max_runtime / wait)
info("Waiting for schema lock")
logger.info("Waiting for schema lock")
while not self._lm.lock("schema-update", self.config.unique_name, max_runtime):
attempt += 1
if attempt >= max_tries: # don't get stuck indefinitely
warning("Reached max wait for other pollers to update, updating now")
logger.warning(
"Reached max wait for other pollers to update, updating now"
)
break
sleep(wait)
info("Running maintenance tasks")
try:
output = LibreNMS.call_script("daily.sh")
info("Maintenance tasks complete\n{}".format(output))
except subprocess.CalledProcessError as e:
error(
"Error in daily.sh:\n"
+ (e.output.decode() if e.output is not None else "No output")
)
logger.info("Running maintenance tasks")
exit_code, output = LibreNMS.call_script("daily.sh")
if exit_code == 0:
logger.info("Maintenance tasks complete\n{}".format(output))
else:
logger.error("Error {} in daily.sh:\n{}".format(exit_code, output))
self._lm.unlock("schema-update", self.config.unique_name)
@ -665,15 +634,19 @@ class Service:
)
except ImportError:
if self.config.distributed:
critical("ERROR: Redis connection required for distributed polling")
critical(
logger.critical(
"ERROR: Redis connection required for distributed polling"
)
logger.critical(
"Please install redis-py, either through your os software repository or from PyPI"
)
self.exit(2)
except Exception as e:
if self.config.distributed:
critical("ERROR: Redis connection required for distributed polling")
critical("Could not connect to Redis. {}".format(e))
logger.critical(
"ERROR: Redis connection required for distributed polling"
)
logger.critical("Could not connect to Redis. {}".format(e))
self.exit(2)
return LibreNMS.ThreadingLock()
@ -684,14 +657,16 @@ class Service:
Has the effect of reloading the python files from disk.
"""
if sys.version_info < (3, 4, 0):
warning("Skipping restart as running under an incompatible interpreter")
warning("Please restart manually")
logger.warning(
"Skipping restart as running under an incompatible interpreter"
)
logger.warning("Please restart manually")
return
info("Restarting service... ")
logger.info("Restarting service... ")
if "psutil" not in sys.modules:
warning("psutil is not available, polling gap possible")
logger.warning("psutil is not available, polling gap possible")
self._stop_managers_and_wait()
else:
self._stop_managers()
@ -715,7 +690,9 @@ class Service:
:param signalnum: UNIX signal number
:param flag: Flags accompanying signal
"""
info("Received signal on thread %s, handling", threading.current_thread().name)
logger.info(
"Received signal on thread %s, handling", threading.current_thread().name
)
self.reload_flag = True
def terminate(self, signalnum=None, flag=None):
@ -724,7 +701,9 @@ class Service:
:param signalnum: UNIX signal number
:param flag: Flags accompanying signal
"""
info("Received signal on thread %s, handling", threading.current_thread().name)
logger.info(
"Received signal on thread %s, handling", threading.current_thread().name
)
self.terminate_flag = True
def shutdown(self, signalnum=None, flag=None):
@ -733,7 +712,7 @@ class Service:
:param signalnum: UNIX signal number
:param flag: Flags accompanying signal
"""
info("Shutting down, waiting for running jobs to complete...")
logger.info("Shutting down, waiting for running jobs to complete...")
self.stop_dispatch_timers()
self._release_master()
@ -747,7 +726,9 @@ class Service:
self._stop_managers_and_wait()
# try to release master lock
info("Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name)
logger.info(
"Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name
)
self.exit(0)
def start_dispatch_timers(self):
@ -802,11 +783,11 @@ class Service:
try:
fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
warning("Another instance is already running, quitting.")
logger.warning("Another instance is already running, quitting.")
self.exit(2)
def log_performance_stats(self):
info("Counting up time spent polling")
logger.info("Counting up time spent polling")
try:
# Report on the poller instance as a whole
@ -851,8 +832,9 @@ class Service:
)
)
except pymysql.err.Error:
exception(
"Unable to log performance statistics - is the database still online?"
logger.critical(
"Unable to log performance statistics - is the database still online?",
exc_info=True,
)
def systemd_watchdog(self):
@ -867,18 +849,19 @@ class Service:
self.config.watchdog_logfile
)
except FileNotFoundError as e:
error("Log file not found! {}".format(e))
logger.error("Log file not found! {}".format(e))
return
if logfile_mdiff > self.config.poller.frequency:
critical(
logger.critical(
"BARK! Log file older than {}s, restarting service!".format(
self.config.poller.frequency
)
),
exc_info=True,
)
self.restart()
else:
info("Log file updated {}s ago".format(int(logfile_mdiff)))
logger.info("Log file updated {}s ago".format(int(logfile_mdiff)))
def exit(self, code=0):
sys.stdout.flush()

LibreNMS/wrapper.py (new file, 669 added lines)

@ -0,0 +1,669 @@
#! /usr/bin/env python3
"""
wrapper: A small tool which wraps services, discovery and poller php scripts
in order to run them as threads with Queue and workers
Authors: Orsiris de Jong <contact@netpower.fr>
Neil Lathwood <neil@librenms.org>
Job Snijders <job.snijders@atrato.com>
Distributed poller code (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org>
All code parts that belong to Daniel are enclosed in EOC comments
Date: Sep 2021
Usage: This program accepts three command line arguments
- the number of threads (defaults to 1 for discovery / service, and 16 for poller)
- the wrapper type (service, discovery or poller)
- optional debug boolean
Ubuntu Linux: apt-get install python-mysqldb
FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean
RHEL 7: yum install MySQL-python
RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient
Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8 / AlmaLinux 8.4
License: This program is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
Public License for more details.
You should have received a copy of the GNU General Public License along
with this program. If not, see https://www.gnu.org/licenses/.
LICENSE.txt contains a copy of the full GPLv3 licensing conditions.
"""
import logging
import os
import queue
import sys
import threading
import time
import uuid
from argparse import ArgumentParser
import LibreNMS
from LibreNMS.command_runner import command_runner
logger = logging.getLogger(__name__)
# Timeout in seconds for any poller / service / discovery action per device
# Should be higher than stepping which defaults to 300
PER_DEVICE_TIMEOUT = 900
# 5 = no new discovered devices, 6 = unreachable device
VALID_EXIT_CODES = [0, 5, 6]
DISTRIBUTED_POLLING = False  # Is overridden by config.php
REAL_DURATION = 0
DISCOVERED_DEVICES_COUNT = 0
PER_DEVICE_DURATION = {}
ERRORS = 0
MEMC = None
IS_NODE = None
STEPPING = None
MASTER_TAG = None
NODES_TAG = None
TIME_TAG = ""
"""
Per wrapper type configuration
All time related variables are in seconds
"""
wrappers = {
"service": {
"executable": "check-services.php",
"table_name": "services",
"memc_touch_time": 10,
"stepping": 300,
"nodes_stepping": 300,
"total_exec_time": 300,
},
"discovery": {
"executable": "discovery.php",
"table_name": "devices",
"memc_touch_time": 30,
"stepping": 300,
"nodes_stepping": 3600,
"total_exec_time": 21600,
},
"poller": {
"executable": "poller.php",
"table_name": "devices",
"memc_touch_time": 10,
"stepping": 300,
"nodes_stepping": 300,
"total_exec_time": 300,
},
}
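# Each run resolves its parameters from this table by wrapper type, e.g.
#   wrappers["poller"]["executable"]  -> "poller.php"
#   wrappers["poller"]["stepping"]    -> 300 (overridden by config["rrd"]["step"] below)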
"""
Threading helper functions
"""
# <<<EOC
def memc_alive(name): # Type: str
"""
Checks if memcache is working by injecting a random string and trying to read it again
"""
try:
key = str(uuid.uuid4())
MEMC.set(name + ".ping." + key, key, 60)
if MEMC.get(name + ".ping." + key) == key:
MEMC.delete(name + ".ping." + key)
return True
return False
except:
return False
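# Note: the probe above treats any exception (connection refused, timeout,
# missing key) as "memcached is gone", so callers can fall back to
# standalone polling instead of crashing mid-run.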
def memc_touch(key, _time): # Type: str # Type: int
"""
Update a memcached key's expiry time
"""
try:
val = MEMC.get(key)
MEMC.set(key, val, _time)
except:
pass
def get_time_tag(step): # Type: int
"""
Get the current time tag: the timestamp rounded down modulo the stepping interval
"""
timestamp = int(time.time())
return timestamp - timestamp % step
# EOC
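# Illustrative arithmetic: with step=300, every timestamp inside the same
# 5-minute window yields the same tag, e.g. 1632771325 -> 1632771300,
# so all nodes share one lock namespace per polling interval.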
def print_worker(print_queue, wrapper_type): # Type: Queue # Type: str
"""
A separate queue and a single worker for printing information to the screen prevents
the good old joke:
Some people, when confronted with a problem, think,
"I know, I'll use threads," and then they have two problems.
"""
nodeso = 0
while True:
# <<<EOC
global IS_NODE
global DISTRIBUTED_POLLING
if DISTRIBUTED_POLLING:
if not IS_NODE:
memc_touch(MASTER_TAG, wrappers[wrapper_type]["memc_touch_time"])
nodes = MEMC.get(NODES_TAG)
if nodes is None and not memc_alive(wrapper_type):
logger.warning(
"Lost Memcached. Taking over all devices. Nodes will quit shortly."
)
DISTRIBUTED_POLLING = False
nodes = nodeso
if nodes != nodeso:
logger.info("{} Node(s) Total".format(nodes))
nodeso = nodes
else:
memc_touch(NODES_TAG, wrappers[wrapper_type]["memc_touch_time"])
try:
(
worker_id,
device_id,
elapsed_time,
command,
exit_code,
) = print_queue.get(False)
except:
pass
try:
time.sleep(1)
except:
pass
continue
else:
worker_id, device_id, elapsed_time, command, exit_code = print_queue.get()
# EOC
global REAL_DURATION
global PER_DEVICE_DURATION
global DISCOVERED_DEVICES_COUNT
REAL_DURATION += elapsed_time
PER_DEVICE_DURATION[device_id] = elapsed_time
DISCOVERED_DEVICES_COUNT += 1
if elapsed_time < STEPPING and exit_code in VALID_EXIT_CODES:
logger.info(
"worker {} finished device {} in {} seconds".format(
worker_id, device_id, elapsed_time
)
)
else:
logger.warning(
"worker {} finished device {} in {} seconds with exit code {}".format(
worker_id, device_id, elapsed_time, exit_code
)
)
logger.debug("Command was {}".format(command))
print_queue.task_done()
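# Items on print_queue are 5-tuples produced by poll_worker() below:
#   (worker_id, device_id, elapsed_time, command, exit_code)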
def poll_worker(
poll_queue, # Type: Queue
print_queue, # Type: Queue
config, # Type: dict
log_dir, # Type: str
wrapper_type, # Type: str
debug, # Type: bool
):
"""
This function will fork off single instances of the php process, record
how long it takes, and push the resulting reports to the printer queue
"""
global ERRORS
while True:
device_id = poll_queue.get()
# <<<EOC
if (
not DISTRIBUTED_POLLING
or MEMC.get("{}.device.{}{}".format(wrapper_type, device_id, TIME_TAG))
is None
):
if DISTRIBUTED_POLLING:
result = MEMC.add(
"{}.device.{}{}".format(wrapper_type, device_id, TIME_TAG),
config["distributed_poller_name"],
STEPPING,
)
if not result:
logger.info(
"The device {} appears to be being checked by another node".format(
device_id
)
)
poll_queue.task_done()
continue
if not memc_alive(wrapper_type) and IS_NODE:
logger.warning(
"Lost Memcached, Not checking Device {} as Node. Master will check it.".format(
device_id
)
)
poll_queue.task_done()
continue
# EOC
try:
start_time = time.time()
device_log = os.path.join(
log_dir, "{}_device_{}.log".format(wrapper_type, device_id)
)
executable = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
wrappers[wrapper_type]["executable"],
)
command = "/usr/bin/env php {} -h {}".format(executable, device_id)
if debug:
command = command + " -d"
exit_code, output = command_runner(
command,
shell=True,
timeout=PER_DEVICE_TIMEOUT,
valid_exit_codes=VALID_EXIT_CODES,
)
if exit_code not in VALID_EXIT_CODES:
logger.error(
"Thread {} exited with code {}".format(
threading.current_thread().name, exit_code
)
)
ERRORS += 1
logger.error(output)
elif exit_code == 6:
logger.info("Unreachable device {}".format(device_id))
else:
logger.debug(output)
if debug:
with open(device_log, "w", encoding="utf-8") as dev_log_file:
dev_log_file.write(output)
elapsed_time = int(time.time() - start_time)
print_queue.put(
[
threading.current_thread().name,
device_id,
elapsed_time,
command,
exit_code,
]
)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
logger.error("Unknown problem happened: ")
logger.error("Traceback:", exc_info=True)
poll_queue.task_done()
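# Example command built above for device 42 in poller mode (install path assumed):
#   /usr/bin/env php /opt/librenms/poller.php -h 42 -d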
class DBConfig:
"""
Bare minimal config class for LibreNMS.service.DB class usage
"""
def __init__(self, _config):
self.db_socket = _config["db_socket"]
self.db_host = _config["db_host"]
self.db_port = int(_config["db_port"])
self.db_user = _config["db_user"]
self.db_pass = _config["db_pass"]
self.db_name = _config["db_name"]
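# Usage sketch: DBConfig simply mirrors the db_* keys returned by
# LibreNMS.get_config_data(), in the shape LibreNMS.DB expects:
#   db_connection = LibreNMS.DB(DBConfig(config))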
def wrapper(
wrapper_type, # Type: str
amount_of_workers, # Type: int
config, # Type: dict
log_dir, # Type: str
_debug=False, # Type: bool
): # -> None
"""
Actual code that runs various php scripts, in single node mode or distributed poller mode
"""
global MEMC
global IS_NODE
global DISTRIBUTED_POLLING
global MASTER_TAG
global NODES_TAG
global TIME_TAG
global STEPPING
# Setup wrapper dependent variables
STEPPING = wrappers[wrapper_type]["stepping"]
if wrapper_type == "poller":
if "rrd" in config and "step" in config["rrd"]:
STEPPING = config["rrd"]["step"]
TIME_TAG = "." + str(get_time_tag(STEPPING))
MASTER_TAG = "{}.master{}".format(wrapper_type, TIME_TAG)
NODES_TAG = "{}.nodes{}".format(wrapper_type, TIME_TAG)
# <<<EOC
if "distributed_poller_group" in config:
poller_group = str(config["distributed_poller_group"])
else:
poller_group = False
if (
"distributed_poller" in config
and "distributed_poller_memcached_host" in config
and "distributed_poller_memcached_port" in config
and config["distributed_poller"]
):
try:
import memcache
MEMC = memcache.Client(
[
config["distributed_poller_memcached_host"]
+ ":"
+ str(config["distributed_poller_memcached_port"])
]
)
if str(MEMC.get(MASTER_TAG)) == config["distributed_poller_name"]:
logger.info("This system is already joined as the service master.")
sys.exit(2)
if memc_alive(wrapper_type):
if MEMC.get(MASTER_TAG) is None:
logger.info("Registered as Master")
MEMC.set(MASTER_TAG, config["distributed_poller_name"], 10)
MEMC.set(NODES_TAG, 0, wrappers[wrapper_type]["nodes_stepping"])
IS_NODE = False
else:
logger.info(
"Registered as Node joining Master {}".format(
MEMC.get(MASTER_TAG)
)
)
IS_NODE = True
MEMC.incr(NODES_TAG)
DISTRIBUTED_POLLING = True
else:
logger.warning(
"Could not connect to memcached, disabling distributed service checks."
)
DISTRIBUTED_POLLING = False
IS_NODE = False
except SystemExit:
raise
except ImportError:
logger.critical("ERROR: missing memcache python module:")
logger.critical("On deb systems: apt-get install python3-memcache")
logger.critical("On other systems: pip3 install python-memcached")
logger.critical("Disabling distributed discovery.")
DISTRIBUTED_POLLING = False
else:
DISTRIBUTED_POLLING = False
# EOC
s_time = time.time()
devices_list = []
if wrapper_type == "service":
# <<<EOC
if poller_group is not False:
query = (
"SELECT DISTINCT(services.device_id) FROM services LEFT JOIN devices ON "
"services.device_id = devices.device_id WHERE devices.poller_group IN({}) AND "
"devices.disabled = 0".format(poller_group)
)
else:
query = (
"SELECT DISTINCT(services.device_id) FROM services LEFT JOIN devices ON "
"services.device_id = devices.device_id WHERE devices.disabled = 0"
)
# EOC
elif wrapper_type in ["discovery", "poller"]:
"""
This query specifically orders the results by the last_polled_timetaken variable
so that devices likely to be slow go to the top of the queue,
improving our chances of completing _all_ the work in exactly the time it takes to
handle the slowest device. Cool stuff, eh?
"""
# <<<EOC
if poller_group is not False:
query = (
"SELECT device_id FROM devices WHERE poller_group IN ({}) AND "
"disabled = 0 ORDER BY last_polled_timetaken DESC".format(poller_group)
)
else:
query = "SELECT device_id FROM devices WHERE disabled = 0 ORDER BY last_polled_timetaken DESC"
# EOC
else:
logger.critical("Bogus wrapper type called")
sys.exit(3)
sconfig = DBConfig(config)
db_connection = LibreNMS.DB(sconfig)
cursor = db_connection.query(query)
devices = cursor.fetchall()
for row in devices:
devices_list.append(int(row[0]))
# <<<EOC
if DISTRIBUTED_POLLING and not IS_NODE:
query = "SELECT max(device_id),min(device_id) FROM {}".format(
wrappers[wrapper_type]["table_name"]
)
cursor = db_connection.query(query)
devices = cursor.fetchall()
maxlocks = devices[0][0] or 0
minlocks = devices[0][1] or 0
# EOC
poll_queue = queue.Queue()
print_queue = queue.Queue()
# Don't spawn more workers than there are devices
amount_of_devices = len(devices_list)
if amount_of_workers > amount_of_devices:
amount_of_workers = amount_of_devices
logger.info(
"starting the {} check at {} with {} threads for {} devices".format(
wrapper_type,
time.strftime("%Y-%m-%d %H:%M:%S"),
amount_of_workers,
amount_of_devices,
)
)
for device_id in devices_list:
poll_queue.put(device_id)
for _ in range(amount_of_workers):
worker = threading.Thread(
target=poll_worker,
kwargs={
"poll_queue": poll_queue,
"print_queue": print_queue,
"config": config,
"log_dir": log_dir,
"wrapper_type": wrapper_type,
"debug": _debug,
},
)
worker.setDaemon(True)
worker.start()
pworker = threading.Thread(
target=print_worker,
kwargs={"print_queue": print_queue, "wrapper_type": wrapper_type},
)
pworker.setDaemon(True)
pworker.start()
try:
poll_queue.join()
print_queue.join()
except (KeyboardInterrupt, SystemExit):
raise
total_time = int(time.time() - s_time)
end_msg = "{}-wrapper checked {} devices in {} seconds with {} workers with {} errors".format(
wrapper_type, DISCOVERED_DEVICES_COUNT, total_time, amount_of_workers, ERRORS
)
if ERRORS == 0:
logger.info(end_msg)
else:
logger.error(end_msg)
# <<<EOC
if DISTRIBUTED_POLLING or memc_alive(wrapper_type):
master = MEMC.get(MASTER_TAG)
if master == config["distributed_poller_name"] and not IS_NODE:
logger.info("Wait for all service-nodes to finish")
nodes = MEMC.get(NODES_TAG)
while nodes is not None and nodes > 0:
try:
time.sleep(1)
nodes = MEMC.get(NODES_TAG)
except:
pass
logger.info("Clearing Locks for {}".format(NODES_TAG))
x = minlocks
while x <= maxlocks:
MEMC.delete("{}.device.{}".format(wrapper_type, x))
x = x + 1
logger.info("{} Locks Cleared".format(x))
logger.info("Clearing Nodes")
MEMC.delete(MASTER_TAG)
MEMC.delete(NODES_TAG)
else:
MEMC.decr(NODES_TAG)
logger.info("Finished {}.".format(time.strftime("%Y-%m-%d %H:%M:%S")))
# EOC
# Update poller statistics
if wrapper_type == "poller":
query = "UPDATE pollers SET last_polled=NOW(), devices='{}', time_taken='{}' WHERE poller_name='{}'".format(
DISCOVERED_DEVICES_COUNT, total_time, config["distributed_poller_name"]
)
cursor = db_connection.query(query)
if cursor.rowcount < 1:
query = "INSERT INTO pollers SET poller_name='{}', last_polled=NOW(), devices='{}', time_taken='{}'".format(
config["distributed_poller_name"], DISCOVERED_DEVICES_COUNT, total_time
)
db_connection.query(query)
db_connection.close()
if total_time > wrappers[wrapper_type]["total_exec_time"]:
logger.warning(
"the process took more than {} seconds to finish, you need faster hardware or more threads".format(
wrappers[wrapper_type]["total_exec_time"]
)
)
logger.warning(
"in sequential style service checks the elapsed time would have been: {} seconds".format(
REAL_DURATION
)
)
show_stopper = False
for device in PER_DEVICE_DURATION:
if PER_DEVICE_DURATION[device] > wrappers[wrapper_type]["nodes_stepping"]:
logger.warning(
"device {} is taking too long: {} seconds".format(
device, PER_DEVICE_DURATION[device]
)
)
show_stopper = True
if show_stopper:
logger.error(
"Some devices are taking more than {} seconds, the script cannot recommend you what to do.".format(
wrappers[wrapper_type]["nodes_stepping"]
)
)
else:
recommend = int(total_time / STEPPING * amount_of_workers + 1)
logger.warning(
"Consider setting a minimum of {} threads. (This does not constitute professional advice!)".format(
recommend
)
)
sys.exit(2)
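# Worked example of the recommendation above: a 450 s run with STEPPING=300
# and 16 workers yields int(450 / 300 * 16 + 1) = 25 suggested threads.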
if __name__ == "__main__":
parser = ArgumentParser(
prog="wrapper.py",
usage="usage: %(prog)s [options] <wrapper_type> <workers>\n"
"wrapper_type = 'service', 'poller' or 'disccovery'"
"workers defaults to 1 for service and discovery, and 16 for poller "
"(Do not set too high, or you will get an OOM)",
description="Spawn multiple librenms php processes in parallel.",
)
parser.add_argument(
"-d",
"--debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
parser.add_argument(
dest="wrapper",
default=None,
help="Execute wrapper for 'service', 'poller' or 'discovery'",
)
parser.add_argument(
dest="threads", nargs="?", default=None, help="Number of workers"
)
args = parser.parse_args()
debug = args.debug
wrapper_type = args.wrapper
amount_of_workers = args.threads
if wrapper_type not in ["service", "discovery", "poller"]:
parser.error("Invalid wrapper type '{}'".format(wrapper_type))
sys.exit(4)
config = LibreNMS.get_config_data(
os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
)
log_dir = config["log_dir"]
log_file = os.path.join(log_dir, wrapper_type + ".log")
logger = LibreNMS.logger_get_logger(log_file, debug=debug)
try:
amount_of_workers = int(amount_of_workers)
except (IndexError, ValueError, TypeError):
amount_of_workers = (
16 if wrapper_type == "poller" else 1
) # Defaults to 1 for service/discovery, 16 for poller
logger.warning(
"Bogus number of workers given. Using default number ({}) of workers.".format(
amount_of_workers
)
)
wrapper(wrapper_type, amount_of_workers, config, log_dir, _debug=debug)

discovery-wrapper.py

@@ -1,436 +1,63 @@
#! /usr/bin/env python3
"""
discovery-wrapper A small tool which wraps around discovery and tries to
guide the discovery process with a more modern approach with a
Queue and workers.
Based on the original version of poller-wrapper.py by Job Snijders
Author: Neil Lathwood <neil@librenms.org>
Orsiris de Jong <contact@netpower.fr>
Date: Oct 2019
Usage: This program accepts one command line argument: the number of threads
that should run simultaneously. If no argument is given it will assume
a default of 1 thread.
Ubuntu Linux: apt-get install python-mysqldb
FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean
RHEL 7: yum install MySQL-python
RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient
Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8
License: This program is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
Public License for more details.
You should have received a copy of the GNU General Public License along
with this program. If not, see https://www.gnu.org/licenses/.
LICENSE.txt contains a copy of the full GPLv3 licensing conditions.
This is a Bootstrap script for wrapper.py, in order to retain compatibility with earlier LibreNMS setups
"""
import LibreNMS.library as LNMS
import os
import sys
import logging
from argparse import ArgumentParser
import LibreNMS
import LibreNMS.wrapper as wrapper
WRAPPER_TYPE = "discovery"
DEFAULT_WORKERS = 1
"""
Take the number of threads to run in parallel from the command line;
if none is given, or the argument is garbage, fall back to the default
"""
usage = (
"usage: %(prog)s [options] <amount_of_workers> (Default: {}"
"(Do not set too high, or you will get an OOM)".format(DEFAULT_WORKERS)
)
description = "Spawn multiple discovery.php processes in parallel."
parser = ArgumentParser(usage=usage, description=description)
parser.add_argument(dest="amount_of_workers", default=DEFAULT_WORKERS)
parser.add_argument(
"-d",
"--debug",
dest="debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
args = parser.parse_args()
config = LibreNMS.get_config_data(os.path.dirname(os.path.realpath(__file__)))
if not config:
logger = logging.getLogger(__name__)
logger.critical("Could not run {} wrapper. Missing config".format(WRAPPER_TYPE))
sys.exit(1)
log_dir = config["log_dir"]
log_file = os.path.join(log_dir, WRAPPER_TYPE + "_wrapper.log")
logger = LibreNMS.logger_get_logger(log_file, debug=args.debug)
try:
import json
import os
import queue
import subprocess
import sys
import threading
import time
from optparse import OptionParser
except ImportError as exc:
print("ERROR: missing one or more of the following python modules:")
print("threading, queue, sys, subprocess, time, os, json")
print("ERROR: %s" % exc)
sys.exit(2)
APP_NAME = "discovery_wrapper"
LOG_FILE = "logs/" + APP_NAME + ".log"
_DEBUG = False
distdisco = False
real_duration = 0
discovered_devices = 0
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC0
def memc_alive():
try:
global memc
key = str(uuid.uuid4())
memc.set("discovery.ping." + key, key, 60)
if memc.get("discovery.ping." + key) == key:
memc.delete("discovery.ping." + key)
return True
else:
return False
except:
return False
def memc_touch(key, time):
try:
global memc
val = memc.get(key)
memc.set(key, val, time)
except:
pass
# EOC0
"""
A seperate queue and a single worker for printing information to the screen prevents
the good old joke:
Some people, when confronted with a problem, think,
"I know, I'll use threads," and then they two they hav erpoblesms.
"""
def printworker():
nodeso = 0
while True:
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC4
global IsNode
global distdisco
if distdisco:
if not IsNode:
memc_touch("discovery.master", 30)
nodes = memc.get("discovery.nodes")
if nodes is None and not memc_alive():
print(
"WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly."
)
distdisco = False
nodes = nodeso
if nodes is not nodeso:
print("INFO: %s Node(s) Total" % (nodes))
nodeso = nodes
else:
memc_touch("discovery.nodes", 30)
try:
worker_id, device_id, elapsed_time = print_queue.get(False)
except:
pass
try:
time.sleep(1)
except:
pass
continue
else:
worker_id, device_id, elapsed_time = print_queue.get()
# EOC4
global real_duration
global per_device_duration
global discovered_devices
real_duration += elapsed_time
per_device_duration[device_id] = elapsed_time
discovered_devices += 1
if elapsed_time < 300:
print(
"INFO: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
else:
print(
"WARNING: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
print_queue.task_done()
"""
This class will fork off single instances of the discovery.php process, record
how long it takes, and push the resulting reports to the printer queue
"""
def poll_worker():
while True:
device_id = poll_queue.get()
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5
if not distdisco or memc.get("discovery.device." + str(device_id)) is None:
if distdisco:
result = memc.add(
"discovery.device." + str(device_id),
config["distributed_poller_name"],
300,
)
if not result:
print(
"This device (%s) appears to be being discovered by another discovery node"
% (device_id)
)
poll_queue.task_done()
continue
if not memc_alive() and IsNode:
print(
"Lost Memcached, Not discovering Device %s as Node. Master will discover it."
% device_id
)
poll_queue.task_done()
continue
# EOC5
try:
start_time = time.time()
output = (
"-d >> %s/discover_device_%s.log" % (log_dir, device_id)
if debug
else ">> /dev/null"
)
command = "/usr/bin/env php %s -h %s %s 2>&1" % (
discovery_path,
device_id,
output,
)
# TODO: Replace with command_runner
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
print_queue.put(
[threading.current_thread().name, device_id, elapsed_time]
)
except (KeyboardInterrupt, SystemExit):
raise
except:
pass
poll_queue.task_done()
if __name__ == "__main__":
logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG)
install_dir = os.path.dirname(os.path.realpath(__file__))
LNMS.check_for_file(install_dir + "/.env")
config = json.loads(LNMS.get_config_data(install_dir))
discovery_path = config["install_dir"] + "/discovery.php"
log_dir = config["log_dir"]
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
if "distributed_poller_group" in config:
discovery_group = str(config["distributed_poller_group"])
else:
discovery_group = False
if (
"distributed_poller" in config
and "distributed_poller_memcached_host" in config
and "distributed_poller_memcached_port" in config
and config["distributed_poller"]
):
try:
import memcache
import uuid
memc = memcache.Client(
[
config["distributed_poller_memcached_host"]
+ ":"
+ str(config["distributed_poller_memcached_port"])
]
)
if str(memc.get("discovery.master")) == config["distributed_poller_name"]:
print("This system is already joined as the discovery master.")
sys.exit(2)
if memc_alive():
if memc.get("discovery.master") is None:
print("Registered as Master")
memc.set("discovery.master", config["distributed_poller_name"], 30)
memc.set("discovery.nodes", 0, 3600)
IsNode = False
else:
print(
"Registered as Node joining Master %s"
% memc.get("discovery.master")
)
IsNode = True
memc.incr("discovery.nodes")
distdisco = True
else:
print(
"Could not connect to memcached, disabling distributed discovery."
)
distdisco = False
IsNode = False
except SystemExit:
raise
except ImportError:
print("ERROR: missing memcache python module:")
print("On deb systems: apt-get install python3-memcache")
print("On other systems: pip3 install python-memcached")
print("Disabling distributed discovery.")
distdisco = False
else:
distdisco = False
# EOC1
s_time = time.time()
real_duration = 0
per_device_duration = {}
discovered_devices = 0
"""
Take the amount of threads we want to run in parallel from the commandline
if None are given or the argument was garbage, fall back to default of 1
"""
usage = "usage: %prog [options] <workers> (Default: 1 Do not set too high)"
description = "Spawn multiple discovery.php processes in parallel."
parser = OptionParser(usage=usage, description=description)
parser.add_option(
"-d",
"--debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
(options, args) = parser.parse_args()
debug = options.debug
try:
amount_of_workers = int(args[0])
except (IndexError, ValueError):
amount_of_workers = 1
devices_list = []
"""
This query specificly orders the results depending on the last_discovered_timetaken variable
Because this way, we put the devices likely to be slow, in the top of the queue
thus greatening our chances of completing _all_ the work in exactly the time it takes to
discover the slowest device! cool stuff he
"""
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC2
if discovery_group is not False:
query = (
"select device_id from devices where poller_group IN("
+ discovery_group
+ ") and disabled = 0 order by last_polled_timetaken desc"
)
else:
query = "select device_id from devices where disabled = 0 order by last_polled_timetaken desc"
amount_of_workers = int(args.amount_of_workers)
except (IndexError, ValueError):
amount_of_workers = DEFAULT_WORKERS
logger.warning(
"Bogus number of workers given. Using default number ({}) of workers.".format(
amount_of_workers
)
)
# EOC2
db = LNMS.db_open(
config["db_socket"],
config["db_host"],
int(config["db_port"]),
config["db_user"],
config["db_pass"],
config["db_name"],
)
cursor = db.cursor()
cursor.execute(query)
devices = cursor.fetchall()
for row in devices:
devices_list.append(int(row[0]))
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC3
if distdisco and not IsNode:
query = "select max(device_id),min(device_id) from devices"
cursor.execute(query)
devices = cursor.fetchall()
maxlocks = devices[0][0] or 0
minlocks = devices[0][1] or 0
# EOC3
db.close()
poll_queue = queue.Queue()
print_queue = queue.Queue()
print(
"INFO: starting the discovery at %s with %s threads, slowest devices first"
% (time.strftime("%Y-%m-%d %H:%M:%S"), amount_of_workers)
)
for device_id in devices_list:
poll_queue.put(device_id)
for i in range(amount_of_workers):
t = threading.Thread(target=poll_worker)
t.setDaemon(True)
t.start()
p = threading.Thread(target=printworker)
p.setDaemon(True)
p.start()
try:
poll_queue.join()
print_queue.join()
except (KeyboardInterrupt, SystemExit):
raise
total_time = int(time.time() - s_time)
print(
"INFO: discovery-wrapper polled %s devices in %s seconds with %s workers"
% (discovered_devices, total_time, amount_of_workers)
)
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6
if distdisco or memc_alive():
master = memc.get("discovery.master")
if master == config["distributed_poller_name"] and not IsNode:
print("Wait for all discovery-nodes to finish")
nodes = memc.get("discovery.nodes")
while nodes is not None and nodes > 0:
try:
time.sleep(1)
nodes = memc.get("discovery.nodes")
except:
pass
print("Clearing Locks")
x = minlocks
while x <= maxlocks:
memc.delete("discovery.device." + str(x))
x = x + 1
print("%s Locks Cleared" % x)
print("Clearing Nodes")
memc.delete("discovery.master")
memc.delete("discovery.nodes")
else:
memc.decr("discovery.nodes")
print("Finished %s." % time.time())
# EOC6
show_stopper = False
if total_time > 21600:
print(
"WARNING: the process took more than 6 hours to finish, you need faster hardware or more threads"
)
print(
"INFO: in sequential style discovery the elapsed time would have been: %s seconds"
% real_duration
)
for device in per_device_duration:
if per_device_duration[device] > 3600:
print(
"WARNING: device %s is taking too long: %s seconds"
% (device, per_device_duration[device])
)
show_stopper = True
if show_stopper:
print(
"ERROR: Some devices are taking more than 3600 seconds, the script cannot recommend you what to do."
)
else:
recommend = int(total_time / 300.0 * amount_of_workers + 1)
print(
"WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)"
% recommend
)
sys.exit(2)
wrapper.wrapper(
WRAPPER_TYPE,
amount_of_workers=amount_of_workers,
config=config,
log_dir=log_dir,
_debug=args.debug,
)
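# The bootstrap above is equivalent to invoking the unified wrapper directly
# (illustrative): ./wrapper.py discovery <amount_of_workers> [-d]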

librenms-service.py

@@ -43,9 +43,10 @@ if __name__ == "__main__":
if args.verbose:
logging.getLogger().setLevel(logging.INFO)
if args.debug:
elif args.debug:
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.WARNING)
info("Configuring LibreNMS service")
try:

poller-wrapper.py

@@ -1,468 +1,63 @@
#! /usr/bin/env python3
"""
poller-wrapper A small tool which wraps around the poller and tries to
guide the polling process with a more modern approach with a
Queue and workers
Authors: Job Snijders <job.snijders@atrato.com>
Orsiris de Jong <contact@netpower.fr>
Date: Oct 2019
Usage: This program accepts one command line argument: the number of threads
that should run simultaneously. If no argument is given it will assume
a default of 16 threads.
Ubuntu Linux: apt-get install python-mysqldb
FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean
RHEL 7: yum install MySQL-python
RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient
Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8.0
License: To the extent possible under law, Job Snijders has waived all
copyright and related or neighboring rights to this script.
This script has been put into the Public Domain. This work is
published from: The Netherlands.
This is a Bootstrap script for wrapper.py, in order to retain compatibility with earlier LibreNMS setups
"""
import LibreNMS.library as LNMS
import os
import sys
import logging
from argparse import ArgumentParser
import LibreNMS
import LibreNMS.wrapper as wrapper
WRAPPER_TYPE = "poller"
DEFAULT_WORKERS = 16
"""
Take the number of threads to run in parallel from the command line;
if none is given, or the argument is garbage, fall back to the default
"""
usage = (
"usage: %(prog)s [options] <amount_of_workers> (Default: {}"
"(Do not set too high, or you will get an OOM)".format(DEFAULT_WORKERS)
)
description = "Spawn multiple poller.php processes in parallel."
parser = ArgumentParser(usage=usage, description=description)
parser.add_argument(dest="amount_of_workers", default=DEFAULT_WORKERS)
parser.add_argument(
"-d",
"--debug",
dest="debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
args = parser.parse_args()
config = LibreNMS.get_config_data(os.path.dirname(os.path.realpath(__file__)))
if not config:
logger = logging.getLogger(__name__)
logger.critical("Could not run {} wrapper. Missing config".format(WRAPPER_TYPE))
sys.exit(1)
log_dir = config["log_dir"]
log_file = os.path.join(log_dir, WRAPPER_TYPE + "_wrapper.log")
logger = LibreNMS.logger_get_logger(log_file, debug=args.debug)
try:
import json
import os
import queue
import subprocess
import sys
import threading
import time
from optparse import OptionParser
except ImportError as exc:
print("ERROR: missing one or more of the following python modules:")
print("threading, queue, sys, subprocess, time, os, json")
print("ERROR: %s" % exc)
sys.exit(2)
APP_NAME = "poller_wrapper"
LOG_FILE = "logs/" + APP_NAME + ".log"
_DEBUG = False
distpoll = False
real_duration = 0
polled_devices = 0
"""
Threading helper functions
"""
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC0
def memc_alive():
try:
global memc
key = str(uuid.uuid4())
memc.set("poller.ping." + key, key, 60)
if memc.get("poller.ping." + key) == key:
memc.delete("poller.ping." + key)
return True
else:
return False
except:
return False
def memc_touch(key, time):
try:
global memc
val = memc.get(key)
memc.set(key, val, time)
except:
pass
def get_time_tag(step):
ts = int(time.time())
return ts - ts % step
# EOC0
"""
A seperate queue and a single worker for printing information to the screen prevents
the good old joke:
Some people, when confronted with a problem, think,
"I know, I'll use threads," and then two they hav erpoblesms.
"""
def printworker():
nodeso = 0
while True:
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC4
global IsNode
global distpoll
if distpoll:
if not IsNode:
memc_touch(master_tag, 10)
nodes = memc.get(nodes_tag)
if nodes is None and not memc_alive():
print(
"WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly."
)
distpoll = False
nodes = nodeso
if nodes is not nodeso:
print("INFO: %s Node(s) Total" % (nodes))
nodeso = nodes
else:
memc_touch(nodes_tag, 10)
try:
worker_id, device_id, elapsed_time = print_queue.get(False)
except:
pass
try:
time.sleep(1)
except:
pass
continue
else:
worker_id, device_id, elapsed_time = print_queue.get()
# EOC4
global real_duration
global per_device_duration
global polled_devices
real_duration += elapsed_time
per_device_duration[device_id] = elapsed_time
polled_devices += 1
if elapsed_time < step:
print(
"INFO: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
else:
print(
"WARNING: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
print_queue.task_done()
"""
This class will fork off single instances of the poller.php process, record
how long it takes, and push the resulting reports to the printer queue
"""
def poll_worker():
while True:
device_id = poll_queue.get()
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5
if (
not distpoll
or memc.get("poller.device.%s.%s" % (device_id, time_tag)) is None
):
if distpoll:
result = memc.add(
"poller.device.%s.%s" % (device_id, time_tag),
config["distributed_poller_name"],
step,
)
if not result:
print(
"This device (%s) appears to be being polled by another poller"
% (device_id)
)
poll_queue.task_done()
continue
if not memc_alive() and IsNode:
print(
"Lost Memcached, Not polling Device %s as Node. Master will poll it."
% device_id
)
poll_queue.task_done()
continue
# EOC5
try:
start_time = time.time()
output = (
"-d >> %s/poll_device_%s.log" % (log_dir, device_id)
if debug
else ">> /dev/null"
)
command = "/usr/bin/env php %s -h %s %s 2>&1" % (
poller_path,
device_id,
output,
)
# TODO: replace with command_runner
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
print_queue.put(
[threading.current_thread().name, device_id, elapsed_time]
)
except (KeyboardInterrupt, SystemExit):
raise
except:
pass
poll_queue.task_done()
if __name__ == "__main__":
logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG)
install_dir = os.path.dirname(os.path.realpath(__file__))
LNMS.check_for_file(install_dir + "/.env")
config = json.loads(LNMS.get_config_data(install_dir))
poller_path = config["install_dir"] + "/poller.php"
log_dir = config["log_dir"]
if "rrd" in config and "step" in config["rrd"]:
step = config["rrd"]["step"]
else:
step = 300
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
if "distributed_poller_group" in config:
poller_group = str(config["distributed_poller_group"])
else:
poller_group = False
if (
"distributed_poller" in config
and "distributed_poller_memcached_host" in config
and "distributed_poller_memcached_port" in config
and config["distributed_poller"]
):
time_tag = str(get_time_tag(step))
master_tag = "poller.master." + time_tag
nodes_tag = "poller.nodes." + time_tag
try:
import memcache
import uuid
memc = memcache.Client(
[
config["distributed_poller_memcached_host"]
+ ":"
+ str(config["distributed_poller_memcached_port"])
]
)
if str(memc.get(master_tag)) == config["distributed_poller_name"]:
print("This system is already joined as the poller master.")
sys.exit(2)
if memc_alive():
if memc.get(master_tag) is None:
print("Registered as Master")
memc.set(master_tag, config["distributed_poller_name"], 10)
memc.set(nodes_tag, 0, step)
IsNode = False
else:
print("Registered as Node joining Master %s" % memc.get(master_tag))
IsNode = True
memc.incr(nodes_tag)
distpoll = True
else:
print("Could not connect to memcached, disabling distributed poller.")
distpoll = False
IsNode = False
except SystemExit:
raise
except ImportError:
print("ERROR: missing memcache python module:")
print("On deb systems: apt-get install python3-memcache")
print("On other systems: pip3 install python-memcached")
print("Disabling distributed poller.")
distpoll = False
else:
distpoll = False
# EOC1
s_time = time.time()
real_duration = 0
per_device_duration = {}
polled_devices = 0
"""
Take the amount of threads we want to run in parallel from the commandline
if None are given or the argument was garbage, fall back to default of 16
"""
usage = "usage: %prog [options] <workers> (Default: 16 (Do not set too high)"
description = "Spawn multiple poller.php processes in parallel."
parser = OptionParser(usage=usage, description=description)
parser.add_option(
"-d",
"--debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
(options, args) = parser.parse_args()
debug = options.debug
try:
amount_of_workers = int(args[0])
except (IndexError, ValueError):
amount_of_workers = 16
devices_list = []
"""
This query specificly orders the results depending on the last_polled_timetaken variable
Because this way, we put the devices likely to be slow, in the top of the queue
thus greatening our chances of completing _all_ the work in exactly the time it takes to
poll the slowest device! cool stuff he
"""
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC2
if poller_group is not False:
query = (
"select device_id from devices where poller_group IN("
+ poller_group
+ ") and disabled = 0 order by last_polled_timetaken desc"
)
else:
query = "select device_id from devices where disabled = 0 order by last_polled_timetaken desc"
amount_of_workers = int(args.amount_of_workers)
except (IndexError, ValueError):
amount_of_workers = DEFAULT_WORKERS
logger.warning(
"Bogus number of workers given. Using default number ({}) of workers.".format(
amount_of_workers
)
)
# EOC2
db = LNMS.db_open(
config["db_socket"],
config["db_host"],
config["db_port"],
config["db_user"],
config["db_pass"],
config["db_name"],
)
cursor = db.cursor()
cursor.execute(query)
devices = cursor.fetchall()
for row in devices:
devices_list.append(int(row[0]))
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC3
if distpoll and not IsNode:
query = "select max(device_id),min(device_id) from devices"
cursor.execute(query)
devices = cursor.fetchall()
maxlocks = devices[0][0] or 0
minlocks = devices[0][1] or 0
# EOC3
db.close()
poll_queue = queue.Queue()
print_queue = queue.Queue()
print(
"INFO: starting the poller at %s with %s threads, slowest devices first"
% (time.strftime("%Y-%m-%d %H:%M:%S"), amount_of_workers)
)
for device_id in devices_list:
poll_queue.put(device_id)
for i in range(amount_of_workers):
t = threading.Thread(target=poll_worker)
t.setDaemon(True)
t.start()
p = threading.Thread(target=printworker)
p.setDaemon(True)
p.start()
try:
poll_queue.join()
print_queue.join()
except (KeyboardInterrupt, SystemExit):
raise
total_time = int(time.time() - s_time)
print(
"INFO: poller-wrapper polled %s devices in %s seconds with %s workers"
% (polled_devices, total_time, amount_of_workers)
)
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6
if distpoll or memc_alive():
master = memc.get(master_tag)
if master == config["distributed_poller_name"] and not IsNode:
print("Wait for all poller-nodes to finish")
nodes = memc.get(nodes_tag)
while nodes is not None and nodes > 0:
try:
time.sleep(1)
nodes = memc.get(nodes_tag)
except:
pass
print("Clearing Locks for %s" % time_tag)
x = minlocks
while x <= maxlocks:
res = memc.delete("poller.device.%s.%s" % (x, time_tag))
x += 1
print("%s Locks Cleared" % x)
print("Clearing Nodes")
memc.delete(master_tag)
memc.delete(nodes_tag)
else:
memc.decr(nodes_tag)
print("Finished %.3fs after interval start." % (time.time() - int(time_tag)))
# EOC6
show_stopper = False
db = LNMS.db_open(
config["db_socket"],
config["db_host"],
config["db_port"],
config["db_user"],
config["db_pass"],
config["db_name"],
)
cursor = db.cursor()
query = (
"update pollers set last_polled=NOW(), devices='%d', time_taken='%d' where poller_name='%s'"
% (polled_devices, total_time, config["distributed_poller_name"])
)
response = cursor.execute(query)
if response == 1:
db.commit()
else:
query = (
"insert into pollers set poller_name='%s', last_polled=NOW(), devices='%d', time_taken='%d'"
% (config["distributed_poller_name"], polled_devices, total_time)
)
cursor.execute(query)
db.commit()
db.close()
if total_time > step:
print(
"WARNING: the process took more than %s seconds to finish, you need faster hardware or more threads"
% step
)
print(
"INFO: in sequential style polling the elapsed time would have been: %s seconds"
% real_duration
)
for device in per_device_duration:
if per_device_duration[device] > step:
print(
"WARNING: device %s is taking too long: %s seconds"
% (device, per_device_duration[device])
)
show_stopper = True
if show_stopper:
print(
"ERROR: Some devices are taking more than %s seconds, the script cannot recommend you what to do."
% step
)
else:
recommend = int(total_time / step * amount_of_workers + 1)
print(
"WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)"
% recommend
)
sys.exit(2)
wrapper.wrapper(
WRAPPER_TYPE,
amount_of_workers=amount_of_workers,
config=config,
log_dir=log_dir,
_debug=args.debug,
)

requirements.txt

@@ -3,3 +3,4 @@ python-dotenv
redis>=3.0
setuptools
psutil
command_runner>=1.2.1

services-wrapper.py

@@ -1,440 +1,63 @@
#! /usr/bin/env python3
"""
services-wrapper A small tool which wraps around check-services.php and tries to
guide the services process with a more modern approach with a
Queue and workers.
Based on the original version of poller-wrapper.py by Job Snijders
Author: Neil Lathwood <neil@librenms.org>
Orsiris de Jong <contact@netpower.fr>
Date: Oct 2019
Usage: This program accepts one command line argument: the number of threads
that should run simultaneously. If no argument is given it will assume
a default of 1 thread.
Ubuntu Linux: apt-get install python-mysqldb
FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean
RHEL 7: yum install MySQL-python
RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient
Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8
License: This program is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
Public License for more details.
You should have received a copy of the GNU General Public License along
with this program. If not, see https://www.gnu.org/licenses/.
LICENSE.txt contains a copy of the full GPLv3 licensing conditions.
This is a Bootstrap script for wrapper.py, in order to retain compatibility with earlier LibreNMS setups
"""
import LibreNMS.library as LNMS
import os
import sys
import logging
from argparse import ArgumentParser
import LibreNMS
import LibreNMS.wrapper as wrapper
WRAPPER_TYPE = "service"
DEFAULT_WORKERS = 1
"""
Take the number of threads to run in parallel from the command line;
if none is given, or the argument is garbage, fall back to the default
"""
usage = (
"usage: %(prog)s [options] <amount_of_workers> (Default: {}"
"(Do not set too high, or you will get an OOM)".format(DEFAULT_WORKERS)
)
description = "Spawn multiple check-services.php processes in parallel."
parser = ArgumentParser(usage=usage, description=description)
parser.add_argument(dest="amount_of_workers", default=DEFAULT_WORKERS)
parser.add_argument(
"-d",
"--debug",
dest="debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
args = parser.parse_args()
config = LibreNMS.get_config_data(os.path.dirname(os.path.realpath(__file__)))
if not config:
logger = logging.getLogger(__name__)
logger.critical("Could not run {} wrapper. Missing config".format(WRAPPER_TYPE))
sys.exit(1)
log_dir = config["log_dir"]
log_file = os.path.join(log_dir, WRAPPER_TYPE + "_wrapper.log")
logger = LibreNMS.logger_get_logger(log_file, debug=args.debug)
try:
import json
import os
import queue
import subprocess
import sys
import threading
import time
from optparse import OptionParser
except ImportError as exc:
print("ERROR: missing one or more of the following python modules:")
print("threading, queue, sys, subprocess, time, os, json")
print("ERROR: %s" % exc)
sys.exit(2)
APP_NAME = "services_wrapper"
LOG_FILE = "logs/" + APP_NAME + ".log"
_DEBUG = False
servicedisco = False
real_duration = 0
service_devices = 0
"""
Threading helper functions
"""
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC0
def memc_alive():
try:
global memc
key = str(uuid.uuid4())
memc.set("poller.ping." + key, key, 60)
if memc.get("poller.ping." + key) == key:
memc.delete("poller.ping." + key)
return True
else:
return False
except:
return False
def memc_touch(key, time):
try:
global memc
val = memc.get(key)
memc.set(key, val, time)
except:
pass
def get_time_tag(step):
ts = int(time.time())
return ts - ts % step
# EOC0
"""
A seperate queue and a single worker for printing information to the screen prevents
the good old joke:
Some people, when confronted with a problem, think,
"I know, I'll use threads," and then they two they hav erpoblesms.
"""
def printworker():
nodeso = 0
while True:
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC4
global IsNode
global servicedisco
if servicedisco:
if not IsNode:
memc_touch("service.master", 10)
nodes = memc.get("service.nodes")
if nodes is None and not memc_alive():
print(
"WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly."
)
servicedisco = False
nodes = nodeso
if nodes is not nodeso:
print("INFO: %s Node(s) Total" % (nodes))
nodeso = nodes
else:
memc_touch("service.nodes", 10)
try:
worker_id, device_id, elapsed_time = print_queue.get(False)
except:
pass
try:
time.sleep(1)
except:
pass
continue
else:
worker_id, device_id, elapsed_time = print_queue.get()
# EOC4
global real_duration
global per_device_duration
global service_devices
real_duration += elapsed_time
per_device_duration[device_id] = elapsed_time
service_devices += 1
if elapsed_time < 300:
print(
"INFO: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
else:
print(
"WARNING: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
print_queue.task_done()
"""
This class will fork off single instances of the check-services.php process, record
how long it takes, and push the resulting reports to the printer queue
"""
def poll_worker():
while True:
device_id = poll_queue.get()
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5
if not servicedisco or memc.get("service.device." + str(device_id)) is None:
if servicedisco:
result = memc.add(
"service.device." + str(device_id),
config["distributed_poller_name"],
300,
)
if not result:
print(
"This device (%s) appears to be being service checked by another service node"
% (device_id)
)
poll_queue.task_done()
continue
if not memc_alive() and IsNode:
print(
"Lost Memcached, Not service checking Device %s as Node. Master will check it."
% device_id
)
poll_queue.task_done()
continue
# EOC5
try:
start_time = time.time()
output = (
"-d >> %s/services_device_%s.log" % (log_dir, device_id)
if debug
else ">> /dev/null"
)
# TODO replace with command_runner
command = "/usr/bin/env php %s -h %s %s 2>&1" % (
service_path,
device_id,
output,
)
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
print_queue.put(
[threading.current_thread().name, device_id, elapsed_time]
)
except (KeyboardInterrupt, SystemExit):
raise
except:
pass
poll_queue.task_done()
if __name__ == "__main__":
logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG)
install_dir = os.path.dirname(os.path.realpath(__file__))
LNMS.check_for_file(install_dir + "/.env")
config = json.loads(LNMS.get_config_data(install_dir))
service_path = config["install_dir"] + "/check-services.php"
log_dir = config["log_dir"]
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
if "distributed_poller_group" in config:
service_group = str(config["distributed_poller_group"])
else:
service_group = False
if (
"distributed_poller" in config
and "distributed_poller_memcached_host" in config
and "distributed_poller_memcached_port" in config
and config["distributed_poller"]
):
try:
import memcache
import uuid
memc = memcache.Client(
[
config["distributed_poller_memcached_host"]
+ ":"
+ str(config["distributed_poller_memcached_port"])
]
)
if str(memc.get("service.master")) == config["distributed_poller_name"]:
print("This system is already joined as the service master.")
sys.exit(2)
if memc_alive():
if memc.get("service.master") is None:
print("Registered as Master")
memc.set("service.master", config["distributed_poller_name"], 10)
memc.set("service.nodes", 0, 300)
IsNode = False
else:
print(
"Registered as Node joining Master %s"
% memc.get("service.master")
)
IsNode = True
memc.incr("service.nodes")
servicedisco = True
else:
print(
"Could not connect to memcached, disabling distributed service checks."
)
servicedisco = False
IsNode = False
except SystemExit:
raise
except ImportError:
print("ERROR: missing memcache python module:")
print("On deb systems: apt-get install python3-memcache")
print("On other systems: pip3 install python-memcached")
print("Disabling distributed discovery.")
servicedisco = False
else:
servicedisco = False
# EOC1
s_time = time.time()
real_duration = 0
per_device_duration = {}
service_devices = 0
"""
Take the amount of threads we want to run in parallel from the commandline
if None are given or the argument was garbage, fall back to default of 16
"""
usage = "usage: %prog [options] <workers> (Default: 1 (Do not set too high)"
description = "Spawn multiple check-services.php processes in parallel."
parser = OptionParser(usage=usage, description=description)
parser.add_option(
"-d",
"--debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
(options, args) = parser.parse_args()
debug = options.debug
try:
amount_of_workers = int(args[0])
except (IndexError, ValueError):
amount_of_workers = 1
devices_list = []
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC2
if service_group is not False:
query = (
"SELECT DISTINCT(`services`.`device_id`) FROM `services` LEFT JOIN `devices` ON `services`.`device_id` = `devices`.`device_id` WHERE `devices`.`poller_group` IN("
+ service_group
+ ") AND `devices`.`disabled` = 0"
)
else:
query = "SELECT DISTINCT(`services`.`device_id`) FROM `services` LEFT JOIN `devices` ON `services`.`device_id` = `devices`.`device_id` WHERE `devices`.`disabled` = 0"
amount_of_workers = int(args.amount_of_workers)
except (IndexError, ValueError):
amount_of_workers = DEFAULT_WORKERS
logger.warning(
"Bogus number of workers given. Using default number ({}) of workers.".format(
amount_of_workers
)
)
# EOC2
db = LNMS.db_open(
config["db_socket"],
config["db_host"],
config["db_port"],
config["db_user"],
config["db_pass"],
config["db_name"],
)
cursor = db.cursor()
cursor.execute(query)
devices = cursor.fetchall()
for row in devices:
devices_list.append(int(row[0]))
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC3
if servicedisco and not IsNode:
query = "SELECT MAX(`device_id`), MIN(`device_id`) FROM `services`"
cursor.execute(query)
devices = cursor.fetchall()
maxlocks = devices[0][0] or 0
minlocks = devices[0][1] or 0
# EOC3
db.close()
poll_queue = queue.Queue()
print_queue = queue.Queue()
print(
"INFO: starting the service check at %s with %s threads"
% (time.strftime("%Y-%m-%d %H:%M:%S"), amount_of_workers)
)
for device_id in devices_list:
poll_queue.put(device_id)
for i in range(amount_of_workers):
t = threading.Thread(target=poll_worker)
t.setDaemon(True)
t.start()
p = threading.Thread(target=printworker)
p.setDaemon(True)
p.start()
try:
poll_queue.join()
print_queue.join()
except (KeyboardInterrupt, SystemExit):
raise
total_time = int(time.time() - s_time)
print(
"INFO: services-wrapper checked %s devices in %s seconds with %s workers"
% (service_devices, total_time, amount_of_workers)
)
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6
if servicedisco or memc_alive():
master = memc.get("service.master")
if master == config["distributed_poller_name"] and not IsNode:
print("Wait for all service-nodes to finish")
nodes = memc.get("service.nodes")
while nodes is not None and nodes > 0:
try:
time.sleep(1)
nodes = memc.get("service.nodes")
except:
pass
print("Clearing Locks")
x = minlocks
while x <= maxlocks:
memc.delete("service.device." + str(x))
x = x + 1
print("%s Locks Cleared" % x)
print("Clearing Nodes")
memc.delete("service.master")
memc.delete("service.nodes")
else:
memc.decr("service.nodes")
print("Finished %s." % time.time())
# EOC6
show_stopper = False
if total_time > 300:
print(
"WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads"
)
print(
"INFO: in sequential style service checks the elapsed time would have been: %s seconds"
% real_duration
)
for device in per_device_duration:
if per_device_duration[device] > 300:
print(
"WARNING: device %s is taking too long: %s seconds"
% (device, per_device_duration[device])
)
show_stopper = True
if show_stopper:
print(
"ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do."
)
else:
recommend = int(total_time / 300.0 * amount_of_workers + 1)
print(
"WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)"
% recommend
)
sys.exit(2)
wrapper.wrapper(
WRAPPER_TYPE,
amount_of_workers=amount_of_workers,
config=config,
log_dir=log_dir,
_debug=args.debug,
)