| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- from discord import Guild, Member, Message, PartialEmoji, RawReactionActionEvent, TextChannel
- from discord.ext import commands
- from datetime import datetime, timedelta
-
- from config import CONFIG
- from rscollections import AgeBoundDict
- from storage import ConfigKey, Storage
- import json
-
class BotMessageReaction:
    """
    A possible reaction to a bot message that will trigger an action. The list
    of available reactions will be listed at the end of a BotMessage. When a
    mod reacts to the message with the emote, something can happen.

    If the reaction is disabled, reactions will not register. The description
    will still show up in the message, but no emoji is shown. This can be used
    to explain why an action is no longer available.
    """
    def __init__(self, emoji: str, is_enabled: bool, description: str):
        self.emoji = emoji
        self.is_enabled = is_enabled
        self.description = description

    def __eq__(self, other):
        """
        Two reactions are equal when their emoji, enabled flag and description
        all match. Comparing against anything that is not a
        BotMessageReaction (None, a bare emoji string, ...) yields
        NotImplemented so Python falls back to its default "not equal" result
        instead of raising AttributeError.
        """
        if not isinstance(other, BotMessageReaction):
            return NotImplemented
        return (self.emoji, self.is_enabled, self.description) == \
            (other.emoji, other.is_enabled, other.description)

    # Defining __eq__ without __hash__ leaves instances unhashable, which is
    # appropriate because the attributes are mutable.

    def __repr__(self) -> str:
        return (f'{type(self).__name__}(emoji={self.emoji!r}, '
                f'is_enabled={self.is_enabled!r}, '
                f'description={self.description!r})')
-
class BotMessage:
    """
    Holds state for a bot-generated message. A message is composed, sent via
    `BaseCog.post_message()`, and can later be updated.

    A message consists of a type (e.g. info, warning), text, optional quoted
    text (such as the content of a flagged message), and an optional list of
    actions that can be taken via a mod reacting to the message.
    """

    TYPE_DEFAULT = 0
    TYPE_INFO = 1
    TYPE_MOD_WARNING = 2
    TYPE_SUCCESS = 3
    TYPE_FAILURE = 4

    def __init__(self,
            guild: Guild,
            text: str,
            type: int = 0, # TYPE_DEFAULT
            context = None,
            reply_to: Message = None):
        """
        guild    -- guild the message belongs to; used to look up the warning
                    channel and mention configuration
        text     -- main body of the message
        type     -- one of the TYPE_* constants; selects the prefix
        context  -- opaque caller data; stored but never read by this class
        reply_to -- if given, the message is sent as a reply to it instead of
                    being posted in the configured warning channel
        """
        self.guild = guild
        self.text = text
        self.type = type
        self.context = context
        self.quote = None
        self.__posted_text = None # last text posted, to test for changes
        self.__posted_emoji = set() # emoji the bot has added to the message
        self.__message = None # Message
        self.__reply_to = reply_to
        self.__reactions = [] # BotMessageReaction[]

    def is_sent(self) -> bool:
        """
        Returns whether this message has been sent to the guild. This may
        continue returning False even after calling BaseCog.post_message if
        the guild has no configured warning channel.
        """
        return self.__message is not None

    def message_id(self):
        """Returns the id of the sent message, or None if not sent yet."""
        return self.__message.id if self.__message else None

    def message_sent_at(self):
        """Returns `created_at` of the sent message, or None if not sent yet."""
        return self.__message.created_at if self.__message else None

    async def set_text(self, new_text: str) -> None:
        """
        Replaces the text of this message. If the message has been sent, it will
        be updated.
        """
        self.text = new_text
        await self.__update_if_sent()

    async def set_reactions(self, reactions: list) -> None:
        """
        Replaces all BotMessageReactions with a new list. If the message has
        been sent, it will be updated. None is treated as an empty list.
        """
        # Normalise before comparing so that None vs. an already empty list
        # counts as "no change", and so any sequence (not just list) works.
        new_reactions = list(reactions) if reactions is not None else []
        if new_reactions == self.__reactions:
            # No change
            return
        self.__reactions = new_reactions
        await self.__update_if_sent()

    async def add_reaction(self, reaction: BotMessageReaction) -> None:
        """
        Adds one BotMessageReaction to this message. If a reaction already
        exists for the given emoji it is replaced with the new one. If the
        message has been sent, it will be updated.
        """
        # Alias for update. Makes for clearer intent.
        await self.update_reaction(reaction)

    async def update_reaction(self, reaction: BotMessageReaction) -> None:
        """
        Updates or adds a BotMessageReaction. If the message has been sent, it
        will be updated.
        """
        for i, existing in enumerate(self.__reactions):
            if existing.emoji == reaction.emoji:
                if reaction == existing:
                    # No change
                    return
                self.__reactions[i] = reaction
                break
        else:
            # No reaction with this emoji yet
            self.__reactions.append(reaction)
        await self.__update_if_sent()

    async def remove_reaction(self, reaction_or_emoji) -> None:
        """
        Removes a reaction. Can pass either a BotMessageReaction or just the
        emoji string. If the message has been sent, it will be updated.
        Arguments of any other type, or emoji with no matching reaction, are
        ignored.
        """
        if isinstance(reaction_or_emoji, str):
            emoji = reaction_or_emoji
        elif isinstance(reaction_or_emoji, BotMessageReaction):
            emoji = reaction_or_emoji.emoji
        else:
            return
        for i, existing in enumerate(self.__reactions):
            if existing.emoji == emoji:
                del self.__reactions[i]
                await self.__update_if_sent()
                return

    def reaction_for_emoji(self, emoji) -> BotMessageReaction:
        """
        Returns the first BotMessageReaction whose emoji matches, or None.
        `emoji` may be an emoji string or a PartialEmoji (matched by name).
        """
        if isinstance(emoji, str):
            wanted = emoji
        elif isinstance(emoji, PartialEmoji):
            wanted = emoji.name
        else:
            return None
        for reaction in self.__reactions:
            if reaction.emoji == wanted:
                return reaction
        return None

    async def __update_if_sent(self) -> None:
        if self.__message:
            await self._update()

    async def _update(self) -> None:
        """
        Sends the message if it has not been sent yet, otherwise edits it when
        the formatted text changed, then brings the bot's own emoji reactions
        in line with the currently enabled BotMessageReactions. Called by
        `BaseCog.post_message()`.
        """
        content: str = self.__formatted_message()
        if self.__message:
            # Only edit when the rendered text actually changed
            if content != self.__posted_text:
                await self.__message.edit(content=content)
                self.__posted_text = content
        elif self.__reply_to:
            self.__message = await self.__reply_to.reply(content=content, mention_author=False)
            self.__posted_text = content
        else:
            channel_id = Storage.get_config_value(self.guild, ConfigKey.WARNING_CHANNEL_ID)
            if channel_id is None:
                BaseCog.guild_trace(self.guild, 'No warning channel set! No warning issued.')
                return
            channel: TextChannel = self.guild.get_channel(channel_id)
            if channel is None:
                BaseCog.guild_trace(self.guild, 'Configured warning channel does not exist!')
                return
            self.__message = await channel.send(content=content)
            self.__posted_text = content

        # Add emoji for enabled reactions that are not on the message yet
        enabled_emoji = [r.emoji for r in self.__reactions if r.is_enabled]
        for emoji in enabled_emoji:
            if emoji not in self.__posted_emoji:
                await self.__message.add_reaction(emoji)
                self.__posted_emoji.add(emoji)
        # Clear emoji whose reaction was removed or disabled. The set
        # difference is a new set, so mutating __posted_emoji here is safe.
        for emoji in self.__posted_emoji - set(enabled_emoji):
            await self.__message.clear_reaction(emoji)
            self.__posted_emoji.discard(emoji)

    def __formatted_message(self) -> str:
        """
        Renders the full message content: type prefix, text, optional quote
        and the list of available actions.
        """
        prefix: str = ''
        if self.type == self.TYPE_INFO:
            prefix = CONFIG['info_emoji'] + ' '
        elif self.type == self.TYPE_MOD_WARNING:
            mention: str = Storage.get_config_value(self.guild, ConfigKey.WARNING_MENTION)
            if mention:
                prefix = mention + ' '
            prefix += CONFIG['warning_emoji'] + ' '
        elif self.type == self.TYPE_SUCCESS:
            prefix = CONFIG['success_emoji'] + ' '
        elif self.type == self.TYPE_FAILURE:
            prefix = CONFIG['failure_emoji'] + ' '

        parts = [prefix + self.text]

        if self.quote:
            # "> " only quotes a single line, so every line of a multi-line
            # quote needs its own marker.
            quoted = str(self.quote).replace('\n', '\n> ')
            parts.append(f'\n\n> {quoted}')

        if self.__reactions:
            parts.append('\n\nAvailable actions:')
            for reaction in self.__reactions:
                if reaction.is_enabled:
                    parts.append(f'\n {reaction.emoji} {reaction.description}')
                else:
                    # Disabled: keep the explanation but hide the emoji
                    parts.append(f'\n {reaction.description}')

        return ''.join(parts)
-
# Base class for the bot's cogs. Provides access to per-cog configuration
# defaults, tracking of BotMessages so mod reactions to them can be routed to
# `on_mod_react()`, command parameter validation, and helpers for posting to
# the guild's configured warning channel.
# (Kept as a comment, not a class docstring, so the cog description that
# discord.py derives from the docstring is unchanged.)
class BaseCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot

    # Config

    @classmethod
    def get_cog_default(cls, key: str):
        """
        Convenience method for getting a cog configuration default from
        `CONFIG['cog_defaults'][<cogname>][<key>]`. Returns None if there is
        no entry for this cog class or for the key.
        """
        cogs: dict = CONFIG['cog_defaults']
        cog = cogs.get(cls.__name__)
        if cog is None:
            return None
        return cog.get(key)

    # Bot message handling

    @classmethod
    def __bot_messages(cls, guild: Guild) -> AgeBoundDict:
        """
        Returns the guild's collection of tracked BotMessages keyed by message
        id, creating and storing it in guild state on first use.
        """
        bm = Storage.get_state_value(guild, 'bot_messages')
        if bm is None:
            # Messages that have no sent timestamp get a far-future one.
            # NOTE(review): presumably AgeBoundDict takes a max age and a
            # function returning each entry's timestamp -- confirm in
            # rscollections. utcnow() is naive; this assumes
            # message_sent_at() is naive UTC as well -- confirm.
            far_future = datetime.utcnow() + timedelta(days=1000)
            bm = AgeBoundDict(timedelta(seconds=600),
                lambda k, v : v.message_sent_at() or far_future)
            Storage.set_state_value(guild, 'bot_messages', bm)
        return bm

    @classmethod
    async def post_message(cls, message: BotMessage) -> bool:
        """
        Sends (or updates) a BotMessage and, if it was sent, tracks it so that
        reactions to it are dispatched to `on_mod_react()`. Returns True if
        the message is sent, False otherwise (e.g. no warning channel).
        """
        await message._update()
        guild_messages = cls.__bot_messages(message.guild)
        if message.is_sent():
            guild_messages[message.message_id()] = message
            return True
        return False

    @commands.Cog.listener()
    async def on_raw_reaction_add(self, payload: RawReactionActionEvent):
        """
        Event handler. Forwards a reaction to `on_mod_react()` when it was
        added by a guild member with the ban_members permission, to a tracked
        message authored by this bot, using an enabled BotMessageReaction.
        """
        # discord is already a dependency of this module; imported locally so
        # this method does not rely on a change to the file-level imports.
        from discord import NotFound

        if payload.user_id == self.bot.user.id:
            # Ignore bot's own reactions
            return
        member: Member = payload.member
        if member is None:
            return
        guild: Guild = self.bot.get_guild(payload.guild_id)
        if guild is None:
            # Possibly a DM
            return
        channel = guild.get_channel(payload.channel_id)
        if channel is None:
            # Possibly a DM
            return
        try:
            message: Message = await channel.fetch_message(payload.message_id)
        except NotFound:
            # Message deleted? fetch_message raises rather than returning None
            return
        if message is None:
            return
        if message.author.id != self.bot.user.id:
            # Bot didn't author this
            return
        guild_messages = self.__bot_messages(guild)
        bot_message = guild_messages.get(message.id)
        if bot_message is None:
            # Unknown message (expired or was never tracked)
            return
        reaction = bot_message.reaction_for_emoji(payload.emoji)
        if reaction is None or not reaction.is_enabled:
            # Can't use this reaction with this message
            return
        # Member.permissions_in(channel) is an alias of
        # channel.permissions_for(member) and was removed in discord.py 2.0.
        if not channel.permissions_for(member).ban_members:
            # Not a mod (could make permissions configurable per BotMessageReaction some day)
            return
        await self.on_mod_react(bot_message, reaction, member)

    async def on_mod_react(self,
            bot_message: BotMessage,
            reaction: BotMessageReaction,
            reacted_by: Member) -> None:
        """
        Subclass override point for receiving mod reactions to bot messages sent
        via `post_message()`.
        """
        pass

    @classmethod
    async def validate_param(cls, context: commands.Context, param_name: str, value,
            allowed_types: tuple = None,
            min_value = None,
            max_value = None) -> bool:
        """
        Convenience method for validating a command parameter is of the expected
        type and in the expected range. Bad values will cause a reply to be sent
        to the original message and a False will be returned. If all checks
        succeed, True will be returned.

        `allowed_types` is a tuple of types; a single bare type is accepted
        too. Each of `allowed_types`, `min_value` and `max_value` is skipped
        when None.
        """
        # TODO: Rework this to use BotMessage
        if isinstance(allowed_types, type):
            allowed_types = (allowed_types,)
        if allowed_types is not None and not isinstance(value, allowed_types):
            if len(allowed_types) == 1:
                await context.message.reply(
                    f'⚠️ `{param_name}` must be of type {allowed_types[0]}.',
                    mention_author=False)
            else:
                await context.message.reply(
                    f'⚠️ `{param_name}` must be of types {allowed_types}.',
                    mention_author=False)
            return False
        if min_value is not None and value < min_value:
            await context.message.reply(f'⚠️ `{param_name}` must be >= {min_value}.',
                mention_author=False)
            return False
        if max_value is not None and value > max_value:
            await context.message.reply(f'⚠️ `{param_name}` must be <= {max_value}.',
                mention_author=False)
            # Previously fell through to `return True` after reporting the error
            return False
        return True

    @classmethod
    async def warn(cls, guild: Guild, message: str) -> Message:
        """
        Sends a warning message to the configured warning channel for the
        given guild. If no warning channel is configured no action is taken.
        Returns the Message if successful or None if not.
        """
        channel_id = Storage.get_config_value(guild, ConfigKey.WARNING_CHANNEL_ID)
        if channel_id is None:
            cls.guild_trace(guild, 'No warning channel set! No warning issued.')
            return None
        channel: TextChannel = guild.get_channel(channel_id)
        if channel is None:
            cls.guild_trace(guild, 'Configured warning channel does not exist!')
            return None
        mention: str = Storage.get_config_value(guild, ConfigKey.WARNING_MENTION)
        text: str = message
        # Truthiness check (as in BotMessage) so an empty mention adds nothing
        if mention:
            text = f'{mention} {text}'
        msg: Message = await channel.send(text)
        return msg

    @classmethod
    async def update_warn(cls, warn_message: Message, new_text: str) -> None:
        """
        Updates the text of a previously posted `warn`. Includes configured
        mentions if necessary.
        """
        text: str = new_text
        mention: str = Storage.get_config_value(
            warn_message.guild,
            ConfigKey.WARNING_MENTION)
        # Truthiness check (as in BotMessage) so an empty mention adds nothing
        if mention:
            text = f'{mention} {text}'
        await warn_message.edit(content=text)

    @classmethod
    def guild_trace(cls, guild: Guild, message: str) -> None:
        """Prints a diagnostic message tagged with the guild's id and name."""
        print(f'[guild {guild.id}|{guild.name}] {message}')
|