| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- """
- Cog for matching messages against guild-configurable criteria and taking
- automated actions on them.
- """
- import re
- from datetime import datetime
- from typing import Optional
-
- from discord import Guild, Member, Message, utils as discordutils, Permissions, Interaction
- from discord.app_commands import Choice, Group, autocomplete, rename
- from discord.ext.commands import Cog
-
- from config import CONFIG
- from rocketbot.bot import Rocketbot
- from rocketbot.cogs.basecog import BaseCog, BotMessage, BotMessageReaction
- from rocketbot.cogsetting import CogSetting
- from rocketbot.pattern import PatternCompiler, PatternDeprecationError, \
- PatternError, PatternStatement
- from rocketbot.storage import Storage
- from rocketbot.utils import dump_stacktrace, MOD_PERMISSIONS
-
-
class PatternContext:
    """
    Bookkeeping for one message that matched a configured pattern statement.

    Holds the matched message and statement together with flags recording
    which moderation actions have been carried out for it so far.
    """
    def __init__(self, message: Message, statement: PatternStatement):
        # What matched, and which statement it matched against.
        self.message, self.statement = message, statement
        # No action has been taken at construction time; the code that
        # performs a delete/kick/ban flips the corresponding flag afterwards.
        self.is_deleted = self.is_kicked = self.is_banned = False
-
async def pattern_name_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
    """
    Autocomplete handler that suggests names of patterns defined in the
    guild the interaction came from.

    Parameters
    ----------
    interaction : Interaction
        The interaction requesting suggestions.
    current : str
        What the user has typed so far. Matching is case-insensitive and
        ignores surrounding whitespace; empty input suggests every pattern.

    Returns
    -------
    list[Choice[str]]
        Pattern names starting with the typed text, in sorted order and
        capped at 25 entries. Empty when outside a guild, when the cog has
        not been instantiated, or when the lookup fails.
    """
    # Discord rejects autocomplete responses with more than 25 choices.
    max_choices = 25
    cog = PatternCog.shared
    guild = interaction.guild
    if cog is None or guild is None:
        return []
    choices: list[Choice[str]] = []
    try:
        typed = current.lower().strip()
        for name in sorted(cog.get_patterns(guild)):
            # The candidate must start with what was typed, not the other
            # way around. Every string starts with '', so empty input
            # matches all names.
            if name.lower().startswith(typed):
                choices.append(Choice(name=name, value=name))
    except Exception as e:
        # Best effort: a failed lookup should only cost the user their
        # suggestions. Catching Exception rather than BaseException lets
        # task cancellation and interpreter shutdown propagate.
        dump_stacktrace(e)
    return choices[:max_choices]
-
async def action_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
    """
    Autocomplete handler for a pattern's action list (work in progress).

    Splits the typed text into everything before the trailing run of
    letters and that trailing run itself, then suggests each known action
    starting with the trailing run. If the trailing run is already a
    complete action name, every action is suggested as a further action
    after a comma. Each suggested value is the leading text, a space, the
    action name and a placeholder for each argument the action takes.

    Raises
    ------
    ValueError
        If a suggested action takes an argument type other than text.
    """
    # FIXME: WORK IN PROGRESS
    print(f'autocomplete action - current = "{current}"')
    known_actions = PatternCompiler.ACTION_TO_ARGS
    prefix, partial = '', current
    found = re.match('^(.*?)([a-zA-Z]+)$', current)
    if found is not None:
        prefix, partial = found.group(1).strip(), found.group(2)
        if known_actions.get(partial) is not None:
            # Matches perfectly. Suggest another instead of completing the current.
            prefix, partial = current.strip() + ', ', ''
    print(f'initial = "{prefix}", stub = "{partial}"')

    wanted = partial.lower()
    lead = prefix.strip()
    suggestions: list[Choice[str]] = []
    for action in sorted(known_actions):
        # An empty stub is a prefix of every action name.
        if not action.startswith(wanted):
            continue
        placeholders = []
        for arg_type in known_actions[action]:
            if arg_type != PatternCompiler.TYPE_TEXT:
                raise ValueError(f'Argument type {arg_type} not yet supported')
            placeholders.append('"message"')
        rendered = ' '.join([action, *placeholders])
        suggestions.append(Choice(name=action, value=f'{lead} {rendered}'))
    return suggestions
-
async def priority_autocomplete(interaction: Interaction, current: str) -> list[Choice[int]]:
    """
    Autocomplete handler offering named presets for a pattern's priority.

    The typed text is ignored: the same five presets are always returned.

    Parameters
    ----------
    interaction : Interaction
        The interaction requesting suggestions (unused).
    current : str
        What the user has typed so far (unused).

    Returns
    -------
    list[Choice[int]]
        Presets from "very low" (50) to "very high" (150), lowest first.
        The choice values are integers.
    """
    return [
        Choice(name='very low (50)', value=50),
        Choice(name='low (75)', value=75),
        Choice(name='normal (100)', value=100),
        Choice(name='high (125)', value=125),
        Choice(name='very high (150)', value=150),
    ]
-
class PatternCog(BaseCog, name='Pattern Matching'):
    """
    Highly flexible cog for performing various actions on messages that match
    various criteria. Patterns can be defined by mods for each guild.
    """

    SETTING_PATTERNS = CogSetting('patterns', None, default_value=None)

    # Maximum number of characters Discord accepts in one message body.
    _MESSAGE_LIMIT = 2000

    # Most recently constructed instance; lets the module-level autocomplete
    # callbacks reach the cog.
    shared: Optional['PatternCog'] = None

    def __init__(self, bot: Rocketbot):
        super().__init__(
            bot,
            config_prefix='patterns',
            short_description='Manages message pattern matching.',
        )
        PatternCog.shared = self

    def get_patterns(self, guild: Guild) -> dict[str, PatternStatement]:
        """
        Returns a name -> PatternStatement lookup for the guild.

        The lookup is cached in the guild's state storage. On a cache miss
        it is rebuilt from the persisted guild setting. Stored entries that
        fail to decode are logged and skipped; deprecation problems are
        logged but the pattern is kept.
        """
        patterns: Optional[dict[str, PatternStatement]] = Storage.get_state_value(
            guild, 'PatternCog.patterns')
        if patterns is None:
            jsons: list[dict] = self.get_guild_setting(guild, self.SETTING_PATTERNS) or []
            pattern_list: list[PatternStatement] = []
            for encoded in jsons:
                try:
                    ps = PatternStatement.from_json(encoded)
                    pattern_list.append(ps)
                    try:
                        ps.check_deprecations()
                    except PatternDeprecationError as e:
                        self.log(guild, f'Pattern {ps.name}: {e}')
                except PatternError as e:
                    # The entry is already known to be bad, so don't assume
                    # it has a usable "name" while reporting the failure.
                    if isinstance(encoded, dict):
                        pattern_name = encoded.get('name', '<unnamed>')
                    else:
                        pattern_name = '<unnamed>'
                    self.log(guild, f'Error decoding pattern "{pattern_name}": {e}')
            patterns = {p.name: p for p in pattern_list}
            Storage.set_state_value(guild, 'PatternCog.patterns', patterns)
        return patterns

    @classmethod
    def __save_patterns(cls,
            guild: Guild,
            patterns: dict[str, PatternStatement]) -> None:
        """Persists the guild's patterns, JSON-encoded, as a guild setting."""
        to_save: list[dict] = [ps.to_json() for ps in patterns.values()]
        cls.set_guild_setting(guild, cls.SETTING_PATTERNS, to_save)

    @classmethod
    def __get_last_matched(cls, guild: Guild, name: str) -> Optional[datetime]:
        """
        Returns the time recorded for the named pattern's most recent match
        in the guild, or None if nothing has been recorded.
        """
        last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched')
        if last_matched:
            return last_matched.get(name)
        return None

    @classmethod
    def __set_last_matched(cls, guild: Guild, name: str, time: datetime) -> None:
        """Records `time` as the named pattern's most recent match in the guild."""
        last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched')
        if last_matched is None:
            last_matched = {}
            Storage.set_state_value(guild, 'PatternCog.last_matched', last_matched)
        last_matched[name] = time

    # NOTE: keep this helper above the `list` command. Annotations in a
    # method signature are evaluated in class scope, where `list` stops
    # meaning the builtin once the command of that name has been defined.
    @staticmethod
    def __paginate(entries: list[str], limit: int) -> list[str]:
        """
        Packs entries, in order, into as few strings as possible without
        any string exceeding `limit` characters. Entries are not split
        across pages; an entry longer than `limit` is cut to `limit`.
        """
        pages: list[str] = []
        page = ''
        for entry in entries:
            # Last resort for an entry that could never fit on a page.
            entry = entry[:limit]
            if page and len(page) + len(entry) > limit:
                pages.append(page)
                page = ''
            page += entry
        if page:
            pages.append(page)
        return pages

    @Cog.listener()
    async def on_message(self, message: Message) -> None:
        """Event listener"""
        if message.author is None or \
                message.author.bot or \
                message.channel is None or \
                message.guild is None or \
                message.content is None or \
                message.content == '':
            return
        if not isinstance(message.author, Member):
            # The permission check and the kick/ban actions below are
            # performed on the author as a guild member.
            return
        if message.channel.permissions_for(message.author).ban_members:
            # Ignore mods
            return

        patterns = self.get_patterns(message.guild)
        # Highest priority first; only the first matching pattern fires.
        for statement in sorted(patterns.values(), key=lambda s: s.priority, reverse=True):
            other_fields = {
                'last_matched': self.__get_last_matched(message.guild, statement.name),
            }
            if statement.expression.matches(message, other_fields):
                self.__set_last_matched(message.guild, statement.name, message.created_at)
                await self.__trigger_actions(message, statement)
                break

    async def __trigger_actions(self,
            message: Message,
            statement: PatternStatement) -> None:
        """
        Carries out the statement's actions, in order, on a message that
        matched it, then posts a bot message describing what was done if
        any of the actions asked for one.
        """
        context = PatternContext(message, statement)
        should_post_message = False
        message_type: int = BotMessage.TYPE_DEFAULT
        action_descriptions = []
        self.log(message.guild, f'Message from {message.author.name} matched ' + \
            f'pattern "{statement.name}"')
        for action in statement.actions:
            if action.action == 'ban':
                await message.author.ban(
                    reason='Rocketbot: Message matched custom pattern named ' + \
                        f'"{statement.name}"',
                    delete_message_days=0)
                context.is_banned = True
                # Also marked as kicked so the kick reaction is treated as
                # already done.
                context.is_kicked = True
                action_descriptions.append('Author banned')
                self.log(message.guild, f'{message.author.name} banned')
            elif action.action == 'delete':
                await message.delete()
                context.is_deleted = True
                action_descriptions.append('Message deleted')
                self.log(message.guild, f'{message.author.name}\'s message deleted')
            elif action.action == 'kick':
                await message.author.kick(
                    reason='Rocketbot: Message matched custom pattern named ' + \
                        f'"{statement.name}"')
                context.is_kicked = True
                action_descriptions.append('Author kicked')
                self.log(message.guild, f'{message.author.name} kicked')
            elif action.action == 'modinfo':
                should_post_message = True
                message_type = BotMessage.TYPE_INFO
                action_descriptions.append('Message logged')
            elif action.action == 'modwarn':
                # Suppressed when this author was already warned about recently.
                should_post_message = not self.was_warned_recently(message.author)
                message_type = BotMessage.TYPE_MOD_WARNING
                action_descriptions.append('Mods alerted')
            elif action.action == 'reply':
                await message.reply(
                    f'{action.arguments[0]}',
                    mention_author=False)
                action_descriptions.append('Autoreplied')
                self.log(message.guild, f'autoreplied to {message.author.name}')
        if should_post_message:
            bm = BotMessage(
                message.guild,
                f'User {message.author.name} tripped custom pattern ' + \
                f'`{statement.name}` at {message.jump_url}.\n\n' + \
                'Automatic actions taken:\n• ' + ('\n• '.join(action_descriptions)),
                type=message_type,
                context=context)
            self.record_warning(message.author)
            bm.quote = discordutils.remove_markdown(message.clean_content)
            await bm.set_reactions(BotMessageReaction.standard_set(
                did_delete=context.is_deleted,
                did_kick=context.is_kicked,
                did_ban=context.is_banned))
            await self.post_message(bm)

    async def on_mod_react(self,
            bot_message: BotMessage,
            reaction: BotMessageReaction,
            reacted_by: Member) -> None:
        """
        Performs the delete, kick or ban selected by a reaction on one of
        this cog's bot messages, then refreshes the message's reactions to
        reflect what has now been done.
        """
        context: PatternContext = bot_message.context
        if reaction.emoji == CONFIG['trash_emoji']:
            await context.message.delete()
            context.is_deleted = True
        elif reaction.emoji == CONFIG['kick_emoji']:
            await context.message.author.kick(
                reason='Rocketbot: Message matched custom pattern named ' + \
                    f'"{context.statement.name}". Kicked by {reacted_by.name}.')
            context.is_kicked = True
        elif reaction.emoji == CONFIG['ban_emoji']:
            await context.message.author.ban(
                reason='Rocketbot: Message matched custom pattern named ' + \
                    f'"{context.statement.name}". Banned by {reacted_by.name}.',
                delete_message_days=1)
            context.is_banned = True
        await bot_message.set_reactions(BotMessageReaction.standard_set(
            did_delete=context.is_deleted,
            did_kick=context.is_kicked,
            did_ban=context.is_banned))

    pattern = Group(
        name='pattern',
        description='Manages message pattern matching.',
        guild_only=True,
        default_permissions=MOD_PERMISSIONS,
    )

    @pattern.command()
    @rename(expression='if')
    @autocomplete(
        name=pattern_name_autocomplete,
        # actions=action_autocomplete
    )
    async def add(
            self,
            interaction: Interaction,
            name: str,
            actions: str,
            expression: str
    ) -> None:
        """
        Adds a custom pattern.

        Adds a custom pattern. Patterns use a simplified
        expression language. Full documentation found here:
        https://git.rixafrix.com/ialbert/python-app-rocketbot/src/branch/main/docs/patterns.md

        Parameters
        ----------
        interaction : Interaction
        name : str
            A name for the pattern.
        actions : str
            One or more actions to take when a message matches the expression.
        expression : str
            Criteria for matching chat messages.
        """
        # The docstring above is left verbatim: the command is registered
        # without an explicit description, so its wording is user-facing.
        pattern_str = f'{actions} if {expression}'
        guild = interaction.guild
        try:
            statement = PatternCompiler.parse_statement(name, pattern_str)
            statement.check_deprecations()
            patterns = self.get_patterns(guild)
            # An existing pattern with the same name is replaced.
            patterns[name] = statement
            self.__save_patterns(guild, patterns)
            await interaction.response.send_message(
                f'{CONFIG["success_emoji"]} Pattern `{name}` added.',
                ephemeral=True,
            )
        except PatternError as e:
            await interaction.response.send_message(
                f'{CONFIG["failure_emoji"]} Error parsing statement. {e}',
                ephemeral=True,
            )

    @pattern.command(
        description='Removes a custom pattern',
        extras={
            'usage': '<pattern_name>',
        },
    )
    @autocomplete(name=pattern_name_autocomplete)
    async def remove(self, interaction: Interaction, name: str):
        """
        Command handler

        Parameters
        ----------
        interaction: Interaction
        name: str
            Name of the pattern to remove.
        """
        guild = interaction.guild
        patterns = self.get_patterns(guild)
        if patterns.get(name) is not None:
            del patterns[name]
            self.__save_patterns(guild, patterns)
            await interaction.response.send_message(
                f'{CONFIG["success_emoji"]} Pattern `{name}` deleted.',
                ephemeral=True,
            )
        else:
            await interaction.response.send_message(
                f'{CONFIG["failure_emoji"]} No pattern named `{name}`.',
                ephemeral=True,
            )

    @pattern.command(
        description='Lists all patterns',
    )
    async def list(self, interaction: Interaction) -> None:
        """
        Command handler

        Parameters
        ----------
        interaction: Interaction
        """
        guild = interaction.guild
        patterns = self.get_patterns(guild)
        if len(patterns) == 0:
            await interaction.response.send_message(
                'No patterns defined.',
                ephemeral=True,
            )
            return
        entries = [
            f'Pattern `{name}` (priority={statement.priority}):\n```\n{statement.original}\n```\n'
            for name, statement in sorted(patterns.items())
        ]
        # A long listing would exceed the size of a single message, so it is
        # split: the first page answers the interaction, the rest follow up.
        pages = self.__paginate(entries, self._MESSAGE_LIMIT)
        await interaction.response.send_message(pages[0], ephemeral=True)
        for page in pages[1:]:
            await interaction.followup.send(page, ephemeral=True)

    @pattern.command(
        description='Sets a pattern\'s priority level',
        extras={
            'long_description': 'Sets the priority for a pattern. Messages are checked ' +
                'against patterns with the highest priority first. Patterns with ' +
                'the same priority may be checked in arbitrary order. Default ' +
                'priority is 100.',
        },
    )
    @autocomplete(name=pattern_name_autocomplete, priority=priority_autocomplete)
    async def setpriority(self, interaction: Interaction, name: str, priority: int) -> None:
        """
        Command handler

        Parameters
        ----------
        interaction: Interaction
        name: str
            A name for the pattern
        priority: int
            Priority for evaluating the pattern. Default is 100. Higher values match first.
        """
        guild = interaction.guild
        patterns = self.get_patterns(guild)
        statement = patterns.get(name)
        if statement is None:
            await interaction.response.send_message(
                f'{CONFIG["failure_emoji"]} No such pattern `{name}`',
                ephemeral=True,
            )
            return
        statement.priority = priority
        self.__save_patterns(guild, patterns)
        await interaction.response.send_message(
            f'{CONFIG["success_emoji"]} Priority for pattern `{name}` ' + \
            f'updated to `{priority}`.',
            ephemeral=True,
        )
|