""" Cog for matching messages against guild-configurable criteria and taking automated actions on them. """ import re from datetime import datetime from typing import Optional from discord import Guild, Member, Message, utils as discordutils, Permissions, Interaction from discord.app_commands import Choice, Group, autocomplete, rename from discord.ext.commands import Cog from config import CONFIG from rocketbot.bot import Rocketbot from rocketbot.cogs.basecog import BaseCog, BotMessage, BotMessageReaction from rocketbot.cogsetting import CogSetting from rocketbot.pattern import PatternCompiler, PatternDeprecationError, \ PatternError, PatternStatement from rocketbot.storage import Storage from rocketbot.utils import dump_stacktrace, MOD_PERMISSIONS class PatternContext: """ Data about a message that has matched a configured statement and what actions have been carried out. """ def __init__(self, message: Message, statement: PatternStatement): self.message = message self.statement = statement self.is_deleted = False self.is_kicked = False self.is_banned = False async def pattern_name_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]: choices: list[Choice[str]] = [] try: if interaction.guild is None: return [] patterns: dict[str, PatternStatement] = PatternCog.shared.get_patterns(interaction.guild) current_normal = current.lower().strip() for name in sorted(patterns.keys()): if len(current_normal) == 0 or current_normal.startswith(name.lower()): choices.append(Choice(name=name, value=name)) except BaseException as e: dump_stacktrace(e) return choices async def action_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]: # FIXME: WORK IN PROGRESS print(f'autocomplete action - current = "{current}"') regex = re.compile('^(.*?)([a-zA-Z]+)$') match: Optional[re.Match[str]] = regex.match(current) initial: str = '' stub: str = current if match: initial = match.group(1).strip() stub = match.group(2) if PatternCompiler.ACTION_TO_ARGS.get(stub, None) is not None: # Matches perfectly. Suggest another instead of completing the current. initial = current.strip() + ', ' stub = '' print(f'initial = "{initial}", stub = "{stub}"') options: list[Choice[str]] = [] for action in sorted(PatternCompiler.ACTION_TO_ARGS.keys()): if len(stub) == 0 or action.startswith(stub.lower()): arg_types = PatternCompiler.ACTION_TO_ARGS[action] arg_type_strs = [] for arg_type in arg_types: if arg_type == PatternCompiler.TYPE_TEXT: arg_type_strs.append('"message"') else: raise ValueError(f'Argument type {arg_type} not yet supported') suffix = '' if len(arg_type_strs) == 0 else ' ' + (' '.join(arg_type_strs)) options.append(Choice(name=action, value=f'{initial.strip()} {action}{suffix}')) return options async def priority_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]: return [ Choice(name='very low (50)', value=50), Choice(name='low (75)', value=75), Choice(name='normal (100)', value=100), Choice(name='high (125)', value=125), Choice(name='very high (150)', value=150), ] class PatternCog(BaseCog, name='Pattern Matching'): """ Highly flexible cog for performing various actions on messages that match various criteria. Patterns can be defined by mods for each guild. """ SETTING_PATTERNS = CogSetting('patterns', None, default_value=None) shared: Optional['PatternCog'] = None def __init__(self, bot: Rocketbot): super().__init__( bot, config_prefix='patterns', short_description='Manages message pattern matching.', long_description='Patterns are a powerful but complex topic. See for full documentation.' ) PatternCog.shared = self def get_patterns(self, guild: Guild) -> dict[str, PatternStatement]: """ Returns a name -> PatternStatement lookup for the guild. """ patterns: dict[str, PatternStatement] = Storage.get_state_value(guild, 'PatternCog.patterns') if patterns is None: jsons: list[dict] = self.get_guild_setting(guild, self.SETTING_PATTERNS) or [] pattern_list: list[PatternStatement] = [] for json in jsons: try: ps = PatternStatement.from_json(json) pattern_list.append(ps) try: ps.check_deprecations() except PatternDeprecationError as e: self.log(guild, f'Pattern {ps.name}: {e}') except PatternError as e: self.log(guild, f'Error decoding pattern "{json["name"]}": {e}') patterns = { p.name:p for p in pattern_list} Storage.set_state_value(guild, 'PatternCog.patterns', patterns) return patterns @classmethod def __save_patterns(cls, guild: Guild, patterns: dict[str, PatternStatement]) -> None: to_save: list[dict] = list(map(lambda ps: ps.to_json(), patterns.values())) cls.set_guild_setting(guild, cls.SETTING_PATTERNS, to_save) @classmethod def __get_last_matched(cls, guild: Guild, name: str) -> Optional[datetime]: last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched') if last_matched: return last_matched.get(name) return None @classmethod def __set_last_matched(cls, guild: Guild, name: str, time: datetime) -> None: last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched') if last_matched is None: last_matched = {} Storage.set_state_value(guild, 'PatternCog.last_matched', last_matched) last_matched[name] = time @Cog.listener() async def on_message(self, message: Message) -> None: """Event listener""" if message.author is None or \ message.author.bot or \ message.channel is None or \ message.guild is None or \ message.content is None or \ message.content == '': return if message.channel.permissions_for(message.author).ban_members: # Ignore mods return patterns = self.get_patterns(message.guild) for statement in sorted(patterns.values(), key=lambda s : s.priority, reverse=True): other_fields = { 'last_matched': self.__get_last_matched(message.guild, statement.name), } if statement.expression.matches(message, other_fields): self.__set_last_matched(message.guild, statement.name, message.created_at) await self.__trigger_actions(message, statement) break async def __trigger_actions(self, message: Message, statement: PatternStatement) -> None: context = PatternContext(message, statement) should_post_message = False message_type: int = BotMessage.TYPE_DEFAULT action_descriptions = [] self.log(message.guild, f'Message from {message.author.name} matched ' + \ f'pattern "{statement.name}"') for action in statement.actions: if action.action == 'ban': await message.author.ban( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{statement.name}"', delete_message_days=0) context.is_banned = True context.is_kicked = True action_descriptions.append('Author banned') self.log(message.guild, f'{message.author.name} banned') elif action.action == 'delete': await message.delete() context.is_deleted = True action_descriptions.append('Message deleted') self.log(message.guild, f'{message.author.name}\'s message deleted') elif action.action == 'kick': await message.author.kick( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{statement.name}"') context.is_kicked = True action_descriptions.append('Author kicked') self.log(message.guild, f'{message.author.name} kicked') elif action.action == 'modinfo': should_post_message = True message_type = BotMessage.TYPE_INFO action_descriptions.append('Message logged') elif action.action == 'modwarn': should_post_message = not self.was_warned_recently(message.author) message_type = BotMessage.TYPE_MOD_WARNING action_descriptions.append('Mods alerted') elif action.action == 'reply': await message.reply( f'{action.arguments[0]}', mention_author=False) action_descriptions.append('Autoreplied') self.log(message.guild, f'autoreplied to {message.author.name}') if should_post_message: bm = BotMessage( message.guild, f'User {message.author.name} tripped custom pattern ' + \ f'`{statement.name}` at {message.jump_url}.\n\n' + \ 'Automatic actions taken:\n• ' + ('\n• '.join(action_descriptions)), type=message_type, context=context) self.record_warning(message.author) bm.quote = discordutils.remove_markdown(message.clean_content) await bm.set_reactions(BotMessageReaction.standard_set( did_delete=context.is_deleted, did_kick=context.is_kicked, did_ban=context.is_banned)) await self.post_message(bm) async def on_mod_react(self, bot_message: BotMessage, reaction: BotMessageReaction, reacted_by: Member) -> None: context: PatternContext = bot_message.context if reaction.emoji == CONFIG['trash_emoji']: await context.message.delete() context.is_deleted = True elif reaction.emoji == CONFIG['kick_emoji']: await context.message.author.kick( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{context.statement.name}". Kicked by {reacted_by.name}.') context.is_kicked = True elif reaction.emoji == CONFIG['ban_emoji']: await context.message.author.ban( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{context.statement.name}". Banned by {reacted_by.name}.', delete_message_days=1) context.is_banned = True await bot_message.set_reactions(BotMessageReaction.standard_set( did_delete=context.is_deleted, did_kick=context.is_kicked, did_ban=context.is_banned)) pattern = Group( name='pattern', description='Manages message pattern matching.', guild_only=True, default_permissions=MOD_PERMISSIONS, extras={ 'long_description': 'Patterns are a powerful but complex topic. ' 'See for full documentation.' }, ) @pattern.command( description='Adds or updates a custom pattern.', extras={ 'long_description': 'Patterns use a simplified expression language. Full ' 'documentation found here: ' '', }, ) @autocomplete( name=pattern_name_autocomplete, # actions=action_autocomplete ) async def add( self, interaction: Interaction, name: str, actions: str, expression: str ) -> None: """ Adds a custom pattern. Parameters ---------- interaction : Interaction name : str a name for the new or existing pattern actions : str actions to take when a message matches expression : str criteria for matching chat messages """ pattern_str = f'{actions} if {expression}' guild = interaction.guild try: statement = PatternCompiler.parse_statement(name, pattern_str) statement.check_deprecations() patterns = self.get_patterns(guild) patterns[name] = statement self.__save_patterns(guild, patterns) await interaction.response.send_message( f'{CONFIG["success_emoji"]} Pattern `{name}` added.', ephemeral=True, ) except PatternError as e: await interaction.response.send_message( f'{CONFIG["failure_emoji"]} Error parsing statement. {e}', ephemeral=True, ) @pattern.command( description='Removes a custom pattern.', extras={ 'usage': '', }, ) @autocomplete(name=pattern_name_autocomplete) async def remove(self, interaction: Interaction, name: str): """ Removes a custom pattern. Parameters ---------- interaction: Interaction name: str name of the pattern to remove """ guild = interaction.guild patterns = self.get_patterns(guild) if patterns.get(name) is not None: del patterns[name] self.__save_patterns(guild, patterns) await interaction.response.send_message( f'{CONFIG["success_emoji"]} Pattern `{name}` deleted.', ephemeral=True, ) else: await interaction.response.send_message( f'{CONFIG["failure_emoji"]} No pattern named `{name}`.', ephemeral=True, ) @pattern.command( description='Lists all patterns.', ) async def list(self, interaction: Interaction) -> None: guild = interaction.guild patterns = self.get_patterns(guild) if len(patterns) == 0: await interaction.response.send_message( 'No patterns defined.', ephemeral=True, ) return msg = '' for name, statement in sorted(patterns.items()): msg += f'Pattern `{name}` (priority={statement.priority}):\n```\n{statement.original}\n```\n' await interaction.response.send_message(msg, ephemeral=True) @pattern.command( description="Sets a pattern's priority level.", extras={ 'long_description': 'Messages are checked against patterns with the ' 'highest priority first. Patterns with the same ' 'priority may be checked in arbitrary order. Default ' 'priority is 100.', }, ) @autocomplete(name=pattern_name_autocomplete, priority=priority_autocomplete) async def setpriority(self, interaction: Interaction, name: str, priority: int) -> None: """ Sets a pattern's priority level. Parameters ---------- interaction: Interaction name: str the name of the pattern priority: int evaluation priority """ guild = interaction.guild patterns = self.get_patterns(guild) statement = patterns.get(name) if statement is None: await interaction.response.send_message( f'{CONFIG["failure_emoji"]} No such pattern `{name}`', ephemeral=True, ) return statement.priority = priority self.__save_patterns(guild, patterns) await interaction.response.send_message( f'{CONFIG["success_emoji"]} Priority for pattern `{name}` ' + \ f'updated to `{priority}`.', ephemeral=True, )