File: //proc/self/root/usr/lib/python2.7/site-packages/salt/utils/reactor.py
# -*- coding: utf-8 -*-
'''
Functions which implement running reactor jobs
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import fnmatch
import glob
import logging
# Import salt libs
import salt.client
import salt.runner
import salt.state
import salt.utils.args
import salt.utils.cache
import salt.utils.data
import salt.utils.event
import salt.utils.files
import salt.utils.process
import salt.utils.yaml
import salt.wheel
import salt.defaults.exitcodes
# Import 3rd-party libs
from salt.ext import six
log = logging.getLogger(__name__)
REACTOR_INTERNAL_KEYWORDS = frozenset([
'__id__',
'__sls__',
'name',
'order',
'fun',
'state',
])
class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
'''
Read in the reactor configuration variable and compare it to events
processed on the master.
The reactor has the capability to execute pre-programmed executions
as reactions to events
'''
aliases = {
'cmd': 'local',
}
def __init__(self, opts, **kwargs):
super(Reactor, self).__init__(**kwargs)
local_minion_opts = opts.copy()
local_minion_opts['file_client'] = 'local'
self.minion = salt.minion.MasterMinion(local_minion_opts)
salt.state.Compiler.__init__(self, opts, self.minion.rend)
# We need __setstate__ and __getstate__ to avoid pickling errors since
# 'self.rend' (from salt.state.Compiler) contains a function reference
# which is not picklable.
# These methods are only used when pickling so will not be used on
# non-Windows platforms.
def __setstate__(self, state):
Reactor.__init__(
self, state['opts'],
log_queue=state['log_queue'],
log_queue_level=state['log_queue_level']
)
def __getstate__(self):
return {
'opts': self.opts,
'log_queue': self.log_queue,
'log_queue_level': self.log_queue_level
}
def render_reaction(self, glob_ref, tag, data):
'''
Execute the render system against a single reaction file and return
the data structure
'''
react = {}
if glob_ref.startswith('salt://'):
glob_ref = self.minion.functions['cp.cache_file'](glob_ref) or ''
globbed_ref = glob.glob(glob_ref)
if not globbed_ref:
log.error('Can not render SLS %s for tag %s. File missing or not found.', glob_ref, tag)
for fn_ in globbed_ref:
try:
res = self.render_template(
fn_,
tag=tag,
data=data)
# for #20841, inject the sls name here since verify_high()
# assumes it exists in case there are any errors
for name in res:
res[name]['__sls__'] = fn_
react.update(res)
except Exception: # pylint: disable=broad-except
log.exception('Failed to render "%s": ', fn_)
return react
def list_reactors(self, tag):
'''
Take in the tag from an event and return a list of the reactors to
process
'''
log.debug('Gathering reactors for tag %s', tag)
reactors = []
if isinstance(self.opts['reactor'], six.string_types):
try:
with salt.utils.files.fopen(self.opts['reactor']) as fp_:
react_map = salt.utils.yaml.safe_load(fp_)
except (OSError, IOError):
log.error('Failed to read reactor map: "%s"', self.opts['reactor'])
except Exception: # pylint: disable=broad-except
log.error('Failed to parse YAML in reactor map: "%s"', self.opts['reactor'])
else:
react_map = self.opts['reactor']
for ropt in react_map:
if not isinstance(ropt, dict):
continue
if len(ropt) != 1:
continue
key = next(six.iterkeys(ropt))
val = ropt[key]
if fnmatch.fnmatch(tag, key):
if isinstance(val, six.string_types):
reactors.append(val)
elif isinstance(val, list):
reactors.extend(val)
return reactors
def list_all(self):
'''
Return a list of the reactors
'''
if isinstance(self.minion.opts['reactor'], six.string_types):
log.debug('Reading reactors from yaml %s', self.opts['reactor'])
try:
with salt.utils.files.fopen(self.opts['reactor']) as fp_:
react_map = salt.utils.yaml.safe_load(fp_)
except (OSError, IOError):
log.error('Failed to read reactor map: "%s"', self.opts['reactor'])
except Exception: # pylint: disable=broad-except
log.error(
'Failed to parse YAML in reactor map: "%s"',
self.opts['reactor']
)
else:
log.debug('Not reading reactors from yaml')
react_map = self.minion.opts['reactor']
return react_map
def add_reactor(self, tag, reaction):
'''
Add a reactor
'''
reactors = self.list_all()
for reactor in reactors:
_tag = next(six.iterkeys(reactor))
if _tag == tag:
return {'status': False, 'comment': 'Reactor already exists.'}
self.minion.opts['reactor'].append({tag: reaction})
return {'status': True, 'comment': 'Reactor added.'}
def delete_reactor(self, tag):
'''
Delete a reactor
'''
reactors = self.list_all()
for reactor in reactors:
_tag = next(six.iterkeys(reactor))
if _tag == tag:
self.minion.opts['reactor'].remove(reactor)
return {'status': True, 'comment': 'Reactor deleted.'}
return {'status': False, 'comment': 'Reactor does not exists.'}
def resolve_aliases(self, chunks):
'''
Preserve backward compatibility by rewriting the 'state' key in the low
chunks if it is using a legacy type.
'''
for idx, _ in enumerate(chunks):
new_state = self.aliases.get(chunks[idx]['state'])
if new_state is not None:
chunks[idx]['state'] = new_state
def reactions(self, tag, data, reactors):
'''
Render a list of reactor files and returns a reaction struct
'''
log.debug('Compiling reactions for tag %s', tag)
high = {}
chunks = []
try:
for fn_ in reactors:
high.update(self.render_reaction(fn_, tag, data))
if high:
errors = self.verify_high(high)
if errors:
log.error(
'Unable to render reactions for event %s due to '
'errors (%s) in one or more of the sls files (%s)',
tag, errors, reactors
)
return [] # We'll return nothing since there was an error
chunks = self.order_chunks(self.compile_high_data(high))
except Exception as exc: # pylint: disable=broad-except
log.exception('Exception encountered while compiling reactions')
self.resolve_aliases(chunks)
return chunks
def call_reactions(self, chunks):
'''
Execute the reaction state
'''
for chunk in chunks:
self.wrap.run(chunk)
def run(self):
'''
Enter into the server loop
'''
salt.utils.process.appendproctitle(self.__class__.__name__)
# instantiate some classes inside our new process
with salt.utils.event.get_event(
self.opts['__role'],
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=True) as event:
self.wrap = ReactWrap(self.opts)
for data in event.iter_events(full=True):
# skip all events fired by ourselves
if data['data'].get('user') == self.wrap.event_user:
continue
if data['tag'].endswith('salt/reactors/manage/add'):
_data = data['data']
res = self.add_reactor(_data['event'], _data['reactors'])
event.fire_event({'reactors': self.list_all(),
'result': res},
'salt/reactors/manage/add-complete')
elif data['tag'].endswith('salt/reactors/manage/delete'):
_data = data['data']
res = self.delete_reactor(_data['event'])
event.fire_event({'reactors': self.list_all(),
'result': res},
'salt/reactors/manage/delete-complete')
elif data['tag'].endswith('salt/reactors/manage/list'):
event.fire_event({'reactors': self.list_all()},
'salt/reactors/manage/list-results')
else:
reactors = self.list_reactors(data['tag'])
if not reactors:
continue
chunks = self.reactions(data['tag'], data['data'], reactors)
if chunks:
try:
self.call_reactions(chunks)
except SystemExit:
log.warning('Exit ignored by reactor')
class ReactWrap(object):
'''
Wrapper that executes low data for the Reactor System
'''
# class-wide cache of clients
client_cache = None
event_user = 'Reactor'
reaction_class = {
'local': salt.client.LocalClient,
'runner': salt.runner.RunnerClient,
'wheel': salt.wheel.Wheel,
'caller': salt.client.Caller,
}
def __init__(self, opts):
self.opts = opts
if ReactWrap.client_cache is None:
ReactWrap.client_cache = salt.utils.cache.CacheDict(opts['reactor_refresh_interval'])
self.pool = salt.utils.process.ThreadPool(
self.opts['reactor_worker_threads'], # number of workers for runner/wheel
queue_size=self.opts['reactor_worker_hwm'] # queue size for those workers
)
def populate_client_cache(self, low):
'''
Populate the client cache with an instance of the specified type
'''
reaction_type = low['state']
if reaction_type not in self.client_cache:
log.debug('Reactor is populating %s client cache', reaction_type)
if reaction_type in ('runner', 'wheel'):
# Reaction types that run locally on the master want the full
# opts passed.
self.client_cache[reaction_type] = \
self.reaction_class[reaction_type](self.opts)
# The len() function will cause the module functions to load if
# they aren't already loaded. We want to load them so that the
# spawned threads don't need to load them. Loading in the
# spawned threads creates race conditions such as sometimes not
# finding the required function because another thread is in
# the middle of loading the functions.
len(self.client_cache[reaction_type].functions)
else:
# Reactions which use remote pubs only need the conf file when
# instantiating a client instance.
self.client_cache[reaction_type] = \
self.reaction_class[reaction_type](self.opts['conf_file'])
def run(self, low):
'''
Execute a reaction by invoking the proper wrapper func
'''
self.populate_client_cache(low)
try:
l_fun = getattr(self, low['state'])
except AttributeError:
log.error(
'ReactWrap is missing a wrapper function for \'%s\'',
low['state']
)
try:
wrap_call = salt.utils.args.format_call(l_fun, low)
args = wrap_call.get('args', ())
kwargs = wrap_call.get('kwargs', {})
# TODO: Setting user doesn't seem to work for actual remote pubs
if low['state'] in ('runner', 'wheel'):
# Update called function's low data with event user to
# segregate events fired by reactor and avoid reaction loops
kwargs['__user__'] = self.event_user
# Replace ``state`` kwarg which comes from high data compiler.
# It breaks some runner functions and seems unnecessary.
kwargs['__state__'] = kwargs.pop('state')
# NOTE: if any additional keys are added here, they will also
# need to be added to filter_kwargs()
if 'args' in kwargs:
# New configuration
reactor_args = kwargs.pop('args')
for item in ('arg', 'kwarg'):
if item in low:
log.warning(
'Reactor \'%s\' is ignoring \'%s\' param %s due to '
'presence of \'args\' param. Check the Reactor System '
'documentation for the correct argument format.',
low['__id__'], item, low[item]
)
if low['state'] == 'caller' \
and isinstance(reactor_args, list) \
and not salt.utils.data.is_dictlist(reactor_args):
# Legacy 'caller' reactors were already using the 'args'
# param, but only supported a list of positional arguments.
# If low['args'] is a list but is *not* a dictlist, then
# this is actually using the legacy configuration. So, put
# the reactor args into kwarg['arg'] so that the wrapper
# interprets them as positional args.
kwargs['arg'] = reactor_args
kwargs['kwarg'] = {}
else:
kwargs['arg'] = ()
kwargs['kwarg'] = reactor_args
if not isinstance(kwargs['kwarg'], dict):
kwargs['kwarg'] = salt.utils.data.repack_dictlist(kwargs['kwarg'])
if not kwargs['kwarg']:
log.error(
'Reactor \'%s\' failed to execute %s \'%s\': '
'Incorrect argument format, check the Reactor System '
'documentation for the correct format.',
low['__id__'], low['state'], low['fun']
)
return
else:
# Legacy configuration
react_call = {}
if low['state'] in ('runner', 'wheel'):
if 'arg' not in kwargs or 'kwarg' not in kwargs:
# Runner/wheel execute on the master, so we can use
# format_call to get the functions args/kwargs
react_fun = self.client_cache[low['state']].functions.get(low['fun'])
if react_fun is None:
log.error(
'Reactor \'%s\' failed to execute %s \'%s\': '
'function not available',
low['__id__'], low['state'], low['fun']
)
return
react_call = salt.utils.args.format_call(
react_fun,
low,
expected_extra_kws=REACTOR_INTERNAL_KEYWORDS
)
if 'arg' not in kwargs:
kwargs['arg'] = react_call.get('args', ())
if 'kwarg' not in kwargs:
kwargs['kwarg'] = react_call.get('kwargs', {})
# Execute the wrapper with the proper args/kwargs. kwargs['arg']
# and kwargs['kwarg'] contain the positional and keyword arguments
# that will be passed to the client interface to execute the
# desired runner/wheel/remote-exec/etc. function.
l_fun(*args, **kwargs)
except SystemExit:
log.warning(
'Reactor \'%s\' attempted to exit. Ignored.', low['__id__']
)
except Exception: # pylint: disable=broad-except
log.error(
'Reactor \'%s\' failed to execute %s \'%s\'',
low['__id__'], low['state'], low['fun'], exc_info=True
)
def runner(self, fun, **kwargs):
'''
Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>`
'''
self.pool.fire_async(self.client_cache['runner'].low, args=(fun, kwargs))
def wheel(self, fun, **kwargs):
'''
Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>`
'''
self.pool.fire_async(self.client_cache['wheel'].low, args=(fun, kwargs))
def local(self, fun, tgt, **kwargs):
'''
Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`
'''
self.client_cache['local'].cmd_async(tgt, fun, **kwargs)
def caller(self, fun, **kwargs):
'''
Wrap LocalCaller to execute remote exec functions locally on the Minion
'''
self.client_cache['caller'].cmd(fun, *kwargs['arg'], **kwargs['kwarg'])