feat: add message_debounce config and DebounceManager for message aggregation#8624
feat: add message_debounce config and DebounceManager for message aggregation#8624sunlonglong-creator wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Hey - I've found 3 issues, and left some high level feedback:
- In
_merge_events, consider handling cases wherebase.message_strorbase.message_objmay beNone(e.g., initialize an empty string / message object before+=and.append), to avoid potentialAttributeErrororTypeErrorwhen merging events with partial data. - The current merge logic mutates the first
AstrMessageEventin place and reuses itsmessage_objlist; if that event is referenced elsewhere, this could have side effects—consider constructing a newAstrMessageEvent(or at least a shallow copy ofmessage_obj.message) for the merged result to keep the debounce behavior isolated.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `_merge_events`, consider handling cases where `base.message_str` or `base.message_obj` may be `None` (e.g., initialize an empty string / message object before `+=` and `.append`), to avoid potential `AttributeError` or `TypeError` when merging events with partial data.
- The current merge logic mutates the first `AstrMessageEvent` in place and reuses its `message_obj` list; if that event is referenced elsewhere, this could have side effects—consider constructing a new `AstrMessageEvent` (or at least a shallow copy of `message_obj.message`) for the merged result to keep the debounce behavior isolated.
## Individual Comments
### Comment 1
<location path="astrbot/core/event_bus.py" line_range="57" />
<code_context>
+ # Get interval from config
+ conf = self._config_mgr.get_conf(origin)
+ debounce_cfg = conf.get("platform_settings", {}).get("message_debounce", {})
+ interval: int = debounce_cfg.get("interval", 2)
+
+ # Buffer
</code_context>
<issue_to_address>
**issue (bug_risk):** Consider validating the debounce interval to avoid zero or negative values.
If `interval` is 0 or negative (e.g. from config), `asyncio.sleep(interval)` will either return immediately or raise, which breaks the intended debounce behavior. Please validate or normalize this value (e.g. enforce `interval > 0`, use `max(1, interval)`, or fall back to the default on invalid input).
</issue_to_address>
### Comment 2
<location path="astrbot/core/event_bus.py" line_range="103-112" />
<code_context>
+ @staticmethod
+ def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent:
+ """将多条消息合并为一条, 保留第一条的会话信息, 追加文本和组件。"""
+ base = events[0]
+
+ for ev in events[1:]:
+ # Merge text
+ if ev.message_str:
+ base.message_str += "\n" + ev.message_str
+
+ # Merge message components (images, files, etc.), skip At/Reply
+ if ev.message_obj and ev.message_obj.message:
+ for comp in ev.message_obj.message:
+ if not isinstance(comp, (At, Reply)):
+ base.message_obj.message.append(comp)
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Merging message components assumes `base.message_obj` exists, which may drop or mis-handle components if only later events carry them.
In `_merge_events`, components are appended only when `ev.message_obj` is present, but `base.message_obj` itself may be `None`. If the first event has only text and a later one carries images/files, appending to `base.message_obj.message` can either drop those components or raise an attribute error. Consider initializing or cloning `message_obj` from the first event that has one (when `base.message_obj` is `None`), or choosing as `base` the first event that has a `message_obj`, so non-text components are preserved safely.
</issue_to_address>
### Comment 3
<location path="astrbot/core/event_bus.py" line_range="26" />
<code_context>
from .platform import AstrMessageEvent
+class DebounceManager:
+ """消息防抖管理器:短时间内的同会话消息合并为一条再调度。
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider centralizing config/scheduler resolution in EventBus and making DebounceManager a pure buffering/timing helper that non-destructively merges events to simplify responsibilities and shared state.
You can reduce the new complexity by tightening the boundaries and removing duplicated lookups, without changing behavior.
### 1. Let `EventBus` own config & scheduler resolution
Instead of letting `DebounceManager` read config *and* resolve the scheduler again, have `EventBus` do that once and pass the resolved values in. That removes duplicated logic and makes `DebounceManager` a pure “timer + buffer” component.
**EventBus.dispatch (compute once, then delegate):**
```python
async def dispatch(self) -> None:
while True:
event: AstrMessageEvent = await self.event_queue.get()
origin = event.unified_msg_origin
conf_info = self.astrbot_config_mgr.get_conf_info(origin)
conf_id = conf_info["id"]
conf_name = conf_info.get("name") or conf_id
self._print_event(event, conf_name)
# Config lookup only here
conf = self.astrbot_config_mgr.get_conf(origin)
debounce_cfg = conf.get("platform_settings", {}).get("message_debounce", {})
enable = debounce_cfg.get("enable", False)
interval: int = debounce_cfg.get("interval", 2)
scheduler = self.pipeline_scheduler_mapping.get(conf_id)
if not scheduler:
logger.error(
f"PipelineScheduler not found for id: {conf_id}, event ignored."
)
continue
if enable:
logger.debug(
f"[Debounce] 消息进入防抖队列: {event.get_message_outline()}"
)
await self._debounce_mgr.push(event, scheduler, interval)
else:
asyncio.create_task(scheduler.execute(event))
```
**DebounceManager: no config_mgr, no scheduler_mapping**
```python
class DebounceManager:
def __init__(self) -> None:
self._tasks: dict[str, asyncio.Task] = {}
self._buffers: dict[str, list[AstrMessageEvent]] = {}
self._schedulers: dict[str, PipelineScheduler] = {}
async def push(
self,
event: AstrMessageEvent,
scheduler: PipelineScheduler,
interval: int,
) -> None:
origin = event.unified_msg_origin
# Remember scheduler per origin (no mapping lookup inside)
self._schedulers[origin] = scheduler
existing = self._tasks.get(origin)
if existing is not None and not existing.done():
existing.cancel()
logger.debug(f"[Debounce] 重置计时器: {origin}")
self._buffers.setdefault(origin, []).append(event)
self._tasks[origin] = asyncio.create_task(
self._flush_after_delay(origin, interval),
)
async def _flush_after_delay(self, origin: str, interval: int) -> None:
try:
await asyncio.sleep(interval)
except asyncio.CancelledError:
return
events = self._buffers.pop(origin, [])
task = self._tasks.pop(origin, None)
scheduler = self._schedulers.get(origin)
if not events or not scheduler:
if not scheduler:
logger.error(
f"[Debounce] PipelineScheduler not found for origin: {origin}, "
f"events discarded."
)
return
merged = events[0] if len(events) == 1 else self._merge_events(events)
logger.info(f"[Debounce] 合并 {len(events)} 条消息后执行调度: {origin}")
await scheduler.execute(merged)
```
This keeps the debounce feature but:
- Config reads only happen in `EventBus`.
- Scheduler resolution only happens in `EventBus`.
- `DebounceManager` no longer depends on `AstrBotConfigManager` or the mapping.
### 2. Make `_merge_events` avoid mutating the original event
Mutating `events[0]` in place makes reasoning/debugging harder when the same object might be referenced elsewhere. A small change keeps behavior but makes it safer:
```python
import copy
@staticmethod
def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent:
"""将多条消息合并为一条, 保留第一条的会话信息, 追加文本和组件。"""
base = copy.deepcopy(events[0]) # avoid mutating the original
for ev in events[1:]:
if ev.message_str:
base.message_str += "\n" + ev.message_str
if ev.message_obj and ev.message_obj.message:
for comp in ev.message_obj.message:
if not isinstance(comp, (At, Reply)):
base.message_obj.message.append(comp)
return base
```
This keeps the same merged content semantics, but `_merge_events` becomes easier to test and reason about, and you remove one more subtle source of shared-state complexity.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| base = events[0] | ||
|
|
||
| for ev in events[1:]: | ||
| # Merge text | ||
| if ev.message_str: | ||
| base.message_str += "\n" + ev.message_str | ||
|
|
||
| # Merge message components (images, files, etc.), skip At/Reply | ||
| if ev.message_obj and ev.message_obj.message: | ||
| for comp in ev.message_obj.message: |
There was a problem hiding this comment.
issue (bug_risk): Merging message components assumes base.message_obj exists, which may drop or mis-handle components if only later events carry them.
In _merge_events, components are appended only when ev.message_obj is present, but base.message_obj itself may be None. If the first event has only text and a later one carries images/files, appending to base.message_obj.message can either drop those components or raise an attribute error. Consider initializing or cloning message_obj from the first event that has one (when base.message_obj is None), or choosing as base the first event that has a message_obj, so non-text components are preserved safely.
| from .platform import AstrMessageEvent | ||
|
|
||
|
|
||
| class DebounceManager: |
There was a problem hiding this comment.
issue (complexity): Consider centralizing config/scheduler resolution in EventBus and making DebounceManager a pure buffering/timing helper that non-destructively merges events to simplify responsibilities and shared state.
You can reduce the new complexity by tightening the boundaries and removing duplicated lookups, without changing behavior.
1. Let EventBus own config & scheduler resolution
Instead of letting DebounceManager read config and resolve the scheduler again, have EventBus do that once and pass the resolved values in. That removes duplicated logic and makes DebounceManager a pure “timer + buffer” component.
EventBus.dispatch (compute once, then delegate):
async def dispatch(self) -> None:
while True:
event: AstrMessageEvent = await self.event_queue.get()
origin = event.unified_msg_origin
conf_info = self.astrbot_config_mgr.get_conf_info(origin)
conf_id = conf_info["id"]
conf_name = conf_info.get("name") or conf_id
self._print_event(event, conf_name)
# Config lookup only here
conf = self.astrbot_config_mgr.get_conf(origin)
debounce_cfg = conf.get("platform_settings", {}).get("message_debounce", {})
enable = debounce_cfg.get("enable", False)
interval: int = debounce_cfg.get("interval", 2)
scheduler = self.pipeline_scheduler_mapping.get(conf_id)
if not scheduler:
logger.error(
f"PipelineScheduler not found for id: {conf_id}, event ignored."
)
continue
if enable:
logger.debug(
f"[Debounce] 消息进入防抖队列: {event.get_message_outline()}"
)
await self._debounce_mgr.push(event, scheduler, interval)
else:
asyncio.create_task(scheduler.execute(event))DebounceManager: no config_mgr, no scheduler_mapping
class DebounceManager:
def __init__(self) -> None:
self._tasks: dict[str, asyncio.Task] = {}
self._buffers: dict[str, list[AstrMessageEvent]] = {}
self._schedulers: dict[str, PipelineScheduler] = {}
async def push(
self,
event: AstrMessageEvent,
scheduler: PipelineScheduler,
interval: int,
) -> None:
origin = event.unified_msg_origin
# Remember scheduler per origin (no mapping lookup inside)
self._schedulers[origin] = scheduler
existing = self._tasks.get(origin)
if existing is not None and not existing.done():
existing.cancel()
logger.debug(f"[Debounce] 重置计时器: {origin}")
self._buffers.setdefault(origin, []).append(event)
self._tasks[origin] = asyncio.create_task(
self._flush_after_delay(origin, interval),
)
async def _flush_after_delay(self, origin: str, interval: int) -> None:
try:
await asyncio.sleep(interval)
except asyncio.CancelledError:
return
events = self._buffers.pop(origin, [])
task = self._tasks.pop(origin, None)
scheduler = self._schedulers.get(origin)
if not events or not scheduler:
if not scheduler:
logger.error(
f"[Debounce] PipelineScheduler not found for origin: {origin}, "
f"events discarded."
)
return
merged = events[0] if len(events) == 1 else self._merge_events(events)
logger.info(f"[Debounce] 合并 {len(events)} 条消息后执行调度: {origin}")
await scheduler.execute(merged)This keeps the debounce feature but:
- Config reads only happen in
EventBus. - Scheduler resolution only happens in
EventBus. DebounceManagerno longer depends onAstrBotConfigManageror the mapping.
2. Make _merge_events avoid mutating the original event
Mutating events[0] in place makes reasoning/debugging harder when the same object might be referenced elsewhere. A small change keeps behavior but makes it safer:
import copy
@staticmethod
def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent:
"""将多条消息合并为一条, 保留第一条的会话信息, 追加文本和组件。"""
base = copy.deepcopy(events[0]) # avoid mutating the original
for ev in events[1:]:
if ev.message_str:
base.message_str += "\n" + ev.message_str
if ev.message_obj and ev.message_obj.message:
for comp in ev.message_obj.message:
if not isinstance(comp, (At, Reply)):
base.message_obj.message.append(comp)
return baseThis keeps the same merged content semantics, but _merge_events becomes easier to test and reason about, and you remove one more subtle source of shared-state complexity.
There was a problem hiding this comment.
Code Review
This pull request introduces a message debouncing feature to aggregate consecutive messages from the same session within a specified interval before processing them. Feedback highlights a potential TypeError or AttributeError in the event merging logic when handling None values, suggests adding unit tests for the new DebounceManager, and recommends implementing a maximum delay to prevent message starvation in highly active sessions.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent: | ||
| """将多条消息合并为一条, 保留第一条的会话信息, 追加文本和组件。""" | ||
| base = events[0] | ||
|
|
||
| for ev in events[1:]: | ||
| # Merge text | ||
| if ev.message_str: | ||
| base.message_str += "\n" + ev.message_str | ||
|
|
||
| # Merge message components (images, files, etc.), skip At/Reply | ||
| if ev.message_obj and ev.message_obj.message: | ||
| for comp in ev.message_obj.message: | ||
| if not isinstance(comp, (At, Reply)): | ||
| base.message_obj.message.append(comp) | ||
|
|
||
| return base | ||
|
|
There was a problem hiding this comment.
在 _merge_events 中,如果第一条消息(base)的 message_str 为 None(例如第一条消息是纯图片),而后续消息有 message_str,执行 base.message_str += "\n" + ev.message_str 会抛出 TypeError: unsupported operand type(s) for +=: 'NoneType' and 'str'。
同样地,如果 base.message_obj 为 None(例如某些平台上的纯文本消息),而后续消息有 message_obj,访问 base.message_obj.message 会抛出 AttributeError。
建议在合并时对 None 值进行防御性检查。
@staticmethod
def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent:
"""将多条消息合并为一条, 保留第一条的会话信息, 追加文本和组件。"""
base = events[0]
for ev in events[1:]:
# Merge text safely
if ev.message_str:
if base.message_str:
base.message_str += "\n" + ev.message_str
else:
base.message_str = ev.message_str
# Merge message components (images, files, etc.) safely, skip At/Reply
if ev.message_obj and ev.message_obj.message:
if base.message_obj is None:
base.message_obj = ev.message_obj
else:
for comp in ev.message_obj.message:
if not isinstance(comp, (At, Reply)):
base.message_obj.message.append(comp)
return base| class DebounceManager: | ||
| """消息防抖管理器:短时间内的同会话消息合并为一条再调度。 | ||
|
|
||
| 当 message_debounce.enable = true 时, | ||
| 同一 unified_msg_origin 的连续消息会在 interval 秒内聚合, | ||
| 计时器被新消息重置,超时后合并所有消息一次性交给 scheduler 执行。 | ||
| """ | ||
|
|
| async def push(self, event: AstrMessageEvent) -> None: | ||
| """推送事件到防抖管理器。若同会话已有等待计时器则重置, 否则创建新计时器。""" | ||
| origin = event.unified_msg_origin | ||
|
|
||
| # Cancel existing timer if any | ||
| existing = self._tasks.get(origin) | ||
| if existing is not None and not existing.done(): | ||
| existing.cancel() | ||
| logger.debug(f"[Debounce] 重置计时器: {origin}") | ||
|
|
||
| # Get interval from config | ||
| conf = self._config_mgr.get_conf(origin) | ||
| debounce_cfg = conf.get("platform_settings", {}).get("message_debounce", {}) | ||
| interval: int = debounce_cfg.get("interval", 2) | ||
|
|
||
| # Buffer | ||
| if origin not in self._buffers: | ||
| self._buffers[origin] = [] | ||
| self._buffers[origin].append(event) | ||
|
|
||
| # Start new timer | ||
| self._tasks[origin] = asyncio.create_task( | ||
| self._flush_after_delay(origin, interval), | ||
| ) |
c266c1d to
48555a4
Compare
48555a4 to
acebc9e
Compare
|
感谢 https://github.com/sourcery-ai 和 @gemini-code-assist 的 review 🙏 已在 acebc9e 处理 |
|
不客气!很高兴能对你的 PR 有所帮助。既然你已经完成了相关修复,后续如果有其他问题或需要进一步的建议,随时可以再次联系我。祝你的 PR 合并顺利! |
功能描述
新增消息防抖(debounce)功能。当启用了
message_debounce后,短时间内来自同一会话的多条消息会先缓冲,超时后合并为一条再交给 LLM 处理。解决痛点: QQ 等平台上图片和文字往往不是同时到达,导致图片和文字被割裂成两次独立对话,LLM 无法同时看到上下文。
配置项
platform_settings下新增message_debounce:改动文件
astrbot/core/config/default.py— 默认值 + UI schema + ext_group 元数据astrbot/core/event_bus.py— DebounceManager 实现 + 集成到 dispatch()dashboard/src/i18n/locales/zh-CN/features/config-metadata.jsondashboard/src/i18n/locales/en-US/features/config-metadata.json.gitignore— 添加 .codegraph/ 忽略规则Summary by Sourcery
Add configurable message debounce support to aggregate rapid messages from the same session before LLM processing.
New Features:
Enhancements:
Documentation:
Chores: