netdata/collectors/python.d.plugin/python.d.plugin.in

#!/usr/bin/env bash
'''':;
exec "$(command -v python || command -v python3 || command -v python2 ||
echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
# -*- coding: utf-8 -*-
# Description:
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later
import collections
import copy
import gc
import json
import os
import pprint
import re
import sys
import time
import threading
import types
try:
    from queue import Queue
except ImportError:
    from Queue import Queue

PY_VERSION = sys.version_info[:2]  # (major=3, minor=7, micro=3, releaselevel='final', serial=0)

if PY_VERSION > (3, 1):
    from importlib.machinery import SourceFileLoader
else:
    from imp import load_source as SourceFileLoader
ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
def add_pythond_packages():
    pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__))
    pythond = os.path.abspath(pluginsd + '/../python.d')
    packages = os.path.join(pythond, 'python_modules')
    sys.path.append(packages)


add_pythond_packages()
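# Note: the bundled 'python_modules' directory (third-party libraries plus the plugin's own
# 'bases' framework) must be on sys.path before the 'from bases...' imports below can work,
# which is why add_pythond_packages() runs at import time.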
from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config
try:
    from collections import OrderedDict
except ImportError:
    from third_party.ordereddict import OrderedDict
def dirs():
    var_lib = os.getenv(
        ENV_NETDATA_LIB_DIR,
        '@varlibdir_POST@',
    )
    plugin_user_config = os.getenv(
        ENV_NETDATA_USER_CONFIG_DIR,
        '@configdir_POST@',
    )
    plugin_stock_config = os.getenv(
        ENV_NETDATA_STOCK_CONFIG_DIR,
        '@libconfigdir_POST@',
    )
    pluginsd = os.getenv(
        ENV_NETDATA_PLUGINS_DIR,
        os.path.dirname(__file__),
    )
    modules_user_config = os.path.join(plugin_user_config, 'python.d')
    modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
    modules = os.path.abspath(pluginsd + '/../python.d')

    Dirs = collections.namedtuple(
        'Dirs',
        [
            'plugin_user_config',
            'plugin_stock_config',
            'modules_user_config',
            'modules_stock_config',
            'modules',
            'var_lib',
        ]
    )
    return Dirs(
        plugin_user_config,
        plugin_stock_config,
        modules_user_config,
        modules_stock_config,
        modules,
        var_lib,
    )
DIRS = dirs()
IS_ATTY = sys.stdout.isatty()
MODULE_SUFFIX = '.chart.py'
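# The '@...@' tokens above are placeholders in this .plugin.in template and are replaced with
# the real install paths at build time; at runtime the NETDATA_* environment variables, which
# the netdata daemon exports for its external plugins, take precedence over those defaults.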
def available_modules():
    obsolete = (
        'apache_cache',  # replaced by web_log
        'cpuidle',  # rewritten in C
        'cpufreq',  # rewritten in C
        'gunicorn_log',  # replaced by web_log
        'linux_power_supply',  # rewritten in C
        'nginx_log',  # replaced by web_log
        'mdstat',  # rewritten in C
        'sslcheck',  # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624
    )

    files = sorted(os.listdir(DIRS.modules))
    modules = [m[:-len(MODULE_SUFFIX)] for m in files if m.endswith(MODULE_SUFFIX)]
    avail = [m for m in modules if m not in obsolete]
    return tuple(avail)
AVAILABLE_MODULES = available_modules()
JOB_BASE_CONF = {
    'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'penalty': True,
    'name': str(),
}

PLUGIN_BASE_CONF = {
    'enabled': True,
    'default_run': True,
    'gc_run': True,
    'gc_interval': 300,
}
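# For reference, PLUGIN_BASE_CONF mirrors the top-level keys of python.d.conf; a minimal,
# illustrative config (the actual stock file ships with netdata) looks roughly like:
#
#   enabled: yes
#   default_run: yes
#   gc_run: yes
#   gc_interval: 300
#   example: no          # per-module booleans enable/disable individual collectors
#
# JOB_BASE_CONF holds the per-job defaults that every job config falls back to.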
def multi_path_find(name, *paths):
    for path in paths:
        abs_name = os.path.join(path, name)
        if os.path.isfile(abs_name):
            return abs_name
    return str()


def load_module(name):
    abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
    module = SourceFileLoader(name, abs_path)
    if isinstance(module, types.ModuleType):
        return module
    return module.load_module()
class ModuleConfig:
    def __init__(self, name, config=None):
        self.name = name
        self.config = config or OrderedDict()

    def load(self, abs_path):
        self.config.update(load_config(abs_path) or dict())

    def defaults(self):
        keys = (
            'update_every',
            'priority',
            'autodetection_retry',
            'chart_cleanup',
            'penalty',
        )
        return dict((k, self.config[k]) for k in keys if k in self.config)

    def create_job(self, job_name, job_config=None):
        job_config = job_config or dict()

        config = OrderedDict()
        config.update(job_config)
        config['job_name'] = job_name
        for k, v in self.defaults().items():
            config.setdefault(k, v)

        return config

    def job_names(self):
        return [v for v in self.config if isinstance(self.config.get(v), dict)]

    def single_job(self):
        return [self.create_job(self.name, self.config)]

    def multi_job(self):
        return [self.create_job(n, self.config[n]) for n in self.job_names()]

    def create_jobs(self):
        return self.multi_job() or self.single_job()
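# A module's <name>.conf may describe jobs in either of two layouts (see create_jobs above);
# a sketch, with the module-specific keys invented purely for illustration:
#
#   # single anonymous job: only top-level keys
#   update_every: 5
#   host: 127.0.0.1
#
#   # or several named jobs: every top-level key whose value is a mapping becomes a job
#   local:
#     host: 127.0.0.1
#   remote:
#     host: 192.0.2.10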
class JobsConfigsBuilder:
    def __init__(self, config_dirs):
        self.config_dirs = config_dirs
        self.log = PythonDLogger()
        self.job_defaults = None
        self.module_defaults = None
        self.min_update_every = None

    def load_module_config(self, module_name):
        name = '{0}.conf'.format(module_name)
        self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs))

        config = ModuleConfig(module_name)

        abs_path = multi_path_find(name, *self.config_dirs)
        if not abs_path:
            self.log.warning("[{0}] '{1}' was not found".format(module_name, name))
            return config

        self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path))
        try:
            config.load(abs_path)
        except Exception as error:
            self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error)))
            return None

        self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path))
        return config

    @staticmethod
    def apply_defaults(jobs, defaults):
        if defaults is None:
            return

        for k, v in defaults.items():
            for job in jobs:
                job.setdefault(k, v)
    def set_min_update_every(self, jobs, min_update_every):
        if min_update_every is None:
            return

        for job in jobs:
            if 'update_every' in job and job['update_every'] < min_update_every:
                job['update_every'] = min_update_every
    def build(self, module_name):
        config = self.load_module_config(module_name)
        if config is None:
            return None

        configs = config.create_jobs()
        self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs)))

        self.apply_defaults(configs, self.module_defaults)
        self.apply_defaults(configs, self.job_defaults)
        self.set_min_update_every(configs, self.min_update_every)

        return configs
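# Effective per-job settings are resolved with setdefault(), so precedence (highest first) is:
# keys set on the job itself in <module>.conf, then top-level defaults from <module>.conf,
# then module-level globals from <module>.chart.py, and finally JOB_BASE_CONF.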
JOB_STATUS_ACTIVE = 'active'
JOB_STATUS_RECOVERING = 'recovering'
JOB_STATUS_DROPPED = 'dropped'
JOB_STATUS_INIT = 'initial'
class Job(threading.Thread):
    inf = -1

    def __init__(self, service, module_name, config):
        threading.Thread.__init__(self)
        self.daemon = True
        self.service = service
        self.module_name = module_name
        self.config = config
        self.real_name = config['job_name']
        self.actual_name = config['override_name'] or self.real_name
        self.autodetection_retry = config['autodetection_retry']
        self.checks = self.inf
        self.job = None
        self.status = JOB_STATUS_INIT

    def is_inited(self):
        return self.job is not None

    def init(self):
        self.job = self.service(configuration=copy.deepcopy(self.config))

    def check(self):
        ok = self.job.check()
        self.checks -= self.checks != self.inf and not ok
        return ok

    def create(self):
        self.job.create()

    def need_to_recheck(self):
        return self.autodetection_retry != 0 and self.checks != 0

    def run(self):
        self.job.run()
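# A Job wraps one Service instance from a module and moves through the statuses defined above:
# 'initial' -> 'active' once check()/create() succeed, 'recovering' while rechecks remain
# (autodetection_retry != 0), and 'dropped' when it fails for good or duplicates another job.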
class ModuleSrc:
    def __init__(self, name):
        self.name = name
        self.src = None

    def load(self):
        self.src = load_module(self.name)

    def get(self, key):
        return getattr(self.src, key, None)

    def service(self):
        return self.get('Service')

    def defaults(self):
        keys = (
            'update_every',
            'priority',
            'autodetection_retry',
            'chart_cleanup',
            'penalty',
        )
        return dict((k, self.get(k)) for k in keys if self.get(k) is not None)

    def is_disabled_by_default(self):
        return bool(self.get('disabled_by_default'))
class JobsStatuses:
    def __init__(self):
        self.items = OrderedDict()

    def dump(self):
        return json.dumps(self.items, indent=2)

    def get(self, module_name, job_name):
        if module_name not in self.items:
            return None
        return self.items[module_name].get(job_name)

    def has(self, module_name, job_name):
        return self.get(module_name, job_name) is not None

    def from_file(self, path):
        with open(path) as f:
            data = json.load(f)
            return self.from_json(data)

    @staticmethod
    def from_json(items):
        if not isinstance(items, dict):
            raise Exception('items obj has wrong type : {0}'.format(type(items)))
        if not items:
            return JobsStatuses()

        v = OrderedDict()
        for mod_name in sorted(items):
            if not items[mod_name]:
                continue
            v[mod_name] = OrderedDict()
            for job_name in sorted(items[mod_name]):
                v[mod_name][job_name] = items[mod_name][job_name]

        rv = JobsStatuses()
        rv.items = v
        return rv

    @staticmethod
    def from_jobs(jobs):
        v = OrderedDict()
        for job in jobs:
            status = job.status
            if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING):
                continue
            if job.module_name not in v:
                v[job.module_name] = OrderedDict()
            v[job.module_name][job.real_name] = status

        rv = JobsStatuses()
        rv.items = v
        return rv
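# The persisted statuses dump is a plain two-level JSON object, for example:
#
#   {
#     "nginx": { "local": "active" },
#     "mysql": { "primary": "recovering" }
#   }
#
# (module and job names here are illustrative)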
class StdoutSaver:
    @staticmethod
    def save(dump):
        print(dump)


class CachedFileSaver:
    def __init__(self, path):
        self.last_save_success = False
        self.last_saved_dump = str()
        self.path = path

    def save(self, dump):
        if self.last_save_success and self.last_saved_dump == dump:
            return
        try:
            with open(self.path, 'w') as out:
                out.write(dump)
        except Exception:
            self.last_save_success = False
            raise
        self.last_saved_dump = dump
        self.last_save_success = True
class PluginConfig(dict):
    def __init__(self, *args):
        dict.__init__(self, *args)

    def is_module_explicitly_enabled(self, module_name):
        return self._is_module_enabled(module_name, True)

    def is_module_enabled(self, module_name):
        return self._is_module_enabled(module_name, False)

    def _is_module_enabled(self, module_name, explicit):
        if module_name in self:
            return self[module_name]
        if explicit:
            return False
        return self['default_run']
class Plugin:
    config_name = 'python.d.conf'
    jobs_status_dump_name = 'pythond-jobs-statuses.json'

    def __init__(self, modules_to_run, min_update_every):
        self.modules_to_run = modules_to_run
        self.min_update_every = min_update_every
        self.config = PluginConfig(PLUGIN_BASE_CONF)
        self.log = PythonDLogger()
        self.started_jobs = collections.defaultdict(dict)
        self.jobs = list()
        self.saver = None
        self.runs = 0

    def load_config(self):
        paths = [
            DIRS.plugin_user_config,
            DIRS.plugin_stock_config,
        ]
        self.log.debug("looking for '{0}' in {1}".format(self.config_name, paths))

        abs_path = multi_path_find(self.config_name, *paths)
        if not abs_path:
            self.log.warning("'{0}' was not found, using defaults".format(self.config_name))
            return True

        self.log.debug("loading '{0}'".format(abs_path))
        try:
            config = load_config(abs_path)
        except Exception as error:
            self.log.error("error on loading '{0}' : {1}".format(abs_path, repr(error)))
            return False

        self.log.debug("'{0}' is loaded".format(abs_path))
        self.config.update(config)
        return True

    def load_job_statuses(self):
        self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib))

        abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib)
        if not abs_path:
            self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name))
            return

        self.log.debug("loading '{0}'".format(abs_path))
        try:
            statuses = JobsStatuses().from_file(abs_path)
        except Exception as error:
            self.log.warning("error on loading '{0}' : {1}".format(abs_path, repr(error)))
            return None

        self.log.debug("'{0}' is loaded".format(abs_path))
        return statuses
    def create_jobs(self, job_statuses=None):
        paths = [
            DIRS.modules_user_config,
            DIRS.modules_stock_config,
        ]

        builder = JobsConfigsBuilder(paths)
        builder.job_defaults = JOB_BASE_CONF
        builder.min_update_every = self.min_update_every

        jobs = list()
        for mod_name in self.modules_to_run:
            if not self.config.is_module_enabled(mod_name):
                self.log.info("[{0}] is disabled in the configuration file, skipping it".format(mod_name))
                continue

            src = ModuleSrc(mod_name)
            try:
                src.load()
            except Exception as error:
                self.log.warning("[{0}] error on loading source : {1}, skipping it".format(mod_name, repr(error)))
                continue

            if not (src.service() and callable(src.service())):
                self.log.warning("[{0}] has no callable Service object, skipping it".format(mod_name))
                continue

            if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(mod_name):
                self.log.info("[{0}] is disabled by default, skipping it".format(mod_name))
                continue

            builder.module_defaults = src.defaults()
            configs = builder.build(mod_name)
            if not configs:
                self.log.info("[{0}] has no job configs, skipping it".format(mod_name))
                continue

            for config in configs:
                config['job_name'] = re.sub(r'\s+', '_', config['job_name'])
                config['override_name'] = re.sub(r'\s+', '_', config.pop('name'))

                job = Job(src.service(), mod_name, config)

                was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name)
                if was_previously_active and job.autodetection_retry == 0:
                    self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format(
                        job.module_name, job.real_name))
                    job.checks = 11
                    job.autodetection_retry = 30

                jobs.append(job)

        return jobs
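# Jobs that were active or recovering during the previous plugin run (per the loaded statuses
# dump) are not dropped on the first failed check after a restart: they get autodetection_retry
# set to 30 and up to 11 rechecks, so a briefly unavailable data source survives a netdata restart.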
    def setup(self):
        if not self.load_config():
            return False

        if not self.config['enabled']:
            self.log.info('disabled in the configuration file')
            return False

        statuses = self.load_job_statuses()

        self.jobs = self.create_jobs(statuses)
        if not self.jobs:
            self.log.info('no jobs to run')
            return False

        if not IS_ATTY:
            abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name)
            self.saver = CachedFileSaver(abs_path)
        return True
    def start_jobs(self, *jobs):
        for job in jobs:
            if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING):
                continue

            if job.actual_name in self.started_jobs[job.module_name]:
                self.log.info('{0}[{1}] : already served by another job, skipping it'.format(
                    job.module_name, job.real_name))
                job.status = JOB_STATUS_DROPPED
                continue

            if not job.is_inited():
                try:
                    job.init()
                except Exception as error:
                    self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                        job.module_name, job.real_name, repr(error)))
                    job.status = JOB_STATUS_DROPPED
                    continue

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    job.module_name, job.real_name, repr(error)))
                job.status = JOB_STATUS_DROPPED
                continue

            if not ok:
                self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name))
                job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED
                continue

            self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name))

            try:
                job.create()
            except Exception as error:
                self.log.error("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format(
                    job.module_name, job.real_name, repr(error)))
                job.status = JOB_STATUS_DROPPED
                continue
            # record the started name as a dict key so the duplicate check above keeps
            # working when a module runs more than one job
            self.started_jobs[job.module_name][job.actual_name] = True
            job.status = JOB_STATUS_ACTIVE
            job.start()
    @staticmethod
    def keep_alive():
        if not IS_ATTY:
            safe_print('\n')

    def garbage_collection(self):
        if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0:
            v = gc.collect()
            self.log.debug('GC collection run result: {0}'.format(v))

    def restart_recovering_jobs(self):
        for job in self.jobs:
            if job.status != JOB_STATUS_RECOVERING:
                continue
            if self.runs % job.autodetection_retry != 0:
                continue
            self.start_jobs(job)

    def cleanup_jobs(self):
        self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED]

    def have_alive_jobs(self):
        return next(
            (True for job in self.jobs if job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)),
            False,
        )

    def save_job_statuses(self):
        if self.saver is None:
            return
        if self.runs % 10 != 0:
            return

        dump = JobsStatuses().from_jobs(self.jobs).dump()
        try:
            self.saver.save(dump)
        except Exception as error:
            self.log.error("error on saving jobs statuses dump : {0}".format(repr(error)))

    def serve_once(self):
        if not self.have_alive_jobs():
            self.log.info('no jobs to serve')
            return False

        time.sleep(1)
        self.runs += 1

        self.keep_alive()
        self.garbage_collection()
        self.cleanup_jobs()
        self.restart_recovering_jobs()
        self.save_job_statuses()

        return True

    def serve(self):
        while self.serve_once():
            pass

    def run(self):
        self.start_jobs(*self.jobs)
        self.serve()
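# The main loop (serve_once) ticks once per second: it prints an empty line as a keep-alive for
# the netdata daemon when running as a piped plugin, runs gc.collect() every 'gc_interval' ticks,
# removes dropped jobs, re-checks recovering jobs, and saves the statuses dump every 10 ticks.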
def parse_command_line():
    opts = sys.argv[:][1:]
    debug = False
    trace = False
    update_every = 1
    modules_to_run = list()

    def find_first_positive_int(values):
        return next((v for v in values if v.isdigit() and int(v) >= 1), None)

    u = find_first_positive_int(opts)
    if u is not None:
        update_every = int(u)
        opts.remove(u)
    if 'debug' in opts:
        debug = True
        opts.remove('debug')
    if 'trace' in opts:
        trace = True
        opts.remove('trace')
    if opts:
        modules_to_run = list(opts)

    cmd = collections.namedtuple(
        'CMD',
        [
            'update_every',
            'debug',
            'trace',
            'modules_to_run',
        ])
    return cmd(
        update_every,
        debug,
        trace,
        modules_to_run,
    )
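# Command-line arguments are positional but order-independent; for manual debugging something
# like the following should work (the module names are examples):
#
#   ./python.d.plugin debug trace 5 nginx mysql
#
# The first positive integer becomes update_every, 'debug' raises log verbosity, 'trace' enables
# tracebacks in log messages, and any remaining words are treated as module names to run.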
def guess_module(modules, *names):
    def guess(n):
        found = None
        for i, _ in enumerate(n):
            cur = [x for x in modules if x.startswith(n[:i + 1])]
            if not cur:
                return found
            found = cur
        return found

    guessed = list()
    for name in names:
        name = name.lower()
        m = guess(name)
        if m:
            guessed.extend(m)
    return sorted(set(guessed))
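# guess_module() offers prefix-based suggestions for mistyped module names, e.g. asking for
# 'ngin' would suggest the available modules starting with that prefix (such as 'nginx').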
def disable():
    if not IS_ATTY:
        safe_print('DISABLE')
    exit(0)
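# Printing 'DISABLE' tells the netdata daemon not to respawn this plugin; it is skipped when
# running interactively from a terminal, so manual runs simply exit.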
def main():
    cmd = parse_command_line()
    log = PythonDLogger()

    if cmd.debug:
        log.logger.severity = 'DEBUG'
    if cmd.trace:
        log.log_traceback = True

    log.info('using python v{0}'.format(PY_VERSION[0]))

    unknown = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
    if unknown:
        log.error('unknown modules : {0}'.format(sorted(list(unknown))))
        guessed = guess_module(AVAILABLE_MODULES, *cmd.modules_to_run)
        if guessed:
            log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1)))
        return

    p = Plugin(
        cmd.modules_to_run or AVAILABLE_MODULES,
        cmd.update_every,
    )

    try:
        if not p.setup():
            return
        p.run()
    except KeyboardInterrupt:
        pass
    log.info('exiting from main...')
if __name__ == "__main__":
    main()
    disable()