feat: add message_debounce config and DebounceManager for message aggregation#8624

Open
sunlonglong-creator wants to merge 1 commit into AstrBotDevs:master from sunlonglong-creator:master

Conversation

@sunlonglong-creator sunlonglong-creator commented Jun 6, 2026

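Description

Adds message debouncing. When message_debounce is enabled, multiple messages that arrive from the same session within a short window are buffered first, then merged into a single message and handed to the LLM once the window times out.

Problem addressed: on platforms such as QQ, an image and its accompanying text often do not arrive at the same time, so they are split into two separate conversation turns and the LLM cannot see both in context.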

Configuration

A new message_debounce block is added under platform_settings:

message_debounce:
  enable: false   # set to true to turn on debouncing
  interval: 2     # debounce window in seconds; 2-3 seconds recommended

Changed files

  • astrbot/core/config/default.py: default values + UI schema + ext_group metadata
  • astrbot/core/event_bus.py: DebounceManager implementation + integration into dispatch()
  • dashboard/src/i18n/locales/zh-CN/features/config-metadata.json
  • dashboard/src/i18n/locales/en-US/features/config-metadata.json
  • .gitignore: adds an ignore rule for .codegraph/

Summary by Sourcery

Add configurable message debounce support to aggregate rapid messages from the same session before LLM processing.

New Features:

  • Introduce a DebounceManager that aggregates multiple messages from the same session within a configurable time window into a single event before scheduling.
  • Add a platform_settings.message_debounce configuration block with enable and interval options to control per-session debounce behavior.

Enhancements:

  • Wire message debounce into the EventBus so eligible messages are buffered and merged before being dispatched to the pipeline scheduler.
  • Update configuration metadata to expose the message debounce options in the dashboard configuration UI.

Documentation:

  • Add English and Chinese dashboard i18n entries describing the message debounce feature and its options.

Chores:

  • Ignore the .codegraph/ directory in version control.

@dosubot dosubot Bot added the size:L (This PR changes 100-499 lines, ignoring generated files.), area:core (The bug / feature is about astrbot's core, backend), and area:webui (The bug / feature is about webui(dashboard) of astrbot.) labels Jun 6, 2026

@sourcery-ai sourcery-ai Bot left a comment

Hey - I've found 3 issues, and left some high level feedback:

  • In _merge_events, consider handling cases where base.message_str or base.message_obj may be None (e.g., initialize an empty string / message object before += and .append), to avoid potential AttributeError or TypeError when merging events with partial data.
  • The current merge logic mutates the first AstrMessageEvent in place and reuses its message_obj list; if that event is referenced elsewhere, this could have side effects—consider constructing a new AstrMessageEvent (or at least a shallow copy of message_obj.message) for the merged result to keep the debounce behavior isolated.
Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="astrbot/core/event_bus.py" line_range="57" />
<code_context>
+        # Get interval from config
+        conf = self._config_mgr.get_conf(origin)
+        debounce_cfg = conf.get("platform_settings", {}).get("message_debounce", {})
+        interval: int = debounce_cfg.get("interval", 2)
+
+        # Buffer
</code_context>
<issue_to_address>
**issue (bug_risk):** Consider validating the debounce interval to avoid zero or negative values.

If `interval` is 0 or negative (e.g. from config), `asyncio.sleep(interval)` returns almost immediately, and a non-numeric value raises; either outcome breaks the intended debounce behavior. Please validate or normalize this value (e.g. enforce `interval > 0`, use `max(1, interval)`, or fall back to the default on invalid input).
</issue_to_address>
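
One way the validation suggested above could look is sketched below. This is only an illustration: `normalize_interval`, `read_interval`, and `DEFAULT_INTERVAL` are hypothetical names that do not appear in the PR, and the config is modelled as a plain nested dict.

```python
from typing import Any

DEFAULT_INTERVAL = 2.0  # seconds; mirrors the default used in the PR


def normalize_interval(raw: Any, default: float = DEFAULT_INTERVAL) -> float:
    """Return a positive, finite interval, falling back to `default` on bad input."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(raw, bool) or not isinstance(raw, (int, float)):
        return default
    # `raw > 0` is False for zero, negatives, and NaN; infinity is rejected too.
    if not (raw > 0) or raw == float("inf"):
        return default
    return float(raw)


def read_interval(conf: dict) -> float:
    """Look up platform_settings.message_debounce.interval with a safe default."""
    debounce_cfg = conf.get("platform_settings", {}).get("message_debounce", {})
    return normalize_interval(debounce_cfg.get("interval", DEFAULT_INTERVAL))
```

Falling back to the default, rather than clamping with `max(1, interval)`, keeps sub-second windows possible; which policy fits better is a design choice for the PR author.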

### Comment 2
<location path="astrbot/core/event_bus.py" line_range="103-112" />
<code_context>
+    @staticmethod
+    def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent:
+        """Merge multiple messages into one: keep the first message's session info and append text and components."""
+        base = events[0]
+
+        for ev in events[1:]:
+            # Merge text
+            if ev.message_str:
+                base.message_str += "\n" + ev.message_str
+
+            # Merge message components (images, files, etc.), skip At/Reply
+            if ev.message_obj and ev.message_obj.message:
+                for comp in ev.message_obj.message:
+                    if not isinstance(comp, (At, Reply)):
+                        base.message_obj.message.append(comp)
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Merging message components assumes `base.message_obj` exists, which may drop or mis-handle components if only later events carry them.

In `_merge_events`, components are appended only when `ev.message_obj` is present, but `base.message_obj` itself may be `None`. If the first event has only text and a later one carries images/files, appending to `base.message_obj.message` can either drop those components or raise an attribute error. Consider initializing or cloning `message_obj` from the first event that has one (when `base.message_obj` is `None`), or choosing as `base` the first event that has a `message_obj`, so non-text components are preserved safely.
</issue_to_address>
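
A None-safe merge along the lines described above might look like the following sketch. It uses simplified stand-in types (`Event`, `Msg`, and placeholder `At` / `Reply` classes), all of which are assumptions for illustration and not the real AstrBot classes; it also builds a new object instead of mutating the first event.

```python
from dataclasses import dataclass, field
from typing import Optional


class At:  # placeholder for the real At component
    pass


class Reply:  # placeholder for the real Reply component
    pass


@dataclass
class Msg:  # hypothetical stand-in for the message object
    message: list = field(default_factory=list)


@dataclass
class Event:  # hypothetical stand-in for AstrMessageEvent
    message_str: Optional[str] = None
    message_obj: Optional[Msg] = None


def merge_events(events: list[Event]) -> Event:
    """Merge events into a new Event without mutating any input."""
    # Join only the non-empty texts, so a text-less first event is harmless.
    texts = [ev.message_str for ev in events if ev.message_str]
    components = []
    for index, ev in enumerate(events):
        if ev.message_obj is None or not ev.message_obj.message:
            continue
        for comp in ev.message_obj.message:
            # Keep everything from the first event; drop At/Reply from later ones.
            if index == 0 or not isinstance(comp, (At, Reply)):
                components.append(comp)
    return Event(message_str="\n".join(texts), message_obj=Msg(message=components))
```

In this sketch an image-only first event followed by a text event yields both the image and the text, which is the case the comment above is concerned about.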

### Comment 3
<location path="astrbot/core/event_bus.py" line_range="26" />
<code_context>
 from .platform import AstrMessageEvent


+class DebounceManager:
+    """Message debounce manager: merges same-session messages that arrive within a short window into one before scheduling.
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider centralizing config/scheduler resolution in EventBus and making DebounceManager a pure buffering/timing helper that non-destructively merges events to simplify responsibilities and shared state.

You can reduce the new complexity by tightening the boundaries and removing duplicated lookups, without changing behavior.

### 1. Let `EventBus` own config & scheduler resolution

Instead of letting `DebounceManager` read config *and* resolve the scheduler again, have `EventBus` do that once and pass the resolved values in. That removes duplicated logic and makes `DebounceManager` a pure “timer + buffer” component.

**EventBus.dispatch (compute once, then delegate):**

```python
async def dispatch(self) -> None:
    while True:
        event: AstrMessageEvent = await self.event_queue.get()
        origin = event.unified_msg_origin

        conf_info = self.astrbot_config_mgr.get_conf_info(origin)
        conf_id = conf_info["id"]
        conf_name = conf_info.get("name") or conf_id
        self._print_event(event, conf_name)

        # Config lookup only here
        conf = self.astrbot_config_mgr.get_conf(origin)
        debounce_cfg = conf.get("platform_settings", {}).get("message_debounce", {})
        enable = debounce_cfg.get("enable", False)
        interval: int = debounce_cfg.get("interval", 2)

        scheduler = self.pipeline_scheduler_mapping.get(conf_id)
        if not scheduler:
            logger.error(
                f"PipelineScheduler not found for id: {conf_id}, event ignored."
            )
            continue

        if enable:
            logger.debug(
                f"[Debounce] Message queued for debounce: {event.get_message_outline()}"
            )
            await self._debounce_mgr.push(event, scheduler, interval)
        else:
            asyncio.create_task(scheduler.execute(event))
```

**DebounceManager: no config_mgr, no scheduler_mapping**

```python
class DebounceManager:
    def __init__(self) -> None:
        self._tasks: dict[str, asyncio.Task] = {}
        self._buffers: dict[str, list[AstrMessageEvent]] = {}
        self._schedulers: dict[str, PipelineScheduler] = {}

    async def push(
        self,
        event: AstrMessageEvent,
        scheduler: PipelineScheduler,
        interval: int,
    ) -> None:
        origin = event.unified_msg_origin

        # Remember scheduler per origin (no mapping lookup inside)
        self._schedulers[origin] = scheduler

        existing = self._tasks.get(origin)
        if existing is not None and not existing.done():
            existing.cancel()
            logger.debug(f"[Debounce] Timer reset: {origin}")

        self._buffers.setdefault(origin, []).append(event)

        self._tasks[origin] = asyncio.create_task(
            self._flush_after_delay(origin, interval),
        )

    async def _flush_after_delay(self, origin: str, interval: int) -> None:
        try:
            await asyncio.sleep(interval)
        except asyncio.CancelledError:
            return

        events = self._buffers.pop(origin, [])
        self._tasks.pop(origin, None)
        # Pop the scheduler as well so per-origin entries do not accumulate.
        scheduler = self._schedulers.pop(origin, None)

        if not events or not scheduler:
            if not scheduler:
                logger.error(
                    f"[Debounce] PipelineScheduler not found for origin: {origin}, "
                    f"events discarded."
                )
            return

        merged = events[0] if len(events) == 1 else self._merge_events(events)

        logger.info(f"[Debounce] Scheduling after merging {len(events)} message(s): {origin}")
        await scheduler.execute(merged)
```

This keeps the debounce feature but:

- Config reads only happen in `EventBus`.
- Scheduler resolution only happens in `EventBus`.
- `DebounceManager` no longer depends on `AstrBotConfigManager` or the mapping.

### 2. Make `_merge_events` avoid mutating the original event

Mutating `events[0]` in place makes reasoning/debugging harder when the same object might be referenced elsewhere. A small change keeps behavior but makes it safer:

```python
import copy

@staticmethod
def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent:
    """Merge multiple messages into one: keep the first message's session info and append text and components."""
    base = copy.deepcopy(events[0])  # avoid mutating the original

    for ev in events[1:]:
        if ev.message_str:
            base.message_str += "\n" + ev.message_str

        if ev.message_obj and ev.message_obj.message:
            for comp in ev.message_obj.message:
                if not isinstance(comp, (At, Reply)):
                    base.message_obj.message.append(comp)

    return base
```

This keeps the same merged content semantics, but `_merge_events` becomes easier to test and reason about, and you remove one more subtle source of shared-state complexity.
</issue_to_address>

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a message debouncing feature to aggregate consecutive messages from the same session within a specified interval before processing them. Feedback highlights a potential TypeError or AttributeError in the event merging logic when handling None values, suggests adding unit tests for the new DebounceManager, and recommends implementing a maximum delay to prevent message starvation in highly active sessions.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread astrbot/core/event_bus.py
Comment on lines +101 to +117
    def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent:
        """Merge multiple messages into one: keep the first message's session info and append text and components."""
        base = events[0]

        for ev in events[1:]:
            # Merge text
            if ev.message_str:
                base.message_str += "\n" + ev.message_str

            # Merge message components (images, files, etc.), skip At/Reply
            if ev.message_obj and ev.message_obj.message:
                for comp in ev.message_obj.message:
                    if not isinstance(comp, (At, Reply)):
                        base.message_obj.message.append(comp)

        return base

high

_merge_events 中,如果第一条消息(base)的 message_strNone(例如第一条消息是纯图片),而后续消息有 message_str,执行 base.message_str += "\n" + ev.message_str 会抛出 TypeError: unsupported operand type(s) for +=: 'NoneType' and 'str'

同样地,如果 base.message_objNone(例如某些平台上的纯文本消息),而后续消息有 message_obj,访问 base.message_obj.message 会抛出 AttributeError

建议在合并时对 None 值进行防御性检查。

    @staticmethod
    def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent:
        """Merge multiple messages into one: keep the first message's session info and append text and components."""
        base = events[0]

        for ev in events[1:]:
            # Merge text safely
            if ev.message_str:
                if base.message_str:
                    base.message_str += "\n" + ev.message_str
                else:
                    base.message_str = ev.message_str

            # Merge message components (images, files, etc.) safely, skip At/Reply
            if ev.message_obj and ev.message_obj.message:
                if base.message_obj is None:
                    base.message_obj = ev.message_obj
                else:
                    for comp in ev.message_obj.message:
                        if not isinstance(comp, (At, Reply)):
                            base.message_obj.message.append(comp)

        return base

Comment thread astrbot/core/event_bus.py
Comment on lines +26 to +33
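    class DebounceManager:
        """Message debounce manager: merges same-session messages that arrive within a short window into one before scheduling.

        When message_debounce.enable = true, consecutive messages with the same
        unified_msg_origin are aggregated within interval seconds. Each new message
        resets the timer; on timeout all buffered messages are merged and handed to
        the scheduler in a single call.
        """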

medium

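Per the general rules, new functionality (such as the message debounce manager DebounceManager) should be accompanied by unit tests. Please add unit tests that verify DebounceManager's debounce delay, timer reset, and message-merging logic.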

References
  1. New functionality, such as handling attachments, should be accompanied by corresponding unit tests.
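
As a rough idea of what such a test could look like, here is a sketch that exercises the timer-reset and merge-order behaviour. `TinyDebouncer` is a hypothetical, reduced stand-in for DebounceManager (buffer plus timer only), not the PR's class, and the timings are chosen with generous margins rather than taken from the PR.

```python
import asyncio


class TinyDebouncer:
    """Hypothetical stand-in for DebounceManager, reduced to buffer + timer."""

    def __init__(self, interval: float, on_flush) -> None:
        self._interval = interval
        self._on_flush = on_flush  # called as on_flush(key, items)
        self._buffers: dict[str, list] = {}
        self._tasks: dict[str, asyncio.Task] = {}

    def push(self, key: str, item) -> None:
        existing = self._tasks.get(key)
        if existing is not None and not existing.done():
            existing.cancel()  # a new item resets the timer
        self._buffers.setdefault(key, []).append(item)
        self._tasks[key] = asyncio.create_task(self._flush_later(key))

    async def _flush_later(self, key: str) -> None:
        await asyncio.sleep(self._interval)
        self._tasks.pop(key, None)
        self._on_flush(key, self._buffers.pop(key, []))


async def _scenario() -> tuple[list, list]:
    flushed: list = []
    deb = TinyDebouncer(0.2, lambda key, items: flushed.append((key, items)))
    deb.push("s1", "image")
    await asyncio.sleep(0.1)   # still inside the window
    deb.push("s1", "text")     # resets the timer
    await asyncio.sleep(0.15)  # past the first deadline, before the second
    early = list(flushed)
    await asyncio.sleep(0.3)   # let the reset timer fire
    return early, flushed


def test_timer_reset_and_merge() -> None:
    early, final = asyncio.run(_scenario())
    assert early == []                           # the reset postponed the flush
    assert final == [("s1", ["image", "text"])]  # one flush, arrival order kept
```

A real test suite would target DebounceManager itself with a fake scheduler; injecting the flush callback, as done here, is one way to keep such tests independent of the pipeline.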

Comment thread astrbot/core/event_bus.py
Comment on lines +44 to +67
    async def push(self, event: AstrMessageEvent) -> None:
        """Push an event to the debounce manager. Reset the timer if one is already pending for this session, otherwise start a new one."""
        origin = event.unified_msg_origin

        # Cancel existing timer if any
        existing = self._tasks.get(origin)
        if existing is not None and not existing.done():
            existing.cancel()
            logger.debug(f"[Debounce] Timer reset: {origin}")

        # Get interval from config
        conf = self._config_mgr.get_conf(origin)
        debounce_cfg = conf.get("platform_settings", {}).get("message_debounce", {})
        interval: int = debounce_cfg.get("interval", 2)

        # Buffer
        if origin not in self._buffers:
            self._buffers[origin] = []
        self._buffers[origin].append(event)

        # Start new timer
        self._tasks[origin] = asyncio.create_task(
            self._flush_after_delay(origin, interval),
        )

medium

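In the current debounce implementation, every new message resets the timer (existing.cancel()). In a very active session (a high-traffic group chat, or one being flooded), if the gap between messages stays below interval, the timer is reset indefinitely: the messages are never flushed to the LLM, and the buffer keeps growing, which is a potential memory leak.

Consider introducing a maximum wait time (for example, max_delay) measured from the arrival of the first message. Once that limit is reached, force a flush regardless of new messages, to avoid message starvation and unbounded memory growth.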
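
The capped wait described above could be sketched as follows. `CappedDebouncer` and its `on_flush` callback are hypothetical names for illustration, and `max_delay` is the reviewer's suggested option, not an existing config key in the PR.

```python
import asyncio
import time


class CappedDebouncer:
    """Hypothetical sketch: trailing debounce with an upper bound on total wait."""

    def __init__(self, interval: float, max_delay: float, on_flush) -> None:
        self._interval = interval
        self._max_delay = max_delay
        self._on_flush = on_flush  # called as on_flush(key, items)
        self._buffers: dict[str, list] = {}
        self._first_seen: dict[str, float] = {}
        self._tasks: dict[str, asyncio.Task] = {}

    def push(self, key: str, item) -> None:
        now = time.monotonic()
        # Remember when the current batch started; later pushes keep that value.
        first = self._first_seen.setdefault(key, now)
        self._buffers.setdefault(key, []).append(item)

        existing = self._tasks.get(key)
        if existing is not None and not existing.done():
            existing.cancel()

        # Wait one quiet interval, but never past first arrival + max_delay.
        remaining = first + self._max_delay - now
        delay = max(0.0, min(self._interval, remaining))
        self._tasks[key] = asyncio.create_task(self._flush_later(key, delay))

    async def _flush_later(self, key: str, delay: float) -> None:
        await asyncio.sleep(delay)
        self._tasks.pop(key, None)
        self._first_seen.pop(key, None)
        self._on_flush(key, self._buffers.pop(key, []))


async def demo() -> tuple[int, list]:
    flushed: list = []
    deb = CappedDebouncer(0.2, 0.5, lambda key, items: flushed.append(list(items)))
    for i in range(8):  # one message every 0.1 s, always inside the interval
        deb.push("s1", i)
        await asyncio.sleep(0.1)
    during_burst = len(flushed)  # flushes that happened while messages kept coming
    await asyncio.sleep(0.4)     # let the final timer fire
    return during_burst, flushed
```

Running `asyncio.run(demo())` should show at least one flush during the burst, whereas a purely resetting timer would flush only after the burst ends. A cap on buffer length would be a complementary safeguard against memory growth.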

@sunlonglong-creator
Author

Thanks to https://github.com/sourcery-ai and @gemini-code-assist for the review 🙏 Addressed in acebc9e.

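@gemini-code-assist
Contributor

You're welcome! Glad I could help with your PR. Now that you have completed the fixes, feel free to reach out again if you have other questions or need further suggestions. Good luck getting your PR merged!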
