File: //usr/lib/python2.7/site-packages/salt/returners/postgres.py
# -*- coding: utf-8 -*-
'''
Return data to a postgresql server
.. note::
There are three PostgreSQL returners. Any can function as an external
:ref:`master job cache <external-job-cache>`. but each has different
features. SaltStack recommends
:mod:`returners.pgjsonb <salt.returners.pgjsonb>` if you are working with
a version of PostgreSQL that has the appropriate native binary JSON types.
Otherwise, review
:mod:`returners.postgres <salt.returners.postgres>` and
:mod:`returners.postgres_local_cache <salt.returners.postgres_local_cache>`
to see which module best suits your particular needs.
:maintainer: None
:maturity: New
:depends: psycopg2
:platform: all
To enable this returner the minion will need the psycopg2 installed and
the following values configured in the minion or master config:
.. code-block:: yaml
returner.postgres.host: 'salt'
returner.postgres.user: 'salt'
returner.postgres.passwd: 'salt'
returner.postgres.db: 'salt'
returner.postgres.port: 5432
Alternative configuration values can be used by prefacing the configuration.
Any values not found in the alternative configuration will be pulled from
the default location:
.. code-block:: yaml
alternative.returner.postgres.host: 'salt'
alternative.returner.postgres.user: 'salt'
alternative.returner.postgres.passwd: 'salt'
alternative.returner.postgres.db: 'salt'
alternative.returner.postgres.port: 5432
Running the following commands as the postgres user should create the database
correctly:
.. code-block:: sql
psql << EOF
CREATE ROLE salt WITH PASSWORD 'salt';
CREATE DATABASE salt WITH OWNER salt;
EOF
psql -h localhost -U salt << EOF
--
-- Table structure for table 'jids'
--
DROP TABLE IF EXISTS jids;
CREATE TABLE jids (
jid varchar(20) PRIMARY KEY,
load text NOT NULL
);
--
-- Table structure for table 'salt_returns'
--
DROP TABLE IF EXISTS salt_returns;
CREATE TABLE salt_returns (
fun varchar(50) NOT NULL,
jid varchar(255) NOT NULL,
return text NOT NULL,
full_ret text,
id varchar(255) NOT NULL,
success varchar(10) NOT NULL,
alter_time TIMESTAMP WITH TIME ZONE DEFAULT now()
);
CREATE INDEX idx_salt_returns_id ON salt_returns (id);
CREATE INDEX idx_salt_returns_jid ON salt_returns (jid);
CREATE INDEX idx_salt_returns_fun ON salt_returns (fun);
CREATE INDEX idx_salt_returns_updated ON salt_returns (alter_time);
--
-- Table structure for table `salt_events`
--
DROP TABLE IF EXISTS salt_events;
DROP SEQUENCE IF EXISTS seq_salt_events_id;
CREATE SEQUENCE seq_salt_events_id;
CREATE TABLE salt_events (
id BIGINT NOT NULL UNIQUE DEFAULT nextval('seq_salt_events_id'),
tag varchar(255) NOT NULL,
data text NOT NULL,
alter_time TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
master_id varchar(255) NOT NULL
);
CREATE INDEX idx_salt_events_tag on salt_events (tag);
EOF
Required python modules: psycopg2
To use the postgres returner, append '--return postgres' to the salt command.
.. code-block:: bash
salt '*' test.ping --return postgres
To use the alternative configuration, append '--return_config alternative' to the salt command.
.. versionadded:: 2015.5.0
.. code-block:: bash
salt '*' test.ping --return postgres --return_config alternative
To override individual configuration items, append --return_kwargs '{"key:": "value"}' to the salt command.
.. versionadded:: 2016.3.0
.. code-block:: bash
salt '*' test.ping --return postgres --return_kwargs '{"db": "another-salt"}'
'''
from __future__ import absolute_import, print_function, unicode_literals
# Import python libs
import sys
import logging
from contextlib import contextmanager
# Import Salt libs
import salt.utils.jid
import salt.utils.json
import salt.returners
import salt.exceptions
# Import third party libs
from salt.ext import six
try:
import psycopg2
HAS_POSTGRES = True
except ImportError:
HAS_POSTGRES = False
__virtualname__ = 'postgres'
log = logging.getLogger(__name__)
def __virtual__():
if not HAS_POSTGRES:
return False, 'Could not import postgres returner; psycopg2 is not installed.'
return __virtualname__
def _get_options(ret=None):
'''
Get the postgres options from salt.
'''
defaults = {'host': 'localhost',
'user': 'salt',
'passwd': 'salt',
'db': 'salt',
'port': 5432}
attrs = {'host': 'host',
'user': 'user',
'passwd': 'passwd',
'db': 'db',
'port': 'port'}
_options = salt.returners.get_returner_options('returner.{0}'.format(__virtualname__),
ret,
attrs,
__salt__=__salt__,
__opts__=__opts__,
defaults=defaults)
# Ensure port is an int
if 'port' in _options:
_options['port'] = int(_options['port'])
return _options
@contextmanager
def _get_serv(ret=None, commit=False):
'''
Return a Pg cursor
'''
_options = _get_options(ret)
try:
conn = psycopg2.connect(host=_options.get('host'),
user=_options.get('user'),
password=_options.get('passwd'),
database=_options.get('db'),
port=_options.get('port'))
except psycopg2.OperationalError as exc:
raise salt.exceptions.SaltMasterError('postgres returner could not connect to database: {exc}'.format(exc=exc))
cursor = conn.cursor()
try:
yield cursor
except psycopg2.DatabaseError as err:
error = err.args
sys.stderr.write(six.text_type(error))
cursor.execute("ROLLBACK")
six.reraise(*sys.exc_info())
else:
if commit:
cursor.execute("COMMIT")
else:
cursor.execute("ROLLBACK")
finally:
conn.close()
def returner(ret):
'''
Return data to a postgres server
'''
try:
with _get_serv(ret, commit=True) as cur:
sql = '''INSERT INTO salt_returns
(fun, jid, return, id, success, full_ret)
VALUES (%s, %s, %s, %s, %s, %s)'''
cur.execute(
sql, (
ret['fun'],
ret['jid'],
salt.utils.json.dumps(ret['return']),
ret['id'],
ret.get('success', False),
salt.utils.json.dumps(ret)))
except salt.exceptions.SaltMasterError:
log.critical('Could not store return with postgres returner. PostgreSQL server unavailable.')
def event_return(events):
'''
Return event to Pg server
Requires that configuration be enabled via 'event_return'
option in master config.
'''
with _get_serv(events, commit=True) as cur:
for event in events:
tag = event.get('tag', '')
data = event.get('data', '')
sql = '''INSERT INTO salt_events (tag, data, master_id)
VALUES (%s, %s, %s)'''
cur.execute(sql, (tag,
salt.utils.json.dumps(data),
__opts__['id']))
def save_load(jid, load, minions=None): # pylint: disable=unused-argument
'''
Save the load to the specified jid id
'''
with _get_serv(commit=True) as cur:
sql = '''INSERT INTO jids
(jid, load)
VALUES (%s, %s)'''
try:
cur.execute(sql, (jid,
salt.utils.json.dumps(load)))
except psycopg2.IntegrityError:
# https://github.com/saltstack/salt/issues/22171
# Without this try/except we get tons of duplicate entry errors
# which result in job returns not being stored properly
pass
def save_minions(jid, minions, syndic_id=None): # pylint: disable=unused-argument
'''
Included for API consistency
'''
def get_load(jid):
'''
Return the load data that marks a specified jid
'''
with _get_serv(ret=None, commit=True) as cur:
sql = '''SELECT load FROM jids WHERE jid = %s;'''
cur.execute(sql, (jid,))
data = cur.fetchone()
if data:
return salt.utils.json.loads(data[0])
return {}
def get_jid(jid):
'''
Return the information returned when the specified job id was executed
'''
with _get_serv(ret=None, commit=True) as cur:
sql = '''SELECT id, full_ret FROM salt_returns
WHERE jid = %s'''
cur.execute(sql, (jid,))
data = cur.fetchall()
ret = {}
if data:
for minion, full_ret in data:
ret[minion] = salt.utils.json.loads(full_ret)
return ret
def get_fun(fun):
'''
Return a dict of the last function called for all minions
'''
with _get_serv(ret=None, commit=True) as cur:
sql = '''SELECT s.id,s.jid, s.full_ret
FROM salt_returns s
JOIN ( SELECT MAX(`jid`) as jid
from salt_returns GROUP BY fun, id) max
ON s.jid = max.jid
WHERE s.fun = %s
'''
cur.execute(sql, (fun,))
data = cur.fetchall()
ret = {}
if data:
for minion, _, full_ret in data:
ret[minion] = salt.utils.json.loads(full_ret)
return ret
def get_jids():
'''
Return a list of all job ids
'''
with _get_serv(ret=None, commit=True) as cur:
sql = '''SELECT jid, load
FROM jids'''
cur.execute(sql)
data = cur.fetchall()
ret = {}
for jid, load in data:
ret[jid] = salt.utils.jid.format_jid_instance(jid,
salt.utils.json.loads(load))
return ret
def get_minions():
'''
Return a list of minions
'''
with _get_serv(ret=None, commit=True) as cur:
sql = '''SELECT DISTINCT id
FROM salt_returns'''
cur.execute(sql)
data = cur.fetchall()
ret = []
for minion in data:
ret.append(minion[0])
return ret
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''
return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid(__opts__)