from discord import Guild, Member, Message, PartialEmoji, RawReactionActionEvent, TextChannel from discord.ext import commands from datetime import datetime, timedelta from config import CONFIG from rscollections import AgeBoundDict from storage import ConfigKey, Storage import json class BotMessageReaction: """ A possible reaction to a bot message that will trigger an action. The list of available reactions will be listed at the end of a BotMessage. When a mod reacts to the message with the emote, something can happen. If the reaction is disabled, reactions will not register. The description will still show up in the message, but no emoji is shown. This can be used to explain why an action is no longer available. """ def __init__(self, emoji: str, is_enabled: bool, description: str): self.emoji = emoji self.is_enabled = is_enabled self.description = description def __eq__(self, other): return other is not None and \ other.emoji == self.emoji and \ other.is_enabled == self.is_enabled and \ other.description == self.description class BotMessage: """ Holds state for a bot-generated message. A message is composed, sent via `BaseCog.post_message()`, and can later be updated. A message consists of a type (e.g. info, warning), text, optional quoted text (such as the content of a flagged message), and an optional list of actions that can be taken via a mod reacting to the message. """ TYPE_DEFAULT = 0 TYPE_INFO = 1 TYPE_MOD_WARNING = 2 TYPE_SUCCESS = 3 TYPE_FAILURE = 4 def __init__(self, guild: Guild, text: str, type: int = 0, # TYPE_DEFAULT context = None, reply_to: Message = None): self.guild = guild self.text = text self.type = type self.context = context self.quote = None self.__posted_text = None # last text posted, to test for changes self.__posted_emoji = set() self.__message = None # Message self.__reply_to = reply_to self.__reactions = [] # BotMessageReaction[] def is_sent(self) -> bool: """ Returns whether this message has been sent to the guild. This may continue returning False even after calling BaseCog.post_message if the guild has no configured warning channel. """ return self.__message is not None def message_id(self): return self.__message.id if self.__message else None def message_sent_at(self): return self.__message.created_at if self.__message else None async def set_text(self, new_text: str) -> None: """ Replaces the text of this message. If the message has been sent, it will be updated. """ self.text = new_text await self.__update_if_sent() async def set_reactions(self, reactions: list) -> None: """ Replaces all BotMessageReactions with a new list. If the message has been sent, it will be updated. """ if reactions == self.__reactions: # No change return self.__reactions = reactions.copy() if reactions is not None else [] await self.__update_if_sent() async def add_reaction(self, reaction: BotMessageReaction) -> None: """ Adds one BotMessageReaction to this message. If a reaction already exists for the given emoji it is replaced with the new one. If the message has been sent, it will be updated. """ # Alias for update. Makes for clearer intent. await self.update_reaction(reaction) async def update_reaction(self, reaction: BotMessageReaction) -> None: """ Updates or adds a BotMessageReaction. If the message has been sent, it will be updated. """ found = False for i in range(len(self.__reactions)): existing = self.__reactions[i] if existing.emoji == reaction.emoji: if reaction == self.__reactions[i]: # No change return self.__reactions[i] = reaction found = True break if not found: self.__reactions.append(reaction) await self.__update_if_sent() async def remove_reaction(self, reaction_or_emoji) -> None: """ Removes a reaction. Can pass either a BotMessageReaction or just the emoji string. If the message has been sent, it will be updated. """ for i in range(len(self.__reactions)): existing = self.__reactions[i] if (isinstance(reaction_or_emoji, str) and existing.emoji == reaction_or_emoji) or \ (isinstance(reaction_or_emoji, BotMessageReaction) and existing.emoji == reaction_or_emoji.emoji): self.__reactions.pop(i) await self.__update_if_sent() return def reaction_for_emoji(self, emoji) -> BotMessageReaction: for reaction in self.__reactions: if isinstance(emoji, PartialEmoji) and reaction.emoji == emoji.name: return reaction elif isinstance(emoji, str) and reaction.emoji == emoji: return reaction return None async def __update_if_sent(self) -> None: if self.__message: await self._update() async def _update(self) -> None: content: str = self.__formatted_message() if self.__message: if content != self.__posted_text: await self.__message.edit(content=content) self.__posted_text = content else: if self.__reply_to: self.__message = await self.__reply_to.reply(content=content, mention_author=False) self.__posted_text = content else: channel_id = Storage.get_config_value(self.guild, ConfigKey.WARNING_CHANNEL_ID) if channel_id is None: BaseCog.guild_trace(self.guild, 'No warning channel set! No warning issued.') return channel: TextChannel = self.guild.get_channel(channel_id) if channel is None: BaseCog.guild_trace(self.guild, 'Configured warning channel does not exist!') return self.__message = await channel.send(content=content) self.__posted_text = content emoji_to_remove = self.__posted_emoji.copy() for reaction in self.__reactions: if reaction.is_enabled: if reaction.emoji not in self.__posted_emoji: await self.__message.add_reaction(reaction.emoji) self.__posted_emoji.add(reaction.emoji) if reaction.emoji in emoji_to_remove: emoji_to_remove.remove(reaction.emoji) for emoji in emoji_to_remove: await self.__message.clear_reaction(emoji) if emoji in self.__posted_emoji: self.__posted_emoji.remove(emoji) def __formatted_message(self) -> str: s: str = '' if self.type == self.TYPE_INFO: s += CONFIG['info_emoji'] + ' ' elif self.type == self.TYPE_MOD_WARNING: mention: str = Storage.get_config_value(self.guild, ConfigKey.WARNING_MENTION) if mention: s += mention + ' ' s += CONFIG['warning_emoji'] + ' ' elif self.type == self.TYPE_SUCCESS: s += CONFIG['success_emoji'] + ' ' elif self.type == self.TYPE_FAILURE: s += CONFIG['failure_emoji'] + ' ' s += self.text if self.quote: s += f'\n\n> {self.quote}' if len(self.__reactions) > 0: s += '\n\nAvailable actions:' for reaction in self.__reactions: if reaction.is_enabled: s += f'\n {reaction.emoji} {reaction.description}' else: s += f'\n {reaction.description}' return s class BaseCog(commands.Cog): def __init__(self, bot): self.bot = bot # Config @classmethod def get_cog_default(cls, key: str): """ Convenience method for getting a cog configuration default from `CONFIG['cogs'][][]`. """ cogs: dict = CONFIG['cog_defaults'] cog = cogs.get(cls.__name__) if cog is None: return None return cog.get(key) # Bot message handling @classmethod def __bot_messages(cls, guild: Guild) -> AgeBoundDict: bm = Storage.get_state_value(guild, 'bot_messages') if bm is None: far_future = datetime.utcnow() + timedelta(days=1000) bm = AgeBoundDict(timedelta(seconds=600), lambda k, v : v.message_sent_at() or far_future) Storage.set_state_value(guild, 'bot_messages', bm) return bm @classmethod async def post_message(cls, message: BotMessage) -> bool: await message._update() guild_messages = cls.__bot_messages(message.guild) if message.is_sent(): guild_messages[message.message_id()] = message return True return False @commands.Cog.listener() async def on_raw_reaction_add(self, payload: RawReactionActionEvent): 'Event handler' if payload.user_id == self.bot.user.id: # Ignore bot's own reactions return member: Member = payload.member if member is None: return guild: Guild = self.bot.get_guild(payload.guild_id) if guild is None: # Possibly a DM return channel: GuildChannel = guild.get_channel(payload.channel_id) if channel is None: # Possibly a DM return message: Message = await channel.fetch_message(payload.message_id) if message is None: # Message deleted? return if message.author.id != self.bot.user.id: # Bot didn't author this return guild_messages = self.__bot_messages(guild) bot_message = guild_messages.get(message.id) if bot_message is None: # Unknown message (expired or was never tracked) return reaction = bot_message.reaction_for_emoji(payload.emoji) if reaction is None or not reaction.is_enabled: # Can't use this reaction with this message return if not member.permissions_in(channel).ban_members: # Not a mod (could make permissions configurable per BotMessageReaction some day) return await self.on_mod_react(bot_message, reaction, member) async def on_mod_react(self, bot_message: BotMessage, reaction: BotMessageReaction, reacted_by: Member) -> None: """ Subclass override point for receiving mod reactions to bot messages sent via `post_message()`. """ pass @classmethod async def validate_param(cls, context: commands.Context, param_name: str, value, allowed_types: tuple = None, min_value = None, max_value = None) -> bool: """ Convenience method for validating a command parameter is of the expected type and in the expected range. Bad values will cause a reply to be sent to the original message and a False will be returned. If all checks succeed, True will be returned. """ # TODO: Rework this to use BotMessage if allowed_types is not None and not isinstance(value, allowed_types): if len(allowed_types) == 1: await context.message.reply(f'⚠️ `{param_name}` must be of type ' + f'{allowed_types[0]}.', mention_author=False) else: await context.message.reply(f'⚠️ `{param_name}` must be of types ' + f'{allowed_types}.', mention_author=False) return False if min_value is not None and value < min_value: await context.message.reply(f'⚠️ `{param_name}` must be >= {min_value}.', mention_author=False) return False if max_value is not None and value > max_value: await context.message.reply(f'⚠️ `{param_name}` must be <= {max_value}.', mention_author=False) return True @classmethod async def warn(cls, guild: Guild, message: str) -> Message: """ Sends a warning message to the configured warning channel for the given guild. If no warning channel is configured no action is taken. Returns the Message if successful or None if not. """ channel_id = Storage.get_config_value(guild, ConfigKey.WARNING_CHANNEL_ID) if channel_id is None: cls.guild_trace(guild, 'No warning channel set! No warning issued.') return None channel: TextChannel = guild.get_channel(channel_id) if channel is None: cls.guild_trace(guild, 'Configured warning channel does not exist!') return None mention: str = Storage.get_config_value(guild, ConfigKey.WARNING_MENTION) text: str = message if mention is not None: text = f'{mention} {text}' msg: Message = await channel.send(text) return msg @classmethod async def update_warn(cls, warn_message: Message, new_text: str) -> None: """ Updates the text of a previously posted `warn`. Includes configured mentions if necessary. """ text: str = new_text mention: str = Storage.get_config_value( warn_message.guild, ConfigKey.WARNING_MENTION) if mention is not None: text = f'{mention} {text}' await warn_message.edit(content=text) @classmethod def guild_trace(cls, guild: Guild, message: str) -> None: print(f'[guild {guild.id}|{guild.name}] {message}')