From 4d1e6b9968fe0d716c55b1d369c04e0ec34042aa Mon Sep 17 00:00:00 2001 From: gpotter2 <10530980+gpotter2@users.noreply.github.com> Date: Tue, 30 Jun 2026 02:58:34 +0200 Subject: [PATCH 1/2] WinSSP: support pinning KDC Pinning / Unpinning the KDC is super useful in domains with multiple KDCs. AI-Assisted: no --- scapy/arch/windows/sspi.py | 219 +++++++++++++++++++++++++++++++++++++ 1 file changed, 219 insertions(+) diff --git a/scapy/arch/windows/sspi.py b/scapy/arch/windows/sspi.py index 8567d534d80..22a68be6b12 100644 --- a/scapy/arch/windows/sspi.py +++ b/scapy/arch/windows/sspi.py @@ -184,6 +184,22 @@ class SecHandle(ctypes.Structure): ] +class LSA_STRING(ctypes.Structure): + _fields_ = [ + ("Length", ctypes.wintypes.USHORT), + ("MaximumLength", ctypes.wintypes.USHORT), + ("Buffer", ctypes.wintypes.LPCSTR), + ] + + +class LSA_UNICODE_STRING(ctypes.Structure): + _fields_ = [ + ("Length", ctypes.wintypes.USHORT), + ("MaximumLength", ctypes.wintypes.USHORT), + ("Buffer", ctypes.wintypes.LPCWSTR), + ] + + _winapi_AcquireCredentialsHandle = ctypes.windll.secur32.AcquireCredentialsHandleW _winapi_AcquireCredentialsHandle.restype = ctypes.wintypes.DWORD _winapi_AcquireCredentialsHandle.argtypes = [ @@ -345,6 +361,73 @@ def ParseBuffer(Buffers: ctypes.ARRAY, BufferType: int, cls): ctypes.POINTER(ctypes.wintypes.LPWSTR), ] +_winapi_LsaConnectUntrusted = ctypes.windll.secur32.LsaConnectUntrusted +_winapi_LsaConnectUntrusted.restype = ctypes.wintypes.DWORD +_winapi_LsaConnectUntrusted.argtypes = [ + ctypes.POINTER(ctypes.wintypes.HANDLE), +] + +_winapi_LsaLookupAuthenticationPackage = ( + ctypes.windll.secur32.LsaLookupAuthenticationPackage +) +_winapi_LsaLookupAuthenticationPackage.restype = ctypes.wintypes.DWORD +_winapi_LsaLookupAuthenticationPackage.argtypes = [ + ctypes.wintypes.HANDLE, + ctypes.POINTER(LSA_STRING), + ctypes.POINTER(ctypes.wintypes.ULONG), +] + +_winapi_LsaCallAuthenticationPackage = ( + ctypes.windll.secur32.LsaCallAuthenticationPackage +) +_winapi_LsaCallAuthenticationPackage.restype = ctypes.wintypes.DWORD +_winapi_LsaCallAuthenticationPackage.argtypes = [ + ctypes.wintypes.HANDLE, # LsaHandle + ctypes.wintypes.ULONG, # AuthenticationPackage + ctypes.c_void_p, # ProtocolSubmitBuffer + ctypes.wintypes.ULONG, # SubmitBufferLength + ctypes.c_void_p, # *ProtocolReturnBuffer + ctypes.POINTER(ctypes.wintypes.ULONG), # ReturnBufferLength + ctypes.POINTER(ctypes.wintypes.DWORD), # ProtocolStatus +] + +_winapi_LsaDeregisterLogonProcess = ctypes.windll.secur32.LsaDeregisterLogonProcess +_winapi_LsaDeregisterLogonProcess.restype = ctypes.wintypes.DWORD +_winapi_LsaDeregisterLogonProcess.argtypes = [ + ctypes.wintypes.HANDLE, +] + +_winapi_LsaFreeReturnBuffer = ctypes.windll.secur32.LsaFreeReturnBuffer +_winapi_LsaFreeReturnBuffer.restype = ctypes.wintypes.DWORD +_winapi_LsaFreeReturnBuffer.argtypes = [ + ctypes.c_void_p, # Buffer +] + + +# See Windows::Win32::Security::Authentication::Identity in Rust + + +class KERB_PROTOCOL_MESSAGE_TYPE(enum.IntEnum): + KerbPinKdcMessage = 30 + KerbUnpinAllKdcsMessage = 31 + + +class SecPkgCallPackagePinDcRequest(ctypes.Structure): + _fields_ = [ + ("MessageType", ctypes.wintypes.ULONG), + ("Flags", ctypes.wintypes.ULONG), + ("DomainName", LSA_UNICODE_STRING), + ("DcName", LSA_UNICODE_STRING), + ("DcFlags", ctypes.wintypes.ULONG), + ] + + +class SecPkgCallPackageUnpinAllDcsRequest(ctypes.Structure): + _fields_ = [ + ("MessageType", ctypes.wintypes.ULONG), + ("Flags", ctypes.wintypes.ULONG), + ] + # Types @@ -998,3 +1081,139 @@ def GSS_UnwrapEx(self, Context, msgs, signature): for i in range(len(msgs)): msgs[i].data = MessageBuffers[i].GetData() return msgs + + # LSA SSP-specific commands + + def _LsaConnectUntrusted(self): + """ + Call LsaConnectUntrusted to get a handle towards the LSA RPC + """ + # Connect to Lsa + handle = ctypes.wintypes.HANDLE() + status = _winapi_LsaConnectUntrusted( + ctypes.byref(handle), + ) + if status != 0: + raise OSError("LsaConnectUntrusted failed with %s" % status) + + return handle + + def _LsaLookupAuthenticationPackage(self, Handle, package: str = "Kerberos"): + """ + Call LookupAuthenticationPackage to find the Package id by its name + """ + # Find the kerberos AP + PackageName = LSA_STRING() + PackageName.Length = len(package) + PackageName.MaximumLength = len(package) + PackageName.Buffer = ctypes.cast( + ctypes.create_string_buffer(package.encode()), ctypes.c_char_p + ) + AuthenticationPackage = ctypes.wintypes.ULONG() + status = _winapi_LsaLookupAuthenticationPackage( + Handle, + ctypes.byref(PackageName), + ctypes.byref(AuthenticationPackage), + ) + if status != 0: + raise OSError("LsaLookupAuthenticationPackage failed with %s" % status) + + return AuthenticationPackage + + def _LsaCallAuthenticationPackage( + self, Handle, AuthenticationPackage, ProtocolSubmitBuffer + ): + """ + Call LsaCallAuthenticationPackage to send a SSP-specific request + """ + ProtocolSubmitBuffer = bytes(ProtocolSubmitBuffer) + + ReturnBufferLength = ctypes.wintypes.ULONG() + ProtocolStatus = ctypes.wintypes.ULONG() + ProtocolReturnBuffer = ctypes.c_void_p() + status = _winapi_LsaCallAuthenticationPackage( + Handle, + AuthenticationPackage, + ProtocolSubmitBuffer, + ctypes.wintypes.ULONG(len(ProtocolSubmitBuffer)), + ctypes.byref(ProtocolReturnBuffer), + ctypes.byref(ReturnBufferLength), + ctypes.byref(ProtocolStatus), + ) + if status != 0: + raise OSError("LsaCallAuthenticationPackage failed with %s" % status) + + response = bytes(ProtocolReturnBuffer) + + if ReturnBufferLength.value != 0: + status = _winapi_LsaFreeReturnBuffer(ProtocolReturnBuffer) + if status != 0: + raise OSError("LsaFreeReturnBuffer failed with %s" % status) + + return response + + def _LsaDeregisterLogonProcess(self, Handle): + """ + Call LsaDeregisterLogonProcess to close the handle + """ + status = _winapi_LsaDeregisterLogonProcess(Handle) + if status != 0: + raise OSError("LsaDeregisterLogonProcess failed with %s" % status) + + def PinKdc(self, DomainName: str, DcName: str): + """ + Pin the KDC of a domain. This asks Windows to use that KDC to request + additional tickets, etc. It is very useful for debugging when in an environment + that has multiple KDCs. + (This only applies to this process) + """ + # Build request + ProtocolSubmitBuffer = SecPkgCallPackagePinDcRequest() + ProtocolSubmitBuffer.MessageType = KERB_PROTOCOL_MESSAGE_TYPE.KerbPinKdcMessage + ProtocolSubmitBuffer.DomainName = LSA_UNICODE_STRING() + ProtocolSubmitBuffer.DomainName.Length = len(DomainName) + ProtocolSubmitBuffer.DomainName.MaximumLength = len(DomainName) + ProtocolSubmitBuffer.DomainName.Buffer = ctypes.cast( + ctypes.create_string_buffer(DomainName.encode("utf-16le")), ctypes.c_wchar_p + ) + ProtocolSubmitBuffer.DcName = LSA_UNICODE_STRING() + ProtocolSubmitBuffer.DcName.Length = len(DcName) + ProtocolSubmitBuffer.DcName.MaximumLength = len(DcName) + ProtocolSubmitBuffer.DcName.Buffer = ctypes.cast( + ctypes.create_string_buffer(DcName.encode("utf-16le")), ctypes.c_wchar_p + ) + + # Send + Handle = self._LsaConnectUntrusted() + try: + AuthenticationPackage = self._LsaLookupAuthenticationPackage( + Handle, "Kerberos" + ) + self._LsaCallAuthenticationPackage( + Handle, AuthenticationPackage, ProtocolSubmitBuffer + ) + finally: + self._LsaDeregisterLogonProcess(Handle) + + def UnpinAllKdcs(self): + """ + Un-pin all KDCs for all domains + (This only applies to this process) + """ + # Build request + ProtocolSubmitBuffer = SecPkgCallPackageUnpinAllDcsRequest() + ProtocolSubmitBuffer.MessageType = ( + KERB_PROTOCOL_MESSAGE_TYPE.KerbUnpinAllKdcsMessage + ) + + # Send + Handle = self._LsaConnectUntrusted() + try: + AuthenticationPackage = self._LsaLookupAuthenticationPackage( + Handle, "Kerberos" + ) + self._LsaCallAuthenticationPackage( + Handle, AuthenticationPackage, ProtocolSubmitBuffer + ) + finally: + self._LsaDeregisterLogonProcess(Handle) From 68f66ad7f6161529b331831fc526bef41b7d1ccd Mon Sep 17 00:00:00 2001 From: gpotter2 <10530980+gpotter2@users.noreply.github.com> Date: Tue, 30 Jun 2026 02:59:13 +0200 Subject: [PATCH 2/2] Kerberos: begin GSS_C_DELEG_FLAG support We need to be able to make a KRB_CRED to delegate tickets using GSS_C_DELEG_FLAG AI-Assisted: no --- scapy/modules/ticketer.py | 71 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/scapy/modules/ticketer.py b/scapy/modules/ticketer.py index 27cb98a642f..e1e3a26a2a2 100644 --- a/scapy/modules/ticketer.py +++ b/scapy/modules/ticketer.py @@ -22,8 +22,8 @@ from scapy.asn1.asn1 import ( ASN1_BIT_STRING, - ASN1_GENERAL_STRING, ASN1_GENERALIZED_TIME, + ASN1_GENERAL_STRING, ASN1_INTEGER, ASN1_STRING, ) @@ -56,12 +56,15 @@ from scapy.layers.kerberos import ( AuthorizationData, AuthorizationDataItem, + EncKrbCredPart, EncTicketPart, EncryptedData, EncryptionKey, + KRB_CRED, KRB_Ticket, KerberosClient, KerberosSSP, + KrbCredInfo, PrincipalName, TransitedEncoding, _ADDR_TYPES, @@ -144,6 +147,12 @@ class CCPrincipal(Packet): ), ] + def toPrincipalName(self): + return PrincipalName( + nameType=self.name_type, + nameString=[ASN1_GENERAL_STRING(x.data) for x in self.components], + ) + def toPN(self): return "%s@%s" % ( "/".join(x.data.decode() for x in self.components), @@ -196,6 +205,11 @@ class CCAddress(Packet): PacketField("address", CCCountedOctetString(), CCCountedOctetString), ] + def toHostAddress(self): + return HostAddress( + addrType=self.addrtype, address=ASN1_STRING(self.address.data) + ) + def guess_payload_class(self, payload): return conf.padding_layer @@ -287,6 +301,61 @@ class CCache(Packet): PacketListField("credentials", [], CCCredential), ] + def toKRBCRED(self): + """ + Return a compatible KRB_CRED from this CCache structure. + This can be used when doing unconstrained delegation + """ + return KRB_CRED( + encPart=EncryptedData( + etype=0, + cipher=ASN1_STRING( + bytes( + EncKrbCredPart( + ticketInfo=[ + KrbCredInfo( + key=EncryptionKey.fromKey(cred.keyblock.toKey()), + prealm=cred.client.realm.data, + pname=cred.client.toPrincipalName(), + flags=str(cred.ticket_flags), + authtime=ASN1_GENERALIZED_TIME( + datetime.fromtimestamp( + cred.authtime, timezone.utc + ) + ), + starttime=ASN1_GENERALIZED_TIME( + datetime.fromtimestamp( + cred.starttime, timezone.utc + ) + ), + endtime=ASN1_GENERALIZED_TIME( + datetime.fromtimestamp( + cred.endtime, timezone.utc + ) + ), + renewTill=ASN1_GENERALIZED_TIME( + datetime.fromtimestamp( + cred.renew_till, timezone.utc + ) + ), + srealm=cred.server.realm.data, + sname=cred.server.toPrincipalName(), + caddr=[c.toHostAddress() for c in cred.addrs], + ) + for cred in self.credentials + if not cred.is_xcacheconf() + ] + ) + ) + ), + ), + tickets=[ + KRB_Ticket(cred.ticket.data) + for cred in self.credentials + if not cred.is_xcacheconf() + ], + ) + # Keytab # https://web.mit.edu/kerberos/krb5-devel/doc/formats/keytab_file_format.html (official but garbage)