From e9d8a9a1e352213953126805774f9d58390d5a99 Mon Sep 17 00:00:00 2001 From: Akokko Date: Fri, 5 Jun 2026 15:38:51 +0800 Subject: [PATCH 1/5] fix: clean up Loguru sinks on shutdown Add LogManager.shutdown() and wire it into the AstrBot shutdown path.Also handle SIGINT/SIGTERM gracefully in astrbot run so file-log sinks are flushed and removed before exit. This prevents multiprocessing.resource_tracker semaphore warnings when file logging is enabled. Refs #8595 --- astrbot/cli/commands/cmd_run.py | 67 ++++++++- astrbot/core/core_lifecycle.py | 2 + astrbot/core/initial_loader.py | 57 +++---- astrbot/core/log.py | 14 ++ tests/unit/test_cmd_run_shutdown.py | 191 ++++++++++++++++++++++++ tests/unit/test_core_lifecycle.py | 17 +++ tests/unit/test_log_manager_shutdown.py | 42 ++++++ 7 files changed, 363 insertions(+), 27 deletions(-) create mode 100644 tests/unit/test_cmd_run_shutdown.py create mode 100644 tests/unit/test_log_manager_shutdown.py diff --git a/astrbot/cli/commands/cmd_run.py b/astrbot/cli/commands/cmd_run.py index de09e58521..d4006370af 100644 --- a/astrbot/cli/commands/cmd_run.py +++ b/astrbot/cli/commands/cmd_run.py @@ -1,14 +1,50 @@ import asyncio import os +import signal import sys import traceback +from collections.abc import Callable from pathlib import Path +from typing import Any import click from filelock import FileLock, Timeout from ..utils import check_astrbot_root, check_dashboard, get_astrbot_root +ShutdownCallback = Callable[[signal.Signals], None] + + +def _install_shutdown_signal_handlers( + loop: asyncio.AbstractEventLoop, + callback: ShutdownCallback, +) -> Callable[[], None]: + """Install SIGINT/SIGTERM handlers and return a cleanup callback.""" + handled_signals = (signal.SIGINT, signal.SIGTERM) + previous_handlers: dict[signal.Signals, Any] = {} + installed: list[signal.Signals] = [] + + for signum in handled_signals: + previous_handlers[signum] = signal.getsignal(signum) + try: + loop.add_signal_handler(signum, callback, signum) + installed.append(signum) + except (NotImplementedError, RuntimeError): + signal.signal( + signum, lambda _signum, _frame: callback(signal.Signals(_signum)) + ) + installed.append(signum) + + def cleanup() -> None: + for signum in installed: + try: + loop.remove_signal_handler(signum) + except (NotImplementedError, RuntimeError): + pass + signal.signal(signum, previous_handlers[signum]) + + return cleanup + async def run_astrbot(astrbot_root: Path) -> None: """Run AstrBot""" @@ -23,7 +59,36 @@ async def run_astrbot(astrbot_root: Path) -> None: core_lifecycle = InitialLoader(db, log_broker) - await core_lifecycle.start() + loop = asyncio.get_running_loop() + shutdown_requested = asyncio.Event() + shutdown_signal: signal.Signals | None = None + + def request_shutdown(signum: signal.Signals) -> None: + nonlocal shutdown_signal + shutdown_signal = signum + shutdown_requested.set() + + cleanup_signal_handlers = _install_shutdown_signal_handlers(loop, request_shutdown) + runner_task = asyncio.create_task(core_lifecycle.start(), name="astrbot") + shutdown_task = asyncio.create_task( + shutdown_requested.wait(), name="astrbot_shutdown" + ) + + try: + done, _ = await asyncio.wait( + {runner_task, shutdown_task}, + return_when=asyncio.FIRST_COMPLETED, + ) + if shutdown_task in done and not runner_task.done(): + signal_name = shutdown_signal.name if shutdown_signal else "unknown" + logger.info(f"Received {signal_name}; stopping AstrBot...") + runner_task.cancel() + await runner_task + finally: + cleanup_signal_handlers() + if not shutdown_task.done(): + shutdown_task.cancel() + await LogManager.shutdown() @click.option("--reload", "-r", is_flag=True, help="Auto-reload plugins") diff --git a/astrbot/core/core_lifecycle.py b/astrbot/core/core_lifecycle.py index 725b170003..2cae87bbca 100644 --- a/astrbot/core/core_lifecycle.py +++ b/astrbot/core/core_lifecycle.py @@ -391,6 +391,8 @@ async def stop(self) -> None: except Exception as e: logger.error(f"任务 {task.get_name()} 发生错误: {e}") + await LogManager.shutdown() + async def restart(self) -> None: """重启 AstrBot 核心生命周期管理类, 终止各个管理器并重新加载平台实例""" await self.provider_manager.terminate() diff --git a/astrbot/core/initial_loader.py b/astrbot/core/initial_loader.py index 3f836a4c42..3a541bab25 100644 --- a/astrbot/core/initial_loader.py +++ b/astrbot/core/initial_loader.py @@ -8,7 +8,7 @@ import asyncio import traceback -from astrbot.core import LogBroker, logger +from astrbot.core import LogBroker, LogManager, logger from astrbot.core.core_lifecycle import AstrBotCoreLifecycle from astrbot.core.db import BaseDatabase from astrbot.dashboard.server import AstrBotDashboard @@ -25,33 +25,38 @@ def __init__(self, db: BaseDatabase, log_broker: LogBroker) -> None: async def start(self) -> None: core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db) + initialized = False try: - await core_lifecycle.initialize() - except Exception as e: - logger.critical(traceback.format_exc()) - logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!") - return - - core_task = core_lifecycle.start() - - webui_dir = self.webui_dir - - self.dashboard_server = AstrBotDashboard( - core_lifecycle, - self.db, - core_lifecycle.dashboard_shutdown_event, - webui_dir, - ) - - coro = self.dashboard_server.run() - if coro: - # 启动核心任务和仪表板服务器 - task = asyncio.gather(core_task, coro) - else: - task = core_task - try: + try: + await core_lifecycle.initialize() + initialized = True + except Exception as e: + logger.critical(traceback.format_exc()) + logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!") + return + + core_task = core_lifecycle.start() + + webui_dir = self.webui_dir + + self.dashboard_server = AstrBotDashboard( + core_lifecycle, + self.db, + core_lifecycle.dashboard_shutdown_event, + webui_dir, + ) + + coro = self.dashboard_server.run() + if coro: + # 启动核心任务和仪表板服务器 + task = asyncio.gather(core_task, coro) + else: + task = core_task await task # 整个AstrBot在这里运行 except asyncio.CancelledError: logger.info("🌈 正在关闭 AstrBot...") - await core_lifecycle.stop() + if initialized: + await core_lifecycle.stop() + finally: + await LogManager.shutdown() diff --git a/astrbot/core/log.py b/astrbot/core/log.py index 3dd0719b11..390ed38ff6 100644 --- a/astrbot/core/log.py +++ b/astrbot/core/log.py @@ -415,3 +415,17 @@ def configure_trace_logger(cls, config: dict | None) -> None: backup_count=3, trace=True, ) + + @classmethod + async def shutdown(cls) -> None: + """Flush and remove loguru sinks during process shutdown.""" + try: + await _loguru.complete() + finally: + cls._remove_sink(cls._trace_sink_id) + cls._trace_sink_id = None + cls._remove_sink(cls._file_sink_id) + cls._file_sink_id = None + cls._remove_sink(cls._console_sink_id) + cls._console_sink_id = None + cls._configured = False diff --git a/tests/unit/test_cmd_run_shutdown.py b/tests/unit/test_cmd_run_shutdown.py new file mode 100644 index 0000000000..df45ca67ae --- /dev/null +++ b/tests/unit/test_cmd_run_shutdown.py @@ -0,0 +1,191 @@ +from __future__ import annotations + +import asyncio +import signal +from collections.abc import Callable +from pathlib import Path +from typing import Any, cast +from unittest.mock import AsyncMock, MagicMock + +import pytest + +import astrbot.cli.commands.cmd_run as cmd_run +import astrbot.core as core_module +import astrbot.core.initial_loader as initial_loader_module + + +@pytest.mark.asyncio +async def test_run_astrbot_stops_gracefully_on_sigterm(monkeypatch): + shutdown_mock = AsyncMock(return_value=None) + set_queue_handler_mock = MagicMock() + check_dashboard_mock = AsyncMock(return_value=None) + signal_restore_mock = MagicMock() + pending_signals: dict[signal.Signals, Callable[[], None]] = {} + removed_signals: list[signal.Signals] = [] + previous_handlers = { + signal.SIGINT: object(), + signal.SIGTERM: object(), + } + + class FakeLoop: + def add_signal_handler(self, signum, callback, *args): + pending_signals[signum] = lambda: callback(*args) + + def remove_signal_handler(self, signum): + removed_signals.append(signum) + pending_signals.pop(signum, None) + return True + + started = asyncio.Event() + cancelled = asyncio.Event() + + class FakeLoader: + def __init__(self, db, log_broker): + self.db = db + self.log_broker = log_broker + + async def start(self): + started.set() + try: + await asyncio.Event().wait() + except asyncio.CancelledError: + cancelled.set() + return + + monkeypatch.setattr(initial_loader_module, "InitialLoader", FakeLoader) + monkeypatch.setattr(cmd_run, "check_dashboard", check_dashboard_mock) + fake_loop = FakeLoop() + monkeypatch.setattr(cmd_run.asyncio, "get_running_loop", lambda: fake_loop) + monkeypatch.setattr( + cmd_run.signal, "getsignal", lambda signum: previous_handlers[signum] + ) + monkeypatch.setattr(cmd_run.signal, "signal", signal_restore_mock) + monkeypatch.setattr( + core_module.LogManager, "set_queue_handler", set_queue_handler_mock + ) + monkeypatch.setattr(core_module.LogManager, "shutdown", shutdown_mock) + + awaitable = asyncio.create_task(cmd_run.run_astrbot(Path("/tmp/astrbot-root"))) + await started.wait() + + assert signal.SIGTERM in pending_signals + pending_signals[signal.SIGTERM]() + + await awaitable + + assert cancelled.is_set() + assert set(removed_signals) == {signal.SIGINT, signal.SIGTERM} + assert signal_restore_mock.call_count == 2 + shutdown_mock.assert_awaited_once() + check_dashboard_mock.assert_awaited_once() + set_queue_handler_mock.assert_called_once() + + +def test_install_shutdown_signal_handlers_falls_back_and_restores(monkeypatch): + restored_handlers: list[tuple[signal.Signals, Any]] = [] + installed_handlers: dict[signal.Signals, Callable[[int, object], object]] = {} + previous_handlers = { + signal.SIGINT: object(), + signal.SIGTERM: object(), + } + callback = MagicMock() + + class FakeLoop: + def add_signal_handler(self, _signum, _callback, *_args): + _ = (_signum, _callback, _args) + raise NotImplementedError + + def remove_signal_handler(self, _signum): + _ = _signum + raise NotImplementedError + + def fake_signal(signum: signal.Signals, handler: Any) -> object: + if callable(handler): + installed_handlers[signum] = cast(Callable[[int, object], object], handler) + else: + restored_handlers.append((signum, handler)) + return previous_handlers[signum] + + monkeypatch.setattr( + cmd_run.signal, "getsignal", lambda signum: previous_handlers[signum] + ) + monkeypatch.setattr(cmd_run.signal, "signal", fake_signal) + + cleanup = cmd_run._install_shutdown_signal_handlers(cast(Any, FakeLoop()), callback) + + installed_handlers[signal.SIGTERM](signal.SIGTERM, None) + callback.assert_called_once_with(signal.SIGTERM) + + cleanup() + + assert restored_handlers == [ + (signal.SIGINT, previous_handlers[signal.SIGINT]), + (signal.SIGTERM, previous_handlers[signal.SIGTERM]), + ] + + +@pytest.mark.asyncio +async def test_initial_loader_shutdowns_logs_on_initialize_failure(monkeypatch): + shutdown_mock = AsyncMock(return_value=None) + lifecycle_instances: list[MagicMock] = [] + + class FakeLifecycle: + def __init__(self, log_broker, db): + _ = (log_broker, db) + lifecycle = MagicMock() + lifecycle.initialize = AsyncMock(side_effect=RuntimeError("boom")) + lifecycle.stop = AsyncMock() + lifecycle.start = AsyncMock() + lifecycle.dashboard_shutdown_event = asyncio.Event() + lifecycle.astrbot_config = MagicMock() + lifecycle_instances.append(lifecycle) + self.__dict__.update(lifecycle.__dict__) + + monkeypatch.setattr(initial_loader_module, "AstrBotCoreLifecycle", FakeLifecycle) + monkeypatch.setattr(initial_loader_module.LogManager, "shutdown", shutdown_mock) + + loader = initial_loader_module.InitialLoader(MagicMock(), MagicMock()) + + await loader.start() + + assert len(lifecycle_instances) == 1 + lifecycle_instances[0].stop.assert_not_awaited() + shutdown_mock.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_initial_loader_handles_cancellation_during_initialize(monkeypatch): + shutdown_mock = AsyncMock(return_value=None) + initialize_started = asyncio.Event() + lifecycle_instances: list[MagicMock] = [] + + class FakeLifecycle: + def __init__(self, log_broker, db): + _ = (log_broker, db) + lifecycle = MagicMock() + + async def initialize(): + initialize_started.set() + await asyncio.Event().wait() + + lifecycle.initialize = AsyncMock(side_effect=initialize) + lifecycle.stop = AsyncMock() + lifecycle.start = AsyncMock() + lifecycle.dashboard_shutdown_event = asyncio.Event() + lifecycle.astrbot_config = MagicMock() + lifecycle_instances.append(lifecycle) + self.__dict__.update(lifecycle.__dict__) + + monkeypatch.setattr(initial_loader_module, "AstrBotCoreLifecycle", FakeLifecycle) + monkeypatch.setattr(initial_loader_module.LogManager, "shutdown", shutdown_mock) + + loader = initial_loader_module.InitialLoader(MagicMock(), MagicMock()) + task = asyncio.create_task(loader.start()) + await initialize_started.wait() + + task.cancel() + await task + + assert len(lifecycle_instances) == 1 + lifecycle_instances[0].stop.assert_not_awaited() + shutdown_mock.assert_awaited_once() diff --git a/tests/unit/test_core_lifecycle.py b/tests/unit/test_core_lifecycle.py index 1fc8035e48..22b85f613a 100644 --- a/tests/unit/test_core_lifecycle.py +++ b/tests/unit/test_core_lifecycle.py @@ -102,6 +102,14 @@ def test_init_clears_proxy( class TestAstrBotCoreLifecycleStop: """Tests for AstrBotCoreLifecycle.stop method.""" + @pytest.fixture(autouse=True) + def _mock_log_manager_shutdown(self, monkeypatch: pytest.MonkeyPatch): + self.mock_log_shutdown = AsyncMock() + monkeypatch.setattr( + "astrbot.core.core_lifecycle.LogManager.shutdown", + self.mock_log_shutdown, + ) + @pytest.mark.asyncio async def test_stop_without_initialize(self, mock_log_broker, mock_db): """Test stop without initialize should not raise errors.""" @@ -747,6 +755,14 @@ async def test_start_calls_on_astrbot_loaded_hook(self, mock_log_broker, mock_db class TestAstrBotCoreLifecycleStopAdditional: """Additional tests for AstrBotCoreLifecycle.stop method.""" + @pytest.fixture(autouse=True) + def _mock_log_manager_shutdown(self, monkeypatch: pytest.MonkeyPatch): + self.mock_log_shutdown = AsyncMock() + monkeypatch.setattr( + "astrbot.core.core_lifecycle.LogManager.shutdown", + self.mock_log_shutdown, + ) + @pytest.mark.asyncio async def test_stop_cancels_all_tasks(self, mock_log_broker, mock_db): """Test that stop cancels all current tasks.""" @@ -818,6 +834,7 @@ async def test_stop_terminates_all_managers(self, mock_log_broker, mock_db): lifecycle.provider_manager.terminate.assert_awaited_once() lifecycle.platform_manager.terminate.assert_awaited_once() lifecycle.kb_manager.terminate.assert_awaited_once() + self.mock_log_shutdown.assert_awaited_once() @pytest.mark.asyncio async def test_stop_handles_plugin_termination_error( diff --git a/tests/unit/test_log_manager_shutdown.py b/tests/unit/test_log_manager_shutdown.py new file mode 100644 index 0000000000..c885f38759 --- /dev/null +++ b/tests/unit/test_log_manager_shutdown.py @@ -0,0 +1,42 @@ +from unittest.mock import AsyncMock, MagicMock, call + +import pytest + +import astrbot.core.log as log_module +from astrbot.core.log import LogManager + + +@pytest.mark.asyncio +async def test_shutdown_completes_and_removes_all_registered_sinks(monkeypatch): + fake_loguru = MagicMock() + fake_loguru.complete = AsyncMock(return_value=None) + fake_loguru.remove = MagicMock() + monkeypatch.setattr(log_module, "_loguru", fake_loguru) + + original_state = ( + LogManager._trace_sink_id, + LogManager._file_sink_id, + LogManager._console_sink_id, + LogManager._configured, + ) + LogManager._trace_sink_id = 22 + LogManager._file_sink_id = 11 + LogManager._console_sink_id = 33 + LogManager._configured = True + + try: + await LogManager.shutdown() + + fake_loguru.complete.assert_awaited_once() + assert fake_loguru.remove.call_args_list == [call(22), call(11), call(33)] + assert LogManager._trace_sink_id is None + assert LogManager._file_sink_id is None + assert LogManager._console_sink_id is None + assert LogManager._configured is False + finally: + ( + LogManager._trace_sink_id, + LogManager._file_sink_id, + LogManager._console_sink_id, + LogManager._configured, + ) = original_state From 1d62e0c132481ad52710ac813534a46a3054e46c Mon Sep 17 00:00:00 2001 From: Akokko Date: Fri, 5 Jun 2026 16:10:12 +0800 Subject: [PATCH 2/5] fix: tighten AstrBot shutdown cleanup - await and clean up shutdown_task to avoid pending task noise - treat SIGINT/SIGTERM-triggered CancelledError as normal shutdown - stop the core on runtime errors while preserving the original exception - add regression tests for the shutdown paths Refs #8595 --- astrbot/cli/commands/cmd_run.py | 20 +++-- astrbot/core/initial_loader.py | 10 +++ tests/unit/test_cmd_run_shutdown.py | 127 ++++++++++++++++++++++++++++ 3 files changed, 152 insertions(+), 5 deletions(-) diff --git a/astrbot/cli/commands/cmd_run.py b/astrbot/cli/commands/cmd_run.py index d4006370af..e0ed6c0350 100644 --- a/astrbot/cli/commands/cmd_run.py +++ b/astrbot/cli/commands/cmd_run.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import os import signal import sys @@ -30,9 +31,11 @@ def _install_shutdown_signal_handlers( loop.add_signal_handler(signum, callback, signum) installed.append(signum) except (NotImplementedError, RuntimeError): - signal.signal( - signum, lambda _signum, _frame: callback(signal.Signals(_signum)) - ) + def fallback_handler(received_signum, frame): + _ = frame + callback(signal.Signals(received_signum)) + + signal.signal(signum, fallback_handler) installed.append(signum) def cleanup() -> None: @@ -79,15 +82,22 @@ def request_shutdown(signum: signal.Signals) -> None: {runner_task, shutdown_task}, return_when=asyncio.FIRST_COMPLETED, ) - if shutdown_task in done and not runner_task.done(): + shutdown_requested_by_signal = shutdown_task in done + if shutdown_requested_by_signal and not runner_task.done(): signal_name = shutdown_signal.name if shutdown_signal else "unknown" logger.info(f"Received {signal_name}; stopping AstrBot...") runner_task.cancel() - await runner_task + try: + await runner_task + except asyncio.CancelledError: + if not shutdown_requested_by_signal: + raise finally: cleanup_signal_handlers() if not shutdown_task.done(): shutdown_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await shutdown_task await LogManager.shutdown() diff --git a/astrbot/core/initial_loader.py b/astrbot/core/initial_loader.py index 3a541bab25..0d185d3343 100644 --- a/astrbot/core/initial_loader.py +++ b/astrbot/core/initial_loader.py @@ -58,5 +58,15 @@ async def start(self) -> None: logger.info("🌈 正在关闭 AstrBot...") if initialized: await core_lifecycle.stop() + except Exception: + if initialized: + try: + await core_lifecycle.stop() + except Exception: + logger.error( + "AstrBot shutdown during runtime-error handling failed", + exc_info=True, + ) + raise finally: await LogManager.shutdown() diff --git a/tests/unit/test_cmd_run_shutdown.py b/tests/unit/test_cmd_run_shutdown.py index df45ca67ae..09c7af76d2 100644 --- a/tests/unit/test_cmd_run_shutdown.py +++ b/tests/unit/test_cmd_run_shutdown.py @@ -81,6 +81,57 @@ async def start(self): set_queue_handler_mock.assert_called_once() +@pytest.mark.asyncio +async def test_run_astrbot_suppresses_signal_cancelled_runner(monkeypatch): + shutdown_mock = AsyncMock(return_value=None) + check_dashboard_mock = AsyncMock(return_value=None) + signal_restore_mock = MagicMock() + pending_signals: dict[signal.Signals, Callable[[], None]] = {} + previous_handlers = { + signal.SIGINT: object(), + signal.SIGTERM: object(), + } + + class FakeLoop: + def add_signal_handler(self, signum, callback, *args): + pending_signals[signum] = lambda: callback(*args) + + def remove_signal_handler(self, signum): + pending_signals.pop(signum, None) + return True + + started = asyncio.Event() + + class FakeLoader: + def __init__(self, db, log_broker): + self.db = db + self.log_broker = log_broker + + async def start(self): + started.set() + await asyncio.Event().wait() + + monkeypatch.setattr(initial_loader_module, "InitialLoader", FakeLoader) + monkeypatch.setattr(cmd_run, "check_dashboard", check_dashboard_mock) + monkeypatch.setattr(cmd_run.asyncio, "get_running_loop", lambda: FakeLoop()) + monkeypatch.setattr( + cmd_run.signal, "getsignal", lambda signum: previous_handlers[signum] + ) + monkeypatch.setattr(cmd_run.signal, "signal", signal_restore_mock) + monkeypatch.setattr(core_module.LogManager, "set_queue_handler", MagicMock()) + monkeypatch.setattr(core_module.LogManager, "shutdown", shutdown_mock) + + awaitable = asyncio.create_task(cmd_run.run_astrbot(Path("/tmp/astrbot-root"))) + await started.wait() + + pending_signals[signal.SIGTERM]() + + await awaitable + + shutdown_mock.assert_awaited_once() + check_dashboard_mock.assert_awaited_once() + + def test_install_shutdown_signal_handlers_falls_back_and_restores(monkeypatch): restored_handlers: list[tuple[signal.Signals, Any]] = [] installed_handlers: dict[signal.Signals, Callable[[int, object], object]] = {} @@ -189,3 +240,79 @@ async def initialize(): assert len(lifecycle_instances) == 1 lifecycle_instances[0].stop.assert_not_awaited() shutdown_mock.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_initial_loader_stops_core_on_runtime_exception(monkeypatch): + shutdown_mock = AsyncMock(return_value=None) + lifecycle_instances: list[MagicMock] = [] + + class FakeLifecycle: + def __init__(self, log_broker, db): + _ = (log_broker, db) + lifecycle = MagicMock() + lifecycle.initialize = AsyncMock(return_value=None) + lifecycle.stop = AsyncMock() + lifecycle.start = AsyncMock(side_effect=RuntimeError("run boom")) + lifecycle.dashboard_shutdown_event = asyncio.Event() + lifecycle.astrbot_config = MagicMock() + lifecycle_instances.append(lifecycle) + self.__dict__.update(lifecycle.__dict__) + + class FakeDashboard: + def __init__(self, *args, **kwargs): + _ = (args, kwargs) + + def run(self): + return None + + monkeypatch.setattr(initial_loader_module, "AstrBotCoreLifecycle", FakeLifecycle) + monkeypatch.setattr(initial_loader_module, "AstrBotDashboard", FakeDashboard) + monkeypatch.setattr(initial_loader_module.LogManager, "shutdown", shutdown_mock) + + loader = initial_loader_module.InitialLoader(MagicMock(), MagicMock()) + + with pytest.raises(RuntimeError, match="run boom"): + await loader.start() + + assert len(lifecycle_instances) == 1 + lifecycle_instances[0].stop.assert_awaited_once() + shutdown_mock.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_initial_loader_preserves_runtime_error_if_stop_fails(monkeypatch): + shutdown_mock = AsyncMock(return_value=None) + lifecycle_instances: list[MagicMock] = [] + + class FakeLifecycle: + def __init__(self, log_broker, db): + _ = (log_broker, db) + lifecycle = MagicMock() + lifecycle.initialize = AsyncMock(return_value=None) + lifecycle.stop = AsyncMock(side_effect=RuntimeError("stop boom")) + lifecycle.start = AsyncMock(side_effect=RuntimeError("run boom")) + lifecycle.dashboard_shutdown_event = asyncio.Event() + lifecycle.astrbot_config = MagicMock() + lifecycle_instances.append(lifecycle) + self.__dict__.update(lifecycle.__dict__) + + class FakeDashboard: + def __init__(self, *args, **kwargs): + _ = (args, kwargs) + + def run(self): + return None + + monkeypatch.setattr(initial_loader_module, "AstrBotCoreLifecycle", FakeLifecycle) + monkeypatch.setattr(initial_loader_module, "AstrBotDashboard", FakeDashboard) + monkeypatch.setattr(initial_loader_module.LogManager, "shutdown", shutdown_mock) + + loader = initial_loader_module.InitialLoader(MagicMock(), MagicMock()) + + with pytest.raises(RuntimeError, match="run boom"): + await loader.start() + + assert len(lifecycle_instances) == 1 + lifecycle_instances[0].stop.assert_awaited_once() + shutdown_mock.assert_awaited_once() From 2910a8e76f6510a2466849c9ac0895219149d24e Mon Sep 17 00:00:00 2001 From: Akokko Date: Sat, 6 Jun 2026 19:10:04 +0800 Subject: [PATCH 3/5] fix: address shutdown review feedback Co-Authored-By: Claude Opus 4.8 --- astrbot/cli/commands/cmd_run.py | 3 ++- astrbot/core/log.py | 2 -- tests/unit/test_cmd_run_shutdown.py | 9 +++++++++ tests/unit/test_log_manager_shutdown.py | 6 +++--- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/astrbot/cli/commands/cmd_run.py b/astrbot/cli/commands/cmd_run.py index e0ed6c0350..2d993b8d09 100644 --- a/astrbot/cli/commands/cmd_run.py +++ b/astrbot/cli/commands/cmd_run.py @@ -31,9 +31,10 @@ def _install_shutdown_signal_handlers( loop.add_signal_handler(signum, callback, signum) installed.append(signum) except (NotImplementedError, RuntimeError): + def fallback_handler(received_signum, frame): _ = frame - callback(signal.Signals(received_signum)) + loop.call_soon_threadsafe(callback, signal.Signals(received_signum)) signal.signal(signum, fallback_handler) installed.append(signum) diff --git a/astrbot/core/log.py b/astrbot/core/log.py index 390ed38ff6..d00718b23b 100644 --- a/astrbot/core/log.py +++ b/astrbot/core/log.py @@ -426,6 +426,4 @@ async def shutdown(cls) -> None: cls._trace_sink_id = None cls._remove_sink(cls._file_sink_id) cls._file_sink_id = None - cls._remove_sink(cls._console_sink_id) - cls._console_sink_id = None cls._configured = False diff --git a/tests/unit/test_cmd_run_shutdown.py b/tests/unit/test_cmd_run_shutdown.py index 09c7af76d2..f28bbc6712 100644 --- a/tests/unit/test_cmd_run_shutdown.py +++ b/tests/unit/test_cmd_run_shutdown.py @@ -140,6 +140,7 @@ def test_install_shutdown_signal_handlers_falls_back_and_restores(monkeypatch): signal.SIGTERM: object(), } callback = MagicMock() + scheduled_callbacks: list[tuple[Callable[..., object], tuple[object, ...]]] = [] class FakeLoop: def add_signal_handler(self, _signum, _callback, *_args): @@ -150,6 +151,9 @@ def remove_signal_handler(self, _signum): _ = _signum raise NotImplementedError + def call_soon_threadsafe(self, callback, *args): + scheduled_callbacks.append((callback, args)) + def fake_signal(signum: signal.Signals, handler: Any) -> object: if callable(handler): installed_handlers[signum] = cast(Callable[[int, object], object], handler) @@ -165,6 +169,11 @@ def fake_signal(signum: signal.Signals, handler: Any) -> object: cleanup = cmd_run._install_shutdown_signal_handlers(cast(Any, FakeLoop()), callback) installed_handlers[signal.SIGTERM](signal.SIGTERM, None) + callback.assert_not_called() + assert scheduled_callbacks == [(callback, (signal.SIGTERM,))] + + scheduled_callback, scheduled_args = scheduled_callbacks.pop() + scheduled_callback(*scheduled_args) callback.assert_called_once_with(signal.SIGTERM) cleanup() diff --git a/tests/unit/test_log_manager_shutdown.py b/tests/unit/test_log_manager_shutdown.py index c885f38759..f493d3ea37 100644 --- a/tests/unit/test_log_manager_shutdown.py +++ b/tests/unit/test_log_manager_shutdown.py @@ -7,7 +7,7 @@ @pytest.mark.asyncio -async def test_shutdown_completes_and_removes_all_registered_sinks(monkeypatch): +async def test_shutdown_completes_and_removes_queued_file_sinks(monkeypatch): fake_loguru = MagicMock() fake_loguru.complete = AsyncMock(return_value=None) fake_loguru.remove = MagicMock() @@ -28,10 +28,10 @@ async def test_shutdown_completes_and_removes_all_registered_sinks(monkeypatch): await LogManager.shutdown() fake_loguru.complete.assert_awaited_once() - assert fake_loguru.remove.call_args_list == [call(22), call(11), call(33)] + assert fake_loguru.remove.call_args_list == [call(22), call(11)] assert LogManager._trace_sink_id is None assert LogManager._file_sink_id is None - assert LogManager._console_sink_id is None + assert LogManager._console_sink_id == 33 assert LogManager._configured is False finally: ( From 23905937197c319fdf02c0940951d57133f7d8ab Mon Sep 17 00:00:00 2001 From: Akokko Date: Sat, 6 Jun 2026 19:59:08 +0800 Subject: [PATCH 4/5] fix: clean up runner task on cancellation Co-Authored-By: Claude Opus 4.8 --- astrbot/cli/commands/cmd_run.py | 4 ++ tests/unit/test_cmd_run_shutdown.py | 58 +++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/astrbot/cli/commands/cmd_run.py b/astrbot/cli/commands/cmd_run.py index 2d993b8d09..34d6e11aa6 100644 --- a/astrbot/cli/commands/cmd_run.py +++ b/astrbot/cli/commands/cmd_run.py @@ -95,6 +95,10 @@ def request_shutdown(signum: signal.Signals) -> None: raise finally: cleanup_signal_handlers() + if not runner_task.done(): + runner_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await runner_task if not shutdown_task.done(): shutdown_task.cancel() with contextlib.suppress(asyncio.CancelledError): diff --git a/tests/unit/test_cmd_run_shutdown.py b/tests/unit/test_cmd_run_shutdown.py index f28bbc6712..2b55057400 100644 --- a/tests/unit/test_cmd_run_shutdown.py +++ b/tests/unit/test_cmd_run_shutdown.py @@ -132,6 +132,64 @@ async def start(self): check_dashboard_mock.assert_awaited_once() +@pytest.mark.asyncio +async def test_run_astrbot_cancels_runner_when_parent_is_cancelled(monkeypatch): + shutdown_mock = AsyncMock(return_value=None) + check_dashboard_mock = AsyncMock(return_value=None) + signal_restore_mock = MagicMock() + previous_handlers = { + signal.SIGINT: object(), + signal.SIGTERM: object(), + } + + class FakeLoop: + def add_signal_handler(self, _signum, _callback, *_args): + _ = (_signum, _callback, _args) + + def remove_signal_handler(self, _signum): + _ = _signum + return True + + started = asyncio.Event() + cancelled = asyncio.Event() + + class FakeLoader: + def __init__(self, db, log_broker): + self.db = db + self.log_broker = log_broker + + async def start(self): + started.set() + try: + await asyncio.Event().wait() + except asyncio.CancelledError: + cancelled.set() + raise + + monkeypatch.setattr(initial_loader_module, "InitialLoader", FakeLoader) + monkeypatch.setattr(cmd_run, "check_dashboard", check_dashboard_mock) + monkeypatch.setattr(cmd_run.asyncio, "get_running_loop", lambda: FakeLoop()) + monkeypatch.setattr( + cmd_run.signal, "getsignal", lambda signum: previous_handlers[signum] + ) + monkeypatch.setattr(cmd_run.signal, "signal", signal_restore_mock) + monkeypatch.setattr(core_module.LogManager, "set_queue_handler", MagicMock()) + monkeypatch.setattr(core_module.LogManager, "shutdown", shutdown_mock) + + awaitable = asyncio.create_task(cmd_run.run_astrbot(Path("/tmp/astrbot-root"))) + await started.wait() + + awaitable.cancel() + + with pytest.raises(asyncio.CancelledError): + await awaitable + + assert cancelled.is_set() + shutdown_mock.assert_awaited_once() + check_dashboard_mock.assert_awaited_once() + assert signal_restore_mock.call_count == 2 + + def test_install_shutdown_signal_handlers_falls_back_and_restores(monkeypatch): restored_handlers: list[tuple[signal.Signals, Any]] = [] installed_handlers: dict[signal.Signals, Callable[[int, object], object]] = {} From 3c13c0b38e9e6dccfce0ac0d6a9721e456c90634 Mon Sep 17 00:00:00 2001 From: Akokko Date: Sat, 6 Jun 2026 20:36:24 +0800 Subject: [PATCH 5/5] fix: centralize log shutdown cleanup Co-Authored-By: Claude Opus 4.8 --- astrbot/cli/commands/cmd_run.py | 32 ++++-- astrbot/core/core_lifecycle.py | 2 - tests/unit/test_cmd_run_shutdown.py | 152 ++++++++++++++++++++++++++-- tests/unit/test_core_lifecycle.py | 17 ---- 4 files changed, 167 insertions(+), 36 deletions(-) diff --git a/astrbot/cli/commands/cmd_run.py b/astrbot/cli/commands/cmd_run.py index 34d6e11aa6..2e3dd1c9c2 100644 --- a/astrbot/cli/commands/cmd_run.py +++ b/astrbot/cli/commands/cmd_run.py @@ -26,26 +26,43 @@ def _install_shutdown_signal_handlers( installed: list[signal.Signals] = [] for signum in handled_signals: - previous_handlers[signum] = signal.getsignal(signum) + try: + previous_handlers[signum] = signal.getsignal(signum) + except ValueError: + previous_handlers[signum] = None try: loop.add_signal_handler(signum, callback, signum) installed.append(signum) - except (NotImplementedError, RuntimeError): + except (NotImplementedError, RuntimeError, ValueError): def fallback_handler(received_signum, frame): _ = frame - loop.call_soon_threadsafe(callback, signal.Signals(received_signum)) + if not loop.is_closed(): + try: + loop.call_soon_threadsafe( + callback, signal.Signals(received_signum) + ) + except RuntimeError: + pass - signal.signal(signum, fallback_handler) - installed.append(signum) + try: + signal.signal(signum, fallback_handler) + installed.append(signum) + except ValueError: + pass def cleanup() -> None: for signum in installed: try: loop.remove_signal_handler(signum) - except (NotImplementedError, RuntimeError): + except (NotImplementedError, RuntimeError, ValueError): pass - signal.signal(signum, previous_handlers[signum]) + previous_handler = previous_handlers.get(signum) + if previous_handler is not None: + try: + signal.signal(signum, previous_handler) + except (TypeError, ValueError): + pass return cleanup @@ -103,7 +120,6 @@ def request_shutdown(signum: signal.Signals) -> None: shutdown_task.cancel() with contextlib.suppress(asyncio.CancelledError): await shutdown_task - await LogManager.shutdown() @click.option("--reload", "-r", is_flag=True, help="Auto-reload plugins") diff --git a/astrbot/core/core_lifecycle.py b/astrbot/core/core_lifecycle.py index 2cae87bbca..725b170003 100644 --- a/astrbot/core/core_lifecycle.py +++ b/astrbot/core/core_lifecycle.py @@ -391,8 +391,6 @@ async def stop(self) -> None: except Exception as e: logger.error(f"任务 {task.get_name()} 发生错误: {e}") - await LogManager.shutdown() - async def restart(self) -> None: """重启 AstrBot 核心生命周期管理类, 终止各个管理器并重新加载平台实例""" await self.provider_manager.terminate() diff --git a/tests/unit/test_cmd_run_shutdown.py b/tests/unit/test_cmd_run_shutdown.py index 2b55057400..574a57cb0c 100644 --- a/tests/unit/test_cmd_run_shutdown.py +++ b/tests/unit/test_cmd_run_shutdown.py @@ -16,7 +16,6 @@ @pytest.mark.asyncio async def test_run_astrbot_stops_gracefully_on_sigterm(monkeypatch): - shutdown_mock = AsyncMock(return_value=None) set_queue_handler_mock = MagicMock() check_dashboard_mock = AsyncMock(return_value=None) signal_restore_mock = MagicMock() @@ -63,7 +62,6 @@ async def start(self): monkeypatch.setattr( core_module.LogManager, "set_queue_handler", set_queue_handler_mock ) - monkeypatch.setattr(core_module.LogManager, "shutdown", shutdown_mock) awaitable = asyncio.create_task(cmd_run.run_astrbot(Path("/tmp/astrbot-root"))) await started.wait() @@ -76,14 +74,12 @@ async def start(self): assert cancelled.is_set() assert set(removed_signals) == {signal.SIGINT, signal.SIGTERM} assert signal_restore_mock.call_count == 2 - shutdown_mock.assert_awaited_once() check_dashboard_mock.assert_awaited_once() set_queue_handler_mock.assert_called_once() @pytest.mark.asyncio async def test_run_astrbot_suppresses_signal_cancelled_runner(monkeypatch): - shutdown_mock = AsyncMock(return_value=None) check_dashboard_mock = AsyncMock(return_value=None) signal_restore_mock = MagicMock() pending_signals: dict[signal.Signals, Callable[[], None]] = {} @@ -119,7 +115,6 @@ async def start(self): ) monkeypatch.setattr(cmd_run.signal, "signal", signal_restore_mock) monkeypatch.setattr(core_module.LogManager, "set_queue_handler", MagicMock()) - monkeypatch.setattr(core_module.LogManager, "shutdown", shutdown_mock) awaitable = asyncio.create_task(cmd_run.run_astrbot(Path("/tmp/astrbot-root"))) await started.wait() @@ -128,13 +123,11 @@ async def start(self): await awaitable - shutdown_mock.assert_awaited_once() check_dashboard_mock.assert_awaited_once() @pytest.mark.asyncio async def test_run_astrbot_cancels_runner_when_parent_is_cancelled(monkeypatch): - shutdown_mock = AsyncMock(return_value=None) check_dashboard_mock = AsyncMock(return_value=None) signal_restore_mock = MagicMock() previous_handlers = { @@ -152,6 +145,7 @@ def remove_signal_handler(self, _signum): started = asyncio.Event() cancelled = asyncio.Event() + shutdown_complete = asyncio.Event() class FakeLoader: def __init__(self, db, log_broker): @@ -165,6 +159,9 @@ async def start(self): except asyncio.CancelledError: cancelled.set() raise + finally: + await asyncio.sleep(0) + shutdown_complete.set() monkeypatch.setattr(initial_loader_module, "InitialLoader", FakeLoader) monkeypatch.setattr(cmd_run, "check_dashboard", check_dashboard_mock) @@ -174,7 +171,6 @@ async def start(self): ) monkeypatch.setattr(cmd_run.signal, "signal", signal_restore_mock) monkeypatch.setattr(core_module.LogManager, "set_queue_handler", MagicMock()) - monkeypatch.setattr(core_module.LogManager, "shutdown", shutdown_mock) awaitable = asyncio.create_task(cmd_run.run_astrbot(Path("/tmp/astrbot-root"))) await started.wait() @@ -185,7 +181,7 @@ async def start(self): await awaitable assert cancelled.is_set() - shutdown_mock.assert_awaited_once() + assert shutdown_complete.is_set() check_dashboard_mock.assert_awaited_once() assert signal_restore_mock.call_count == 2 @@ -209,6 +205,9 @@ def remove_signal_handler(self, _signum): _ = _signum raise NotImplementedError + def is_closed(self): + return False + def call_soon_threadsafe(self, callback, *args): scheduled_callbacks.append((callback, args)) @@ -242,6 +241,141 @@ def fake_signal(signum: signal.Signals, handler: Any) -> object: ] +def test_fallback_signal_handler_ignores_closed_loop(monkeypatch): + installed_handlers: dict[signal.Signals, Callable[[int, object], object]] = {} + previous_handlers = { + signal.SIGINT: object(), + signal.SIGTERM: object(), + } + callback = MagicMock() + + class FakeLoop: + def add_signal_handler(self, _signum, _callback, *_args): + _ = (_signum, _callback, _args) + raise NotImplementedError + + def remove_signal_handler(self, _signum): + _ = _signum + raise NotImplementedError + + def is_closed(self): + return True + + def call_soon_threadsafe(self, _callback, *_args): + raise AssertionError("closed loop should not schedule callbacks") + + def fake_signal(signum: signal.Signals, handler: Any) -> object: + if callable(handler): + installed_handlers[signum] = cast(Callable[[int, object], object], handler) + return previous_handlers[signum] + + monkeypatch.setattr( + cmd_run.signal, "getsignal", lambda signum: previous_handlers[signum] + ) + monkeypatch.setattr(cmd_run.signal, "signal", fake_signal) + + cleanup = cmd_run._install_shutdown_signal_handlers(cast(Any, FakeLoop()), callback) + + installed_handlers[signal.SIGTERM](signal.SIGTERM, None) + + callback.assert_not_called() + cleanup() + + +def test_fallback_signal_handler_ignores_call_soon_runtime_error(monkeypatch): + installed_handlers: dict[signal.Signals, Callable[[int, object], object]] = {} + previous_handlers = { + signal.SIGINT: object(), + signal.SIGTERM: object(), + } + callback = MagicMock() + + class FakeLoop: + def add_signal_handler(self, _signum, _callback, *_args): + _ = (_signum, _callback, _args) + raise NotImplementedError + + def remove_signal_handler(self, _signum): + _ = _signum + raise NotImplementedError + + def is_closed(self): + return False + + def call_soon_threadsafe(self, _callback, *_args): + raise RuntimeError("event loop is closing") + + def fake_signal(signum: signal.Signals, handler: Any) -> object: + if callable(handler): + installed_handlers[signum] = cast(Callable[[int, object], object], handler) + return previous_handlers[signum] + + monkeypatch.setattr( + cmd_run.signal, "getsignal", lambda signum: previous_handlers[signum] + ) + monkeypatch.setattr(cmd_run.signal, "signal", fake_signal) + + cleanup = cmd_run._install_shutdown_signal_handlers(cast(Any, FakeLoop()), callback) + + installed_handlers[signal.SIGTERM](signal.SIGTERM, None) + + callback.assert_not_called() + cleanup() + + +def test_install_shutdown_signal_handlers_skips_unavailable_signal_api(monkeypatch): + callback = MagicMock() + + class FakeLoop: + def add_signal_handler(self, _signum, _callback, *_args): + _ = (_signum, _callback, _args) + raise ValueError("signal only works in main thread") + + def remove_signal_handler(self, _signum): + raise AssertionError("no handlers should be installed") + + monkeypatch.setattr( + cmd_run.signal, + "getsignal", + MagicMock(side_effect=ValueError("signal only works in main thread")), + ) + monkeypatch.setattr( + cmd_run.signal, + "signal", + MagicMock(side_effect=ValueError("signal only works in main thread")), + ) + + cleanup = cmd_run._install_shutdown_signal_handlers(cast(Any, FakeLoop()), callback) + + cleanup() + callback.assert_not_called() + + +def test_cleanup_signal_handlers_skips_none_previous_handler(monkeypatch): + restored_handlers: list[tuple[signal.Signals, Any]] = [] + callback = MagicMock() + + class FakeLoop: + def add_signal_handler(self, _signum, _callback, *_args): + _ = (_signum, _callback, _args) + + def remove_signal_handler(self, _signum): + _ = _signum + return True + + def fake_signal(signum: signal.Signals, handler: Any) -> object: + restored_handlers.append((signum, handler)) + return object() + + monkeypatch.setattr(cmd_run.signal, "getsignal", lambda _signum: None) + monkeypatch.setattr(cmd_run.signal, "signal", fake_signal) + + cleanup = cmd_run._install_shutdown_signal_handlers(cast(Any, FakeLoop()), callback) + + cleanup() + assert restored_handlers == [] + + @pytest.mark.asyncio async def test_initial_loader_shutdowns_logs_on_initialize_failure(monkeypatch): shutdown_mock = AsyncMock(return_value=None) diff --git a/tests/unit/test_core_lifecycle.py b/tests/unit/test_core_lifecycle.py index 22b85f613a..1fc8035e48 100644 --- a/tests/unit/test_core_lifecycle.py +++ b/tests/unit/test_core_lifecycle.py @@ -102,14 +102,6 @@ def test_init_clears_proxy( class TestAstrBotCoreLifecycleStop: """Tests for AstrBotCoreLifecycle.stop method.""" - @pytest.fixture(autouse=True) - def _mock_log_manager_shutdown(self, monkeypatch: pytest.MonkeyPatch): - self.mock_log_shutdown = AsyncMock() - monkeypatch.setattr( - "astrbot.core.core_lifecycle.LogManager.shutdown", - self.mock_log_shutdown, - ) - @pytest.mark.asyncio async def test_stop_without_initialize(self, mock_log_broker, mock_db): """Test stop without initialize should not raise errors.""" @@ -755,14 +747,6 @@ async def test_start_calls_on_astrbot_loaded_hook(self, mock_log_broker, mock_db class TestAstrBotCoreLifecycleStopAdditional: """Additional tests for AstrBotCoreLifecycle.stop method.""" - @pytest.fixture(autouse=True) - def _mock_log_manager_shutdown(self, monkeypatch: pytest.MonkeyPatch): - self.mock_log_shutdown = AsyncMock() - monkeypatch.setattr( - "astrbot.core.core_lifecycle.LogManager.shutdown", - self.mock_log_shutdown, - ) - @pytest.mark.asyncio async def test_stop_cancels_all_tasks(self, mock_log_broker, mock_db): """Test that stop cancels all current tasks.""" @@ -834,7 +818,6 @@ async def test_stop_terminates_all_managers(self, mock_log_broker, mock_db): lifecycle.provider_manager.terminate.assert_awaited_once() lifecycle.platform_manager.terminate.assert_awaited_once() lifecycle.kb_manager.terminate.assert_awaited_once() - self.mock_log_shutdown.assert_awaited_once() @pytest.mark.asyncio async def test_stop_handles_plugin_termination_error(