Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 97 additions & 1 deletion astrbot/cli/commands/cmd_run.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,71 @@
import asyncio
import contextlib
import os
import signal
import sys
import traceback
from collections.abc import Callable
from pathlib import Path
from typing import Any

import click
from filelock import FileLock, Timeout

from ..utils import check_astrbot_root, check_dashboard, get_astrbot_root

ShutdownCallback = Callable[[signal.Signals], None]


def _install_shutdown_signal_handlers(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider simplifying the signal handling and shutdown flow in run_astrbot by inlining the logic, reducing task coordination, and centralizing LogManager shutdown.

You can reduce complexity without losing functionality by simplifying both the signal-handling abstraction and the task coordination.

1. Inline signal setup instead of _install_shutdown_signal_handlers

The helper tracks previous handlers and has two code paths, plus a cleanup closure. For this CLI entrypoint, you can keep behavior but make it local and linear:

async def run_astrbot(astrbot_root: Path) -> None:
    ...
    loop = asyncio.get_running_loop()
    shutdown_requested = asyncio.Event()

    def request_shutdown(signum: int) -> None:
        # Only ever invoked by the event loop, so touching the Event is safe.
        logger.info(f"Received {signal.Signals(signum).name}; stopping AstrBot...")
        shutdown_requested.set()

    def forward_to_loop(signum: int, _frame: object) -> None:
        # A handler registered with signal.signal() must not interact with
        # the event loop directly; hand the request over to the loop, which
        # also wakes it up if it is currently idle.
        with contextlib.suppress(RuntimeError):  # loop is already closed
            loop.call_soon_threadsafe(request_shutdown, signum)

    for signum in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(signum, request_shutdown, signum)
        except (NotImplementedError, RuntimeError):
            # The loop has no signal support (add_signal_handler is
            # Unix-only); fall back to a process-level handler.
            signal.signal(signum, forward_to_loop)

This removes:

  • ShutdownCallback and _install_shutdown_signal_handlers
  • previous_handlers, installed, and the cleanup closure
  • The need to thread shutdown_signal state around

If restoring previous handlers is really required, you can still keep that logic inline in run_astrbot, which is easier to reason about than a generic helper.

2. Simplify the two-task coordination

You can avoid asyncio.wait and a separate shutdown_task by directly awaiting the event and then cancelling the main task:

async def run_astrbot(astrbot_root: Path) -> None:
    ...
    runner_task = asyncio.create_task(core_lifecycle.start(), name="astrbot")
    # If the runner ends by itself (for example start() returns after a
    # failed initialization) no signal will ever set the event. Release the
    # waiter below in that case so the process does not hang.
    runner_task.add_done_callback(lambda _task: shutdown_requested.set())

    try:
        await shutdown_requested.wait()
        if not runner_task.done():
            runner_task.cancel()
        # Re-raises whatever the runner failed with, if anything.
        await runner_task
    finally:
        await LogManager.shutdown()

If you want to log only when a signal actually occurred, keep the logging in request_shutdown as above. This preserves:

  • “run core_lifecycle until a signal”
  • “cancel and await on signal”
  • “always shut down logging”

but removes shutdown_task, asyncio.wait, and the FIRST_COMPLETED + done bookkeeping.

3. Centralise LogManager.shutdown

Right now run_astrbot calls await LogManager.shutdown() and InitialLoader.start also calls it in a finally. To avoid double-shutdown logic spread across layers, consider consolidating it in one place.

For example, keep it only in the CLI:

# in InitialLoader.start()
try:
    ...
finally:
    # remove LogManager.shutdown() from here
    ...

And leave:

try:
    ...
finally:
    await LogManager.shutdown()

in run_astrbot.

This reduces cross-layer coupling and simplifies reasoning about where the global logging system is terminated.

loop: asyncio.AbstractEventLoop,
callback: ShutdownCallback,
) -> Callable[[], None]:
"""Install SIGINT/SIGTERM handlers and return a cleanup callback."""
handled_signals = (signal.SIGINT, signal.SIGTERM)
previous_handlers: dict[signal.Signals, Any] = {}
installed: list[signal.Signals] = []

for signum in handled_signals:
try:
previous_handlers[signum] = signal.getsignal(signum)
except ValueError:
previous_handlers[signum] = None
try:
loop.add_signal_handler(signum, callback, signum)
installed.append(signum)
except (NotImplementedError, RuntimeError, ValueError):

def fallback_handler(received_signum, frame):
_ = frame
if not loop.is_closed():
try:
loop.call_soon_threadsafe(
callback, signal.Signals(received_signum)
)
except RuntimeError:
pass

try:
signal.signal(signum, fallback_handler)
installed.append(signum)
except ValueError:
pass

def cleanup() -> None:
for signum in installed:
try:
loop.remove_signal_handler(signum)
except (NotImplementedError, RuntimeError, ValueError):
pass
previous_handler = previous_handlers.get(signum)
if previous_handler is not None:
try:
signal.signal(signum, previous_handler)
except (TypeError, ValueError):
pass

return cleanup


async def run_astrbot(astrbot_root: Path) -> None:
"""Run AstrBot"""
Expand All @@ -23,7 +80,46 @@ async def run_astrbot(astrbot_root: Path) -> None:

core_lifecycle = InitialLoader(db, log_broker)

await core_lifecycle.start()
loop = asyncio.get_running_loop()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider simplifying the shutdown flow by having the signal handler cancel the main task directly and streamlining the signal-handler helper to reduce internal bookkeeping.

You can keep the new behavior but flatten the orchestration around shutdown by letting the signal handler cancel the main task directly and dropping the extra task/event.

1. Simplify run_astrbot orchestration

You don’t actually need shutdown_requested, shutdown_task, or asyncio.wait. The signal handler can cancel the main task, and you can distinguish “signal‑driven” vs “other” cancellations from a single await:

async def run_astrbot(astrbot_root: Path) -> None:
    """Start AstrBot and keep it running until it exits or a signal stops it.

    A SIGINT/SIGTERM request cancels the runner task, and that cancellation
    is swallowed as a normal shutdown. A cancellation that was not requested
    through a signal is re-raised. The handler cleanup callback and
    ``LogManager.shutdown()`` run on every exit path.
    """
    from astrbot.core import LogBroker, LogManager, db_helper, logger
    from astrbot.core.initial_loader import InitialLoader

    await check_dashboard(astrbot_root / "data")

    broker = LogBroker()
    LogManager.set_queue_handler(logger, broker)
    loader = InitialLoader(db_helper, broker)

    received: signal.Signals | None = None
    runner = asyncio.create_task(loader.start(), name="astrbot")

    def request_shutdown(signum: signal.Signals) -> None:
        nonlocal received
        received = signum
        logger.info(f"Received {signum.name}; stopping AstrBot...")
        runner.cancel()

    restore_handlers = _install_shutdown_signal_handlers(
        asyncio.get_running_loop(),
        request_shutdown,
    )

    try:
        await runner
    except asyncio.CancelledError:
        # A cancellation we did not request ourselves must keep propagating.
        if received is None:
            raise
    finally:
        restore_handlers()
        await LogManager.shutdown()

This keeps:

  • clean shutdown on SIGINT/SIGTERM
  • differentiation between “signal‑driven” cancellation and other cancellations

but removes:

  • asyncio.Event
  • shutdown_task
  • asyncio.wait(FIRST_COMPLETED)
  • shutdown_requested_by_signal bookkeeping

2. Lightly trim _install_shutdown_signal_handlers

You can reduce some internal bookkeeping without changing behavior by using a single dict for “installed + previous handler” instead of a separate installed list:

def _install_shutdown_signal_handlers(
    loop: asyncio.AbstractEventLoop,
    callback: ShutdownCallback,
) -> Callable[[], None]:
    """Install SIGINT/SIGTERM handlers and return a cleanup callback."""
    handled_signals = (signal.SIGINT, signal.SIGTERM)
    previous_handlers: dict[signal.Signals, Any] = {}

    for signum in handled_signals:
        previous_handlers[signum] = signal.getsignal(signum)
        try:
            loop.add_signal_handler(signum, callback, signum)
        except (NotImplementedError, RuntimeError):
            def fallback_handler(received_signum, _frame) -> None:
                callback(signal.Signals(received_signum))

            signal.signal(signum, fallback_handler)

    def cleanup() -> None:
        for signum, prev in previous_handlers.items():
            with contextlib.suppress(NotImplementedError, RuntimeError):
                loop.remove_signal_handler(signum)
            signal.signal(signum, prev)

    return cleanup

This keeps:

  • restoration of original handlers
  • loop vs. signal.signal fallback handling

but makes the helper easier to scan and reason about.

shutdown_requested = asyncio.Event()
shutdown_signal: signal.Signals | None = None

def request_shutdown(signum: signal.Signals) -> None:
nonlocal shutdown_signal
shutdown_signal = signum
shutdown_requested.set()

cleanup_signal_handlers = _install_shutdown_signal_handlers(loop, request_shutdown)
runner_task = asyncio.create_task(core_lifecycle.start(), name="astrbot")
shutdown_task = asyncio.create_task(
shutdown_requested.wait(), name="astrbot_shutdown"
)

try:
done, _ = await asyncio.wait(
{runner_task, shutdown_task},
return_when=asyncio.FIRST_COMPLETED,
)
shutdown_requested_by_signal = shutdown_task in done
if shutdown_requested_by_signal and not runner_task.done():
signal_name = shutdown_signal.name if shutdown_signal else "unknown"
logger.info(f"Received {signal_name}; stopping AstrBot...")
runner_task.cancel()
try:
await runner_task
except asyncio.CancelledError:
if not shutdown_requested_by_signal:
raise
finally:
cleanup_signal_handlers()
if not runner_task.done():
runner_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await runner_task
if not shutdown_task.done():
shutdown_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await shutdown_task


@click.option("--reload", "-r", is_flag=True, help="Auto-reload plugins")
Expand Down
67 changes: 41 additions & 26 deletions astrbot/core/initial_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import asyncio
import traceback

from astrbot.core import LogBroker, logger
from astrbot.core import LogBroker, LogManager, logger
from astrbot.core.core_lifecycle import AstrBotCoreLifecycle
from astrbot.core.db import BaseDatabase
from astrbot.dashboard.server import AstrBotDashboard
Expand All @@ -25,33 +25,48 @@ def __init__(self, db: BaseDatabase, log_broker: LogBroker) -> None:

async def start(self) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider flattening the start() method’s control flow by separating initialization from the runtime phase and using a single try/except for the run logic while keeping the current semantics.

You can keep the new semantics but flatten the control flow and simplify initialized usage by pulling initialization out of the nested try and having a single try/except block for the “run” phase:

async def start(self) -> None:
    """Initialize AstrBot, run it, and always shut logging down afterwards.

    ``LogManager.shutdown()`` sits in a ``finally`` that wraps both phases,
    so it also runs when initialization fails or when setting up the
    runtime tasks raises.
    """
    core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)
    try:
        # 1. Initialization phase
        if await self._initialize_core(core_lifecycle):
            # 2. Runtime phase
            await self._run_core(core_lifecycle)
    finally:
        await LogManager.shutdown()


async def _initialize_core(self, core_lifecycle: AstrBotCoreLifecycle) -> bool:
    """Initialize the core; log the failure and return False instead of raising."""
    try:
        await core_lifecycle.initialize()
    except Exception as e:
        logger.critical(traceback.format_exc())
        logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!")
        return False
    return True


async def _run_core(self, core_lifecycle: AstrBotCoreLifecycle) -> None:
    """Run the initialized core (plus dashboard) and stop it on exit paths.

    Only called after a successful initialization, so ``stop()`` is valid
    on every error path here. Task setup is inside the ``try`` so that a
    failure while building the dashboard still triggers ``stop()``.
    """
    try:
        core_task = core_lifecycle.start()
        self.dashboard_server = AstrBotDashboard(
            core_lifecycle,
            self.db,
            core_lifecycle.dashboard_shutdown_event,
            self.webui_dir,
        )
        coro = self.dashboard_server.run()
        task = asyncio.gather(core_task, coro) if coro else core_task
        await task  # All of AstrBot runs here
    except asyncio.CancelledError:
        logger.info("🌈 正在关闭 AstrBot...")
        await core_lifecycle.stop()
    except Exception:
        # Best-effort shutdown; the original runtime error is what callers
        # need to see, so a failing stop() is only logged.
        try:
            await core_lifecycle.stop()
        except Exception:
            logger.error(
                "AstrBot shutdown during runtime-error handling failed",
                exc_info=True,
            )
        raise

This preserves all the new behaviors:

  • If initialization fails: log and return, no stop().
  • If cancelled after initialization: log and call stop().
  • If a runtime error occurs after initialization: attempt stop(), log if shutdown fails, then re-raise.
  • LogManager.shutdown() is still always called.

If possible at a higher level, you might also consider centralizing LogManager.shutdown() in a single layer (either here or in run_astrbot) to make the overall shutdown protocol easier to follow.

core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)
initialized = False

try:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider refactoring the start logic to move initialization and task orchestration into small helpers, removing the nested try and initialized flag while preserving current behavior.

The nested try + initialized flag can be removed without changing behaviour by separating initialization from the run/cancel flow and extracting a small helper for the core+dashboard orchestration.

You can keep all guarantees:

  • Init failures are logged and abort early.
  • stop() is only called if init succeeded.
  • LogManager.shutdown() still always runs.

For example:

class InitialLoader:
    ...

    async def _init_core(self, core_lifecycle: AstrBotCoreLifecycle) -> bool:
        """Initialize the core; log and return False on failure instead of raising."""
        try:
            await core_lifecycle.initialize()
        except Exception as e:
            logger.critical(traceback.format_exc())
            logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!")
            return False
        else:
            return True

    def _build_task(self, core_lifecycle: AstrBotCoreLifecycle) -> asyncio.Future:
        """Create the dashboard server and return the awaitable that runs AstrBot.

        The core task is gathered with the dashboard coroutine when the
        dashboard provides one; otherwise the core task is returned alone.
        """
        core_task = core_lifecycle.start()
        self.dashboard_server = AstrBotDashboard(
            core_lifecycle,
            self.db,
            core_lifecycle.dashboard_shutdown_event,
            self.webui_dir,
        )
        dashboard_coro = self.dashboard_server.run()
        return (
            asyncio.gather(core_task, dashboard_coro)
            if dashboard_coro
            else core_task
        )

    async def start(self) -> None:
        """Initialize and run AstrBot, shutting logging down when it ends."""
        core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)

        if await self._init_core(core_lifecycle):
            runner = self._build_task(core_lifecycle)
            try:
                await runner  # All of AstrBot runs here
            except asyncio.CancelledError:
                logger.info("🌈 正在关闭 AstrBot...")
                await core_lifecycle.stop()
            finally:
                await LogManager.shutdown()
        else:
            # Initialization failed: nothing to stop, only logging to close.
            await LogManager.shutdown()

Key effects:

  • No nested try and no initialized flag; the init path is linear and clearly separated.
  • The run orchestration (_build_task) is self-contained and easier to read.
  • Behaviour on init failure and cancellation remains the same as in your current version.

await core_lifecycle.initialize()
except Exception as e:
logger.critical(traceback.format_exc())
logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!")
return

core_task = core_lifecycle.start()

webui_dir = self.webui_dir

self.dashboard_server = AstrBotDashboard(
core_lifecycle,
self.db,
core_lifecycle.dashboard_shutdown_event,
webui_dir,
)

coro = self.dashboard_server.run()
if coro:
# 启动核心任务和仪表板服务器
task = asyncio.gather(core_task, coro)
else:
task = core_task
try:
try:
await core_lifecycle.initialize()
initialized = True
except Exception as e:
logger.critical(traceback.format_exc())
logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!")
return

core_task = core_lifecycle.start()

webui_dir = self.webui_dir

self.dashboard_server = AstrBotDashboard(
core_lifecycle,
self.db,
core_lifecycle.dashboard_shutdown_event,
webui_dir,
)

coro = self.dashboard_server.run()
if coro:
# 启动核心任务和仪表板服务器
task = asyncio.gather(core_task, coro)
else:
task = core_task
await task # 整个AstrBot在这里运行
except asyncio.CancelledError:
logger.info("🌈 正在关闭 AstrBot...")
await core_lifecycle.stop()
if initialized:
await core_lifecycle.stop()
except Exception:
if initialized:
try:
await core_lifecycle.stop()
except Exception:
logger.error(
"AstrBot shutdown during runtime-error handling failed",
exc_info=True,
)
raise
finally:
await LogManager.shutdown()
Comment on lines 57 to +72
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

如果 await task 在运行时抛出非 CancelledError 的异常(例如数据库连接断开、网络异常或插件崩溃等),该异常将不会被 except asyncio.CancelledError 捕获,而是直接进入 finally 块。

这会导致 core_lifecycle.stop() 无法被调用,从而无法优雅地关闭各个管理器、释放数据库连接和清理插件资源,可能导致资源泄露或锁文件残留。

建议将 core_lifecycle.stop() 的调用移动到 finally 块中,并结合 initialized 状态进行保护,以确保在任何异常退出路径下都能进行优雅清理。

Suggested change
except asyncio.CancelledError:
logger.info("🌈 正在关闭 AstrBot...")
await core_lifecycle.stop()
if initialized:
await core_lifecycle.stop()
finally:
await LogManager.shutdown()
except asyncio.CancelledError:
logger.info("🌈 正在关闭 AstrBot...")
finally:
if initialized:
await core_lifecycle.stop()
await LogManager.shutdown()

12 changes: 12 additions & 0 deletions astrbot/core/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,3 +415,15 @@ def configure_trace_logger(cls, config: dict | None) -> None:
backup_count=3,
trace=True,
)

@classmethod
async def shutdown(cls) -> None:
"""Flush pending log output, then detach the trace and file sinks.

Awaits ``_loguru.complete()``, removes the sinks whose ids are stored in
``cls._trace_sink_id`` and ``cls._file_sink_id`` via ``cls._remove_sink``,
clears both ids, and resets ``cls._configured`` to ``False``.
"""
try:
await _loguru.complete()
finally:
# Sink cleanup starts in the ``finally`` clause so a failed flush does not skip it.
cls._remove_sink(cls._trace_sink_id)
cls._trace_sink_id = None
cls._remove_sink(cls._file_sink_id)
cls._file_sink_id = None
cls._configured = False
Comment on lines +425 to +429
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

shutdown 中移除控制台日志 Sink (_console_sink_id) 会导致在此之后的所有日志输出被完全静默。

原因

  1. 控制台日志 Sink 并没有开启 enqueue=True,因此它不会像文件日志那样在退出时产生 multiprocessing.resource_tracker 的 semaphore 泄露警告。
  2. LogManager.shutdown() 会在 core_lifecycle.stop() 中被调用。如果在此之后(例如在 InitialLoader 的 finally 块或 cmd_run 的后续清理流程中)发生任何错误或需要记录日志,由于控制台 Sink 已被移除,这些关键的退出/错误日志将无法输出到控制台,给排查问题带来困难。

建议

避免在 shutdown 中移除控制台 Sink,仅清理和移除会产生信号量泄露的 _trace_sink_id 和 _file_sink_id。

Suggested change
cls._remove_sink(cls._trace_sink_id)
cls._trace_sink_id = None
cls._remove_sink(cls._file_sink_id)
cls._file_sink_id = None
cls._remove_sink(cls._console_sink_id)
cls._console_sink_id = None
cls._configured = False
cls._remove_sink(cls._trace_sink_id)
cls._trace_sink_id = None
cls._remove_sink(cls._file_sink_id)
cls._file_sink_id = None
cls._configured = False

Loading