Source code for slack_transfer.functions.common

import warnings
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.web import SlackResponse


[docs]def get_channels_list(client: WebClient) -> List[Dict]: """get channel list. Args: client (WebClient): client. If use this via any Client Class, self is automatically set. Thus, ignore this. Yields: List[Dict]: List of channel info. The details of channel info is listed: https://api.slack.com/types/channel (see the dictionary corresponding to channel key.) """ channels: List[Dict] = [] next_cursor: Optional[str] = None while True: response: SlackResponse = client.conversations_list( cursor=next_cursor, types="public_channel,private_channel" ) if not response["ok"]: raise IOError("channel list cannot be fetched in downloading WS data.") channels.extend(response["channels"]) if "response_metadata" in response: next_cursor = response["response_metadata"]["next_cursor"] if next_cursor == "": break else: break return channels
[docs]def get_replies(client: WebClient, channel_id: str, ts: str) -> List[Dict]: """get replies to the specific message. Args: client (WebClient): client. If use this via any Client Class, self is automatically set. Thus, ignore this. channel_id (str): channel id (usually 9-digits string) ts (str): ts of the specific message. ts is a unix timestamp in the format of float. But, here, string for it is accepted. Yields: List[Dict]: List of the reply messages. The details of message is listed: https://api.slack.com/reference/messaging/payload """ messages: List[Dict] = [] next_cursor: Optional[str] = None while True: response = client.conversations_replies( channel=channel_id, ts=ts, cursor=next_cursor ) if not response["ok"]: raise IOError( f"replies cannot be fetched in downloading WS data. (channel_id: {channel_id}, ts: {ts})" ) messages.extend(response["messages"]) if "response_metadata" in response: next_cursor = response["response_metadata"]["next_cursor"] if next_cursor == "": break else: break return messages
[docs]def get_file_volumes( client: WebClient, channel_ids: Optional[Union[str, List[str]]] = None, auto_join: bool = False, ) -> float: """Calculate the total file volumes (bytes) Args: client (WebClient): client. If use this via any Client Class, self is automatically set. Thus, ignore this. channel_ids (Optional; str or List[str]; default=None): channel id list for calculation. If not set, calculate all accessible channels. auto_join (bool; default=False): when bot is not in channel, automatically join if possible (public channel). This feature is disabled for :class:`slack_transfer._base.CommonDryRunClient` Yields: float: total file volumes (bytes) """ if channel_ids is None: channels = get_channels_list(client=client) channel_ids = list(map(lambda x: x["id"], channels)) elif isinstance(channel_ids, str): channel_ids = [channel_ids] else: channel_ids = channel_ids volume_dict: Dict[str, int] = {} for channel_id in channel_ids: next_cursor: Optional[str] = None while True: try: response: SlackResponse = client.files_list( channel=channel_id, cursor=next_cursor ) except SlackApiError as e: if e.response["error"] == "not_in_channel": if auto_join: try: client.conversations_join(channel=channel_id) continue except: pass warnings.warn(f"slack bot is not in `{channel_id}`. Skip this.") break else: raise e if not response["ok"]: raise IOError("channel list cannot be fetched.") volume_dict.update( dict(map(lambda x: (x["id"], x["size"]), response["files"])) ) if "response_metadata" in response: next_cursor = response["response_metadata"]["next_cursor"] if next_cursor == "": break else: break return sum(volume_dict.values())
[docs]def test_connection(client: WebClient) -> None: """test connections to slack. This test including common scope among all client. The token set to the client use this method requires the following scope: - channels:history - channels:join - channels:read - files:read - groups:history - groups:read If one of the scope is missing, this method rise an error. Those tests are performed step by step. It means that missing scope appears not at once. Args: client (WebClient): client. If use this via any Client Class, self is automatically set. Thus, ignore this. """ response = client.auth_test() if not response["ok"]: raise IOError("slack token is invalid.") print(f"Successfully access to `{response['team']}`") # channels:read,groups:read try: response = client.conversations_list(types="public_channel,private_channel") except SlackApiError as e: if e.response["error"] == "missing_scope": raise AttributeError("missing scope: channels:read,groups:read") else: raise e channel_cand_public = list(filter(lambda x: x["is_general"], response.data["channels"])) # type: ignore if len(channel_cand_public) == 0: raise ValueError("general type channel doesn't exist.") if len(channel_cand_public) != 1: raise AssertionError # channels:join try: client.conversations_join(channel=channel_cand_public[0]["id"]) except SlackApiError as e: if e.response["error"] != "method_not_supported_for_channel_type": raise e # channels:history for public try: client.conversations_history(channel=channel_cand_public[0]["id"], ts="1") except SlackApiError as e: if e.response["error"] == "missing_scope": raise AttributeError("missing scope: channels:history") else: raise e # files:read client.files_list() channel_cand_private = list(filter(lambda x: x["is_private"], response.data["channels"])) # type: ignore if len(channel_cand_private) == 0: warnings.warn("No private channel is tested") return # groups:history for private try: client.conversations_history(channel=channel_cand_private[0]["id"], ts="1") except SlackApiError as e: if e.response["error"] == "missing_scope": raise AttributeError("missing scope: groups:history") else: raise e
[docs]def test_downloader(client: WebClient) -> None: """test connections to slack as downloader. This test including common scope for DownloaderClient. The token set to the client use this method requires the following scope: - bookmarks:read - emoji:read - users:read If one of the scope is missing, this method rise an error. Those tests are performed step by step. It means that missing scope appears not at once. Args: client (WebClient): client. If use this via any Client Class, self is automatically set. Thus, ignore this. """ client.users_list() # channels:read,groups:read response = client.conversations_list(types="public_channel,private_channel") channel_cand_public = list(filter(lambda x: x["is_general"], response.data["channels"])) # type: ignore if len(channel_cand_public) == 0: raise ValueError("general type channel doesn't exist.") if len(channel_cand_public) != 1: raise AssertionError # bookmarks:read client.bookmarks_list(channel_id=channel_cand_public[0]["id"]) # emoji:read client.emoji_list() print("test for downloader is finished!")
[docs]def test_uploader(client: WebClient) -> None: """test connections to slack as uploader. This test including common scope for UploaderClient. The token set to the client use this method requires the following scope: - channels:manage - files:write - chat:write - pins:write - bookmarks:write - reaction:write If one of the scope is missing, this method rise an error. Those tests are performed step by step. It means that missing scope appears not at once. Args: client (WebClient): client. If use this via any Client Class, self is automatically set. Thus, ignore this. """ # channels:manage # conversations_setTopic and conversations_setPurpose is also the same scope try: client.conversations_create(name=":::", is_private=False) except SlackApiError as e: if e.response["error"] == "missing_scope": raise AttributeError("missing scope: channels:manage") elif e.response["error"] == "invalid_name_specials": pass else: raise e # try: # client.conversations_create(name=":::", is_private=True) # except SlackApiError as e: # if e.response["error"] == "missing_scope": # raise AttributeError(f"missing scope: groups:write") # elif e.response["error"] == "invalid_name_specials": # pass # else: # raise e # files:write try: client.files_upload(content="aaa", channels=":::::") except SlackApiError as e: if e.response["error"] == "missing_scope": raise e # files_list is tested in test_connection # chat:write try: client.chat_postMessage(channel=":::::", text="test") except SlackApiError as e: if e.response["error"] == "missing_scope": raise AttributeError("missing scope: chat:write") # pins:write try: client.pins_add(channel=":::::", timestamp="1") except SlackApiError as e: if e.response["error"] == "missing_scope": raise e # bookmarks:write try: client.bookmarks_add(channel_id=":::::", title="test", type="link") except SlackApiError as e: if e.response["error"] == "missing_scope": raise e # reaction:write try: client.reactions_add(channel=":::::", name="test", timestamp="1") except SlackApiError as e: if e.response["error"] == "missing_scope": raise e print("test for uploader is finished!")