HEX
Server: Apache
System: Linux sg241.singhost.net 2.6.32-896.16.1.lve1.4.51.el6.x86_64 #1 SMP Wed Jan 17 13:19:23 EST 2018 x86_64
User: honghock (909)
PHP: 8.0.30
Disabled: passthru,system,shell_exec,show_source,exec,popen,proc_open
Upload Files
File: //usr/lib/python2.7/site-packages/salt/returners/kafka_return.py
# -*- coding: utf-8 -*-

'''
Return data to a Kafka topic

:maintainer: Justin Desilets (justin.desilets@gmail.com)
:maturity: 20181119
:depends: confluent-kafka
:platform: all

To enable this returner install confluent-kafka and enable the following
settings in the minion config:

    returner.kafka.bootstrap:
      - "server1:9092"
      - "server2:9092"
      - "server3:9092"

    returner.kafka.topic: 'topic'

To use the kafka returner, append `--return kafka` to the Salt command, eg;

    salt '*' test.ping --return kafka

'''
from __future__ import absolute_import, print_function, unicode_literals
import logging
import salt.utils.json

# Import third-party libs
try:
    from confluent_kafka import Producer
    HAS_KAFKA = True
except ImportError:
    HAS_KAFKA = False

log = logging.getLogger(__name__)


__virtualname__ = 'kafka'


def __virtual__():
    if not HAS_KAFKA:
        return False, 'Could not import kafka returner; confluent-kafka is not installed.'
    return __virtualname__


def _get_conn():
    '''
    Return a kafka connection
    '''
    if __salt__['config.option']('returner.kafka.bootstrap'):
        bootstrap = ','.join(__salt__['config.option']('returner.kafka.bootstrap'))
    else:
        log.error('Unable to find kafka returner config option: bootstrap')
        return None
    return bootstrap


def _delivery_report(err, msg):
    ''' Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). '''
    if err is not None:
        log.error('Message delivery failed: %s', err)
    else:
        log.debug('Message delivered to %s [%s]', msg.topic(), msg.partition())


def returner(ret):
    '''
    Return information to a Kafka server
    '''
    if __salt__['config.option']('returner.kafka.topic'):
        topic = __salt__['config.option']('returner.kafka.topic')

        conn = _get_conn()
        producer = Producer({'bootstrap.servers': conn})
        producer.poll(0)
        producer.produce(topic, salt.utils.json.dumps(ret), str(ret).encode('utf-8'), callback=_delivery_report)

        producer.flush()
    else:
        log.error('Unable to find kafka returner config option: topic')