-
-
Notifications
You must be signed in to change notification settings - Fork 2.3k
fix: prevent Loguru semaphore leaks on shutdown #8596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
e9d8a9a
1d62e0c
2910a8e
2390593
3c13c0b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,14 +1,71 @@ | ||
| import asyncio | ||
| import contextlib | ||
| import os | ||
| import signal | ||
| import sys | ||
| import traceback | ||
| from collections.abc import Callable | ||
| from pathlib import Path | ||
| from typing import Any | ||
|
|
||
| import click | ||
| from filelock import FileLock, Timeout | ||
|
|
||
| from ..utils import check_astrbot_root, check_dashboard, get_astrbot_root | ||
|
|
||
| ShutdownCallback = Callable[[signal.Signals], None] | ||
|
|
||
|
|
||
| def _install_shutdown_signal_handlers( | ||
| loop: asyncio.AbstractEventLoop, | ||
| callback: ShutdownCallback, | ||
| ) -> Callable[[], None]: | ||
| """Install SIGINT/SIGTERM handlers and return a cleanup callback.""" | ||
| handled_signals = (signal.SIGINT, signal.SIGTERM) | ||
| previous_handlers: dict[signal.Signals, Any] = {} | ||
| installed: list[signal.Signals] = [] | ||
|
|
||
| for signum in handled_signals: | ||
| try: | ||
| previous_handlers[signum] = signal.getsignal(signum) | ||
| except ValueError: | ||
| previous_handlers[signum] = None | ||
| try: | ||
| loop.add_signal_handler(signum, callback, signum) | ||
| installed.append(signum) | ||
| except (NotImplementedError, RuntimeError, ValueError): | ||
|
|
||
| def fallback_handler(received_signum, frame): | ||
| _ = frame | ||
| if not loop.is_closed(): | ||
| try: | ||
| loop.call_soon_threadsafe( | ||
| callback, signal.Signals(received_signum) | ||
| ) | ||
| except RuntimeError: | ||
| pass | ||
|
|
||
| try: | ||
| signal.signal(signum, fallback_handler) | ||
| installed.append(signum) | ||
| except ValueError: | ||
| pass | ||
|
|
||
| def cleanup() -> None: | ||
| for signum in installed: | ||
| try: | ||
| loop.remove_signal_handler(signum) | ||
| except (NotImplementedError, RuntimeError, ValueError): | ||
| pass | ||
| previous_handler = previous_handlers.get(signum) | ||
| if previous_handler is not None: | ||
| try: | ||
| signal.signal(signum, previous_handler) | ||
| except (TypeError, ValueError): | ||
| pass | ||
|
|
||
| return cleanup | ||
|
|
||
|
|
||
| async def run_astrbot(astrbot_root: Path) -> None: | ||
| """Run AstrBot""" | ||
|
|
@@ -23,7 +80,46 @@ async def run_astrbot(astrbot_root: Path) -> None: | |
|
|
||
| core_lifecycle = InitialLoader(db, log_broker) | ||
|
|
||
| await core_lifecycle.start() | ||
| loop = asyncio.get_running_loop() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (complexity): Consider simplifying the shutdown flow by having the signal handler cancel the main task directly and streamlining the signal-handler helper to reduce internal bookkeeping. You can keep the new behavior but flatten the orchestration around shutdown by letting the signal handler cancel the main task directly and dropping the extra task/event. 1. Simplify
|
||
| shutdown_requested = asyncio.Event() | ||
| shutdown_signal: signal.Signals | None = None | ||
|
|
||
| def request_shutdown(signum: signal.Signals) -> None: | ||
| nonlocal shutdown_signal | ||
| shutdown_signal = signum | ||
| shutdown_requested.set() | ||
|
|
||
| cleanup_signal_handlers = _install_shutdown_signal_handlers(loop, request_shutdown) | ||
| runner_task = asyncio.create_task(core_lifecycle.start(), name="astrbot") | ||
| shutdown_task = asyncio.create_task( | ||
| shutdown_requested.wait(), name="astrbot_shutdown" | ||
| ) | ||
|
|
||
| try: | ||
| done, _ = await asyncio.wait( | ||
| {runner_task, shutdown_task}, | ||
| return_when=asyncio.FIRST_COMPLETED, | ||
| ) | ||
| shutdown_requested_by_signal = shutdown_task in done | ||
| if shutdown_requested_by_signal and not runner_task.done(): | ||
| signal_name = shutdown_signal.name if shutdown_signal else "unknown" | ||
| logger.info(f"Received {signal_name}; stopping AstrBot...") | ||
| runner_task.cancel() | ||
| try: | ||
| await runner_task | ||
| except asyncio.CancelledError: | ||
| if not shutdown_requested_by_signal: | ||
| raise | ||
| finally: | ||
| cleanup_signal_handlers() | ||
| if not runner_task.done(): | ||
| runner_task.cancel() | ||
| with contextlib.suppress(asyncio.CancelledError): | ||
| await runner_task | ||
| if not shutdown_task.done(): | ||
| shutdown_task.cancel() | ||
| with contextlib.suppress(asyncio.CancelledError): | ||
| await shutdown_task | ||
|
|
||
|
|
||
| @click.option("--reload", "-r", is_flag=True, help="Auto-reload plugins") | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -8,7 +8,7 @@ | |||||||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||||||||
| import traceback | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| from astrbot.core import LogBroker, logger | ||||||||||||||||||||||||||||
| from astrbot.core import LogBroker, LogManager, logger | ||||||||||||||||||||||||||||
| from astrbot.core.core_lifecycle import AstrBotCoreLifecycle | ||||||||||||||||||||||||||||
| from astrbot.core.db import BaseDatabase | ||||||||||||||||||||||||||||
| from astrbot.dashboard.server import AstrBotDashboard | ||||||||||||||||||||||||||||
|
|
@@ -25,33 +25,48 @@ def __init__(self, db: BaseDatabase, log_broker: LogBroker) -> None: | |||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| async def start(self) -> None: | ||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (complexity): Consider flattening the start() method’s control flow by separating initialization from the runtime phase and using a single try/except for the run logic while keeping the current semantics. You can keep the new semantics but flatten the control flow and simplify async def start(self) -> None:
core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)
initialized = False
# 1. Initialization phase
try:
await core_lifecycle.initialize()
initialized = True
except Exception as e:
logger.critical(traceback.format_exc())
logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!")
return
# 2. Runtime phase
core_task = core_lifecycle.start()
webui_dir = self.webui_dir
self.dashboard_server = AstrBotDashboard(
core_lifecycle,
self.db,
core_lifecycle.dashboard_shutdown_event,
webui_dir,
)
coro = self.dashboard_server.run()
if coro:
task = asyncio.gather(core_task, coro)
else:
task = core_task
try:
await task # 整个AstrBot在这里运行
except asyncio.CancelledError:
logger.info("🌈 正在关闭 AstrBot...")
if initialized:
await core_lifecycle.stop()
except Exception:
if initialized:
try:
await core_lifecycle.stop()
except Exception:
logger.error(
"AstrBot shutdown during runtime-error handling failed",
exc_info=True,
)
raise
finally:
await LogManager.shutdown()This preserves all the new behaviors:
If possible at a higher level, you might also consider centralizing |
||||||||||||||||||||||||||||
| core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db) | ||||||||||||||||||||||||||||
| initialized = False | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (complexity): Consider refactoring the start logic to move initialization and task orchestration into small helpers, removing the nested try and initialized flag while preserving current behavior. The nested You can keep all guarantees:
For example: class InitialLoader:
...
async def _init_core(self, core_lifecycle: AstrBotCoreLifecycle) -> bool:
try:
await core_lifecycle.initialize()
return True
except Exception as e:
logger.critical(traceback.format_exc())
logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!")
return False
def _build_task(self, core_lifecycle: AstrBotCoreLifecycle) -> asyncio.Future:
core_task = core_lifecycle.start()
webui_dir = self.webui_dir
self.dashboard_server = AstrBotDashboard(
core_lifecycle,
self.db,
core_lifecycle.dashboard_shutdown_event,
webui_dir,
)
coro = self.dashboard_server.run()
if coro:
return asyncio.gather(core_task, coro)
return core_task
async def start(self) -> None:
core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)
if not await self._init_core(core_lifecycle):
# init failed -> nothing to stop, just ensure logs shutdown
await LogManager.shutdown()
return
task = self._build_task(core_lifecycle)
try:
await task # 整个AstrBot在这里运行
except asyncio.CancelledError:
logger.info("🌈 正在关闭 AstrBot...")
await core_lifecycle.stop()
finally:
await LogManager.shutdown()Key effects:
|
||||||||||||||||||||||||||||
| await core_lifecycle.initialize() | ||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||
| logger.critical(traceback.format_exc()) | ||||||||||||||||||||||||||||
| logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!") | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| core_task = core_lifecycle.start() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| webui_dir = self.webui_dir | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| self.dashboard_server = AstrBotDashboard( | ||||||||||||||||||||||||||||
| core_lifecycle, | ||||||||||||||||||||||||||||
| self.db, | ||||||||||||||||||||||||||||
| core_lifecycle.dashboard_shutdown_event, | ||||||||||||||||||||||||||||
| webui_dir, | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| coro = self.dashboard_server.run() | ||||||||||||||||||||||||||||
| if coro: | ||||||||||||||||||||||||||||
| # 启动核心任务和仪表板服务器 | ||||||||||||||||||||||||||||
| task = asyncio.gather(core_task, coro) | ||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||
| task = core_task | ||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||
| await core_lifecycle.initialize() | ||||||||||||||||||||||||||||
| initialized = True | ||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||
| logger.critical(traceback.format_exc()) | ||||||||||||||||||||||||||||
| logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!") | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| core_task = core_lifecycle.start() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| webui_dir = self.webui_dir | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| self.dashboard_server = AstrBotDashboard( | ||||||||||||||||||||||||||||
| core_lifecycle, | ||||||||||||||||||||||||||||
| self.db, | ||||||||||||||||||||||||||||
| core_lifecycle.dashboard_shutdown_event, | ||||||||||||||||||||||||||||
| webui_dir, | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| coro = self.dashboard_server.run() | ||||||||||||||||||||||||||||
| if coro: | ||||||||||||||||||||||||||||
| # 启动核心任务和仪表板服务器 | ||||||||||||||||||||||||||||
| task = asyncio.gather(core_task, coro) | ||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||
| task = core_task | ||||||||||||||||||||||||||||
| await task # 整个AstrBot在这里运行 | ||||||||||||||||||||||||||||
| except asyncio.CancelledError: | ||||||||||||||||||||||||||||
| logger.info("🌈 正在关闭 AstrBot...") | ||||||||||||||||||||||||||||
| await core_lifecycle.stop() | ||||||||||||||||||||||||||||
| if initialized: | ||||||||||||||||||||||||||||
| await core_lifecycle.stop() | ||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||
| if initialized: | ||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||
| await core_lifecycle.stop() | ||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||
| logger.error( | ||||||||||||||||||||||||||||
| "AstrBot shutdown during runtime-error handling failed", | ||||||||||||||||||||||||||||
| exc_info=True, | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||
| await LogManager.shutdown() | ||||||||||||||||||||||||||||
|
Comment on lines
57
to
+72
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 如果 这会导致 建议将
Suggested change
|
||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -415,3 +415,15 @@ def configure_trace_logger(cls, config: dict | None) -> None: | |||||||||||||||||||||||||
| backup_count=3, | ||||||||||||||||||||||||||
| trace=True, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| @classmethod | ||||||||||||||||||||||||||
| async def shutdown(cls) -> None: | ||||||||||||||||||||||||||
| """Flush and remove loguru sinks during process shutdown.""" | ||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||
| await _loguru.complete() | ||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||
| cls._remove_sink(cls._trace_sink_id) | ||||||||||||||||||||||||||
| cls._trace_sink_id = None | ||||||||||||||||||||||||||
| cls._remove_sink(cls._file_sink_id) | ||||||||||||||||||||||||||
| cls._file_sink_id = None | ||||||||||||||||||||||||||
| cls._configured = False | ||||||||||||||||||||||||||
|
Comment on lines
+425
to
+429
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 在 原因
建议避免在
Suggested change
|
||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): Consider simplifying the signal handling and shutdown flow in run_astrbot by inlining the logic, reducing task coordination, and centralizing LogManager shutdown.
You can reduce complexity without losing functionality by simplifying both the signal-handling abstraction and the task coordination.
1. Inline signal setup instead of
_install_shutdown_signal_handlersThe helper tracks previous handlers and has two code paths, plus a cleanup closure. For this CLI entrypoint, you can keep behavior but make it local and linear:
This removes:
ShutdownCallbackand_install_shutdown_signal_handlersprevious_handlers,installed, and thecleanupclosureshutdown_signalstate aroundIf restoring previous handlers is really required, you can still keep that logic inline in
run_astrbot, which is easier to reason about than a generic helper.2. Simplify the two-task coordination
You can avoid
asyncio.waitand a separateshutdown_taskby directly awaiting the event and then cancelling the main task:If you want to log only when a signal actually occurred, keep the logging in
request_shutdownas above. This preserves:but removes
shutdown_task,asyncio.wait, and theFIRST_COMPLETED+donebookkeeping.3. Centralise
LogManager.shutdownRight now
run_astrbotcallsawait LogManager.shutdown()andInitialLoader.startalso calls it in afinally. To avoid double-shutdown logic spread across layers, consider consolidating it in one place.For example, keep it only in the CLI:
And leave:
in
run_astrbot.This reduces cross-layer coupling and simplifies reasoning about where the global logging system is terminated.