Module slack_sdk.web.chat_stream

Classes

class ChatStream (client: WebClient,
*,
channel: str,
logger: logging.Logger,
thread_ts: str,
buffer_size: int,
recipient_team_id: str | None = None,
recipient_user_id: str | None = None,
**kwargs)
Expand source code
class ChatStream:
    """A helper class for streaming markdown text into a conversation using the chat streaming APIs.

    This class provides a convenient interface for the chat.startStream, chat.appendStream, and chat.stopStream API
    methods, with automatic buffering and state management.

    Text passed to "append" is accumulated in memory and only sent once the buffer holds at least "buffer_size"
    characters. The first flush starts the stream with chat.startStream, later flushes use chat.appendStream, and
    "stop" sends whatever is still buffered with chat.stopStream. The internal state moves from "starting" to
    "in_progress" to "completed".
    """

    def __init__(
        self,
        client: "WebClient",
        *,
        channel: str,
        logger: logging.Logger,
        thread_ts: str,
        buffer_size: int,
        recipient_team_id: Optional[str] = None,
        recipient_user_id: Optional[str] = None,
        **kwargs,
    ):
        """Initialize a new ChatStream instance.

        The __init__ method creates a unique ChatStream instance that keeps track of one chat stream.

        Args:
            client: The WebClient instance to use for API calls.
            channel: An encoded ID that represents a channel, private group, or DM.
            logger: A logging channel for outputs.
            thread_ts: Provide another message's ts value to reply to. Streamed messages should always be replies to a user
              request.
            buffer_size: The length of markdown_text to buffer in-memory before calling a method. Increasing this value
              decreases the number of method calls made for the same amount of text, which is useful to avoid rate limits.
            recipient_team_id: The encoded ID of the team the user receiving the streaming text belongs to. Required when
              streaming to channels.
            recipient_user_id: The encoded ID of the user to receive the streaming text. Required when streaming to channels.
            **kwargs: Additional arguments passed to the underlying API calls. A "token" entry is removed from these
              arguments and remembered as the token sent with every call made by this stream.
        """
        self._client = client
        self._logger = logger
        # The token is kept apart from the other arguments so that later calls can override it.
        self._token: Optional[str] = kwargs.pop("token", None)
        # Arguments sent with chat.startStream; "channel" is also reused for the append and stop calls.
        self._stream_args = {
            "channel": channel,
            "thread_ts": thread_ts,
            "recipient_team_id": recipient_team_id,
            "recipient_user_id": recipient_user_id,
            **kwargs,
        }
        self._buffer = ""
        self._state = "starting"
        # Stays None until chat.startStream has returned a ts for the streamed message.
        self._stream_ts: Optional[str] = None
        self._buffer_size = buffer_size

    def append(
        self,
        *,
        markdown_text: str,
        **kwargs,
    ) -> Optional[SlackResponse]:
        """Append to the stream.

        The "append" method appends to the chat stream being used. This method can be called multiple times. After the stream
        is stopped this method cannot be called.

        Args:
            markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
              what will be appended to the message received so far.
            **kwargs: Additional arguments passed to the underlying API calls. A non-empty "token" replaces the token used
              for this and all later calls; an empty or None "token" is ignored.

        Returns:
            SlackResponse if the buffer was flushed, None if buffering.

        Raises:
            SlackRequestError: If the stream is already completed.

        Example:
            ```python
            streamer = client.chat_stream(
                channel="C0123456789",
                thread_ts="1700000001.123456",
                recipient_team_id="T0123456789",
                recipient_user_id="U0123456789",
            )
            streamer.append(markdown_text="**hello wo")
            streamer.append(markdown_text="rld!**")
            streamer.stop()
            ```
        """
        if self._state == "completed":
            raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}")
        # Always remove "token" from kwargs: leaving a falsy value behind would collide with the explicit
        # token= argument of the API call and raise a TypeError.
        token = kwargs.pop("token", None)
        if token:
            self._token = token
        self._buffer += markdown_text
        if len(self._buffer) >= self._buffer_size:
            return self._flush_buffer(**kwargs)
        details = {
            "buffer_length": len(self._buffer),
            "buffer_size": self._buffer_size,
            "channel": self._stream_args.get("channel"),
            "recipient_team_id": self._stream_args.get("recipient_team_id"),
            "recipient_user_id": self._stream_args.get("recipient_user_id"),
            "thread_ts": self._stream_args.get("thread_ts"),
        }
        self._logger.debug(f"ChatStream appended to buffer: {json.dumps(details)}")
        return None

    def stop(
        self,
        *,
        markdown_text: Optional[str] = None,
        blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
        metadata: Optional[Union[Dict, Metadata]] = None,
        **kwargs,
    ) -> SlackResponse:
        """Stop the stream and finalize the message.

        If nothing has been flushed yet, the stream is first started with chat.startStream so that there is a message
        to finalize. Any text still in the buffer is sent with the chat.stopStream call.

        Args:
            blocks: A list of blocks that will be rendered at the bottom of the finalized message.
            markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
              what will be appended to the message received so far.
            metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you
              post to Slack is accessible to any app or user who is a member of that workspace.
            **kwargs: Additional arguments passed to the chat.stopStream API call. A non-empty "token" replaces the token
              used for the calls made here; an empty or None "token" is ignored.

        Returns:
            SlackResponse from the chat.stopStream API call.

        Raises:
            SlackRequestError: If the stream is already completed, or if the stream had to be started here and the
              chat.startStream response did not include a ts.

        Example:
            ```python
            streamer = client.chat_stream(
                channel="C0123456789",
                thread_ts="1700000001.123456",
                recipient_team_id="T0123456789",
                recipient_user_id="U0123456789",
            )
            streamer.append(markdown_text="**hello wo")
            streamer.append(markdown_text="rld!**")
            streamer.stop()
            ```
        """
        if self._state == "completed":
            raise e.SlackRequestError(f"Cannot stop stream: stream state is {self._state}")
        # Always remove "token" from kwargs: leaving a falsy value behind would collide with the explicit
        # token= argument of chat_stopStream below and raise a TypeError.
        token = kwargs.pop("token", None)
        if token:
            self._token = token
        if markdown_text:
            self._buffer += markdown_text
        if not self._stream_ts:
            # Nothing was flushed so far; a message must exist before it can be stopped.
            response = self._client.chat_startStream(
                **self._stream_args,
                token=self._token,
            )
            if not response.get("ts"):
                raise e.SlackRequestError("Failed to stop stream: stream not started")
            self._stream_ts = str(response["ts"])
            self._state = "in_progress"
        response = self._client.chat_stopStream(
            token=self._token,
            channel=self._stream_args["channel"],
            ts=self._stream_ts,
            blocks=blocks,
            markdown_text=self._buffer,
            metadata=metadata,
            **kwargs,
        )
        # Only reached when chat_stopStream returned without raising.
        self._state = "completed"
        return response

    def _flush_buffer(self, **kwargs) -> SlackResponse:
        """Flush the internal buffer by making appropriate API calls.

        The buffered text is sent with chat.startStream when no stream ts is known yet, otherwise with
        chat.appendStream. The buffer is emptied once the call has returned.

        Args:
            **kwargs: Additional arguments passed to the underlying API call.

        Returns:
            SlackResponse from the chat.startStream or chat.appendStream API call.
        """
        if not self._stream_ts:
            response = self._client.chat_startStream(
                **self._stream_args,
                token=self._token,
                **kwargs,
                markdown_text=self._buffer,
            )
            # The ts returned here identifies the streamed message in later append and stop calls.
            self._stream_ts = response.get("ts")
            self._state = "in_progress"
        else:
            response = self._client.chat_appendStream(
                token=self._token,
                channel=self._stream_args["channel"],
                ts=self._stream_ts,
                **kwargs,
                markdown_text=self._buffer,
            )
        self._buffer = ""
        return response

A helper class for streaming markdown text into a conversation using the chat streaming APIs.

This class provides a convenient interface for the chat.startStream, chat.appendStream, and chat.stopStream API methods, with automatic buffering and state management.

Initialize a new ChatStream instance.

The __init__ method creates a unique ChatStream instance that keeps track of one chat stream.

Args

client
The WebClient instance to use for API calls.
channel
An encoded ID that represents a channel, private group, or DM.
logger
A logging channel for outputs.
thread_ts
Provide another message's ts value to reply to. Streamed messages should always be replies to a user request.
recipient_team_id
The encoded ID of the team the user receiving the streaming text belongs to. Required when streaming to channels.
recipient_user_id
The encoded ID of the user to receive the streaming text. Required when streaming to channels.
buffer_size
The length of markdown_text to buffer in-memory before calling a method. Increasing this value decreases the number of method calls made for the same amount of text, which is useful to avoid rate limits.
**kwargs
Additional arguments passed to the underlying API calls.

Methods

def append(self, *, markdown_text: str, **kwargs) ‑> SlackResponse | None
Expand source code
def append(
    self,
    *,
    markdown_text: str,
    **kwargs,
) -> Optional[SlackResponse]:
    """Append to the stream.

    The "append" method appends to the chat stream being used. This method can be called multiple times. After the stream
    is stopped this method cannot be called.

    Args:
        markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
          what will be appended to the message received so far.
        **kwargs: Additional arguments passed to the underlying API calls. A non-empty "token" replaces the token used
          for this and all later calls; an empty or None "token" is ignored.

    Returns:
        SlackResponse if the buffer was flushed, None if buffering.

    Raises:
        SlackRequestError: If the stream is already completed.

    Example:
        ```python
        streamer = client.chat_stream(
            channel="C0123456789",
            thread_ts="1700000001.123456",
            recipient_team_id="T0123456789",
            recipient_user_id="U0123456789",
        )
        streamer.append(markdown_text="**hello wo")
        streamer.append(markdown_text="rld!**")
        streamer.stop()
        ```
    """
    if self._state == "completed":
        raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}")
    # Always remove "token" from kwargs: a falsy value left behind would be forwarded to the flush
    # and clash with the token the stream passes explicitly.
    token = kwargs.pop("token", None)
    if token:
        self._token = token
    self._buffer += markdown_text
    if len(self._buffer) >= self._buffer_size:
        return self._flush_buffer(**kwargs)
    details = {
        "buffer_length": len(self._buffer),
        "buffer_size": self._buffer_size,
        "channel": self._stream_args.get("channel"),
        "recipient_team_id": self._stream_args.get("recipient_team_id"),
        "recipient_user_id": self._stream_args.get("recipient_user_id"),
        "thread_ts": self._stream_args.get("thread_ts"),
    }
    self._logger.debug(f"ChatStream appended to buffer: {json.dumps(details)}")
    return None

Append to the stream.

The "append" method appends to the chat stream being used. This method can be called multiple times. After the stream is stopped this method cannot be called.

Args

markdown_text
Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is what will be appended to the message received so far.
**kwargs
Additional arguments passed to the underlying API calls.

Returns

SlackResponse if the buffer was flushed, None if buffering.

Raises

SlackRequestError
If the stream is already completed.

Example

streamer = client.chat_stream(
    channel="C0123456789",
    thread_ts="1700000001.123456",
    recipient_team_id="T0123456789",
    recipient_user_id="U0123456789",
)
streamer.append(markdown_text="**hello wo")
streamer.append(markdown_text="rld!**")
streamer.stop()
def stop(self,
*,
markdown_text: str | None = None,
blocks: str | Sequence[Dict | Block] | None = None,
metadata: Dict | Metadata | None = None,
**kwargs) ‑> SlackResponse
Expand source code
def stop(
    self,
    *,
    markdown_text: Optional[str] = None,
    blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
    metadata: Optional[Union[Dict, Metadata]] = None,
    **kwargs,
) -> SlackResponse:
    """Stop the stream and finalize the message.

    If no stream ts is known yet, the stream is first started with chat.startStream so that there is a message to
    finalize. Any text still in the buffer is sent with the chat.stopStream call.

    Args:
        blocks: A list of blocks that will be rendered at the bottom of the finalized message.
        markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
          what will be appended to the message received so far.
        metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you
          post to Slack is accessible to any app or user who is a member of that workspace.
        **kwargs: Additional arguments passed to the chat.stopStream API call. A non-empty "token" replaces the token
          used for the calls made here; an empty or None "token" is ignored.

    Returns:
        SlackResponse from the chat.stopStream API call.

    Raises:
        SlackRequestError: If the stream is already completed, or if the stream had to be started here and the
          chat.startStream response did not include a ts.

    Example:
        ```python
        streamer = client.chat_stream(
            channel="C0123456789",
            thread_ts="1700000001.123456",
            recipient_team_id="T0123456789",
            recipient_user_id="U0123456789",
        )
        streamer.append(markdown_text="**hello wo")
        streamer.append(markdown_text="rld!**")
        streamer.stop()
        ```
    """
    if self._state == "completed":
        raise e.SlackRequestError(f"Cannot stop stream: stream state is {self._state}")
    # Always remove "token" from kwargs: a falsy value left behind would collide with the explicit
    # token= argument of chat_stopStream below and raise a TypeError.
    token = kwargs.pop("token", None)
    if token:
        self._token = token
    if markdown_text:
        self._buffer += markdown_text
    if not self._stream_ts:
        # No message exists yet; one must be started before it can be stopped.
        response = self._client.chat_startStream(
            **self._stream_args,
            token=self._token,
        )
        if not response.get("ts"):
            raise e.SlackRequestError("Failed to stop stream: stream not started")
        self._stream_ts = str(response["ts"])
        self._state = "in_progress"
    response = self._client.chat_stopStream(
        token=self._token,
        channel=self._stream_args["channel"],
        ts=self._stream_ts,
        blocks=blocks,
        markdown_text=self._buffer,
        metadata=metadata,
        **kwargs,
    )
    # Only reached when chat_stopStream returned without raising.
    self._state = "completed"
    return response

Stop the stream and finalize the message.

Args

blocks
A list of blocks that will be rendered at the bottom of the finalized message.
markdown_text
Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is what will be appended to the message received so far.
metadata
JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you post to Slack is accessible to any app or user who is a member of that workspace.
**kwargs
Additional arguments passed to the underlying API calls.

Returns

SlackResponse from the chat.stopStream API call.

Raises

SlackRequestError
If the stream is already completed.

Example

streamer = client.chat_stream(
    channel="C0123456789",
    thread_ts="1700000001.123456",
    recipient_team_id="T0123456789",
    recipient_user_id="U0123456789",
)
streamer.append(markdown_text="**hello wo")
streamer.append(markdown_text="rld!**")
streamer.stop()