-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathconnection.py
More file actions
1631 lines (1386 loc) · 66.6 KB
/
connection.py
File metadata and controls
1631 lines (1386 loc) · 66.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
This module defines the Connection class, which is used to manage a connection to a database.
The class provides methods to establish a connection, create cursors, commit transactions,
roll back transactions, and close the connection.
Resource Management:
- All cursors created from this connection are tracked internally.
- When close() is called on the connection, all open cursors are automatically closed.
- Do not use any cursor after the connection is closed; doing so will raise an exception.
- Cursors are also cleaned up automatically when no longer referenced, to prevent memory leaks.
"""
import weakref
import re
import codecs
from typing import Any, Dict, Optional, Union, List, Tuple, Callable, TYPE_CHECKING
import threading
import mssql_python
from mssql_python.cursor import Cursor
from mssql_python.helpers import (
sanitize_user_input,
validate_attribute_value,
)
from mssql_python.connection_string_parser import sanitize_connection_string
from mssql_python.logging import logger
from mssql_python import ddbc_bindings
from mssql_python.pooling import PoolingManager
from mssql_python.exceptions import (
Warning, # pylint: disable=redefined-builtin
Error,
InterfaceError,
DatabaseError,
DataError,
OperationalError,
IntegrityError,
InternalError,
ProgrammingError,
NotSupportedError,
)
from mssql_python.auth import extract_auth_type, process_connection_string
from mssql_python.constants import ConstantsDDBC, GetInfoConstants
from mssql_python.connection_string_parser import _ConnectionStringParser
from mssql_python.connection_string_builder import _ConnectionStringBuilder
from mssql_python.constants import _RESERVED_PARAMETERS
if TYPE_CHECKING:
from mssql_python.row import Row
# Pseudo SQL type accepted by Connection.setdecoding()/getdecoding() alongside
# SQL_CHAR and SQL_WCHAR; it selects the decoding used for column-name metadata.
SQL_WMETADATA: int = -99  # Special flag for column name decoding
# Threshold to determine if an info type is string-based.
# NOTE(review): not referenced in the visible part of this file — presumably
# consumed by getinfo(); confirm against its implementation.
INFO_TYPE_STRING_THRESHOLD: int = 10000
# UTF-16 encoding names that select SQL_WCHAR by default in setencoding()/setdecoding().
# Membership is tested on the lowercased user-supplied name, so only these exact
# spellings match.
# Note: "utf-16" with BOM is NOT included as it's problematic for SQL_WCHAR
UTF16_ENCODINGS: frozenset[str] = frozenset(["utf-16le", "utf-16be"])
def _validate_utf16_wchar_compatibility(
    encoding: str, wchar_type: int, context: str = "SQL_WCHAR"
) -> None:
    """
    Reject encodings that cannot be paired with SQL_WCHAR.

    Shared by setencoding() and setdecoding() so both apply the same rule:
    only the explicit-byte-order names in UTF16_ENCODINGS are accepted.

    Args:
        encoding: Encoding name, expected to be lowercased by the caller.
        wchar_type: The SQL_WCHAR constant value supplied by the caller
            (accepted for interface symmetry; not read by this function).
        context: Label used in log and error text. When it contains "ctype"
            the error messages refer to "SQL_WCHAR ctype".

    Raises:
        ProgrammingError: If ``encoding`` is "utf-16" (BOM variant) or is not
            one of UTF16_ENCODINGS.
    """
    # The BOM-carrying variant gets its own, more specific diagnostic.
    if encoding == "utf-16":
        logger.warning("utf-16 with BOM rejected for %s", context)
        bom_detail = (
            "Cannot use 'utf-16' encoding with SQL_WCHAR due to Byte Order Mark ambiguity. "
            "Use 'utf-16le' or 'utf-16be' instead for explicit byte order."
        )
        raise ProgrammingError(
            driver_error="UTF-16 with Byte Order Mark not supported for SQL_WCHAR",
            ddbc_error=bom_detail,
        )

    # Explicit little/big-endian UTF-16 is compatible: nothing more to do.
    if encoding in UTF16_ENCODINGS:
        return

    logger.warning(
        "Non-UTF-16 encoding %s attempted with %s", sanitize_user_input(encoding), context
    )
    # Pick the wording that matches what the caller was validating.
    target = "SQL_WCHAR ctype" if "ctype" in context else "SQL_WCHAR"
    raise ProgrammingError(
        driver_error=f"{target} only supports UTF-16 encodings",
        ddbc_error=(
            f"Cannot use encoding '{encoding}' with {target}. "
            "SQL_WCHAR requires UTF-16 encodings (utf-16le, utf-16be)"
        ),
    )
def _validate_encoding(encoding: str) -> bool:
"""
Cached encoding validation using codecs.lookup().
Args:
encoding (str): The encoding name to validate.
Returns:
bool: True if encoding is valid, False otherwise.
Note:
Uses LRU cache to avoid repeated expensive codecs.lookup() calls.
Cache size is limited to 128 entries which should cover most use cases.
Also validates that encoding name only contains safe characters.
"""
# Basic security checks - prevent obvious attacks
if not encoding or not isinstance(encoding, str):
return False
# Check length limit (prevent DOS)
if len(encoding) > 100:
return False
# Prevent null bytes and control characters that could cause issues
if "\x00" in encoding or any(ord(c) < 32 and c not in "\t\n\r" for c in encoding):
return False
# Then check if it's a valid Python codec
try:
codecs.lookup(encoding)
return True
except LookupError:
return False
class Connection:
"""
A class to manage a connection to a database, compliant with DB-API 2.0 specifications.
This class provides methods to establish a connection to a database, create cursors,
commit transactions, roll back transactions, and close the connection. It is designed
to be used in a context where database operations are required, such as executing queries
and fetching results.
The Connection class supports the Python context manager protocol (with statement).
When used as a context manager, it will automatically close the connection when
exiting the context, ensuring proper resource cleanup.
Example usage:
with connect(connection_string) as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO table VALUES (?)", [value])
# Connection is automatically closed when exiting the with block
For long-lived connections, use without context manager:
conn = connect(connection_string)
try:
# Multiple operations...
finally:
conn.close()
Methods:
__init__(connection_str="", autocommit=False, attrs_before=None, timeout=0, native_uuid=None, **kwargs) -> None:
connect_to_db() -> None:
cursor() -> Cursor:
commit() -> None:
rollback() -> None:
close() -> None:
__enter__() -> Connection:
__exit__() -> None:
setencoding(encoding=None, ctype=None) -> None:
setdecoding(sqltype, encoding=None, ctype=None) -> None:
getdecoding(sqltype) -> dict:
set_attr(attribute, value) -> None:
"""
# DB-API 2.0 Exception attributes
# These allow users to catch exceptions using connection.Error,
# connection.ProgrammingError, etc.
Warning = Warning
Error = Error
InterfaceError = InterfaceError
DatabaseError = DatabaseError
DataError = DataError
OperationalError = OperationalError
IntegrityError = IntegrityError
InternalError = InternalError
ProgrammingError = ProgrammingError
NotSupportedError = NotSupportedError
def __init__(
    self,
    connection_str: str = "",
    autocommit: bool = False,
    attrs_before: Optional[Dict[int, Union[int, str, bytes]]] = None,
    timeout: int = 0,
    native_uuid: Optional[bool] = None,
    **kwargs: Any,
) -> None:
    """
    Build the final connection string, open the underlying DDBC connection and
    initialise all per-connection state.

    Args:
        connection_str (str): The base connection string.
        autocommit (bool): Initial autocommit mode; applied through
            setautocommit() once the connection object exists.
        attrs_before (dict, optional): Connection attributes to set before the
            connection is established. Keys are SQL_ATTR_* constants, e.g.
            SQL_ATTR_LOGIN_TIMEOUT, SQL_ATTR_ODBC_CURSORS, SQL_ATTR_PACKET_SIZE.
            The mapping is copied, so the caller's dict is never modified.
        timeout (int): Stored as the connection's ``timeout`` value, which is
            passed to every cursor created by cursor(). 0 means no timeout.
        native_uuid (bool, optional): Per-connection override for how
            UNIQUEIDENTIFIER columns are returned (True: uuid.UUID, False: str).
            None is stored unchanged; it is meant to defer to the module-level
            ``mssql_python.native_uuid`` setting, which is resolved elsewhere.
        **kwargs: Additional key/value pairs merged into the connection string.

    Raises:
        ValueError: If ``native_uuid`` is neither a bool nor None, or if kwargs
            name a parameter reserved for the driver.

    Example:
        >>> # Setting login timeout using attrs_before
        >>> import mssql_python as ms
        >>> conn = ms.connect("Server=myserver;Database=mydb",
        ...                   attrs_before={ms.SQL_ATTR_LOGIN_TIMEOUT: 30})
        >>> # Return native uuid.UUID objects instead of strings
        >>> conn = ms.connect("Server=myserver;Database=mydb", native_uuid=True)
    """
    # Store per-connection native_uuid override.
    # None means "use module-level mssql_python.native_uuid".
    if native_uuid is not None and not isinstance(native_uuid, bool):
        raise ValueError("native_uuid must be a boolean value or None")
    self._native_uuid = native_uuid

    self.connection_str = self._construct_connection_string(connection_str, **kwargs)

    # Work on a private copy: authentication handling below may add entries,
    # and those must not leak into the dict the caller passed in.
    self._attrs_before = dict(attrs_before) if attrs_before else {}

    # Encoding defaults: Python 3 str is Unicode, so utf-16le / SQL_WCHAR.
    self._encoding_settings = {
        "encoding": "utf-16le",
        "ctype": ConstantsDDBC.SQL_WCHAR.value,
    }

    # Decoding defaults, one entry per configurable SQL type.
    self._decoding_settings = {
        ConstantsDDBC.SQL_CHAR.value: {
            "encoding": "utf-8",
            "ctype": ConstantsDDBC.SQL_CHAR.value,
        },
        ConstantsDDBC.SQL_WCHAR.value: {
            "encoding": "utf-16le",
            "ctype": ConstantsDDBC.SQL_WCHAR.value,
        },
        SQL_WMETADATA: {
            "encoding": "utf-16le",
            "ctype": ConstantsDDBC.SQL_WCHAR.value,
        },
    }

    # Auth type for acquiring fresh tokens at bulk copy time.
    # We intentionally do NOT cache the token — a fresh one is acquired
    # each time bulkcopy() is called to avoid expired-token errors.
    self._auth_type = None

    # If the connection string mentions authentication, let the auth module
    # rewrite it and supply any pre-connect attributes it needs.
    if re.search(r"authentication", self.connection_str, re.IGNORECASE):
        connection_result = process_connection_string(self.connection_str)
        self.connection_str = connection_result[0]
        if connection_result[1]:
            self._attrs_before.update(connection_result[1])
        # Store auth type so bulkcopy() can acquire a fresh token later.
        # On Windows Interactive, process_connection_string returns None
        # (DDBC handles auth natively), so fall back to the connection string.
        self._auth_type = connection_result[2] or extract_auth_type(self.connection_str)

    self._closed = False
    self._timeout = timeout

    # WeakSet: cursors drop out automatically once nothing else references
    # them, so tracking them here does not keep them alive.
    # TODO: Think and implement scenarios for multi-threaded access
    # to cursors
    self._cursors = weakref.WeakSet()

    # Output converters and the lock guarding them.
    self._output_converters = {}
    self._converters_lock = threading.Lock()

    # Guards _encoding_settings and _decoding_settings. A plain Lock suffices:
    # the accessors in this class never re-acquire it while holding it.
    self._encoding_lock = threading.Lock()

    # Search escape character is resolved lazily by the searchescape property.
    self._searchescape = None

    # Auto-enable pooling if the user never configured it.
    if not PoolingManager.is_initialized():
        PoolingManager.enable()
    self._pooling = PoolingManager.is_enabled()

    self._conn = ddbc_bindings.Connection(
        self.connection_str, self._pooling, self._attrs_before
    )
    self.setautocommit(autocommit)

    # Register this connection for cleanup before Python shutdown.
    # Registration is best-effort: a failure is logged and construction continues.
    try:
        if hasattr(mssql_python, "_register_connection"):
            mssql_python._register_connection(self)
    except AttributeError as e:
        logger.warning(
            "Failed to register connection for shutdown cleanup: %s: %s",
            type(e).__name__,
            e,
        )
    except Exception as e:
        logger.error(
            "Unexpected error during connection registration: %s: %s",
            type(e).__name__,
            e,
        )
def _construct_connection_string(self, connection_str: str = "", **kwargs: Any) -> str:
    """
    Produce the final connection string from a base string plus keyword overrides.

    Flow:
        1. Parse ``connection_str`` with keyword validation switched on.
        2. Normalize the parsed parameter names to their canonical keys.
        3. Apply ``kwargs`` on top (a kwarg replaces a value from the string).
        4. Hand the merged mapping to the connection string builder.
        5. Append the driver-controlled ``Driver`` and ``APP`` parameters.
        6. Build, log (sanitized) and return the result.

    Args:
        connection_str (str): The base connection string.
        **kwargs: Additional key/value pairs; values are converted with str().

    Returns:
        str: The assembled connection string.

    Raises:
        ValueError: If a kwarg normalizes to a key in _RESERVED_PARAMETERS.
    """
    # Steps 1-2: parse, then map synonyms onto canonical keys.
    parsed = _ConnectionStringParser(validate_keywords=True)._parse(connection_str)
    merged = _ConnectionStringParser._normalize_params(parsed, warn_rejected=False)

    # Step 3: kwargs are applied last so they win over the base string.
    for raw_key, raw_value in kwargs.items():
        canonical = _ConnectionStringParser.normalize_key(raw_key)
        if not canonical:
            # Unrecognized names are skipped, not fatal.
            logger.warning(f"Ignoring unknown connection parameter from kwargs: {raw_key}")
            continue
        if canonical in _RESERVED_PARAMETERS:
            raise ValueError(
                f"Connection parameter '{raw_key}' is reserved and controlled by the driver. "
                f"It cannot be set by the user."
            )
        merged[canonical] = str(raw_value)

    # Steps 4-5: build from merged params, then pin the driver-owned entries.
    assembler = _ConnectionStringBuilder(merged)
    assembler.add_param("Driver", "ODBC Driver 18 for SQL Server")
    assembler.add_param("APP", "MSSQL-Python")

    # Step 6: only the sanitized form is written to the log.
    final_str = assembler.build()
    logger.info("Final connection string: %s", sanitize_connection_string(final_str))
    return final_str
@property
def timeout(self) -> int:
    """
    Timeout, in seconds, currently configured on this connection.

    Returns:
        int: The stored timeout value. Zero means no timeout.
    """
    return self._timeout

@timeout.setter
def timeout(self, value: int) -> None:
    """
    Change the timeout stored on this connection.

    Args:
        value (int): Timeout in seconds; zero means no timeout.

    Raises:
        TypeError: If ``value`` is not an int.
        ValueError: If ``value`` is negative.

    Note:
        cursor() passes the stored value to each cursor it creates.
    """
    # Type first, then range: a non-int must never reach the comparison.
    if not isinstance(value, int):
        raise TypeError("Timeout must be an integer")
    elif value < 0:
        raise ValueError("Timeout cannot be negative")
    else:
        self._timeout = value
    logger.info(f"Query timeout set to {value} seconds")
@property
def autocommit(self) -> bool:
    """
    Autocommit mode as reported by the underlying connection object.

    Returns:
        bool: Whatever ``self._conn.get_autocommit()`` returns.
    """
    current_mode = self._conn.get_autocommit()
    return current_mode

@autocommit.setter
def autocommit(self, value: bool) -> None:
    """
    Switch autocommit on or off.

    Delegates to setautocommit() and then logs the new mode.

    Args:
        value (bool): True to enable autocommit, False to disable it.
    """
    self.setautocommit(value)
    logger.info("Autocommit mode set to %s.", value)
@property
def closed(self) -> bool:
    """
    Whether this connection has been marked closed.

    The value is the internal ``_closed`` flag. It reflects an explicit close,
    not the health of the link: a connection broken by a timeout or network
    failure still reports False here until it is closed.

    Returns:
        bool: True if the connection is closed, False otherwise.
    """
    is_closed = self._closed
    return is_closed
def setautocommit(self, value: bool = False) -> None:
    """
    Apply an autocommit mode to the underlying connection.

    Args:
        value (bool): True to enable autocommit, False (default) to disable it.

    Returns:
        None

    Note:
        Any error raised by the underlying ``set_autocommit`` call propagates
        to the caller unchanged.
    """
    native_conn = self._conn
    native_conn.set_autocommit(value)
def setencoding(self, encoding: Optional[str] = None, ctype: Optional[int] = None) -> None:
    """
    Sets the text encoding for SQL statements and text parameters.

    Since Python 3 only has str (which is Unicode), this method configures
    how text is encoded when sending to the database.

    Args:
        encoding (str, optional): The encoding to use. This must be a valid Python
            encoding that converts text to bytes. If None, defaults to 'utf-16le'.
            The name is lowercased before it is stored.
        ctype (int, optional): The C data type to use when passing data:
            SQL_CHAR or SQL_WCHAR. If not provided, SQL_WCHAR is used for the
            names in UTF16_ENCODINGS and SQL_CHAR for every other encoding.

    Returns:
        None

    Raises:
        ProgrammingError: If the encoding is not valid, the ctype is not
            SQL_CHAR/SQL_WCHAR, or SQL_WCHAR is combined with a non-UTF-16
            (or BOM "utf-16") encoding.
        InterfaceError: If the connection is closed.

    Example:
        # For databases that only communicate with UTF-8
        cnxn.setencoding(encoding='utf-8')
        # For explicitly using SQL_CHAR
        cnxn.setencoding(encoding='utf-8', ctype=mssql_python.SQL_CHAR)
    """
    logger.debug(
        "setencoding: Configuring encoding=%s, ctype=%s",
        str(encoding) if encoding else "default",
        # "is not None" so that an explicit (invalid) ctype of 0 is logged as given
        str(ctype) if ctype is not None else "auto",
    )
    if self._closed:
        logger.debug("setencoding: Connection is closed")
        raise InterfaceError(
            driver_error="Connection is closed",
            ddbc_error="Connection is closed",
        )

    # Set default encoding if not provided
    if encoding is None:
        encoding = "utf-16le"
        logger.debug("setencoding: Using default encoding=utf-16le")

    if not _validate_encoding(encoding):
        # Log the sanitized encoding for security
        logger.warning(
            "Invalid encoding attempted: %s",
            sanitize_user_input(str(encoding)),
        )
        raise ProgrammingError(
            driver_error=f"Unsupported encoding: {encoding}",
            ddbc_error=f"The encoding '{encoding}' is not supported by Python",
        )

    # Lowercase, matching setdecoding() and the contract of
    # _validate_utf16_wchar_compatibility (which expects a lowercased name).
    # NOTE(review): codec aliases such as "utf_16_le" are not mapped onto
    # "utf-16le", so they auto-select SQL_CHAR below — confirm that is intended.
    encoding = encoding.lower()
    logger.debug("setencoding: Encoding normalized to %s", encoding)

    sql_char = ConstantsDDBC.SQL_CHAR.value
    sql_wchar = ConstantsDDBC.SQL_WCHAR.value

    # Set default ctype based on encoding if not provided
    if ctype is None:
        if encoding in UTF16_ENCODINGS:
            ctype = sql_wchar
            logger.debug("setencoding: Auto-selected SQL_WCHAR for UTF-16")
        else:
            ctype = sql_char
            logger.debug("setencoding: Auto-selected SQL_CHAR for non-UTF-16")

    # Validate ctype
    if ctype not in (sql_char, sql_wchar):
        # Log the sanitized ctype for security
        logger.warning(
            "Invalid ctype attempted: %s",
            sanitize_user_input(str(ctype)),
        )
        raise ProgrammingError(
            driver_error=f"Invalid ctype: {ctype}",
            ddbc_error=(
                f"ctype must be SQL_CHAR ({sql_char}) or "
                f"SQL_WCHAR ({sql_wchar})"
            ),
        )

    # SQL_WCHAR only supports UTF-16 encodings without BOM. This single check
    # covers both an explicit and an auto-selected SQL_WCHAR ctype.
    if ctype == sql_wchar:
        _validate_utf16_wchar_compatibility(encoding, ctype, "SQL_WCHAR")

    # Replace the whole settings dict while holding the lock.
    with self._encoding_lock:
        self._encoding_settings = {"encoding": encoding, "ctype": ctype}

    # Log with sanitized values for security
    logger.info(
        "Text encoding set to %s with ctype %s",
        sanitize_user_input(encoding),
        sanitize_user_input(str(ctype)),
    )
def getencoding(self) -> Dict[str, Union[str, int]]:
    """
    Return the text encoding settings currently in effect.

    Returns:
        dict: A copy of the settings, with 'encoding' and 'ctype' keys.
        Mutating the returned dict does not affect the connection.

    Raises:
        InterfaceError: If the connection is closed.

    Example:
        settings = cnxn.getencoding()
        print(f"Current encoding: {settings['encoding']}")
        print(f"Current ctype: {settings['ctype']}")

    Note:
        The settings are read while holding the connection's encoding lock.
    """
    if self._closed:
        raise InterfaceError(
            driver_error="Connection is closed",
            ddbc_error="Connection is closed",
        )

    # Take the snapshot under the lock; hand it back once the lock is released.
    with self._encoding_lock:
        snapshot = self._encoding_settings.copy()
    return snapshot
def setdecoding(
    self, sqltype: int, encoding: Optional[str] = None, ctype: Optional[int] = None
) -> None:
    """
    Configure how text read from the database is decoded for one SQL type.

    Args:
        sqltype (int): The SQL type being configured: SQL_CHAR, SQL_WCHAR, or
            SQL_WMETADATA (the special flag for column name decoding).
        encoding (str, optional): Python encoding used to decode the data.
            If None, 'utf-8' is used for SQL_CHAR and 'utf-16le' otherwise.
            The name is lowercased before it is stored.
        ctype (int, optional): The C data type to request: SQL_CHAR or SQL_WCHAR.
            If None, SQL_WCHAR is chosen for the names in UTF16_ENCODINGS and
            SQL_CHAR for everything else.

    Returns:
        None

    Raises:
        ProgrammingError: If the sqltype, encoding, or ctype is invalid, or if
            SQL_WCHAR (as sqltype or ctype) is paired with an incompatible encoding.
        InterfaceError: If the connection is closed.

    Example:
        # Configure SQL_CHAR to use UTF-8 decoding
        cnxn.setdecoding(mssql_python.SQL_CHAR, encoding='utf-8')
        # Configure column metadata decoding
        cnxn.setdecoding(mssql_python.SQL_WMETADATA, encoding='utf-16le')
        # Use explicit ctype
        cnxn.setdecoding(mssql_python.SQL_WCHAR, encoding='utf-16le',
                         ctype=mssql_python.SQL_WCHAR)
    """
    if self._closed:
        raise InterfaceError(
            driver_error="Connection is closed",
            ddbc_error="Connection is closed",
        )

    char_code = ConstantsDDBC.SQL_CHAR.value
    wchar_code = ConstantsDDBC.SQL_WCHAR.value

    # Only the three configurable SQL types are accepted.
    if sqltype not in (char_code, wchar_code, SQL_WMETADATA):
        logger.warning(
            "Invalid sqltype attempted: %s",
            sanitize_user_input(str(sqltype)),
        )
        raise ProgrammingError(
            driver_error=f"Invalid sqltype: {sqltype}",
            ddbc_error=(
                f"sqltype must be SQL_CHAR ({char_code}), "
                f"SQL_WCHAR ({wchar_code}), or "
                f"SQL_WMETADATA ({SQL_WMETADATA})"
            ),
        )

    # Default encoding depends on the SQL type: utf-8 for SQL_CHAR,
    # utf-16le for SQL_WCHAR and SQL_WMETADATA.
    if encoding is None:
        encoding = "utf-8" if sqltype == char_code else "utf-16le"

    if not _validate_encoding(encoding):
        logger.warning(
            "Invalid encoding attempted: %s",
            sanitize_user_input(str(encoding)),
        )
        raise ProgrammingError(
            driver_error=f"Unsupported encoding: {encoding}",
            ddbc_error=f"The encoding '{encoding}' is not supported by Python",
        )

    # Lowercase so the comparisons below see a consistent spelling.
    # NOTE(review): codec aliases such as "utf_16_le" are not mapped onto
    # "utf-16le" — confirm whether that is intended.
    encoding = encoding.lower()

    # SQL_WCHAR columns must use an explicit-byte-order UTF-16 encoding.
    # SQL_WMETADATA is deliberately left unrestricted.
    if sqltype == wchar_code:
        _validate_utf16_wchar_compatibility(encoding, sqltype, "SQL_WCHAR sqltype")

    # Derive the ctype from the encoding when the caller did not pick one.
    if ctype is None:
        ctype = wchar_code if encoding in UTF16_ENCODINGS else char_code

    if ctype not in (char_code, wchar_code):
        logger.warning(
            "Invalid ctype attempted: %s",
            sanitize_user_input(str(ctype)),
        )
        raise ProgrammingError(
            driver_error=f"Invalid ctype: {ctype}",
            ddbc_error=(
                f"ctype must be SQL_CHAR ({char_code}) or "
                f"SQL_WCHAR ({wchar_code})"
            ),
        )

    # The same UTF-16 rule applies when SQL_WCHAR is the requested C type.
    if ctype == wchar_code:
        _validate_utf16_wchar_compatibility(encoding, ctype, "SQL_WCHAR ctype")

    # Store the entry for this sqltype while holding the lock.
    with self._encoding_lock:
        self._decoding_settings[sqltype] = {"encoding": encoding, "ctype": ctype}

    # Log with a readable type name and sanitized values.
    readable_names = {
        char_code: "SQL_CHAR",
        wchar_code: "SQL_WCHAR",
        SQL_WMETADATA: "SQL_WMETADATA",
    }
    logger.info(
        "Text decoding set for %s to %s with ctype %s",
        readable_names.get(sqltype, str(sqltype)),
        sanitize_user_input(encoding),
        sanitize_user_input(str(ctype)),
    )
def getdecoding(self, sqltype: int) -> Dict[str, Union[str, int]]:
    """
    Return the decoding settings stored for one SQL type.

    Args:
        sqltype (int): SQL_CHAR, SQL_WCHAR, or SQL_WMETADATA.

    Returns:
        dict: A copy of the entry for ``sqltype``, with 'encoding' and 'ctype'
        keys. Mutating it does not affect the connection.

    Raises:
        ProgrammingError: If the sqltype is invalid.
        InterfaceError: If the connection is closed.

    Example:
        settings = cnxn.getdecoding(mssql_python.SQL_CHAR)
        print(f"SQL_CHAR encoding: {settings['encoding']}")
        print(f"SQL_CHAR ctype: {settings['ctype']}")

    Note:
        The settings are read while holding the connection's encoding lock.
    """
    if self._closed:
        raise InterfaceError(
            driver_error="Connection is closed",
            ddbc_error="Connection is closed",
        )

    char_code = ConstantsDDBC.SQL_CHAR.value
    wchar_code = ConstantsDDBC.SQL_WCHAR.value

    # Reject anything other than the three configurable SQL types.
    if sqltype not in (char_code, wchar_code, SQL_WMETADATA):
        raise ProgrammingError(
            driver_error=f"Invalid sqltype: {sqltype}",
            ddbc_error=(
                f"sqltype must be SQL_CHAR ({char_code}), "
                f"SQL_WCHAR ({wchar_code}), or "
                f"SQL_WMETADATA ({SQL_WMETADATA})"
            ),
        )

    # Copy under the lock, return after releasing it.
    with self._encoding_lock:
        snapshot = self._decoding_settings[sqltype].copy()
    return snapshot
def set_attr(self, attribute: int, value: Union[int, str, bytes, bytearray]) -> None:
    """
    Set a connection attribute on the open connection.

    The attribute/value pair is first checked with validate_attribute_value()
    and then forwarded to the underlying connection's ``set_attr``.

    Args:
        attribute (int): The connection attribute to set. Should be one of the
            SQL_ATTR_* constants (e.g., SQL_ATTR_AUTOCOMMIT,
            SQL_ATTR_TXN_ISOLATION).
        value: The value to set for the attribute. Can be an integer, string,
            bytes, or bytearray depending on the attribute type.

    Raises:
        InterfaceError: If the connection is closed, or if the underlying call
            fails with a message containing "invalid", "unsupported" or "cast".
        ProgrammingError: If validation rejects the attribute or value, or if
            the underlying call fails for any other reason.

    Example:
        >>> conn.set_attr(SQL_ATTR_TXN_ISOLATION, SQL_TXN_READ_COMMITTED)

    Note:
        Validation runs with ``is_connected=True``. Attributes that must be set
        before connecting (such as SQL_ATTR_LOGIN_TIMEOUT, SQL_ATTR_ODBC_CURSORS
        and SQL_ATTR_PACKET_SIZE) belong in the ``attrs_before`` argument of the
        constructor instead.
    """
    if self._closed:
        raise InterfaceError(
            "Cannot set attribute on closed connection", "Connection is closed"
        )

    is_valid, error_message, sanitized_attr, sanitized_val = validate_attribute_value(
        attribute, value, is_connected=True
    )
    if not is_valid:
        # Log at warning level with the already-sanitized values. Passing them
        # as %-args keeps any '%' they contain from being treated as a format.
        logger.warning(
            "Invalid attribute or value: %s=%s, %s",
            sanitized_attr,
            sanitized_val,
            error_message,
        )
        raise ProgrammingError(
            driver_error=f"Invalid attribute or value: {error_message}",
            ddbc_error=error_message,
        )

    logger.debug("Setting connection attribute: %s=%s", sanitized_attr, sanitized_val)
    try:
        # Call the underlying C++ method
        self._conn.set_attr(attribute, value)
    except Exception as e:
        error_msg = f"Failed to set connection attribute {sanitized_attr}: {e}"
        logger.error(error_msg)
        # The exception type is chosen from keywords in the error text.
        error_str = str(e).lower()
        if any(marker in error_str for marker in ("invalid", "unsupported", "cast")):
            raise InterfaceError(error_msg, str(e)) from e
        raise ProgrammingError(error_msg, str(e)) from e
    else:
        logger.info("Connection attribute %s set successfully", sanitized_attr)
@property
def searchescape(self) -> str:
    """
    The ODBC search pattern escape character for this connection.

    The value is obtained from SQLGetInfo(SQL_SEARCH_PATTERN_ESCAPE) via
    ``self.getinfo`` and is used to escape special characters such as '%'
    and '_' in LIKE clauses. It is driver specific.

    The result is cached on the instance in ``_searchescape`` after the first
    lookup, so ``getinfo`` is called at most once per connection unless the
    cached value is reset to None.

    Returns:
        str: The search pattern escape character. A single backslash is used
        as the fallback when the driver returns a non-string value or when
        the lookup raises.
    """
    cached = getattr(self, "_searchescape", None)
    if cached is not None:
        return cached

    try:
        escape_char = self.getinfo(GetInfoConstants.SQL_SEARCH_PATTERN_ESCAPE.value)
    except Exception as e:
        # Best-effort lookup: never fail the caller, and log only the
        # exception type so no driver/connection details leak into logs.
        logger.debug(
            "Failed to retrieve search escape character, using default '\\'. "
            "Exception: %s",
            type(e).__name__,
        )
        escape_char = "\\"
    else:
        # Some drivers might return this as an integer memory address or
        # another non-string format; fall back to a backslash in that case.
        if not isinstance(escape_char, str):
            escape_char = "\\"

    self._searchescape = escape_char
    return self._searchescape
def cursor(self) -> Cursor:
    """
    Create a new Cursor bound to this connection and start tracking it.

    The cursor is constructed with the connection's current ``_timeout`` and
    is added to the connection's ``_cursors`` collection before it is
    returned.

    Returns:
        Cursor: A new cursor object for executing SQL queries.

    Raises:
        InterfaceError: If the connection has already been closed.
        Any exception raised by the Cursor constructor propagates unchanged.
    """
    logger.debug(
        "cursor: Creating new cursor - timeout=%d, total_cursors=%d",
        self._timeout,
        len(self._cursors),
    )

    # Guard clause: a closed connection can never hand out cursors.
    if self._closed:
        closed_message = "Cannot create cursor on closed connection"
        logger.error("cursor: Cannot create cursor on closed connection")
        raise InterfaceError(driver_error=closed_message, ddbc_error=closed_message)

    new_cursor = Cursor(self, timeout=self._timeout)
    # Register the cursor so the connection keeps track of it.
    self._cursors.add(new_cursor)
    logger.debug("cursor: Cursor created successfully - total_cursors=%d", len(self._cursors))
    return new_cursor
def add_output_converter(self, sqltype: int, func: Callable[[Any], Any]) -> None:
    """
    Register a converter to be applied to values of the given SQL type.

    The converter is stored in this connection's converter registry while
    holding ``self._converters_lock``. If the underlying native connection
    object exposes ``add_output_converter``, the registration is forwarded
    to it as well (still under the lock).

    WARNING: A registered converter is an arbitrary Python callable that will
    be executed on every matching database value. Do not register converters
    from untrusted sources, as this can result in arbitrary code execution.
    This API should never be exposed to untrusted or external input.

    Args:
        sqltype (int): The integer SQL type value to convert, either one of
            the standard constants (e.g. SQL_VARCHAR) or a database-specific
            value (e.g. -151 for the SQL Server 2008 geometry data type).
        func (callable): The converter, called with a single parameter (the
            value) and returning the converted value. If the value is NULL
            the parameter will be None, otherwise it will be a bytes object.

    Returns:
        None
    """
    with self._converters_lock:
        self._output_converters[sqltype] = func

        # Forward to the native layer only when it implements the hook.
        native_register = getattr(self._conn, "add_output_converter", None)
        if native_register is not None:
            native_register(sqltype, func)

    logger.info(f"Added output converter for SQL type {sqltype}")
def get_output_converter(self, sqltype: Union[int, type]) -> Optional[Callable[[Any], Any]]:
    """
    Look up the output converter registered for a SQL type.

    The registry is read while holding ``self._converters_lock``.

    Args:
        sqltype (int or type): The SQL type value or Python type whose
            converter should be returned.

    Returns:
        callable or None: The registered converter, or None when nothing is
        registered under ``sqltype``.

    Note:
        The returned converter function will be executed on database values.
        Only use converters from trusted sources.
    """
    with self._converters_lock:
        converter = self._output_converters.get(sqltype)
    return converter
def remove_output_converter(self, sqltype: Union[int, type]) -> None:
"""
Remove the output converter function for the specified SQL type.