From def03a151cd042748b2687729a922d2f93fbd570 Mon Sep 17 00:00:00 2001 From: Ilya Mashchenko Date: Tue, 21 Jul 2020 09:46:31 +0300 Subject: [PATCH] python.d: add job file lock registry (#9564) --- REDISTRIBUTED.md | 4 + collectors/python.d.plugin/Makefile.am | 1 + collectors/python.d.plugin/python.d.plugin.in | 65 +++ .../python_modules/third_party/filelock.py | 451 ++++++++++++++++++ 4 files changed, 521 insertions(+) create mode 100644 collectors/python.d.plugin/python_modules/third_party/filelock.py diff --git a/REDISTRIBUTED.md b/REDISTRIBUTED.md index bc1ce59ca8..5a3c328709 100644 --- a/REDISTRIBUTED.md +++ b/REDISTRIBUTED.md @@ -178,4 +178,8 @@ connectivity is not available. Copyright 2014, 2015, 2016 Ori Livneh [ori@wikimedia.org](mailto:ori@wikimedia.org) [Apache-2.0](http://www.apache.org/licenses/LICENSE-2.0) +- [filelock](https://github.com/benediktschmitt/py-filelock) + + Copyright 2015, Benedikt Schmitt [Unlicense License](https://unlicense.org/) + [![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2FREDISTRIBUTED&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) diff --git a/collectors/python.d.plugin/Makefile.am b/collectors/python.d.plugin/Makefile.am index e678f86a21..176bd3cb01 100644 --- a/collectors/python.d.plugin/Makefile.am +++ b/collectors/python.d.plugin/Makefile.am @@ -141,6 +141,7 @@ dist_third_party_DATA = \ python_modules/third_party/mcrcon.py \ python_modules/third_party/boinc_client.py \ python_modules/third_party/monotonic.py \ + python_modules/third_party/filelock.py \ $(NULL) pythonyaml2dir=$(pythonmodulesdir)/pyyaml2 diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in index b289def994..106a77475e 100644 --- a/collectors/python.d.plugin/python.d.plugin.in +++ b/collectors/python.d.plugin/python.d.plugin.in @@ -52,6 +52,7 @@ ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR' ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR' ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR' ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY' +ENV_NETDATA_LOCKS_DIR = 'NETDATA_LOCKS_DIR' def add_pythond_packages(): @@ -66,6 +67,7 @@ add_pythond_packages() from bases.collection import safe_print from bases.loggers import PythonDLogger from bases.loaders import load_config +from third_party import filelock try: from collections import OrderedDict @@ -90,6 +92,10 @@ def dirs(): ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__), ) + locks = os.getenv( + ENV_NETDATA_LOCKS_DIR, + # TODO: add '@locksdir_POST@ + ) modules_user_config = os.path.join(plugin_user_config, 'python.d') modules_stock_config = os.path.join(plugin_stock_config, 'python.d') modules = os.path.abspath(pluginsd + '/../python.d') @@ -103,6 +109,7 @@ def dirs(): 'modules_stock_config', 'modules', 'var_lib', + 'locks', ] ) return Dirs( @@ -112,6 +119,7 @@ def dirs(): modules_stock_config, modules, var_lib, + locks, ) @@ -304,6 +312,9 @@ class Job(threading.Thread): def init(self): self.job = self.service(configuration=copy.deepcopy(self.config)) + def full_name(self): + return self.job.name + def check(self): ok = self.job.check() self.checks -= self.checks != self.inf and not ok @@ -445,6 +456,41 @@ class PluginConfig(dict): return self['default_run'] +class FileLockRegistry: + def __init__(self, path): + self.path = path + self.locks = dict() + + def register(self, name): + if name in self.locks: + return + file = os.path.join(self.path, name) + lock = filelock.FileLock(file) + lock.acquire(timeout=0) + self.locks[name] = lock + + def unregister(self, name): + if name not in self.locks: + return + lock = self.locks[name] + lock.release() + del self.locks[name] + + +class DummyRegistry: + def register(self, name): + pass + + def unregister(self, name): + pass + + +def create_jobs_registry(): + if not DIRS.locks: + return DummyRegistry() + return FileLockRegistry(DIRS.locks) + + class Plugin: config_name = 'python.d.conf' jobs_status_dump_name = 'pythond-jobs-statuses.json' @@ -454,6 +500,7 @@ class Plugin: self.min_update_every = min_update_every self.config = PluginConfig(PLUGIN_BASE_CONF) self.log = PythonDLogger() + self.registry = create_jobs_registry() self.started_jobs = collections.defaultdict(dict) self.jobs = list() self.saver = None @@ -604,12 +651,30 @@ class Plugin: continue self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name)) + try: + self.registry.register(job.full_name()) + except filelock.Timeout as error: + self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format( + job.module_name, job.real_name, error)) + job.status = JOB_STATUS_DROPPED + continue + except Exception as error: + self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format( + job.module_name, job.real_name, error)) + job.status = JOB_STATUS_DROPPED + continue + try: job.create() except Exception as error: self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format( job.module_name, job.real_name, repr(error))) job.status = JOB_STATUS_DROPPED + try: + self.registry.unregister(job.full_name()) + except Exception as error: + self.log.warning('{0}[{1}] : deregistration failed: {2}'.format( + job.module_name, job.real_name, error)) continue self.started_jobs[job.module_name] = job.actual_name diff --git a/collectors/python.d.plugin/python_modules/third_party/filelock.py b/collectors/python.d.plugin/python_modules/third_party/filelock.py new file mode 100644 index 0000000000..4c981672bc --- /dev/null +++ b/collectors/python.d.plugin/python_modules/third_party/filelock.py @@ -0,0 +1,451 @@ +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# For more information, please refer to + +""" +A platform independent file lock that supports the with-statement. +""" + + +# Modules +# ------------------------------------------------ +import logging +import os +import threading +import time +try: + import warnings +except ImportError: + warnings = None + +try: + import msvcrt +except ImportError: + msvcrt = None + +try: + import fcntl +except ImportError: + fcntl = None + + +# Backward compatibility +# ------------------------------------------------ +try: + TimeoutError +except NameError: + TimeoutError = OSError + + +# Data +# ------------------------------------------------ +__all__ = [ + "Timeout", + "BaseFileLock", + "WindowsFileLock", + "UnixFileLock", + "SoftFileLock", + "FileLock" +] + +__version__ = "3.0.12" + + +_logger = None +def logger(): + """Returns the logger instance used in this module.""" + global _logger + _logger = _logger or logging.getLogger(__name__) + return _logger + + +# Exceptions +# ------------------------------------------------ +class Timeout(TimeoutError): + """ + Raised when the lock could not be acquired in *timeout* + seconds. + """ + + def __init__(self, lock_file): + """ + """ + #: The path of the file lock. + self.lock_file = lock_file + return None + + def __str__(self): + temp = "The file lock '{}' could not be acquired."\ + .format(self.lock_file) + return temp + + +# Classes +# ------------------------------------------------ + +# This is a helper class which is returned by :meth:`BaseFileLock.acquire` +# and wraps the lock to make sure __enter__ is not called twice when entering +# the with statement. +# If we would simply return *self*, the lock would be acquired again +# in the *__enter__* method of the BaseFileLock, but not released again +# automatically. +# +# :seealso: issue #37 (memory leak) +class _Acquire_ReturnProxy(object): + + def __init__(self, lock): + self.lock = lock + return None + + def __enter__(self): + return self.lock + + def __exit__(self, exc_type, exc_value, traceback): + self.lock.release() + return None + + +class BaseFileLock(object): + """ + Implements the base class of a file lock. + """ + + def __init__(self, lock_file, timeout = -1): + """ + """ + # The path to the lock file. + self._lock_file = lock_file + + # The file descriptor for the *_lock_file* as it is returned by the + # os.open() function. + # This file lock is only NOT None, if the object currently holds the + # lock. + self._lock_file_fd = None + + # The default timeout value. + self.timeout = timeout + + # We use this lock primarily for the lock counter. + self._thread_lock = threading.Lock() + + # The lock counter is used for implementing the nested locking + # mechanism. Whenever the lock is acquired, the counter is increased and + # the lock is only released, when this value is 0 again. + self._lock_counter = 0 + return None + + @property + def lock_file(self): + """ + The path to the lock file. + """ + return self._lock_file + + @property + def timeout(self): + """ + You can set a default timeout for the filelock. It will be used as + fallback value in the acquire method, if no timeout value (*None*) is + given. + + If you want to disable the timeout, set it to a negative value. + + A timeout of 0 means, that there is exactly one attempt to acquire the + file lock. + + .. versionadded:: 2.0.0 + """ + return self._timeout + + @timeout.setter + def timeout(self, value): + """ + """ + self._timeout = float(value) + return None + + # Platform dependent locking + # -------------------------------------------- + + def _acquire(self): + """ + Platform dependent. If the file lock could be + acquired, self._lock_file_fd holds the file descriptor + of the lock file. + """ + raise NotImplementedError() + + def _release(self): + """ + Releases the lock and sets self._lock_file_fd to None. + """ + raise NotImplementedError() + + # Platform independent methods + # -------------------------------------------- + + @property + def is_locked(self): + """ + True, if the object holds the file lock. + + .. versionchanged:: 2.0.0 + + This was previously a method and is now a property. + """ + return self._lock_file_fd is not None + + def acquire(self, timeout=None, poll_intervall=0.05): + """ + Acquires the file lock or fails with a :exc:`Timeout` error. + + .. code-block:: python + + # You can use this method in the context manager (recommended) + with lock.acquire(): + pass + + # Or use an equivalent try-finally construct: + lock.acquire() + try: + pass + finally: + lock.release() + + :arg float timeout: + The maximum time waited for the file lock. + If ``timeout < 0``, there is no timeout and this method will + block until the lock could be acquired. + If ``timeout`` is None, the default :attr:`~timeout` is used. + + :arg float poll_intervall: + We check once in *poll_intervall* seconds if we can acquire the + file lock. + + :raises Timeout: + if the lock could not be acquired in *timeout* seconds. + + .. versionchanged:: 2.0.0 + + This method returns now a *proxy* object instead of *self*, + so that it can be used in a with statement without side effects. + """ + # Use the default timeout, if no timeout is provided. + if timeout is None: + timeout = self.timeout + + # Increment the number right at the beginning. + # We can still undo it, if something fails. + with self._thread_lock: + self._lock_counter += 1 + + lock_id = id(self) + lock_filename = self._lock_file + start_time = time.time() + try: + while True: + with self._thread_lock: + if not self.is_locked: + logger().debug('Attempting to acquire lock %s on %s', lock_id, lock_filename) + self._acquire() + + if self.is_locked: + logger().info('Lock %s acquired on %s', lock_id, lock_filename) + break + elif timeout >= 0 and time.time() - start_time > timeout: + logger().debug('Timeout on acquiring lock %s on %s', lock_id, lock_filename) + raise Timeout(self._lock_file) + else: + logger().debug( + 'Lock %s not acquired on %s, waiting %s seconds ...', + lock_id, lock_filename, poll_intervall + ) + time.sleep(poll_intervall) + except: + # Something did go wrong, so decrement the counter. + with self._thread_lock: + self._lock_counter = max(0, self._lock_counter - 1) + + raise + return _Acquire_ReturnProxy(lock = self) + + def release(self, force = False): + """ + Releases the file lock. + + Please note, that the lock is only completly released, if the lock + counter is 0. + + Also note, that the lock file itself is not automatically deleted. + + :arg bool force: + If true, the lock counter is ignored and the lock is released in + every case. + """ + with self._thread_lock: + + if self.is_locked: + self._lock_counter -= 1 + + if self._lock_counter == 0 or force: + lock_id = id(self) + lock_filename = self._lock_file + + logger().debug('Attempting to release lock %s on %s', lock_id, lock_filename) + self._release() + self._lock_counter = 0 + logger().info('Lock %s released on %s', lock_id, lock_filename) + + return None + + def __enter__(self): + self.acquire() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.release() + return None + + def __del__(self): + self.release(force = True) + return None + + +# Windows locking mechanism +# ~~~~~~~~~~~~~~~~~~~~~~~~~ + +class WindowsFileLock(BaseFileLock): + """ + Uses the :func:`msvcrt.locking` function to hard lock the lock file on + windows systems. + """ + + def _acquire(self): + open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC + + try: + fd = os.open(self._lock_file, open_mode) + except OSError: + pass + else: + try: + msvcrt.locking(fd, msvcrt.LK_NBLCK, 1) + except (IOError, OSError): + os.close(fd) + else: + self._lock_file_fd = fd + return None + + def _release(self): + fd = self._lock_file_fd + self._lock_file_fd = None + msvcrt.locking(fd, msvcrt.LK_UNLCK, 1) + os.close(fd) + + try: + os.remove(self._lock_file) + # Probably another instance of the application + # that acquired the file lock. + except OSError: + pass + return None + +# Unix locking mechanism +# ~~~~~~~~~~~~~~~~~~~~~~ + +class UnixFileLock(BaseFileLock): + """ + Uses the :func:`fcntl.flock` to hard lock the lock file on unix systems. + """ + + def _acquire(self): + open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC + fd = os.open(self._lock_file, open_mode) + + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except (IOError, OSError): + os.close(fd) + else: + self._lock_file_fd = fd + return None + + def _release(self): + # Do not remove the lockfile: + # + # https://github.com/benediktschmitt/py-filelock/issues/31 + # https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition + fd = self._lock_file_fd + self._lock_file_fd = None + fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd) + return None + +# Soft lock +# ~~~~~~~~~ + +class SoftFileLock(BaseFileLock): + """ + Simply watches the existence of the lock file. + """ + + def _acquire(self): + open_mode = os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_TRUNC + try: + fd = os.open(self._lock_file, open_mode) + except (IOError, OSError): + pass + else: + self._lock_file_fd = fd + return None + + def _release(self): + os.close(self._lock_file_fd) + self._lock_file_fd = None + + try: + os.remove(self._lock_file) + # The file is already deleted and that's what we want. + except OSError: + pass + return None + + +# Platform filelock +# ~~~~~~~~~~~~~~~~~ + +#: Alias for the lock, which should be used for the current platform. On +#: Windows, this is an alias for :class:`WindowsFileLock`, on Unix for +#: :class:`UnixFileLock` and otherwise for :class:`SoftFileLock`. +FileLock = None + +if msvcrt: + FileLock = WindowsFileLock +elif fcntl: + FileLock = UnixFileLock +else: + FileLock = SoftFileLock + + if warnings is not None: + warnings.warn("only soft file lock is available")