fix: prevent Loguru semaphore leaks on shutdown#8596
Conversation
Add LogManager.shutdown() and wire it into the AstrBot shutdown path.Also handle SIGINT/SIGTERM gracefully in astrbot run so file-log sinks are flushed and removed before exit. This prevents multiprocessing.resource_tracker semaphore warnings when file logging is enabled. Refs AstrBotDevs#8595
There was a problem hiding this comment.
Code Review
This pull request introduces graceful shutdown handling for AstrBot, registering handlers for SIGINT and SIGTERM signals, ensuring that running tasks are cancelled, and adding a shutdown method to LogManager to clean up loguru sinks. The review feedback highlights critical improvements for robustness: ensuring that the core lifecycle is stopped even when non-cancellation exceptions occur during execution, and adding exception handling for signal registration and cleanup to prevent crashes in non-main threads or when dealing with null signal handlers.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| except asyncio.CancelledError: | ||
| logger.info("🌈 正在关闭 AstrBot...") | ||
| await core_lifecycle.stop() | ||
| if initialized: | ||
| await core_lifecycle.stop() | ||
| finally: | ||
| await LogManager.shutdown() |
There was a problem hiding this comment.
如果 await task 在运行时抛出非 CancelledError 的异常(例如数据库连接断开、网络异常或插件崩溃等),该异常将不会被 except asyncio.CancelledError 捕获,而是直接进入 finally 块。
这会导致 core_lifecycle.stop() 无法被调用,从而无法优雅地关闭各个管理器、释放数据库连接和清理插件资源,可能导致资源泄露或锁文件残留。
建议将 core_lifecycle.stop() 的调用移动到 finally 块中,并结合 initialized 状态进行保护,以确保在任何异常退出路径下都能进行优雅清理。
| except asyncio.CancelledError: | |
| logger.info("🌈 正在关闭 AstrBot...") | |
| await core_lifecycle.stop() | |
| if initialized: | |
| await core_lifecycle.stop() | |
| finally: | |
| await LogManager.shutdown() | |
| except asyncio.CancelledError: | |
| logger.info("🌈 正在关闭 AstrBot...") | |
| finally: | |
| if initialized: | |
| await core_lifecycle.stop() | |
| await LogManager.shutdown() |
| for signum in handled_signals: | ||
| previous_handlers[signum] = signal.getsignal(signum) | ||
| try: | ||
| loop.add_signal_handler(signum, callback, signum) | ||
| installed.append(signum) | ||
| except (NotImplementedError, RuntimeError): | ||
| signal.signal( | ||
| signum, lambda _signum, _frame: callback(signal.Signals(_signum)) | ||
| ) | ||
| installed.append(signum) |
There was a problem hiding this comment.
在非主线程(例如某些测试环境或嵌入式运行环境)中,signal.getsignal、loop.add_signal_handler 或 signal.signal 可能会抛出 ValueError。此外,如果 signal.getsignal 返回 None(表示信号处理器由 C/C++ 层注册),直接将其传给 signal.signal 会导致 TypeError。
建议对信号获取和注册进行更健壮的异常处理,并允许在非主线程中优雅降级。
| for signum in handled_signals: | |
| previous_handlers[signum] = signal.getsignal(signum) | |
| try: | |
| loop.add_signal_handler(signum, callback, signum) | |
| installed.append(signum) | |
| except (NotImplementedError, RuntimeError): | |
| signal.signal( | |
| signum, lambda _signum, _frame: callback(signal.Signals(_signum)) | |
| ) | |
| installed.append(signum) | |
| for signum in handled_signals: | |
| try: | |
| previous_handlers[signum] = signal.getsignal(signum) | |
| except ValueError: | |
| previous_handlers[signum] = None | |
| try: | |
| loop.add_signal_handler(signum, callback, signum) | |
| installed.append(signum) | |
| except (NotImplementedError, RuntimeError, ValueError): | |
| try: | |
| signal.signal( | |
| signum, lambda _signum, _frame: callback(signal.Signals(_signum)) | |
| ) | |
| installed.append(signum) | |
| except ValueError: | |
| pass |
| def cleanup() -> None: | ||
| for signum in installed: | ||
| try: | ||
| loop.remove_signal_handler(signum) | ||
| except (NotImplementedError, RuntimeError): | ||
| pass | ||
| signal.signal(signum, previous_handlers[signum]) |
There was a problem hiding this comment.
在清理信号处理器时,如果 previous_handlers 中对应信号的值为 None,直接调用 signal.signal(signum, None) 会抛出 TypeError。此外,如果在非主线程中执行清理,signal.signal 也会抛出 ValueError。
建议在恢复信号处理器前进行 None 检查,并捕获可能的 ValueError。
| def cleanup() -> None: | |
| for signum in installed: | |
| try: | |
| loop.remove_signal_handler(signum) | |
| except (NotImplementedError, RuntimeError): | |
| pass | |
| signal.signal(signum, previous_handlers[signum]) | |
| def cleanup() -> None: | |
| for signum in installed: | |
| try: | |
| loop.remove_signal_handler(signum) | |
| except (NotImplementedError, RuntimeError, ValueError): | |
| pass | |
| handler = previous_handlers.get(signum) | |
| if handler is not None: | |
| try: | |
| signal.signal(signum, handler) | |
| except ValueError: | |
| pass |
There was a problem hiding this comment.
Hey - I've found 3 issues, and left some high level feedback:
- LogManager.shutdown() is now invoked from multiple layers (AstrBotCoreLifecycle.stop, InitialLoader.start finally, and run_astrbot finally), which can lead to redundant shutdown calls; consider centralizing responsibility in a single top-level shutdown path or explicitly documenting/making shutdown idempotent to avoid surprises.
- Coupling AstrBotCoreLifecycle.stop() to the global LogManager.shutdown() means stopping the core lifecycle also tears down process-wide logging; if callers might expect to reuse the logger after a stop/restart, consider decoupling these concerns or adding a dedicated higher-level shutdown that orchestrates both.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- LogManager.shutdown() is now invoked from multiple layers (AstrBotCoreLifecycle.stop, InitialLoader.start finally, and run_astrbot finally), which can lead to redundant shutdown calls; consider centralizing responsibility in a single top-level shutdown path or explicitly documenting/making shutdown idempotent to avoid surprises.
- Coupling AstrBotCoreLifecycle.stop() to the global LogManager.shutdown() means stopping the core lifecycle also tears down process-wide logging; if callers might expect to reuse the logger after a stop/restart, consider decoupling these concerns or adding a dedicated higher-level shutdown that orchestrates both.
## Individual Comments
### Comment 1
<location path="astrbot/cli/commands/cmd_run.py" line_range="78-86" />
<code_context>
+ )
+
+ try:
+ done, _ = await asyncio.wait(
+ {runner_task, shutdown_task},
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+ if shutdown_task in done and not runner_task.done():
+ signal_name = shutdown_signal.name if shutdown_signal else "unknown"
+ logger.info(f"Received {signal_name}; stopping AstrBot...")
+ runner_task.cancel()
+ await runner_task
+ finally:
+ cleanup_signal_handlers()
</code_context>
<issue_to_address>
**issue (bug_risk):** Handle CancelledError and ensure shutdown_task is fully cleaned up to avoid noisy shutdowns.
When `runner_task` is cancelled, `await runner_task` will raise `asyncio.CancelledError` and currently bubble out of `run_astrbot`, causing noisy tracebacks on normal SIGINT/SIGTERM shutdowns. It would be better to catch `asyncio.CancelledError` around `await runner_task` and treat it as a normal shutdown.
Additionally, `shutdown_task.cancel()` is never awaited, which can leave a pending task warning when the loop closes. After cleaning up signal handlers, check `shutdown_task.done()`, and if not, cancel and await it (suppressing `CancelledError`) so the shutdown path is quiet and all tasks are fully cleaned up.
</issue_to_address>
### Comment 2
<location path="astrbot/cli/commands/cmd_run.py" line_range="18" />
<code_context>
+ShutdownCallback = Callable[[signal.Signals], None]
+
+
+def _install_shutdown_signal_handlers(
+ loop: asyncio.AbstractEventLoop,
+ callback: ShutdownCallback,
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the signal handling and shutdown flow in run_astrbot by inlining the logic, reducing task coordination, and centralizing LogManager shutdown.
You can reduce complexity without losing functionality by simplifying both the signal-handling abstraction and the task coordination.
### 1. Inline signal setup instead of `_install_shutdown_signal_handlers`
The helper tracks previous handlers and has two code paths, plus a cleanup closure. For this CLI entrypoint, you can keep behavior but make it local and linear:
```python
async def run_astrbot(astrbot_root: Path) -> None:
...
loop = asyncio.get_running_loop()
shutdown_requested = asyncio.Event()
def request_shutdown(signum: int) -> None:
logger.info(f"Received {signal.Signals(signum).name}; stopping AstrBot...")
shutdown_requested.set()
for signum in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(signum, request_shutdown, signum)
except (NotImplementedError, RuntimeError):
signal.signal(signum, lambda s, _f: request_shutdown(s))
```
This removes:
- `ShutdownCallback` and `_install_shutdown_signal_handlers`
- `previous_handlers`, `installed`, and the `cleanup` closure
- The need to thread `shutdown_signal` state around
If restoring previous handlers is really required, you can still keep that logic *inline* in `run_astrbot`, which is easier to reason about than a generic helper.
### 2. Simplify the two-task coordination
You can avoid `asyncio.wait` and a separate `shutdown_task` by directly awaiting the event and then cancelling the main task:
```python
async def run_astrbot(astrbot_root: Path) -> None:
...
runner_task = asyncio.create_task(core_lifecycle.start(), name="astrbot")
try:
await shutdown_requested.wait()
if not runner_task.done():
runner_task.cancel()
await runner_task
finally:
await LogManager.shutdown()
```
If you want to log only when a signal actually occurred, keep the logging in `request_shutdown` as above. This preserves:
- “run core_lifecycle until a signal”
- “cancel and await on signal”
- “always shut down logging”
but removes `shutdown_task`, `asyncio.wait`, and the `FIRST_COMPLETED` + `done` bookkeeping.
### 3. Centralise `LogManager.shutdown`
Right now `run_astrbot` calls `await LogManager.shutdown()` and `InitialLoader.start` also calls it in a `finally`. To avoid double-shutdown logic spread across layers, consider consolidating it in *one* place.
For example, keep it only in the CLI:
```python
# in InitialLoader.start()
try:
...
finally:
# remove LogManager.shutdown() from here
...
```
And leave:
```python
try:
...
finally:
await LogManager.shutdown()
```
in `run_astrbot`.
This reduces cross-layer coupling and simplifies reasoning about where the global logging system is terminated.
</issue_to_address>
### Comment 3
<location path="astrbot/core/initial_loader.py" line_range="30" />
<code_context>
+
+ for signum in handled_signals:
+ previous_handlers[signum] = signal.getsignal(signum)
+ try:
+ loop.add_signal_handler(signum, callback, signum)
+ installed.append(signum)
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the start logic to move initialization and task orchestration into small helpers, removing the nested try and initialized flag while preserving current behavior.
The nested `try` + `initialized` flag can be removed without changing behaviour by separating initialization from the run/cancel flow and extracting a small helper for the core+dashboard orchestration.
You can keep all guarantees:
- Init failures are logged and abort early.
- `stop()` is only called if init succeeded.
- `LogManager.shutdown()` still always runs.
For example:
```python
class InitialLoader:
...
async def _init_core(self, core_lifecycle: AstrBotCoreLifecycle) -> bool:
try:
await core_lifecycle.initialize()
return True
except Exception as e:
logger.critical(traceback.format_exc())
logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!")
return False
def _build_task(self, core_lifecycle: AstrBotCoreLifecycle) -> asyncio.Future:
core_task = core_lifecycle.start()
webui_dir = self.webui_dir
self.dashboard_server = AstrBotDashboard(
core_lifecycle,
self.db,
core_lifecycle.dashboard_shutdown_event,
webui_dir,
)
coro = self.dashboard_server.run()
if coro:
return asyncio.gather(core_task, coro)
return core_task
async def start(self) -> None:
core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)
if not await self._init_core(core_lifecycle):
# init failed -> nothing to stop, just ensure logs shutdown
await LogManager.shutdown()
return
task = self._build_task(core_lifecycle)
try:
await task # 整个AstrBot在这里运行
except asyncio.CancelledError:
logger.info("🌈 正在关闭 AstrBot...")
await core_lifecycle.stop()
finally:
await LogManager.shutdown()
```
Key effects:
- No nested `try` and no `initialized` flag; the init path is linear and clearly separated.
- The run orchestration (`_build_task`) is self-contained and easier to read.
- Behaviour on init failure and cancellation remains the same as in your current version.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| ShutdownCallback = Callable[[signal.Signals], None] | ||
|
|
||
|
|
||
| def _install_shutdown_signal_handlers( |
There was a problem hiding this comment.
issue (complexity): Consider simplifying the signal handling and shutdown flow in run_astrbot by inlining the logic, reducing task coordination, and centralizing LogManager shutdown.
You can reduce complexity without losing functionality by simplifying both the signal-handling abstraction and the task coordination.
1. Inline signal setup instead of _install_shutdown_signal_handlers
The helper tracks previous handlers and has two code paths, plus a cleanup closure. For this CLI entrypoint, you can keep behavior but make it local and linear:
async def run_astrbot(astrbot_root: Path) -> None:
...
loop = asyncio.get_running_loop()
shutdown_requested = asyncio.Event()
def request_shutdown(signum: int) -> None:
logger.info(f"Received {signal.Signals(signum).name}; stopping AstrBot...")
shutdown_requested.set()
for signum in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(signum, request_shutdown, signum)
except (NotImplementedError, RuntimeError):
signal.signal(signum, lambda s, _f: request_shutdown(s))This removes:
ShutdownCallbackand_install_shutdown_signal_handlersprevious_handlers,installed, and thecleanupclosure- The need to thread
shutdown_signalstate around
If restoring previous handlers is really required, you can still keep that logic inline in run_astrbot, which is easier to reason about than a generic helper.
2. Simplify the two-task coordination
You can avoid asyncio.wait and a separate shutdown_task by directly awaiting the event and then cancelling the main task:
async def run_astrbot(astrbot_root: Path) -> None:
...
runner_task = asyncio.create_task(core_lifecycle.start(), name="astrbot")
try:
await shutdown_requested.wait()
if not runner_task.done():
runner_task.cancel()
await runner_task
finally:
await LogManager.shutdown()If you want to log only when a signal actually occurred, keep the logging in request_shutdown as above. This preserves:
- “run core_lifecycle until a signal”
- “cancel and await on signal”
- “always shut down logging”
but removes shutdown_task, asyncio.wait, and the FIRST_COMPLETED + done bookkeeping.
3. Centralise LogManager.shutdown
Right now run_astrbot calls await LogManager.shutdown() and InitialLoader.start also calls it in a finally. To avoid double-shutdown logic spread across layers, consider consolidating it in one place.
For example, keep it only in the CLI:
# in InitialLoader.start()
try:
...
finally:
# remove LogManager.shutdown() from here
...And leave:
try:
...
finally:
await LogManager.shutdown()in run_astrbot.
This reduces cross-layer coupling and simplifies reasoning about where the global logging system is terminated.
| core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db) | ||
| initialized = False | ||
|
|
||
| try: |
There was a problem hiding this comment.
issue (complexity): Consider refactoring the start logic to move initialization and task orchestration into small helpers, removing the nested try and initialized flag while preserving current behavior.
The nested try + initialized flag can be removed without changing behaviour by separating initialization from the run/cancel flow and extracting a small helper for the core+dashboard orchestration.
You can keep all guarantees:
- Init failures are logged and abort early.
stop()is only called if init succeeded.LogManager.shutdown()still always runs.
For example:
class InitialLoader:
...
async def _init_core(self, core_lifecycle: AstrBotCoreLifecycle) -> bool:
try:
await core_lifecycle.initialize()
return True
except Exception as e:
logger.critical(traceback.format_exc())
logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!")
return False
def _build_task(self, core_lifecycle: AstrBotCoreLifecycle) -> asyncio.Future:
core_task = core_lifecycle.start()
webui_dir = self.webui_dir
self.dashboard_server = AstrBotDashboard(
core_lifecycle,
self.db,
core_lifecycle.dashboard_shutdown_event,
webui_dir,
)
coro = self.dashboard_server.run()
if coro:
return asyncio.gather(core_task, coro)
return core_task
async def start(self) -> None:
core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)
if not await self._init_core(core_lifecycle):
# init failed -> nothing to stop, just ensure logs shutdown
await LogManager.shutdown()
return
task = self._build_task(core_lifecycle)
try:
await task # 整个AstrBot在这里运行
except asyncio.CancelledError:
logger.info("🌈 正在关闭 AstrBot...")
await core_lifecycle.stop()
finally:
await LogManager.shutdown()Key effects:
- No nested
tryand noinitializedflag; the init path is linear and clearly separated. - The run orchestration (
_build_task) is self-contained and easier to read. - Behaviour on init failure and cancellation remains the same as in your current version.
- await and clean up shutdown_task to avoid pending task noise - treat SIGINT/SIGTERM-triggered CancelledError as normal shutdown - stop the core on runtime errors while preserving the original exception - add regression tests for the shutdown paths Refs AstrBotDevs#8595
|
已根据 review 做了一轮收敛修复:
|
|
/gemini review |
|
@sourcery-ai review |
There was a problem hiding this comment.
Code Review
This pull request introduces graceful shutdown handling and signal management for AstrBot, ensuring that the application and its logging systems are properly cleaned up upon receiving termination signals. The review feedback highlights a critical thread-safety issue in the fallback signal handler, recommending the use of loop.call_soon_threadsafe to safely trigger the shutdown event. Additionally, the reviewer advises against removing the console log sink during shutdown to avoid silencing subsequent diagnostic logs, along with a corresponding update to the unit tests.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def fallback_handler(received_signum, frame): | ||
| _ = frame | ||
| callback(signal.Signals(received_signum)) |
There was a problem hiding this comment.
在通过 signal.signal 注册的同步信号处理函数中,直接调用 callback(即 request_shutdown,其内部会调用 shutdown_requested.set())是不安全的。
原因
Python 的 signal.signal 信号处理函数会在主线程的任意字节码指令之间异步执行。如果此时事件循环正处于修改内部状态(如任务调度、事件等待队列等)的非原子操作中,直接调用 asyncio.Event.set() 可能会导致事件循环内部状态损坏或产生死锁等重入问题。
解决方案
应当使用 loop.call_soon_threadsafe 将回调安全地调度到事件循环的下一个安全时间点执行。
| def fallback_handler(received_signum, frame): | |
| _ = frame | |
| callback(signal.Signals(received_signum)) | |
| def fallback_handler(received_signum, frame): | |
| _ = frame | |
| loop.call_soon_threadsafe(callback, signal.Signals(received_signum)) |
| cls._remove_sink(cls._trace_sink_id) | ||
| cls._trace_sink_id = None | ||
| cls._remove_sink(cls._file_sink_id) | ||
| cls._file_sink_id = None | ||
| cls._remove_sink(cls._console_sink_id) | ||
| cls._console_sink_id = None | ||
| cls._configured = False |
There was a problem hiding this comment.
在 shutdown 中移除控制台日志 Sink (_console_sink_id) 会导致在此之后的所有日志输出被完全静默。
原因
- 控制台日志 Sink 并没有开启
enqueue=True,因此它不会像文件日志那样在退出时产生multiprocessing.resource_tracker的 semaphore 泄露警告。 LogManager.shutdown()会在core_lifecycle.stop()中被调用。如果在此之后(例如在InitialLoader的finally块或cmd_run的后续清理流程中)发生任何错误或需要记录日志,由于控制台 Sink 已被移除,这些关键的退出/错误日志将无法输出到控制台,给排查问题带来困难。
建议
避免在 shutdown 中移除控制台 Sink,仅清理和移除会产生信号量泄露的 _trace_sink_id 和 _file_sink_id。
| cls._remove_sink(cls._trace_sink_id) | |
| cls._trace_sink_id = None | |
| cls._remove_sink(cls._file_sink_id) | |
| cls._file_sink_id = None | |
| cls._remove_sink(cls._console_sink_id) | |
| cls._console_sink_id = None | |
| cls._configured = False | |
| cls._remove_sink(cls._trace_sink_id) | |
| cls._trace_sink_id = None | |
| cls._remove_sink(cls._file_sink_id) | |
| cls._file_sink_id = None | |
| cls._configured = False |
| assert fake_loguru.remove.call_args_list == [call(22), call(11), call(33)] | ||
| assert LogManager._trace_sink_id is None | ||
| assert LogManager._file_sink_id is None | ||
| assert LogManager._console_sink_id is None |
There was a problem hiding this comment.
配合 LogManager.shutdown 不再移除控制台 Sink 的修改,需要同步更新此处的单元测试断言,避免测试失败。
| assert fake_loguru.remove.call_args_list == [call(22), call(11), call(33)] | |
| assert LogManager._trace_sink_id is None | |
| assert LogManager._file_sink_id is None | |
| assert LogManager._console_sink_id is None | |
| assert fake_loguru.remove.call_args_list == [call(22), call(11)] | |
| assert LogManager._trace_sink_id is None | |
| assert LogManager._file_sink_id is None | |
| assert LogManager._console_sink_id == 33 |
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- LogManager.shutdown() is now invoked from AstrBotCoreLifecycle.stop(), InitialLoader.start() and run_astrbot(), which can lead to multiple shutdown calls per process; consider centralizing shutdown in a single top-level place or making shutdown explicitly idempotent to avoid redundant work and subtle ordering issues.
- In InitialLoader.start(), the finally block always calls LogManager.shutdown() even when AstrBotCoreLifecycle.stop() has already done so on normal shutdown paths; if you keep shutdown in stop(), you can drop the finalizer here (or vice versa) to simplify the shutdown flow.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- LogManager.shutdown() is now invoked from AstrBotCoreLifecycle.stop(), InitialLoader.start() and run_astrbot(), which can lead to multiple shutdown calls per process; consider centralizing shutdown in a single top-level place or making shutdown explicitly idempotent to avoid redundant work and subtle ordering issues.
- In InitialLoader.start(), the finally block always calls LogManager.shutdown() even when AstrBotCoreLifecycle.stop() has already done so on normal shutdown paths; if you keep shutdown in stop(), you can drop the finalizer here (or vice versa) to simplify the shutdown flow.
## Individual Comments
### Comment 1
<location path="astrbot/cli/commands/cmd_run.py" line_range="65" />
<code_context>
core_lifecycle = InitialLoader(db, log_broker)
- await core_lifecycle.start()
+ loop = asyncio.get_running_loop()
+ shutdown_requested = asyncio.Event()
+ shutdown_signal: signal.Signals | None = None
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the shutdown flow by having the signal handler cancel the main task directly and streamlining the signal-handler helper to reduce internal bookkeeping.
You can keep the new behavior but flatten the orchestration around shutdown by letting the signal handler cancel the main task directly and dropping the extra task/event.
### 1. Simplify `run_astrbot` orchestration
You don’t actually need `shutdown_requested`, `shutdown_task`, or `asyncio.wait`. The signal handler can cancel the main task, and you can distinguish “signal‑driven” vs “other” cancellations from a single `await`:
```python
async def run_astrbot(astrbot_root: Path) -> None:
"""Run AstrBot"""
from astrbot.core import LogBroker, LogManager, db_helper, logger
from astrbot.core.initial_loader import InitialLoader
await check_dashboard(astrbot_root / "data")
log_broker = LogBroker()
LogManager.set_queue_handler(logger, log_broker)
db = db_helper
core_lifecycle = InitialLoader(db, log_broker)
loop = asyncio.get_running_loop()
shutdown_signal: signal.Signals | None = None
main_task = asyncio.create_task(core_lifecycle.start(), name="astrbot")
def request_shutdown(signum: signal.Signals) -> None:
nonlocal shutdown_signal
shutdown_signal = signum
logger.info(f"Received {signum.name}; stopping AstrBot...")
main_task.cancel()
cleanup_signal_handlers = _install_shutdown_signal_handlers(loop, request_shutdown)
try:
try:
await main_task
except asyncio.CancelledError:
# Only swallow cancellations that were triggered by our signal handler
if shutdown_signal is None:
raise
finally:
cleanup_signal_handlers()
await LogManager.shutdown()
```
This keeps:
- clean shutdown on SIGINT/SIGTERM
- differentiation between “signal‑driven” cancellation and other cancellations
but removes:
- `asyncio.Event`
- `shutdown_task`
- `asyncio.wait(FIRST_COMPLETED)`
- `shutdown_requested_by_signal` bookkeeping
### 2. Lightly trim `_install_shutdown_signal_handlers`
You can reduce some internal bookkeeping without changing behavior by using a single dict for “installed + previous handler” instead of a separate `installed` list:
```python
def _install_shutdown_signal_handlers(
loop: asyncio.AbstractEventLoop,
callback: ShutdownCallback,
) -> Callable[[], None]:
"""Install SIGINT/SIGTERM handlers and return a cleanup callback."""
handled_signals = (signal.SIGINT, signal.SIGTERM)
previous_handlers: dict[signal.Signals, Any] = {}
for signum in handled_signals:
previous_handlers[signum] = signal.getsignal(signum)
try:
loop.add_signal_handler(signum, callback, signum)
except (NotImplementedError, RuntimeError):
def fallback_handler(received_signum, _frame) -> None:
callback(signal.Signals(received_signum))
signal.signal(signum, fallback_handler)
def cleanup() -> None:
for signum, prev in previous_handlers.items():
with contextlib.suppress(NotImplementedError, RuntimeError):
loop.remove_signal_handler(signum)
signal.signal(signum, prev)
return cleanup
```
This keeps:
- restoration of original handlers
- loop vs. `signal.signal` fallback handling
but makes the helper easier to scan and reason about.
</issue_to_address>
### Comment 2
<location path="astrbot/core/initial_loader.py" line_range="26" />
<code_context>
async def start(self) -> None:
core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)
+ initialized = False
</code_context>
<issue_to_address>
**issue (complexity):** Consider flattening the start() method’s control flow by separating initialization from the runtime phase and using a single try/except for the run logic while keeping the current semantics.
You can keep the new semantics but flatten the control flow and simplify `initialized` usage by pulling initialization out of the nested `try` and having a single `try/except` block for the “run” phase:
```python
async def start(self) -> None:
core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)
initialized = False
# 1. Initialization phase
try:
await core_lifecycle.initialize()
initialized = True
except Exception as e:
logger.critical(traceback.format_exc())
logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!")
return
# 2. Runtime phase
core_task = core_lifecycle.start()
webui_dir = self.webui_dir
self.dashboard_server = AstrBotDashboard(
core_lifecycle,
self.db,
core_lifecycle.dashboard_shutdown_event,
webui_dir,
)
coro = self.dashboard_server.run()
if coro:
task = asyncio.gather(core_task, coro)
else:
task = core_task
try:
await task # 整个AstrBot在这里运行
except asyncio.CancelledError:
logger.info("🌈 正在关闭 AstrBot...")
if initialized:
await core_lifecycle.stop()
except Exception:
if initialized:
try:
await core_lifecycle.stop()
except Exception:
logger.error(
"AstrBot shutdown during runtime-error handling failed",
exc_info=True,
)
raise
finally:
await LogManager.shutdown()
```
This preserves all the new behaviors:
- If initialization fails: log and return, no `stop()`.
- If cancelled after initialization: log and call `stop()`.
- If a runtime error occurs after initialization: attempt `stop()`, log if shutdown fails, then re-raise.
- `LogManager.shutdown()` is still always called.
If possible at a higher level, you might also consider centralizing `LogManager.shutdown()` in a single layer (either here or in `run_astrbot`) to make the overall shutdown protocol easier to follow.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| core_lifecycle = InitialLoader(db, log_broker) | ||
|
|
||
| await core_lifecycle.start() | ||
| loop = asyncio.get_running_loop() |
There was a problem hiding this comment.
issue (complexity): Consider simplifying the shutdown flow by having the signal handler cancel the main task directly and streamlining the signal-handler helper to reduce internal bookkeeping.
You can keep the new behavior but flatten the orchestration around shutdown by letting the signal handler cancel the main task directly and dropping the extra task/event.
1. Simplify run_astrbot orchestration
You don’t actually need shutdown_requested, shutdown_task, or asyncio.wait. The signal handler can cancel the main task, and you can distinguish “signal‑driven” vs “other” cancellations from a single await:
async def run_astrbot(astrbot_root: Path) -> None:
"""Run AstrBot"""
from astrbot.core import LogBroker, LogManager, db_helper, logger
from astrbot.core.initial_loader import InitialLoader
await check_dashboard(astrbot_root / "data")
log_broker = LogBroker()
LogManager.set_queue_handler(logger, log_broker)
db = db_helper
core_lifecycle = InitialLoader(db, log_broker)
loop = asyncio.get_running_loop()
shutdown_signal: signal.Signals | None = None
main_task = asyncio.create_task(core_lifecycle.start(), name="astrbot")
def request_shutdown(signum: signal.Signals) -> None:
nonlocal shutdown_signal
shutdown_signal = signum
logger.info(f"Received {signum.name}; stopping AstrBot...")
main_task.cancel()
cleanup_signal_handlers = _install_shutdown_signal_handlers(loop, request_shutdown)
try:
try:
await main_task
except asyncio.CancelledError:
# Only swallow cancellations that were triggered by our signal handler
if shutdown_signal is None:
raise
finally:
cleanup_signal_handlers()
await LogManager.shutdown()This keeps:
- clean shutdown on SIGINT/SIGTERM
- differentiation between “signal‑driven” cancellation and other cancellations
but removes:
asyncio.Eventshutdown_taskasyncio.wait(FIRST_COMPLETED)shutdown_requested_by_signalbookkeeping
2. Lightly trim _install_shutdown_signal_handlers
You can reduce some internal bookkeeping without changing behavior by using a single dict for “installed + previous handler” instead of a separate installed list:
def _install_shutdown_signal_handlers(
loop: asyncio.AbstractEventLoop,
callback: ShutdownCallback,
) -> Callable[[], None]:
"""Install SIGINT/SIGTERM handlers and return a cleanup callback."""
handled_signals = (signal.SIGINT, signal.SIGTERM)
previous_handlers: dict[signal.Signals, Any] = {}
for signum in handled_signals:
previous_handlers[signum] = signal.getsignal(signum)
try:
loop.add_signal_handler(signum, callback, signum)
except (NotImplementedError, RuntimeError):
def fallback_handler(received_signum, _frame) -> None:
callback(signal.Signals(received_signum))
signal.signal(signum, fallback_handler)
def cleanup() -> None:
for signum, prev in previous_handlers.items():
with contextlib.suppress(NotImplementedError, RuntimeError):
loop.remove_signal_handler(signum)
signal.signal(signum, prev)
return cleanupThis keeps:
- restoration of original handlers
- loop vs.
signal.signalfallback handling
but makes the helper easier to scan and reason about.
| @@ -25,33 +25,48 @@ def __init__(self, db: BaseDatabase, log_broker: LogBroker) -> None: | |||
|
|
|||
| async def start(self) -> None: | |||
There was a problem hiding this comment.
issue (complexity): Consider flattening the start() method’s control flow by separating initialization from the runtime phase and using a single try/except for the run logic while keeping the current semantics.
You can keep the new semantics but flatten the control flow and simplify initialized usage by pulling initialization out of the nested try and having a single try/except block for the “run” phase:
async def start(self) -> None:
core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)
initialized = False
# 1. Initialization phase
try:
await core_lifecycle.initialize()
initialized = True
except Exception as e:
logger.critical(traceback.format_exc())
logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!")
return
# 2. Runtime phase
core_task = core_lifecycle.start()
webui_dir = self.webui_dir
self.dashboard_server = AstrBotDashboard(
core_lifecycle,
self.db,
core_lifecycle.dashboard_shutdown_event,
webui_dir,
)
coro = self.dashboard_server.run()
if coro:
task = asyncio.gather(core_task, coro)
else:
task = core_task
try:
await task # 整个AstrBot在这里运行
except asyncio.CancelledError:
logger.info("🌈 正在关闭 AstrBot...")
if initialized:
await core_lifecycle.stop()
except Exception:
if initialized:
try:
await core_lifecycle.stop()
except Exception:
logger.error(
"AstrBot shutdown during runtime-error handling failed",
exc_info=True,
)
raise
finally:
await LogManager.shutdown()This preserves all the new behaviors:
- If initialization fails: log and return, no
stop(). - If cancelled after initialization: log and call
stop(). - If a runtime error occurs after initialization: attempt
stop(), log if shutdown fails, then re-raise. LogManager.shutdown()is still always called.
If possible at a higher level, you might also consider centralizing LogManager.shutdown() in a single layer (either here or in run_astrbot) to make the overall shutdown protocol easier to follow.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces graceful shutdown handling for AstrBot by capturing SIGINT and SIGTERM signals, managing task cancellation, and ensuring proper cleanup of log sinks via a new LogManager.shutdown method. It also adds comprehensive unit tests to verify these shutdown behaviors. The reviewer identified a potential task leak and logging issue if run_astrbot itself is cancelled, suggesting that runner_task should be explicitly cancelled and awaited in the finally block.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| finally: | ||
| cleanup_signal_handlers() | ||
| if not shutdown_task.done(): | ||
| shutdown_task.cancel() | ||
| with contextlib.suppress(asyncio.CancelledError): | ||
| await shutdown_task | ||
| await LogManager.shutdown() |
There was a problem hiding this comment.
If run_astrbot itself is cancelled (for example, if the parent task is cancelled), asyncio.wait will raise a CancelledError and control will jump directly to the finally block. In this scenario, runner_task is left running in the background (leaked) and LogManager.shutdown() is called while runner_task might still be active and attempting to log, which can lead to errors or lost logs.
To ensure a robust and graceful shutdown under all cancellation scenarios, we should explicitly cancel and await runner_task in the finally block if it is not already done.
| finally: | |
| cleanup_signal_handlers() | |
| if not shutdown_task.done(): | |
| shutdown_task.cancel() | |
| with contextlib.suppress(asyncio.CancelledError): | |
| await shutdown_task | |
| await LogManager.shutdown() | |
| finally: | |
| cleanup_signal_handlers() | |
| if not runner_task.done(): | |
| runner_task.cancel() | |
| with contextlib.suppress(asyncio.CancelledError): | |
| await runner_task | |
| if not shutdown_task.done(): | |
| shutdown_task.cancel() | |
| with contextlib.suppress(asyncio.CancelledError): | |
| await shutdown_task | |
| await LogManager.shutdown() |
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request implements graceful shutdown signal handling for AstrBot, installing SIGINT/SIGTERM handlers with fallback mechanisms, managing task cancellation, and introducing a LogManager.shutdown() method to flush and remove log sinks. The reviewer feedback suggests making the fallback signal handler more robust by checking if the event loop is closed. Additionally, the reviewer recommends removing redundant calls to LogManager.shutdown() in both AstrBotCoreLifecycle.stop() and run_astrbot(), which would also simplify the unit tests by eliminating several now-unnecessary mock fixtures and assertions.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def fallback_handler(received_signum, frame): | ||
| _ = frame | ||
| loop.call_soon_threadsafe(callback, signal.Signals(received_signum)) |
There was a problem hiding this comment.
The fallback signal handler can be made more robust by checking if the event loop is closed and handling potential RuntimeError exceptions when calling call_soon_threadsafe. This prevents unhandled exceptions during a messy or rapid shutdown sequence.
| def fallback_handler(received_signum, frame): | |
| _ = frame | |
| loop.call_soon_threadsafe(callback, signal.Signals(received_signum)) | |
| def fallback_handler(received_signum, frame): | |
| _ = frame | |
| if not loop.is_closed(): | |
| try: | |
| loop.call_soon_threadsafe(callback, signal.Signals(received_signum)) | |
| except RuntimeError: | |
| pass |
| except Exception as e: | ||
| logger.error(f"任务 {task.get_name()} 发生错误: {e}") | ||
|
|
||
| await LogManager.shutdown() |
There was a problem hiding this comment.
Calling LogManager.shutdown() inside AstrBotCoreLifecycle.stop() introduces unnecessary side effects and redundancy. Since LogManager is a global static manager, shutting it down here affects the global logging state, which is particularly problematic in unit tests (requiring extra mocking/fixtures). Furthermore, any logs emitted during the final stages of the application shutdown (after stop() completes but before the process exits) will not be written to the file/trace logs. Since LogManager.shutdown() is already called in the finally block of InitialLoader.start(), we can safely remove it from here.
| shutdown_task.cancel() | ||
| with contextlib.suppress(asyncio.CancelledError): | ||
| await shutdown_task | ||
| await LogManager.shutdown() |
| @pytest.fixture(autouse=True) | ||
| def _mock_log_manager_shutdown(self, monkeypatch: pytest.MonkeyPatch): | ||
| self.mock_log_shutdown = AsyncMock() | ||
| monkeypatch.setattr( | ||
| "astrbot.core.core_lifecycle.LogManager.shutdown", | ||
| self.mock_log_shutdown, | ||
| ) |
| @pytest.fixture(autouse=True) | ||
| def _mock_log_manager_shutdown(self, monkeypatch: pytest.MonkeyPatch): | ||
| self.mock_log_shutdown = AsyncMock() | ||
| monkeypatch.setattr( | ||
| "astrbot.core.core_lifecycle.LogManager.shutdown", | ||
| self.mock_log_shutdown, | ||
| ) |
| lifecycle.provider_manager.terminate.assert_awaited_once() | ||
| lifecycle.platform_manager.terminate.assert_awaited_once() | ||
| lifecycle.kb_manager.terminate.assert_awaited_once() | ||
| self.mock_log_shutdown.assert_awaited_once() |
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces graceful shutdown handling for AstrBot. It implements robust SIGINT and SIGTERM signal handlers in cmd_run.py with fallback support for environments where standard signal handlers are unavailable. Additionally, it updates the InitialLoader to ensure the core lifecycle is stopped and log sinks are flushed and removed via a new LogManager.shutdown() method during process termination. Comprehensive unit tests have been added to verify these shutdown behaviors and logging cleanup. No review comments were provided, so there is no feedback to address.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
修复开启文件日志后,AstrBot 退出时出现的
multiprocessing.resource_trackersemaphore warning。典型 warning:
如果同时开启普通文件日志和 trace 日志,可能出现:
问题原因:AstrBot 的文件日志 sink 使用了 Loguru 的
enqueue=True,会创建 multiprocessing 队列相关资源。进程退出前如果没有显式 flush / remove 这些 sink,Python 的multiprocessing.resource_tracker会在进程关闭时检测到残留 semaphore 并输出 warning。Fixes #8595
Modifications / 改动点
新增
LogManager.shutdown():统一complete()并移除 trace / file sinks,避免 Loguruenqueue=True队列资源残留保留 console sink,避免关闭清理后续诊断日志被静默吞掉
在
AstrBotCoreLifecycle.stop()和InitialLoader的关闭路径调用日志清理为
astrbot run增加 SIGINT / SIGTERM graceful shutdown,确保收到退出信号时也能走清理流程signal.signalfallback handler 改为通过loop.call_soon_threadsafe(...)调回 event loop,避免同步 signal handler 里直接操作 asyncio 状态补充日志 shutdown、CLI 信号关闭、初始化失败 / 取消 / 运行期异常路径测试
This is NOT a breaking change. / 这不是一个破坏性变更。
Screenshots or Test Results / 运行截图或测试结果
已通过以下检查:
真实运行验证:macOS 本地启动 AstrBot,启动到:
随后发送 SIGTERM,结果:
未再出现:
Checklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
🤖 Generated with Claude Code