HEX
Server: Apache
System: Linux sg241.singhost.net 2.6.32-896.16.1.lve1.4.51.el6.x86_64 #1 SMP Wed Jan 17 13:19:23 EST 2018 x86_64
User: honghock (909)
PHP: 8.0.30
Disabled: passthru,system,shell_exec,show_source,exec,popen,proc_open
Upload Files
File: //usr/lib/python2.7/site-packages/salt/returners/postgres.py
# -*- coding: utf-8 -*-
'''
Return data to a postgresql server

.. note::
    There are three PostgreSQL returners.  Any can function as an external
    :ref:`master job cache <external-job-cache>`. but each has different
    features.  SaltStack recommends
    :mod:`returners.pgjsonb <salt.returners.pgjsonb>` if you are working with
    a version of PostgreSQL that has the appropriate native binary JSON types.
    Otherwise, review
    :mod:`returners.postgres <salt.returners.postgres>` and
    :mod:`returners.postgres_local_cache <salt.returners.postgres_local_cache>`
    to see which module best suits your particular needs.

:maintainer:    None
:maturity:      New
:depends:       psycopg2
:platform:      all

To enable this returner the minion will need the psycopg2 installed and
the following values configured in the minion or master config:

.. code-block:: yaml

    returner.postgres.host: 'salt'
    returner.postgres.user: 'salt'
    returner.postgres.passwd: 'salt'
    returner.postgres.db: 'salt'
    returner.postgres.port: 5432

Alternative configuration values can be used by prefacing the configuration.
Any values not found in the alternative configuration will be pulled from
the default location:

.. code-block:: yaml

    alternative.returner.postgres.host: 'salt'
    alternative.returner.postgres.user: 'salt'
    alternative.returner.postgres.passwd: 'salt'
    alternative.returner.postgres.db: 'salt'
    alternative.returner.postgres.port: 5432

Running the following commands as the postgres user should create the database
correctly:

.. code-block:: sql

    psql << EOF
    CREATE ROLE salt WITH PASSWORD 'salt';
    CREATE DATABASE salt WITH OWNER salt;
    EOF

    psql -h localhost -U salt << EOF
    --
    -- Table structure for table 'jids'
    --

    DROP TABLE IF EXISTS jids;
    CREATE TABLE jids (
      jid   varchar(20) PRIMARY KEY,
      load  text NOT NULL
    );

    --
    -- Table structure for table 'salt_returns'
    --

    DROP TABLE IF EXISTS salt_returns;
    CREATE TABLE salt_returns (
      fun       varchar(50) NOT NULL,
      jid       varchar(255) NOT NULL,
      return    text NOT NULL,
      full_ret  text,
      id        varchar(255) NOT NULL,
      success   varchar(10) NOT NULL,
      alter_time   TIMESTAMP WITH TIME ZONE DEFAULT now()
    );

    CREATE INDEX idx_salt_returns_id ON salt_returns (id);
    CREATE INDEX idx_salt_returns_jid ON salt_returns (jid);
    CREATE INDEX idx_salt_returns_fun ON salt_returns (fun);
    CREATE INDEX idx_salt_returns_updated ON salt_returns (alter_time);

    --
    -- Table structure for table `salt_events`
    --

    DROP TABLE IF EXISTS salt_events;
    DROP SEQUENCE IF EXISTS seq_salt_events_id;
    CREATE SEQUENCE seq_salt_events_id;
    CREATE TABLE salt_events (
        id BIGINT NOT NULL UNIQUE DEFAULT nextval('seq_salt_events_id'),
        tag varchar(255) NOT NULL,
        data text NOT NULL,
        alter_time TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
        master_id varchar(255) NOT NULL
    );

    CREATE INDEX idx_salt_events_tag on salt_events (tag);

    EOF

Required python modules: psycopg2

To use the postgres returner, append '--return postgres' to the salt command.

.. code-block:: bash

    salt '*' test.ping --return postgres

To use the alternative configuration, append '--return_config alternative' to the salt command.

.. versionadded:: 2015.5.0

.. code-block:: bash

    salt '*' test.ping --return postgres --return_config alternative

To override individual configuration items, append --return_kwargs '{"key:": "value"}' to the salt command.

.. versionadded:: 2016.3.0

.. code-block:: bash

    salt '*' test.ping --return postgres --return_kwargs '{"db": "another-salt"}'

'''
from __future__ import absolute_import, print_function, unicode_literals

# Import python libs
import sys
import logging
from contextlib import contextmanager

# Import Salt libs
import salt.utils.jid
import salt.utils.json
import salt.returners
import salt.exceptions

# Import third party libs
from salt.ext import six
try:
    import psycopg2
    HAS_POSTGRES = True
except ImportError:
    HAS_POSTGRES = False

__virtualname__ = 'postgres'

log = logging.getLogger(__name__)


def __virtual__():
    if not HAS_POSTGRES:
        return False, 'Could not import postgres returner; psycopg2 is not installed.'
    return __virtualname__


def _get_options(ret=None):
    '''
    Get the postgres options from salt.
    '''
    defaults = {'host': 'localhost',
                'user': 'salt',
                'passwd': 'salt',
                'db': 'salt',
                'port': 5432}

    attrs = {'host': 'host',
             'user': 'user',
             'passwd': 'passwd',
             'db': 'db',
             'port': 'port'}

    _options = salt.returners.get_returner_options('returner.{0}'.format(__virtualname__),
                                                   ret,
                                                   attrs,
                                                   __salt__=__salt__,
                                                   __opts__=__opts__,
                                                   defaults=defaults)
    # Ensure port is an int
    if 'port' in _options:
        _options['port'] = int(_options['port'])
    return _options


@contextmanager
def _get_serv(ret=None, commit=False):
    '''
    Return a Pg cursor
    '''
    _options = _get_options(ret)
    try:
        conn = psycopg2.connect(host=_options.get('host'),
                                user=_options.get('user'),
                                password=_options.get('passwd'),
                                database=_options.get('db'),
                                port=_options.get('port'))

    except psycopg2.OperationalError as exc:
        raise salt.exceptions.SaltMasterError('postgres returner could not connect to database: {exc}'.format(exc=exc))

    cursor = conn.cursor()

    try:
        yield cursor
    except psycopg2.DatabaseError as err:
        error = err.args
        sys.stderr.write(six.text_type(error))
        cursor.execute("ROLLBACK")
        six.reraise(*sys.exc_info())
    else:
        if commit:
            cursor.execute("COMMIT")
        else:
            cursor.execute("ROLLBACK")
    finally:
        conn.close()


def returner(ret):
    '''
    Return data to a postgres server
    '''
    try:
        with _get_serv(ret, commit=True) as cur:
            sql = '''INSERT INTO salt_returns
                    (fun, jid, return, id, success, full_ret)
                    VALUES (%s, %s, %s, %s, %s, %s)'''
            cur.execute(
                sql, (
                    ret['fun'],
                    ret['jid'],
                    salt.utils.json.dumps(ret['return']),
                    ret['id'],
                    ret.get('success', False),
                    salt.utils.json.dumps(ret)))
    except salt.exceptions.SaltMasterError:
        log.critical('Could not store return with postgres returner. PostgreSQL server unavailable.')


def event_return(events):
    '''
    Return event to Pg server

    Requires that configuration be enabled via 'event_return'
    option in master config.
    '''
    with _get_serv(events, commit=True) as cur:
        for event in events:
            tag = event.get('tag', '')
            data = event.get('data', '')
            sql = '''INSERT INTO salt_events (tag, data, master_id)
                     VALUES (%s, %s, %s)'''
            cur.execute(sql, (tag,
                              salt.utils.json.dumps(data),
                              __opts__['id']))


def save_load(jid, load, minions=None):  # pylint: disable=unused-argument
    '''
    Save the load to the specified jid id
    '''
    with _get_serv(commit=True) as cur:

        sql = '''INSERT INTO jids
               (jid, load)
                VALUES (%s, %s)'''

        try:
            cur.execute(sql, (jid,
                              salt.utils.json.dumps(load)))
        except psycopg2.IntegrityError:
            # https://github.com/saltstack/salt/issues/22171
            # Without this try/except we get tons of duplicate entry errors
            # which result in job returns not being stored properly
            pass


def save_minions(jid, minions, syndic_id=None):  # pylint: disable=unused-argument
    '''
    Included for API consistency
    '''


def get_load(jid):
    '''
    Return the load data that marks a specified jid
    '''
    with _get_serv(ret=None, commit=True) as cur:
        sql = '''SELECT load FROM jids WHERE jid = %s;'''
        cur.execute(sql, (jid,))
        data = cur.fetchone()
        if data:
            return salt.utils.json.loads(data[0])
        return {}


def get_jid(jid):
    '''
    Return the information returned when the specified job id was executed
    '''
    with _get_serv(ret=None, commit=True) as cur:

        sql = '''SELECT id, full_ret FROM salt_returns
                WHERE jid = %s'''

        cur.execute(sql, (jid,))
        data = cur.fetchall()
        ret = {}
        if data:
            for minion, full_ret in data:
                ret[minion] = salt.utils.json.loads(full_ret)
        return ret


def get_fun(fun):
    '''
    Return a dict of the last function called for all minions
    '''
    with _get_serv(ret=None, commit=True) as cur:

        sql = '''SELECT s.id,s.jid, s.full_ret
                FROM salt_returns s
                JOIN ( SELECT MAX(`jid`) as jid
                    from salt_returns GROUP BY fun, id) max
                ON s.jid = max.jid
                WHERE s.fun = %s
                '''

        cur.execute(sql, (fun,))
        data = cur.fetchall()

        ret = {}
        if data:
            for minion, _, full_ret in data:
                ret[minion] = salt.utils.json.loads(full_ret)
        return ret


def get_jids():
    '''
    Return a list of all job ids
    '''
    with _get_serv(ret=None, commit=True) as cur:

        sql = '''SELECT jid, load
                FROM jids'''

        cur.execute(sql)
        data = cur.fetchall()
        ret = {}
        for jid, load in data:
            ret[jid] = salt.utils.jid.format_jid_instance(jid,
                                                          salt.utils.json.loads(load))
        return ret


def get_minions():
    '''
    Return a list of minions
    '''
    with _get_serv(ret=None, commit=True) as cur:

        sql = '''SELECT DISTINCT id
                FROM salt_returns'''

        cur.execute(sql)
        data = cur.fetchall()
        ret = []
        for minion in data:
            ret.append(minion[0])
        return ret


def prep_jid(nocache=False, passed_jid=None):  # pylint: disable=unused-argument
    '''
    Do any work necessary to prepare a JID, including sending a custom id
    '''
    return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid(__opts__)