""" Classes for crafting messages from the bot. Content can change as information changes, and mods can perform actions on the message via emoji reactions. """ from datetime import datetime from typing import Any, Optional, Union from discord import Guild, Message, PartialEmoji, TextChannel from config import CONFIG from rocketbot.storage import ConfigKey, Storage from rocketbot.utils import bot_log class BotMessageReaction: """ A possible reaction to a bot message that will trigger an action. The list of available reactions will be listed at the end of a BotMessage. When a mod reacts to the message with the emote, something can happen. If the reaction is disabled, reactions will not register. The description will still show up in the message, but no emoji is shown. This can be used to explain why an action is no longer available. """ def __init__(self, emoji: str, is_enabled: bool, description: str): """ Creates a bot message reaction. - emoji: The emoji string - is_enabled: Whether the emoji can be used - description: What reacting with this emoji does (or an explanation of why it's disabled) """ self.emoji: str = emoji self.is_enabled: bool = is_enabled self.description: str = description def __eq__(self, other): return other is not None and \ other.emoji == self.emoji and \ other.is_enabled == self.is_enabled and \ other.description == self.description @classmethod def standard_set(cls, did_delete: bool = None, message_count: int = 1, did_kick: bool = None, did_ban: bool = None, user_count: int = 1) -> list: # list[BotMessageReaction]: """ Convenience factory for generating any of the three most common commands: delete message(s), kick user(s), and ban user(s). All arguments are optional. Resulting list can be passed directly to `BotMessage.set_reactions()`. Params - did_delete Whether the message(s) have been deleted. Pass True or False if this applies, omit to leave out delete action. - message_count How many messages there are. Used for pluralizing description. Defaults to 1. Omit if n/a. - did_kick Whether the user(s) have been kicked. Pass True or False if this applies, omit to leave out kick action. - did_ban Whether the user(s) have been banned. Pass True or False if this applies, omit to leave out ban action. - user_count How many users there are. Used for pluralizing description. Defaults to 1. Omit if n/a. """ reactions: list[BotMessageReaction] = [] if did_delete is not None: if did_delete: reactions.append(BotMessageReaction( CONFIG['trash_emoji'], False, 'Message deleted' if message_count == 1 else 'Messages deleted')) else: reactions.append(BotMessageReaction( CONFIG['trash_emoji'], True, 'Delete message' if message_count == 1 else 'Delete messages')) if did_kick is not None: if did_ban is not None and did_ban: # Don't show kick option at all if we also banned pass elif did_kick: reactions.append(BotMessageReaction( CONFIG['kick_emoji'], False, 'User kicked' if user_count == 1 else 'Users kicked')) else: reactions.append(BotMessageReaction( CONFIG['kick_emoji'], True, 'Kick user' if user_count == 1 else 'Kick users')) if did_ban is not None: if did_ban: reactions.append(BotMessageReaction( CONFIG['ban_emoji'], False, 'User banned' if user_count == 1 else 'Users banned')) else: reactions.append(BotMessageReaction( CONFIG['ban_emoji'], True, 'Ban user' if user_count == 1 else 'Ban users')) return reactions class BotMessage: """ Holds state for a bot-generated message. A message is composed, sent via `BaseCog.post_message()`, and can later be updated. A message consists of a type (e.g. info, warning), text, optional quoted text (such as the content of a flagged message), and an optional list of actions that can be taken via a mod reacting to the message. """ TYPE_DEFAULT: int = 0 TYPE_INFO: int = 1 TYPE_MOD_WARNING: int = 2 TYPE_SUCCESS: int = 3 TYPE_FAILURE: int = 4 TYPE_LOG: int = 5 def __init__(self, guild: Guild, text: str, type: int = TYPE_DEFAULT, # pylint: disable=redefined-builtin context: Optional[Any] = None, reply_to: Optional[Message] = None, suppress_embeds: bool = False): """ Creates a bot message. - guild: The Discord guild to send the message to. - text: Main text of the message. - type: One of the TYPE_ constants. - context: Arbitrary value that will be passed in the callback. Can be associated data for performing some action. (optional) - reply_to: Existing message this message is in reply to. (optional) """ self.guild: Guild = guild self.text: str = text self.type: int = type self.context: Optional[Any] = context self.quote: Optional[str] = None self.source_cog = None # Set by `BaseCog.post_message()` self.__posted_text: list[str] = [] # last text posted, to test for changes self.__posted_emoji: set[str] = set() # last emoji list posted self.__messages: list[Message] = [] # set once the message has been posted self.__reply_to: Optional[Message] = reply_to self.__suppress_embeds = suppress_embeds self.__reactions: list[BotMessageReaction] = [] def is_sent(self) -> bool: """ Returns whether this message has been sent to the guild. This may continue returning False even after calling BaseCog.post_message if the guild has no configured warning channel. """ return len(self.__messages) > 0 def message_ids(self) -> list[int]: """Returns the ids of all actual Messages sent. One bot message may be broken into multiple Discord messages.""" return [ m.id for m in self.__messages ] def message_sent_at(self) -> Optional[datetime]: """Returns when the message was sent or None if not sent.""" return self.__messages[0].created_at if len(self.__messages) > 0 else None def has_reactions(self) -> bool: """Whether this message has any reactions defined.""" return len(self.__reactions) > 0 async def set_text(self, new_text: str) -> None: """ Replaces the text of this message. If the message has been sent, it will be updated. """ self.text = new_text await self.update_if_sent() async def set_reactions(self, reactions: list[BotMessageReaction]) -> None: """ Replaces all BotMessageReactions with a new list. If the message has been sent, it will be updated. """ if reactions == self.__reactions: # No change return self.__reactions = reactions.copy() if reactions is not None else [] await self.update_if_sent() async def add_reaction(self, reaction: BotMessageReaction) -> None: """ Adds one BotMessageReaction to this message. If a reaction already exists for the given emoji it is replaced with the new one. If the message has been sent, it will be updated. """ # Alias for update. Makes for clearer intent. await self.update_reaction(reaction) async def update_reaction(self, reaction: BotMessageReaction) -> None: """ Updates or adds a BotMessageReaction. If the message has been sent, it will be updated. """ found = False for i, existing in enumerate(self.__reactions): if existing.emoji == reaction.emoji: if reaction == self.__reactions[i]: # No change return self.__reactions[i] = reaction found = True break if not found: self.__reactions.append(reaction) await self.update_if_sent() async def remove_reaction(self, reaction_or_emoji: Union[BotMessageReaction, str]) -> None: """ Removes a reaction. Can pass either a BotMessageReaction or just the emoji string. If the message has been sent, it will be updated. """ for i, existing in enumerate(self.__reactions): if (isinstance(reaction_or_emoji, str) and existing.emoji == reaction_or_emoji) or \ (isinstance(reaction_or_emoji, BotMessageReaction) and existing.emoji == reaction_or_emoji.emoji): self.__reactions.pop(i) await self.update_if_sent() return def reaction_for_emoji(self, emoji) -> Optional[BotMessageReaction]: """ Finds the BotMessageReaction for the given emoji or None if not found. Accepts either a PartialEmoji or str. """ for reaction in self.__reactions: if isinstance(emoji, PartialEmoji) and reaction.emoji == emoji.name: return reaction if isinstance(emoji, str) and reaction.emoji == emoji: return reaction return None async def update_if_sent(self) -> None: """ Updates the text and/or reactions on a message if it was sent to the guild, otherwise does nothing. Does not need to be called by BaseCog subclasses. """ if len(self.__messages) > 0: await self.update() async def update(self) -> None: """ Sends or updates an already sent message based on BotMessage state. Does not need to be called by BaseCog subclasses. """ message_bodies: list[str] = self.__formatted_message() if len(self.__messages) > 0: if message_bodies != self.__posted_text: while len(self.__messages) > len(message_bodies): last_message = self.__messages.pop(-1) await last_message.delete() del Storage.get_bot_messages(self.guild)[last_message.id] for i in range(min(len(message_bodies), len(self.__messages))): await self.__messages[i].edit(content=message_bodies[i]) while len(self.__messages) < len(message_bodies): body = message_bodies[len(self.__messages)] message = await self.__messages[0].channel.send(content=body, suppress_embeds=self.__suppress_embeds) Storage.get_bot_messages(self.guild)[message.id] = self self.__messages.append(message) self.__posted_text = message_bodies else: # No messages posted yet channel: Optional[TextChannel] = None for index, body in enumerate(message_bodies): if index == 0 and self.__reply_to: message = await self.__reply_to.reply(content=body, mention_author=False) Storage.get_bot_messages(self.guild)[message.id] = self self.__messages.append(message) channel = self.__reply_to.channel else: if channel is None: channel_id = Storage.get_config_value(self.guild, ConfigKey.WARNING_CHANNEL_ID) if channel_id is None: bot_log(self.guild, type(self.source_cog) if self.source_cog else None, '\u0007No warning channel set! No warning issued.') return channel: TextChannel = self.guild.get_channel(channel_id) or await self.guild.fetch_channel(channel_id) if channel is None: bot_log(self.guild, type(self.source_cog) if self.source_cog else None, f'\u0007Configured warning channel does not exist for guild {self.guild.name} ({self.guild.id})!') return message = await channel.send(content=body, suppress_embeds=self.__suppress_embeds) Storage.get_bot_messages(self.guild)[message.id] = self self.__messages.append(message) self.__posted_text = message_bodies emoji_to_remove = self.__posted_emoji.copy() for reaction in self.__reactions: if reaction.is_enabled: if reaction.emoji not in self.__posted_emoji: await self.__messages[-1].add_reaction(reaction.emoji) self.__posted_emoji.add(reaction.emoji) if reaction.emoji in emoji_to_remove: emoji_to_remove.remove(reaction.emoji) for emoji in emoji_to_remove: await self.__messages[-1].clear_reaction(emoji) if emoji in self.__posted_emoji: self.__posted_emoji.remove(emoji) def __formatted_message(self) -> list[str]: """ Composes the entire message Markdown from components. Includes the main message, quoted text, summary of available reactions, etc. Returned as array of message bodies small enough to fit in a Discord message. """ s: str = '' if self.type == self.TYPE_INFO: s += CONFIG['info_emoji'] + ' ' elif self.type == self.TYPE_MOD_WARNING: mention: str = Storage.get_config_value(self.guild, ConfigKey.WARNING_MENTION) if mention: s += mention + ' ' s += CONFIG['warning_emoji'] + ' ' elif self.type == self.TYPE_SUCCESS: s += CONFIG['success_emoji'] + ' ' elif self.type == self.TYPE_FAILURE: s += CONFIG['failure_emoji'] + ' ' elif self.type == self.TYPE_LOG: s += CONFIG['log_emoji'] + ' ' s += self.text if self.quote: quoted = '\n> '.join(self.quote.splitlines()) s += f'\n\n> {quoted}' if len(self.__reactions) > 0: s += '\n\nAvailable actions:' for reaction in self.__reactions: if reaction.is_enabled: s += f'\n {reaction.emoji} {reaction.description}' else: s += f'\n {reaction.description}' # API complains if *request* is >2000. Unsure how much of that payload is overhead, so will define a max length # conservatively of 85-90% of that. ideal_message_length = 1700 max_message_length = 1800 # First time cutting up message bodies = [s] while len(bodies[-1]) > max_message_length: # Try to cut at last newline before length limit, otherwise last space, otherwise hard cut at limit last_body = bodies.pop(-1) cut_before = ideal_message_length cut_after = ideal_message_length last_newline_index = last_body.rfind('\n', max_message_length // 2, max_message_length) if last_newline_index >= 0: cut_before = last_newline_index cut_after = last_newline_index + 1 else: last_space_index = last_body.rfind(' ', max_message_length // 2, max_message_length) if last_space_index >= 0: cut_before = last_space_index cut_after = last_space_index + 1 body = last_body[:cut_before].strip() remainder = last_body[cut_after:].strip() while body.endswith('\n>'): body = body[:-2] while remainder.endswith('\n>'): remainder = remainder[:-2] bodies.append(body) bodies.append(remainder) return bodies