""" Cog for matching messages against guild-configurable criteria and taking automated actions on them. """ from datetime import datetime from typing import Optional, Literal from discord import Guild, Member, Message, utils as discordutils, Permissions, Interaction from discord.app_commands import Group, rename from discord.ext import commands from config import CONFIG from rocketbot.bot import Rocketbot from rocketbot.cogs.basecog import BaseCog, BotMessage, BotMessageReaction from rocketbot.cogsetting import CogSetting from rocketbot.pattern import PatternCompiler, PatternDeprecationError, \ PatternError, PatternStatement from rocketbot.storage import Storage class PatternContext: """ Data about a message that has matched a configured statement and what actions have been carried out. """ def __init__(self, message: Message, statement: PatternStatement): self.message = message self.statement = statement self.is_deleted = False self.is_kicked = False self.is_banned = False class PatternCog(BaseCog, name='Pattern Matching'): """ Highly flexible cog for performing various actions on messages that match various criteria. Patterns can be defined by mods for each guild. """ SETTING_PATTERNS = CogSetting('patterns', None) def __init__(self, bot: Rocketbot): super().__init__( bot, config_prefix='patterns', name='patterns', short_description='Manages message pattern matching.', ) def __get_patterns(self, guild: Guild) -> dict[str, PatternStatement]: """ Returns a name -> PatternStatement lookup for the guild. """ patterns: dict[str, PatternStatement] = Storage.get_state_value(guild, 'PatternCog.patterns') if patterns is None: jsons: list[dict] = self.get_guild_setting(guild, self.SETTING_PATTERNS) or [] pattern_list: list[PatternStatement] = [] for json in jsons: try: ps = PatternStatement.from_json(json) pattern_list.append(ps) try: ps.check_deprecations() except PatternDeprecationError as e: self.log(guild, f'Pattern {ps.name}: {e}') except PatternError as e: self.log(guild, f'Error decoding pattern "{json["name"]}": {e}') patterns = { p.name:p for p in pattern_list} Storage.set_state_value(guild, 'PatternCog.patterns', patterns) return patterns @classmethod def __save_patterns(cls, guild: Guild, patterns: dict[str, PatternStatement]) -> None: to_save: list[dict] = list(map(lambda ps: ps.to_json(), patterns.values())) cls.set_guild_setting(guild, cls.SETTING_PATTERNS, to_save) @classmethod def __get_last_matched(cls, guild: Guild, name: str) -> Optional[datetime]: last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched') if last_matched: return last_matched.get(name) return None @classmethod def __set_last_matched(cls, guild: Guild, name: str, time: datetime) -> None: last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched') if last_matched is None: last_matched = {} Storage.set_state_value(guild, 'PatternCog.last_matched', last_matched) last_matched[name] = time @commands.Cog.listener() async def on_message(self, message: Message) -> None: """Event listener""" if message.author is None or \ message.author.bot or \ message.channel is None or \ message.guild is None or \ message.content is None or \ message.content == '': return if message.channel.permissions_for(message.author).ban_members: # Ignore mods return patterns = self.__get_patterns(message.guild) for statement in sorted(patterns.values(), key=lambda s : s.priority, reverse=True): other_fields = { 'last_matched': self.__get_last_matched(message.guild, statement.name), } if statement.expression.matches(message, other_fields): self.__set_last_matched(message.guild, statement.name, message.created_at) await self.__trigger_actions(message, statement) break async def __trigger_actions(self, message: Message, statement: PatternStatement) -> None: context = PatternContext(message, statement) should_post_message = False message_type: int = BotMessage.TYPE_DEFAULT action_descriptions = [] self.log(message.guild, f'Message from {message.author.name} matched ' + \ f'pattern "{statement.name}"') for action in statement.actions: if action.action == 'ban': await message.author.ban( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{statement.name}"', delete_message_days=0) context.is_banned = True context.is_kicked = True action_descriptions.append('Author banned') self.log(message.guild, f'{message.author.name} banned') elif action.action == 'delete': await message.delete() context.is_deleted = True action_descriptions.append('Message deleted') self.log(message.guild, f'{message.author.name}\'s message deleted') elif action.action == 'kick': await message.author.kick( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{statement.name}"') context.is_kicked = True action_descriptions.append('Author kicked') self.log(message.guild, f'{message.author.name} kicked') elif action.action == 'modinfo': should_post_message = True message_type = BotMessage.TYPE_INFO action_descriptions.append('Message logged') elif action.action == 'modwarn': should_post_message = not self.was_warned_recently(message.author) message_type = BotMessage.TYPE_MOD_WARNING action_descriptions.append('Mods alerted') elif action.action == 'reply': await message.reply( f'{action.arguments[0]}', mention_author=False) action_descriptions.append('Autoreplied') self.log(message.guild, f'autoreplied to {message.author.name}') if should_post_message: bm = BotMessage( message.guild, f'User {message.author.name} tripped custom pattern ' + \ f'`{statement.name}` at {message.jump_url}.\n\n' + \ 'Automatic actions taken:\n• ' + ('\n• '.join(action_descriptions)), type=message_type, context=context) self.record_warning(message.author) bm.quote = discordutils.remove_markdown(message.clean_content) await bm.set_reactions(BotMessageReaction.standard_set( did_delete=context.is_deleted, did_kick=context.is_kicked, did_ban=context.is_banned)) await self.post_message(bm) async def on_mod_react(self, bot_message: BotMessage, reaction: BotMessageReaction, reacted_by: Member) -> None: context: PatternContext = bot_message.context if reaction.emoji == CONFIG['trash_emoji']: await context.message.delete() context.is_deleted = True elif reaction.emoji == CONFIG['kick_emoji']: await context.message.author.kick( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{context.statement.name}". Kicked by {reacted_by.name}.') context.is_kicked = True elif reaction.emoji == CONFIG['ban_emoji']: await context.message.author.ban( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{context.statement.name}". Banned by {reacted_by.name}.', delete_message_days=1) context.is_banned = True await bot_message.set_reactions(BotMessageReaction.standard_set( did_delete=context.is_deleted, did_kick=context.is_kicked, did_ban=context.is_banned)) spattern = Group( name='pattern', description='Manages message pattern matching.', guild_only=True, default_permissions=Permissions(Permissions.manage_messages.flag), ) @spattern.command() @rename(expression='if') async def test( self, interaction: Interaction, actions: str, expression: str ) -> None: vals = [ actions, expression ] arg_list = '' for arg in vals: if arg is not None: arg_list += f'- "`{arg}`"\n' await interaction.response.send_message( "Got /pattern test call with arguments\n" + arg_list, ephemeral=True, ) @commands.group( brief='Manages message pattern matching', ) @commands.has_permissions(ban_members=True) @commands.guild_only() async def pattern(self, context: commands.Context): """Message pattern matching command group""" if context.invoked_subcommand is None: await context.send_help() @pattern.command( brief='Adds a custom pattern', description='Adds a custom pattern. Patterns use a simplified ' + \ 'expression language. Full documentation found here: ' + \ 'https://git.rixafrix.com/ialbert/python-app-rocketbot/src/' + \ 'branch/main/docs/patterns.md', usage=' ', ignore_extra=True ) async def add(self, context: commands.Context, name: str): """Command handler""" pattern_str = PatternCompiler.expression_str_from_context(context, name) try: statement = PatternCompiler.parse_statement(name, pattern_str) statement.check_deprecations() patterns = self.__get_patterns(context.guild) patterns[name] = statement self.__save_patterns(context.guild, patterns) await context.message.reply( f'{CONFIG["success_emoji"]} Pattern `{name}` added.', mention_author=False) except PatternError as e: await context.message.reply( f'{CONFIG["failure_emoji"]} Error parsing statement. {e}', mention_author=False) @pattern.command( brief='Removes a custom pattern', usage='' ) async def remove(self, context: commands.Context, name: str): """Command handler""" patterns = self.__get_patterns(context.guild) if patterns.get(name) is not None: del patterns[name] self.__save_patterns(context.guild, patterns) await context.message.reply( f'{CONFIG["success_emoji"]} Pattern `{name}` deleted.', mention_author=False) else: await context.message.reply( f'{CONFIG["failure_emoji"]} No pattern named `{name}`.', mention_author=False) @pattern.command( brief='Lists all patterns' ) async def list(self, context: commands.Context) -> None: """Command handler""" patterns = self.__get_patterns(context.guild) if len(patterns) == 0: await context.message.reply('No patterns defined.', mention_author=False) return msg = '' for name, statement in sorted(patterns.items()): msg += f'Pattern `{name}` (priority={statement.priority}):\n```\n{statement.original}\n```\n' await context.message.reply(msg, mention_author=False) @pattern.command( brief='Sets a pattern\'s priority level', description='Sets the priority for a pattern. Messages are checked ' + 'against patterns with the highest priority first. Patterns with ' + 'the same priority may be checked in arbitrary order. Default ' + 'priority is 100.', ) async def setpriority(self, context: commands.Context, name: str, priority: int) -> None: """Command handler""" patterns = self.__get_patterns(context.guild) statement = patterns.get(name) if statement is None: await context.message.reply( f'{CONFIG["failure_emoji"]} No such pattern `{name}`', mention_author=False) return statement.priority = priority self.__save_patterns(context.guild, patterns) await context.message.reply( f'{CONFIG["success_emoji"]} Priority for pattern `{name}` ' + \ f'updated to `{priority}`.', mention_author=False)