Skip to content

Commit 13024bc

Browse files
authored
Fix: make MSSQL merge exists implementation opt-in (#4870)
1 parent d2d8c2f commit 13024bc

9 files changed

Lines changed: 134 additions & 32 deletions

File tree

docs/integrations/engines/mssql.md

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,65 @@
11
# MSSQL
22

3-
## Local/Built-in Scheduler
4-
**Engine Adapter Type**: `mssql`
3+
## Installation
54

6-
### Installation
7-
#### User / Password Authentication:
5+
### User / Password Authentication:
86
```
97
pip install "sqlmesh[mssql]"
108
```
11-
#### Microsoft Entra ID / Azure Active Directory Authentication:
9+
### Microsoft Entra ID / Azure Active Directory Authentication:
1210
```
1311
pip install "sqlmesh[mssql-odbc]"
1412
```
1513

14+
## Incremental by unique key `MERGE`
15+
16+
SQLMesh executes a `MERGE` statement to insert rows for [incremental by unique key](../../concepts/models/model_kinds.md#incremental_by_unique_key) model kinds.
17+
18+
By default, the `MERGE` statement updates all non-key columns of an existing row when a new row with the same key values is inserted. If all column values match between the two rows, those updates are unnecessary.
19+
20+
SQLMesh provides an optional performance optimization that skips unnecessary updates by comparing column values with the `EXISTS` and `EXCEPT` operators.
21+
22+
Enable the optimization by setting the `mssql_merge_exists` key to `true` in the [`physical_properties`](../../concepts/models/overview.md#physical_properties) section of the `MODEL` statement.
23+
24+
For example:
25+
26+
```sql linenums="1" hl_lines="7-9"
27+
MODEL (
28+
name sqlmesh_example.unique_key,
29+
kind INCREMENTAL_BY_UNIQUE_KEY (
30+
unique_key id
31+
),
32+
cron '@daily',
33+
physical_properties (
34+
mssql_merge_exists = true
35+
)
36+
);
37+
```
38+
39+
!!! warning "Not all column types supported"
40+
The `mssql_merge_exists` optimization is not supported for all column types, including `GEOMETRY`, `XML`, `TEXT`, `NTEXT`, `IMAGE`, and most user-defined types.
41+
42+
Learn more in the [MSSQL `EXCEPT` statement documentation](https://learn.microsoft.com/en-us/sql/t-sql/language-elements/set-operators-except-and-intersect-transact-sql?view=sql-server-ver17#arguments).
43+
44+
## Local/Built-in Scheduler
45+
**Engine Adapter Type**: `mssql`
46+
1647
### Connection options
1748

18-
| Option | Description | Type | Required |
19-
| ----------------- | ------------------------------------------------------------ | :----------: | :------: |
20-
| `type` | Engine type name - must be `mssql` | string | Y |
21-
| `host` | The hostname of the MSSQL server | string | Y |
22-
| `user` | The username / client id to use for authentication with the MSSQL server | string | N |
23-
| `password` | The password / client secret to use for authentication with the MSSQL server | string | N |
24-
| `port` | The port number of the MSSQL server | int | N |
25-
| `database` | The target database | string | N |
26-
| `charset` | The character set used for the connection | string | N |
27-
| `timeout` | The query timeout in seconds. Default: no timeout | int | N |
28-
| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N |
29-
| `appname` | The application name to use for the connection | string | N |
30-
| `conn_properties` | The list of connection properties | list[string] | N |
31-
| `autocommit` | Is autocommit mode enabled. Default: false | bool | N |
32-
| `driver` | The driver to use for the connection. Default: pymssql | string | N |
33-
| `driver_name` | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N |
34-
| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N |
49+
| Option | Description | Type | Required |
50+
| ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :----------: | :------: |
51+
| `type` | Engine type name - must be `mssql` | string | Y |
52+
| `host` | The hostname of the MSSQL server | string | Y |
53+
| `user` | The username / client id to use for authentication with the MSSQL server | string | N |
54+
| `password` | The password / client secret to use for authentication with the MSSQL server | string | N |
55+
| `port` | The port number of the MSSQL server | int | N |
56+
| `database` | The target database | string | N |
57+
| `charset` | The character set used for the connection | string | N |
58+
| `timeout` | The query timeout in seconds. Default: no timeout | int | N |
59+
| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N |
60+
| `appname` | The application name to use for the connection | string | N |
61+
| `conn_properties` | The list of connection properties | list[string] | N |
62+
| `autocommit` | Is autocommit mode enabled. Default: false | bool | N |
63+
| `driver` | The driver to use for the connection. Default: pymssql | string | N |
64+
| `driver_name` | The driver name to use for the connection (e.g., *ODBC Driver 18 for SQL Server*). | string | N |
65+
| `odbc_properties` | ODBC connection properties (e.g., *authentication: ActiveDirectoryServicePrincipal*). See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N |

sqlmesh/core/engine_adapter/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,6 +1939,7 @@ def merge(
19391939
unique_key: t.Sequence[exp.Expression],
19401940
when_matched: t.Optional[exp.Whens] = None,
19411941
merge_filter: t.Optional[exp.Expression] = None,
1942+
**kwargs: t.Any,
19421943
) -> None:
19431944
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
19441945
source_table, columns_to_types, target_table=target_table

sqlmesh/core/engine_adapter/mixins.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def merge(
3232
unique_key: t.Sequence[exp.Expression],
3333
when_matched: t.Optional[exp.Whens] = None,
3434
merge_filter: t.Optional[exp.Expression] = None,
35+
**kwargs: t.Any,
3536
) -> None:
3637
logical_merge(
3738
self,

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,10 @@ def merge(
198198
unique_key: t.Sequence[exp.Expression],
199199
when_matched: t.Optional[exp.Whens] = None,
200200
merge_filter: t.Optional[exp.Expression] = None,
201+
**kwargs: t.Any,
201202
) -> None:
203+
mssql_merge_exists = kwargs.get("physical_properties", {}).get("mssql_merge_exists")
204+
202205
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
203206
source_table, columns_to_types, target_table=target_table
204207
)
@@ -214,7 +217,6 @@ def merge(
214217

215218
match_expressions = []
216219
if not when_matched:
217-
match_condition = None
218220
unique_key_names = [y.name for y in unique_key]
219221
columns_to_types_no_keys = [c for c in columns_to_types if c not in unique_key_names]
220222

@@ -225,10 +227,14 @@ def merge(
225227
exp.column(c, MERGE_SOURCE_ALIAS) for c in columns_to_types_no_keys
226228
]
227229

228-
match_condition = exp.Exists(
229-
this=exp.select(*target_columns_no_keys).except_(
230-
exp.select(*source_columns_no_keys)
230+
match_condition = (
231+
exp.Exists(
232+
this=exp.select(*target_columns_no_keys).except_(
233+
exp.select(*source_columns_no_keys)
234+
)
231235
)
236+
if mssql_merge_exists
237+
else None
232238
)
233239

234240
if target_columns_no_keys:

sqlmesh/core/engine_adapter/postgres.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ def merge(
109109
unique_key: t.Sequence[exp.Expression],
110110
when_matched: t.Optional[exp.Whens] = None,
111111
merge_filter: t.Optional[exp.Expression] = None,
112+
**kwargs: t.Any,
112113
) -> None:
113114
# Merge isn't supported until Postgres 15
114115
merge_impl = (

sqlmesh/core/engine_adapter/redshift.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ def merge(
353353
unique_key: t.Sequence[exp.Expression],
354354
when_matched: t.Optional[exp.Whens] = None,
355355
merge_filter: t.Optional[exp.Expression] = None,
356+
**kwargs: t.Any,
356357
) -> None:
357358
if self.enable_merge:
358359
# By default we use the logical merge unless the user has opted in

sqlmesh/core/snapshot/evaluator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1614,6 +1614,7 @@ def insert(
16141614
end=kwargs.get("end"),
16151615
execution_time=kwargs.get("execution_time"),
16161616
),
1617+
physical_properties=kwargs.get("physical_properties", model.physical_properties),
16171618
)
16181619

16191620
def append(
@@ -1634,6 +1635,7 @@ def append(
16341635
end=kwargs.get("end"),
16351636
execution_time=kwargs.get("execution_time"),
16361637
),
1638+
physical_properties=kwargs.get("physical_properties", model.physical_properties),
16371639
)
16381640

16391641

tests/core/engine_adapter/test_mssql.py

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ def test_merge_pandas(
474474

475475
assert to_sql_calls(adapter) == [
476476
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
477-
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[ts], [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
477+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
478478
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
479479
]
480480

@@ -498,11 +498,47 @@ def test_merge_pandas(
498498

499499
assert to_sql_calls(adapter) == [
500500
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
501-
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
501+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
502+
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
503+
]
504+
505+
506+
def test_merge_exists(
507+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture, make_temp_table_name: t.Callable
508+
):
509+
mocker.patch(
510+
"sqlmesh.core.engine_adapter.mssql.MSSQLEngineAdapter.table_exists",
511+
return_value=False,
512+
)
513+
514+
adapter = make_mocked_engine_adapter(MSSQLEngineAdapter)
515+
516+
temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table")
517+
table_name = "target"
518+
temp_table_id = "abcdefgh"
519+
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
520+
521+
df = pd.DataFrame({"id": [1, 2, 3], "ts": [1, 2, 3], "val": [4, 5, 6]})
522+
523+
# regular implementation
524+
adapter.merge(
525+
target_table=table_name,
526+
source_table=df,
527+
columns_to_types={
528+
"id": exp.DataType.build("int"),
529+
"ts": exp.DataType.build("TIMESTAMP"),
530+
"val": exp.DataType.build("int"),
531+
},
532+
unique_key=[exp.to_identifier("id")],
533+
)
534+
535+
assert to_sql_calls(adapter) == [
536+
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
537+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
502538
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
503539
]
504540

505-
# all model columns are keys
541+
# merge exists implementation
506542
adapter.cursor.reset_mock()
507543
adapter._connection_pool.get().reset_mock()
508544
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
@@ -512,11 +548,31 @@ def test_merge_pandas(
512548
columns_to_types={
513549
"id": exp.DataType.build("int"),
514550
"ts": exp.DataType.build("TIMESTAMP"),
551+
"val": exp.DataType.build("int"),
515552
},
516-
unique_key=[exp.to_identifier("id"), exp.to_column("ts")],
553+
unique_key=[exp.to_identifier("id")],
554+
physical_properties={"mssql_merge_exists": True},
517555
)
518-
adapter._connection_pool.get().bulk_copy.assert_called_with(
519-
f"__temp_target_{temp_table_id}", [(1, 1), (2, 2), (3, 3)]
556+
557+
assert to_sql_calls(adapter) == [
558+
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
559+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[ts], [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
560+
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
561+
]
562+
563+
# merge exists and all model columns are keys
564+
adapter.cursor.reset_mock()
565+
adapter._connection_pool.get().reset_mock()
566+
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
567+
adapter.merge(
568+
target_table=table_name,
569+
source_table=df,
570+
columns_to_types={
571+
"id": exp.DataType.build("int"),
572+
"ts": exp.DataType.build("TIMESTAMP"),
573+
},
574+
unique_key=[exp.to_identifier("id"), exp.to_column("ts")],
575+
physical_properties={"mssql_merge_exists": True},
520576
)
521577

522578
assert to_sql_calls(adapter) == [

tests/core/test_snapshot_evaluator.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2231,6 +2231,7 @@ def test_create_incremental_by_unique_key_updated_at_exp(adapter_mock, make_snap
22312231
)
22322232
]
22332233
),
2234+
physical_properties={},
22342235
)
22352236

22362237

@@ -2327,6 +2328,7 @@ def test_create_incremental_by_unique_key_multiple_updated_at_exp(adapter_mock,
23272328
),
23282329
],
23292330
),
2331+
physical_properties={},
23302332
)
23312333

23322334

@@ -2478,6 +2480,7 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
24782480
expression=exp.Literal(this="2020-01-01", is_string=True),
24792481
),
24802482
),
2483+
physical_properties={},
24812484
)
24822485

24832486

0 commit comments

Comments
 (0)