Skip to content

Commit cc80ba5

Browse files
committed
Add BQ support and track bytes processed
1 parent 1a659ca commit cc80ba5

19 files changed

Lines changed: 193 additions & 116 deletions

File tree

sqlmesh/core/console.py

Lines changed: 74 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from sqlmesh.core.environment import EnvironmentNamingInfo, EnvironmentSummary
3333
from sqlmesh.core.linter.rule import RuleViolation
3434
from sqlmesh.core.model import Model
35+
from sqlmesh.core.execution_tracker import QueryExecutionStats
3536
from sqlmesh.core.snapshot import (
3637
Snapshot,
3738
SnapshotChangeCategory,
@@ -439,7 +440,7 @@ def update_snapshot_evaluation_progress(
439440
num_audits_passed: int,
440441
num_audits_failed: int,
441442
audit_only: bool = False,
442-
rows_processed: t.Optional[int] = None,
443+
execution_stats: t.Optional[QueryExecutionStats] = None,
443444
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
444445
) -> None:
445446
"""Updates the snapshot evaluation progress."""
@@ -588,7 +589,7 @@ def update_snapshot_evaluation_progress(
588589
num_audits_passed: int,
589590
num_audits_failed: int,
590591
audit_only: bool = False,
591-
rows_processed: t.Optional[int] = None,
592+
execution_stats: t.Optional[QueryExecutionStats] = None,
592593
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
593594
) -> None:
594595
pass
@@ -1035,7 +1036,7 @@ def start_evaluation_progress(
10351036
# determine column widths
10361037
self.evaluation_column_widths["annotation"] = (
10371038
_calculate_annotation_str_len(
1038-
batched_intervals, self.AUDIT_PADDING, len(" (XXXXXX rows processed)")
1039+
batched_intervals, self.AUDIT_PADDING, len(" (123.4m rows, 123.4 KiB)")
10391040
)
10401041
+ 3 # brackets and opening escape backslash
10411042
)
@@ -1081,7 +1082,7 @@ def update_snapshot_evaluation_progress(
10811082
num_audits_passed: int,
10821083
num_audits_failed: int,
10831084
audit_only: bool = False,
1084-
rows_processed: t.Optional[int] = None,
1085+
execution_stats: t.Optional[QueryExecutionStats] = None,
10851086
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
10861087
) -> None:
10871088
"""Update the snapshot evaluation progress."""
@@ -1102,7 +1103,7 @@ def update_snapshot_evaluation_progress(
11021103
).ljust(self.evaluation_column_widths["name"])
11031104

11041105
annotation = _create_evaluation_model_annotation(
1105-
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
1106+
snapshot, _format_evaluation_model_interval(snapshot, interval), execution_stats
11061107
)
11071108
audits_str = ""
11081109
if num_audits_passed:
@@ -3673,7 +3674,7 @@ def update_snapshot_evaluation_progress(
36733674
num_audits_passed: int,
36743675
num_audits_failed: int,
36753676
audit_only: bool = False,
3676-
rows_processed: t.Optional[int] = None,
3677+
execution_stats: t.Optional[QueryExecutionStats] = None,
36773678
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
36783679
) -> None:
36793680
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
@@ -3844,7 +3845,7 @@ def update_snapshot_evaluation_progress(
38443845
num_audits_passed: int,
38453846
num_audits_failed: int,
38463847
audit_only: bool = False,
3847-
rows_processed: t.Optional[int] = None,
3848+
execution_stats: t.Optional[QueryExecutionStats] = None,
38483849
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
38493850
) -> None:
38503851
message = f"Evaluated {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
@@ -4179,11 +4180,27 @@ def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) ->
41794180

41804181

41814182
def _create_evaluation_model_annotation(
4182-
snapshot: Snapshot, interval_info: t.Optional[str], rows_processed: t.Optional[int]
4183+
snapshot: Snapshot,
4184+
interval_info: t.Optional[str],
4185+
execution_stats: t.Optional[QueryExecutionStats],
41834186
) -> str:
41844187
annotation = None
4185-
num_rows_processed = str(rows_processed) if rows_processed else ""
4186-
rows_processed_str = f" ({num_rows_processed} rows)" if num_rows_processed else ""
4188+
execution_stats_str = ""
4189+
if execution_stats:
4190+
rows_processed = execution_stats.total_rows_processed
4191+
execution_stats_str += (
4192+
f"{_abbreviate_integer_count(rows_processed)} row{'s' if rows_processed > 1 else ''}"
4193+
if rows_processed
4194+
else ""
4195+
)
4196+
4197+
bytes_processed = execution_stats.total_bytes_processed
4198+
execution_stats_str += (
4199+
f"{', ' if execution_stats_str else ''}{_format_bytes(bytes_processed)}"
4200+
if bytes_processed
4201+
else ""
4202+
)
4203+
execution_stats_str = f" ({execution_stats_str})" if execution_stats_str else ""
41874204

41884205
if snapshot.is_audit:
41894206
annotation = "run standalone audit"
@@ -4193,30 +4210,32 @@ def _create_evaluation_model_annotation(
41934210
if snapshot.model.kind.is_view:
41944211
annotation = "recreate view"
41954212
if snapshot.model.kind.is_seed:
4196-
annotation = f"insert seed file{rows_processed_str}"
4213+
annotation = f"insert seed file{execution_stats_str}"
41974214
if snapshot.model.kind.is_full:
4198-
annotation = f"full refresh{rows_processed_str}"
4215+
annotation = f"full refresh{execution_stats_str}"
41994216
if snapshot.model.kind.is_incremental_by_unique_key:
4200-
annotation = f"insert/update rows{rows_processed_str}"
4217+
annotation = f"insert/update rows{execution_stats_str}"
42014218
if snapshot.model.kind.is_incremental_by_partition:
4202-
annotation = f"insert partitions{rows_processed_str}"
4219+
annotation = f"insert partitions{execution_stats_str}"
42034220

42044221
if annotation:
42054222
return annotation
42064223

4207-
return f"{interval_info}{rows_processed_str}" if interval_info else ""
4224+
return f"{interval_info}{execution_stats_str}" if interval_info else ""
42084225

42094226

42104227
def _calculate_interval_str_len(
4211-
snapshot: Snapshot, intervals: t.List[Interval], rows_processed: t.Optional[int] = None
4228+
snapshot: Snapshot,
4229+
intervals: t.List[Interval],
4230+
execution_stats: t.Optional[QueryExecutionStats] = None,
42124231
) -> int:
42134232
interval_str_len = 0
42144233
for interval in intervals:
42154234
interval_str_len = max(
42164235
interval_str_len,
42174236
len(
42184237
_create_evaluation_model_annotation(
4219-
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
4238+
snapshot, _format_evaluation_model_interval(snapshot, interval), execution_stats
42204239
)
42214240
),
42224241
)
@@ -4271,14 +4290,50 @@ def _calculate_audit_str_len(snapshot: Snapshot, audit_padding: int = 0) -> int:
42714290
def _calculate_annotation_str_len(
42724291
batched_intervals: t.Dict[Snapshot, t.List[Interval]],
42734292
audit_padding: int = 0,
4274-
rows_processed_len: int = 0,
4293+
execution_stats_len: int = 0,
42754294
) -> int:
42764295
annotation_str_len = 0
42774296
for snapshot, intervals in batched_intervals.items():
42784297
annotation_str_len = max(
42794298
annotation_str_len,
42804299
_calculate_interval_str_len(snapshot, intervals)
42814300
+ _calculate_audit_str_len(snapshot, audit_padding)
4282-
+ rows_processed_len,
4301+
+ execution_stats_len,
42834302
)
42844303
return annotation_str_len
4304+
4305+
4306+
# Convert number of bytes to a human-readable string
4307+
# https://github.com/dbt-labs/dbt-adapters/blob/34fd178539dcb6f82e18e738adc03de7784c032f/dbt-bigquery/src/dbt/adapters/bigquery/connections.py#L165
4308+
def _format_bytes(num_bytes: t.Optional[int]) -> str:
4309+
if num_bytes and num_bytes > 0:
4310+
if num_bytes < 1024:
4311+
return f"{num_bytes} Bytes"
4312+
4313+
num_bytes_float = float(num_bytes) / 1024.0
4314+
for unit in ["KiB", "MiB", "GiB", "TiB", "PiB"]:
4315+
if num_bytes_float < 1024.0:
4316+
return f"{num_bytes_float:3.1f} {unit}"
4317+
num_bytes_float /= 1024.0
4318+
4319+
num_bytes_float *= 1024.0 # undo last division in loop
4320+
return f"{num_bytes_float:3.1f} {unit}"
4321+
return ""
4322+
4323+
4324+
# Abbreviate integer count. Example: 1,000,000,000 -> 1b
4325+
# https://github.com/dbt-labs/dbt-adapters/blob/34fd178539dcb6f82e18e738adc03de7784c032f/dbt-bigquery/src/dbt/adapters/bigquery/connections.py#L178
4326+
def _abbreviate_integer_count(count: t.Optional[int]) -> str:
4327+
if count and count > 0:
4328+
if count < 1000:
4329+
return str(count)
4330+
4331+
count_float = float(count) / 1000.0
4332+
for unit in ["k", "m", "b", "t"]:
4333+
if count_float < 1000.0:
4334+
return f"{count_float:3.1f}{unit}".strip()
4335+
count_float /= 1000.0
4336+
4337+
count_float *= 1000.0 # undo last division in loop
4338+
return f"{count_float:3.1f}{unit}".strip()
4339+
return ""

sqlmesh/core/engine_adapter/base.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,7 @@ def _create_table_from_source_queries(
856856
table_description: t.Optional[str] = None,
857857
column_descriptions: t.Optional[t.Dict[str, str]] = None,
858858
table_kind: t.Optional[str] = None,
859-
track_row_count: bool = True,
859+
track_execution_stats: bool = True,
860860
**kwargs: t.Any,
861861
) -> None:
862862
table = exp.to_table(table_name)
@@ -902,15 +902,15 @@ def _create_table_from_source_queries(
902902
replace=replace,
903903
table_description=table_description,
904904
table_kind=table_kind,
905-
track_row_count=track_row_count,
905+
track_execution_stats=track_execution_stats,
906906
**kwargs,
907907
)
908908
else:
909909
self._insert_append_query(
910910
table_name,
911911
query,
912912
target_columns_to_types or self.columns(table),
913-
track_row_count=track_row_count,
913+
track_execution_stats=track_execution_stats,
914914
)
915915

916916
# Register comments with commands if the engine supports comments and we weren't able to
@@ -934,7 +934,7 @@ def _create_table(
934934
table_description: t.Optional[str] = None,
935935
column_descriptions: t.Optional[t.Dict[str, str]] = None,
936936
table_kind: t.Optional[str] = None,
937-
track_row_count: bool = True,
937+
track_execution_stats: bool = True,
938938
**kwargs: t.Any,
939939
) -> None:
940940
self.execute(
@@ -952,7 +952,7 @@ def _create_table(
952952
table_kind=table_kind,
953953
**kwargs,
954954
),
955-
track_row_count=track_row_count,
955+
track_execution_stats=track_execution_stats,
956956
)
957957

958958
def _build_create_table_exp(
@@ -1440,7 +1440,7 @@ def insert_append(
14401440
table_name: TableName,
14411441
query_or_df: QueryOrDF,
14421442
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1443-
track_row_count: bool = True,
1443+
track_execution_stats: bool = True,
14441444
source_columns: t.Optional[t.List[str]] = None,
14451445
) -> None:
14461446
source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
@@ -1450,22 +1450,25 @@ def insert_append(
14501450
source_columns=source_columns,
14511451
)
14521452
self._insert_append_source_queries(
1453-
table_name, source_queries, target_columns_to_types, track_row_count
1453+
table_name, source_queries, target_columns_to_types, track_execution_stats
14541454
)
14551455

14561456
def _insert_append_source_queries(
14571457
self,
14581458
table_name: TableName,
14591459
source_queries: t.List[SourceQuery],
14601460
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1461-
track_row_count: bool = True,
1461+
track_execution_stats: bool = True,
14621462
) -> None:
14631463
with self.transaction(condition=len(source_queries) > 0):
14641464
target_columns_to_types = target_columns_to_types or self.columns(table_name)
14651465
for source_query in source_queries:
14661466
with source_query as query:
14671467
self._insert_append_query(
1468-
table_name, query, target_columns_to_types, track_row_count=track_row_count
1468+
table_name,
1469+
query,
1470+
target_columns_to_types,
1471+
track_execution_stats=track_execution_stats,
14691472
)
14701473

14711474
def _insert_append_query(
@@ -1474,13 +1477,13 @@ def _insert_append_query(
14741477
query: Query,
14751478
target_columns_to_types: t.Dict[str, exp.DataType],
14761479
order_projections: bool = True,
1477-
track_row_count: bool = True,
1480+
track_execution_stats: bool = True,
14781481
) -> None:
14791482
if order_projections:
14801483
query = self._order_projections_and_filter(query, target_columns_to_types)
14811484
self.execute(
14821485
exp.insert(query, table_name, columns=list(target_columns_to_types)),
1483-
track_row_count=track_row_count,
1486+
track_execution_stats=track_execution_stats,
14841487
)
14851488

14861489
def insert_overwrite_by_partition(
@@ -1623,7 +1626,7 @@ def _insert_overwrite_by_condition(
16231626
)
16241627
if insert_overwrite_strategy.is_replace_where:
16251628
insert_exp.set("where", where or exp.true())
1626-
self.execute(insert_exp, track_row_count=True)
1629+
self.execute(insert_exp, track_execution_stats=True)
16271630

16281631
def update_table(
16291632
self,
@@ -1644,7 +1647,9 @@ def _merge(
16441647
using = exp.alias_(
16451648
exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
16461649
)
1647-
self.execute(exp.Merge(this=this, using=using, on=on, whens=whens), track_row_count=True)
1650+
self.execute(
1651+
exp.Merge(this=this, using=using, on=on, whens=whens), track_execution_stats=True
1652+
)
16481653

16491654
def scd_type_2_by_time(
16501655
self,
@@ -2393,7 +2398,7 @@ def execute(
23932398
expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
23942399
ignore_unsupported_errors: bool = False,
23952400
quote_identifiers: bool = True,
2396-
track_row_count: bool = False,
2401+
track_execution_stats: bool = False,
23972402
**kwargs: t.Any,
23982403
) -> None:
23992404
"""Execute a sql query."""
@@ -2415,7 +2420,7 @@ def execute(
24152420
expression=e if isinstance(e, exp.Expression) else None,
24162421
quote_identifiers=quote_identifiers,
24172422
)
2418-
self._execute(sql, track_row_count, **kwargs)
2423+
self._execute(sql, track_execution_stats, **kwargs)
24192424

24202425
def _attach_correlation_id(self, sql: str) -> str:
24212426
if self.ATTACH_CORRELATION_ID and self.correlation_id:
@@ -2440,12 +2445,12 @@ def _log_sql(
24402445

24412446
logger.log(self._execute_log_level, "Executing SQL: %s", sql_to_log)
24422447

2443-
def _execute(self, sql: str, track_row_count: bool = False, **kwargs: t.Any) -> None:
2448+
def _execute(self, sql: str, track_execution_stats: bool = False, **kwargs: t.Any) -> None:
24442449
self.cursor.execute(sql, **kwargs)
24452450

24462451
if (
24472452
self.SUPPORTS_QUERY_EXECUTION_TRACKING
2448-
and track_row_count
2453+
and track_execution_stats
24492454
and QueryExecutionTracker.is_tracking()
24502455
):
24512456
rowcount_raw = getattr(self.cursor, "rowcount", None)
@@ -2456,7 +2461,7 @@ def _execute(self, sql: str, track_row_count: bool = False, **kwargs: t.Any) ->
24562461
except (TypeError, ValueError):
24572462
pass
24582463

2459-
QueryExecutionTracker.record_execution(sql, rowcount)
2464+
QueryExecutionTracker.record_execution(sql, rowcount, None)
24602465

24612466
@contextlib.contextmanager
24622467
def temp_table(
@@ -2502,7 +2507,7 @@ def temp_table(
25022507
exists=True,
25032508
table_description=None,
25042509
column_descriptions=None,
2505-
track_row_count=False,
2510+
track_execution_stats=False,
25062511
**kwargs,
25072512
)
25082513

@@ -2754,7 +2759,7 @@ def _replace_by_key(
27542759
insert_statement.set("where", delete_filter)
27552760
insert_statement.set("this", exp.to_table(target_table))
27562761

2757-
self.execute(insert_statement, track_row_count=True)
2762+
self.execute(insert_statement, track_execution_stats=True)
27582763
finally:
27592764
self.drop_table(temp_table)
27602765

0 commit comments

Comments
 (0)