| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- """
- Cog for matching messages against guild-configurable criteria and taking
- automated actions on them.
- """
from datetime import datetime
from typing import Optional

from discord import Guild, Member, Message, utils as discordutils
from discord.ext import commands

from config import CONFIG
from rocketbot.cogs.basecog import BaseCog, BotMessage, BotMessageReaction
from rocketbot.cogsetting import CogSetting
from rocketbot.pattern import PatternCompiler, PatternDeprecationError, \
    PatternError, PatternStatement
from rocketbot.storage import Storage
-
class PatternContext:
    """
    Records a message that matched a configured pattern statement, along with
    flags tracking which moderation actions have been applied so far.
    """
    def __init__(self, message: Message, statement: PatternStatement):
        # No action has been carried out yet when the context is created.
        self.is_deleted = self.is_kicked = self.is_banned = False
        # The matched message and the statement it matched.
        self.statement = statement
        self.message = message
-
class PatternCog(BaseCog, name='Pattern Matching'):
    """
    Highly flexible cog for performing various actions on messages that match
    various criteria. Patterns can be defined by mods for each guild.
    """

    SETTING_PATTERNS = CogSetting('patterns', None)

    def __get_patterns(self, guild: Guild) -> dict[str, PatternStatement]:
        """
        Returns a name -> PatternStatement lookup for the guild.

        The lookup is kept in the guild's state storage under
        'PatternCog.patterns'. When no lookup is stored yet, it is rebuilt
        from the guild's saved setting and stored. Entries that fail to decode
        are logged and skipped; deprecation problems are logged but the
        pattern is still kept.
        """
        patterns: Optional[dict[str, PatternStatement]] = Storage.get_state_value(
            guild, 'PatternCog.patterns')
        if patterns is None:
            # The setting may come back as None (presumably when nothing has
            # been saved for this guild yet), so fall back to an empty list.
            jsons: list[dict] = \
                self.get_guild_setting(guild, self.SETTING_PATTERNS) or []
            pattern_list: list[PatternStatement] = []
            for pattern_json in jsons:
                try:
                    ps = PatternStatement.from_json(pattern_json)
                    pattern_list.append(ps)
                    try:
                        ps.check_deprecations()
                    except PatternDeprecationError as e:
                        self.log(guild, f'Pattern {ps.name}: {e}')
                except PatternError as e:
                    # Use .get so a malformed entry without a name cannot
                    # raise KeyError while we are reporting the real error.
                    bad_name = pattern_json.get('name', '<unnamed>')
                    self.log(guild, f'Error decoding pattern "{bad_name}": {e}')
            # Later entries replace earlier ones that share a name.
            patterns = {p.name: p for p in pattern_list}
            Storage.set_state_value(guild, 'PatternCog.patterns', patterns)
        return patterns

    @classmethod
    def __save_patterns(cls,
            guild: Guild,
            patterns: dict[str, PatternStatement]) -> None:
        """
        Serializes every statement in the lookup to JSON and writes the
        resulting list to the guild's patterns setting.
        """
        to_save: list[dict] = [
            PatternStatement.to_json(statement)
            for statement in patterns.values()
        ]
        cls.set_guild_setting(guild, cls.SETTING_PATTERNS, to_save)

    @classmethod
    def __get_last_matched(cls, guild: Guild, name: str) -> Optional[datetime]:
        """
        Returns the time stored for the named pattern in the guild's
        'PatternCog.last_matched' state, or None if nothing is stored for it.
        """
        last_matched: Optional[dict[str, datetime]] = Storage.get_state_value(
            guild, 'PatternCog.last_matched')
        if last_matched:
            return last_matched.get(name)
        return None

    @classmethod
    def __set_last_matched(cls, guild: Guild, name: str, time: datetime) -> None:
        """
        Stores `time` for the named pattern in the guild's
        'PatternCog.last_matched' state, creating the lookup on first use.
        """
        last_matched: Optional[dict[str, datetime]] = Storage.get_state_value(
            guild, 'PatternCog.last_matched')
        if last_matched is None:
            last_matched = {}
            Storage.set_state_value(guild, 'PatternCog.last_matched', last_matched)
        last_matched[name] = time

    @commands.Cog.listener()
    async def on_message(self, message: Message) -> None:
        """
        Event listener. Checks a new guild message against the guild's
        patterns, highest priority first, and triggers the actions of the
        first statement that matches. Messages from bots, messages with no
        content and messages from members who can ban are ignored.
        """
        if message.author is None or \
                message.author.bot or \
                message.channel is None or \
                message.guild is None or \
                not message.content:
            return
        # Ask the channel for the author's permissions rather than calling
        # Member.permissions_in, which discord.py 2.0 removed.
        if message.channel.permissions_for(message.author).ban_members:
            # Ignore mods
            return

        patterns = self.__get_patterns(message.guild)
        by_priority = sorted(
            patterns.values(), key=lambda s: s.priority, reverse=True)
        for statement in by_priority:
            other_fields = {
                'last_matched': self.__get_last_matched(message.guild, statement.name),
            }
            if statement.expression.matches(message, other_fields):
                self.__set_last_matched(message.guild, statement.name, message.created_at)
                await self.__trigger_actions(message, statement)
                # Only the first matching statement acts on a message.
                break

    async def __trigger_actions(self,
            message: Message,
            statement: PatternStatement) -> None:
        """
        Carries out each action of `statement` against `message`, in the order
        the statement lists them, then posts a summary BotMessage if any
        'modinfo' or 'modwarn' action was present.
        """
        context = PatternContext(message, statement)
        should_post_message = False
        message_type: int = BotMessage.TYPE_DEFAULT
        action_descriptions = []
        self.log(message.guild, f'Message from {message.author.name} matched ' + \
            f'pattern "{statement.name}"')
        for action in statement.actions:
            if action.action == 'ban':
                await message.author.ban(
                    reason='Rocketbot: Message matched custom pattern named ' + \
                        f'"{statement.name}"',
                    delete_message_days=0)
                context.is_banned = True
                # A ban also marks the author as kicked.
                context.is_kicked = True
                action_descriptions.append('Author banned')
                self.log(message.guild, f'{message.author.name} banned')
            elif action.action == 'delete':
                await message.delete()
                context.is_deleted = True
                action_descriptions.append('Message deleted')
                self.log(message.guild, f'{message.author.name}\'s message deleted')
            elif action.action == 'kick':
                await message.author.kick(
                    reason='Rocketbot: Message matched custom pattern named ' + \
                        f'"{statement.name}"')
                context.is_kicked = True
                action_descriptions.append('Author kicked')
                self.log(message.guild, f'{message.author.name} kicked')
            elif action.action == 'modinfo':
                should_post_message = True
                message_type = BotMessage.TYPE_INFO
                action_descriptions.append('Message logged')
            elif action.action == 'modwarn':
                should_post_message = True
                message_type = BotMessage.TYPE_MOD_WARNING
                action_descriptions.append('Mods alerted')
            elif action.action == 'reply':
                await message.reply(
                    f'{action.arguments[0]}',
                    mention_author=False)
                action_descriptions.append('Autoreplied')
                self.log(message.guild, f'autoreplied to {message.author.name}')
        if should_post_message:
            bm = BotMessage(
                message.guild,
                f'User {message.author.name} tripped custom pattern ' + \
                f'`{statement.name}`.\n\nAutomatic actions taken:\n• ' + \
                ('\n• '.join(action_descriptions)),
                type=message_type,
                context=context)
            bm.quote = discordutils.remove_markdown(message.clean_content)
            await bm.set_reactions(BotMessageReaction.standard_set(
                did_delete=context.is_deleted,
                did_kick=context.is_kicked,
                did_ban=context.is_banned))
            await self.post_message(bm)

    async def on_mod_react(self,
            bot_message: BotMessage,
            reaction: BotMessageReaction,
            reacted_by: Member) -> None:
        """
        Applies the delete, kick or ban selected by a reaction emoji to the
        message recorded in the bot message's PatternContext, then refreshes
        the bot message's reactions to reflect the actions taken so far.
        """
        context: PatternContext = bot_message.context
        if reaction.emoji == CONFIG['trash_emoji']:
            await context.message.delete()
            context.is_deleted = True
        elif reaction.emoji == CONFIG['kick_emoji']:
            await context.message.author.kick(
                reason='Rocketbot: Message matched custom pattern named ' + \
                    f'"{context.statement.name}". Kicked by {reacted_by.name}.')
            context.is_kicked = True
        elif reaction.emoji == CONFIG['ban_emoji']:
            await context.message.author.ban(
                reason='Rocketbot: Message matched custom pattern named ' + \
                    f'"{context.statement.name}". Banned by {reacted_by.name}.',
                delete_message_days=1)
            context.is_banned = True
        await bot_message.set_reactions(BotMessageReaction.standard_set(
            did_delete=context.is_deleted,
            did_kick=context.is_kicked,
            did_ban=context.is_banned))

    # NOTE: the one-line docstrings on the command callbacks below are left
    # unchanged because discord.py uses a callback's docstring as the
    # command's help text when no explicit `help` is given.

    @commands.group(
        brief='Manages message pattern matching',
    )
    @commands.has_permissions(ban_members=True)
    @commands.guild_only()
    async def pattern(self, context: commands.Context):
        'Message pattern matching command group'
        # Invoked without a subcommand: show help instead of doing nothing.
        if context.invoked_subcommand is None:
            await context.send_help()

    @pattern.command(
        brief='Adds a custom pattern',
        description='Adds a custom pattern. Patterns use a simplified ' + \
            'expression language. Full documentation found here: ' + \
            'https://git.rixafrix.com/ialbert/python-app-rocketbot/src/' + \
            'branch/master/patterns.md',
        usage='<pattern_name> <expression...>',
        ignore_extra=True
    )
    async def add(self, context: commands.Context, name: str):
        'Command handler'
        # Parses the expression that follows the pattern name, stores the
        # resulting statement under `name` (replacing any existing pattern of
        # that name), saves the guild's patterns and replies with the outcome.
        pattern_str = PatternCompiler.expression_str_from_context(context, name)
        try:
            statement = PatternCompiler.parse_statement(name, pattern_str)
            statement.check_deprecations()
            patterns = self.__get_patterns(context.guild)
            patterns[name] = statement
            self.__save_patterns(context.guild, patterns)
            await context.message.reply(
                f'{CONFIG["success_emoji"]} Pattern `{name}` added.',
                mention_author=False)
        except PatternError as e:
            await context.message.reply(
                f'{CONFIG["failure_emoji"]} Error parsing statement. {e}',
                mention_author=False)

    @pattern.command(
        brief='Removes a custom pattern',
        usage='<pattern_name>'
    )
    async def remove(self, context: commands.Context, name: str):
        'Command handler'
        # Deletes the named pattern and saves the change, or reports that no
        # such pattern exists.
        patterns = self.__get_patterns(context.guild)
        if name in patterns:
            del patterns[name]
            self.__save_patterns(context.guild, patterns)
            await context.message.reply(
                f'{CONFIG["success_emoji"]} Pattern `{name}` deleted.',
                mention_author=False)
        else:
            await context.message.reply(
                f'{CONFIG["failure_emoji"]} No pattern named `{name}`.',
                mention_author=False)

    @pattern.command(
        brief='Lists all patterns'
    )
    async def list(self, context: commands.Context) -> None:
        'Command handler'
        # Replies with every pattern, sorted by name, showing its priority and
        # original expression text.
        patterns = self.__get_patterns(context.guild)
        if not patterns:
            await context.message.reply('No patterns defined.', mention_author=False)
            return
        # Discord rejects message content longer than 2000 characters, so the
        # listing is split across several replies when necessary.
        # NOTE: a single entry longer than the limit is still sent as-is.
        max_length = 2000
        chunk = ''
        for name, statement in sorted(patterns.items()):
            entry = f'Pattern `{name}` (priority={statement.priority}):\n' + \
                f'```\n{statement.original}\n```\n'
            if chunk and len(chunk) + len(entry) > max_length:
                await context.message.reply(chunk, mention_author=False)
                chunk = ''
            chunk += entry
        await context.message.reply(chunk, mention_author=False)

    @pattern.command(
        brief='Sets a pattern\'s priority level',
        description='Sets the priority for a pattern. Messages are checked ' +
            'against patterns with the highest priority first. Patterns with ' +
            'the same priority may be checked in arbitrary order. Default ' +
            'priority is 100.',
    )
    async def setpriority(self, context: commands.Context, name: str, priority: int) -> None:
        'Command handler'
        # Updates the priority of the named pattern and saves the change, or
        # reports that no such pattern exists.
        patterns = self.__get_patterns(context.guild)
        statement = patterns.get(name)
        if statement is None:
            await context.message.reply(
                f'{CONFIG["failure_emoji"]} No such pattern `{name}`',
                mention_author=False)
            return
        statement.priority = priority
        self.__save_patterns(context.guild, patterns)
        await context.message.reply(
            f'{CONFIG["success_emoji"]} Priority for pattern `{name}` ' + \
            f'updated to `{priority}`.',
            mention_author=False)
|