Start of work folding threading and forking together

James Cammarata 2017-10-03 15:10:29 -05:00
parent ba0b91b51b
commit 847327eb9a
6 changed files with 356 additions and 102 deletions

View File

@@ -56,6 +56,16 @@ ANSIBLE_NOCOWS:
- {key: nocows, section: defaults}
type: boolean
yaml: {key: display.i_am_no_fun}
ANSIBLE_PROCESS_MODEL:
name: Ansible process model
default: "forking"
description:
- "Ansible supports two process models: forking (default) and threading."
- "The threading model is currently a technology preview."
env: [{name: ANSIBLE_PROCESS_MODEL}]
ini:
- {key: process_model, section: defaults}
yaml: {key: defaults.process_model}
ANSIBLE_PIPELINING:
name: Connection pipelining
default: False
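For orientation, here is how the new option would be selected through the ini key and environment variable declared above; the two values are the models named in the description, and the playbook name is a placeholder (a sketch, not taken from the commit itself):

# ansible.cfg
[defaults]
process_model = threading

# or, equivalently, via the environment for a single run
ANSIBLE_PROCESS_MODEL=threading ansible-playbook site.yml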

View File

@@ -0,0 +1,116 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import sys
import time
import traceback
from jinja2.exceptions import TemplateNotFound
from ansible.errors import AnsibleConnectionFailure
from ansible.executor.task_executor import TaskExecutor
from ansible.executor.task_result import TaskResult
from ansible.module_utils._text import to_text
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display
display = Display()
__all__ = ['run_worker']
def run_worker(tqm, shared_loader_obj):
'''
The worker function, which uses TaskExecutor to run tasks
read from the job queue and pushes the results onto the
results queue for later processing.
'''
# import cProfile, pstats, StringIO
# pr = cProfile.Profile()
# pr.enable()
display.debug("STARTING WORKER")
while not tqm._terminated:
job = tqm.get_job()
if job is None:
time.sleep(0.0001)
continue
display.debug("WORKER GOT A JOB")
(host, task, play_context, task_vars) = job
try:
# execute the task and build a TaskResult from the result
display.debug("running TaskExecutor() for %s/%s" % (host, task))
executor_result = TaskExecutor(
host,
task,
task_vars,
play_context,
None, #new_stdin
tqm._loader,
shared_loader_obj,
tqm, #rslt_q
).run()
display.debug("done running TaskExecutor() for %s/%s" % (host, task))
# put the result on the result queue
display.debug("sending task result")
tqm.put_result(TaskResult(
host,
task,
executor_result,
))
display.debug("done task result")
except AnsibleConnectionFailure:
tqm.put_result(TaskResult(
host,
task,
dict(unreachable=True),
))
except Exception as e:
if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound):
try:
tqm.put_result(TaskResult(
host,
task,
dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''),
))
except:
display.debug(u"WORKER EXCEPTION: %s" % to_text(e))
display.debug(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc()))
# pr.disable()
# s = StringIO.StringIO()
# sortby = 'time'
# ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
# ps.print_stats()
# with open('worker_%06d.stats' % os.getpid(), 'w') as f:
# f.write(s.getvalue())
display.debug("WORKER PROCESS EXITING")

View File

@@ -0,0 +1,35 @@
# (c) 2017, Red Hat, Inc. <support@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.plugins.loader import action_loader, connection_loader, filter_loader, lookup_loader, module_loader, test_loader
class SharedPluginLoaderObj:
'''
A simple object to make passing the various plugin loaders to
the forked processes over the queue easier
'''
def __init__(self):
self.action_loader = action_loader
self.connection_loader = connection_loader
self.filter_loader = filter_loader
self.lookup_loader = lookup_loader
self.module_loader = module_loader
self.test_loader = test_loader
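Workers then read the loaders back off the object as plain attributes. A small usage sketch, assuming Ansible itself is importable and using 'ping' purely as an example module name:

from ansible.executor.shared_plugin_loader import SharedPluginLoaderObj

shared = SharedPluginLoaderObj()
# resolve a module through the shared object, as a worker would
path = shared.module_loader.find_plugin('ping')
print(path)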

View File

@@ -22,12 +22,20 @@ __metaclass__ = type
import multiprocessing
import os
import tempfile
import threading
import time
from collections import deque
from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.executor.play_iterator import PlayIterator
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.process.threading import run_worker
from ansible.executor.shared_plugin_loader import SharedPluginLoaderObj
from ansible.executor.stats import AggregateStats
from ansible.module_utils.six import string_types
from ansible.module_utils.six.moves import queue as Queue
from ansible.module_utils._text import to_text
from ansible.playbook.block import Block
from ansible.playbook.play_context import PlayContext
@@ -47,6 +55,26 @@ except ImportError:
__all__ = ['TaskQueueManager']
class ResultsSentinel:
pass
_sentinel = ResultsSentinel()
def results_thread_main(tqm):
while True:
try:
result = tqm._final_q.get()
if isinstance(result, ResultsSentinel):
break
else:
# append under the same lock the get_result() helpers take when popping
tqm._res_queue_lock.acquire()
tqm._res_queue.append(result)
tqm._res_queue_lock.release()
except (IOError, EOFError):
break
except Queue.Empty:
pass
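The drain thread stops on a sentinel rather than a timeout; isinstance() is used because objects crossing a multiprocessing.Queue are pickled, so an identity check against a shared singleton would not survive the round trip. The pattern in isolation (all names here are illustrative):

import threading
from multiprocessing import Queue

class StopSentinel:
    pass

def drain(q, out):
    while True:
        item = q.get()                      # blocks until a result or the sentinel arrives
        if isinstance(item, StopSentinel):  # survives pickling, unlike an 'is' check
            break
        out.append(item)

q, out = Queue(), []
t = threading.Thread(target=drain, args=(q, out), daemon=True)
t.start()
for i in range(3):
    q.put(i)
q.put(StopSentinel())  # plays the role of tqm._final_q.put(_sentinel) in cleanup()
t.join()
print(out)             # [0, 1, 2]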
class TaskQueueManager:
@@ -100,18 +128,72 @@ class TaskQueueManager:
self._failed_hosts = dict()
self._unreachable_hosts = dict()
# the final results queue we'll use for both forking and threading
self._res_queue = deque()
self._res_queue_lock = threading.Lock()
self._res_ready = threading.Event()
# A temporary file (opened pre-fork) used by connection
# plugins for inter-process locking.
self._connection_lockfile = tempfile.TemporaryFile()
def _initialize_processes(self, num):
if C.ANSIBLE_PROCESS_MODEL == 'forking':
self._final_q = multiprocessing.Queue()
self._job_queue = None
self._job_queue_lock = None
self.put_job = self._forked_put_job
self.get_job = self._forked_get_job
self.put_result = self._forked_put_result
self.get_result = self._forked_get_result
self._cleanup_processes = self._cleanup_forked_processes
self._initialize_forked_processes(num)
# create the result processing thread for reading results in the background
self._results_lock = threading.Condition(threading.Lock())
self._results_thread = threading.Thread(target=results_thread_main, args=(self,))
self._results_thread.daemon = True
self._results_thread.start()
elif C.ANSIBLE_PROCESS_MODEL == 'threading':
self._job_queue = deque()
self._job_queue_lock = threading.Lock()
self.put_job = self._threaded_put_job
self.get_job = self._threaded_get_job
self.put_result = self._threaded_put_result
self.get_result = self._threaded_get_result
self._cleanup_processes = self._cleanup_threaded_processes
self._initialize_threaded_processes(num)
else:
self._cleanup_processes = self._cleanup_dummy
raise AnsibleError(
    'Invalid process model specified: "%s". '
    'The process model must be set to either "forking" or "threading".' % C.ANSIBLE_PROCESS_MODEL
)
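Assigning the mode-specific implementations to instance attributes (self.put_job = self._forked_put_job, and so on) means callers never branch on the process model again. The dispatch trick reduced to a few lines (a hypothetical class, same idea):

class ModalQueue:
    def __init__(self, mode):
        if mode == 'forking':
            self.put_job = self._forked_put_job
        elif mode == 'threading':
            self.put_job = self._threaded_put_job
        else:
            raise ValueError('Invalid process model specified: "%s"' % mode)

    def _forked_put_job(self, data):
        return 'forked got %r' % (data,)

    def _threaded_put_job(self, data):
        return 'threaded got %r' % (data,)

print(ModalQueue('threading').put_job('x'))  # threaded got 'x'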
def _initialize_forked_processes(self, num):
self._workers = []
self._cur_worker = 0
for i in range(num):
self._workers.append([None, None])
def _initialize_threaded_processes(self, num):
# FIXME: do we need a global lock for workers here instead of per-worker locks?
self._workers = []
# create a dummy object with plugin loaders set, as an easier
# way to share them with the worker threads
shared_loader_obj = SharedPluginLoaderObj()
for i in range(num):
w_thread = threading.Thread(target=run_worker, args=(self, shared_loader_obj))
w_thread.start()
w_lock = threading.Lock()
self._workers.append([w_thread, w_lock])
def _initialize_notified_handlers(self, play):
'''
@@ -305,26 +387,36 @@ class TaskQueueManager:
for host_name in iterator.get_failed_hosts():
self._failed_hosts[host_name] = True
self._cleanup_processes()
return play_return
def cleanup(self):
display.debug("RUNNING CLEANUP")
self.terminate()
if hasattr(self, '_final_q'):
self._final_q.put(_sentinel)
self._results_thread.join()
self._final_q.close()
self._cleanup_processes()
def _cleanup_dummy(self):
return
def _cleanup_forked_processes(self):
if hasattr(self, '_workers'):
for (worker_prc, _) in self._workers:
if worker_prc and worker_prc.is_alive():
try:
worker_prc.terminate()
except AttributeError:
pass
def _cleanup_threaded_processes(self):
if hasattr(self, '_workers'):
for (w_thread, w_lock) in self._workers:
if w_thread and not w_thread.is_alive():
w_thread.join()
def clear_failed_hosts(self):
self._failed_hosts = dict()
@@ -338,7 +430,7 @@ class TaskQueueManager:
return self._loader
def get_workers(self):
return self._workers
def terminate(self):
self._terminated = True
@@ -380,3 +472,78 @@ class TaskQueueManager:
from traceback import format_tb
from sys import exc_info
display.vvv('Callback Exception: \n' + ' '.join(format_tb(exc_info()[2])))
# helpers for forking
def _forked_put_job(self, data):
try:
(host, task, play_context, task_vars) = data
# create a dummy object with plugin loaders set as an easier
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj()
queued = False
starting_worker = self._cur_worker
while True:
(worker_prc, rslt_q) = self._workers[self._cur_worker]
if worker_prc is None or not worker_prc.is_alive():
worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
self._workers[self._cur_worker][0] = worker_prc
worker_prc.start()
display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers)))
queued = True
self._cur_worker += 1
if self._cur_worker >= len(self._workers):
self._cur_worker = 0
if queued:
break
elif self._cur_worker == starting_worker:
time.sleep(0.0001)
return True
except (EOFError, IOError, AssertionError) as e:
# most likely an abort
display.debug("got an error while queuing: %s" % e)
return False
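The loop above is a round-robin scan over the worker slots: reuse any slot whose process is dead or unstarted, wrap the cursor, and yield the CPU briefly whenever a full pass finds every worker busy. The scheduling logic on its own (the worker list and is_free predicate are stand-ins):

import time

def pick_slot(workers, cur, is_free):
    start = cur
    while True:
        if is_free(workers[cur]):
            return cur, (cur + 1) % len(workers)  # slot to use, next cursor
        cur = (cur + 1) % len(workers)
        if cur == start:
            time.sleep(0.0001)  # every slot busy; nap and rescan

workers = ['busy', None, 'busy']
slot, nxt = pick_slot(workers, 0, lambda w: w is None)
print(slot, nxt)  # 1 2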
def _forked_get_job(self):
pass
def _forked_put_result(self):
pass
def _forked_get_result(self):
return self._pop_off_queue(self._res_queue, self._res_queue_lock)
# helpers for threading
def _put_in_queue(self, data, queue, lock):
lock.acquire()
queue.appendleft(data)
lock.release()
def _pop_off_queue(self, queue, lock):
    data = None
    lock.acquire()
    try:
        data = queue.pop()
    except IndexError:
        # queue is empty; fall through and return None
        pass
    finally:
        lock.release()
    return data
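Together, _put_in_queue() and _pop_off_queue() implement a locked FIFO: appendleft() on one end, pop() on the other, with None signalling an empty queue. Demonstrated standalone:

import threading
from collections import deque

q = deque()
lock = threading.Lock()

def put(data):
    with lock:              # equivalent to the acquire()/release() pairs above
        q.appendleft(data)

def pop():
    with lock:
        try:
            return q.pop()  # oldest item comes off the right side: FIFO
        except IndexError:
            return None     # empty queue reads as None, like get_result()

for i in (1, 2, 3):
    put(i)
print(pop(), pop(), pop(), pop())  # 1 2 3 None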
def _threaded_put_job(self, data):
self._put_in_queue(data, self._job_queue, self._job_queue_lock)
return True
def _threaded_get_job(self):
return self._pop_off_queue(self._job_queue, self._job_queue_lock)
def _threaded_put_result(self, data):
self._put_in_queue(data, self._res_queue, self._res_queue_lock)
self._res_ready.set()
return True
def _threaded_get_result(self):
return self._pop_off_queue(self._res_queue, self._res_queue_lock)

View File

@@ -15,6 +15,7 @@ import sys
import warnings
from collections import defaultdict
from threading import Lock
from ansible import constants as C
from ansible.plugins import get_plugin_class, MODULE_CACHE, PATH_CACHE, PLUGIN_PATH_CACHE
@@ -73,6 +74,7 @@ class PluginLoader:
self._extra_dirs = []
self._searched_paths = set()
self._lock = Lock()
def __setstate__(self, data):
'''
@@ -360,11 +362,14 @@ class PluginLoader:
if path is None:
return None
self._lock.acquire()
try:
    if path not in self._module_cache:
        self._module_cache[path] = self._load_module_source(name, path)
        found_in_cache = False
    obj = getattr(self._module_cache[path], self.class_name)
finally:
    # release even if the module source fails to load or lacks the class
    self._lock.release()
if self.base_class:
# The import path is hardcoded and should be the right place,
# so we are not expecting an ImportError.
@@ -423,15 +428,18 @@
yield path
continue
try:
self._lock.acquire()
if path not in self._module_cache:
self._module_cache[path] = self._load_module_source(name, path)
found_in_cache = False
obj = getattr(self._module_cache[path], self.class_name)
except AttributeError as e:
display.warning("Skipping plugin (%s) as it seems to be invalid: %s" % (path, to_text(e)))
continue
finally:
self._lock.release()
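Both hunks guard the same pattern: a get-or-load cache fill serialized by a lock, so two threads cannot import one plugin source twice, with the release in a finally so an invalid plugin cannot leak the lock. In isolation (load_source is a hypothetical stand-in for _load_module_source):

import threading

_cache = {}
_lock = threading.Lock()

def load_source(path):
    # hypothetical stand-in for PluginLoader._load_module_source()
    print('loading %s' % path)
    return 'module at %s' % path

def get_module(path):
    _lock.acquire()
    try:
        if path not in _cache:
            _cache[path] = load_source(path)
        return _cache[path]
    finally:
        _lock.release()  # released on success and on failure alike

get_module('/plugins/foo.py')
get_module('/plugins/foo.py')  # second call is a cache hit: no 'loading' line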
if self.base_class:
# The import path is hardcoded and should be the right place,

View File

@@ -30,17 +30,15 @@ from jinja2.exceptions import UndefinedError
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
from ansible.executor import action_write_locks
from ansible.executor.task_result import TaskResult
from ansible.inventory.host import Host
from ansible.module_utils.six import iteritems, string_types
from ansible.module_utils._text import to_text
from ansible.playbook.helpers import load_list_of_blocks
from ansible.playbook.included_file import IncludedFile
from ansible.playbook.task_include import TaskInclude
from ansible.playbook.role_include import IncludeRole
from ansible.plugins.loader import action_loader, connection_loader
from ansible.template import Templar
from ansible.utils.vars import combine_vars
from ansible.vars.manager import strip_internal_keys
@@ -55,50 +53,11 @@ except ImportError:
__all__ = ['StrategyBase']
class StrategyBase:
'''
This is the base class for strategy plugins, which contains some common
code useful to all strategies like running handlers, etc.
'''
def __init__(self, tqm):
@@ -109,7 +68,6 @@ class StrategyBase:
self._listening_handlers = tqm._listening_handlers
self._variable_manager = tqm.get_variable_manager()
self._loader = tqm.get_loader()
self._step = getattr(tqm._options, 'step', False)
self._diff = getattr(tqm._options, 'diff', False)
@@ -118,24 +76,13 @@
# internal counters
self._pending_results = 0
# this dictionary is used to keep track of hosts that have
# outstanding tasks still in queue
self._blocked_hosts = dict()
def run(self, iterator, play_context, result=0):
# execute one more pass through the iterator without peeking, to
# make sure that all of the hosts are advanced to their final task.
@@ -203,38 +150,13 @@
if task.action not in action_write_locks.action_write_locks:
display.debug('Creating lock for %s' % task.action)
action_write_locks.action_write_locks[task.action] = threading.Lock()
if self._tqm.put_job((host, task, play_context, task_vars)):
self._pending_results += 1
else:
raise AnsibleError('Could not put the job')
display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action))
def get_task_hosts(self, iterator, task_host, task):
@@ -325,13 +247,9 @@
cur_pass = 0
while True:
task_result = self._tqm.get_result()
if task_result is None:
break
# get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
original_host = get_original_host(task_result._host)
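On the consuming side, the strategy's private results deque and lock are gone; it now just polls tqm.get_result() until None comes back. The new drain loop, shown against a trivial stand-in:

from collections import deque

_results = deque([1, 2, 3])

def get_result():
    # mirrors the non-blocking pop helpers: None means 'nothing right now'
    try:
        return _results.pop()
    except IndexError:
        return None

drained = []
while True:
    task_result = get_result()
    if task_result is None:
        break
    drained.append(task_result)
print(drained)  # [3, 2, 1]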