Skip to content

Commit 8afbaf1

Browse files
authored
Chore!: Change get_expired_environments() to return EnvironmentSummary (#4774)
1 parent c3a4362 commit 8afbaf1

6 files changed

Lines changed: 39 additions & 31 deletions

File tree

sqlmesh/core/context.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2695,16 +2695,22 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
26952695
def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
26962696
current_ts = current_ts or now_timestamp()
26972697

2698-
expired_environments = self.state_sync.get_expired_environments(current_ts=current_ts)
2699-
2700-
cleanup_expired_views(
2701-
default_adapter=self.engine_adapter,
2702-
engine_adapters=self.engine_adapters,
2703-
environments=expired_environments,
2704-
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
2705-
console=self.console,
2698+
expired_environments_summaries = self.state_sync.get_expired_environments(
2699+
current_ts=current_ts
27062700
)
27072701

2702+
for expired_env_summary in expired_environments_summaries:
2703+
expired_env = self.state_reader.get_environment(expired_env_summary.name)
2704+
2705+
if expired_env:
2706+
cleanup_expired_views(
2707+
default_adapter=self.engine_adapter,
2708+
engine_adapters=self.engine_adapters,
2709+
environments=[expired_env],
2710+
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
2711+
console=self.console,
2712+
)
2713+
27082714
self.state_sync.delete_expired_environments(current_ts=current_ts)
27092715

27102716
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:

sqlmesh/core/state_sync/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -304,12 +304,12 @@ def get_expired_snapshots(
304304
"""
305305

306306
@abc.abstractmethod
307-
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
307+
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
308308
"""Returns the expired environments.
309309
310310
Expired environments are environments that have exceeded their time-to-live value.
311311
Returns:
312-
The list of environments to remove, the filter to remove environments.
312+
The list of environment summaries to remove.
313313
"""
314314

315315

@@ -418,7 +418,7 @@ def finalize(self, environment: Environment) -> None:
418418
@abc.abstractmethod
419419
def delete_expired_environments(
420420
self, current_ts: t.Optional[int] = None
421-
) -> t.List[Environment]:
421+
) -> t.List[EnvironmentSummary]:
422422
"""Removes expired environments.
423423
424424
Expired environments are environments that have exceeded their time-to-live value.

sqlmesh/core/state_sync/db/environment.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -165,27 +165,24 @@ def finalize(self, environment: Environment) -> None:
165165
where=environment_filter,
166166
)
167167

168-
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
168+
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
169169
"""Returns the expired environments.
170170
171171
Expired environments are environments that have exceeded their time-to-live value.
172172
Returns:
173-
The list of environments to remove, the filter to remove environments.
173+
The list of environment summaries to remove.
174174
"""
175-
rows = fetchall(
176-
self.engine_adapter,
177-
self._environments_query(
178-
where=self._create_expiration_filter_expr(current_ts),
179-
lock_for_update=True,
180-
),
181-
)
182-
expired_environments = [self._environment_from_row(r) for r in rows]
183175

184-
return expired_environments
176+
environment_summaries = self.get_environments_summary()
177+
return [
178+
env_summary
179+
for env_summary in environment_summaries
180+
if env_summary.expiration_ts is not None and env_summary.expiration_ts <= current_ts
181+
]
185182

186183
def delete_expired_environments(
187184
self, current_ts: t.Optional[int] = None
188-
) -> t.List[Environment]:
185+
) -> t.List[EnvironmentSummary]:
189186
"""Deletes expired environments.
190187
191188
Returns:

sqlmesh/core/state_sync/db/facade.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ def get_expired_snapshots(
274274
self.environment_state.get_environments(), current_ts=current_ts, ignore_ttl=ignore_ttl
275275
)
276276

277-
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
277+
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
278278
return self.environment_state.get_expired_environments(current_ts=current_ts)
279279

280280
@transactional()
@@ -294,7 +294,7 @@ def delete_expired_snapshots(
294294
@transactional()
295295
def delete_expired_environments(
296296
self, current_ts: t.Optional[int] = None
297-
) -> t.List[Environment]:
297+
) -> t.List[EnvironmentSummary]:
298298
current_ts = current_ts or now_timestamp()
299299
return self.environment_state.delete_expired_environments(current_ts=current_ts)
300300

tests/core/state_sync/test_state_sync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1115,7 +1115,7 @@ def test_delete_expired_environments(state_sync: EngineAdapterStateSync, make_sn
11151115
assert state_sync.get_environment_statements(env_a.name) == environment_statements
11161116

11171117
deleted_environments = state_sync.delete_expired_environments()
1118-
assert deleted_environments == [env_a]
1118+
assert deleted_environments == [env_a.summary]
11191119

11201120
assert state_sync.get_environment(env_a.name) is None
11211121
assert state_sync.get_environment(env_b.name) == env_b

tests/core/test_context.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -859,9 +859,9 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
859859
adapter_mock.dialect = "duckdb"
860860
state_sync_mock = mocker.MagicMock()
861861

862-
state_sync_mock.get_expired_environments.return_value = [
862+
environments = [
863863
Environment(
864-
name="test_environment",
864+
name="test_environment1",
865865
suffix_target=EnvironmentSuffixTarget.TABLE,
866866
snapshots=[x.table_info for x in sushi_context.snapshots.values()],
867867
start_at="2022-01-01",
@@ -870,7 +870,7 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
870870
previous_plan_id="test_plan_id",
871871
),
872872
Environment(
873-
name="test_environment",
873+
name="test_environment2",
874874
suffix_target=EnvironmentSuffixTarget.SCHEMA,
875875
snapshots=[x.table_info for x in sushi_context.snapshots.values()],
876876
start_at="2022-01-01",
@@ -880,6 +880,11 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
880880
),
881881
]
882882

883+
state_sync_mock.get_expired_environments.return_value = [env.summary for env in environments]
884+
state_sync_mock.get_environment = lambda name: next(
885+
env for env in environments if env.name == name
886+
)
887+
883888
sushi_context._engine_adapter = adapter_mock
884889
sushi_context.engine_adapters = {sushi_context.config.default_gateway: adapter_mock}
885890
sushi_context._state_sync = state_sync_mock
@@ -891,7 +896,7 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
891896
adapter_mock.drop_schema.assert_has_calls(
892897
[
893898
call(
894-
schema_("sushi__test_environment", "memory"),
899+
schema_("sushi__test_environment2", "memory"),
895900
cascade=True,
896901
ignore_if_not_exists=True,
897902
),
@@ -903,7 +908,7 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
903908
adapter_mock.drop_view.assert_has_calls(
904909
[
905910
call(
906-
"memory.sushi.waiter_as_customer_by_day__test_environment",
911+
"memory.sushi.waiter_as_customer_by_day__test_environment1",
907912
ignore_if_not_exists=True,
908913
),
909914
]

0 commit comments

Comments
 (0)