File: //proc/self/root/usr/lib/python2.7/site-packages/salt/returners/carbon_return.py
# -*- coding: utf-8 -*-
'''
Take data from salt and "return" it into a carbon receiver
Add the following configuration to the minion configuration file:
.. code-block:: yaml
carbon.host: <server ip address>
carbon.port: 2003
Errors when trying to convert data to numbers may be ignored by setting
``carbon.skip_on_error`` to `True`:
.. code-block:: yaml
carbon.skip_on_error: True
By default, data will be sent to carbon using the plaintext protocol. To use
the pickle protocol, set ``carbon.mode`` to ``pickle``:
.. code-block:: yaml
carbon.mode: pickle
You can also specify the pattern used for the metric base path (except for virt modules metrics):
carbon.metric_base_pattern: carbon.[minion_id].[module].[function]
These tokens can used :
[module]: salt module
[function]: salt function
[minion_id]: minion id
Default is :
carbon.metric_base_pattern: [module].[function].[minion_id]
Carbon settings may also be configured as:
.. code-block:: yaml
carbon:
host: <server IP or hostname>
port: <carbon port>
skip_on_error: True
mode: (pickle|text)
metric_base_pattern: <pattern> | [module].[function].[minion_id]
Alternative configuration values can be used by prefacing the configuration.
Any values not found in the alternative configuration will be pulled from
the default location:
.. code-block:: yaml
alternative.carbon:
host: <server IP or hostname>
port: <carbon port>
skip_on_error: True
mode: (pickle|text)
To use the carbon returner, append '--return carbon' to the salt command.
.. code-block:: bash
salt '*' test.ping --return carbon
To use the alternative configuration, append '--return_config alternative' to the salt command.
.. versionadded:: 2015.5.0
.. code-block:: bash
salt '*' test.ping --return carbon --return_config alternative
To override individual configuration items, append --return_kwargs '{"key:": "value"}' to the salt command.
.. versionadded:: 2016.3.0
.. code-block:: bash
salt '*' test.ping --return carbon --return_kwargs '{"skip_on_error": False}'
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import collections
import logging
import socket
import struct
import time
from contextlib import contextmanager
# Import salt libs
import salt.utils.jid
import salt.returners
# Import 3rd-party libs
from salt.ext import six
from salt.ext.six.moves import cPickle, map # pylint: disable=import-error,no-name-in-module,redefined-builtin
log = logging.getLogger(__name__)
# Define the module's virtual name
__virtualname__ = 'carbon'
def __virtual__():
return __virtualname__
def _get_options(ret):
'''
Returns options used for the carbon returner.
'''
attrs = {'host': 'host',
'port': 'port',
'skip': 'skip_on_error',
'mode': 'mode'}
_options = salt.returners.get_returner_options(__virtualname__,
ret,
attrs,
__salt__=__salt__,
__opts__=__opts__)
return _options
@contextmanager
def _carbon(host, port):
'''
Context manager to ensure the clean creation and destruction of a socket.
host
The IP or hostname of the carbon server
port
The port that carbon is listening on
'''
carbon_sock = None
try:
carbon_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM,
socket.IPPROTO_TCP)
carbon_sock.connect((host, port))
except socket.error as err:
log.error('Error connecting to %s:%s, %s', host, port, err)
raise
else:
log.debug('Connected to carbon')
yield carbon_sock
finally:
if carbon_sock is not None:
# Shut down and close socket
log.debug('Destroying carbon socket')
carbon_sock.shutdown(socket.SHUT_RDWR)
carbon_sock.close()
def _send_picklemetrics(metrics):
'''
Format metrics for the carbon pickle protocol
'''
metrics = [(metric_name, (timestamp, value))
for (metric_name, value, timestamp) in metrics]
data = cPickle.dumps(metrics, -1)
payload = struct.pack(b'!L', len(data)) + data
return payload
def _send_textmetrics(metrics):
'''
Format metrics for the carbon plaintext protocol
'''
data = [' '.join(map(six.text_type, metric)) for metric in metrics] + ['']
return '\n'.join(data)
def _walk(path, value, metrics, timestamp, skip):
'''
Recursively include metrics from *value*.
path
The dot-separated path of the metric.
value
A dictionary or value from a dictionary. If a dictionary, ``_walk``
will be called again with the each key/value pair as a new set of
metrics.
metrics
The list of metrics that will be sent to carbon, formatted as::
(path, value, timestamp)
skip
Whether or not to skip metrics when there's an error casting the value
to a float. Defaults to `False`.
'''
log.trace(
'Carbon return walking path: %s, value: %s, metrics: %s, '
'timestamp: %s', path, value, metrics, timestamp
)
if isinstance(value, collections.Mapping):
for key, val in six.iteritems(value):
_walk('{0}.{1}'.format(path, key), val, metrics, timestamp, skip)
elif isinstance(value, list):
for item in value:
_walk('{0}.{1}'.format(path, item), item, metrics, timestamp, skip)
else:
try:
val = float(value)
metrics.append((path, val, timestamp))
except (TypeError, ValueError):
msg = 'Error in carbon returner, when trying to convert metric: ' \
'{0}, with val: {1}'.format(path, value)
if skip:
log.debug(msg)
else:
log.info(msg)
raise
def _send(saltdata, metric_base, opts):
'''
Send the data to carbon
'''
host = opts.get('host')
port = opts.get('port')
skip = opts.get('skip')
metric_base_pattern = opts.get('carbon.metric_base_pattern')
mode = opts.get('mode').lower() if 'mode' in opts else 'text'
log.debug('Carbon minion configured with host: %s:%s', host, port)
log.debug('Using carbon protocol: %s', mode)
if not (host and port):
log.error('Host or port not defined')
return
# TODO: possible to use time return from salt job to be slightly more precise?
# convert the jid to unix timestamp?
# {'fun': 'test.version', 'jid': '20130113193949451054', 'return': '0.11.0', 'id': 'salt'}
timestamp = int(time.time())
handler = _send_picklemetrics if mode == 'pickle' else _send_textmetrics
metrics = []
log.trace('Carbon returning walking data: %s', saltdata)
_walk(metric_base, saltdata, metrics, timestamp, skip)
data = handler(metrics)
log.trace('Carbon inserting data: %s', data)
with _carbon(host, port) as sock:
total_sent_bytes = 0
while total_sent_bytes < len(data):
sent_bytes = sock.send(data[total_sent_bytes:])
if sent_bytes == 0:
log.error('Bytes sent 0, Connection reset?')
return
log.debug('Sent %s bytes to carbon', sent_bytes)
total_sent_bytes += sent_bytes
def event_return(events):
'''
Return event data to remote carbon server
Provide a list of events to be stored in carbon
'''
opts = _get_options({}) # Pass in empty ret, since this is a list of events
opts['skip'] = True
for event in events:
log.trace('Carbon returner received event: %s', event)
metric_base = event['tag']
saltdata = event['data'].get('data')
_send(saltdata, metric_base, opts)
def returner(ret):
'''
Return data to a remote carbon server using the text metric protocol
Each metric will look like::
[module].[function].[minion_id].[metric path [...]].[metric name]
'''
opts = _get_options(ret)
metric_base = ret['fun']
# Strip the hostname from the carbon base if we are returning from virt
# module since then we will get stable metric bases even if the VM is
# migrate from host to host
if not metric_base.startswith('virt.'):
metric_base += '.' + ret['id'].replace('.', '_')
saltdata = ret['return']
_send(saltdata, metric_base, opts)
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''
return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid(__opts__)