HEX
Server: Apache
System: Linux sg241.singhost.net 2.6.32-896.16.1.lve1.4.51.el6.x86_64 #1 SMP Wed Jan 17 13:19:23 EST 2018 x86_64
User: honghock (909)
PHP: 8.0.30
Disabled: passthru,system,shell_exec,show_source,exec,popen,proc_open
Upload Files
File: //usr/lib/python2.7/site-packages/salt/beacons/cert_info.py
# -*- coding: utf-8 -*-
'''
Beacon to monitor certificate expiration dates from files on the filesystem.

.. versionadded:: 3000

:maintainer: <devops@eitr.tech>
:maturity: new
:depends: OpenSSL
'''

# Import Python libs
from __future__ import absolute_import, unicode_literals
from datetime import datetime
import logging

# Import salt libs
# pylint: disable=import-error,no-name-in-module,redefined-builtin,3rd-party-module-not-gated
from salt.ext.six.moves import map as _map
from salt.ext.six.moves import range as _range
# pylint: enable=import-error,no-name-in-module,redefined-builtin,3rd-party-module-not-gated
import salt.utils.files


# Import Third Party Libs
try:
    from OpenSSL import crypto
    HAS_OPENSSL = True
except ImportError:
    HAS_OPENSSL = False

log = logging.getLogger(__name__)

DEFAULT_NOTIFY_DAYS = 45

__virtualname__ = 'cert_info'


def __virtual__():
    if HAS_OPENSSL is False:
        return False

    return __virtualname__


def validate(config):
    '''
    Validate the beacon configuration
    '''
    _config = {}
    list(_map(_config.update, config))

    # Configuration for cert_info beacon should be a list of dicts
    if not isinstance(config, list):
        return False, ('Configuration for cert_info beacon must be a list.')

    if 'files' not in _config:
        return False, ('Configuration for cert_info beacon '
                       'must contain files option.')
    return True, 'Valid beacon configuration'


def beacon(config):
    '''
    Monitor the certificate files on the minion.

    Specify a notification threshold in days and only emit a beacon if any certificates are
    expiring within that timeframe or if `notify_days` equals `-1` (always report information).
    The default notification threshold is 45 days and can be overridden at the beacon level and
    at an individual certificate level.

    .. code-block:: yaml

        beacons:
          cert_info:
            - files:
                - /etc/pki/tls/certs/mycert.pem
                - /etc/pki/tls/certs/yourcert.pem:
                    notify_days: 15
                - /etc/pki/tls/certs/ourcert.pem
            - notify_days: 45
            - interval: 86400

    '''
    ret = []
    certificates = []
    CryptoError = crypto.Error  # pylint: disable=invalid-name

    _config = {}
    list(_map(_config.update, config))

    global_notify_days = _config.get('notify_days', DEFAULT_NOTIFY_DAYS)

    for cert_path in _config.get('files', []):
        notify_days = global_notify_days

        if isinstance(cert_path, dict):
            try:
                notify_days = cert_path[cert_path.keys()[0]].get('notify_days', global_notify_days)
                cert_path = cert_path.keys()[0]
            except IndexError as exc:
                log.error('Unable to load certificate %s (%s)', cert_path, exc)
                continue

        try:
            with salt.utils.files.fopen(cert_path) as fp_:
                cert = crypto.load_certificate(crypto.FILETYPE_PEM, fp_.read())
        except (IOError, CryptoError) as exc:
            log.error('Unable to load certificate %s (%s)', cert_path, exc)
            continue

        cert_date = datetime.strptime(cert.get_notAfter().decode(encoding='UTF-8'), "%Y%m%d%H%M%SZ")
        date_diff = (cert_date - datetime.today()).days
        log.debug('Certificate %s expires in %s days.', cert_path, date_diff)

        if notify_days < 0 or date_diff <= notify_days:
            log.debug('Certificate %s triggered beacon due to %s day notification threshold.', cert_path, notify_days)
            extensions = []
            for ext in _range(0, cert.get_extension_count()):
                extensions.append(
                    {
                        'ext_name': cert.get_extension(ext).get_short_name().decode(encoding='UTF-8'),
                        'ext_data': str(cert.get_extension(ext))
                    }
                )

            certificates.append(
                {
                    'cert_path': cert_path,
                    'issuer': ','.join(
                        ['{0}="{1}"'.format(
                            t[0].decode(encoding='UTF-8'),
                            t[1].decode(encoding='UTF-8')
                        ) for t in cert.get_issuer().get_components()]),
                    'issuer_dict': {
                        k.decode('UTF-8'): v.decode('UTF-8') for k, v in cert.get_issuer().get_components()
                    },
                    'notAfter_raw': cert.get_notAfter().decode(encoding='UTF-8'),
                    'notAfter': cert_date.strftime("%Y-%m-%d %H:%M:%SZ"),
                    'notBefore_raw': cert.get_notBefore().decode(encoding='UTF-8'),
                    'notBefore': datetime.strptime(
                                     cert.get_notBefore().decode(encoding='UTF-8'), "%Y%m%d%H%M%SZ"
                                 ).strftime("%Y-%m-%d %H:%M:%SZ"),
                    'serial_number': cert.get_serial_number(),
                    'signature_algorithm': cert.get_signature_algorithm().decode(encoding='UTF-8'),
                    'subject': ','.join(
                        ['{0}="{1}"'.format(
                            t[0].decode(encoding='UTF-8'),
                            t[1].decode(encoding='UTF-8')
                        ) for t in cert.get_subject().get_components()]),
                    'subject_dict': {
                        k.decode('UTF-8'): v.decode('UTF-8') for k, v in cert.get_subject().get_components()
                    },
                    'version': cert.get_version(),
                    'extensions': extensions,
                    'has_expired': cert.has_expired()
                }
            )

    if certificates:
        ret.append({'certificates': certificates})

    return ret