Experimental Discord bot written in Python
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

patterncog.py 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. """
  2. Cog for matching messages against guild-configurable criteria and taking
  3. automated actions on them.
  4. """
  5. import re
  6. from datetime import datetime
  7. from typing import Optional
  8. from discord import Guild, Member, Message, utils as discordutils, Interaction
  9. from discord.app_commands import Choice, Group, autocomplete
  10. from discord.ext.commands import Cog
  11. from config import CONFIG
  12. from rocketbot.bot import Rocketbot
  13. from rocketbot.cogs.basecog import BaseCog, BotMessage, BotMessageReaction
  14. from rocketbot.cogsetting import CogSetting
  15. from rocketbot.pattern import PatternCompiler, PatternDeprecationError, \
  16. PatternError, PatternStatement
  17. from rocketbot.storage import Storage
  18. from rocketbot.utils import dump_stacktrace, MOD_PERMISSIONS
  19. class PatternContext:
  20. """
  21. Data about a message that has matched a configured statement and what
  22. actions have been carried out.
  23. """
  24. def __init__(self, message: Message, statement: PatternStatement):
  25. self.message = message
  26. self.statement = statement
  27. self.is_deleted = False
  28. self.is_kicked = False
  29. self.is_banned = False
  30. async def pattern_name_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
  31. choices: list[Choice[str]] = []
  32. try:
  33. if interaction.guild is None:
  34. return []
  35. patterns: dict[str, PatternStatement] = PatternCog.shared.get_patterns(interaction.guild)
  36. current_normal = current.lower().strip()
  37. for name in sorted(patterns.keys()):
  38. if len(current_normal) == 0 or current_normal.startswith(name.lower()):
  39. choices.append(Choice(name=name, value=name))
  40. except BaseException as e:
  41. dump_stacktrace(e)
  42. return choices
  43. async def action_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
  44. # FIXME: WORK IN PROGRESS
  45. print(f'autocomplete action - current = "{current}"')
  46. regex = re.compile('^(.*?)([a-zA-Z]+)$')
  47. match: Optional[re.Match[str]] = regex.match(current)
  48. initial: str = ''
  49. stub: str = current
  50. if match:
  51. initial = match.group(1).strip()
  52. stub = match.group(2)
  53. if PatternCompiler.ACTION_TO_ARGS.get(stub, None) is not None:
  54. # Matches perfectly. Suggest another instead of completing the current.
  55. initial = current.strip() + ', '
  56. stub = ''
  57. print(f'initial = "{initial}", stub = "{stub}"')
  58. options: list[Choice[str]] = []
  59. for action in sorted(PatternCompiler.ACTION_TO_ARGS.keys()):
  60. if len(stub) == 0 or action.startswith(stub.lower()):
  61. arg_types = PatternCompiler.ACTION_TO_ARGS[action]
  62. arg_type_strs = []
  63. for arg_type in arg_types:
  64. if arg_type == PatternCompiler.TYPE_TEXT:
  65. arg_type_strs.append('"message"')
  66. else:
  67. raise ValueError(f'Argument type {arg_type} not yet supported')
  68. suffix = '' if len(arg_type_strs) == 0 else ' ' + (' '.join(arg_type_strs))
  69. options.append(Choice(name=action, value=f'{initial.strip()} {action}{suffix}'))
  70. return options
  71. async def priority_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
  72. return [
  73. Choice(name='very low (50)', value=50),
  74. Choice(name='low (75)', value=75),
  75. Choice(name='normal (100)', value=100),
  76. Choice(name='high (125)', value=125),
  77. Choice(name='very high (150)', value=150),
  78. ]
# Long-form help text (markdown) for pattern commands. It is passed as the
# cog's long_description and as the 'long_description' extra on the command
# group and the `add` command.
_long_help = \
"""Patterns are a powerful but complex topic. See <https://git.rixafrix.com/ialbert/python-app-rocketbot/src/branch/main/docs/patterns.md> for full documentation.
### Quick cheat sheet
> `/pattern add` _pattern\\_name_ _action\\_list_ `if` _expression_
- _pattern\\_name_ is a brief name for identifying the pattern later (not shown to user)
- _action\\_list_ is a comma-delimited list of actions to take on matching messages and is any of:
- `ban`
- `delete`
- `kick`
- `modinfo` - logs a message but doesn't tag mods
- `modwarn` - tags mods
- `reply` "message text"
- _expression_ determines which messages match, of the form _field_ _op_ _value_.
- Fields:
- `content.markdown`: string
- `content.plain`: string
- `author`: user
- `author.id`: id
- `author.joinage`: timespan
- `author.name`: string
- `lastmatched`: timespan
- Operators: `==`, `!=`, `<`, `>`, `<=`, `>=`, `contains`, `!contains`, `matches`, `!matches`, `containsword`, `!containsword`
- Can combine multiple expressions with `!`, `and`, `or`, and parentheses."""
  102. class PatternCog(BaseCog, name='Pattern Matching'):
  103. """
  104. Highly flexible cog for performing various actions on messages that match
  105. various criteria. Patterns can be defined by mods for each guild.
  106. """
    # Guild setting that stores the serialized patterns; read by get_patterns
    # and written by __save_patterns.
    SETTING_PATTERNS = CogSetting('patterns', None, default_value=None)
    # Set to the instance in __init__ so module-level autocomplete callbacks
    # can reach the cog; None until a PatternCog has been constructed.
    shared: Optional['PatternCog'] = None
  109. def __init__(self, bot: Rocketbot):
  110. super().__init__(
  111. bot,
  112. config_prefix='patterns',
  113. short_description='Manages message pattern matching.',
  114. long_description=_long_help
  115. )
  116. PatternCog.shared = self
  117. def get_patterns(self, guild: Guild) -> dict[str, PatternStatement]:
  118. """
  119. Returns a name -> PatternStatement lookup for the guild.
  120. """
  121. patterns: dict[str, PatternStatement] = Storage.get_state_value(guild,
  122. 'PatternCog.patterns')
  123. if patterns is None:
  124. jsons: list[dict] = self.get_guild_setting(guild, self.SETTING_PATTERNS) or []
  125. pattern_list: list[PatternStatement] = []
  126. for json in jsons:
  127. try:
  128. ps = PatternStatement.from_json(json)
  129. pattern_list.append(ps)
  130. try:
  131. ps.check_deprecations()
  132. except PatternDeprecationError as e:
  133. self.log(guild, f'Pattern {ps.name}: {e}')
  134. except PatternError as e:
  135. self.log(guild, f'Error decoding pattern "{json["name"]}": {e}')
  136. patterns = { p.name:p for p in pattern_list}
  137. Storage.set_state_value(guild, 'PatternCog.patterns', patterns)
  138. return patterns
  139. @classmethod
  140. def __save_patterns(cls,
  141. guild: Guild,
  142. patterns: dict[str, PatternStatement]) -> None:
  143. to_save: list[dict] = list(map(lambda ps: ps.to_json(), patterns.values()))
  144. cls.set_guild_setting(guild, cls.SETTING_PATTERNS, to_save)
  145. @classmethod
  146. def __get_last_matched(cls, guild: Guild, name: str) -> Optional[datetime]:
  147. last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched')
  148. if last_matched:
  149. return last_matched.get(name)
  150. return None
  151. @classmethod
  152. def __set_last_matched(cls, guild: Guild, name: str, time: datetime) -> None:
  153. last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched')
  154. if last_matched is None:
  155. last_matched = {}
  156. Storage.set_state_value(guild, 'PatternCog.last_matched', last_matched)
  157. last_matched[name] = time
  158. @Cog.listener()
  159. async def on_message(self, message: Message) -> None:
  160. """Event listener"""
  161. if message.author is None or \
  162. message.author.bot or \
  163. message.channel is None or \
  164. message.guild is None or \
  165. message.content is None or \
  166. message.content == '':
  167. return
  168. if message.channel.permissions_for(message.author).ban_members:
  169. # Ignore mods
  170. return
  171. patterns = self.get_patterns(message.guild)
  172. for statement in sorted(patterns.values(), key=lambda s : s.priority, reverse=True):
  173. other_fields = {
  174. 'last_matched': self.__get_last_matched(message.guild, statement.name),
  175. }
  176. if statement.expression.matches(message, other_fields):
  177. self.__set_last_matched(message.guild, statement.name, message.created_at)
  178. await self.__trigger_actions(message, statement)
  179. break
  180. async def __trigger_actions(self,
  181. message: Message,
  182. statement: PatternStatement) -> None:
  183. context = PatternContext(message, statement)
  184. should_post_message = False
  185. message_type: int = BotMessage.TYPE_DEFAULT
  186. action_descriptions = []
  187. self.log(message.guild, f'Message from {message.author.name} matched ' + \
  188. f'pattern "{statement.name}"')
  189. for action in statement.actions:
  190. if action.action == 'ban':
  191. await message.author.ban(
  192. reason='Rocketbot: Message matched custom pattern named ' + \
  193. f'"{statement.name}"',
  194. delete_message_days=0)
  195. context.is_banned = True
  196. context.is_kicked = True
  197. action_descriptions.append('Author banned')
  198. self.log(message.guild, f'{message.author.name} banned')
  199. elif action.action == 'delete':
  200. await message.delete()
  201. context.is_deleted = True
  202. action_descriptions.append('Message deleted')
  203. self.log(message.guild, f'{message.author.name}\'s message deleted')
  204. elif action.action == 'kick':
  205. await message.author.kick(
  206. reason='Rocketbot: Message matched custom pattern named ' + \
  207. f'"{statement.name}"')
  208. context.is_kicked = True
  209. action_descriptions.append('Author kicked')
  210. self.log(message.guild, f'{message.author.name} kicked')
  211. elif action.action == 'modinfo':
  212. should_post_message = True
  213. message_type = BotMessage.TYPE_INFO
  214. action_descriptions.append('Message logged')
  215. elif action.action == 'modwarn':
  216. should_post_message = not self.was_warned_recently(message.author)
  217. message_type = BotMessage.TYPE_MOD_WARNING
  218. action_descriptions.append('Mods alerted')
  219. elif action.action == 'reply':
  220. await message.reply(
  221. f'{action.arguments[0]}',
  222. mention_author=False)
  223. action_descriptions.append('Autoreplied')
  224. self.log(message.guild, f'autoreplied to {message.author.name}')
  225. if should_post_message:
  226. bm = BotMessage(
  227. message.guild,
  228. f'User {message.author.name} tripped custom pattern ' + \
  229. f'`{statement.name}` at {message.jump_url}.\n\n' + \
  230. 'Automatic actions taken:\n• ' + ('\n• '.join(action_descriptions)),
  231. type=message_type,
  232. context=context)
  233. self.record_warning(message.author)
  234. bm.quote = discordutils.remove_markdown(message.clean_content)
  235. await bm.set_reactions(BotMessageReaction.standard_set(
  236. did_delete=context.is_deleted,
  237. did_kick=context.is_kicked,
  238. did_ban=context.is_banned))
  239. await self.post_message(bm)
  240. async def on_mod_react(self,
  241. bot_message: BotMessage,
  242. reaction: BotMessageReaction,
  243. reacted_by: Member) -> None:
  244. context: PatternContext = bot_message.context
  245. if reaction.emoji == CONFIG['trash_emoji']:
  246. await context.message.delete()
  247. context.is_deleted = True
  248. elif reaction.emoji == CONFIG['kick_emoji']:
  249. await context.message.author.kick(
  250. reason='Rocketbot: Message matched custom pattern named ' + \
  251. f'"{context.statement.name}". Kicked by {reacted_by.name}.')
  252. context.is_kicked = True
  253. elif reaction.emoji == CONFIG['ban_emoji']:
  254. await context.message.author.ban(
  255. reason='Rocketbot: Message matched custom pattern named ' + \
  256. f'"{context.statement.name}". Banned by {reacted_by.name}.',
  257. delete_message_days=1)
  258. context.is_banned = True
  259. await bot_message.set_reactions(BotMessageReaction.standard_set(
  260. did_delete=context.is_deleted,
  261. did_kick=context.is_kicked,
  262. did_ban=context.is_banned))
    # Command group named 'pattern'; the methods decorated with
    # @pattern.command below are registered on it. Guild-only, with
    # MOD_PERMISSIONS as its default permissions.
    pattern = Group(
        name='pattern',
        description='Manages message pattern matching.',
        guild_only=True,
        default_permissions=MOD_PERMISSIONS,
        extras={
            'long_description': _long_help,
        },
    )
  272. @pattern.command(
  273. description='Adds or updates a custom pattern.',
  274. extras={
  275. 'long_description': _long_help,
  276. },
  277. )
  278. @autocomplete(
  279. name=pattern_name_autocomplete,
  280. # actions=action_autocomplete
  281. )
  282. async def add(
  283. self,
  284. interaction: Interaction,
  285. name: str,
  286. actions: str,
  287. expression: str
  288. ) -> None:
  289. """
  290. Adds a custom pattern.
  291. Parameters
  292. ----------
  293. interaction : Interaction
  294. name : str
  295. a name for the new or existing pattern
  296. actions : str
  297. actions to take when a message matches
  298. expression : str
  299. criteria for matching chat messages
  300. """
  301. pattern_str = f'{actions} if {expression}'
  302. guild = interaction.guild
  303. try:
  304. statement = PatternCompiler.parse_statement(name, pattern_str)
  305. statement.check_deprecations()
  306. patterns = self.get_patterns(guild)
  307. patterns[name] = statement
  308. self.__save_patterns(guild, patterns)
  309. await interaction.response.send_message(
  310. f'{CONFIG["success_emoji"]} Pattern `{name}` added.',
  311. ephemeral=True,
  312. )
  313. except PatternError as e:
  314. await interaction.response.send_message(
  315. f'{CONFIG["failure_emoji"]} Error parsing statement. {e}',
  316. ephemeral=True,
  317. )
  318. @pattern.command(
  319. description='Removes a custom pattern.',
  320. extras={
  321. 'usage': '<pattern_name>',
  322. },
  323. )
  324. @autocomplete(name=pattern_name_autocomplete)
  325. async def remove(self, interaction: Interaction, name: str):
  326. """
  327. Removes a custom pattern.
  328. Parameters
  329. ----------
  330. interaction: Interaction
  331. name: str
  332. name of the pattern to remove
  333. """
  334. guild = interaction.guild
  335. patterns = self.get_patterns(guild)
  336. if patterns.get(name) is not None:
  337. del patterns[name]
  338. self.__save_patterns(guild, patterns)
  339. await interaction.response.send_message(
  340. f'{CONFIG["success_emoji"]} Pattern `{name}` deleted.',
  341. ephemeral=True,
  342. )
  343. else:
  344. await interaction.response.send_message(
  345. f'{CONFIG["failure_emoji"]} No pattern named `{name}`.',
  346. ephemeral=True,
  347. )
  348. @pattern.command(
  349. description='Lists all patterns.',
  350. )
  351. async def list(self, interaction: Interaction) -> None:
  352. guild = interaction.guild
  353. patterns = self.get_patterns(guild)
  354. if len(patterns) == 0:
  355. await interaction.response.send_message(
  356. 'No patterns defined.',
  357. ephemeral=True,
  358. )
  359. return
  360. msg = ''
  361. for name, statement in sorted(patterns.items()):
  362. msg += f'Pattern `{name}` (priority={statement.priority}):\n```\n{statement.original}\n```\n'
  363. await interaction.response.send_message(msg, ephemeral=True)
  364. @pattern.command(
  365. description="Sets a pattern's priority level.",
  366. extras={
  367. 'long_description': 'Messages are checked against patterns with the '
  368. 'highest priority first. Patterns with the same '
  369. 'priority may be checked in arbitrary order. Default '
  370. 'priority is 100.',
  371. },
  372. )
  373. @autocomplete(name=pattern_name_autocomplete, priority=priority_autocomplete)
  374. async def setpriority(self, interaction: Interaction, name: str, priority: int) -> None:
  375. """
  376. Sets a pattern's priority level.
  377. Parameters
  378. ----------
  379. interaction: Interaction
  380. name: str
  381. the name of the pattern
  382. priority: int
  383. evaluation priority
  384. """
  385. guild = interaction.guild
  386. patterns = self.get_patterns(guild)
  387. statement = patterns.get(name)
  388. if statement is None:
  389. await interaction.response.send_message(
  390. f'{CONFIG["failure_emoji"]} No such pattern `{name}`',
  391. ephemeral=True,
  392. )
  393. return
  394. statement.priority = priority
  395. self.__save_patterns(guild, patterns)
  396. await interaction.response.send_message(
  397. f'{CONFIG["success_emoji"]} Priority for pattern `{name}` ' + \
  398. f'updated to `{priority}`.',
  399. ephemeral=True,
  400. )