python.d: add job file lock registry (#9564)

This commit is contained in:
Ilya Mashchenko 2020-07-21 09:46:31 +03:00 committed by GitHub
parent 79ea229a81
commit def03a151c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 521 additions and 0 deletions

View File

@ -178,4 +178,8 @@ connectivity is not available.
Copyright 2014, 2015, 2016 Ori Livneh [ori@wikimedia.org](mailto:ori@wikimedia.org)
[Apache-2.0](http://www.apache.org/licenses/LICENSE-2.0)
- [filelock](https://github.com/benediktschmitt/py-filelock)
Copyright 2015, Benedikt Schmitt [Unlicense License](https://unlicense.org/)
[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2FREDISTRIBUTED&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)

View File

@ -141,6 +141,7 @@ dist_third_party_DATA = \
python_modules/third_party/mcrcon.py \
python_modules/third_party/boinc_client.py \
python_modules/third_party/monotonic.py \
python_modules/third_party/filelock.py \
$(NULL)
pythonyaml2dir=$(pythonmodulesdir)/pyyaml2

View File

@ -52,6 +52,7 @@ ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
ENV_NETDATA_LOCKS_DIR = 'NETDATA_LOCKS_DIR'
def add_pythond_packages():
@ -66,6 +67,7 @@ add_pythond_packages()
from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config
from third_party import filelock
try:
from collections import OrderedDict
@ -90,6 +92,10 @@ def dirs():
ENV_NETDATA_PLUGINS_DIR,
os.path.dirname(__file__),
)
locks = os.getenv(
ENV_NETDATA_LOCKS_DIR,
# TODO: add '@locksdir_POST@
)
modules_user_config = os.path.join(plugin_user_config, 'python.d')
modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
modules = os.path.abspath(pluginsd + '/../python.d')
@ -103,6 +109,7 @@ def dirs():
'modules_stock_config',
'modules',
'var_lib',
'locks',
]
)
return Dirs(
@ -112,6 +119,7 @@ def dirs():
modules_stock_config,
modules,
var_lib,
locks,
)
@ -304,6 +312,9 @@ class Job(threading.Thread):
def init(self):
self.job = self.service(configuration=copy.deepcopy(self.config))
def full_name(self):
return self.job.name
def check(self):
ok = self.job.check()
self.checks -= self.checks != self.inf and not ok
@ -445,6 +456,41 @@ class PluginConfig(dict):
return self['default_run']
class FileLockRegistry:
def __init__(self, path):
self.path = path
self.locks = dict()
def register(self, name):
if name in self.locks:
return
file = os.path.join(self.path, name)
lock = filelock.FileLock(file)
lock.acquire(timeout=0)
self.locks[name] = lock
def unregister(self, name):
if name not in self.locks:
return
lock = self.locks[name]
lock.release()
del self.locks[name]
class DummyRegistry:
def register(self, name):
pass
def unregister(self, name):
pass
def create_jobs_registry():
if not DIRS.locks:
return DummyRegistry()
return FileLockRegistry(DIRS.locks)
class Plugin:
config_name = 'python.d.conf'
jobs_status_dump_name = 'pythond-jobs-statuses.json'
@ -454,6 +500,7 @@ class Plugin:
self.min_update_every = min_update_every
self.config = PluginConfig(PLUGIN_BASE_CONF)
self.log = PythonDLogger()
self.registry = create_jobs_registry()
self.started_jobs = collections.defaultdict(dict)
self.jobs = list()
self.saver = None
@ -604,12 +651,30 @@ class Plugin:
continue
self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name))
try:
self.registry.register(job.full_name())
except filelock.Timeout as error:
self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format(
job.module_name, job.real_name, error))
job.status = JOB_STATUS_DROPPED
continue
except Exception as error:
self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format(
job.module_name, job.real_name, error))
job.status = JOB_STATUS_DROPPED
continue
try:
job.create()
except Exception as error:
self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format(
job.module_name, job.real_name, repr(error)))
job.status = JOB_STATUS_DROPPED
try:
self.registry.unregister(job.full_name())
except Exception as error:
self.log.warning('{0}[{1}] : deregistration failed: {2}'.format(
job.module_name, job.real_name, error))
continue
self.started_jobs[job.module_name] = job.actual_name

View File

@ -0,0 +1,451 @@
# This is free and unencumbered software released into the public domain.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.
#
# In jurisdictions that recognize copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# For more information, please refer to <http://unlicense.org>
"""
A platform independent file lock that supports the with-statement.
"""
# Modules
# ------------------------------------------------
import logging
import os
import threading
import time
try:
import warnings
except ImportError:
warnings = None
try:
import msvcrt
except ImportError:
msvcrt = None
try:
import fcntl
except ImportError:
fcntl = None
# Backward compatibility
# ------------------------------------------------
try:
TimeoutError
except NameError:
TimeoutError = OSError
# Data
# ------------------------------------------------
__all__ = [
"Timeout",
"BaseFileLock",
"WindowsFileLock",
"UnixFileLock",
"SoftFileLock",
"FileLock"
]
__version__ = "3.0.12"
_logger = None
def logger():
"""Returns the logger instance used in this module."""
global _logger
_logger = _logger or logging.getLogger(__name__)
return _logger
# Exceptions
# ------------------------------------------------
class Timeout(TimeoutError):
"""
Raised when the lock could not be acquired in *timeout*
seconds.
"""
def __init__(self, lock_file):
"""
"""
#: The path of the file lock.
self.lock_file = lock_file
return None
def __str__(self):
temp = "The file lock '{}' could not be acquired."\
.format(self.lock_file)
return temp
# Classes
# ------------------------------------------------
# This is a helper class which is returned by :meth:`BaseFileLock.acquire`
# and wraps the lock to make sure __enter__ is not called twice when entering
# the with statement.
# If we would simply return *self*, the lock would be acquired again
# in the *__enter__* method of the BaseFileLock, but not released again
# automatically.
#
# :seealso: issue #37 (memory leak)
class _Acquire_ReturnProxy(object):
def __init__(self, lock):
self.lock = lock
return None
def __enter__(self):
return self.lock
def __exit__(self, exc_type, exc_value, traceback):
self.lock.release()
return None
class BaseFileLock(object):
"""
Implements the base class of a file lock.
"""
def __init__(self, lock_file, timeout = -1):
"""
"""
# The path to the lock file.
self._lock_file = lock_file
# The file descriptor for the *_lock_file* as it is returned by the
# os.open() function.
# This file lock is only NOT None, if the object currently holds the
# lock.
self._lock_file_fd = None
# The default timeout value.
self.timeout = timeout
# We use this lock primarily for the lock counter.
self._thread_lock = threading.Lock()
# The lock counter is used for implementing the nested locking
# mechanism. Whenever the lock is acquired, the counter is increased and
# the lock is only released, when this value is 0 again.
self._lock_counter = 0
return None
@property
def lock_file(self):
"""
The path to the lock file.
"""
return self._lock_file
@property
def timeout(self):
"""
You can set a default timeout for the filelock. It will be used as
fallback value in the acquire method, if no timeout value (*None*) is
given.
If you want to disable the timeout, set it to a negative value.
A timeout of 0 means, that there is exactly one attempt to acquire the
file lock.
.. versionadded:: 2.0.0
"""
return self._timeout
@timeout.setter
def timeout(self, value):
"""
"""
self._timeout = float(value)
return None
# Platform dependent locking
# --------------------------------------------
def _acquire(self):
"""
Platform dependent. If the file lock could be
acquired, self._lock_file_fd holds the file descriptor
of the lock file.
"""
raise NotImplementedError()
def _release(self):
"""
Releases the lock and sets self._lock_file_fd to None.
"""
raise NotImplementedError()
# Platform independent methods
# --------------------------------------------
@property
def is_locked(self):
"""
True, if the object holds the file lock.
.. versionchanged:: 2.0.0
This was previously a method and is now a property.
"""
return self._lock_file_fd is not None
def acquire(self, timeout=None, poll_intervall=0.05):
"""
Acquires the file lock or fails with a :exc:`Timeout` error.
.. code-block:: python
# You can use this method in the context manager (recommended)
with lock.acquire():
pass
# Or use an equivalent try-finally construct:
lock.acquire()
try:
pass
finally:
lock.release()
:arg float timeout:
The maximum time waited for the file lock.
If ``timeout < 0``, there is no timeout and this method will
block until the lock could be acquired.
If ``timeout`` is None, the default :attr:`~timeout` is used.
:arg float poll_intervall:
We check once in *poll_intervall* seconds if we can acquire the
file lock.
:raises Timeout:
if the lock could not be acquired in *timeout* seconds.
.. versionchanged:: 2.0.0
This method returns now a *proxy* object instead of *self*,
so that it can be used in a with statement without side effects.
"""
# Use the default timeout, if no timeout is provided.
if timeout is None:
timeout = self.timeout
# Increment the number right at the beginning.
# We can still undo it, if something fails.
with self._thread_lock:
self._lock_counter += 1
lock_id = id(self)
lock_filename = self._lock_file
start_time = time.time()
try:
while True:
with self._thread_lock:
if not self.is_locked:
logger().debug('Attempting to acquire lock %s on %s', lock_id, lock_filename)
self._acquire()
if self.is_locked:
logger().info('Lock %s acquired on %s', lock_id, lock_filename)
break
elif timeout >= 0 and time.time() - start_time > timeout:
logger().debug('Timeout on acquiring lock %s on %s', lock_id, lock_filename)
raise Timeout(self._lock_file)
else:
logger().debug(
'Lock %s not acquired on %s, waiting %s seconds ...',
lock_id, lock_filename, poll_intervall
)
time.sleep(poll_intervall)
except:
# Something did go wrong, so decrement the counter.
with self._thread_lock:
self._lock_counter = max(0, self._lock_counter - 1)
raise
return _Acquire_ReturnProxy(lock = self)
def release(self, force = False):
"""
Releases the file lock.
Please note, that the lock is only completly released, if the lock
counter is 0.
Also note, that the lock file itself is not automatically deleted.
:arg bool force:
If true, the lock counter is ignored and the lock is released in
every case.
"""
with self._thread_lock:
if self.is_locked:
self._lock_counter -= 1
if self._lock_counter == 0 or force:
lock_id = id(self)
lock_filename = self._lock_file
logger().debug('Attempting to release lock %s on %s', lock_id, lock_filename)
self._release()
self._lock_counter = 0
logger().info('Lock %s released on %s', lock_id, lock_filename)
return None
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.release()
return None
def __del__(self):
self.release(force = True)
return None
# Windows locking mechanism
# ~~~~~~~~~~~~~~~~~~~~~~~~~
class WindowsFileLock(BaseFileLock):
"""
Uses the :func:`msvcrt.locking` function to hard lock the lock file on
windows systems.
"""
def _acquire(self):
open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
try:
fd = os.open(self._lock_file, open_mode)
except OSError:
pass
else:
try:
msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
except (IOError, OSError):
os.close(fd)
else:
self._lock_file_fd = fd
return None
def _release(self):
fd = self._lock_file_fd
self._lock_file_fd = None
msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
os.close(fd)
try:
os.remove(self._lock_file)
# Probably another instance of the application
# that acquired the file lock.
except OSError:
pass
return None
# Unix locking mechanism
# ~~~~~~~~~~~~~~~~~~~~~~
class UnixFileLock(BaseFileLock):
"""
Uses the :func:`fcntl.flock` to hard lock the lock file on unix systems.
"""
def _acquire(self):
open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
fd = os.open(self._lock_file, open_mode)
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except (IOError, OSError):
os.close(fd)
else:
self._lock_file_fd = fd
return None
def _release(self):
# Do not remove the lockfile:
#
# https://github.com/benediktschmitt/py-filelock/issues/31
# https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition
fd = self._lock_file_fd
self._lock_file_fd = None
fcntl.flock(fd, fcntl.LOCK_UN)
os.close(fd)
return None
# Soft lock
# ~~~~~~~~~
class SoftFileLock(BaseFileLock):
"""
Simply watches the existence of the lock file.
"""
def _acquire(self):
open_mode = os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_TRUNC
try:
fd = os.open(self._lock_file, open_mode)
except (IOError, OSError):
pass
else:
self._lock_file_fd = fd
return None
def _release(self):
os.close(self._lock_file_fd)
self._lock_file_fd = None
try:
os.remove(self._lock_file)
# The file is already deleted and that's what we want.
except OSError:
pass
return None
# Platform filelock
# ~~~~~~~~~~~~~~~~~
#: Alias for the lock, which should be used for the current platform. On
#: Windows, this is an alias for :class:`WindowsFileLock`, on Unix for
#: :class:`UnixFileLock` and otherwise for :class:`SoftFileLock`.
FileLock = None
if msvcrt:
FileLock = WindowsFileLock
elif fcntl:
FileLock = UnixFileLock
else:
FileLock = SoftFileLock
if warnings is not None:
warnings.warn("only soft file lock is available")