netdata/collectors/python.d.plugin/python.d.plugin.in
#!/usr/bin/env bash
'''':; exec "$(command -v python || command -v python3 || command -v python2 ||
echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
# -*- coding: utf-8 -*-
# Description:
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later
import gc
import os
import sys
import threading
from re import sub
from sys import version_info, argv
from time import sleep
GC_RUN = True
GC_COLLECT_EVERY = 300
PY_VERSION = version_info[:2]
USER_CONFIG_DIR = os.getenv('NETDATA_USER_CONFIG_DIR', '@configdir_POST@')
STOCK_CONFIG_DIR = os.getenv('NETDATA_STOCK_CONFIG_DIR', '@libconfigdir_POST@')
PLUGINS_USER_CONFIG_DIR = os.path.join(USER_CONFIG_DIR, 'python.d')
PLUGINS_STOCK_CONFIG_DIR = os.path.join(STOCK_CONFIG_DIR, 'python.d')
PLUGINS_DIR = os.path.abspath(os.getenv(
    'NETDATA_PLUGINS_DIR',
    os.path.dirname(__file__)) + '/../python.d')
PYTHON_MODULES_DIR = os.path.join(PLUGINS_DIR, 'python_modules')
sys.path.append(PYTHON_MODULES_DIR)
from bases.loaders import ModuleAndConfigLoader # noqa: E402
from bases.loggers import PythonDLogger # noqa: E402
from bases.collection import setdefault_values, run_and_exit # noqa: E402
try:
    from collections import OrderedDict
except ImportError:
    from third_party.ordereddict import OrderedDict
BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
               'priority': 60000,
               'autodetection_retry': 0,
               'chart_cleanup': 10,
               'penalty': True,
               'name': str()}
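# Illustrative only: any of these defaults can be overridden per job in the
# module's python.d/<module>.conf (job name and values below are hypothetical):
#
#   local:
#     update_every: 5
#     priority: 90000
#     autodetection_retry: 30
#     penalty: yes
#     name: 'local'
#
# Anything a job does not set falls back to the top-level values of its config
# file, then to attributes defined on the service module itself, and finally to
# BASE_CONFIG (see Module.jobs_configurations_builder).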
MODULE_EXTENSION = '.chart.py'
OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log', 'cpufreq', 'cpuidle', 'mdstat']


def module_ok(m):
    return m.endswith(MODULE_EXTENSION) and m[:-len(MODULE_EXTENSION)] not in OBSOLETE_MODULES


ALL_MODULES = [m for m in sorted(os.listdir(PLUGINS_DIR)) if module_ok(m)]
def parse_cmd():
    debug = 'debug' in argv[1:]
    trace = 'trace' in argv[1:]
    override_update_every = next((arg for arg in argv[1:] if arg.isdigit() and int(arg) > 1), False)
    modules = [''.join([m, MODULE_EXTENSION]) for m in argv[1:] if ''.join([m, MODULE_EXTENSION]) in ALL_MODULES]
    return debug, trace, override_update_every, modules or ALL_MODULES
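# Illustrative invocations (module names are examples; any bare integer > 1 is
# taken as an update_every override, any argument matching an existing
# <name>.chart.py limits the run to those modules):
#   python.d.plugin debug trace      -> debug logging with tracebacks, all modules
#   python.d.plugin 5 nginx apache   -> update_every raised to at least 5, only nginx and apache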
def multi_job_check(config):
    return next((True for key in config if isinstance(config[key], dict)), False)
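# Distinguishes a single-job configuration, e.g. (keys are illustrative):
#   {'update_every': 3, 'host': 'localhost'}
# from a multi-job one, where at least one value is itself a dict:
#   {'local': {'host': 'localhost'}, 'remote': {'host': '10.0.0.1'}}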
class RawModule:
    def __init__(self, name, path, explicitly_enabled=True):
        self.name = name
        self.path = path
        self.explicitly_enabled = explicitly_enabled
class Job(object):
    def __init__(self, initialized_job, job_id):
        """
        :param initialized_job: instance of <Class Service>
        :param job_id: <str>
        """
        self.job = initialized_job
        self.id = job_id  # key in Modules.jobs()
        self.module_name = self.job.__module__  # used in Plugin.delete_job()
        self.recheck_every = self.job.configuration.pop('autodetection_retry')
        self.checked = False  # used in Plugin.check_job()
        self.created = False  # used in Plugin.create_job_charts()
        if self.job.update_every < int(OVERRIDE_UPDATE_EVERY):
            self.job.update_every = int(OVERRIDE_UPDATE_EVERY)

    def __getattr__(self, item):
        return getattr(self.job, item)

    def __repr__(self):
        return self.job.__repr__()

    def is_dead(self):
        return bool(self.ident) and not self.is_alive()

    def not_launched(self):
        return not bool(self.ident)

    def is_autodetect(self):
        return self.recheck_every
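# Note: Job delegates unknown attributes to the wrapped Service instance via
# __getattr__; is_dead()/not_launched() rely on the Thread API (ident, is_alive),
# which assumes the Service class ultimately subclasses threading.Thread.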
class Module(object):
    def __init__(self, service, config):
        """
        :param service: <Module>
        :param config: <dict>
        """
        self.service = service
        self.name = service.__name__
        self.config = self.jobs_configurations_builder(config)
        self.jobs = OrderedDict()
        self.counter = 1

        self.initialize_jobs()

    def __repr__(self):
        return "<Class Module '{name}'>".format(name=self.name)

    def __iter__(self):
        return iter(OrderedDict(self.jobs).values())

    def __getitem__(self, item):
        return self.jobs[item]

    def __delitem__(self, key):
        del self.jobs[key]

    def __len__(self):
        return len(self.jobs)

    def __bool__(self):
        return bool(self.jobs)

    def __nonzero__(self):
        return self.__bool__()
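    # Example (hypothetical values): a loaded config such as
    #   {'update_every': 3, 'local': {'host': '127.0.0.1'}}
    # is expanded by jobs_configurations_builder() below into tuples like
    #   ('job001', 'local', {'host': '127.0.0.1', 'update_every': 3, ...BASE_CONFIG defaults...})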
    def jobs_configurations_builder(self, config):
        """
        :param config: <dict>
        :return:
        """
        counter = 0
        job_base_config = dict()

        for attr in BASE_CONFIG:
            job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr]))

        if not config:
            config = {str(): dict()}
        elif not multi_job_check(config):
            config = {str(): config}

        for job_name in config:
            if not isinstance(config[job_name], dict):
                continue

            job_config = setdefault_values(config[job_name], base_dict=job_base_config)
            job_name = sub(r'\s+', '_', job_name)
            config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name'])
            counter += 1
            job_id = 'job' + str(counter).zfill(3)

            yield job_id, job_name, job_config
    def initialize_jobs(self):
        """
        :return:
        """
        for job_id, job_name, job_config in self.config:
            job_config['job_name'] = job_name
            job_config['override_name'] = job_config.pop('name')

            try:
                initialized_job = self.service.Service(configuration=job_config)
            except Exception as error:
                Logger.error("job initialization: '{module_name} {job_name}' "
                             "=> ['FAILED'] ({error})".format(module_name=self.name,
                                                              job_name=job_name,
                                                              error=error))
                continue
            else:
                Logger.debug("job initialization: '{module_name} {job_name}' "
                             "=> ['OK']".format(module_name=self.name,
                                                job_name=job_name or self.name))
                self.jobs[job_id] = Job(initialized_job=initialized_job,
                                        job_id=job_id)

        del self.config
        del self.service
class Plugin(object):
    def __init__(self):
        self.loader = ModuleAndConfigLoader()
        self.modules = OrderedDict()
        self.sleep_time = 1
        self.runs_counter = 0

        user_config = os.path.join(USER_CONFIG_DIR, 'python.d.conf')
        stock_config = os.path.join(STOCK_CONFIG_DIR, 'python.d.conf')

        Logger.debug("loading '{0}'".format(user_config))
        self.config, error = self.loader.load_config_from_file(user_config)
        if error:
            Logger.error("cannot load '{0}': {1}. Will try stock version.".format(user_config, error))
            Logger.debug("loading '{0}'".format(stock_config))
            self.config, error = self.loader.load_config_from_file(stock_config)
            if error:
                Logger.error("cannot load '{0}': {1}".format(stock_config, error))

        self.do_gc = self.config.get("gc_run", GC_RUN)
        self.gc_interval = self.config.get("gc_interval", GC_COLLECT_EVERY)

        if not self.config.get('enabled', True):
            run_and_exit(Logger.info)('DISABLED in configuration file.')

        self.load_and_initialize_modules()

        if not self.modules:
            run_and_exit(Logger.info)('No modules to run. Exit...')
    def __iter__(self):
        return iter(OrderedDict(self.modules).values())

    @property
    def jobs(self):
        return (job for mod in self for job in mod)

    @property
    def dead_jobs(self):
        return (job for job in self.jobs if job.is_dead())

    @property
    def autodetect_jobs(self):
        return [job for job in self.jobs if job.not_launched()]
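    # Module selection follows python.d.conf (illustrative snippet):
    #   default_run: yes     # run every module unless it is disabled explicitly
    #   example: no          # disable a single module
    # With 'default_run: no' only modules explicitly set to 'yes' are run.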
    def enabled_modules(self):
        for mod in MODULES_TO_RUN:
            mod_name = mod[:-len(MODULE_EXTENSION)]
            mod_path = os.path.join(PLUGINS_DIR, mod)
            if any(
                    [
                        self.config.get('default_run', True) and self.config.get(mod_name, True),
                        (not self.config.get('default_run')) and self.config.get(mod_name),
                    ]
            ):
                yield RawModule(
                    name=mod_name,
                    path=mod_path,
                    explicitly_enabled=self.config.get(mod_name),
                )
    def load_and_initialize_modules(self):
        for mod in self.enabled_modules():

            # Load module from file ------------------------------------------------------------
            loaded_module, error = self.loader.load_module_from_file(mod.name, mod.path)
            log = Logger.error if error else Logger.debug
            log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK',
                                                                           module_name=mod.name))
            if error:
                Logger.error("load source error : {0}".format(error))
                continue

            # Load module config from file ------------------------------------------------------
            user_config = os.path.join(PLUGINS_USER_CONFIG_DIR, mod.name + '.conf')
            stock_config = os.path.join(PLUGINS_STOCK_CONFIG_DIR, mod.name + '.conf')

            Logger.debug("loading '{0}'".format(user_config))
            loaded_config, error = self.loader.load_config_from_file(user_config)
            if error:
                Logger.error("cannot load '{0}' : {1}. Will try stock version.".format(user_config, error))
                Logger.debug("loading '{0}'".format(stock_config))
                loaded_config, error = self.loader.load_config_from_file(stock_config)
                if error:
                    Logger.error("cannot load '{0}': {1}".format(stock_config, error))

            # Skip disabled modules
            if getattr(loaded_module, 'disabled_by_default', False) and not mod.explicitly_enabled:
                Logger.info("module '{0}' disabled by default".format(loaded_module.__name__))
                continue

            # Module initialization ---------------------------------------------------
            initialized_module = Module(service=loaded_module, config=loaded_config)
            Logger.debug("module status: '{module_name}' => [{status}] "
                         "(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED',
                                                        module_name=initialized_module.name,
                                                        jobs_number=len(initialized_module)))
            if initialized_module:
                self.modules[initialized_module.name] = initialized_module
    @staticmethod
    def check_job(job):
        """
        :param job: <Job>
        :return:
        """
        try:
            check_ok = bool(job.check())
        except Exception as error:
            job.error('check() unhandled exception: {error}'.format(error=error))
            return None
        else:
            return check_ok
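    # check_job() returns True (job works), False (check failed, may be retried
    # if autodetection_retry is set) or None (check() raised an exception - the
    # job is dropped unconditionally in run_check()).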
    @staticmethod
    def create_job_charts(job):
        """
        :param job: <Job>
        :return:
        """
        try:
            create_ok = job.create()
        except Exception as error:
            job.error('create() unhandled exception: {error}'.format(error=error))
            return False
        else:
            return create_ok
    def delete_job(self, job):
        """
        :param job: <Job>
        :return:
        """
        del self.modules[job.module_name][job.id]
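    # Job lifecycle: run_check() -> run_create() -> start() (see below). Jobs that
    # share a name with an already checked job are dropped; failed jobs with
    # autodetection_retry set stay un-launched and are re-checked later by
    # autodetect_retry().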
    def run_check(self):
        checked = list()
        for job in self.jobs:
            if job.name in checked:
                job.info('check() => [DROPPED] (already served by another job)')
                self.delete_job(job)
                continue
            ok = self.check_job(job)
            if ok:
                job.info('check() => [OK]')
                checked.append(job.name)
                job.checked = True
                continue
            if not job.is_autodetect() or ok is None:
                job.info('check() => [FAILED]')
                self.delete_job(job)
            else:
                job.info('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every))
    def run_create(self):
        for job in self.jobs:
            if not job.checked:
                # skip autodetection_retry jobs
                continue
            ok = self.create_job_charts(job)
            if ok:
                job.debug('create() => [OK] (charts: {0})'.format(len(job.charts)))
                job.created = True
                continue
            job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts)))
            self.delete_job(job)
    def start(self):
        self.run_check()
        self.run_create()
        for job in self.jobs:
            if job.created:
                job.start()

        while True:
            if threading.active_count() <= 1 and not self.autodetect_jobs:
                run_and_exit(Logger.info)('FINISHED')

            sleep(self.sleep_time)
            self.cleanup()
            self.autodetect_retry()

            # FIXME: https://github.com/netdata/netdata/issues/3817
            if self.do_gc and self.runs_counter % self.gc_interval == 0:
                v = gc.collect()
                Logger.debug("GC full collection run result: {0}".format(v))
    def cleanup(self):
        for job in self.dead_jobs:
            self.delete_job(job)
        for mod in self:
            if not mod:
                del self.modules[mod.name]
    def autodetect_retry(self):
        self.runs_counter += self.sleep_time
        for job in self.autodetect_jobs:
            if self.runs_counter % job.recheck_every == 0:
                checked = self.check_job(job)
                if checked:
                    created = self.create_job_charts(job)
                    if not created:
                        self.delete_job(job)
                        continue
                    job.start()
if __name__ == '__main__':
    DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd()
    Logger = PythonDLogger()
    if DEBUG:
        Logger.logger.severity = 'DEBUG'
    if TRACE:
        Logger.log_traceback = True
    Logger.info('Using python {version}'.format(version=PY_VERSION[0]))

    plugin = Plugin()
    plugin.start()
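# Manual debugging run (path is illustrative, adjust to the actual install):
#   sudo -u netdata /usr/libexec/netdata/plugins.d/python.d.plugin <module> debug trace
# As with other netdata external plugins, chart data is written to stdout and
# log messages go to stderr.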