HEX
Server: Apache
System: Linux sg241.singhost.net 2.6.32-896.16.1.lve1.4.51.el6.x86_64 #1 SMP Wed Jan 17 13:19:23 EST 2018 x86_64
User: honghock (909)
PHP: 8.0.30
Disabled: passthru,system,shell_exec,show_source,exec,popen,proc_open
Upload Files
File: //proc/self/root/usr/lib/python2.7/site-packages/salt/returners/carbon_return.py
# -*- coding: utf-8 -*-
'''
Take data from salt and "return" it into a carbon receiver

Add the following configuration to the minion configuration file:

.. code-block:: yaml

    carbon.host: <server ip address>
    carbon.port: 2003

Errors when trying to convert data to numbers may be ignored by setting
``carbon.skip_on_error`` to `True`:

.. code-block:: yaml

    carbon.skip_on_error: True

By default, data will be sent to carbon using the plaintext protocol. To use
the pickle protocol, set ``carbon.mode`` to ``pickle``:

.. code-block:: yaml

    carbon.mode: pickle

You can also specify the pattern used for the metric base path (except for virt modules metrics):
    carbon.metric_base_pattern: carbon.[minion_id].[module].[function]

These tokens can used :
    [module]: salt module
    [function]: salt function
    [minion_id]: minion id

Default is :
    carbon.metric_base_pattern: [module].[function].[minion_id]

Carbon settings may also be configured as:

.. code-block:: yaml

    carbon:
      host: <server IP or hostname>
      port: <carbon port>
      skip_on_error: True
      mode: (pickle|text)
      metric_base_pattern: <pattern> | [module].[function].[minion_id]

Alternative configuration values can be used by prefacing the configuration.
Any values not found in the alternative configuration will be pulled from
the default location:

.. code-block:: yaml

    alternative.carbon:
      host: <server IP or hostname>
      port: <carbon port>
      skip_on_error: True
      mode: (pickle|text)

To use the carbon returner, append '--return carbon' to the salt command.

.. code-block:: bash

    salt '*' test.ping --return carbon

To use the alternative configuration, append '--return_config alternative' to the salt command.

.. versionadded:: 2015.5.0

.. code-block:: bash

    salt '*' test.ping --return carbon --return_config alternative

To override individual configuration items, append --return_kwargs '{"key:": "value"}' to the salt command.

.. versionadded:: 2016.3.0

.. code-block:: bash

    salt '*' test.ping --return carbon --return_kwargs '{"skip_on_error": False}'

'''

# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import collections
import logging
import socket
import struct
import time
from contextlib import contextmanager

# Import salt libs
import salt.utils.jid
import salt.returners

# Import 3rd-party libs
from salt.ext import six
from salt.ext.six.moves import cPickle, map  # pylint: disable=import-error,no-name-in-module,redefined-builtin

log = logging.getLogger(__name__)

# Define the module's virtual name
__virtualname__ = 'carbon'


def __virtual__():
    return __virtualname__


def _get_options(ret):
    '''
    Returns options used for the carbon returner.
    '''
    attrs = {'host': 'host',
             'port': 'port',
             'skip': 'skip_on_error',
             'mode': 'mode'}

    _options = salt.returners.get_returner_options(__virtualname__,
                                                   ret,
                                                   attrs,
                                                   __salt__=__salt__,
                                                   __opts__=__opts__)
    return _options


@contextmanager
def _carbon(host, port):
    '''
    Context manager to ensure the clean creation and destruction of a socket.

    host
        The IP or hostname of the carbon server
    port
        The port that carbon is listening on
    '''
    carbon_sock = None

    try:
        carbon_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM,
                                    socket.IPPROTO_TCP)

        carbon_sock.connect((host, port))
    except socket.error as err:
        log.error('Error connecting to %s:%s, %s', host, port, err)
        raise
    else:
        log.debug('Connected to carbon')
        yield carbon_sock
    finally:
        if carbon_sock is not None:
            # Shut down and close socket
            log.debug('Destroying carbon socket')

            carbon_sock.shutdown(socket.SHUT_RDWR)
            carbon_sock.close()


def _send_picklemetrics(metrics):
    '''
    Format metrics for the carbon pickle protocol
    '''

    metrics = [(metric_name, (timestamp, value))
               for (metric_name, value, timestamp) in metrics]

    data = cPickle.dumps(metrics, -1)
    payload = struct.pack(b'!L', len(data)) + data

    return payload


def _send_textmetrics(metrics):
    '''
    Format metrics for the carbon plaintext protocol
    '''

    data = [' '.join(map(six.text_type, metric)) for metric in metrics] + ['']

    return '\n'.join(data)


def _walk(path, value, metrics, timestamp, skip):
    '''
    Recursively include metrics from *value*.

    path
        The dot-separated path of the metric.
    value
        A dictionary or value from a dictionary. If a dictionary, ``_walk``
        will be called again with the each key/value pair as a new set of
        metrics.
    metrics
        The list of metrics that will be sent to carbon, formatted as::

            (path, value, timestamp)
    skip
        Whether or not to skip metrics when there's an error casting the value
        to a float. Defaults to `False`.
    '''
    log.trace(
        'Carbon return walking path: %s, value: %s, metrics: %s, '
        'timestamp: %s', path, value, metrics, timestamp
    )
    if isinstance(value, collections.Mapping):
        for key, val in six.iteritems(value):
            _walk('{0}.{1}'.format(path, key), val, metrics, timestamp, skip)
    elif isinstance(value, list):
        for item in value:
            _walk('{0}.{1}'.format(path, item), item, metrics, timestamp, skip)

    else:
        try:
            val = float(value)
            metrics.append((path, val, timestamp))
        except (TypeError, ValueError):
            msg = 'Error in carbon returner, when trying to convert metric: ' \
                  '{0}, with val: {1}'.format(path, value)
            if skip:
                log.debug(msg)
            else:
                log.info(msg)
                raise


def _send(saltdata, metric_base, opts):
    '''
    Send the data to carbon
    '''

    host = opts.get('host')
    port = opts.get('port')
    skip = opts.get('skip')
    metric_base_pattern = opts.get('carbon.metric_base_pattern')
    mode = opts.get('mode').lower() if 'mode' in opts else 'text'

    log.debug('Carbon minion configured with host: %s:%s', host, port)
    log.debug('Using carbon protocol: %s', mode)

    if not (host and port):
        log.error('Host or port not defined')
        return

    # TODO: possible to use time return from salt job to be slightly more precise?
    # convert the jid to unix timestamp?
    # {'fun': 'test.version', 'jid': '20130113193949451054', 'return': '0.11.0', 'id': 'salt'}
    timestamp = int(time.time())

    handler = _send_picklemetrics if mode == 'pickle' else _send_textmetrics
    metrics = []
    log.trace('Carbon returning walking data: %s', saltdata)
    _walk(metric_base, saltdata, metrics, timestamp, skip)
    data = handler(metrics)
    log.trace('Carbon inserting data: %s', data)

    with _carbon(host, port) as sock:
        total_sent_bytes = 0
        while total_sent_bytes < len(data):
            sent_bytes = sock.send(data[total_sent_bytes:])
            if sent_bytes == 0:
                log.error('Bytes sent 0, Connection reset?')
                return

            log.debug('Sent %s bytes to carbon', sent_bytes)
            total_sent_bytes += sent_bytes


def event_return(events):
    '''
    Return event data to remote carbon server

    Provide a list of events to be stored in carbon
    '''
    opts = _get_options({})  # Pass in empty ret, since this is a list of events
    opts['skip'] = True
    for event in events:
        log.trace('Carbon returner received event: %s', event)
        metric_base = event['tag']
        saltdata = event['data'].get('data')
        _send(saltdata, metric_base, opts)


def returner(ret):
    '''
    Return data to a remote carbon server using the text metric protocol

    Each metric will look like::

        [module].[function].[minion_id].[metric path [...]].[metric name]

    '''
    opts = _get_options(ret)
    metric_base = ret['fun']
    # Strip the hostname from the carbon base if we are returning from virt
    # module since then we will get stable metric bases even if the VM is
    # migrate from host to host
    if not metric_base.startswith('virt.'):
        metric_base += '.' + ret['id'].replace('.', '_')

    saltdata = ret['return']
    _send(saltdata, metric_base, opts)


def prep_jid(nocache=False, passed_jid=None):  # pylint: disable=unused-argument
    '''
    Do any work necessary to prepare a JID, including sending a custom id
    '''
    return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid(__opts__)