diff --git a/scapy/layers/hsrp.py b/scapy/layers/hsrp.py index b64b9425d16..07a475b8176 100644 --- a/scapy/layers/hsrp.py +++ b/scapy/layers/hsrp.py @@ -11,13 +11,16 @@ - HSRP Version 2: http://www.smartnetworks.jp/2006/02/hsrp_8_hsrp_version_2.html """ +import socket + from scapy.config import conf -from scapy.compat import orb +from scapy.compat import orb, plain_str from scapy.fields import ByteEnumField, ByteField, IntField, IPField, \ ShortEnumField, ShortField, SourceIPField, StrFixedLenField, \ - XIntField, XShortField + XIntField, XShortField, Field, MACField, StrLenField from scapy.packet import Packet, bind_layers, bind_bottom_up +from scapy.pton_ntop import inet_ntop, inet_pton from scapy.layers.inet import DestIPField, UDP @@ -32,6 +35,88 @@ } _HSRP_ADVERTISE_TYPES = {1: "HSRP interface state"} _HSRP_ADVERTISE_STATES = {1: "Active", 2: "Passive"} +_HSRP_V2_STATES = { + 1: "Initial", + 2: "Learn", + 3: "Listen", + 4: "Speak", + 5: "Standby", + 6: "Active", +} +_HSRP_V2_TLV_TYPES = { + 1: "Group State", + 2: "Interface State", + 3: "Text Authentication", + 4: "MD5 Authentication", +} +_HSRP_V2_IP_VERSIONS = { + 4: "IPv4", + 6: "IPv6", +} + + +def _is_hsrpv2(pkt): + if not pkt or len(pkt) < 2: + return False + + tlvtype = orb(pkt[0:1]) + tlvlength = orb(pkt[1:2]) + + if tlvtype not in _HSRP_V2_TLV_TYPES: + return False + + if len(pkt) < 2 + tlvlength: + return False + + if tlvtype == 1: + return ( + tlvlength == 40 and + len(pkt) >= 6 and + orb(pkt[2:3]) == 2 and + orb(pkt[3:4]) in _HSRP_OPCODES and + orb(pkt[4:5]) in _HSRP_V2_STATES and + orb(pkt[5:6]) in _HSRP_V2_IP_VERSIONS + ) + + if tlvtype == 2: + return tlvlength == 4 + + if tlvtype == 3: + return tlvlength == 8 + + if tlvtype == 4: + return tlvlength == 28 + + return True + + +class _HSRPv2VirtualIPField(Field): + def __init__(self, name, default): + Field.__init__(self, name, default, "16s") + + def i2m(self, pkt, x): + if x is None: + return b"\x00" * 16 + if isinstance(x, bytes): + if len(x) == 4: + return x + b"\x00" * 12 + if len(x) == 16: + return x + raise ValueError("Virtual IP must be 4 or 16 bytes") + x = plain_str(x) + if pkt is not None and pkt.ipversion == 6: + return inet_pton(socket.AF_INET6, x) + return inet_pton(socket.AF_INET, x) + b"\x00" * 12 + + def m2i(self, pkt, x): + if pkt is not None and pkt.ipversion == 6: + return inet_ntop(socket.AF_INET6, x) + return inet_ntop(socket.AF_INET, x[:4]) + + def i2repr(self, pkt, x): + if isinstance(x, bytes): + x = self.m2i(pkt, x) + return plain_str(x) class HSRP(Packet): @@ -51,6 +136,8 @@ class HSRP(Packet): @classmethod def dispatch_hook(cls, _pkt=None, *args, **kargs): + if _is_hsrpv2(_pkt): + return HSRPv2 if _pkt and len(_pkt) >= 2 and orb(_pkt[1:2]) == 3: return HSRPAdvertise return cls @@ -62,6 +149,111 @@ def guess_payload_class(self, payload): return Packet.guess_payload_class(self, payload) +class HSRPv2(Packet): + name = "HSRPv2" + fields_desc = [] + + def guess_payload_class(self, payload): + if payload: + return HSRPv2TLV + return Packet.guess_payload_class(self, payload) + + +class HSRPv2TLV(Packet): + name = "HSRPv2 TLV" + + @classmethod + def dispatch_hook(cls, _pkt=None, *args, **kargs): + if _pkt and len(_pkt) >= 1: + tlv_type = orb(_pkt[0:1]) + if tlv_type == 1: + return HSRPv2GroupStateTLV + if tlv_type == 2: + return HSRPv2InterfaceStateTLV + if tlv_type == 3: + return HSRPv2TextAuthTLV + if tlv_type == 4: + return HSRPv2MD5AuthTLV + return HSRPv2UnknownTLV + + +class _HSRPv2TLVPayload(Packet): + name = "HSRPv2 TLV Payload" + + def guess_payload_class(self, payload): + if payload: + return HSRPv2TLV + return Packet.guess_payload_class(self, payload) + + def post_build(self, p, pay): + if self.len is None: + tlv_len = len(p) - 2 + if tlv_len > 255: + raise ValueError("HSRPv2 TLV length exceeds 255 bytes") + p = p[:1] + bytes([tlv_len]) + p[2:] + return p + pay + + +class HSRPv2GroupStateTLV(_HSRPv2TLVPayload): + name = "HSRPv2 Group State TLV" + fields_desc = [ + ByteEnumField("type", 1, _HSRP_V2_TLV_TYPES), + ByteField("len", None), + ByteField("version", 2), + ByteEnumField("opcode", 0, _HSRP_OPCODES), + ByteEnumField("state", 6, _HSRP_V2_STATES), + ByteEnumField("ipversion", 4, _HSRP_V2_IP_VERSIONS), + ShortField("group", 1), + MACField("identifier", "00:00:00:00:00:00"), + IntField("priority", 100), + IntField("hellotime", 3000), + IntField("holdtime", 10000), + _HSRPv2VirtualIPField("virtualIP", b"\x00" * 16), + ] + + +class HSRPv2InterfaceStateTLV(_HSRPv2TLVPayload): + name = "HSRPv2 Interface State TLV" + fields_desc = [ + ByteEnumField("type", 2, _HSRP_V2_TLV_TYPES), + ByteField("len", None), + ShortField("activegroups", 0), + ShortField("passivegroups", 0), + ] + + +class HSRPv2TextAuthTLV(_HSRPv2TLVPayload): + name = "HSRPv2 Text Authentication TLV" + fields_desc = [ + ByteEnumField("type", 3, _HSRP_V2_TLV_TYPES), + ByteField("len", None), + StrFixedLenField("auth", b"cisco" + b"\x00" * 3, 8), + ] + + +class HSRPv2MD5AuthTLV(_HSRPv2TLVPayload): + name = "HSRPv2 MD5 Authentication TLV" + fields_desc = [ + ByteEnumField("type", 4, _HSRP_V2_TLV_TYPES), + ByteField("len", None), + ByteEnumField("algo", 1, {1: "MD5"}), + ByteField("padding", 0), + XShortField("flags", 0), + SourceIPField("sourceip"), + XIntField("keyid", 0), + StrFixedLenField("authdigest", b"\x00" * 16, 16), + ] + + +class HSRPv2UnknownTLV(_HSRPv2TLVPayload): + name = "HSRPv2 Unknown TLV" + fields_desc = [ + ByteEnumField("type", 0, _HSRP_V2_TLV_TYPES), + ByteField("len", None), + StrLenField("value", b"", length_from=lambda pkt: pkt.len), + ] + + class HSRPAdvertise(Packet): name = "HSRP Advertise" fields_desc = [ @@ -104,11 +296,19 @@ def post_build(self, p, pay): bind_bottom_up(UDP, HSRP, dport=1985) bind_bottom_up(UDP, HSRP, sport=1985) -bind_bottom_up(UDP, HSRP, dport=2029) -bind_bottom_up(UDP, HSRP, sport=2029) bind_layers(UDP, HSRP, dport=1985, sport=1985) -bind_layers(UDP, HSRP, dport=2029, sport=2029) + DestIPField.bind_addr(UDP, "224.0.0.2", dport=1985) +""" +since port 1954 (UDP) would be shared by both hsrpv1 and hsprv2 (ipv4), +users building hsrpv2 (ipv4) packets should set IP(dst="224.0.0.102") explicitly to avoid using wrong multicast destination +""" +# DestIPField.bind_addr(UDP, "224.0.0.102", dport=1985) + if conf.ipv6_enabled: from scapy.layers.inet6 import DestIP6Field + + bind_bottom_up(UDP, HSRPv2, dport=2029) + bind_bottom_up(UDP, HSRPv2, sport=2029) + bind_layers(UDP, HSRPv2, dport=2029, sport=2029) DestIP6Field.bind_addr(UDP, "ff02::66", dport=2029) diff --git a/test/scapy/layers/hsrp.uts b/test/scapy/layers/hsrp.uts index 00bbbb7340d..03c994ac9a2 100644 --- a/test/scapy/layers/hsrp.uts +++ b/test/scapy/layers/hsrp.uts @@ -32,3 +32,101 @@ assert raw( reserved2=0x6f000000, ) ) == advertise_raw + += HSRPv2 - IPv4 Group State build & dissection +virtual_ip4 = "192.0.2.1" +virtual_ip4_raw = b"\xc0\x00\x02\x01" + b"\x00" * 12 +pkt = IP(raw( + IP(dst="224.0.0.102") / + UDP(dport=1985, sport=1985) / + HSRPv2() / + HSRPv2GroupStateTLV( + group=2, + identifier="00:05:73:a0:00:02", + priority=110, + hellotime=1000, + holdtime=3000, + virtualIP=virtual_ip4, + ) +)) +assert pkt[IP].dst == "224.0.0.102" and pkt[UDP].sport == pkt[UDP].dport == 1985 +assert HSRPv2 in pkt and HSRPv2GroupStateTLV in pkt and HSRP not in pkt +assert pkt[HSRPv2GroupStateTLV].type == 1 and pkt[HSRPv2GroupStateTLV].len == 40 +assert pkt[HSRPv2GroupStateTLV].version == 2 and pkt[HSRPv2GroupStateTLV].state == 6 +assert pkt[HSRPv2GroupStateTLV].group == 2 +assert pkt[HSRPv2GroupStateTLV].identifier == "00:05:73:a0:00:02" +assert pkt[HSRPv2GroupStateTLV].priority == 110 +assert pkt[HSRPv2GroupStateTLV].hellotime == 1000 +assert pkt[HSRPv2GroupStateTLV].holdtime == 3000 +assert pkt[HSRPv2GroupStateTLV].virtualIP == virtual_ip4 +assert raw(pkt[HSRPv2GroupStateTLV])[-16:] == virtual_ip4_raw + += HSRPv2 - IPv6 Group State build & dissection +if conf.ipv6_enabled: + virtual_ip6 = "2001:db8::1" + virtual_ip6_raw = b"\x20\x01\x0d\xb8" + b"\x00" * 11 + b"\x01" + pkt = IPv6(raw( + IPv6() / + UDP(dport=2029, sport=2029) / + HSRPv2() / + HSRPv2GroupStateTLV( + ipversion=6, + group=2, + virtualIP=virtual_ip6, + ) + )) + assert pkt[IPv6].dst == "ff02::66" and pkt[UDP].sport == pkt[UDP].dport == 2029 + assert HSRPv2 in pkt and HSRPv2GroupStateTLV in pkt + assert pkt[HSRPv2GroupStateTLV].len == 40 + assert pkt[HSRPv2GroupStateTLV].ipversion == 6 + assert pkt[HSRPv2GroupStateTLV].virtualIP == virtual_ip6 + assert raw(pkt[HSRPv2GroupStateTLV])[-16:] == virtual_ip6_raw + += HSRPv2 - multiple TLVs +pkt = HSRPv2(raw( + HSRPv2GroupStateTLV() / + HSRPv2TextAuthTLV(auth=b"secret\x00\x00") / + HSRPv2InterfaceStateTLV(activegroups=1, passivegroups=2) +)) +assert HSRPv2GroupStateTLV in pkt +assert HSRPv2TextAuthTLV in pkt +assert HSRPv2InterfaceStateTLV in pkt +assert pkt[HSRPv2GroupStateTLV].len == 40 +assert pkt[HSRPv2TextAuthTLV].len == 8 and pkt[HSRPv2TextAuthTLV].auth == b"secret\x00\x00" +assert pkt[HSRPv2InterfaceStateTLV].len == 4 +assert pkt[HSRPv2InterfaceStateTLV].activegroups == 1 +assert pkt[HSRPv2InterfaceStateTLV].passivegroups == 2 + += HSRPv2 - Unknown TLV build & dissection +unknown_raw = raw(HSRPv2UnknownTLV(type=99, value=b"abcd")) +pkt = HSRPv2(unknown_raw) +assert unknown_raw == b"\x63\x04abcd" +assert HSRPv2UnknownTLV in pkt +assert pkt[HSRPv2UnknownTLV].type == 99 +assert pkt[HSRPv2UnknownTLV].len == 4 +assert pkt[HSRPv2UnknownTLV].value == b"abcd" + += HSRPv2 - MD5 Auth TLV build & dissection +digest = b"\x11" * 16 +pkt = HSRPv2(raw( + HSRPv2MD5AuthTLV( + sourceip="192.0.2.1", + keyid=7, + authdigest=digest, + ) +)) +assert HSRPv2MD5AuthTLV in pkt +assert pkt[HSRPv2MD5AuthTLV].type == 4 and pkt[HSRPv2MD5AuthTLV].len == 28 +assert pkt[HSRPv2MD5AuthTLV].algo == 1 +assert pkt[HSRPv2MD5AuthTLV].sourceip == "192.0.2.1" +assert pkt[HSRPv2MD5AuthTLV].keyid == 7 +assert pkt[HSRPv2MD5AuthTLV].authdigest == digest + += HSRPv2 - TLV length overflow +try: + raw(HSRPv2UnknownTLV(type=99, value=b"A" * 256)) + overflow_raised = False +except ValueError: + overflow_raised = True + +assert overflow_raised