Skip to content

Commit

Permalink
reintroduced MS-RRP for retrieving CA configuration in case MS-CSRA f…
Browse files Browse the repository at this point in the history
…ails
  • Loading branch information
ly4k committed Feb 22, 2022
1 parent 09e0684 commit c121297
Showing 1 changed file with 120 additions and 35 deletions.
155 changes: 120 additions & 35 deletions certipy/ca.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import argparse
import logging
from typing import Callable, List, Tuple
import time

from impacket.dcerpc.v5 import rpcrt, scmr
from impacket.dcerpc.v5 import rpcrt, scmr, rrp
from impacket.dcerpc.v5.dcom.oaut import VARIANT
from impacket.dcerpc.v5.dcomrt import DCOMANSWER, DCOMCALL, IRemUnknown
from impacket.dcerpc.v5.dtypes import DWORD, LONG, LPWSTR, PBYTE, ULONG, WSTR
Expand All @@ -18,7 +19,11 @@
from certipy.constants import CERTIFICATION_AUTHORITY_RIGHTS
from certipy.errors import translate_error_code
from certipy.ldap import LDAPConnection, LDAPEntry
from certipy.rpc import get_dce_rpc, get_dcom_connection
from certipy.rpc import (
get_dce_rpc,
get_dce_rpc_from_string_binding,
get_dcom_connection,
)
from certipy.security import CASecurity
from certipy.target import Target
from certipy.template import Template
Expand Down Expand Up @@ -199,6 +204,7 @@ def __init__(
self._connection: LDAPConnection = connection
self._cert_admin: ICertAdminD = None
self._cert_admin2: ICertAdminD2 = None
self._rrp_dce = None

@property
def connection(self):
Expand All @@ -211,7 +217,7 @@ def connection(self):
return self._connection

@property
def cert_admin(self):
def cert_admin(self) -> ICertAdminD:
if self._cert_admin is not None:
return self._cert_admin

Expand All @@ -222,69 +228,148 @@ def cert_admin(self):
return self._cert_admin

@property
def cert_admin2(self):
def cert_admin2(self) -> ICertAdminD2:
if self._cert_admin2 is not None:
return self._cert_admin2

dcom = get_dcom_connection(self.target)
iInterface = dcom.CoCreateInstanceEx(CLSID_ICertAdminD, IID_ICertAdminD2)
iInterface.get_cinstance().set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY)
self._cert_admin2 = ICertAdminD2(iInterface)

return self._cert_admin2

def get_request_disposition(self) -> int:
request = ICertAdminD2_GetConfigEntry()
request["pwszAuthority"] = checkNullString(self.ca)
request["pwszNodePath"] = checkNullString(
"PolicyModules\\CertificateAuthority_MicrosoftDefault.Policy"
@property
def rrp_dce(self):
if self._rrp_dce is not None:
return self._rrp_dce

dce = get_dce_rpc_from_string_binding(
"ncacn_np:445[\\pipe\\winreg]", self.target, timeout=self.target.timeout
)
request["pwszEntry"] = checkNullString("RequestDisposition")

try:
resp = self.cert_admin2.request(request)
except DCERPCSessionError as e:
logging.warning(
"Got error while trying to get CA RequestDisposition: %s" % str(e)
)
for _ in range(3):
try:
dce.connect()
dce.bind(rrp.MSRPC_UUID_RRP)
logging.debug(
"Connected to remote registry at %s (%s)"
% (repr(self.target.remote_name), self.target.target_ip)
)
break
except Exception as e:
if "STATUS_PIPE_NOT_AVAILABLE" in str(e):
logging.warning(
(
"Failed to connect to remote registry. Service should be "
"starting now. Trying again..."
)
)
time.sleep(1)
else:
raise e
else:
logging.warning("Failed to connect to remote registry")
return None

return resp["pVariant"]["_varUnion"]["lVal"]
self._rrp_dce = dce

return self._rrp_dce

def get_edit_flags(self) -> int:
def get_config_csra(self) -> Tuple[int, int, CASecurity]:
request = ICertAdminD2_GetConfigEntry()
request["pwszAuthority"] = checkNullString(self.ca)
request["pwszNodePath"] = checkNullString(
"PolicyModules\\CertificateAuthority_MicrosoftDefault.Policy"
)
request["pwszEntry"] = checkNullString("RequestDisposition")

resp = self.cert_admin2.request(request)

request_disposition = resp["pVariant"]["_varUnion"]["lVal"]

request["pwszEntry"] = checkNullString("EditFlags")

try:
resp = self.cert_admin2.request(request)
except DCERPCSessionError as e:
logging.warning("Got error while trying to get CA EditFlags: %s" % str(e))
return None
resp = self.cert_admin2.request(request)

return resp["pVariant"]["_varUnion"]["lVal"]
edit_flags = resp["pVariant"]["_varUnion"]["lVal"]

def get_security(self) -> bytes:
request = ICertAdminD2_GetCASecurity()
request["pwszAuthority"] = checkNullString(self.ca)

try:
resp = self.cert_admin2.request(request)
except DCERPCSessionError as e:
logging.warning("Got error while trying to get CA security: %s" % str(e))
return None
resp = self.cert_admin2.request(request)

security = CASecurity(b"".join(resp["pctbSD"]["pb"]))
return security

return (edit_flags, request_disposition, security)

def get_config_rrp(self) -> Tuple[int, int, CASecurity]:
hklm = rrp.hOpenLocalMachine(self.rrp_dce)

h_root_key = hklm["phKey"]

policy_key = rrp.hBaseRegOpenKey(
self.rrp_dce,
h_root_key,
(
"SYSTEM\\CurrentControlSet\\Services\\CertSvc\\Configuration\\%s\\"
"PolicyModules\\CertificateAuthority_MicrosoftDefault.Policy"
)
% self.ca,
)

_, edit_flags = rrp.hBaseRegQueryValue(
self.rrp_dce, policy_key["phkResult"], "EditFlags"
)

_, request_disposition = rrp.hBaseRegQueryValue(
self.rrp_dce, policy_key["phkResult"], "RequestDisposition"
)

configuration_key = rrp.hBaseRegOpenKey(
self.rrp_dce,
h_root_key,
"SYSTEM\\CurrentControlSet\\Services\\CertSvc\\Configuration\\%s" % self.ca,
)

_, security_descriptor = rrp.hBaseRegQueryValue(
self.rrp_dce, configuration_key["phkResult"], "Security"
)

security_descriptor = CASecurity(security_descriptor)

return (edit_flags, request_disposition, security_descriptor)

def get_config(self) -> Tuple[int, int, CASecurity]:
edit_flags = self.get_edit_flags()
request_disposition = self.get_request_disposition()
security = self.get_security()
try:
logging.info(
"Trying to get CA configuration for %s via CSRA" % repr(self.ca)
)
result = self.get_config_csra()
logging.info("Got CA configuration for %s" % repr(self.ca))
return result
except Exception as e:
logging.warning(
"Got error while trying to get CA configuration for %s via CSRA: %s"
% (repr(self.ca), str(e))
)

return (edit_flags, request_disposition, security)
try:
logging.info(
"Trying to get CA configuration for %s via RRP" % repr(self.ca)
)
result = self.get_config_rrp()
logging.info("Got CA configuration for %s" % repr(self.ca))
return result
except Exception as e:
logging.warning(
"Got error while trying to get CA configuration for %s via RRP: %s"
% (repr(self.ca), str(e))
)

logging.warning("Failed to get CA configuration for %s" % repr(self.ca))

return (None, None, None)

def issue(self) -> bool:
if self.request_id is None:
Expand Down

0 comments on commit c121297

Please sign in to comment.