Skip to content

fix: retry sqlite preference reads on lock#8616

Open
zouyonghe wants to merge 1 commit into
AstrBotDevs:masterfrom
zouyonghe:fix/sqlite-preference-lock-retry
Open

fix: retry sqlite preference reads on lock#8616
zouyonghe wants to merge 1 commit into
AstrBotDevs:masterfrom
zouyonghe:fix/sqlite-preference-lock-retry

Conversation

@zouyonghe
Copy link
Copy Markdown
Member

@zouyonghe zouyonghe commented Jun 6, 2026

Summary

  • add SQLite lock retry handling inside SQLiteDatabase
  • apply the retry path to get_preference and get_preferences
  • add regression tests for locked preference reads and non-lock OperationalError behavior

Context

Follow-up to 1ad2b2c and #7724. Provider stats already retried sqlite lock errors, but shared preference reads could still fail with database is locked.

Tests

  • uv run pytest tests/unit/test_sqlite_lock_retry.py tests/unit/test_provider_stats.py
  • uv run ruff check astrbot/core/db/sqlite.py tests/unit/test_sqlite_lock_retry.py

@dosubot dosubot Bot added size:M This PR changes 30-99 lines, ignoring generated files. area:core The bug / feature is about astrbot's core, backend labels Jun 6, 2026
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a retry mechanism with exponential backoff for SQLite database lock errors in the SQLiteDatabase class, specifically wrapping the get_preference and get_preferences methods. It also adds comprehensive unit tests to verify this retry behavior. The review feedback suggests a more robust implementation for the _is_sqlite_database_locked_error helper function to handle cases where the orig attribute of OperationalError is explicitly None, ensuring the fallback to the outer exception works correctly.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread astrbot/core/db/sqlite.py
Comment on lines +47 to +50
def _is_sqlite_database_locked_error(exc: OperationalError) -> bool:
raw = getattr(exc, "orig", exc)
message = str(raw).lower()
return "database" in message and "locked" in message
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using getattr(exc, "orig", exc) can return None if the orig attribute exists on the exception but its value is explicitly None (which is common in SQLAlchemy when an OperationalError is raised without an underlying DBAPI exception, or in unit tests). If raw becomes None, calling str(raw) results in "none", which will fail to match "database" and "locked" even if the outer exception contains those terms.

A more robust approach is to use getattr(exc, "orig", None) or exc to ensure we fall back to exc if orig is None.

Suggested change
def _is_sqlite_database_locked_error(exc: OperationalError) -> bool:
raw = getattr(exc, "orig", exc)
message = str(raw).lower()
return "database" in message and "locked" in message
def _is_sqlite_database_locked_error(exc: OperationalError) -> bool:
raw = getattr(exc, "orig", None) or exc
message = str(raw).lower()
return "database" in message and "locked" in message
References
  1. When detecting SQLite database lock errors in SQLAlchemy, inspect the nested/original exception (e.g., using getattr(exc, 'orig', exc)) and check for the presence of both 'database' and 'locked' in the error message to robustly cover various lock scenarios such as 'database table is locked'.

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 2 issues, and left some high level feedback:

  • The _is_sqlite_database_locked_error check based on substring matching ('database' and 'locked') is a bit fragile; consider using the underlying DBAPI error type or error code from exc.orig (e.g., sqlite3.OperationalError and .args) to distinguish lock errors more robustly.
  • The hard-coded SQLITE_LOCK_RETRY_ATTEMPTS and SQLITE_LOCK_RETRY_BASE_DELAY constants make it harder to tune behavior in different environments; consider allowing these to be configured (e.g., via SQLiteDatabase init parameters or settings) so retry aggressiveness can be adjusted without code changes.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The `_is_sqlite_database_locked_error` check based on substring matching (`'database'` and `'locked'`) is a bit fragile; consider using the underlying DBAPI error type or error code from `exc.orig` (e.g., `sqlite3.OperationalError` and `.args`) to distinguish lock errors more robustly.
- The hard-coded `SQLITE_LOCK_RETRY_ATTEMPTS` and `SQLITE_LOCK_RETRY_BASE_DELAY` constants make it harder to tune behavior in different environments; consider allowing these to be configured (e.g., via `SQLiteDatabase` init parameters or settings) so retry aggressiveness can be adjusted without code changes.

## Individual Comments

### Comment 1
<location path="tests/unit/test_sqlite_lock_retry.py" line_range="50-59" />
<code_context>
+    assert session.attempts == 2
+
+
+@pytest.mark.asyncio
+async def test_get_preference_does_not_retry_other_operational_errors(
+    temp_db,
+    monkeypatch: pytest.MonkeyPatch,
+):
+    session = _LockedOnceSession(None, "no such table: preferences")
+    sleep_delays = []
+
+    async def record_sleep(delay: float) -> None:
+        sleep_delays.append(delay)
+
+    monkeypatch.setattr(temp_db, "get_db", lambda: _SessionContext(session))
+    monkeypatch.setattr(sqlite.asyncio, "sleep", record_sleep)
+
+    with pytest.raises(OperationalError):
+        await temp_db.get_preference("global", "global", "migration_done")
+
+    assert session.attempts == 1
+    assert sleep_delays == []
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for exhausting all retry attempts on a persistent SQLite lock and propagating the final OperationalError

Please also cover the case where the DB stays locked for all `SQLITE_LOCK_RETRY_ATTEMPTS`. In that test, have the session always raise `database is locked` and assert that:

- `execute` is called exactly `SQLITE_LOCK_RETRY_ATTEMPTS` times
- the recorded sleep delays match the exponential backoff (`SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt)` for attempts 0..n-2)
- the final `OperationalError` is propagated rather than wrapped

This will verify retry count and backoff behavior under a persistent lock.
</issue_to_address>

### Comment 2
<location path="astrbot/core/db/sqlite.py" line_range="60" />
<code_context>
         self.inited = False
         super().__init__()

+    async def _run_with_sqlite_lock_retry(
+        self,
+        operation: Callable[[], Awaitable[TxResult]],
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the retry helper to be session-aware so callers can use it directly without defining nested async functions per method.

You can reduce the added complexity by making the retry helper session-aware, so callers don’t need per-method inner functions.

### 1. Replace `_run_with_sqlite_lock_retry` with a session-aware helper

```python
# replace this:
async def _run_with_sqlite_lock_retry(
    self,
    operation: Callable[[], Awaitable[TxResult]],
) -> TxResult:
    for attempt in range(SQLITE_LOCK_RETRY_ATTEMPTS):
        last_attempt = attempt == SQLITE_LOCK_RETRY_ATTEMPTS - 1
        try:
            return await operation()
        except asyncio.CancelledError:
            raise
        except OperationalError as exc:
            if not _is_sqlite_database_locked_error(exc) or last_attempt:
                raise
            await asyncio.sleep(SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt))

    raise RuntimeError("unreachable sqlite lock retry state")
```

with something like:

```python
async def _with_session_and_sqlite_lock_retry(
    self,
    fn: Callable[[AsyncSession], Awaitable[TxResult]],
) -> TxResult:
    for attempt in range(SQLITE_LOCK_RETRY_ATTEMPTS):
        last_attempt = attempt == SQLITE_LOCK_RETRY_ATTEMPTS - 1
        try:
            async with self.get_db() as session:
                session = T.cast(AsyncSession, session)
                return await fn(session)
        except asyncio.CancelledError:
            raise
        except OperationalError as exc:
            if not _is_sqlite_database_locked_error(exc) or last_attempt:
                raise
            await asyncio.sleep(SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt))
```

This keeps the retry behavior and session lifetime semantics intact, but moves both concerns into one place.

### 2. Simplify `get_preference` and `get_preferences` (no inner async defs)

```python
async def get_preference(self, scope, scope_id, key):
    """Get a preference by key."""
    return await self._with_session_and_sqlite_lock_retry(
        lambda session: session.execute(
            select(Preference).where(
                Preference.scope == scope,
                Preference.scope_id == scope_id,
                Preference.key == key,
            )
        ).scalar_one_or_none()
    )
```

```python
async def get_preferences(self, scope, scope_id=None, key=None):
    """Get all preferences for a specific scope ID or key."""
    async def _run(session: AsyncSession) -> list[Preference]:
        query = select(Preference).where(Preference.scope == scope)
        if scope_id is not None:
            query = query.where(Preference.scope_id == scope_id)
        if key is not None:
            query = query.where(Preference.key == key)
        result = await session.execute(query)
        return result.scalars().all()

    return await self._with_session_and_sqlite_lock_retry(_run)
```

This removes the per-method nested zero-arg async functions while preserving your lock retry logic and DB behavior.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +50 to +59
@pytest.mark.asyncio
async def test_get_preference_retries_sqlite_database_lock(
temp_db,
monkeypatch: pytest.MonkeyPatch,
):
preference = Preference(
scope="global",
scope_id="global",
key="migration_done",
value={"val": True},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add a test for exhausting all retry attempts on a persistent SQLite lock and propagating the final OperationalError

Please also cover the case where the DB stays locked for all SQLITE_LOCK_RETRY_ATTEMPTS. In that test, have the session always raise database is locked and assert that:

  • execute is called exactly SQLITE_LOCK_RETRY_ATTEMPTS times
  • the recorded sleep delays match the exponential backoff (SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt) for attempts 0..n-2)
  • the final OperationalError is propagated rather than wrapped

This will verify retry count and backoff behavior under a persistent lock.

Comment thread astrbot/core/db/sqlite.py
self.inited = False
super().__init__()

async def _run_with_sqlite_lock_retry(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider refactoring the retry helper to be session-aware so callers can use it directly without defining nested async functions per method.

You can reduce the added complexity by making the retry helper session-aware, so callers don’t need per-method inner functions.

1. Replace _run_with_sqlite_lock_retry with a session-aware helper

# replace this:
async def _run_with_sqlite_lock_retry(
    self,
    operation: Callable[[], Awaitable[TxResult]],
) -> TxResult:
    for attempt in range(SQLITE_LOCK_RETRY_ATTEMPTS):
        last_attempt = attempt == SQLITE_LOCK_RETRY_ATTEMPTS - 1
        try:
            return await operation()
        except asyncio.CancelledError:
            raise
        except OperationalError as exc:
            if not _is_sqlite_database_locked_error(exc) or last_attempt:
                raise
            await asyncio.sleep(SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt))

    raise RuntimeError("unreachable sqlite lock retry state")

with something like:

async def _with_session_and_sqlite_lock_retry(
    self,
    fn: Callable[[AsyncSession], Awaitable[TxResult]],
) -> TxResult:
    for attempt in range(SQLITE_LOCK_RETRY_ATTEMPTS):
        last_attempt = attempt == SQLITE_LOCK_RETRY_ATTEMPTS - 1
        try:
            async with self.get_db() as session:
                session = T.cast(AsyncSession, session)
                return await fn(session)
        except asyncio.CancelledError:
            raise
        except OperationalError as exc:
            if not _is_sqlite_database_locked_error(exc) or last_attempt:
                raise
            await asyncio.sleep(SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt))

This keeps the retry behavior and session lifetime semantics intact, but moves both concerns into one place.

2. Simplify get_preference and get_preferences (no inner async defs)

async def get_preference(self, scope, scope_id, key):
    """Get a preference by key."""
    return await self._with_session_and_sqlite_lock_retry(
        lambda session: session.execute(
            select(Preference).where(
                Preference.scope == scope,
                Preference.scope_id == scope_id,
                Preference.key == key,
            )
        ).scalar_one_or_none()
    )
async def get_preferences(self, scope, scope_id=None, key=None):
    """Get all preferences for a specific scope ID or key."""
    async def _run(session: AsyncSession) -> list[Preference]:
        query = select(Preference).where(Preference.scope == scope)
        if scope_id is not None:
            query = query.where(Preference.scope_id == scope_id)
        if key is not None:
            query = query.where(Preference.key == key)
        result = await session.execute(query)
        return result.scalars().all()

    return await self._with_session_and_sqlite_lock_retry(_run)

This removes the per-method nested zero-arg async functions while preserving your lock retry logic and DB behavior.

@KBVsent
Copy link
Copy Markdown
Contributor

KBVsent commented Jun 6, 2026

引入问题的提交

问题由这个提交引入:

2d7862684088613840228fa482b181bd7c241053
fix SQLAlchemy compatibility issues on macOS (#7724)

该提交做了两件关键改动:

  1. SQLite engine 改为 poolclass=NullPool
  2. 新增 event.listen(..., "connect", _configure_sqlite_connection),并在每次连接时执行多条 PRAGMA,包括 PRAGMA optimize

NullPool 会让连接更频繁创建,因此 connect hook 被频繁触发;再叠加每连接执行 PRAGMA optimize,就把原本应低频执行的维护操作放进了高频读写路径。

建议修改

主数据库 astrbot/core/db/__init__.py 的 connect hook 建议改成只保留轻量连接级设置:

def _configure_sqlite_connection(dbapi_connection, connection_record) -> None:
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("PRAGMA busy_timeout=30000")
        cursor.execute("PRAGMA synchronous=NORMAL")
        cursor.execute("PRAGMA cache_size=20000")
        cursor.execute("PRAGMA temp_store=MEMORY")
        cursor.execute("PRAGMA mmap_size=134217728")
    finally:
        cursor.close()

然后把这类数据库级/维护级操作只放在初始化阶段:

await conn.execute(text("PRAGMA journal_mode=WAL"))
await conn.execute(text("PRAGMA synchronous=NORMAL"))
await conn.execute(text("PRAGMA cache_size=20000"))
await conn.execute(text("PRAGMA temp_store=MEMORY"))
await conn.execute(text("PRAGMA mmap_size=134217728"))
await conn.execute(text("PRAGMA optimize"))

最终建议:

  • 必须移除每连接 PRAGMA optimize
  • 建议移除每连接 PRAGMA journal_mode=WAL
  • busy_timeout 可以同时通过 connect_args={"timeout": 30}PRAGMA busy_timeout=30000 保守设置
  • PRAGMA optimize 可保留在启动初始化、低频后台任务、关闭前维护任务里,不要放在 connect hook

同类风险文件也要同步处理:

  • astrbot/core/knowledge_base/kb_db_sqlite.py

这个文件同样也有 PRAGMA optimize,建议一起移除,避免知识库数据库在并发访问时出现同类锁问题。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:core The bug / feature is about astrbot's core, backend size:M This PR changes 30-99 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants