fix: retry sqlite preference reads on lock#8616
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a retry mechanism with exponential backoff for SQLite database lock errors in the SQLiteDatabase class, specifically wrapping the get_preference and get_preferences methods. It also adds comprehensive unit tests to verify this retry behavior. The review feedback suggests a more robust implementation for the _is_sqlite_database_locked_error helper function to handle cases where the orig attribute of OperationalError is explicitly None, ensuring the fallback to the outer exception works correctly.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def _is_sqlite_database_locked_error(exc: OperationalError) -> bool: | ||
| raw = getattr(exc, "orig", exc) | ||
| message = str(raw).lower() | ||
| return "database" in message and "locked" in message |
There was a problem hiding this comment.
Using getattr(exc, "orig", exc) can return None if the orig attribute exists on the exception but its value is explicitly None (which is common in SQLAlchemy when an OperationalError is raised without an underlying DBAPI exception, or in unit tests). If raw becomes None, calling str(raw) results in "none", which will fail to match "database" and "locked" even if the outer exception contains those terms.
A more robust approach is to use getattr(exc, "orig", None) or exc to ensure we fall back to exc if orig is None.
| def _is_sqlite_database_locked_error(exc: OperationalError) -> bool: | |
| raw = getattr(exc, "orig", exc) | |
| message = str(raw).lower() | |
| return "database" in message and "locked" in message | |
| def _is_sqlite_database_locked_error(exc: OperationalError) -> bool: | |
| raw = getattr(exc, "orig", None) or exc | |
| message = str(raw).lower() | |
| return "database" in message and "locked" in message |
References
- When detecting SQLite database lock errors in SQLAlchemy, inspect the nested/original exception (e.g., using getattr(exc, 'orig', exc)) and check for the presence of both 'database' and 'locked' in the error message to robustly cover various lock scenarios such as 'database table is locked'.
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- The
_is_sqlite_database_locked_errorcheck based on substring matching ('database'and'locked') is a bit fragile; consider using the underlying DBAPI error type or error code fromexc.orig(e.g.,sqlite3.OperationalErrorand.args) to distinguish lock errors more robustly. - The hard-coded
SQLITE_LOCK_RETRY_ATTEMPTSandSQLITE_LOCK_RETRY_BASE_DELAYconstants make it harder to tune behavior in different environments; consider allowing these to be configured (e.g., viaSQLiteDatabaseinit parameters or settings) so retry aggressiveness can be adjusted without code changes.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `_is_sqlite_database_locked_error` check based on substring matching (`'database'` and `'locked'`) is a bit fragile; consider using the underlying DBAPI error type or error code from `exc.orig` (e.g., `sqlite3.OperationalError` and `.args`) to distinguish lock errors more robustly.
- The hard-coded `SQLITE_LOCK_RETRY_ATTEMPTS` and `SQLITE_LOCK_RETRY_BASE_DELAY` constants make it harder to tune behavior in different environments; consider allowing these to be configured (e.g., via `SQLiteDatabase` init parameters or settings) so retry aggressiveness can be adjusted without code changes.
## Individual Comments
### Comment 1
<location path="tests/unit/test_sqlite_lock_retry.py" line_range="50-59" />
<code_context>
+ assert session.attempts == 2
+
+
+@pytest.mark.asyncio
+async def test_get_preference_does_not_retry_other_operational_errors(
+ temp_db,
+ monkeypatch: pytest.MonkeyPatch,
+):
+ session = _LockedOnceSession(None, "no such table: preferences")
+ sleep_delays = []
+
+ async def record_sleep(delay: float) -> None:
+ sleep_delays.append(delay)
+
+ monkeypatch.setattr(temp_db, "get_db", lambda: _SessionContext(session))
+ monkeypatch.setattr(sqlite.asyncio, "sleep", record_sleep)
+
+ with pytest.raises(OperationalError):
+ await temp_db.get_preference("global", "global", "migration_done")
+
+ assert session.attempts == 1
+ assert sleep_delays == []
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for exhausting all retry attempts on a persistent SQLite lock and propagating the final OperationalError
Please also cover the case where the DB stays locked for all `SQLITE_LOCK_RETRY_ATTEMPTS`. In that test, have the session always raise `database is locked` and assert that:
- `execute` is called exactly `SQLITE_LOCK_RETRY_ATTEMPTS` times
- the recorded sleep delays match the exponential backoff (`SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt)` for attempts 0..n-2)
- the final `OperationalError` is propagated rather than wrapped
This will verify retry count and backoff behavior under a persistent lock.
</issue_to_address>
### Comment 2
<location path="astrbot/core/db/sqlite.py" line_range="60" />
<code_context>
self.inited = False
super().__init__()
+ async def _run_with_sqlite_lock_retry(
+ self,
+ operation: Callable[[], Awaitable[TxResult]],
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the retry helper to be session-aware so callers can use it directly without defining nested async functions per method.
You can reduce the added complexity by making the retry helper session-aware, so callers don’t need per-method inner functions.
### 1. Replace `_run_with_sqlite_lock_retry` with a session-aware helper
```python
# replace this:
async def _run_with_sqlite_lock_retry(
self,
operation: Callable[[], Awaitable[TxResult]],
) -> TxResult:
for attempt in range(SQLITE_LOCK_RETRY_ATTEMPTS):
last_attempt = attempt == SQLITE_LOCK_RETRY_ATTEMPTS - 1
try:
return await operation()
except asyncio.CancelledError:
raise
except OperationalError as exc:
if not _is_sqlite_database_locked_error(exc) or last_attempt:
raise
await asyncio.sleep(SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt))
raise RuntimeError("unreachable sqlite lock retry state")
```
with something like:
```python
async def _with_session_and_sqlite_lock_retry(
self,
fn: Callable[[AsyncSession], Awaitable[TxResult]],
) -> TxResult:
for attempt in range(SQLITE_LOCK_RETRY_ATTEMPTS):
last_attempt = attempt == SQLITE_LOCK_RETRY_ATTEMPTS - 1
try:
async with self.get_db() as session:
session = T.cast(AsyncSession, session)
return await fn(session)
except asyncio.CancelledError:
raise
except OperationalError as exc:
if not _is_sqlite_database_locked_error(exc) or last_attempt:
raise
await asyncio.sleep(SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt))
```
This keeps the retry behavior and session lifetime semantics intact, but moves both concerns into one place.
### 2. Simplify `get_preference` and `get_preferences` (no inner async defs)
```python
async def get_preference(self, scope, scope_id, key):
"""Get a preference by key."""
return await self._with_session_and_sqlite_lock_retry(
lambda session: session.execute(
select(Preference).where(
Preference.scope == scope,
Preference.scope_id == scope_id,
Preference.key == key,
)
).scalar_one_or_none()
)
```
```python
async def get_preferences(self, scope, scope_id=None, key=None):
"""Get all preferences for a specific scope ID or key."""
async def _run(session: AsyncSession) -> list[Preference]:
query = select(Preference).where(Preference.scope == scope)
if scope_id is not None:
query = query.where(Preference.scope_id == scope_id)
if key is not None:
query = query.where(Preference.key == key)
result = await session.execute(query)
return result.scalars().all()
return await self._with_session_and_sqlite_lock_retry(_run)
```
This removes the per-method nested zero-arg async functions while preserving your lock retry logic and DB behavior.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| @pytest.mark.asyncio | ||
| async def test_get_preference_retries_sqlite_database_lock( | ||
| temp_db, | ||
| monkeypatch: pytest.MonkeyPatch, | ||
| ): | ||
| preference = Preference( | ||
| scope="global", | ||
| scope_id="global", | ||
| key="migration_done", | ||
| value={"val": True}, |
There was a problem hiding this comment.
suggestion (testing): Add a test for exhausting all retry attempts on a persistent SQLite lock and propagating the final OperationalError
Please also cover the case where the DB stays locked for all SQLITE_LOCK_RETRY_ATTEMPTS. In that test, have the session always raise database is locked and assert that:
executeis called exactlySQLITE_LOCK_RETRY_ATTEMPTStimes- the recorded sleep delays match the exponential backoff (
SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt)for attempts 0..n-2) - the final
OperationalErroris propagated rather than wrapped
This will verify retry count and backoff behavior under a persistent lock.
| self.inited = False | ||
| super().__init__() | ||
|
|
||
| async def _run_with_sqlite_lock_retry( |
There was a problem hiding this comment.
issue (complexity): Consider refactoring the retry helper to be session-aware so callers can use it directly without defining nested async functions per method.
You can reduce the added complexity by making the retry helper session-aware, so callers don’t need per-method inner functions.
1. Replace _run_with_sqlite_lock_retry with a session-aware helper
# replace this:
async def _run_with_sqlite_lock_retry(
self,
operation: Callable[[], Awaitable[TxResult]],
) -> TxResult:
for attempt in range(SQLITE_LOCK_RETRY_ATTEMPTS):
last_attempt = attempt == SQLITE_LOCK_RETRY_ATTEMPTS - 1
try:
return await operation()
except asyncio.CancelledError:
raise
except OperationalError as exc:
if not _is_sqlite_database_locked_error(exc) or last_attempt:
raise
await asyncio.sleep(SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt))
raise RuntimeError("unreachable sqlite lock retry state")with something like:
async def _with_session_and_sqlite_lock_retry(
self,
fn: Callable[[AsyncSession], Awaitable[TxResult]],
) -> TxResult:
for attempt in range(SQLITE_LOCK_RETRY_ATTEMPTS):
last_attempt = attempt == SQLITE_LOCK_RETRY_ATTEMPTS - 1
try:
async with self.get_db() as session:
session = T.cast(AsyncSession, session)
return await fn(session)
except asyncio.CancelledError:
raise
except OperationalError as exc:
if not _is_sqlite_database_locked_error(exc) or last_attempt:
raise
await asyncio.sleep(SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt))This keeps the retry behavior and session lifetime semantics intact, but moves both concerns into one place.
2. Simplify get_preference and get_preferences (no inner async defs)
async def get_preference(self, scope, scope_id, key):
"""Get a preference by key."""
return await self._with_session_and_sqlite_lock_retry(
lambda session: session.execute(
select(Preference).where(
Preference.scope == scope,
Preference.scope_id == scope_id,
Preference.key == key,
)
).scalar_one_or_none()
)async def get_preferences(self, scope, scope_id=None, key=None):
"""Get all preferences for a specific scope ID or key."""
async def _run(session: AsyncSession) -> list[Preference]:
query = select(Preference).where(Preference.scope == scope)
if scope_id is not None:
query = query.where(Preference.scope_id == scope_id)
if key is not None:
query = query.where(Preference.key == key)
result = await session.execute(query)
return result.scalars().all()
return await self._with_session_and_sqlite_lock_retry(_run)This removes the per-method nested zero-arg async functions while preserving your lock retry logic and DB behavior.
|
引入问题的提交 问题由这个提交引入: 该提交做了两件关键改动:
建议修改 主数据库 def _configure_sqlite_connection(dbapi_connection, connection_record) -> None:
cursor = dbapi_connection.cursor()
try:
cursor.execute("PRAGMA busy_timeout=30000")
cursor.execute("PRAGMA synchronous=NORMAL")
cursor.execute("PRAGMA cache_size=20000")
cursor.execute("PRAGMA temp_store=MEMORY")
cursor.execute("PRAGMA mmap_size=134217728")
finally:
cursor.close()然后把这类数据库级/维护级操作只放在初始化阶段: await conn.execute(text("PRAGMA journal_mode=WAL"))
await conn.execute(text("PRAGMA synchronous=NORMAL"))
await conn.execute(text("PRAGMA cache_size=20000"))
await conn.execute(text("PRAGMA temp_store=MEMORY"))
await conn.execute(text("PRAGMA mmap_size=134217728"))
await conn.execute(text("PRAGMA optimize"))最终建议:
同类风险文件也要同步处理:
这个文件同样也有 |
Summary
SQLiteDatabaseget_preferenceandget_preferencesContext
Follow-up to 1ad2b2c and #7724. Provider stats already retried sqlite lock errors, but shared preference reads could still fail with
database is locked.Tests
uv run pytest tests/unit/test_sqlite_lock_retry.py tests/unit/test_provider_stats.pyuv run ruff check astrbot/core/db/sqlite.py tests/unit/test_sqlite_lock_retry.py