Skip to content

Commit 8549479

Browse files
committed
Add rows tracking
1 parent 08739b8 commit 8549479

20 files changed

Lines changed: 461 additions & 95 deletions

File tree

sqlmesh/core/console.py

Lines changed: 52 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,7 @@ def update_snapshot_evaluation_progress(
439439
num_audits_passed: int,
440440
num_audits_failed: int,
441441
audit_only: bool = False,
442+
rows_processed: t.Optional[int] = None,
442443
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
443444
) -> None:
444445
"""Updates the snapshot evaluation progress."""
@@ -587,6 +588,7 @@ def update_snapshot_evaluation_progress(
587588
num_audits_passed: int,
588589
num_audits_failed: int,
589590
audit_only: bool = False,
591+
rows_processed: t.Optional[int] = None,
590592
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
591593
) -> None:
592594
pass
@@ -1032,7 +1034,9 @@ def start_evaluation_progress(
10321034

10331035
# determine column widths
10341036
self.evaluation_column_widths["annotation"] = (
1035-
_calculate_annotation_str_len(batched_intervals, self.AUDIT_PADDING)
1037+
_calculate_annotation_str_len(
1038+
batched_intervals, self.AUDIT_PADDING, len(" (XXXXXX rows processed)")
1039+
)
10361040
+ 3 # brackets and opening escape backslash
10371041
)
10381042
self.evaluation_column_widths["name"] = max(
@@ -1077,6 +1081,7 @@ def update_snapshot_evaluation_progress(
10771081
num_audits_passed: int,
10781082
num_audits_failed: int,
10791083
audit_only: bool = False,
1084+
rows_processed: t.Optional[int] = None,
10801085
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
10811086
) -> None:
10821087
"""Update the snapshot evaluation progress."""
@@ -1097,7 +1102,7 @@ def update_snapshot_evaluation_progress(
10971102
).ljust(self.evaluation_column_widths["name"])
10981103

10991104
annotation = _create_evaluation_model_annotation(
1100-
snapshot, _format_evaluation_model_interval(snapshot, interval)
1105+
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
11011106
)
11021107
audits_str = ""
11031108
if num_audits_passed:
@@ -3668,6 +3673,7 @@ def update_snapshot_evaluation_progress(
36683673
num_audits_passed: int,
36693674
num_audits_failed: int,
36703675
audit_only: bool = False,
3676+
rows_processed: t.Optional[int] = None,
36713677
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
36723678
) -> None:
36733679
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
@@ -3838,6 +3844,7 @@ def update_snapshot_evaluation_progress(
38383844
num_audits_passed: int,
38393845
num_audits_failed: int,
38403846
audit_only: bool = False,
3847+
rows_processed: t.Optional[int] = None,
38413848
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
38423849
) -> None:
38433850
message = f"Evaluated {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
@@ -4022,7 +4029,8 @@ def show_table_diff_summary(self, table_diff: TableDiff) -> None:
40224029
self._write(f"Join On: {keys}")
40234030

40244031

4025-
_CONSOLE: Console = NoopConsole()
4032+
# _CONSOLE: Console = NoopConsole()
4033+
_CONSOLE: Console = TerminalConsole()
40264034

40274035

40284036
def set_console(console: Console) -> None:
@@ -4169,33 +4177,49 @@ def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) ->
41694177
return ""
41704178

41714179

4172-
def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Optional[str]) -> str:
4180+
def _create_evaluation_model_annotation(
4181+
snapshot: Snapshot, interval_info: t.Optional[str], rows_processed: t.Optional[int]
4182+
) -> str:
4183+
annotation = None
4184+
num_rows_processed = str(rows_processed) if rows_processed else ""
4185+
rows_processed_str = f" ({num_rows_processed} rows processed)" if num_rows_processed else ""
4186+
41734187
if snapshot.is_audit:
4174-
return "run standalone audit"
4175-
if snapshot.is_model and snapshot.model.kind.is_external:
4176-
return "run external audits"
4177-
if snapshot.model.kind.is_seed:
4178-
return "insert seed file"
4179-
if snapshot.model.kind.is_full:
4180-
return "full refresh"
4181-
if snapshot.model.kind.is_view:
4182-
return "recreate view"
4183-
if snapshot.model.kind.is_incremental_by_unique_key:
4184-
return "insert/update rows"
4185-
if snapshot.model.kind.is_incremental_by_partition:
4186-
return "insert partitions"
4187-
4188-
return interval_info if interval_info else ""
4189-
4190-
4191-
def _calculate_interval_str_len(snapshot: Snapshot, intervals: t.List[Interval]) -> int:
4188+
annotation = "run standalone audit"
4189+
if snapshot.is_model:
4190+
if snapshot.model.kind.is_external:
4191+
annotation = "run external audits"
4192+
if snapshot.model.kind.is_view:
4193+
annotation = "recreate view"
4194+
if snapshot.model.kind.is_seed:
4195+
# no "processed" for seeds
4196+
seed_num_rows_inserted = (
4197+
f" ({num_rows_processed} rows inserted)" if num_rows_processed else ""
4198+
)
4199+
annotation = f"insert seed file{seed_num_rows_inserted}"
4200+
if snapshot.model.kind.is_full:
4201+
annotation = f"full refresh{rows_processed_str}"
4202+
if snapshot.model.kind.is_incremental_by_unique_key:
4203+
annotation = f"insert/update rows{rows_processed_str}"
4204+
if snapshot.model.kind.is_incremental_by_partition:
4205+
annotation = f"insert partitions{rows_processed_str}"
4206+
4207+
if annotation:
4208+
return annotation
4209+
4210+
return f"{interval_info}{rows_processed_str}" if interval_info else ""
4211+
4212+
4213+
def _calculate_interval_str_len(
4214+
snapshot: Snapshot, intervals: t.List[Interval], rows_processed: t.Optional[int] = None
4215+
) -> int:
41924216
interval_str_len = 0
41934217
for interval in intervals:
41944218
interval_str_len = max(
41954219
interval_str_len,
41964220
len(
41974221
_create_evaluation_model_annotation(
4198-
snapshot, _format_evaluation_model_interval(snapshot, interval)
4222+
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
41994223
)
42004224
),
42014225
)
@@ -4248,13 +4272,16 @@ def _calculate_audit_str_len(snapshot: Snapshot, audit_padding: int = 0) -> int:
42484272

42494273

42504274
def _calculate_annotation_str_len(
4251-
batched_intervals: t.Dict[Snapshot, t.List[Interval]], audit_padding: int = 0
4275+
batched_intervals: t.Dict[Snapshot, t.List[Interval]],
4276+
audit_padding: int = 0,
4277+
rows_processed_len: int = 0,
42524278
) -> int:
42534279
annotation_str_len = 0
42544280
for snapshot, intervals in batched_intervals.items():
42554281
annotation_str_len = max(
42564282
annotation_str_len,
42574283
_calculate_interval_str_len(snapshot, intervals)
4258-
+ _calculate_audit_str_len(snapshot, audit_padding),
4284+
+ _calculate_audit_str_len(snapshot, audit_padding)
4285+
+ rows_processed_len,
42594286
)
42604287
return annotation_str_len

sqlmesh/core/engine_adapter/base.py

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
)
4141
from sqlmesh.core.model.kind import TimeColumn
4242
from sqlmesh.core.schema_diff import SchemaDiffer, TableAlterOperation
43+
from sqlmesh.core.execution_tracker import record_execution as track_execution_record
4344
from sqlmesh.utils import (
4445
CorrelationId,
4546
columns_to_types_all_known,
@@ -854,6 +855,7 @@ def _create_table_from_source_queries(
854855
table_description: t.Optional[str] = None,
855856
column_descriptions: t.Optional[t.Dict[str, str]] = None,
856857
table_kind: t.Optional[str] = None,
858+
track_row_count: bool = True,
857859
**kwargs: t.Any,
858860
) -> None:
859861
table = exp.to_table(table_name)
@@ -899,11 +901,15 @@ def _create_table_from_source_queries(
899901
replace=replace,
900902
table_description=table_description,
901903
table_kind=table_kind,
904+
track_row_count=track_row_count,
902905
**kwargs,
903906
)
904907
else:
905908
self._insert_append_query(
906-
table_name, query, target_columns_to_types or self.columns(table)
909+
table_name,
910+
query,
911+
target_columns_to_types or self.columns(table),
912+
track_row_count=track_row_count,
907913
)
908914

909915
# Register comments with commands if the engine supports comments and we weren't able to
@@ -927,6 +933,7 @@ def _create_table(
927933
table_description: t.Optional[str] = None,
928934
column_descriptions: t.Optional[t.Dict[str, str]] = None,
929935
table_kind: t.Optional[str] = None,
936+
track_row_count: bool = True,
930937
**kwargs: t.Any,
931938
) -> None:
932939
self.execute(
@@ -943,7 +950,8 @@ def _create_table(
943950
),
944951
table_kind=table_kind,
945952
**kwargs,
946-
)
953+
),
954+
track_row_count=track_row_count,
947955
)
948956

949957
def _build_create_table_exp(
@@ -1431,6 +1439,7 @@ def insert_append(
14311439
table_name: TableName,
14321440
query_or_df: QueryOrDF,
14331441
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1442+
track_row_count: bool = True,
14341443
source_columns: t.Optional[t.List[str]] = None,
14351444
) -> None:
14361445
source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
@@ -1439,30 +1448,39 @@ def insert_append(
14391448
target_table=table_name,
14401449
source_columns=source_columns,
14411450
)
1442-
self._insert_append_source_queries(table_name, source_queries, target_columns_to_types)
1451+
self._insert_append_source_queries(
1452+
table_name, source_queries, target_columns_to_types, track_row_count
1453+
)
14431454

14441455
def _insert_append_source_queries(
14451456
self,
14461457
table_name: TableName,
14471458
source_queries: t.List[SourceQuery],
14481459
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1460+
track_row_count: bool = True,
14491461
) -> None:
14501462
with self.transaction(condition=len(source_queries) > 0):
14511463
target_columns_to_types = target_columns_to_types or self.columns(table_name)
14521464
for source_query in source_queries:
14531465
with source_query as query:
1454-
self._insert_append_query(table_name, query, target_columns_to_types)
1466+
self._insert_append_query(
1467+
table_name, query, target_columns_to_types, track_row_count=track_row_count
1468+
)
14551469

14561470
def _insert_append_query(
14571471
self,
14581472
table_name: TableName,
14591473
query: Query,
14601474
target_columns_to_types: t.Dict[str, exp.DataType],
14611475
order_projections: bool = True,
1476+
track_row_count: bool = True,
14621477
) -> None:
14631478
if order_projections:
14641479
query = self._order_projections_and_filter(query, target_columns_to_types)
1465-
self.execute(exp.insert(query, table_name, columns=list(target_columns_to_types)))
1480+
self.execute(
1481+
exp.insert(query, table_name, columns=list(target_columns_to_types)),
1482+
track_row_count=track_row_count,
1483+
)
14661484

14671485
def insert_overwrite_by_partition(
14681486
self,
@@ -1604,7 +1622,7 @@ def _insert_overwrite_by_condition(
16041622
)
16051623
if insert_overwrite_strategy.is_replace_where:
16061624
insert_exp.set("where", where or exp.true())
1607-
self.execute(insert_exp)
1625+
self.execute(insert_exp, track_row_count=True)
16081626

16091627
def update_table(
16101628
self,
@@ -1625,7 +1643,7 @@ def _merge(
16251643
using = exp.alias_(
16261644
exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
16271645
)
1628-
self.execute(exp.Merge(this=this, using=using, on=on, whens=whens))
1646+
self.execute(exp.Merge(this=this, using=using, on=on, whens=whens), track_row_count=True)
16291647

16301648
def scd_type_2_by_time(
16311649
self,
@@ -2374,6 +2392,7 @@ def execute(
23742392
expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
23752393
ignore_unsupported_errors: bool = False,
23762394
quote_identifiers: bool = True,
2395+
track_row_count: bool = False,
23772396
**kwargs: t.Any,
23782397
) -> None:
23792398
"""Execute a sql query."""
@@ -2395,7 +2414,7 @@ def execute(
23952414
expression=e if isinstance(e, exp.Expression) else None,
23962415
quote_identifiers=quote_identifiers,
23972416
)
2398-
self._execute(sql, **kwargs)
2417+
self._execute(sql, track_row_count, **kwargs)
23992418

24002419
def _attach_correlation_id(self, sql: str) -> str:
24012420
if self.ATTACH_CORRELATION_ID and self.correlation_id:
@@ -2420,9 +2439,20 @@ def _log_sql(
24202439

24212440
logger.log(self._execute_log_level, "Executing SQL: %s", sql_to_log)
24222441

2423-
def _execute(self, sql: str, **kwargs: t.Any) -> None:
2442+
def _execute(self, sql: str, track_row_count: bool = False, **kwargs: t.Any) -> None:
24242443
self.cursor.execute(sql, **kwargs)
24252444

2445+
if track_row_count:
2446+
rowcount_raw = getattr(self.cursor, "rowcount", None)
2447+
rowcount = None
2448+
if rowcount_raw is not None:
2449+
try:
2450+
rowcount = int(rowcount_raw)
2451+
except (TypeError, ValueError):
2452+
pass
2453+
2454+
track_execution_record(sql, rowcount)
2455+
24262456
@contextlib.contextmanager
24272457
def temp_table(
24282458
self,
@@ -2467,6 +2497,7 @@ def temp_table(
24672497
exists=True,
24682498
table_description=None,
24692499
column_descriptions=None,
2500+
track_row_count=False,
24702501
**kwargs,
24712502
)
24722503

@@ -2718,7 +2749,7 @@ def _replace_by_key(
27182749
insert_statement.set("where", delete_filter)
27192750
insert_statement.set("this", exp.to_table(target_table))
27202751

2721-
self.execute(insert_statement)
2752+
self.execute(insert_statement, track_row_count=True)
27222753
finally:
27232754
self.drop_table(temp_table)
27242755

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
SourceQuery,
2222
set_catalog,
2323
)
24+
from sqlmesh.core.execution_tracker import record_execution as track_execution_record
2425
from sqlmesh.core.node import IntervalUnit
2526
from sqlmesh.core.schema_diff import TableAlterOperation, NestedSupport
2627
from sqlmesh.utils import optional_import, get_source_columns_to_types
@@ -1049,6 +1050,7 @@ def _db_call(self, func: t.Callable[..., t.Any], *args: t.Any, **kwargs: t.Any)
10491050
def _execute(
10501051
self,
10511052
sql: str,
1053+
track_row_count: bool = False,
10521054
**kwargs: t.Any,
10531055
) -> None:
10541056
"""Execute a sql query."""
@@ -1094,6 +1096,9 @@ def _execute(
10941096
self.cursor._set_rowcount(query_results)
10951097
self.cursor._set_description(query_results.schema)
10961098

1099+
if track_row_count:
1100+
track_execution_record(sql, query_results.total_rows)
1101+
10971102
def _get_data_objects(
10981103
self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
10991104
) -> t.List[DataObject]:

sqlmesh/core/engine_adapter/clickhouse.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ def _insert_overwrite_by_condition(
294294
)
295295

296296
try:
297-
self.execute(existing_records_insert_exp)
297+
self.execute(existing_records_insert_exp, track_row_count=True)
298298
finally:
299299
if table_partition_exp:
300300
self.drop_table(partitions_temp_table_name)
@@ -489,6 +489,7 @@ def _create_table(
489489
table_description: t.Optional[str] = None,
490490
column_descriptions: t.Optional[t.Dict[str, str]] = None,
491491
table_kind: t.Optional[str] = None,
492+
track_row_count: bool = True,
492493
**kwargs: t.Any,
493494
) -> None:
494495
"""Creates a table in the database.
@@ -525,6 +526,7 @@ def _create_table(
525526
column_descriptions,
526527
table_kind,
527528
empty_ctas=(self.engine_run_mode.is_cloud and expression is not None),
529+
track_row_count=track_row_count,
528530
**kwargs,
529531
)
530532

0 commit comments

Comments
 (0)