Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions python/neutron-understack/neutron_understack/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@
"the '/etc/undersync/token' will be read instead."
),
),
cfg.BoolOpt(
"undersync_use_keystone_auth",
default=False,
help=(
"Use Keystone authentication for Undersync. "
"If True, the driver will use the existing Neutron service token "
"from the [keystone_authtoken] config section instead of the "
"static JWT token."
),
),
cfg.BoolOpt(
"undersync_dry_run", default=True, help="Call Undersync with dry-run mode"
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
from uuid import UUID

from keystoneauth1 import loading as ks_loading
from keystoneauth1 import session as ks_session
from neutron_lib import constants as p_const
from neutron_lib.api.definitions import portbindings
from neutron_lib.callbacks import events
Expand Down Expand Up @@ -37,11 +39,59 @@ def connectivity(self): # type: ignore
def initialize(self):
config.register_ml2_understack_opts(cfg.CONF)
conf = cfg.CONF.ml2_understack
self.undersync = Undersync(conf.undersync_token, conf.undersync_url)

if conf.undersync_use_keystone_auth:
session = self._get_keystone_session()
self.undersync = Undersync(
session=session,
api_url=conf.undersync_url,
use_keystone_auth=True,
)
else:
self.undersync = Undersync(conf.undersync_token, conf.undersync_url)

self.ironic_client = IronicClient()
self.trunk_driver = UnderStackTrunkDriver.create(self)
self.subscribe()

def _get_keystone_session(self) -> ks_session.Session:
"""Get a Keystone session using the Neutron service credentials.

This uses the existing [keystone_authtoken] configuration section
to create a session that can automatically refresh tokens.

Returns:
ks_session.Session: The Keystone session for authenticating with Undersync.

Raises:
Exception: If unable to create session from Keystone.
"""
try:
# Load credentials from the [keystone_authtoken] section
auth = ks_loading.load_auth_from_conf_options(
cfg.CONF, "keystone_authtoken"
)
# Create session manually to avoid missing config options
sess = ks_session.Session(auth=auth, timeout=30)

# Verify we can get a token (test session is working)
token = sess.get_token()
if not token:
raise ValueError("Unable to obtain initial token from session")

LOG.info(
"Successfully created Keystone session for Undersync authentication"
)
return sess

except Exception as e:
LOG.error(
"Failed to create Keystone session: %(error)s. "
"Please check your [keystone_authtoken] configuration.",
{"error": e},
)
raise

def subscribe(self):
registry.subscribe(
routers.handle_router_interface_removal,
Expand Down Expand Up @@ -266,7 +316,7 @@ def _bind_port_segment(self, context: PortContext, segment):
current_vlan_segment = utils.vlan_segment_for_physnet(context, vlan_group_name)
if current_vlan_segment:
LOG.info(
"vlan segment: %(segment)s already preset for physnet: " "%(physnet)s",
"vlan segment: %(segment)s already preset for physnet: %(physnet)s",
{"segment": current_vlan_segment, "physnet": vlan_group_name},
)
dynamic_segment = current_vlan_segment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def ironic_client(mocker) -> IronicClient:
@pytest.fixture
def understack_driver(oslo_config, ironic_client) -> UnderstackDriver:
driver = UnderstackDriver()
driver.undersync = Undersync("auth_token", "api_url")
driver.undersync = Undersync("auth_token", "api_url", use_keystone_auth=False)
driver.ironic_client = ironic_client
return driver

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from itertools import cycle
from unittest.mock import Mock

from neutron_understack.neutron_understack_mech import UnderstackDriver


class TestKeystoneTokenRefresh:
"""Test that Keystone tokens can be refreshed when they expire."""

def test_undersync_refreshes_expired_keystone_token(self, mocker, oslo_config):
"""Test that Undersync can handle token expiration by refreshing the token.

This test simulates a scenario where:
1. First call to get_token() returns token_1
2. First Undersync API call succeeds with token_1
3. Second call to get_token() returns token_2 (token_1 expired)
4. Second Undersync API call succeeds with token_2

This proves the session can refresh tokens automatically.
"""
oslo_config.config(
undersync_use_keystone_auth=True,
group="ml2_understack",
)

# Mock the keystone session to simulate token refresh
mock_session = Mock()
# Use cycle to provide different tokens on each call (simulating refresh)
mock_session.get_token.side_effect = cycle(["token_1", "token_2", "token_3"])

mocker.patch("keystoneauth1.loading.load_auth_from_conf_options")
mocker.patch(
"neutron_understack.neutron_understack_mech.ks_session.Session",
return_value=mock_session,
)

# Mock IronicClient to avoid config issues
mocker.patch("neutron_understack.neutron_understack_mech.IronicClient")

# Create driver - this should store the session, not extract token
driver = UnderstackDriver()
driver.initialize()

# Mock the HTTP requests to verify the correct token is used
mock_post = mocker.patch("requests.Session.post")
mock_post.return_value.status_code = 200
mock_post.return_value.json.return_value = {"result": "success"}

# First call - should use token_1
result1 = driver.undersync.sync_devices("vlan-group-1")

# Second call - should refresh and use token_2
result2 = driver.undersync.sync_devices("vlan-group-2")

# Verify session.get_token() was called three times:
# 1. During initialization (verification)
# 2. First sync_devices call
# 3. Second sync_devices call
assert mock_session.get_token.call_count == 3

# Verify both calls succeeded
assert result1.status_code == 200
assert result2.status_code == 200

# Verify HTTP calls were made
assert (
len(mock_post.call_args_list) == 2
), f"Expected 2 calls, got {len(mock_post.call_args_list)}"

# The key verification is that session.get_token() was called for each request,
# proving that tokens are refreshed rather than cached
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from dataclasses import dataclass

import pytest
from oslo_config import cfg

from neutron_understack.neutron_understack_mech import UnderstackDriver


class TestUpdatePortPostCommit:
Expand Down Expand Up @@ -95,3 +98,59 @@ class FakeContext:
]

understack_driver.create_network_postcommit(FakeContext())


class TestKeystoneAuthentication:
def test_initialize_with_keystone_auth(self, mocker, oslo_config):
"""Test that driver initializes with Keystone authentication when enabled."""
oslo_config.config(
undersync_use_keystone_auth=True,
group="ml2_understack",
)

mock_auth = mocker.patch("keystoneauth1.loading.load_auth_from_conf_options")
mock_session_class = mocker.patch(
"neutron_understack.neutron_understack_mech.ks_session.Session"
)
mock_get_token = mocker.MagicMock(return_value="test_service_token")

mock_session_instance = mocker.MagicMock()
mock_session_instance.get_token = mock_get_token
mock_session_class.return_value = mock_session_instance

# Mock IronicClient to avoid config issues
mocker.patch("neutron_understack.neutron_understack_mech.IronicClient")

driver = UnderstackDriver()
driver.initialize()

mock_auth.assert_called_once_with(cfg.CONF, "keystone_authtoken")
mock_session_class.assert_called_once()
mock_get_token.assert_called_once()
assert driver.undersync.use_keystone_auth is True
assert driver.undersync.session == mock_session_instance

def test_initialize_with_jwt_auth(self, mocker, oslo_config):
"""Test that driver initializes with JWT auth only."""
oslo_config.config(
undersync_use_keystone_auth=False,
undersync_token="test_jwt_token",
group="ml2_understack",
)

mock_auth = mocker.patch("keystoneauth1.loading.load_auth_from_conf_options")
mock_session = mocker.patch(
"keystoneauth1.loading.load_session_from_conf_options"
)

# Mock IronicClient to avoid config issues
mocker.patch("neutron_understack.neutron_understack_mech.IronicClient")

driver = UnderstackDriver()
driver.initialize()

# Should not call Keystone auth functions
mock_auth.assert_not_called()
mock_session.assert_not_called()
assert driver.undersync.use_keystone_auth is False
assert driver.undersync.token == "test_jwt_token"
52 changes: 52 additions & 0 deletions python/neutron-understack/neutron_understack/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,3 +635,55 @@ def test_port_bound_to_uuid_when_agent_reports_hostname(self, mocker):

assert result == "trunk-456"
mock_ironic.baremetal_node_uuid.assert_called_once_with("gateway-host-1")


class TestUndersyncAuthentication:
def test_undersync_with_keystone_auth(self, mocker):
"""Test that Undersync client uses X-Auth-Token header when enabled."""
from neutron_understack.undersync import Undersync

client = Undersync(
auth_token="test_token",
api_url="http://test.api",
use_keystone_auth=True,
)

# Access the client property to trigger its creation
session = client.client

assert session.headers["Content-Type"] == "application/json"
assert session.headers["X-Auth-Token"] == "test_token"
assert "Authorization" not in session.headers

def test_undersync_with_jwt_auth(self, mocker):
"""Test that Undersync client uses Authorization Bearer header with JWT."""
from neutron_understack.undersync import Undersync

client = Undersync(
auth_token="test_jwt_token",
api_url="http://test.api",
use_keystone_auth=False,
)

# Access the client property to trigger its creation
session = client.client

assert session.headers["Content-Type"] == "application/json"
assert session.headers["Authorization"] == "Bearer test_jwt_token"
assert "X-Auth-Token" not in session.headers

def test_undersync_default_to_jwt_auth(self, mocker):
"""Test that Undersync client defaults to JWT auth when not specified."""
from neutron_understack.undersync import Undersync

client = Undersync(
auth_token="test_token",
api_url="http://test.api",
)

# Access the client property to trigger its creation
session = client.client

assert session.headers["Content-Type"] == "application/json"
assert session.headers["Authorization"] == "Bearer test_token"
assert "X-Auth-Token" not in session.headers
43 changes: 35 additions & 8 deletions python/neutron-understack/neutron_understack/undersync.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import pathlib
import urllib.parse
from functools import cached_property

import requests
from oslo_log import log
Expand All @@ -19,12 +18,32 @@ def __init__(
auth_token: str | None = None,
api_url: str | None = None,
timeout: int = 90,
use_keystone_auth: bool = False,
session=None,
) -> None:
"""Simple client for Undersync."""
self.token = auth_token or self._fetch_undersync_token()
"""Simple client for Undersync.

Args:
auth_token: Authentication token. If use_keystone_auth is True,
this should be a Keystone service token. Otherwise,
it should be a JWT token. If not provided, it will be
fetched from /etc/undersync/token.
api_url: Undersync API URL.
timeout: Request timeout in seconds.
use_keystone_auth: If True, use X-Auth-Token header for Keystone
authentication. Otherwise, use Authorization:
Bearer header for JWT authentication.
session: Keystone session object for automatic token refresh.
If provided, takes precedence over auth_token for Keystone auth.
"""
self.session = session
self.token = auth_token or (
self._fetch_undersync_token() if not session else None
)
self.url = "http://undersync.undersync.svc.cluster.local:8080"
self.api_url = api_url or self.url
self.timeout = timeout
self.use_keystone_auth = use_keystone_auth

def _fetch_undersync_token(self) -> str:
file = pathlib.Path("/etc/undersync/token")
Expand All @@ -48,13 +67,21 @@ def sync_devices(
else:
return self.sync(vlan_group)

@cached_property
@property
def client(self):
session = requests.Session()
session.headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.token}",
}
session.headers = {"Content-Type": "application/json"}

if self.use_keystone_auth and self.session:
# Get fresh token from session for each request (enables refresh)
token = self.session.get_token()
session.headers["X-Auth-Token"] = token
elif self.use_keystone_auth:
# Fallback to stored token for backward compatibility
session.headers["X-Auth-Token"] = self.token
else:
session.headers["Authorization"] = f"Bearer {self.token}"

return session

def _undersync_post(self, action: str, vlan_group: str) -> requests.Response:
Expand Down
2 changes: 2 additions & 0 deletions python/neutron-understack/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,6 @@ target-version = "py312"
"S311", # allow non-cryptographic secure bits for test data
"S101",
"RUF012", # test fixture classes often use mutable class-level data
"S105", # allow hardcoded passwords for testing
"S106", # allow hardcoded passwords for testing
]
Loading