Commit 7fe5431

FEAT: BCP implementation in mssql-python driver using rust (#402)
> [AB#41033](https://sqlclientdrivers.visualstudio.com/c6d89619-62de-46a0-8b46-70b92a84d85e/_workitems/edit/41033)

### Summary

This pull request adds a method to `cursor.py` that performs bulk copy operations through a Rust-based implementation. The new `_bulkcopy` method uses the `mssql_py_core` Rust library to transfer data from Python into SQL Server tables, and includes input validation, connection handling, and error management.

**Bulk Copy Feature:**

* Added a `_bulkcopy` method to the cursor class in `cursor.py` that bulk-inserts data into a SQL Server table using the Rust-based `mssql_py_core` library. The method accepts a batch size, timeout, and column mappings, logs failures, and cleans up its resources.

**Input Validation and Error Handling:**

* The `_bulkcopy` method validates `table_name`, `data`, `batch_size`, and `timeout`, and checks that the connection string contains the required `SERVER` and `DATABASE` components. It raises `ImportError`, `TypeError`, `ValueError`, or `RuntimeError` for a missing dependency or invalid input.

**Integration and Resource Management:**

* The Rust connection and cursor are closed after the operation, even when an exception is raised, to prevent resource leaks.
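The validation rules described above can be sketched as a standalone function. This is only an illustration that mirrors the checks visible in the diff; `validate_bulkcopy_args` is a hypothetical name and is not part of `mssql_python`, where the checks live inline in `_bulkcopy`.

```python
from typing import Any, Optional


def validate_bulkcopy_args(
    table_name: Any,
    data: Any,
    batch_size: Optional[Any] = None,
    timeout: Any = 30,
) -> None:
    """Hypothetical helper mirroring the argument checks in this commit's _bulkcopy."""
    # table_name must be a non-empty string.
    if not table_name or not isinstance(table_name, str):
        raise ValueError("table_name must be a non-empty string")

    # data must be an iterable of rows; str and bytes are iterable but rejected.
    if data is None:
        raise TypeError("data must be an iterable of tuples or lists, got None")
    if isinstance(data, (str, bytes)):
        raise TypeError(
            f"data must be an iterable of tuples or lists, got {type(data).__name__}"
        )
    if not hasattr(data, "__iter__"):
        raise TypeError(
            f"data must be an iterable of tuples or lists, got non-iterable {type(data).__name__}"
        )

    # batch_size is only checked when explicitly provided.
    if batch_size is not None:
        if not isinstance(batch_size, (int, float)):
            raise TypeError(
                f"batch_size must be a positive integer, got {type(batch_size).__name__}"
            )
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

    # timeout defaults to 30 and must be a positive number.
    if not isinstance(timeout, (int, float)):
        raise TypeError(f"timeout must be a positive number, got {type(timeout).__name__}")
    if timeout <= 0:
        raise ValueError(f"timeout must be positive, got {timeout}")
```

As in the diff, the type check for `batch_size` accepts `float` values even though the error message asks for an integer; the sketch keeps that behavior rather than tightening it.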
1 parent dc0e162 commit 7fe5431

2 files changed

Lines changed: 186 additions & 2 deletions

main.py

Lines changed: 1 addition & 1 deletion
@@ -15,4 +15,4 @@
     print(f"Database ID: {row[0]}, Name: {row[1]}")
 
 cursor.close()
-conn.close()
+conn.close()

mssql_python/cursor.py

Lines changed: 185 additions & 1 deletion
@@ -15,7 +15,7 @@
 import uuid
 import datetime
 import warnings
-from typing import List, Union, Any, Optional, Tuple, Sequence, TYPE_CHECKING
+from typing import List, Union, Any, Optional, Tuple, Sequence, TYPE_CHECKING, Iterable
 from mssql_python.constants import ConstantsDDBC as ddbc_sql_const, SQLTypes
 from mssql_python.helpers import check_error
 from mssql_python.logging import logger
@@ -2451,6 +2451,190 @@ def nextset(self) -> Union[bool, None]:
             )
         return True
 
+    def _bulkcopy(
+        self, table_name: str, data: Iterable[Union[Tuple, List]], **kwargs
+    ):  # pragma: no cover
+        """
+        Perform bulk copy operation for high-performance data loading.
+
+        Args:
+            table_name: Target table name (can include schema, e.g., 'dbo.MyTable').
+                The table must exist and the user must have INSERT permissions.
+
+            data: Iterable of tuples or lists containing row data to be inserted.
+
+                Data Format Requirements:
+                - Each element in the iterable represents one row
+                - Each row should be a tuple or list of column values
+                - Column order must match the target table's column order (by ordinal
+                  position), unless column_mappings is specified
+                - The number of values in each row must match the number of columns
+                  in the target table
+
+            **kwargs: Additional bulk copy options.
+
+                column_mappings (List[Tuple[int, str]], optional):
+                    Maps source data column indices to target table column names.
+                    Each tuple is (source_index, target_column_name) where:
+                    - source_index: 0-based index of the column in the source data
+                    - target_column_name: Name of the target column in the database table
+
+                    When omitted: Columns are mapped by ordinal position (first data
+                    column → first table column, second → second, etc.)
+
+                    When specified: Only the mapped columns are inserted; unmapped
+                    source columns are ignored, and unmapped target columns must
+                    have default values or allow NULL.
+
+        Returns:
+            Dictionary with bulk copy results including:
+                - rows_copied: Number of rows successfully copied
+                - batch_count: Number of batches processed
+                - elapsed_time: Time taken for the operation
+
+        Raises:
+            ImportError: If mssql_py_core library is not installed
+            TypeError: If data is None, not iterable, or is a string/bytes
+            ValueError: If table_name is empty or parameters are invalid
+            RuntimeError: If connection string is not available
+        """
+        try:
+            import mssql_py_core
+        except ImportError as exc:
+            raise ImportError(
+                "Bulk copy requires the mssql_py_core library which is not installed. "
+                "To install, run: pip install mssql_py_core "
+            ) from exc
+
+        # Validate inputs
+        if not table_name or not isinstance(table_name, str):
+            raise ValueError("table_name must be a non-empty string")
+
+        # Validate that data is iterable (but not a string or bytes, which are technically iterable)
+        if data is None:
+            raise TypeError("data must be an iterable of tuples or lists, got None")
+        if isinstance(data, (str, bytes)):
+            raise TypeError(
+                f"data must be an iterable of tuples or lists, got {type(data).__name__}. "
+                "Strings and bytes are not valid row collections."
+            )
+        if not hasattr(data, "__iter__"):
+            raise TypeError(
+                f"data must be an iterable of tuples or lists, got non-iterable {type(data).__name__}"
+            )
+
+        # Extract and validate kwargs with defaults
+        batch_size = kwargs.get("batch_size", None)
+        timeout = kwargs.get("timeout", 30)
+
+        # Validate batch_size type and value (only if explicitly provided)
+        if batch_size is not None:
+            if not isinstance(batch_size, (int, float)):
+                raise TypeError(
+                    f"batch_size must be a positive integer, got {type(batch_size).__name__}"
+                )
+            if batch_size <= 0:
+                raise ValueError(f"batch_size must be positive, got {batch_size}")
+
+        # Validate timeout type and value
+        if not isinstance(timeout, (int, float)):
+            raise TypeError(f"timeout must be a positive number, got {type(timeout).__name__}")
+        if timeout <= 0:
+            raise ValueError(f"timeout must be positive, got {timeout}")
+
+        # Get and parse connection string
+        if not hasattr(self.connection, "connection_str"):
+            raise RuntimeError("Connection string not available for bulk copy")
+
+        # Use the proper connection string parser that handles braced values
+        from mssql_python.connection_string_parser import _ConnectionStringParser
+
+        parser = _ConnectionStringParser(validate_keywords=False)
+        params = parser._parse(self.connection.connection_str)
+
+        if not params.get("server"):
+            raise ValueError("SERVER parameter is required in connection string")
+
+        if not params.get("database"):
+            raise ValueError(
+                "DATABASE parameter is required in connection string for bulk copy. "
+                "Specify the target database explicitly to avoid accidentally writing to system databases."
+            )
+
+        # Build connection context for bulk copy library
+        # Note: Password is extracted separately to avoid storing it in the main context
+        # dict that could be accidentally logged or exposed in error messages.
+        trust_cert = params.get("trustservercertificate", "yes").lower() in ("yes", "true")
+
+        # Parse encryption setting from connection string
+        encrypt_param = params.get("encrypt")
+        if encrypt_param is not None:
+            encrypt_value = encrypt_param.strip().lower()
+            if encrypt_value in ("yes", "true", "mandatory", "required"):
+                encryption = "Required"
+            elif encrypt_value in ("no", "false", "optional"):
+                encryption = "Optional"
+            else:
+                # Pass through unrecognized values (e.g., "Strict") to the underlying driver
+                encryption = encrypt_param
+        else:
+            encryption = "Optional"
+
+        context = {
+            "server": params.get("server"),
+            "database": params.get("database"),
+            "user_name": params.get("uid", ""),
+            "trust_server_certificate": trust_cert,
+            "encryption": encryption,
+        }
+
+        # Extract password separately to avoid storing it in generic context that may be logged
+        password = params.get("pwd", "")
+        pycore_context = dict(context)
+        pycore_context["password"] = password
+
+        pycore_connection = None
+        pycore_cursor = None
+        try:
+            pycore_connection = mssql_py_core.PyCoreConnection(pycore_context)
+            pycore_cursor = pycore_connection.cursor()
+
+            result = pycore_cursor.bulkcopy(table_name, iter(data), **kwargs)
+
+            return result
+
+        except Exception as e:
+            # Log the error for debugging (without exposing credentials)
+            logger.debug(
+                "Bulk copy operation failed for table '%s': %s: %s",
+                table_name,
+                type(e).__name__,
+                str(e),
+            )
+            # Re-raise without exposing connection context in the error chain
+            # to prevent credential leakage in stack traces
+            raise type(e)(str(e)) from None
+
+        finally:
+            # Clear sensitive data to minimize memory exposure
+            password = ""
+            if pycore_context:
+                pycore_context["password"] = ""
+                pycore_context["user_name"] = ""
+            # Clean up bulk copy resources
+            for resource in (pycore_cursor, pycore_connection):
+                if resource and hasattr(resource, "close"):
+                    try:
+                        resource.close()
+                    except Exception as cleanup_error:
+                        # Log cleanup errors at debug level to aid troubleshooting
+                        # without masking the original exception
+                        logger.debug(
+                            "Failed to close bulk copy resource %s: %s",
+                            type(resource).__name__,
+                            cleanup_error,
+                        )
+
     def __enter__(self):
         """
         Enter the runtime context for the cursor.
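
The handling of the `Encrypt` and `TrustServerCertificate` connection-string keywords in the `_bulkcopy` hunk above can be read as two small pure functions. The following is a minimal sketch of that mapping only; `normalize_encrypt` and `trusts_server_certificate` are hypothetical names that do not exist in `mssql_python`, where the logic is written inline.

```python
from typing import Optional


def normalize_encrypt(encrypt_param: Optional[str]) -> str:
    """Hypothetical helper: map an Encrypt keyword value as the diff does."""
    if encrypt_param is None:
        # No Encrypt keyword in the connection string: the diff falls back to "Optional".
        return "Optional"
    value = encrypt_param.strip().lower()
    if value in ("yes", "true", "mandatory", "required"):
        return "Required"
    if value in ("no", "false", "optional"):
        return "Optional"
    # Unrecognized values (for example "Strict") are passed through unchanged.
    return encrypt_param


def trusts_server_certificate(raw: Optional[str]) -> bool:
    """Hypothetical helper: a missing keyword is treated as "yes", as in the diff."""
    return (raw if raw is not None else "yes").lower() in ("yes", "true")


if __name__ == "__main__":
    for raw in (None, " Yes ", "optional", "Strict"):
        print(repr(raw), "->", normalize_encrypt(raw))
```

Two defaults are worth noting when reading the diff: a connection string without `Encrypt` yields `"Optional"`, and one without `TrustServerCertificate` is treated as trusting the server certificate.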
