HEX
Server: Apache
System: Linux sg241.singhost.net 2.6.32-896.16.1.lve1.4.51.el6.x86_64 #1 SMP Wed Jan 17 13:19:23 EST 2018 x86_64
User: honghock (909)
PHP: 8.0.30
Disabled: passthru,system,shell_exec,show_source,exec,popen,proc_open
Upload Files
File: //proc/self/root/usr/lib/python2.7/site-packages/salt/modules/win_pki.py
# -*- coding: utf-8 -*-
'''
Microsoft certificate management via the PKI Client PowerShell module.
https://technet.microsoft.com/en-us/itpro/powershell/windows/pkiclient/pkiclient

The PKI Client PowerShell module is only available on Windows 8+ and Windows
Server 2012+.
https://technet.microsoft.com/en-us/library/hh848636(v=wps.620).aspx

:platform:      Windows

:depends:
    - PowerShell 4
    - PKI Client Module (Windows 8+ / Windows Server 2012+)

.. versionadded:: 2016.11.0
'''
# Import Python libs
from __future__ import absolute_import, unicode_literals, print_function
import ast
import logging
import os

# Import salt libs
import salt.utils.json
import salt.utils.platform
import salt.utils.powershell
import salt.utils.versions
from salt.exceptions import SaltInvocationError

# Import 3rd party libs
from salt.ext import six

_DEFAULT_CONTEXT = 'LocalMachine'
_DEFAULT_FORMAT = 'cer'
_DEFAULT_STORE = 'My'
_LOG = logging.getLogger(__name__)

# Define the module's virtual name
__virtualname__ = 'win_pki'


def __virtual__():
    '''
    Requires Windows
    Requires Windows 8+ / Windows Server 2012+
    Requires PowerShell
    Requires PKI Client PowerShell module installed.
    '''
    if not salt.utils.platform.is_windows():
        return False, 'Only available on Windows Systems'

    if salt.utils.versions.version_cmp(__grains__['osversion'], '6.2.9200') == -1:
        return False, 'Only available on Windows 8+ / Windows Server 2012 +'

    if not __salt__['cmd.shell_info']('powershell')['installed']:
        return False, 'Powershell not available'

    if not salt.utils.powershell.module_exists('PKI'):
        return False, 'PowerShell PKI module not available'

    return __virtualname__


def _cmd_run(cmd, as_json=False):
    '''
    Ensure that the Pki module is loaded, and convert to and extract data from
    Json as needed.
    '''
    cmd_full = ['Import-Module -Name PKI; ']

    if as_json:
        cmd_full.append(r'ConvertTo-Json -Compress -Depth 4 -InputObject '
                        r'@({0})'.format(cmd))
    else:
        cmd_full.append(cmd)
    cmd_ret = __salt__['cmd.run_all'](
        six.text_type().join(cmd_full), shell='powershell', python_shell=True)

    if cmd_ret['retcode'] != 0:
        _LOG.error('Unable to execute command: %s\nError: %s', cmd,
                   cmd_ret['stderr'])

    if as_json:
        try:
            items = salt.utils.json.loads(cmd_ret['stdout'], strict=False)
            return items
        except ValueError:
            _LOG.error('Unable to parse return data as Json.')

    return cmd_ret['stdout']


def _validate_cert_path(name):
    '''
    Ensure that the certificate path, as determind from user input, is valid.
    '''
    cmd = r"Test-Path -Path '{0}'".format(name)

    if not ast.literal_eval(_cmd_run(cmd=cmd)):
        raise SaltInvocationError(r"Invalid path specified: {0}".format(name))


def _validate_cert_format(name):
    '''
    Ensure that the certificate format, as determind from user input, is valid.
    '''
    cert_formats = ['cer', 'pfx']

    if name not in cert_formats:
        message = ("Invalid certificate format '{0}' specified. Valid formats:"
                   ' {1}').format(name, cert_formats)
        raise SaltInvocationError(message)


def get_stores():
    '''
    Get the certificate location contexts and their corresponding stores.

    :return: A dictionary of the certificate location contexts and stores.
    :rtype: dict

    CLI Example:

    .. code-block:: bash

        salt '*' win_pki.get_stores
    '''
    ret = dict()
    cmd = r"Get-ChildItem -Path 'Cert:\' | " \
          r"Select-Object LocationName, StoreNames"

    items = _cmd_run(cmd=cmd, as_json=True)

    for item in items:
        ret[item['LocationName']] = list()

        for store in item['StoreNames']:
            ret[item['LocationName']].append(store)
    return ret


def get_certs(context=_DEFAULT_CONTEXT, store=_DEFAULT_STORE):
    '''
    Get the available certificates in the given store.

    :param str context: The name of the certificate store location context.
    :param str store: The name of the certificate store.

    :return: A dictionary of the certificate thumbprints and properties.
    :rtype: dict

    CLI Example:

    .. code-block:: bash

        salt '*' win_pki.get_certs
    '''
    ret = dict()
    cmd = list()
    blacklist_keys = ['DnsNameList']
    store_path = r'Cert:\{0}\{1}'.format(context, store)

    _validate_cert_path(name=store_path)

    cmd.append(r"Get-ChildItem -Path '{0}' | Select-Object".format(store_path))
    cmd.append(' DnsNameList, SerialNumber, Subject, Thumbprint, Version')

    items = _cmd_run(cmd=six.text_type().join(cmd), as_json=True)

    for item in items:
        cert_info = dict()
        for key in item:
            if key not in blacklist_keys:
                cert_info[key.lower()] = item[key]

        names = item.get('DnsNameList', None)
        if isinstance(names, list):
            cert_info['dnsnames'] = [name.get('Unicode') for name in names]
        else:
            cert_info['dnsnames'] = []
        ret[item['Thumbprint']] = cert_info
    return ret


def get_cert_file(name, cert_format=_DEFAULT_FORMAT, password=''):
    '''
    Get the details of the certificate file.

    :param str name: The filesystem path of the certificate file.
    :param str cert_format: The certificate format. Specify 'cer' for X.509, or
        'pfx' for PKCS #12.
    :param str password: The password of the certificate. Only applicable to pfx
        format. Note that if used interactively, the password will be seen by all minions.
        To protect the password, use a state and get the password from pillar.

    :return: A dictionary of the certificate thumbprints and properties.
    :rtype: dict

    CLI Example:

    .. code-block:: bash

        salt '*' win_pki.get_cert_file name='C:\\certs\\example.cer'
    '''
    ret = dict()
    cmd = list()
    blacklist_keys = ['DnsNameList']
    cert_format = cert_format.lower()

    _validate_cert_format(name=cert_format)

    if not name or not os.path.isfile(name):
        _LOG.error('Path is not present: %s', name)
        return ret

    if cert_format == 'pfx':
        if password:
            cmd.append('$CertObject = New-Object')
            cmd.append(' System.Security.Cryptography.X509Certificates.X509Certificate2;')
            cmd.append(r" $CertObject.Import('{0}'".format(name))
            cmd.append(",'{0}'".format(password))
            cmd.append(",'DefaultKeySet') ; $CertObject")
            cmd.append(' | Select-Object DnsNameList, SerialNumber, Subject, '
                    'Thumbprint, Version')
        else:
            cmd.append(r"Get-PfxCertificate -FilePath '{0}'".format(name))
            cmd.append(' | Select-Object DnsNameList, SerialNumber, Subject, '
                    'Thumbprint, Version')
    else:
        cmd.append('$CertObject = New-Object')
        cmd.append(' System.Security.Cryptography.X509Certificates.X509Certificate2;')
        cmd.append(r" $CertObject.Import('{0}'); $CertObject".format(name))
        cmd.append(' | Select-Object DnsNameList, SerialNumber, Subject, '
                   'Thumbprint, Version')

    items = _cmd_run(cmd=six.text_type().join(cmd), as_json=True)

    for item in items:
        for key in item:
            if key not in blacklist_keys:
                ret[key.lower()] = item[key]

        ret['dnsnames'] = [name['Unicode'] for name in item['DnsNameList']]

    if ret:
        _LOG.debug('Certificate thumbprint obtained successfully: %s', name)
    else:
        _LOG.error('Unable to obtain certificate thumbprint: %s', name)
    return ret


def import_cert(name,
                cert_format=_DEFAULT_FORMAT,
                context=_DEFAULT_CONTEXT,
                store=_DEFAULT_STORE,
                exportable=True,
                password='',
                saltenv='base'):
    '''
    Import the certificate file into the given certificate store.

    :param str name: The path of the certificate file to import.
    :param str cert_format: The certificate format. Specify 'cer' for X.509, or
        'pfx' for PKCS #12.
    :param str context: The name of the certificate store location context.
    :param str store: The name of the certificate store.
    :param bool exportable: Mark the certificate as exportable. Only applicable
        to pfx format.
    :param str password: The password of the certificate. Only applicable to pfx
        format. Note that if used interactively, the password will be seen by all minions.
        To protect the password, use a state and get the password from pillar.
    :param str saltenv: The environment the file resides in.

    :return: A boolean representing whether all changes succeeded.
    :rtype: bool

    CLI Example:

    .. code-block:: bash

        salt '*' win_pki.import_cert name='salt://cert.cer'
    '''
    cmd = list()
    thumbprint = None
    store_path = r'Cert:\{0}\{1}'.format(context, store)
    cert_format = cert_format.lower()

    _validate_cert_format(name=cert_format)

    cached_source_path = __salt__['cp.cache_file'](name, saltenv)

    if not cached_source_path:
        _LOG.error('Unable to get cached copy of file: %s', name)
        return False

    if password:
        cert_props = get_cert_file(name=cached_source_path, cert_format=cert_format, password=password)
    else:
        cert_props = get_cert_file(name=cached_source_path, cert_format=cert_format)

    current_certs = get_certs(context=context, store=store)

    if cert_props['thumbprint'] in current_certs:
        _LOG.debug("Certificate thumbprint '%s' already present in store: %s",
                   cert_props['thumbprint'], store_path)
        return True

    if cert_format == 'pfx':
        # In instances where an empty password is needed, we use a
        # System.Security.SecureString object since ConvertTo-SecureString will
        # not convert an empty string.
        if password:
            cmd.append(r"$Password = ConvertTo-SecureString "
                       r"-String '{0}'".format(password))
            cmd.append(' -AsPlainText -Force; ')
        else:
            cmd.append('$Password = New-Object System.Security.SecureString; ')

        cmd.append(r"Import-PfxCertificate "
                   r"-FilePath '{0}'".format(cached_source_path))
        cmd.append(r" -CertStoreLocation '{0}'".format(store_path))
        cmd.append(r" -Password $Password")

        if exportable:
            cmd.append(' -Exportable')
    else:
        cmd.append(r"Import-Certificate "
                   r"-FilePath '{0}'".format(cached_source_path))
        cmd.append(r" -CertStoreLocation '{0}'".format(store_path))

    _cmd_run(cmd=six.text_type().join(cmd))

    new_certs = get_certs(context=context, store=store)

    for new_cert in new_certs:
        if new_cert not in current_certs:
            thumbprint = new_cert

    if thumbprint:
        _LOG.debug('Certificate imported successfully: %s', name)
        return True
    _LOG.error('Unable to import certificate: %s', name)
    return False


def export_cert(name,
                thumbprint,
                cert_format=_DEFAULT_FORMAT,
                context=_DEFAULT_CONTEXT,
                store=_DEFAULT_STORE,
                password=''):
    '''
    Export the certificate to a file from the given certificate store.

    :param str name: The destination path for the exported certificate file.
    :param str thumbprint: The thumbprint value of the target certificate.
    :param str cert_format: The certificate format. Specify 'cer' for X.509, or
        'pfx' for PKCS #12.
    :param str context: The name of the certificate store location context.
    :param str store: The name of the certificate store.
    :param str password: The password of the certificate. Only applicable to pfx
        format. Note that if used interactively, the password will be seen by all minions.
        To protect the password, use a state and get the password from pillar.

    :return: A boolean representing whether all changes succeeded.
    :rtype: bool

    CLI Example:

    .. code-block:: bash

        salt '*' win_pki.export_cert name='C:\\certs\\example.cer' thumbprint='AAA000'
    '''
    cmd = list()
    thumbprint = thumbprint.upper()
    cert_path = r'Cert:\{0}\{1}\{2}'.format(context, store, thumbprint)
    cert_format = cert_format.lower()

    _validate_cert_path(name=cert_path)
    _validate_cert_format(name=cert_format)

    if cert_format == 'pfx':
        # In instances where an empty password is needed, we use a
        # System.Security.SecureString object since ConvertTo-SecureString will
        # not convert an empty string.
        if password:
            cmd.append(r"$Password = ConvertTo-SecureString "
                       r"-String '{0}'".format(password))
            cmd.append(' -AsPlainText -Force; ')
        else:
            cmd.append('$Password = New-Object System.Security.SecureString; ')

        cmd.append(r"Export-PfxCertificate "
                   r"-Cert '{0}' -FilePath '{1}'".format(cert_path, name))
        cmd.append(r" -Password $Password")
    else:
        cmd.append(r"Export-Certificate "
                   r"-Cert '{0}' -FilePath '{1}'".format(cert_path, name))

    cmd.append(r" | Out-Null; Test-Path -Path '{0}'".format(name))

    ret = ast.literal_eval(_cmd_run(cmd=six.text_type().join(cmd)))

    if ret:
        _LOG.debug('Certificate exported successfully: %s', name)
    else:
        _LOG.error('Unable to export certificate: %s', name)
    return ret


def test_cert(thumbprint,
              context=_DEFAULT_CONTEXT,
              store=_DEFAULT_STORE,
              untrusted_root=False,
              dns_name='',
              eku=''):
    '''
    Check the certificate for validity.

    :param str thumbprint: The thumbprint value of the target certificate.
    :param str context: The name of the certificate store location context.
    :param str store: The name of the certificate store.
    :param bool untrusted_root: Whether the root certificate is required to be
        trusted in chain building.
    :param str dns_name: The DNS name to verify as valid for the certificate.
    :param str eku: The enhanced key usage object identifiers to verify for the
        certificate chain.

    :return: A boolean representing whether the certificate was considered
        valid.
    :rtype: bool

    CLI Example:

    .. code-block:: bash

        salt '*' win_pki.test_cert thumbprint='AAA000' dns_name='example.test'
    '''
    cmd = list()
    thumbprint = thumbprint.upper()
    cert_path = r'Cert:\{0}\{1}\{2}'.format(context, store, thumbprint)
    cmd.append(r"Test-Certificate -Cert '{0}'".format(cert_path))

    _validate_cert_path(name=cert_path)

    if untrusted_root:
        cmd.append(' -AllowUntrustedRoot')
    if dns_name:
        cmd.append(" -DnsName '{0}'".format(dns_name))
    if eku:
        cmd.append(" -EKU '{0}'".format(eku))

    cmd.append(' -ErrorAction SilentlyContinue')

    return ast.literal_eval(_cmd_run(cmd=six.text_type().join(cmd)))


def remove_cert(thumbprint, context=_DEFAULT_CONTEXT, store=_DEFAULT_STORE):
    '''
    Remove the certificate from the given certificate store.

    :param str thumbprint: The thumbprint value of the target certificate.
    :param str context: The name of the certificate store location context.
    :param str store: The name of the certificate store.

    :return: A boolean representing whether all changes succeeded.
    :rtype: bool

    CLI Example:

    .. code-block:: bash

        salt '*' win_pki.remove_cert thumbprint='AAA000'
    '''
    thumbprint = thumbprint.upper()
    store_path = r'Cert:\{0}\{1}'.format(context, store)
    cert_path = r'{0}\{1}'.format(store_path, thumbprint)
    cmd = r"Remove-Item -Path '{0}'".format(cert_path)

    current_certs = get_certs(context=context, store=store)

    if thumbprint not in current_certs:
        _LOG.debug("Certificate '%s' already absent in store: %s", thumbprint,
                   store_path)
        return True

    _validate_cert_path(name=cert_path)
    _cmd_run(cmd=cmd)

    new_certs = get_certs(context=context, store=store)

    if thumbprint in new_certs:
        _LOG.error('Unable to remove certificate: %s', cert_path)
        return False
    _LOG.debug('Certificate removed successfully: %s', cert_path)
    return True