from discord import Guild, Member, Message, PartialEmoji, RawReactionActionEvent, TextChannel from discord.ext import commands from datetime import datetime, timedelta from abc import ABC, abstractmethod from config import CONFIG from rscollections import AgeBoundDict from storage import ConfigKey, Storage import json class BotMessageReaction: """ A possible reaction to a bot message that will trigger an action. The list of available reactions will be listed at the end of a BotMessage. When a mod reacts to the message with the emote, something can happen. If the reaction is disabled, reactions will not register. The description will still show up in the message, but no emoji is shown. This can be used to explain why an action is no longer available. """ def __init__(self, emoji: str, is_enabled: bool, description: str): self.emoji = emoji self.is_enabled = is_enabled self.description = description def __eq__(self, other): return other is not None and \ other.emoji == self.emoji and \ other.is_enabled == self.is_enabled and \ other.description == self.description @classmethod def standard_set(cls, did_delete: bool = None, message_count: int = 1, did_kick: bool = None, did_ban: bool = None, user_count: int = 1) -> list: """ Convenience factory for generating any of the three most common commands: delete message(s), kick user(s), and ban user(s). All arguments are optional. Resulting list can be passed directly to `BotMessage.set_reactions()`. Params - did_delete Whether the message(s) have been deleted. Pass True or False if this applies, omit to leave out delete action. - message_count How many messages there are. Used for pluralizing description. Defaults to 1. Omit if n/a. - did_kick Whether the user(s) have been kicked. Pass True or False if this applies, omit to leave out kick action. - did_ban Whether the user(s) have been banned. Pass True or False if this applies, omit to leave out ban action. - user_count How many users there are. Used for pluralizing description. Defaults to 1. Omit if n/a. """ reactions = [] if did_delete is not None: if did_delete: reactions.append(BotMessageReaction( CONFIG['trash_emoji'], False, 'Message deleted' if message_count == 1 else 'Messages deleted')) else: reactions.append(BotMessageReaction( CONFIG['trash_emoji'], True, 'Delete message' if message_count == 1 else 'Delete messages')) if did_kick is not None: if did_ban is not None and did_ban: # Don't show kick option at all if we also banned pass elif did_kick: reactions.append(BotMessageReaction( CONFIG['kick_emoji'], False, 'User kicked' if user_count == 1 else 'Users kicked')) else: reactions.append(BotMessageReaction( CONFIG['kick_emoji'], True, 'Kick user' if user_count == 1 else 'Kick users')) if did_ban is not None: if did_ban: reactions.append(BotMessageReaction( CONFIG['ban_emoji'], False, 'User banned' if user_count == 1 else 'Users banned')) else: reactions.append(BotMessageReaction( CONFIG['ban_emoji'], True, 'Ban user' if user_count == 1 else 'Ban users')) return reactions class BotMessage: """ Holds state for a bot-generated message. A message is composed, sent via `BaseCog.post_message()`, and can later be updated. A message consists of a type (e.g. info, warning), text, optional quoted text (such as the content of a flagged message), and an optional list of actions that can be taken via a mod reacting to the message. """ TYPE_DEFAULT = 0 TYPE_INFO = 1 TYPE_MOD_WARNING = 2 TYPE_SUCCESS = 3 TYPE_FAILURE = 4 def __init__(self, guild: Guild, text: str, type: int = 0, # TYPE_DEFAULT context = None, reply_to: Message = None): self.guild = guild self.text = text self.type = type self.context = context self.quote = None self.source_cog = None # Set by `BaseCog.post_message()` self.__posted_text = None # last text posted, to test for changes self.__posted_emoji = set() self.__message = None # Message self.__reply_to = reply_to self.__reactions = [] # BotMessageReaction[] def is_sent(self) -> bool: """ Returns whether this message has been sent to the guild. This may continue returning False even after calling BaseCog.post_message if the guild has no configured warning channel. """ return self.__message is not None def message_id(self): return self.__message.id if self.__message else None def message_sent_at(self): return self.__message.created_at if self.__message else None async def set_text(self, new_text: str) -> None: """ Replaces the text of this message. If the message has been sent, it will be updated. """ self.text = new_text await self.__update_if_sent() async def set_reactions(self, reactions: list) -> None: """ Replaces all BotMessageReactions with a new list. If the message has been sent, it will be updated. """ if reactions == self.__reactions: # No change return self.__reactions = reactions.copy() if reactions is not None else [] await self.__update_if_sent() async def add_reaction(self, reaction: BotMessageReaction) -> None: """ Adds one BotMessageReaction to this message. If a reaction already exists for the given emoji it is replaced with the new one. If the message has been sent, it will be updated. """ # Alias for update. Makes for clearer intent. await self.update_reaction(reaction) async def update_reaction(self, reaction: BotMessageReaction) -> None: """ Updates or adds a BotMessageReaction. If the message has been sent, it will be updated. """ found = False for i in range(len(self.__reactions)): existing = self.__reactions[i] if existing.emoji == reaction.emoji: if reaction == self.__reactions[i]: # No change return self.__reactions[i] = reaction found = True break if not found: self.__reactions.append(reaction) await self.__update_if_sent() async def remove_reaction(self, reaction_or_emoji) -> None: """ Removes a reaction. Can pass either a BotMessageReaction or just the emoji string. If the message has been sent, it will be updated. """ for i in range(len(self.__reactions)): existing = self.__reactions[i] if (isinstance(reaction_or_emoji, str) and existing.emoji == reaction_or_emoji) or \ (isinstance(reaction_or_emoji, BotMessageReaction) and existing.emoji == reaction_or_emoji.emoji): self.__reactions.pop(i) await self.__update_if_sent() return def reaction_for_emoji(self, emoji) -> BotMessageReaction: for reaction in self.__reactions: if isinstance(emoji, PartialEmoji) and reaction.emoji == emoji.name: return reaction elif isinstance(emoji, str) and reaction.emoji == emoji: return reaction return None async def __update_if_sent(self) -> None: if self.__message: await self._update() async def _update(self) -> None: content: str = self.__formatted_message() if self.__message: if content != self.__posted_text: await self.__message.edit(content=content) self.__posted_text = content else: if self.__reply_to: self.__message = await self.__reply_to.reply(content=content, mention_author=False) self.__posted_text = content else: channel_id = Storage.get_config_value(self.guild, ConfigKey.WARNING_CHANNEL_ID) if channel_id is None: BaseCog.log(self.guild, '\u0007No warning channel set! No warning issued.') return channel: TextChannel = self.guild.get_channel(channel_id) if channel is None: BaseCog.log(self.guild, '\u0007Configured warning channel does not exist!') return self.__message = await channel.send(content=content) self.__posted_text = content emoji_to_remove = self.__posted_emoji.copy() for reaction in self.__reactions: if reaction.is_enabled: if reaction.emoji not in self.__posted_emoji: await self.__message.add_reaction(reaction.emoji) self.__posted_emoji.add(reaction.emoji) if reaction.emoji in emoji_to_remove: emoji_to_remove.remove(reaction.emoji) for emoji in emoji_to_remove: await self.__message.clear_reaction(emoji) if emoji in self.__posted_emoji: self.__posted_emoji.remove(emoji) def __formatted_message(self) -> str: s: str = '' if self.type == self.TYPE_INFO: s += CONFIG['info_emoji'] + ' ' elif self.type == self.TYPE_MOD_WARNING: mention: str = Storage.get_config_value(self.guild, ConfigKey.WARNING_MENTION) if mention: s += mention + ' ' s += CONFIG['warning_emoji'] + ' ' elif self.type == self.TYPE_SUCCESS: s += CONFIG['success_emoji'] + ' ' elif self.type == self.TYPE_FAILURE: s += CONFIG['failure_emoji'] + ' ' s += self.text if self.quote: s += f'\n\n> {self.quote}' if len(self.__reactions) > 0: s += '\n\nAvailable actions:' for reaction in self.__reactions: if reaction.is_enabled: s += f'\n {reaction.emoji} {reaction.description}' else: s += f'\n {reaction.description}' return s class CogSetting: def __init__(self, name: str, brief: str = None, description: str = None, usage: str = None, min_value = None, max_value = None, enum_values: set = None): self.name = name self.brief = brief self.description = description or '' # XXX: Can't be None self.usage = usage self.min_value = min_value self.max_value = max_value self.enum_values = enum_values if self.enum_values or self.min_value is not None or self.max_value is not None: self.description += '\n' if self.enum_values: allowed_values = '`' + ('`, `'.join(enum_values)) + '`' self.description += f'\nAllowed values: {allowed_values}' if self.min_value is not None: self.description += f'\nMin value: {self.min_value}' if self.max_value is not None: self.description += f'\nMax value: {self.max_value}' if self.usage is None: self.usage = f'<{self.name}>' class BaseCog(commands.Cog): def __init__(self, bot): self.bot = bot self.are_settings_setup = False self.settings = [] # Config @classmethod def get_cog_default(cls, key: str): """ Convenience method for getting a cog configuration default from `CONFIG['cogs'][][]`. """ cogs: dict = CONFIG['cog_defaults'] cog = cogs.get(cls.__name__) if cog is None: return None return cog.get(key) def add_setting(self, setting: CogSetting) -> None: """ Called by a subclass in __init__ to register a mod-configurable guild setting. A "get" and "set" command will be generated. If the cog has a command group it will be detected automatically and the commands added to that. Otherwise the commands will be added at the top level. """ self.settings.append(setting) @classmethod def get_guild_setting(cls, guild: Guild, setting: CogSetting, use_cog_default_if_not_set: bool = True): """ Returns the configured value for a setting for the given guild. If no setting is configured the default for the cog will be returned, unless the optional `use_cog_default_if_not_set` is `False`, then `None` will be returned. """ key = f'{cls.__name__}.{setting.name}' value = Storage.get_config_value(guild, key) if value is None and use_cog_default_if_not_set: value = cls.get_cog_default(setting.name) return value @classmethod def set_guild_setting(cls, guild: Guild, setting: CogSetting, new_value) -> None: """ Manually sets a setting for the given guild. BaseCog creates "get" and "set" commands for guild administrators to configure values themselves, but this method can be used for hidden settings from code. """ key = f'{cls.__name__}.{setting.name}' Storage.set_config_value(guild, key, new_value) @commands.Cog.listener() async def on_ready(self): self.__set_up_setting_commands() def __set_up_setting_commands(self): """ Sets up getter and setter commands for all registered cog settings. Only runs once. """ if self.are_settings_setup: return self.are_settings_setup = True # See if the cog has a command group. Currently only supporting one max. group: commands.core.Group = None for member_name in dir(self): member = getattr(self, member_name) if isinstance(member, commands.core.Group): group = member break for setting in self.settings: self.__make_getter_setter_commands(setting, group) def __make_getter_setter_commands(self, setting: CogSetting, group: commands.core.Group) -> None: """ Creates a "get..." and "set..." command for the given setting and either registers them as subcommands under the given command group or under the bot if `None`. """ # Manually constructing equivalent of: # @commands.command( # brief='Posts a test warning in the configured warning channel.' # ) # @commands.has_permissions(ban_members=True) # @commands.guild_only() # async def getvar(self, context): async def getter(self, context): await self.__get_setting_command(context, setting) async def setter(self, context, new_value): await self.__set_setting_command(context, new_value, setting) get_command = commands.Command( getter, name=f'get{setting.name}', brief=f'Shows {setting.brief}', description=setting.description, checks=[ commands.has_permissions(ban_members=True), commands.guild_only(), ]) set_command = commands.Command( setter, name=f'set{setting.name}', brief=f'Sets {setting.brief}', description=setting.description, usage=setting.usage, checks=[ commands.has_permissions(ban_members=True), commands.guild_only(), ]) # XXX: Passing `cog` in init gets ignored and set to `None` so set after. # This ensures the callback is passed `self`. get_command.cog = self set_command.cog = self if group: group.add_command(get_command) group.add_command(set_command) else: self.bot.add_command(get_command) self.bot.add_command(set_command) async def __set_setting_command(self, context, new_value, setting) -> None: setting_name = setting.name if context.command.parent: setting_name = f'{context.command.parent.name}.{setting_name}' if setting.min_value is not None and new_value < setting.min_value: await context.message.reply( f'{CONFIG["failure_emoji"]} `{setting_name}` must be >= {setting.min_value}', mention_author=False) return if setting.max_value is not None and new_value > setting.max_value: await context.message.reply( f'{CONFIG["failure_emoji"]} `{setting_name}` must be <= {setting.max_value}', mention_author=False) return if setting.enum_values is not None and new_value not in setting.enum_values: allowed_values = '`' + ('`, `'.join(setting.enum_values)) + '`' await context.message.reply( f'{CONFIG["failure_emoji"]} `{setting_name}` must be one of {allowed_values}', mention_author=False) return key = f'{self.__class__.__name__}.{setting.name}' Storage.set_config_value(context.guild, key, new_value) await context.message.reply( f'{CONFIG["success_emoji"]} `{setting_name}` is now set to `{new_value}`', mention_author=False) async def __get_setting_command(self, context, setting) -> None: setting_name = setting.name if context.command.parent: setting_name = f'{context.command.parent.name}.{setting_name}' key = f'{self.__class__.__name__}.{setting.name}' value = Storage.get_config_value(context.guild, key) if value is None: value = self.get_cog_default(setting.name) await context.message.reply( f'{CONFIG["info_emoji"]} `{setting_name}` is using default of `{value}`', mention_author=False) else: await context.message.reply( f'{CONFIG["info_emoji"]} `{setting_name}` is set to `{value}`', mention_author=False) # Bot message handling @classmethod def __bot_messages(cls, guild: Guild) -> AgeBoundDict: bm = Storage.get_state_value(guild, 'bot_messages') if bm is None: far_future = datetime.utcnow() + timedelta(days=1000) bm = AgeBoundDict(timedelta(seconds=600), lambda k, v : v.message_sent_at() or far_future) Storage.set_state_value(guild, 'bot_messages', bm) return bm async def post_message(self, message: BotMessage) -> bool: message.source_cog = self await message._update() guild_messages = self.__bot_messages(message.guild) if message.is_sent(): guild_messages[message.message_id()] = message return True return False @commands.Cog.listener() async def on_raw_reaction_add(self, payload: RawReactionActionEvent): 'Event handler' if payload.user_id == self.bot.user.id: # Ignore bot's own reactions return member: Member = payload.member if member is None: return guild: Guild = self.bot.get_guild(payload.guild_id) if guild is None: # Possibly a DM return channel: GuildChannel = guild.get_channel(payload.channel_id) if channel is None: # Possibly a DM return message: Message = await channel.fetch_message(payload.message_id) if message is None: # Message deleted? return if message.author.id != self.bot.user.id: # Bot didn't author this return guild_messages = self.__bot_messages(guild) bot_message = guild_messages.get(message.id) if bot_message is None: # Unknown message (expired or was never tracked) return if self is not bot_message.source_cog: # Belongs to a different cog return reaction = bot_message.reaction_for_emoji(payload.emoji) if reaction is None or not reaction.is_enabled: # Can't use this reaction with this message return if not member.permissions_in(channel).ban_members: # Not a mod (could make permissions configurable per BotMessageReaction some day) return await self.on_mod_react(bot_message, reaction, member) async def on_mod_react(self, bot_message: BotMessage, reaction: BotMessageReaction, reacted_by: Member) -> None: """ Subclass override point for receiving mod reactions to bot messages sent via `post_message()`. """ pass # Helpers @classmethod async def validate_param(cls, context: commands.Context, param_name: str, value, allowed_types: tuple = None, min_value = None, max_value = None) -> bool: """ Convenience method for validating a command parameter is of the expected type and in the expected range. Bad values will cause a reply to be sent to the original message and a False will be returned. If all checks succeed, True will be returned. """ # TODO: Rework this to use BotMessage if allowed_types is not None and not isinstance(value, allowed_types): if len(allowed_types) == 1: await context.message.reply(f'⚠️ `{param_name}` must be of type ' + f'{allowed_types[0]}.', mention_author=False) else: await context.message.reply(f'⚠️ `{param_name}` must be of types ' + f'{allowed_types}.', mention_author=False) return False if min_value is not None and value < min_value: await context.message.reply(f'⚠️ `{param_name}` must be >= {min_value}.', mention_author=False) return False if max_value is not None and value > max_value: await context.message.reply(f'⚠️ `{param_name}` must be <= {max_value}.', mention_author=False) return True @classmethod async def warn(cls, guild: Guild, message: str) -> Message: """ DEPRECATED. Use post_message. Sends a warning message to the configured warning channel for the given guild. If no warning channel is configured no action is taken. Returns the Message if successful or None if not. """ channel_id = Storage.get_config_value(guild, ConfigKey.WARNING_CHANNEL_ID) if channel_id is None: cls.log(guild, '\u0007No warning channel set! No warning issued.') return None channel: TextChannel = guild.get_channel(channel_id) if channel is None: cls.log(guild, '\u0007Configured warning channel does not exist!') return None mention: str = Storage.get_config_value(guild, ConfigKey.WARNING_MENTION) text: str = message if mention is not None: text = f'{mention} {text}' msg: Message = await channel.send(text) return msg @classmethod async def update_warn(cls, warn_message: Message, new_text: str) -> None: """ DEPRECATED. Use post_message. Updates the text of a previously posted `warn`. Includes configured mentions if necessary. """ text: str = new_text mention: str = Storage.get_config_value( warn_message.guild, ConfigKey.WARNING_MENTION) if mention is not None: text = f'{mention} {text}' await warn_message.edit(content=text) @classmethod def log(cls, guild: Guild, message) -> None: now = datetime.now() print(f'[{now.strftime("%Y-%m-%dT%H:%M:%S")}|{cls.__name__}|{guild.name}] {message}')