""" Cog for matching messages against guild-configurable criteria and taking automated actions on them. """ import re from datetime import datetime from typing import Optional from discord import Guild, Member, Message, utils as discordutils, Permissions, Interaction from discord.app_commands import Choice, Group, autocomplete, rename from discord.ext.commands import Cog from config import CONFIG from rocketbot.bot import Rocketbot from rocketbot.cogs.basecog import BaseCog, BotMessage, BotMessageReaction from rocketbot.cogsetting import CogSetting from rocketbot.pattern import PatternCompiler, PatternDeprecationError, \ PatternError, PatternStatement from rocketbot.storage import Storage from rocketbot.utils import dump_stacktrace, MOD_PERMISSIONS class PatternContext: """ Data about a message that has matched a configured statement and what actions have been carried out. """ def __init__(self, message: Message, statement: PatternStatement): self.message = message self.statement = statement self.is_deleted = False self.is_kicked = False self.is_banned = False async def pattern_name_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]: choices: list[Choice[str]] = [] try: if interaction.guild is None: return [] patterns: dict[str, PatternStatement] = PatternCog.shared.get_patterns(interaction.guild) current_normal = current.lower().strip() for name in sorted(patterns.keys()): if len(current_normal) == 0 or current_normal.startswith(name.lower()): choices.append(Choice(name=name, value=name)) except BaseException as e: dump_stacktrace(e) return choices async def action_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]: # FIXME: WORK IN PROGRESS print(f'autocomplete action - current = "{current}"') regex = re.compile('^(.*?)([a-zA-Z]+)$') match: Optional[re.Match[str]] = regex.match(current) initial: str = '' stub: str = current if match: initial = match.group(1).strip() stub = match.group(2) if PatternCompiler.ACTION_TO_ARGS.get(stub, None) is not None: # Matches perfectly. Suggest another instead of completing the current. initial = current.strip() + ', ' stub = '' print(f'initial = "{initial}", stub = "{stub}"') options: list[Choice[str]] = [] for action in sorted(PatternCompiler.ACTION_TO_ARGS.keys()): if len(stub) == 0 or action.startswith(stub.lower()): arg_types = PatternCompiler.ACTION_TO_ARGS[action] arg_type_strs = [] for arg_type in arg_types: if arg_type == PatternCompiler.TYPE_TEXT: arg_type_strs.append('"message"') else: raise ValueError(f'Argument type {arg_type} not yet supported') suffix = '' if len(arg_type_strs) == 0 else ' ' + (' '.join(arg_type_strs)) options.append(Choice(name=action, value=f'{initial.strip()} {action}{suffix}')) return options async def priority_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]: return [ Choice(name='very low (50)', value=50), Choice(name='low (75)', value=75), Choice(name='normal (100)', value=100), Choice(name='high (125)', value=125), Choice(name='very high (150)', value=150), ] class PatternCog(BaseCog, name='Pattern Matching'): """ Highly flexible cog for performing various actions on messages that match various criteria. Patterns can be defined by mods for each guild. """ SETTING_PATTERNS = CogSetting('patterns', None) shared: Optional['PatternCog'] = None def __init__(self, bot: Rocketbot): super().__init__( bot, config_prefix='patterns', name='patterns', short_description='Manages message pattern matching.', ) PatternCog.shared = self def get_patterns(self, guild: Guild) -> dict[str, PatternStatement]: """ Returns a name -> PatternStatement lookup for the guild. """ patterns: dict[str, PatternStatement] = Storage.get_state_value(guild, 'PatternCog.patterns') if patterns is None: jsons: list[dict] = self.get_guild_setting(guild, self.SETTING_PATTERNS) or [] pattern_list: list[PatternStatement] = [] for json in jsons: try: ps = PatternStatement.from_json(json) pattern_list.append(ps) try: ps.check_deprecations() except PatternDeprecationError as e: self.log(guild, f'Pattern {ps.name}: {e}') except PatternError as e: self.log(guild, f'Error decoding pattern "{json["name"]}": {e}') patterns = { p.name:p for p in pattern_list} Storage.set_state_value(guild, 'PatternCog.patterns', patterns) return patterns @classmethod def __save_patterns(cls, guild: Guild, patterns: dict[str, PatternStatement]) -> None: to_save: list[dict] = list(map(lambda ps: ps.to_json(), patterns.values())) cls.set_guild_setting(guild, cls.SETTING_PATTERNS, to_save) @classmethod def __get_last_matched(cls, guild: Guild, name: str) -> Optional[datetime]: last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched') if last_matched: return last_matched.get(name) return None @classmethod def __set_last_matched(cls, guild: Guild, name: str, time: datetime) -> None: last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched') if last_matched is None: last_matched = {} Storage.set_state_value(guild, 'PatternCog.last_matched', last_matched) last_matched[name] = time @Cog.listener() async def on_message(self, message: Message) -> None: """Event listener""" if message.author is None or \ message.author.bot or \ message.channel is None or \ message.guild is None or \ message.content is None or \ message.content == '': return if message.channel.permissions_for(message.author).ban_members: # Ignore mods return patterns = self.get_patterns(message.guild) for statement in sorted(patterns.values(), key=lambda s : s.priority, reverse=True): other_fields = { 'last_matched': self.__get_last_matched(message.guild, statement.name), } if statement.expression.matches(message, other_fields): self.__set_last_matched(message.guild, statement.name, message.created_at) await self.__trigger_actions(message, statement) break async def __trigger_actions(self, message: Message, statement: PatternStatement) -> None: context = PatternContext(message, statement) should_post_message = False message_type: int = BotMessage.TYPE_DEFAULT action_descriptions = [] self.log(message.guild, f'Message from {message.author.name} matched ' + \ f'pattern "{statement.name}"') for action in statement.actions: if action.action == 'ban': await message.author.ban( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{statement.name}"', delete_message_days=0) context.is_banned = True context.is_kicked = True action_descriptions.append('Author banned') self.log(message.guild, f'{message.author.name} banned') elif action.action == 'delete': await message.delete() context.is_deleted = True action_descriptions.append('Message deleted') self.log(message.guild, f'{message.author.name}\'s message deleted') elif action.action == 'kick': await message.author.kick( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{statement.name}"') context.is_kicked = True action_descriptions.append('Author kicked') self.log(message.guild, f'{message.author.name} kicked') elif action.action == 'modinfo': should_post_message = True message_type = BotMessage.TYPE_INFO action_descriptions.append('Message logged') elif action.action == 'modwarn': should_post_message = not self.was_warned_recently(message.author) message_type = BotMessage.TYPE_MOD_WARNING action_descriptions.append('Mods alerted') elif action.action == 'reply': await message.reply( f'{action.arguments[0]}', mention_author=False) action_descriptions.append('Autoreplied') self.log(message.guild, f'autoreplied to {message.author.name}') if should_post_message: bm = BotMessage( message.guild, f'User {message.author.name} tripped custom pattern ' + \ f'`{statement.name}` at {message.jump_url}.\n\n' + \ 'Automatic actions taken:\n• ' + ('\n• '.join(action_descriptions)), type=message_type, context=context) self.record_warning(message.author) bm.quote = discordutils.remove_markdown(message.clean_content) await bm.set_reactions(BotMessageReaction.standard_set( did_delete=context.is_deleted, did_kick=context.is_kicked, did_ban=context.is_banned)) await self.post_message(bm) async def on_mod_react(self, bot_message: BotMessage, reaction: BotMessageReaction, reacted_by: Member) -> None: context: PatternContext = bot_message.context if reaction.emoji == CONFIG['trash_emoji']: await context.message.delete() context.is_deleted = True elif reaction.emoji == CONFIG['kick_emoji']: await context.message.author.kick( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{context.statement.name}". Kicked by {reacted_by.name}.') context.is_kicked = True elif reaction.emoji == CONFIG['ban_emoji']: await context.message.author.ban( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{context.statement.name}". Banned by {reacted_by.name}.', delete_message_days=1) context.is_banned = True await bot_message.set_reactions(BotMessageReaction.standard_set( did_delete=context.is_deleted, did_kick=context.is_kicked, did_ban=context.is_banned)) pattern = Group( name='pattern', description='Manages message pattern matching.', guild_only=True, default_permissions=MOD_PERMISSIONS, ) @pattern.command() @rename(expression='if') @autocomplete( name=pattern_name_autocomplete, # actions=action_autocomplete ) async def add( self, interaction: Interaction, name: str, actions: str, expression: str ) -> None: """ Adds a custom pattern. Adds a custom pattern. Patterns use a simplified expression language. Full documentation found here: https://git.rixafrix.com/ialbert/python-app-rocketbot/src/branch/main/docs/patterns.md Parameters ---------- interaction : Interaction name : str A name for the pattern. actions : str One or more actions to take when a message matches the expression. expression : str Criteria for matching chat messages. """ pattern_str = f'{actions} if {expression}' guild = interaction.guild try: statement = PatternCompiler.parse_statement(name, pattern_str) statement.check_deprecations() patterns = self.get_patterns(guild) patterns[name] = statement self.__save_patterns(guild, patterns) await interaction.response.send_message( f'{CONFIG["success_emoji"]} Pattern `{name}` added.', ephemeral=True, ) except PatternError as e: await interaction.response.send_message( f'{CONFIG["failure_emoji"]} Error parsing statement. {e}', ephemeral=True, ) @pattern.command( description='Removes a custom pattern', extras={ 'usage': '', }, ) @autocomplete(name=pattern_name_autocomplete) async def remove(self, interaction: Interaction, name: str): """ Command handler Parameters ---------- interaction: Interaction name: str Name of the pattern to remove. """ guild = interaction.guild patterns = self.get_patterns(guild) if patterns.get(name) is not None: del patterns[name] self.__save_patterns(guild, patterns) await interaction.response.send_message( f'{CONFIG["success_emoji"]} Pattern `{name}` deleted.', ephemeral=True, ) else: await interaction.response.send_message( f'{CONFIG["failure_emoji"]} No pattern named `{name}`.', ephemeral=True, ) @pattern.command( description='Lists all patterns', ) async def list(self, interaction: Interaction) -> None: """ Command handler Parameters ---------- interaction: Interaction """ guild = interaction.guild patterns = self.get_patterns(guild) if len(patterns) == 0: await interaction.response.send_message( 'No patterns defined.', ephemeral=True, ) return msg = '' for name, statement in sorted(patterns.items()): msg += f'Pattern `{name}` (priority={statement.priority}):\n```\n{statement.original}\n```\n' await interaction.response.send_message(msg, ephemeral=True) @pattern.command( description='Sets a pattern\'s priority level', extras={ 'long_description': 'Sets the priority for a pattern. Messages are checked ' + 'against patterns with the highest priority first. Patterns with ' + 'the same priority may be checked in arbitrary order. Default ' + 'priority is 100.', }, ) @autocomplete(name=pattern_name_autocomplete, priority=priority_autocomplete) async def setpriority(self, interaction: Interaction, name: str, priority: int) -> None: """ Command handler Parameters ---------- interaction: Interaction name: str A name for the pattern priority: int Priority for evaluating the pattern. Default is 100. Higher values match first. """ guild = interaction.guild patterns = self.get_patterns(guild) statement = patterns.get(name) if statement is None: await interaction.response.send_message( f'{CONFIG["failure_emoji"]} No such pattern `{name}`', ephemeral=True, ) return statement.priority = priority self.__save_patterns(guild, patterns) await interaction.response.send_message( f'{CONFIG["success_emoji"]} Priority for pattern `{name}` ' + \ f'updated to `{priority}`.', ephemeral=True, )