HEX
Server: Apache
System: Linux sg241.singhost.net 2.6.32-896.16.1.lve1.4.51.el6.x86_64 #1 SMP Wed Jan 17 13:19:23 EST 2018 x86_64
User: honghock (909)
PHP: 8.0.30
Disabled: passthru,system,shell_exec,show_source,exec,popen,proc_open
Upload Files
File: //proc/self/root/usr/lib/python2.7/site-packages/salt/queues/pgjsonb_queue.py
# -*- coding: utf-8 -*-
'''
.. versionadded:: 2016.3.0

This is a queue with postgres as the backend.  It uses the jsonb store to
store information for queues.

:depends:       python-psycopg2

To enable this queue, the following needs to be configured in your master
config. These are the defaults:

.. code-block:: yaml

    queue.pgjsonb.host: 'salt'
    queue.pgjsonb.user: 'salt'
    queue.pgjsonb.password: 'salt'
    queue.pgjsonb.dbname: 'salt'
    queue.pgjsonb.port: 5432

Use the following Pg database schema:

.. code-block:: sql

    CREATE DATABASE  salt WITH ENCODING 'utf-8';

    --
    -- Table structure for table `salt`
    --
    DROP TABLE IF EXISTS salt;
    CREATE OR REPLACE TABLE salt(
       id SERIAL PRIMARY KEY,
       data jsonb NOT NULL
    );

.. code-block:: bash

    salt-run queue.insert test '{"name": "redis", "host": "172.16.0.8", "port": 6379}' backend=pgjsonb
    salt-run queue.process_queue test all backend=pgjsonb
'''

# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
from contextlib import contextmanager
import sys

# import salt libs
import salt.utils.json
from salt.ext import six
from salt.exceptions import SaltInvocationError, SaltMasterError

try:
    import psycopg2
    HAS_PG = True
except ImportError:
    HAS_PG = False

import logging
log = logging.getLogger(__name__)

# Define the module's virtual name
__virtualname__ = 'pgjsonb'


def __virtual__():
    if HAS_PG is False:
        return False
    return __virtualname__


@contextmanager
def _conn(commit=False):
    '''
    Return an postgres cursor
    '''
    defaults = {'host': 'localhost',
                'user': 'salt',
                'password': 'salt',
                'dbname': 'salt',
                'port': 5432}

    conn_kwargs = {}
    for key, value in defaults.items():
        conn_kwargs[key] = __opts__.get('queue.{0}.{1}'.format(__virtualname__, key), value)
    try:
        conn = psycopg2.connect(**conn_kwargs)
    except psycopg2.OperationalError as exc:
        raise SaltMasterError('pgjsonb returner could not connect to database: {exc}'.format(exc=exc))

    cursor = conn.cursor()

    try:
        yield cursor
    except psycopg2.DatabaseError as err:
        error = err.args
        sys.stderr.write(six.text_type(error))
        cursor.execute("ROLLBACK")
        six.reraise(*sys.exc_info())
    else:
        if commit:
            cursor.execute("COMMIT")
        else:
            cursor.execute("ROLLBACK")
    finally:
        conn.close()


def _list_tables(cur):
    cmd = "select relname from pg_class where relkind='r' and relname !~ '^(pg_|sql_)';"
    log.debug('SQL Query: %s', cmd)
    cur.execute(cmd)
    result = cur.fetchall()
    return [x[0] for x in result]


def _create_table(cur, queue):
    cmd = 'CREATE TABLE {0}(id SERIAL PRIMARY KEY, '\
          'data jsonb NOT NULL)'.format(queue)
    log.debug('SQL Query: %s', cmd)
    cur.execute(cmd)
    return True


def _list_items(queue):
    '''
    Private function to list contents of a queue
    '''
    with _conn() as cur:
        cmd = 'SELECT data FROM {0}'.format(queue)
        log.debug('SQL Query: %s', cmd)
        cur.execute(cmd)
        contents = cur.fetchall()
        return contents


def list_queues():
    '''
    Return a list of Salt Queues on the Salt Master
    '''
    with _conn() as cur:
        queues = _list_tables(cur)
    return queues


def list_items(queue):
    '''
    List contents of a queue
    '''
    itemstuple = _list_items(queue)
    items = [item[0] for item in itemstuple]
    return items


def list_length(queue):
    '''
    Provide the number of items in a queue
    '''
    items = _list_items(queue)
    return len(items)


def _queue_exists(queue):
    '''
    Does this queue exist
    :param queue: Name of the queue
    :type str
    :return: True if this queue exists and
    False otherwise
    :rtype bool
    '''
    return queue in list_queues()


def handle_queue_creation(queue):
    if not _queue_exists(queue):
        with _conn(commit=True) as cur:
            log.debug('Queue %s does not exist. Creating', queue)
            _create_table(cur, queue)
    else:
        log.debug('Queue %s already exists.', queue)


def insert(queue, items):
    '''
    Add an item or items to a queue
    '''
    handle_queue_creation(queue)

    with _conn(commit=True) as cur:
        if isinstance(items, dict):
            items = salt.utils.json.dumps(items)
            cmd = str('''INSERT INTO {0}(data) VALUES('{1}')''').format(queue, items)  # future lint: disable=blacklisted-function
            log.debug('SQL Query: %s', cmd)
            try:
                cur.execute(cmd)
            except psycopg2.IntegrityError as esc:
                return ('Item already exists in this queue. '
                        'postgres error: {0}'.format(esc))
        if isinstance(items, list):
            items = [(salt.utils.json.dumps(el),) for el in items]
            cmd = str("INSERT INTO {0}(data) VALUES (%s)").format(queue)  # future lint: disable=blacklisted-function
            log.debug('SQL Query: %s', cmd)
            try:
                cur.executemany(cmd, items)
            except psycopg2.IntegrityError as esc:
                return ('One or more items already exists in this queue. '
                        'postgres error: {0}'.format(esc))
    return True


def delete(queue, items):
    '''
    Delete an item or items from a queue
    '''
    with _conn(commit=True) as cur:
        if isinstance(items, dict):
            cmd = str("""DELETE FROM {0} WHERE data = '{1}'""").format(  # future lint: disable=blacklisted-function
                queue,
                salt.utils.json.dumps(items))
            log.debug('SQL Query: %s', cmd)
            cur.execute(cmd)
            return True
        if isinstance(items, list):
            items = [(salt.utils.json.dumps(el),) for el in items]
            cmd = 'DELETE FROM {0} WHERE data = %s'.format(queue)
            log.debug('SQL Query: %s', cmd)
            cur.executemany(cmd, items)
    return True


def pop(queue, quantity=1, is_runner=False):
    '''
    Pop one or more or all items from the queue return them.
    '''
    cmd = 'SELECT id, data FROM {0}'.format(queue)
    if quantity != 'all':
        try:
            quantity = int(quantity)
        except ValueError as exc:
            error_txt = ('Quantity must be an integer or "all".\n'
                         'Error: "{0}".'.format(exc))
            raise SaltInvocationError(error_txt)
        cmd = ''.join([cmd, ' LIMIT {0};'.format(quantity)])
    log.debug('SQL Query: %s', cmd)
    items = []
    with _conn(commit=True) as cur:
        cur.execute(cmd)
        result = cur.fetchall()
        if len(result) > 0:
            ids = [six.text_type(item[0]) for item in result]
            items = [item[1] for item in result]
            idlist = "','".join(ids)
            del_cmd = '''DELETE FROM {0} WHERE id IN ('{1}');'''.format(
                queue, idlist)

            log.debug('SQL Query: %s', del_cmd)

            cur.execute(del_cmd)
    return items