Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 205 additions & 5 deletions scapy/layers/hsrp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
- HSRP Version 2:
http://www.smartnetworks.jp/2006/02/hsrp_8_hsrp_version_2.html
"""
import socket


from scapy.config import conf
from scapy.compat import orb
from scapy.compat import orb, plain_str
from scapy.fields import ByteEnumField, ByteField, IntField, IPField, \
ShortEnumField, ShortField, SourceIPField, StrFixedLenField, \
XIntField, XShortField
XIntField, XShortField, Field, MACField, StrLenField
from scapy.packet import Packet, bind_layers, bind_bottom_up
from scapy.pton_ntop import inet_ntop, inet_pton
from scapy.layers.inet import DestIPField, UDP


Expand All @@ -32,6 +35,88 @@
}
_HSRP_ADVERTISE_TYPES = {1: "HSRP interface state"}
_HSRP_ADVERTISE_STATES = {1: "Active", 2: "Passive"}
_HSRP_V2_STATES = {
1: "Initial",
2: "Learn",
3: "Listen",
4: "Speak",
5: "Standby",
6: "Active",
}
_HSRP_V2_TLV_TYPES = {
1: "Group State",
2: "Interface State",
3: "Text Authentication",
4: "MD5 Authentication",
}
_HSRP_V2_IP_VERSIONS = {
4: "IPv4",
6: "IPv6",
}


def _is_hsrpv2(pkt):
if not pkt or len(pkt) < 2:
return False

tlvtype = orb(pkt[0:1])
tlvlength = orb(pkt[1:2])

if tlvtype not in _HSRP_V2_TLV_TYPES:
return False

if len(pkt) < 2 + tlvlength:
return False

if tlvtype == 1:
return (
tlvlength == 40 and
len(pkt) >= 6 and
orb(pkt[2:3]) == 2 and
orb(pkt[3:4]) in _HSRP_OPCODES and
orb(pkt[4:5]) in _HSRP_V2_STATES and
orb(pkt[5:6]) in _HSRP_V2_IP_VERSIONS
)

if tlvtype == 2:
return tlvlength == 4

if tlvtype == 3:
return tlvlength == 8

if tlvtype == 4:
return tlvlength == 28

return True


class _HSRPv2VirtualIPField(Field):
def __init__(self, name, default):
Field.__init__(self, name, default, "16s")

def i2m(self, pkt, x):
if x is None:
return b"\x00" * 16
if isinstance(x, bytes):
if len(x) == 4:
return x + b"\x00" * 12
if len(x) == 16:
return x
raise ValueError("Virtual IP must be 4 or 16 bytes")
x = plain_str(x)
if pkt is not None and pkt.ipversion == 6:
return inet_pton(socket.AF_INET6, x)
return inet_pton(socket.AF_INET, x) + b"\x00" * 12

def m2i(self, pkt, x):
if pkt is not None and pkt.ipversion == 6:
return inet_ntop(socket.AF_INET6, x)
return inet_ntop(socket.AF_INET, x[:4])

def i2repr(self, pkt, x):
if isinstance(x, bytes):
x = self.m2i(pkt, x)
return plain_str(x)


class HSRP(Packet):
Expand All @@ -51,6 +136,8 @@ class HSRP(Packet):

@classmethod
def dispatch_hook(cls, _pkt=None, *args, **kargs):
if _is_hsrpv2(_pkt):
return HSRPv2
if _pkt and len(_pkt) >= 2 and orb(_pkt[1:2]) == 3:
return HSRPAdvertise
return cls
Expand All @@ -62,6 +149,111 @@ def guess_payload_class(self, payload):
return Packet.guess_payload_class(self, payload)


class HSRPv2(Packet):
name = "HSRPv2"
fields_desc = []

def guess_payload_class(self, payload):
if payload:
return HSRPv2TLV
return Packet.guess_payload_class(self, payload)


class HSRPv2TLV(Packet):
name = "HSRPv2 TLV"

@classmethod
def dispatch_hook(cls, _pkt=None, *args, **kargs):
if _pkt and len(_pkt) >= 1:
tlv_type = orb(_pkt[0:1])
if tlv_type == 1:
return HSRPv2GroupStateTLV
if tlv_type == 2:
return HSRPv2InterfaceStateTLV
if tlv_type == 3:
return HSRPv2TextAuthTLV
if tlv_type == 4:
return HSRPv2MD5AuthTLV
return HSRPv2UnknownTLV


class _HSRPv2TLVPayload(Packet):
name = "HSRPv2 TLV Payload"

def guess_payload_class(self, payload):
if payload:
return HSRPv2TLV
return Packet.guess_payload_class(self, payload)

def post_build(self, p, pay):
if self.len is None:
tlv_len = len(p) - 2
if tlv_len > 255:
raise ValueError("HSRPv2 TLV length exceeds 255 bytes")
p = p[:1] + bytes([tlv_len]) + p[2:]
return p + pay


class HSRPv2GroupStateTLV(_HSRPv2TLVPayload):
name = "HSRPv2 Group State TLV"
fields_desc = [
ByteEnumField("type", 1, _HSRP_V2_TLV_TYPES),
ByteField("len", None),
ByteField("version", 2),
ByteEnumField("opcode", 0, _HSRP_OPCODES),
ByteEnumField("state", 6, _HSRP_V2_STATES),
ByteEnumField("ipversion", 4, _HSRP_V2_IP_VERSIONS),
ShortField("group", 1),
MACField("identifier", "00:00:00:00:00:00"),
IntField("priority", 100),
IntField("hellotime", 3000),
IntField("holdtime", 10000),
_HSRPv2VirtualIPField("virtualIP", b"\x00" * 16),
]


class HSRPv2InterfaceStateTLV(_HSRPv2TLVPayload):
name = "HSRPv2 Interface State TLV"
fields_desc = [
ByteEnumField("type", 2, _HSRP_V2_TLV_TYPES),
ByteField("len", None),
ShortField("activegroups", 0),
ShortField("passivegroups", 0),
]


class HSRPv2TextAuthTLV(_HSRPv2TLVPayload):
name = "HSRPv2 Text Authentication TLV"
fields_desc = [
ByteEnumField("type", 3, _HSRP_V2_TLV_TYPES),
ByteField("len", None),
StrFixedLenField("auth", b"cisco" + b"\x00" * 3, 8),
]


class HSRPv2MD5AuthTLV(_HSRPv2TLVPayload):
name = "HSRPv2 MD5 Authentication TLV"
fields_desc = [
ByteEnumField("type", 4, _HSRP_V2_TLV_TYPES),
ByteField("len", None),
ByteEnumField("algo", 1, {1: "MD5"}),
ByteField("padding", 0),
XShortField("flags", 0),
SourceIPField("sourceip"),
XIntField("keyid", 0),
StrFixedLenField("authdigest", b"\x00" * 16, 16),
]


class HSRPv2UnknownTLV(_HSRPv2TLVPayload):
name = "HSRPv2 Unknown TLV"
fields_desc = [
ByteEnumField("type", 0, _HSRP_V2_TLV_TYPES),
ByteField("len", None),
StrLenField("value", b"", length_from=lambda pkt: pkt.len),
]


class HSRPAdvertise(Packet):
name = "HSRP Advertise"
fields_desc = [
Expand Down Expand Up @@ -104,11 +296,19 @@ def post_build(self, p, pay):

bind_bottom_up(UDP, HSRP, dport=1985)
bind_bottom_up(UDP, HSRP, sport=1985)
bind_bottom_up(UDP, HSRP, dport=2029)
bind_bottom_up(UDP, HSRP, sport=2029)
bind_layers(UDP, HSRP, dport=1985, sport=1985)
bind_layers(UDP, HSRP, dport=2029, sport=2029)

DestIPField.bind_addr(UDP, "224.0.0.2", dport=1985)
"""
since port 1954 (UDP) would be shared by both hsrpv1 and hsprv2 (ipv4),
users building hsrpv2 (ipv4) packets should set IP(dst="224.0.0.102") explicitly to avoid using wrong multicast destination
"""
# DestIPField.bind_addr(UDP, "224.0.0.102", dport=1985)

if conf.ipv6_enabled:
from scapy.layers.inet6 import DestIP6Field

bind_bottom_up(UDP, HSRPv2, dport=2029)
bind_bottom_up(UDP, HSRPv2, sport=2029)
bind_layers(UDP, HSRPv2, dport=2029, sport=2029)
DestIP6Field.bind_addr(UDP, "ff02::66", dport=2029)
98 changes: 98 additions & 0 deletions test/scapy/layers/hsrp.uts
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,101 @@ assert raw(
reserved2=0x6f000000,
)
) == advertise_raw

= HSRPv2 - IPv4 Group State build & dissection
virtual_ip4 = "192.0.2.1"
virtual_ip4_raw = b"\xc0\x00\x02\x01" + b"\x00" * 12
pkt = IP(raw(
IP(dst="224.0.0.102") /
UDP(dport=1985, sport=1985) /
HSRPv2() /
HSRPv2GroupStateTLV(
group=2,
identifier="00:05:73:a0:00:02",
priority=110,
hellotime=1000,
holdtime=3000,
virtualIP=virtual_ip4,
)
))
assert pkt[IP].dst == "224.0.0.102" and pkt[UDP].sport == pkt[UDP].dport == 1985
assert HSRPv2 in pkt and HSRPv2GroupStateTLV in pkt and HSRP not in pkt
assert pkt[HSRPv2GroupStateTLV].type == 1 and pkt[HSRPv2GroupStateTLV].len == 40
assert pkt[HSRPv2GroupStateTLV].version == 2 and pkt[HSRPv2GroupStateTLV].state == 6
assert pkt[HSRPv2GroupStateTLV].group == 2
assert pkt[HSRPv2GroupStateTLV].identifier == "00:05:73:a0:00:02"
assert pkt[HSRPv2GroupStateTLV].priority == 110
assert pkt[HSRPv2GroupStateTLV].hellotime == 1000
assert pkt[HSRPv2GroupStateTLV].holdtime == 3000
assert pkt[HSRPv2GroupStateTLV].virtualIP == virtual_ip4
assert raw(pkt[HSRPv2GroupStateTLV])[-16:] == virtual_ip4_raw

= HSRPv2 - IPv6 Group State build & dissection
if conf.ipv6_enabled:
virtual_ip6 = "2001:db8::1"
virtual_ip6_raw = b"\x20\x01\x0d\xb8" + b"\x00" * 11 + b"\x01"
pkt = IPv6(raw(
IPv6() /
UDP(dport=2029, sport=2029) /
HSRPv2() /
HSRPv2GroupStateTLV(
ipversion=6,
group=2,
virtualIP=virtual_ip6,
)
))
assert pkt[IPv6].dst == "ff02::66" and pkt[UDP].sport == pkt[UDP].dport == 2029
assert HSRPv2 in pkt and HSRPv2GroupStateTLV in pkt
assert pkt[HSRPv2GroupStateTLV].len == 40
assert pkt[HSRPv2GroupStateTLV].ipversion == 6
assert pkt[HSRPv2GroupStateTLV].virtualIP == virtual_ip6
assert raw(pkt[HSRPv2GroupStateTLV])[-16:] == virtual_ip6_raw

= HSRPv2 - multiple TLVs
pkt = HSRPv2(raw(
HSRPv2GroupStateTLV() /
HSRPv2TextAuthTLV(auth=b"secret\x00\x00") /
HSRPv2InterfaceStateTLV(activegroups=1, passivegroups=2)
))
assert HSRPv2GroupStateTLV in pkt
assert HSRPv2TextAuthTLV in pkt
assert HSRPv2InterfaceStateTLV in pkt
assert pkt[HSRPv2GroupStateTLV].len == 40
assert pkt[HSRPv2TextAuthTLV].len == 8 and pkt[HSRPv2TextAuthTLV].auth == b"secret\x00\x00"
assert pkt[HSRPv2InterfaceStateTLV].len == 4
assert pkt[HSRPv2InterfaceStateTLV].activegroups == 1
assert pkt[HSRPv2InterfaceStateTLV].passivegroups == 2

= HSRPv2 - Unknown TLV build & dissection
unknown_raw = raw(HSRPv2UnknownTLV(type=99, value=b"abcd"))
pkt = HSRPv2(unknown_raw)
assert unknown_raw == b"\x63\x04abcd"
assert HSRPv2UnknownTLV in pkt
assert pkt[HSRPv2UnknownTLV].type == 99
assert pkt[HSRPv2UnknownTLV].len == 4
assert pkt[HSRPv2UnknownTLV].value == b"abcd"

= HSRPv2 - MD5 Auth TLV build & dissection
digest = b"\x11" * 16
pkt = HSRPv2(raw(
HSRPv2MD5AuthTLV(
sourceip="192.0.2.1",
keyid=7,
authdigest=digest,
)
))
assert HSRPv2MD5AuthTLV in pkt
assert pkt[HSRPv2MD5AuthTLV].type == 4 and pkt[HSRPv2MD5AuthTLV].len == 28
assert pkt[HSRPv2MD5AuthTLV].algo == 1
assert pkt[HSRPv2MD5AuthTLV].sourceip == "192.0.2.1"
assert pkt[HSRPv2MD5AuthTLV].keyid == 7
assert pkt[HSRPv2MD5AuthTLV].authdigest == digest

= HSRPv2 - TLV length overflow
try:
raw(HSRPv2UnknownTLV(type=99, value=b"A" * 256))
overflow_raised = False
except ValueError:
overflow_raised = True

assert overflow_raised
Loading