""" Cog for matching messages against guild-configurable criteria and taking automated actions on them. """ import re from datetime import datetime from typing import Optional from discord import Guild, Interaction, Member, Message from discord import utils as discordutils from discord.app_commands import Choice, Group, autocomplete from discord.ext.commands import Cog from config import CONFIG from rocketbot.bot import Rocketbot from rocketbot.cogs.basecog import BaseCog, BotMessage, BotMessageReaction from rocketbot.cogsetting import CogSetting from rocketbot.pattern import ( PatternCompiler, PatternDeprecationError, PatternError, PatternStatement, ) from rocketbot.storage import Storage from rocketbot.utils import MOD_PERMISSIONS, dump_stacktrace class PatternContext: """ Data about a message that has matched a configured statement and what actions have been carried out. """ def __init__(self, message: Message, statement: PatternStatement): self.message: Message = message self.statement: PatternStatement = statement self.is_deleted: bool = False self.is_kicked: bool = False self.is_banned: bool = False async def pattern_name_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]: choices: list[Choice[str]] = [] try: if interaction.guild is None: return [] patterns: dict[str, PatternStatement] = PatternCog.shared.get_patterns(interaction.guild) current_normal = current.lower().strip() for name in sorted(patterns.keys()): if len(current_normal) == 0 or current_normal.startswith(name.lower()): choices.append(Choice(name=name, value=name)) except BaseException as e: dump_stacktrace(e) return choices async def action_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]: # FIXME: WORK IN PROGRESS print(f'autocomplete action - current = "{current}"') regex = re.compile('^(.*?)([a-zA-Z]+)$') match: Optional[re.Match[str]] = regex.match(current) initial: str = '' stub: str = current if match: initial = match.group(1).strip() stub = match.group(2) if PatternCompiler.ACTION_TO_ARGS.get(stub, None) is not None: # Matches perfectly. Suggest another instead of completing the current. initial = current.strip() + ', ' stub = '' print(f'initial = "{initial}", stub = "{stub}"') options: list[Choice[str]] = [] for action in sorted(PatternCompiler.ACTION_TO_ARGS.keys()): if len(stub) == 0 or action.startswith(stub.lower()): arg_types = PatternCompiler.ACTION_TO_ARGS[action] arg_type_strs = [] for arg_type in arg_types: if arg_type == PatternCompiler.TYPE_TEXT: arg_type_strs.append('"message"') else: raise ValueError(f'Argument type {arg_type} not yet supported') suffix = '' if len(arg_type_strs) == 0 else ' ' + (' '.join(arg_type_strs)) options.append(Choice(name=action, value=f'{initial.strip()} {action}{suffix}')) return options async def priority_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]: return [ Choice(name='very low (50)', value=50), Choice(name='low (75)', value=75), Choice(name='normal (100)', value=100), Choice(name='high (125)', value=125), Choice(name='very high (150)', value=150), ] _long_help = \ """Patterns are a powerful but complex topic. See for full documentation. ### Quick cheat sheet > `/pattern add` _pattern\\_name_ _action\\_list_ `if` _expression_ - _pattern\\_name_ is a brief name for identifying the pattern later (not shown to user) - _action\\_list_ is a comma-delimited list of actions to take on matching messages and is any of: - `ban` - `delete` - `kick` - `modinfo` - logs a message but doesn't tag mods - `modwarn` - tags mods - `reply` "message text" - _expression_ determines which messages match, of the form _field_ _op_ _value_. - Fields: - `content.markdown`: string - `content.plain`: string - `author`: user - `author.id`: id - `author.joinage`: timespan - `author.name`: string - `lastmatched`: timespan - Operators: `==`, `!=`, `<`, `>`, `<=`, `>=`, `contains`, `!contains`, `matches`, `!matches`, `containsword`, `!containsword` - Can combine multiple expressions with `!`, `and`, `or`, and parentheses.""" class PatternCog(BaseCog, name='Pattern Matching'): """ Highly flexible cog for performing various actions on messages that match various criteria. Patterns can be defined by mods for each guild. """ SETTING_PATTERNS = CogSetting('patterns', None, default_value=None) shared: Optional['PatternCog'] = None def __init__(self, bot: Rocketbot): super().__init__( bot, config_prefix='patterns', short_description='Manages message pattern matching.', long_description=_long_help ) PatternCog.shared = self def get_patterns(self, guild: Guild) -> dict[str, PatternStatement]: """ Returns a name -> PatternStatement lookup for the guild. """ patterns: dict[str, PatternStatement] = Storage.get_state_value(guild, 'PatternCog.patterns') if patterns is None: jsons: list[dict] = self.get_guild_setting(guild, self.SETTING_PATTERNS) or [] pattern_list: list[PatternStatement] = [] for json in jsons: try: ps = PatternStatement.from_json(json) pattern_list.append(ps) try: ps.check_deprecations() except PatternDeprecationError as e: self.log(guild, f'Pattern {ps.name}: {e}') except PatternError as e: self.log(guild, f'Error decoding pattern "{json["name"]}": {e}') patterns = { p.name:p for p in pattern_list} Storage.set_state_value(guild, 'PatternCog.patterns', patterns) return patterns @classmethod def __save_patterns(cls, guild: Guild, patterns: dict[str, PatternStatement]) -> None: to_save: list[dict] = list(map(lambda ps: ps.to_json(), patterns.values())) cls.set_guild_setting(guild, cls.SETTING_PATTERNS, to_save) @classmethod def __get_last_matched(cls, guild: Guild, name: str) -> Optional[datetime]: last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched') if last_matched: return last_matched.get(name) return None @classmethod def __set_last_matched(cls, guild: Guild, name: str, time: datetime) -> None: last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched') if last_matched is None: last_matched = {} Storage.set_state_value(guild, 'PatternCog.last_matched', last_matched) last_matched[name] = time @Cog.listener() async def on_message(self, message: Message) -> None: """Event listener""" if message.author is None or \ message.author.bot or \ message.channel is None or \ message.guild is None or \ message.content is None or \ message.content == '': return if message.channel.permissions_for(message.author).ban_members: # Ignore mods return patterns = self.get_patterns(message.guild) for statement in sorted(patterns.values(), key=lambda s : s.priority, reverse=True): other_fields = { 'last_matched': self.__get_last_matched(message.guild, statement.name), } if statement.expression.matches(message, other_fields): self.__set_last_matched(message.guild, statement.name, message.created_at) await self.__trigger_actions(message, statement) break async def __trigger_actions(self, message: Message, statement: PatternStatement) -> None: context = PatternContext(message, statement) should_post_message = False message_type: int = BotMessage.TYPE_DEFAULT action_descriptions = [] self.log(message.guild, f'Message from {message.author.name} matched ' + \ f'pattern "{statement.name}"') for action in statement.actions: if action.action == 'ban': await message.author.ban( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{statement.name}"', delete_message_days=0) context.is_banned = True context.is_kicked = True action_descriptions.append('Author banned') self.log(message.guild, f'{message.author.name} banned') elif action.action == 'delete': await message.delete() context.is_deleted = True action_descriptions.append('Message deleted') self.log(message.guild, f'{message.author.name}\'s message deleted') elif action.action == 'kick': await message.author.kick( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{statement.name}"') context.is_kicked = True action_descriptions.append('Author kicked') self.log(message.guild, f'{message.author.name} kicked') elif action.action == 'modinfo': should_post_message = True message_type = BotMessage.TYPE_INFO action_descriptions.append('Message logged') elif action.action == 'modwarn': should_post_message = not self.was_warned_recently(message.author) message_type = BotMessage.TYPE_MOD_WARNING action_descriptions.append('Mods alerted') elif action.action == 'reply': await message.reply( f'{action.arguments[0]}', mention_author=False) action_descriptions.append('Autoreplied') self.log(message.guild, f'autoreplied to {message.author.name}') if should_post_message: bm = BotMessage( message.guild, f'User {message.author.name} tripped custom pattern ' + \ f'`{statement.name}` at {message.jump_url}.\n\n' + \ 'Automatic actions taken:\n• ' + ('\n• '.join(action_descriptions)), type=message_type, context=context) self.record_warning(message.author) bm.quote = discordutils.remove_markdown(message.clean_content) await bm.set_reactions(BotMessageReaction.standard_set( did_delete=context.is_deleted, did_kick=context.is_kicked, did_ban=context.is_banned)) await self.post_message(bm) async def on_mod_react(self, bot_message: BotMessage, reaction: BotMessageReaction, reacted_by: Member) -> None: context: PatternContext = bot_message.context if reaction.emoji == CONFIG['trash_emoji']: await context.message.delete() context.is_deleted = True elif reaction.emoji == CONFIG['kick_emoji']: await context.message.author.kick( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{context.statement.name}". Kicked by {reacted_by.name}.') context.is_kicked = True elif reaction.emoji == CONFIG['ban_emoji']: await context.message.author.ban( reason='Rocketbot: Message matched custom pattern named ' + \ f'"{context.statement.name}". Banned by {reacted_by.name}.', delete_message_days=1) context.is_banned = True await bot_message.set_reactions(BotMessageReaction.standard_set( did_delete=context.is_deleted, did_kick=context.is_kicked, did_ban=context.is_banned)) pattern = Group( name='pattern', description='Manages message pattern matching.', guild_only=True, default_permissions=MOD_PERMISSIONS, extras={ 'long_description': _long_help, }, ) @pattern.command( description='Adds or updates a custom pattern.', extras={ 'long_description': _long_help, }, ) @autocomplete( name=pattern_name_autocomplete, # actions=action_autocomplete ) async def add( self, interaction: Interaction, name: str, actions: str, expression: str ) -> None: """ Adds a custom pattern. Parameters ---------- interaction : Interaction name : str a name for the new or existing pattern actions : str actions to take when a message matches expression : str criteria for matching chat messages """ pattern_str = f'{actions} if {expression}' guild = interaction.guild try: statement = PatternCompiler.parse_statement(name, pattern_str) statement.check_deprecations() patterns = self.get_patterns(guild) patterns[name] = statement self.__save_patterns(guild, patterns) await interaction.response.send_message( f'{CONFIG["success_emoji"]} Pattern `{name}` added.', ephemeral=True, ) except PatternError as e: await interaction.response.send_message( f'{CONFIG["failure_emoji"]} Error parsing statement. {e}', ephemeral=True, ) @pattern.command( description='Removes a custom pattern.', extras={ 'usage': '', }, ) @autocomplete(name=pattern_name_autocomplete) async def remove(self, interaction: Interaction, name: str): """ Removes a custom pattern. Parameters ---------- interaction: Interaction name: str name of the pattern to remove """ guild = interaction.guild patterns = self.get_patterns(guild) if patterns.get(name) is not None: del patterns[name] self.__save_patterns(guild, patterns) await interaction.response.send_message( f'{CONFIG["success_emoji"]} Pattern `{name}` deleted.', ephemeral=True, ) else: await interaction.response.send_message( f'{CONFIG["failure_emoji"]} No pattern named `{name}`.', ephemeral=True, ) @pattern.command( description='Lists all patterns.', ) async def list(self, interaction: Interaction) -> None: guild = interaction.guild patterns = self.get_patterns(guild) if len(patterns) == 0: await interaction.response.send_message( 'No patterns defined.', ephemeral=True, ) return msg = '' for name, statement in sorted(patterns.items()): msg += f'Pattern `{name}` (priority={statement.priority}):\n```\n{statement.original}\n```\n' await interaction.response.send_message(msg, ephemeral=True) @pattern.command( description="Sets a pattern's priority level.", extras={ 'long_description': 'Messages are checked against patterns with the ' 'highest priority first. Patterns with the same ' 'priority may be checked in arbitrary order. Default ' 'priority is 100.', }, ) @autocomplete(name=pattern_name_autocomplete, priority=priority_autocomplete) async def setpriority(self, interaction: Interaction, name: str, priority: int) -> None: """ Sets a pattern's priority level. Parameters ---------- interaction: Interaction name: str the name of the pattern priority: int evaluation priority """ guild = interaction.guild patterns = self.get_patterns(guild) statement = patterns.get(name) if statement is None: await interaction.response.send_message( f'{CONFIG["failure_emoji"]} No such pattern `{name}`', ephemeral=True, ) return statement.priority = priority self.__save_patterns(guild, patterns) await interaction.response.send_message( f'{CONFIG["success_emoji"]} Priority for pattern `{name}` ' + \ f'updated to `{priority}`.', ephemeral=True, )