"""
Base cog class and helper classes.
"""
from datetime import datetime, timedelta
from typing import Optional

from discord import Guild, Member, Message, PartialEmoji, RawReactionActionEvent, TextChannel
from discord.abc import GuildChannel
from discord.ext import commands

from config import CONFIG
from rocketbot.collections import AgeBoundDict
from rocketbot.storage import ConfigKey, Storage


def _format_allowed_values(enum_values) -> str:
    """
    Renders a collection of allowed values as a comma-separated list with
    each value wrapped in backticks. Values are converted with `str()` so
    non-string enums (e.g. ints) do not break `str.join`.
    """
    return '`' + '`, `'.join(str(value) for value in enum_values) + '`'


def _mod_command_checks() -> list:
    """
    Returns the check predicates applied to every generated settings command.

    `commands.has_permissions()` and `commands.guild_only()` return
    decorators, not predicates. `commands.Command(checks=...)` expects the
    predicates themselves, which the decorators expose as `.predicate`.
    Passing the decorators directly makes every check pass, because calling
    a decorator with a context simply returns the (truthy) context.
    """
    return [
        commands.has_permissions(ban_members=True).predicate,
        commands.guild_only().predicate,
    ]


class BotMessageReaction:
    """
    A possible reaction to a bot message that will trigger an action. The list
    of available reactions will be listed at the end of a BotMessage. When a
    mod reacts to the message with the emote, something can happen.

    If the reaction is disabled, reactions will not register. The description
    will still show up in the message, but no emoji is shown. This can be used
    to explain why an action is no longer available.
    """
    def __init__(self, emoji: str, is_enabled: bool, description: str):
        self.emoji = emoji
        self.is_enabled = is_enabled
        self.description = description

    def __eq__(self, other):
        # Defer to Python's default handling for foreign types (including
        # None) instead of raising AttributeError on a missing attribute.
        if not isinstance(other, BotMessageReaction):
            return NotImplemented
        return (self.emoji, self.is_enabled, self.description) == \
            (other.emoji, other.is_enabled, other.description)

    @classmethod
    def standard_set(cls,
                     did_delete: Optional[bool] = None,
                     message_count: int = 1,
                     did_kick: Optional[bool] = None,
                     did_ban: Optional[bool] = None,
                     user_count: int = 1) -> list:
        """
        Convenience factory for generating any of the three most common
        commands: delete message(s), kick user(s), and ban user(s). All
        arguments are optional. Resulting list can be passed directly to
        `BotMessage.set_reactions()`.

        Params
        - did_delete     Whether the message(s) have been deleted. Pass True or
                         False if this applies, omit to leave out delete action.
        - message_count  How many messages there are. Used for pluralizing
                         description. Defaults to 1. Omit if n/a.
        - did_kick       Whether the user(s) have been kicked. Pass True or
                         False if this applies, omit to leave out kick action.
        - did_ban        Whether the user(s) have been banned. Pass True or
                         False if this applies, omit to leave out ban action.
        - user_count     How many users there are. Used for pluralizing
                         description. Defaults to 1. Omit if n/a.
        """
        one_message = message_count == 1
        one_user = user_count == 1
        reactions = []

        if did_delete is not None:
            if did_delete:
                reactions.append(BotMessageReaction(
                    CONFIG['trash_emoji'], False,
                    'Message deleted' if one_message else 'Messages deleted'))
            else:
                reactions.append(BotMessageReaction(
                    CONFIG['trash_emoji'], True,
                    'Delete message' if one_message else 'Delete messages'))

        # Don't show kick option at all if we also banned
        if did_kick is not None and not did_ban:
            if did_kick:
                reactions.append(BotMessageReaction(
                    CONFIG['kick_emoji'], False,
                    'User kicked' if one_user else 'Users kicked'))
            else:
                reactions.append(BotMessageReaction(
                    CONFIG['kick_emoji'], True,
                    'Kick user' if one_user else 'Kick users'))

        if did_ban is not None:
            if did_ban:
                reactions.append(BotMessageReaction(
                    CONFIG['ban_emoji'], False,
                    'User banned' if one_user else 'Users banned'))
            else:
                reactions.append(BotMessageReaction(
                    CONFIG['ban_emoji'], True,
                    'Ban user' if one_user else 'Ban users'))

        return reactions


class BotMessage:
    """
    Holds state for a bot-generated message. A message is composed, sent via
    `BaseCog.post_message()`, and can later be updated.

    A message consists of a type (e.g. info, warning), text, optional quoted
    text (such as the content of a flagged message), and an optional list of
    actions that can be taken via a mod reacting to the message.
    """
    TYPE_DEFAULT = 0
    TYPE_INFO = 1
    TYPE_MOD_WARNING = 2
    TYPE_SUCCESS = 3
    TYPE_FAILURE = 4

    def __init__(self,
                 guild: Guild,
                 text: str,
                 type: int = TYPE_DEFAULT,
                 context = None,
                 reply_to: Message = None):
        self.guild = guild
        self.text = text
        self.type = type
        self.context = context
        self.quote = None
        self.source_cog = None  # Set by `BaseCog.post_message()`
        self.__posted_text = None  # last text posted, to test for changes
        self.__posted_emoji = set()  # emoji the bot has added to the message
        self.__message = None  # Message
        self.__reply_to = reply_to
        self.__reactions = []  # BotMessageReaction[]

    def is_sent(self) -> bool:
        """
        Returns whether this message has been sent to the guild. This may
        continue returning False even after calling BaseCog.post_message if
        the guild has no configured warning channel.
        """
        return self.__message is not None

    def message_id(self):
        """Returns the Message id or None if not sent."""
        return self.__message.id if self.__message else None

    def message_sent_at(self) -> Optional[datetime]:
        """Returns when the message was sent or None if not sent."""
        return self.__message.created_at if self.__message else None

    def has_reactions(self) -> bool:
        """Whether this message has any reactions defined."""
        return bool(self.__reactions)

    async def set_text(self, new_text: str) -> None:
        """
        Replaces the text of this message. If the message has been sent, it
        will be updated.
        """
        self.text = new_text
        await self.update_if_sent()

    async def set_reactions(self, reactions: list) -> None:
        """
        Replaces all BotMessageReactions with a new list. If the message has
        been sent, it will be updated. Passing None clears all reactions.
        """
        # Normalize before comparing so None and [] are treated the same and
        # clearing an already-empty list does not trigger a needless update.
        new_reactions = list(reactions) if reactions is not None else []
        if new_reactions == self.__reactions:
            # No change
            return
        self.__reactions = new_reactions
        await self.update_if_sent()

    async def add_reaction(self, reaction: BotMessageReaction) -> None:
        """
        Adds one BotMessageReaction to this message. If a reaction already
        exists for the given emoji it is replaced with the new one. If the
        message has been sent, it will be updated.
        """
        # Alias for update. Makes for clearer intent.
        await self.update_reaction(reaction)

    async def update_reaction(self, reaction: BotMessageReaction) -> None:
        """
        Updates or adds a BotMessageReaction. If the message has been sent, it
        will be updated.
        """
        for i, existing in enumerate(self.__reactions):
            if existing.emoji == reaction.emoji:
                if reaction == existing:
                    # No change
                    return
                self.__reactions[i] = reaction
                break
        else:
            # No reaction with this emoji yet
            self.__reactions.append(reaction)
        await self.update_if_sent()

    async def remove_reaction(self, reaction_or_emoji) -> None:
        """
        Removes a reaction. Can pass either a BotMessageReaction or just the
        emoji string. If the message has been sent, it will be updated.
        Arguments of any other type are ignored.
        """
        if isinstance(reaction_or_emoji, str):
            emoji = reaction_or_emoji
        elif isinstance(reaction_or_emoji, BotMessageReaction):
            emoji = reaction_or_emoji.emoji
        else:
            return
        for i, existing in enumerate(self.__reactions):
            if existing.emoji == emoji:
                self.__reactions.pop(i)
                await self.update_if_sent()
                return

    def reaction_for_emoji(self, emoji) -> Optional[BotMessageReaction]:
        """
        Finds the BotMessageReaction for the given emoji or None if not found.
        Accepts either a PartialEmoji or str.
        """
        if isinstance(emoji, PartialEmoji):
            emoji_name = emoji.name
        elif isinstance(emoji, str):
            emoji_name = emoji
        else:
            return None
        for reaction in self.__reactions:
            if reaction.emoji == emoji_name:
                return reaction
        return None

    async def update_if_sent(self) -> None:
        """
        Updates the text and/or reactions on a message if it was sent to
        the guild, otherwise does nothing. Does not need to be called by
        BaseCog subclasses.
        """
        if self.__message:
            await self.update()

    async def update(self) -> None:
        """
        Sends or updates an already sent message based on BotMessage state.
        Does not need to be called by BaseCog subclasses.

        An unsent message is posted as a reply if `reply_to` was given,
        otherwise to the guild's configured warning channel. If that channel
        is not configured or cannot be found, a console message is logged and
        nothing is sent.
        """
        content: str = self.__formatted_message()
        if self.__message:
            # Only edit when the rendered text actually changed
            if content != self.__posted_text:
                await self.__message.edit(content=content)
                self.__posted_text = content
        elif self.__reply_to:
            self.__message = await self.__reply_to.reply(content=content, mention_author=False)
            self.__posted_text = content
        else:
            channel_id = Storage.get_config_value(self.guild, ConfigKey.WARNING_CHANNEL_ID)
            if channel_id is None:
                BaseCog.log(self.guild, '\u0007No warning channel set! No warning issued.')
                return
            channel: TextChannel = self.guild.get_channel(channel_id)
            if channel is None:
                BaseCog.log(self.guild, '\u0007Configured warning channel does not exist!')
                return
            self.__message = await channel.send(content=content)
            self.__posted_text = content

        # Bring the emoji on the posted message in line with the enabled
        # reactions: add missing ones, then clear whatever is left over.
        stale_emoji = self.__posted_emoji.copy()
        for reaction in self.__reactions:
            if not reaction.is_enabled:
                continue
            if reaction.emoji not in self.__posted_emoji:
                await self.__message.add_reaction(reaction.emoji)
                self.__posted_emoji.add(reaction.emoji)
            stale_emoji.discard(reaction.emoji)
        for emoji in stale_emoji:
            await self.__message.clear_reaction(emoji)
            self.__posted_emoji.discard(emoji)

    def __formatted_message(self) -> str:
        """
        Renders the full message content: type prefix, text, optional quote,
        and the list of available actions.
        """
        parts = []
        if self.type == self.TYPE_INFO:
            parts.append(CONFIG['info_emoji'] + ' ')
        elif self.type == self.TYPE_MOD_WARNING:
            mention: str = Storage.get_config_value(self.guild, ConfigKey.WARNING_MENTION)
            if mention:
                parts.append(mention + ' ')
            parts.append(CONFIG['warning_emoji'] + ' ')
        elif self.type == self.TYPE_SUCCESS:
            parts.append(CONFIG['success_emoji'] + ' ')
        elif self.type == self.TYPE_FAILURE:
            parts.append(CONFIG['failure_emoji'] + ' ')

        parts.append(self.text)

        if self.quote:
            parts.append(f'\n\n> {self.quote}')

        if self.__reactions:
            parts.append('\n\nAvailable actions:')
            for reaction in self.__reactions:
                if reaction.is_enabled:
                    parts.append(f'\n {reaction.emoji} {reaction.description}')
                else:
                    # Disabled reactions are described but show no emoji
                    parts.append(f'\n {reaction.description}')

        return ''.join(parts)


class CogSetting:
    """
    Describes a configuration setting for a guild that can be edited by the
    mods of those guilds. BaseCog can generate "get" and "set" commands
    automatically, reducing the boilerplate of generating commands manually.
    Offers simple validation rules.
    """
    def __init__(self,
                 name: str,
                 datatype,
                 brief: str = None,
                 description: str = None,
                 usage: str = None,
                 min_value = None,
                 max_value = None,
                 enum_values: set = None):
        """
        Params:
        - name         Setting identifier. Must follow variable naming
                       conventions.
        - datatype     Datatype of the setting. E.g. int, float, str
        - brief        Description of the setting, starting with lower case.
                       Will be inserted into phrases like "Sets <brief>" and
                       "Shows <brief>"
        - description  Longer help text. Validation constraints (allowed
                       values, min, max) are appended automatically.
        - usage        Usage hint for the "set" command. Defaults to "<name>".
        - min_value    Smallest allowable value. Must be of the same datatype
                       as the value. None for no minimum.
        - max_value    Largest allowable value. None for no maximum.
        - enum_values  Set of allowed values. None if unconstrained.
        """
        self.name = name
        self.datatype = datatype
        self.brief = brief
        self.description = description or ''  # Can't be None
        self.usage = usage
        self.min_value = min_value
        self.max_value = max_value
        self.enum_values = enum_values

        # Append a summary of the validation rules to the description
        constraints = []
        if self.enum_values:
            constraints.append(f'\nAllowed values: {_format_allowed_values(self.enum_values)}')
        if self.min_value is not None:
            constraints.append(f'\nMin value: {self.min_value}')
        if self.max_value is not None:
            constraints.append(f'\nMax value: {self.max_value}')
        if constraints:
            self.description += '\n' + ''.join(constraints)

        if self.usage is None:
            self.usage = f'<{self.name}>'


class BaseCog(commands.Cog):
    """
    Superclass for all Rocketbot cogs. Provides lots of conveniences for
    common tasks.
    """
    def __init__(self, bot):
        self.bot = bot
        self.are_settings_setup = False
        self.settings = []

    # Config

    @classmethod
    def get_cog_default(cls, key: str):
        """
        Convenience method for getting a cog configuration default from
        `CONFIG['cog_defaults'][<cog class name>][<key>]`. These values are
        used for CogSettings when no guild-specific value is configured yet.
        Returns None if the cog has no defaults or the key is absent.
        """
        cog_defaults: dict = CONFIG['cog_defaults']
        defaults = cog_defaults.get(cls.__name__)
        return None if defaults is None else defaults.get(key)

    def add_setting(self, setting: CogSetting) -> None:
        """
        Called by a subclass in __init__ to register a mod-configurable
        guild setting. A "get" and "set" command will be generated. If the
        setting is named "enabled" or "is_enabled" then "enable" and "disable"
        commands will be created instead which set the setting to True/False.

        If the cog has a command group it will be detected automatically and
        the commands added to that. Otherwise the commands will be added at
        the top level.

        Changes to settings can be detected by overriding `on_setting_updated`.
        """
        self.settings.append(setting)

    @classmethod
    def get_guild_setting(cls,
                          guild: Guild,
                          setting: CogSetting,
                          use_cog_default_if_not_set: bool = True):
        """
        Returns the configured value for a setting for the given guild. If no
        setting is configured the default for the cog will be returned,
        unless the optional `use_cog_default_if_not_set` is `False`, then
        `None` will be returned.
        """
        key = f'{cls.__name__}.{setting.name}'
        value = Storage.get_config_value(guild, key)
        if value is None and use_cog_default_if_not_set:
            value = cls.get_cog_default(setting.name)
        return value

    @classmethod
    def set_guild_setting(cls,
                          guild: Guild,
                          setting: CogSetting,
                          new_value) -> None:
        """
        Manually sets a setting for the given guild. BaseCog creates "get" and
        "set" commands for guild administrators to configure values themselves,
        but this method can be used for hidden settings from code. A ValueError
        will be raised if the new value does not pass validation specified in
        the CogSetting.
        """
        if setting.min_value is not None and new_value < setting.min_value:
            raise ValueError(f'{setting.name} must be at least {setting.min_value}')
        if setting.max_value is not None and new_value > setting.max_value:
            raise ValueError(f'{setting.name} must be no more than {setting.max_value}')
        if setting.enum_values and new_value not in setting.enum_values:
            raise ValueError(f'{setting.name} must be one of {setting.enum_values}')
        key = f'{cls.__name__}.{setting.name}'
        Storage.set_config_value(guild, key, new_value)

    @commands.Cog.listener()
    async def on_ready(self):
        'Event listener'
        self.__set_up_setting_commands()

    def __set_up_setting_commands(self):
        """
        Sets up commands for editing all registered cog settings. This method
        only runs once.
        """
        if self.are_settings_setup:
            return
        self.are_settings_setup = True

        # See if the cog has a command group. Currently only supporting one max.
        group: commands.core.Group = None
        for member_name in dir(self):
            member = getattr(self, member_name)
            if isinstance(member, commands.core.Group):
                group = member
                break

        for setting in self.settings:
            if setting.name in ('enabled', 'is_enabled'):
                self.__make_enable_disable_commands(setting, group)
            else:
                self.__make_getter_setter_commands(setting, group)

    def __make_getter_setter_commands(self,
                                      setting: CogSetting,
                                      group: commands.core.Group) -> None:
        """
        Creates a "get..." and "set..." command for the given setting and
        either registers them as subcommands under the given command group or
        under the bot if `None`.

        Raises RuntimeError if the setting's datatype is not one of int,
        float, str, or bool.
        """
        # Manually constructing equivalent of:
        #     @commands.command()
        #     @commands.has_permissions(ban_members=True)
        #     @commands.guild_only()
        #     async def getvar(self, context):
        async def getter(self, context):
            await self.__get_setting_command(context, setting)

        # One setter per datatype; each differs only in the `new_value`
        # annotation on its signature.
        async def setter_int(self, context, new_value: int):
            await self.__set_setting_command(context, new_value, setting)

        async def setter_float(self, context, new_value: float):
            await self.__set_setting_command(context, new_value, setting)

        async def setter_str(self, context, new_value: str):
            await self.__set_setting_command(context, new_value, setting)

        async def setter_bool(self, context, new_value: bool):
            await self.__set_setting_command(context, new_value, setting)

        setters = {
            int: setter_int,
            float: setter_float,
            str: setter_str,
            bool: setter_bool,
        }
        setter = setters.get(setting.datatype)
        if setter is None:
            raise RuntimeError(f'Datatype {setting.datatype} unsupported')

        get_command = commands.Command(
            getter,
            name=f'get{setting.name}',
            brief=f'Shows {setting.brief}',
            description=setting.description,
            checks=_mod_command_checks())
        set_command = commands.Command(
            setter,
            name=f'set{setting.name}',
            brief=f'Sets {setting.brief}',
            description=setting.description,
            usage=setting.usage,
            checks=_mod_command_checks())

        # Passing `cog` in init gets ignored and set to `None` so set after.
        # This ensures the callback is passed `self`.
        get_command.cog = self
        set_command.cog = self

        target = group if group else self.bot
        target.add_command(get_command)
        target.add_command(set_command)

    def __make_enable_disable_commands(self,
                                       setting: CogSetting,
                                       group: commands.core.Group) -> None:
        """
        Creates "enable" and "disable" commands for the given setting and
        registers them under the given command group, or under the bot if
        `None`.
        """
        async def enabler(self, context):
            await self.__enable_command(context, setting)

        async def disabler(self, context):
            await self.__disable_command(context, setting)

        enable_command = commands.Command(
            enabler,
            name='enable',
            brief=f'Enables {setting.brief}',
            description=setting.description,
            checks=_mod_command_checks())
        disable_command = commands.Command(
            disabler,
            name='disable',
            brief=f'Disables {setting.brief}',
            description=setting.description,
            checks=_mod_command_checks())

        # Set after init so the callbacks are passed `self`.
        enable_command.cog = self
        disable_command.cog = self

        target = group if group else self.bot
        target.add_command(enable_command)
        target.add_command(disable_command)

    async def __set_setting_command(self, context, new_value, setting) -> None:
        """
        Handler behind generated "set..." commands. Validates the new value
        against the setting's constraints, stores it, replies with the
        outcome, and notifies `on_setting_updated`.
        """
        setting_name = setting.name
        if context.command.parent:
            setting_name = f'{context.command.parent.name}.{setting_name}'

        problem = None
        if setting.min_value is not None and new_value < setting.min_value:
            problem = f'must be >= {setting.min_value}'
        elif setting.max_value is not None and new_value > setting.max_value:
            problem = f'must be <= {setting.max_value}'
        elif setting.enum_values and new_value not in setting.enum_values:
            problem = f'must be one of {_format_allowed_values(setting.enum_values)}'
        if problem is not None:
            await context.message.reply(
                f'{CONFIG["failure_emoji"]} `{setting_name}` {problem}',
                mention_author=False)
            return

        key = f'{self.__class__.__name__}.{setting.name}'
        Storage.set_config_value(context.guild, key, new_value)
        await context.message.reply(
            f'{CONFIG["success_emoji"]} `{setting_name}` is now set to `{new_value}`',
            mention_author=False)
        await self.on_setting_updated(context.guild, setting)
        self.log(context.guild, f'{context.author.name} set {key} to {new_value}')

    async def __get_setting_command(self, context, setting) -> None:
        """
        Handler behind generated "get..." commands. Replies with the guild's
        configured value, or the cog default if none is configured.
        """
        setting_name = setting.name
        if context.command.parent:
            setting_name = f'{context.command.parent.name}.{setting_name}'
        key = f'{self.__class__.__name__}.{setting.name}'
        value = Storage.get_config_value(context.guild, key)
        if value is None:
            value = self.get_cog_default(setting.name)
            await context.message.reply(
                f'{CONFIG["info_emoji"]} `{setting_name}` is using default of `{value}`',
                mention_author=False)
        else:
            await context.message.reply(
                f'{CONFIG["info_emoji"]} `{setting_name}` is set to `{value}`',
                mention_author=False)

    async def __enable_command(self, context, setting) -> None:
        """Handler behind the generated "enable" command."""
        key = f'{self.__class__.__name__}.{setting.name}'
        Storage.set_config_value(context.guild, key, True)
        await context.message.reply(
            f'{CONFIG["success_emoji"]} {setting.brief.capitalize()} enabled.',
            mention_author=False)
        await self.on_setting_updated(context.guild, setting)
        self.log(context.guild, f'{context.author.name} enabled {self.__class__.__name__}')

    async def __disable_command(self, context, setting) -> None:
        """Handler behind the generated "disable" command."""
        key = f'{self.__class__.__name__}.{setting.name}'
        Storage.set_config_value(context.guild, key, False)
        await context.message.reply(
            f'{CONFIG["success_emoji"]} {setting.brief.capitalize()} disabled.',
            mention_author=False)
        await self.on_setting_updated(context.guild, setting)
        self.log(context.guild, f'{context.author.name} disabled {self.__class__.__name__}')

    async def on_setting_updated(self, guild: Guild, setting: CogSetting) -> None:
        """
        Subclass override point for being notified when a CogSetting is edited.
        """

    # Bot message handling

    @classmethod
    def __bot_messages(cls, guild: Guild) -> AgeBoundDict:
        """
        Returns the guild's dict of tracked BotMessages keyed by message id,
        creating and storing it in guild state on first use.
        """
        bm = Storage.get_state_value(guild, 'bot_messages')
        if bm is None:
            # Fallback age for messages that report no sent time
            far_future = datetime.utcnow() + timedelta(days=1000)
            bm = AgeBoundDict(timedelta(seconds=600),
                              lambda k, v : v.message_sent_at() or far_future)
            Storage.set_state_value(guild, 'bot_messages', bm)
        return bm

    async def post_message(self, message: BotMessage) -> bool:
        """
        Posts a BotMessage to a guild. Returns whether it was successful. If
        the caller wants to listen to reactions they should be added before
        calling this method. Listen to reactions by overriding `on_mod_react`.
        """
        message.source_cog = self
        await message.update()
        if message.has_reactions() and message.is_sent():
            # Track the message so reactions can be routed back to this cog
            guild_messages = self.__bot_messages(message.guild)
            guild_messages[message.message_id()] = message
        return message.is_sent()

    @commands.Cog.listener()
    async def on_raw_reaction_add(self, payload: RawReactionActionEvent):
        'Event handler'
        if payload.user_id == self.bot.user.id:
            # Ignore bot's own reactions
            return
        member: Member = payload.member
        if member is None:
            return
        guild: Guild = self.bot.get_guild(payload.guild_id)
        if guild is None:
            # Possibly a DM
            return
        channel: GuildChannel = guild.get_channel(payload.channel_id)
        if channel is None:
            # Possibly a DM
            return

        # Look the message up by id in the tracked messages instead of
        # fetching it from the API. Only messages posted through
        # `post_message()` are tracked, so a hit is a bot-authored message
        # and no network round trip is needed per reaction per cog.
        guild_messages = self.__bot_messages(guild)
        bot_message = guild_messages.get(payload.message_id)
        if bot_message is None:
            # Unknown message (expired or was never tracked)
            return
        if self is not bot_message.source_cog:
            # Belongs to a different cog
            return

        reaction = bot_message.reaction_for_emoji(payload.emoji)
        if reaction is None or not reaction.is_enabled:
            # Can't use this reaction with this message
            return

        # `channel.permissions_for(member)` is the equivalent of
        # `member.permissions_in(channel)`, which newer discord.py removed.
        if not channel.permissions_for(member).ban_members:
            # Not a mod (could make permissions configurable per BotMessageReaction some day)
            return

        await self.on_mod_react(bot_message, reaction, member)

    async def on_mod_react(self,
                           bot_message: BotMessage,
                           reaction: BotMessageReaction,
                           reacted_by: Member) -> None:
        """
        Subclass override point for receiving mod reactions to bot messages sent
        via `post_message()`.
        """

    # Helpers

    @classmethod
    def log(cls, guild: Guild, message) -> None:
        """
        Writes a message to the console. Intended for significant events only.
        """
        now = datetime.now()
        print(f'[{now.strftime("%Y-%m-%dT%H:%M:%S")}|{cls.__name__}|{guild.name}] {message}')