Module slack_sdk.web.async_chat_stream
Classes
class AsyncChatStream (client: AsyncWebClient,
*,
channel: str,
logger: logging.Logger,
thread_ts: str,
buffer_size: int,
recipient_team_id: str | None = None,
recipient_user_id: str | None = None,
**kwargs)-
Expand source code
class AsyncChatStream: """A helper class for streaming markdown text into a conversation using the chat streaming APIs. This class provides a convenient interface for the chat.startStream, chat.appendStream, and chat.stopStream API methods, with automatic buffering and state management. """ def __init__( self, client: "AsyncWebClient", *, channel: str, logger: logging.Logger, thread_ts: str, buffer_size: int, recipient_team_id: Optional[str] = None, recipient_user_id: Optional[str] = None, **kwargs, ): """Initialize a new ChatStream instance. The __init__ method creates a unique ChatStream instance that keeps track of one chat stream. Args: client: The WebClient instance to use for API calls. channel: An encoded ID that represents a channel, private group, or DM. logger: A logging channel for outputs. thread_ts: Provide another message's ts value to reply to. Streamed messages should always be replies to a user request. recipient_team_id: The encoded ID of the team the user receiving the streaming text belongs to. Required when streaming to channels. recipient_user_id: The encoded ID of the user to receive the streaming text. Required when streaming to channels. buffer_size: The length of markdown_text to buffer in-memory before calling a method. Increasing this value decreases the number of method calls made for the same amount of text, which is useful to avoid rate limits. **kwargs: Additional arguments passed to the underlying API calls. """ self._client = client self._logger = logger self._token: Optional[str] = kwargs.pop("token", None) self._stream_args = { "channel": channel, "thread_ts": thread_ts, "recipient_team_id": recipient_team_id, "recipient_user_id": recipient_user_id, **kwargs, } self._buffer = "" self._state = "starting" self._stream_ts: Optional[str] = None self._buffer_size = buffer_size async def append( self, *, markdown_text: str, **kwargs, ) -> Optional[AsyncSlackResponse]: """Append to the stream. The "append" method appends to the chat stream being used. This method can be called multiple times. After the stream is stopped this method cannot be called. Args: markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is what will be appended to the message received so far. **kwargs: Additional arguments passed to the underlying API calls. Returns: AsyncSlackResponse if the buffer was flushed, None if buffering. Raises: SlackRequestError: If the stream is already completed. Example: ```python streamer = client.chat_stream( channel="C0123456789", thread_ts="1700000001.123456", recipient_team_id="T0123456789", recipient_user_id="U0123456789", ) streamer.append(markdown_text="**hello wo") streamer.append(markdown_text="rld!**") streamer.stop() ``` """ if self._state == "completed": raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}") if kwargs.get("token"): self._token = kwargs.pop("token") self._buffer += markdown_text if len(self._buffer) >= self._buffer_size: return await self._flush_buffer(**kwargs) details = { "buffer_length": len(self._buffer), "buffer_size": self._buffer_size, "channel": self._stream_args.get("channel"), "recipient_team_id": self._stream_args.get("recipient_team_id"), "recipient_user_id": self._stream_args.get("recipient_user_id"), "thread_ts": self._stream_args.get("thread_ts"), } self._logger.debug(f"ChatStream appended to buffer: {json.dumps(details)}") return None async def stop( self, *, markdown_text: Optional[str] = None, blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None, metadata: Optional[Union[Dict, Metadata]] = None, **kwargs, ) -> AsyncSlackResponse: """Stop the stream and finalize the message. Args: blocks: A list of blocks that will be rendered at the bottom of the finalized message. markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is what will be appended to the message received so far. metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you post to Slack is accessible to any app or user who is a member of that workspace. **kwargs: Additional arguments passed to the underlying API calls. Returns: AsyncSlackResponse from the chat.stopStream API call. Raises: SlackRequestError: If the stream is already completed. Example: ```python streamer = client.chat_stream( channel="C0123456789", thread_ts="1700000001.123456", recipient_team_id="T0123456789", recipient_user_id="U0123456789", ) streamer.append(markdown_text="**hello wo") streamer.append(markdown_text="rld!**") streamer.stop() ``` """ if self._state == "completed": raise e.SlackRequestError(f"Cannot stop stream: stream state is {self._state}") if kwargs.get("token"): self._token = kwargs.pop("token") if markdown_text: self._buffer += markdown_text if not self._stream_ts: response = await self._client.chat_startStream( **self._stream_args, token=self._token, ) if not response.get("ts"): raise e.SlackRequestError("Failed to stop stream: stream not started") self._stream_ts = str(response["ts"]) self._state = "in_progress" response = await self._client.chat_stopStream( token=self._token, channel=self._stream_args["channel"], ts=self._stream_ts, blocks=blocks, markdown_text=self._buffer, metadata=metadata, **kwargs, ) self._state = "completed" return response async def _flush_buffer(self, **kwargs) -> AsyncSlackResponse: """Flush the internal buffer by making appropriate API calls.""" if not self._stream_ts: response = await self._client.chat_startStream( **self._stream_args, token=self._token, **kwargs, markdown_text=self._buffer, ) self._stream_ts = response.get("ts") self._state = "in_progress" else: response = await self._client.chat_appendStream( token=self._token, channel=self._stream_args["channel"], ts=self._stream_ts, **kwargs, markdown_text=self._buffer, ) self._buffer = "" return response
A helper class for streaming markdown text into a conversation using the chat streaming APIs.
This class provides a convenient interface for the chat.startStream, chat.appendStream, and chat.stopStream API methods, with automatic buffering and state management.
Initialize a new ChatStream instance.
The init method creates a unique ChatStream instance that keeps track of one chat stream.
Args
client
- The WebClient instance to use for API calls.
channel
- An encoded ID that represents a channel, private group, or DM.
logger
- A logging channel for outputs.
thread_ts
- Provide another message's ts value to reply to. Streamed messages should always be replies to a user request.
recipient_team_id
- The encoded ID of the team the user receiving the streaming text belongs to. Required when streaming to channels.
recipient_user_id
- The encoded ID of the user to receive the streaming text. Required when streaming to channels.
buffer_size
- The length of markdown_text to buffer in-memory before calling a method. Increasing this value decreases the number of method calls made for the same amount of text, which is useful to avoid rate limits.
**kwargs
- Additional arguments passed to the underlying API calls.
Methods
async def append(self, *, markdown_text: str, **kwargs) ‑> AsyncSlackResponse | None
-
Expand source code
async def append( self, *, markdown_text: str, **kwargs, ) -> Optional[AsyncSlackResponse]: """Append to the stream. The "append" method appends to the chat stream being used. This method can be called multiple times. After the stream is stopped this method cannot be called. Args: markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is what will be appended to the message received so far. **kwargs: Additional arguments passed to the underlying API calls. Returns: AsyncSlackResponse if the buffer was flushed, None if buffering. Raises: SlackRequestError: If the stream is already completed. Example: ```python streamer = client.chat_stream( channel="C0123456789", thread_ts="1700000001.123456", recipient_team_id="T0123456789", recipient_user_id="U0123456789", ) streamer.append(markdown_text="**hello wo") streamer.append(markdown_text="rld!**") streamer.stop() ``` """ if self._state == "completed": raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}") if kwargs.get("token"): self._token = kwargs.pop("token") self._buffer += markdown_text if len(self._buffer) >= self._buffer_size: return await self._flush_buffer(**kwargs) details = { "buffer_length": len(self._buffer), "buffer_size": self._buffer_size, "channel": self._stream_args.get("channel"), "recipient_team_id": self._stream_args.get("recipient_team_id"), "recipient_user_id": self._stream_args.get("recipient_user_id"), "thread_ts": self._stream_args.get("thread_ts"), } self._logger.debug(f"ChatStream appended to buffer: {json.dumps(details)}") return None
Append to the stream.
The "append" method appends to the chat stream being used. This method can be called multiple times. After the stream is stopped this method cannot be called.
Args
markdown_text
- Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is what will be appended to the message received so far.
**kwargs
- Additional arguments passed to the underlying API calls.
Returns
AsyncSlackResponse if the buffer was flushed, None if buffering.
Raises
SlackRequestError
- If the stream is already completed.
Example
streamer = client.chat_stream( channel="C0123456789", thread_ts="1700000001.123456", recipient_team_id="T0123456789", recipient_user_id="U0123456789", ) streamer.append(markdown_text="**hello wo") streamer.append(markdown_text="rld!**") streamer.stop()
async def stop(self,
*,
markdown_text: str | None = None,
blocks: str | Sequence[Dict | Block] | None = None,
metadata: Dict | Metadata | None = None,
**kwargs) ‑> AsyncSlackResponse-
Expand source code
async def stop( self, *, markdown_text: Optional[str] = None, blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None, metadata: Optional[Union[Dict, Metadata]] = None, **kwargs, ) -> AsyncSlackResponse: """Stop the stream and finalize the message. Args: blocks: A list of blocks that will be rendered at the bottom of the finalized message. markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is what will be appended to the message received so far. metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you post to Slack is accessible to any app or user who is a member of that workspace. **kwargs: Additional arguments passed to the underlying API calls. Returns: AsyncSlackResponse from the chat.stopStream API call. Raises: SlackRequestError: If the stream is already completed. Example: ```python streamer = client.chat_stream( channel="C0123456789", thread_ts="1700000001.123456", recipient_team_id="T0123456789", recipient_user_id="U0123456789", ) streamer.append(markdown_text="**hello wo") streamer.append(markdown_text="rld!**") streamer.stop() ``` """ if self._state == "completed": raise e.SlackRequestError(f"Cannot stop stream: stream state is {self._state}") if kwargs.get("token"): self._token = kwargs.pop("token") if markdown_text: self._buffer += markdown_text if not self._stream_ts: response = await self._client.chat_startStream( **self._stream_args, token=self._token, ) if not response.get("ts"): raise e.SlackRequestError("Failed to stop stream: stream not started") self._stream_ts = str(response["ts"]) self._state = "in_progress" response = await self._client.chat_stopStream( token=self._token, channel=self._stream_args["channel"], ts=self._stream_ts, blocks=blocks, markdown_text=self._buffer, metadata=metadata, **kwargs, ) self._state = "completed" return response
Stop the stream and finalize the message.
Args
blocks
- A list of blocks that will be rendered at the bottom of the finalized message.
markdown_text
- Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is what will be appended to the message received so far.
metadata
- JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you post to Slack is accessible to any app or user who is a member of that workspace.
**kwargs
- Additional arguments passed to the underlying API calls.
Returns
AsyncSlackResponse from the chat.stopStream API call.
Raises
SlackRequestError
- If the stream is already completed.
Example
streamer = client.chat_stream( channel="C0123456789", thread_ts="1700000001.123456", recipient_team_id="T0123456789", recipient_user_id="U0123456789", ) streamer.append(markdown_text="**hello wo") streamer.append(markdown_text="rld!**") streamer.stop()