Skip to content

Commit 06ccaa8

Browse files
committed
refactor: migrate connection string sanitization from regex to parser-based approach
- Move sanitize_connection_string() to connection_string_parser.py, using _ConnectionStringParser for correct ODBC braced-value handling.
- helpers.py retains a thin delegate for backward compatibility.
- connection.py imports directly from connection_string_parser.
- Add 5 new tests for braced values, escaped braces, and edge cases.
1 parent 9688b10 commit 06ccaa8

4 files changed

Lines changed: 120 additions & 14 deletions

File tree

mssql_python/connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
import mssql_python
2121
from mssql_python.cursor import Cursor
2222
from mssql_python.helpers import (
23-
sanitize_connection_string,
2423
sanitize_user_input,
2524
validate_attribute_value,
2625
)
26+
from mssql_python.connection_string_parser import sanitize_connection_string
2727
from mssql_python.logging import logger
2828
from mssql_python import ddbc_bindings
2929
from mssql_python.pooling import PoolingManager

mssql_python/connection_string_parser.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
- Collects all errors and reports them together
1616
"""
1717

18+
import re
1819
from typing import Dict, Tuple, Optional
1920
from mssql_python.exceptions import ConnectionStringParseError
2021
from mssql_python.constants import _ALLOWED_CONNECTION_STRING_PARAMS, _RESERVED_PARAMETERS
2122
from mssql_python.helpers import sanitize_user_input
2223
from mssql_python.logging import logger
2324

25+
_SENSITIVE_KEYS = frozenset({"pwd", "password"})
26+
2427

2528
class _ConnectionStringParser:
2629
"""
@@ -375,3 +378,52 @@ def _parse_braced_value(self, connection_str: str, start_pos: int) -> Tuple[str,
375378

376379
# Reached end without finding closing '}'
377380
raise ValueError(f"Unclosed braced value starting at position {brace_start_pos}")
381+
382+
383+
# Fallback pattern used only when the parser rejects the connection string.
# Group 1 keeps the "Pwd=" / "Password=" prefix; group 2 is the value to mask:
#   1. a properly closed braced value, where "}}" is an escaped "}" and the
#      closing "}" must not itself be the first half of an escaped pair;
#   2. an unclosed braced value -> mask through the end of the string, because
#      there is no reliable way to tell where the secret stops;
#   3. a plain value -> mask up to the next ";".
_SENSITIVE_VALUE_FALLBACK_RE = re.compile(
    r"((?:Pwd|Password)\s*=\s*)"
    r"(\{[^}]*(?:\}\}[^}]*)*\}(?!\})|\{.*|[^;]*)",
    flags=re.IGNORECASE | re.DOTALL,
)


def sanitize_connection_string(conn_str: str) -> str:
    """
    Sanitize a connection string by masking sensitive values (PWD, Password).

    Uses _ConnectionStringParser to correctly handle ODBC braced values
    (e.g. PWD={Top;Secret}) rather than a simple regex, which would truncate
    at the first semicolon and leak the tail of the password. On success the
    string is rebuilt with _ConnectionStringBuilder, so keys are emitted under
    whatever name normalize_key() returns for them.

    If parsing or rebuilding raises for any reason (for example an unclosed
    braced value), falls back to a regex that masks both Pwd= and Password=
    values. For an unclosed brace the fallback masks everything up to the end
    of the string: over-masking is preferred to leaking part of a secret.

    Args:
        conn_str (str): The connection string to sanitize.
    Returns:
        str: The sanitized connection string.
    """
    # Imported lazily, as in the original implementation.
    from mssql_python.connection_string_builder import _ConnectionStringBuilder

    logger.debug(
        "sanitize_connection_string: Sanitizing connection string (length=%d)", len(conn_str)
    )

    try:
        parser = _ConnectionStringParser(validate_keywords=False)
        params = parser._parse(conn_str)

        sanitized_params = {}
        for key, value in params.items():
            canonical = _ConnectionStringParser.normalize_key(key)
            display_key = canonical if canonical else key
            # Compare case-insensitively on both the parsed and the display key
            # so masking does not depend on how the parser cases its keys.
            is_sensitive = (
                key.lower() in _SENSITIVE_KEYS or display_key.lower() in _SENSITIVE_KEYS
            )
            sanitized_params[display_key] = "***" if is_sensitive else value

        sanitized = _ConnectionStringBuilder(sanitized_params).build()
    except Exception:
        # Deliberately broad: this function runs on logging paths and must
        # never raise. The exception text is not logged because it may quote
        # parts of the connection string.
        logger.debug("sanitize_connection_string: Parser failed, using regex fallback")
        sanitized = _SENSITIVE_VALUE_FALLBACK_RE.sub(r"\1***", conn_str)

    logger.debug("sanitize_connection_string: Password fields masked")
    return sanitized

mssql_python/helpers.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,20 @@ def check_error(handle_type: int, handle: Any, ret: int) -> None:
4141
def sanitize_connection_string(conn_str: str) -> str:
4242
"""
4343
Sanitize the connection string by removing sensitive information.
44+
45+
Delegates to the parser-based implementation in connection_string_parser
46+
which correctly handles ODBC braced values (e.g. PWD={Top;Secret}).
47+
4448
Args:
4549
conn_str (str): The connection string to sanitize.
4650
Returns:
4751
str: The sanitized connection string.
4852
"""
49-
logger.debug(
50-
"sanitize_connection_string: Sanitizing connection string (length=%d)", len(conn_str)
53+
from mssql_python.connection_string_parser import (
54+
sanitize_connection_string as _sanitize,
5155
)
52-
# Remove sensitive information from the connection string, Pwd section
53-
# Replace Pwd=...; or Pwd=... (end of string) with Pwd=***;
54-
sanitized = re.sub(r"(Pwd\s*=\s*)[^;]*", r"\1***", conn_str, flags=re.IGNORECASE)
55-
logger.debug("sanitize_connection_string: Password fields masked")
56-
return sanitized
56+
57+
return _sanitize(conn_str)
5758

5859

5960
def sanitize_user_input(user_input: str, max_length: int = 50) -> str:

tests/test_007_logging.py

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -314,20 +314,73 @@ def test_pwd_sanitization(self, cleanup_logger):
314314
assert "secret123" not in sanitized
315315

316316
def test_pwd_case_insensitive(self, cleanup_logger):
317-
"""PWD/Pwd/pwd should all be sanitized (case-insensitive)"""
317+
"""PWD/Pwd/pwd should all be sanitized to canonical PWD=***"""
318318
from mssql_python.helpers import sanitize_connection_string
319319

320320
test_cases = [
321-
("Server=localhost;PWD=secret;Database=test", "PWD=***"),
322-
("Server=localhost;Pwd=secret;Database=test", "Pwd=***"),
323-
("Server=localhost;pwd=secret;Database=test", "pwd=***"),
321+
"Server=localhost;PWD=secret;Database=test",
322+
"Server=localhost;Pwd=secret;Database=test",
323+
"Server=localhost;pwd=secret;Database=test",
324324
]
325325

326-
for conn_str, expected in test_cases:
326+
for conn_str in test_cases:
327327
sanitized = sanitize_connection_string(conn_str)
328-
assert expected in sanitized
328+
assert "PWD=***" in sanitized
329329
assert "secret" not in sanitized
330330

331+
def test_pwd_braced_value_with_semicolon(self, cleanup_logger):
    """A braced PWD value that embeds a semicolon is masked in its entirety."""
    from mssql_python.helpers import sanitize_connection_string

    masked = sanitize_connection_string("Server=localhost;PWD={Top;Secret};Database=test")

    assert "PWD=***" in masked
    # Neither half of the password, split at the embedded ';', may survive.
    for leaked in ("Top", "Secret"):
        assert leaked not in masked
341+
342+
def test_pwd_braced_value_with_escaped_braces(self, cleanup_logger):
    """PWD with escaped braces must be fully masked, in decoded and raw form."""
    from mssql_python.helpers import sanitize_connection_string

    conn_str = "Server=localhost;PWD={p}}w{{d};Database=test"
    sanitized = sanitize_connection_string(conn_str)

    assert "PWD=***" in sanitized
    # Form with the doubled braces collapsed (presumably what the parser
    # yields as the decoded password).
    assert "p}w{d" not in sanitized
    # Raw escaped text exactly as it appears in the input. The collapsed form
    # above is not a substring of it, so without this check a verbatim leak
    # of the password would go unnoticed.
    assert "p}}w{{d" not in sanitized
351+
352+
def test_pwd_braced_value_multiple_semicolons(self, cleanup_logger):
    """A braced PWD value holding several semicolons is masked as one unit."""
    from mssql_python.helpers import sanitize_connection_string

    masked = sanitize_connection_string("Server=localhost;PWD={a;b;c;d};Database=test")

    assert "PWD=***" in masked
    # No slice of the braced password may remain, whichever ';' a naive
    # splitter would have cut at.
    forbidden_fragments = ("a;b;c;d", "{a;", "b;c", "c;d}")
    for piece in forbidden_fragments:
        assert piece not in masked
362+
363+
def test_pwd_at_end_of_string(self, cleanup_logger):
    """A trailing PWD entry with no terminating semicolon is still masked."""
    from mssql_python.helpers import sanitize_connection_string

    masked = sanitize_connection_string("Server=localhost;Database=test;PWD=secret")

    assert "PWD=***" in masked
    assert "secret" not in masked
372+
373+
def test_no_pwd_unchanged(self, cleanup_logger):
    """A connection string without PWD keeps all of its keys after sanitizing."""
    from mssql_python.helpers import sanitize_connection_string

    result = sanitize_connection_string("Server=localhost;Database=test;UID=user")

    # Only key presence is checked here, not value or ordering.
    for expected_key in ("Server=", "Database=", "UID="):
        assert expected_key in result
383+
331384
def test_explicit_sanitization_in_logging(self, cleanup_logger):
332385
"""Verify that explicit sanitization works when logging"""
333386
from mssql_python.helpers import sanitize_connection_string

0 commit comments

Comments
 (0)