-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathundersync.py
More file actions
106 lines (89 loc) · 3.7 KB
/
undersync.py
File metadata and controls
106 lines (89 loc) · 3.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import pathlib
import urllib.parse
import requests
from oslo_log import log
from requests.models import HTTPError
# Module-level oslo.log logger, named after this module.
LOG = log.getLogger(__name__)
class UndersyncError(Exception):
    """Raised when the Undersync API answers with an HTTP error status."""
class Undersync:
    """Minimal HTTP client for the Undersync API.

    Every operation is a ``POST`` to
    ``{api_url}/v1/vlan-group/{vlan_group}/{action}`` where ``action`` is one
    of ``sync``, ``dry-run`` or ``force``.
    """

    def __init__(
        self,
        auth_token: str | None = None,
        api_url: str | None = None,
        timeout: int = 90,
        use_keystone_auth: bool = False,
        session=None,
    ) -> None:
        """Simple client for Undersync.

        Args:
            auth_token: Authentication token. If use_keystone_auth is True,
                        this should be a Keystone service token. Otherwise,
                        it should be a JWT token. If not provided and no
                        session is given, it is read from
                        /etc/undersync/token.
            api_url: Undersync API URL. Defaults to the in-cluster service
                     URL stored in ``self.url``.
            timeout: Request timeout in seconds.
            use_keystone_auth: If True, use X-Auth-Token header for Keystone
                               authentication. Otherwise, use Authorization:
                               Bearer header for JWT authentication.
            session: Keystone session object for automatic token refresh.
                     Must provide ``get_token()``. If provided, takes
                     precedence over auth_token for Keystone auth.
        """
        self.session = session
        # The token file is only consulted when neither an explicit token nor
        # a session was supplied; with a session the stored token may be None.
        self.token = auth_token or (
            self._fetch_undersync_token() if not session else None
        )
        self.url = "http://undersync.undersync.svc.cluster.local:8080"
        self.api_url = api_url or self.url
        self.timeout = timeout
        self.use_keystone_auth = use_keystone_auth

    def _fetch_undersync_token(self) -> str:
        """Return the contents of /etc/undersync/token, whitespace-stripped."""
        return pathlib.Path("/etc/undersync/token").read_text().strip()

    def _log_and_raise_for_status(self, response: requests.Response) -> None:
        """Log and re-raise an HTTP error status as ``UndersyncError``.

        Raises:
            UndersyncError: If ``response.raise_for_status()`` raises
                ``HTTPError``. The original error is chained as the cause and
                its text is used as the exception message.
        """
        try:
            response.raise_for_status()
        except HTTPError as error:
            LOG.error("Undersync error: %(error)s", {"error": error})
            # Carry the HTTP error text so callers that only log the
            # UndersyncError still see the status and URL.
            raise UndersyncError(str(error)) from error

    def sync_devices(
        self, vlan_group: str, force: bool = False, dry_run: bool = False
    ) -> requests.Response:
        """Run the requested Undersync action for ``vlan_group``.

        ``dry_run`` takes precedence over ``force``; when neither is set a
        regular sync is performed.
        """
        if dry_run:
            return self.dry_run(vlan_group)
        if force:
            return self.force(vlan_group)
        return self.sync(vlan_group)

    @property
    def client(self):
        """Build a new ``requests.Session`` carrying the auth headers.

        A fresh session is created on every access; the caller is responsible
        for closing it.
        """
        session = requests.Session()
        # Assigning (rather than updating) replaces the session's header set.
        session.headers = {"Content-Type": "application/json"}
        if self.use_keystone_auth and self.session:
            # Get fresh token from session for each request (enables refresh)
            token = self.session.get_token()
            session.headers["X-Auth-Token"] = token
        elif self.use_keystone_auth:
            # Fallback to stored token for backward compatibility
            session.headers["X-Auth-Token"] = self.token
        else:
            session.headers["Authorization"] = f"Bearer {self.token}"
        return session

    @staticmethod
    def _loggable_body(response: requests.Response):
        """Return the decoded JSON body, or the raw text if it is not JSON."""
        try:
            return response.json()
        except ValueError:
            # Error pages and empty bodies are not JSON; decoding failures
            # must not mask the HTTP status check that follows logging.
            return response.text

    def _undersync_post(self, action: str, vlan_group: str) -> requests.Response:
        """POST ``action`` for ``vlan_group`` and return the response.

        Args:
            action: Final path segment of the endpoint.
            vlan_group: VLAN group name; fully percent-encoded before being
                placed in the URL path.

        Raises:
            UndersyncError: If the response has an HTTP error status.
        """
        vlan_group = urllib.parse.quote(vlan_group, safe="")
        url = f"{self.api_url}/v1/vlan-group/{vlan_group}/{action}"
        # Close the per-request session once the (fully read) response is
        # back so its connection pool is not leaked.
        with self.client as http:
            response = http.post(url, timeout=self.timeout)
        LOG.debug(
            "undersync %(action)s resp: %(resp)s",
            {"resp": self._loggable_body(response), "action": action},
        )
        self._log_and_raise_for_status(response)
        return response

    def sync(self, vlan_group: str) -> requests.Response:
        """POST the ``sync`` action for ``vlan_group``."""
        return self._undersync_post("sync", vlan_group)

    def dry_run(self, vlan_group: str) -> requests.Response:
        """POST the ``dry-run`` action for ``vlan_group``."""
        return self._undersync_post("dry-run", vlan_group)

    def force(self, vlan_group: str) -> requests.Response:
        """POST the ``force`` action for ``vlan_group``."""
        return self._undersync_post("force", vlan_group)