Skip to content

Commit f4c0182

Browse files
authored
feat: support and flush chunks in the chat stream helper (#1809)
1 parent 1c84f7f commit f4c0182

7 files changed

Lines changed: 212 additions & 31 deletions

File tree

slack_sdk/web/async_chat_stream.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010

1111
import json
1212
import logging
13-
from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
13+
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
1414

1515
import slack_sdk.errors as e
1616
from slack_sdk.models.blocks.blocks import Block
17+
from slack_sdk.models.messages.chunk import Chunk, MarkdownTextChunk
1718
from slack_sdk.models.metadata import Metadata
1819
from slack_sdk.web.async_slack_response import AsyncSlackResponse
1920

@@ -75,7 +76,8 @@ def __init__(
7576
async def append(
7677
self,
7778
*,
78-
markdown_text: str,
79+
markdown_text: Optional[str] = None,
80+
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
7981
**kwargs,
8082
) -> Optional[AsyncSlackResponse]:
8183
"""Append to the stream.
@@ -84,6 +86,7 @@ async def append(
8486
is stopped this method cannot be called.
8587
8688
Args:
89+
chunks: An array of streaming chunks that can contain either markdown text or task updates.
8790
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
8891
what will be appended to the message received so far.
8992
**kwargs: Additional arguments passed to the underlying API calls.
@@ -111,9 +114,10 @@ async def append(
111114
raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}")
112115
if kwargs.get("token"):
113116
self._token = kwargs.pop("token")
114-
self._buffer += markdown_text
115-
if len(self._buffer) >= self._buffer_size:
116-
return await self._flush_buffer(**kwargs)
117+
if markdown_text is not None:
118+
self._buffer += markdown_text
119+
if len(self._buffer) >= self._buffer_size or chunks is not None:
120+
return await self._flush_buffer(chunks=chunks, **kwargs)
117121
details = {
118122
"buffer_length": len(self._buffer),
119123
"buffer_size": self._buffer_size,
@@ -129,6 +133,7 @@ async def stop(
129133
self,
130134
*,
131135
markdown_text: Optional[str] = None,
136+
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
132137
blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
133138
metadata: Optional[Union[Dict, Metadata]] = None,
134139
**kwargs,
@@ -137,6 +142,7 @@ async def stop(
137142
138143
Args:
139144
blocks: A list of blocks that will be rendered at the bottom of the finalized message.
145+
chunks: An array of streaming chunks that can contain either markdown text or task updates.
140146
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
141147
what will be appended to the message received so far.
142148
metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you
@@ -177,26 +183,36 @@ async def stop(
177183
raise e.SlackRequestError("Failed to stop stream: stream not started")
178184
self._stream_ts = str(response["ts"])
179185
self._state = "in_progress"
186+
flushings: List[Union[Dict, Chunk]] = []
187+
if len(self._buffer) != 0:
188+
flushings.append(MarkdownTextChunk(text=self._buffer))
189+
if chunks is not None:
190+
flushings.extend(chunks)
180191
response = await self._client.chat_stopStream(
181192
token=self._token,
182193
channel=self._stream_args["channel"],
183194
ts=self._stream_ts,
184195
blocks=blocks,
185-
markdown_text=self._buffer,
196+
chunks=flushings,
186197
metadata=metadata,
187198
**kwargs,
188199
)
189200
self._state = "completed"
190201
return response
191202

192-
async def _flush_buffer(self, **kwargs) -> AsyncSlackResponse:
193-
"""Flush the internal buffer by making appropriate API calls."""
203+
async def _flush_buffer(self, chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, **kwargs) -> AsyncSlackResponse:
204+
"""Flush the internal buffer with chunks by making appropriate API calls."""
205+
flushings: List[Union[Dict, Chunk]] = []
206+
if len(self._buffer) != 0:
207+
flushings.append(MarkdownTextChunk(text=self._buffer))
208+
if chunks is not None:
209+
flushings.extend(chunks)
194210
if not self._stream_ts:
195211
response = await self._client.chat_startStream(
196212
**self._stream_args,
197213
token=self._token,
198214
**kwargs,
199-
markdown_text=self._buffer,
215+
chunks=flushings,
200216
)
201217
self._stream_ts = response.get("ts")
202218
self._state = "in_progress"
@@ -206,7 +222,7 @@ async def _flush_buffer(self, **kwargs) -> AsyncSlackResponse:
206222
channel=self._stream_args["channel"],
207223
ts=self._stream_ts,
208224
**kwargs,
209-
markdown_text=self._buffer,
225+
chunks=flushings,
210226
)
211227
self._buffer = ""
212228
return response

slack_sdk/web/async_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2631,7 +2631,7 @@ async def chat_appendStream(
26312631
*,
26322632
channel: str,
26332633
ts: str,
2634-
markdown_text: str,
2634+
markdown_text: Optional[str] = None,
26352635
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
26362636
**kwargs,
26372637
) -> AsyncSlackResponse:

slack_sdk/web/chat_stream.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import json
22
import logging
3-
from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
3+
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
44

55
import slack_sdk.errors as e
66
from slack_sdk.models.blocks.blocks import Block
7+
from slack_sdk.models.messages.chunk import Chunk, MarkdownTextChunk
78
from slack_sdk.models.metadata import Metadata
89
from slack_sdk.web.slack_response import SlackResponse
910

@@ -65,7 +66,8 @@ def __init__(
6566
def append(
6667
self,
6768
*,
68-
markdown_text: str,
69+
markdown_text: Optional[str] = None,
70+
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
6971
**kwargs,
7072
) -> Optional[SlackResponse]:
7173
"""Append to the stream.
@@ -74,6 +76,7 @@ def append(
7476
is stopped this method cannot be called.
7577
7678
Args:
79+
chunks: An array of streaming chunks that can contain either markdown text or task updates.
7780
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
7881
what will be appended to the message received so far.
7982
**kwargs: Additional arguments passed to the underlying API calls.
@@ -101,9 +104,10 @@ def append(
101104
raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}")
102105
if kwargs.get("token"):
103106
self._token = kwargs.pop("token")
104-
self._buffer += markdown_text
105-
if len(self._buffer) >= self._buffer_size:
106-
return self._flush_buffer(**kwargs)
107+
if markdown_text is not None:
108+
self._buffer += markdown_text
109+
if len(self._buffer) >= self._buffer_size or chunks is not None:
110+
return self._flush_buffer(chunks=chunks, **kwargs)
107111
details = {
108112
"buffer_length": len(self._buffer),
109113
"buffer_size": self._buffer_size,
@@ -119,6 +123,7 @@ def stop(
119123
self,
120124
*,
121125
markdown_text: Optional[str] = None,
126+
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
122127
blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
123128
metadata: Optional[Union[Dict, Metadata]] = None,
124129
**kwargs,
@@ -127,6 +132,7 @@ def stop(
127132
128133
Args:
129134
blocks: A list of blocks that will be rendered at the bottom of the finalized message.
135+
chunks: An array of streaming chunks that can contain either markdown text or task updates.
130136
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
131137
what will be appended to the message received so far.
132138
metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you
@@ -167,26 +173,36 @@ def stop(
167173
raise e.SlackRequestError("Failed to stop stream: stream not started")
168174
self._stream_ts = str(response["ts"])
169175
self._state = "in_progress"
176+
flushings: List[Union[Dict, Chunk]] = []
177+
if len(self._buffer) != 0:
178+
flushings.append(MarkdownTextChunk(text=self._buffer))
179+
if chunks is not None:
180+
flushings.extend(chunks)
170181
response = self._client.chat_stopStream(
171182
token=self._token,
172183
channel=self._stream_args["channel"],
173184
ts=self._stream_ts,
174185
blocks=blocks,
175-
markdown_text=self._buffer,
186+
chunks=flushings,
176187
metadata=metadata,
177188
**kwargs,
178189
)
179190
self._state = "completed"
180191
return response
181192

182-
def _flush_buffer(self, **kwargs) -> SlackResponse:
183-
"""Flush the internal buffer by making appropriate API calls."""
193+
def _flush_buffer(self, chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, **kwargs) -> SlackResponse:
194+
"""Flush the internal buffer with chunks by making appropriate API calls."""
195+
flushings: List[Union[Dict, Chunk]] = []
196+
if len(self._buffer) != 0:
197+
flushings.append(MarkdownTextChunk(text=self._buffer))
198+
if chunks is not None:
199+
flushings.extend(chunks)
184200
if not self._stream_ts:
185201
response = self._client.chat_startStream(
186202
**self._stream_args,
187203
token=self._token,
188204
**kwargs,
189-
markdown_text=self._buffer,
205+
chunks=flushings,
190206
)
191207
self._stream_ts = response.get("ts")
192208
self._state = "in_progress"
@@ -196,7 +212,7 @@ def _flush_buffer(self, **kwargs) -> SlackResponse:
196212
channel=self._stream_args["channel"],
197213
ts=self._stream_ts,
198214
**kwargs,
199-
markdown_text=self._buffer,
215+
chunks=flushings,
200216
)
201217
self._buffer = ""
202218
return response

slack_sdk/web/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2621,7 +2621,7 @@ def chat_appendStream(
26212621
*,
26222622
channel: str,
26232623
ts: str,
2624-
markdown_text: str,
2624+
markdown_text: Optional[str] = None,
26252625
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
26262626
**kwargs,
26272627
) -> SlackResponse:

slack_sdk/web/legacy_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2632,7 +2632,7 @@ def chat_appendStream(
26322632
*,
26332633
channel: str,
26342634
ts: str,
2635-
markdown_text: str,
2635+
markdown_text: Optional[str] = None,
26362636
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
26372637
**kwargs,
26382638
) -> Union[Future, SlackResponse]:

tests/slack_sdk/web/test_chat_stream.py

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from slack_sdk.models.blocks.basic_components import FeedbackButtonObject
88
from slack_sdk.models.blocks.block_elements import FeedbackButtonsElement, IconButtonElement
99
from slack_sdk.models.blocks.blocks import ContextActionsBlock
10+
from slack_sdk.models.messages.chunk import MarkdownTextChunk, TaskUpdateChunk
1011
from tests.mock_web_api_server import cleanup_mock_web_api_server, setup_mock_web_api_server
1112
from tests.slack_sdk.web.mock_web_api_handler import MockHandler
1213

@@ -105,7 +106,10 @@ def test_streams_a_short_message(self):
105106
stop_request = self.thread.server.chat_stream_requests.get("/chat.stopStream", {})
106107
self.assertEqual(stop_request.get("channel"), "C0123456789")
107108
self.assertEqual(stop_request.get("ts"), "123.123")
108-
self.assertEqual(stop_request.get("markdown_text"), "nice!")
109+
self.assertEqual(
110+
json.dumps(stop_request.get("chunks")),
111+
'[{"text": "nice!", "type": "markdown_text"}]',
112+
)
109113

110114
def test_streams_a_long_message(self):
111115
streamer = self.client.chat_stream(
@@ -146,13 +150,19 @@ def test_streams_a_long_message(self):
146150
start_request = self.thread.server.chat_stream_requests.get("/chat.startStream", {})
147151
self.assertEqual(start_request.get("channel"), "C0123456789")
148152
self.assertEqual(start_request.get("thread_ts"), "123.000")
149-
self.assertEqual(start_request.get("markdown_text"), "**this messag")
153+
self.assertEqual(
154+
json.dumps(start_request.get("chunks")),
155+
'[{"text": "**this messag", "type": "markdown_text"}]',
156+
)
150157
self.assertEqual(start_request.get("recipient_team_id"), "T0123456789")
151158
self.assertEqual(start_request.get("recipient_user_id"), "U0123456789")
152159

153160
append_request = self.thread.server.chat_stream_requests.get("/chat.appendStream", {})
154161
self.assertEqual(append_request.get("channel"), "C0123456789")
155-
self.assertEqual(append_request.get("markdown_text"), "e is bold!")
162+
self.assertEqual(
163+
json.dumps(append_request.get("chunks")),
164+
'[{"text": "e is bold!", "type": "markdown_text"}]',
165+
)
156166
self.assertEqual(append_request.get("token"), "xoxb-chat_stream_test_token1")
157167
self.assertEqual(append_request.get("ts"), "123.123")
158168

@@ -162,10 +172,74 @@ def test_streams_a_long_message(self):
162172
'[{"elements": [{"negative_button": {"text": {"emoji": true, "text": "bad", "type": "plain_text"}, "value": "-1"}, "positive_button": {"text": {"emoji": true, "text": "good", "type": "plain_text"}, "value": "+1"}, "type": "feedback_buttons"}, {"icon": "trash", "text": {"emoji": true, "text": "delete", "type": "plain_text"}, "type": "icon_button"}], "type": "context_actions"}]',
163173
)
164174
self.assertEqual(stop_request.get("channel"), "C0123456789")
165-
self.assertEqual(stop_request.get("markdown_text"), "**")
175+
self.assertEqual(
176+
json.dumps(stop_request.get("chunks")),
177+
'[{"text": "**", "type": "markdown_text"}]',
178+
)
166179
self.assertEqual(stop_request.get("token"), "xoxb-chat_stream_test_token2")
167180
self.assertEqual(stop_request.get("ts"), "123.123")
168181

182+
def test_streams_a_chunk_message(self):
183+
streamer = self.client.chat_stream(
184+
channel="C0123456789",
185+
recipient_team_id="T0123456789",
186+
recipient_user_id="U0123456789",
187+
thread_ts="123.000",
188+
)
189+
streamer.append(markdown_text="**this is ")
190+
streamer.append(markdown_text="buffered**")
191+
streamer.append(
192+
chunks=[
193+
TaskUpdateChunk(
194+
id="001",
195+
title="Counting...",
196+
status="pending",
197+
),
198+
],
199+
)
200+
streamer.append(
201+
chunks=[
202+
MarkdownTextChunk(text="**this is unbuffered**"),
203+
],
204+
)
205+
streamer.append(markdown_text="\n")
206+
streamer.stop(
207+
chunks=[
208+
MarkdownTextChunk(text=":space_invader:"),
209+
],
210+
)
211+
212+
self.assertEqual(self.received_requests.get("/chat.startStream", 0), 1)
213+
self.assertEqual(self.received_requests.get("/chat.appendStream", 0), 1)
214+
self.assertEqual(self.received_requests.get("/chat.stopStream", 0), 1)
215+
216+
if hasattr(self.thread.server, "chat_stream_requests"):
217+
start_request = self.thread.server.chat_stream_requests.get("/chat.startStream", {})
218+
self.assertEqual(start_request.get("channel"), "C0123456789")
219+
self.assertEqual(start_request.get("thread_ts"), "123.000")
220+
self.assertEqual(
221+
json.dumps(start_request.get("chunks")),
222+
'[{"text": "**this is buffered**", "type": "markdown_text"}, {"id": "001", "status": "pending", "title": "Counting...", "type": "task_update"}]',
223+
)
224+
self.assertEqual(start_request.get("recipient_team_id"), "T0123456789")
225+
self.assertEqual(start_request.get("recipient_user_id"), "U0123456789")
226+
227+
append_request = self.thread.server.chat_stream_requests.get("/chat.appendStream", {})
228+
self.assertEqual(append_request.get("channel"), "C0123456789")
229+
self.assertEqual(append_request.get("ts"), "123.123")
230+
self.assertEqual(
231+
json.dumps(append_request.get("chunks")),
232+
'[{"text": "**this is unbuffered**", "type": "markdown_text"}]',
233+
)
234+
235+
stop_request = self.thread.server.chat_stream_requests.get("/chat.stopStream", {})
236+
self.assertEqual(stop_request.get("channel"), "C0123456789")
237+
self.assertEqual(stop_request.get("ts"), "123.123")
238+
self.assertEqual(
239+
json.dumps(stop_request.get("chunks")),
240+
'[{"text": "\\n", "type": "markdown_text"}, {"text": ":space_invader:", "type": "markdown_text"}]',
241+
)
242+
169243
def test_streams_errors_when_appending_to_an_unstarted_stream(self):
170244
streamer = self.client.chat_stream(
171245
channel="C0123456789",

0 commit comments

Comments
 (0)