Experimental Discord bot written in Python
Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. """
  2. Cog for matching messages against guild-configurable criteria and taking
  3. automated actions on them.
  4. """
  5. import re
  6. from datetime import datetime
  7. from typing import Optional
  8. from discord import Guild, Member, Message, utils as discordutils, Permissions, Interaction
  9. from discord.app_commands import Choice, Group, autocomplete, rename
  10. from discord.ext.commands import Cog
  11. from config import CONFIG
  12. from rocketbot.bot import Rocketbot
  13. from rocketbot.cogs.basecog import BaseCog, BotMessage, BotMessageReaction
  14. from rocketbot.cogsetting import CogSetting
  15. from rocketbot.pattern import PatternCompiler, PatternDeprecationError, \
  16. PatternError, PatternStatement
  17. from rocketbot.storage import Storage
  18. from rocketbot.utils import dump_stacktrace, MOD_PERMISSIONS
  class PatternContext:
      """
      Bundles a matched message with the statement it matched, together with
      flags recording which moderation actions have been applied so far.
      """

      def __init__(self, message: Message, statement: PatternStatement):
          # The offending message and the pattern statement that caught it.
          self.message, self.statement = message, statement
          # Nothing has been done to the message or its author yet.
          self.is_deleted = self.is_kicked = self.is_banned = False
  async def pattern_name_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
      """
      Autocomplete handler suggesting names of patterns defined in the guild.

      Parameters
      ----------
      interaction : Interaction
          the autocomplete interaction; its guild selects the pattern set
      current : str
          the text the user has typed so far

      Returns
      -------
      Choices for every pattern whose name starts with the typed text
      (case-insensitive), in alphabetical order. All patterns are offered when
      nothing has been typed. Returns an empty list outside a guild, before the
      cog has been constructed, or if looking up the patterns fails.
      """
      # Discord rejects autocomplete responses with more than 25 choices, and
      # choice names longer than 100 characters.
      max_choices = 25
      max_choice_length = 100
      choices: list[Choice[str]] = []
      try:
          if interaction.guild is None or PatternCog.shared is None:
              return []
          patterns: dict[str, PatternStatement] = PatternCog.shared.get_patterns(interaction.guild)
          current_normal = current.lower().strip()
          for name in sorted(patterns):
              if len(name) > max_choice_length:
                  continue
              # The pattern name must start with what the user typed, not the
              # other way around.
              if not current_normal or name.lower().startswith(current_normal):
                  choices.append(Choice(name=name, value=name))
      except Exception as e:
          # Autocomplete is best-effort: log and fall through with whatever was
          # collected. Exception (not BaseException) so task cancellation and
          # interpreter shutdown still propagate.
          dump_stacktrace(e)
      return choices[:max_choices]
  async def action_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
      """
      Autocomplete handler suggesting the next action in a pattern's action list.

      The typed text is split into everything before the trailing run of
      letters (kept as-is) and that trailing run (the partial action name being
      completed). If the trailing run is already a complete action name, a
      further action is suggested after a comma instead.

      Parameters
      ----------
      interaction : Interaction
          the autocomplete interaction (unused)
      current : str
          the text the user has typed so far

      Returns
      -------
      One choice per matching action. Each choice's value is the full text the
      field would contain if that action were accepted, including a
      placeholder for any text argument the action takes.

      Raises
      ------
      ValueError
          if a matching action declares an argument type other than text
      """
      # FIXME: WORK IN PROGRESS
      # Limits Discord places on autocomplete responses.
      max_choices = 25
      max_value_length = 100

      match: Optional[re.Match[str]] = re.match(r'^(.*?)([a-zA-Z]+)$', current)
      initial: str = ''
      stub: str = current
      if match:
          initial = match.group(1).strip()
          stub = match.group(2)
          # Compare case-insensitively, same as the prefix test below.
          if PatternCompiler.ACTION_TO_ARGS.get(stub.lower(), None) is not None:
              # Matches perfectly. Suggest another instead of completing the current.
              initial = current.strip() + ', '
              stub = ''
      stub = stub.lower()

      options: list[Choice[str]] = []
      for action in sorted(PatternCompiler.ACTION_TO_ARGS.keys()):
          if stub and not action.startswith(stub):
              continue
          arg_type_strs = []
          for arg_type in PatternCompiler.ACTION_TO_ARGS[action]:
              if arg_type == PatternCompiler.TYPE_TEXT:
                  arg_type_strs.append('"message"')
              else:
                  raise ValueError(f'Argument type {arg_type} not yet supported')
          suffix = '' if len(arg_type_strs) == 0 else ' ' + (' '.join(arg_type_strs))
          # lstrip: with nothing before the action there is no separator to keep,
          # so the value must not begin with a stray space.
          value = f'{initial.strip()} {action}{suffix}'.lstrip()
          if len(value) <= max_value_length:
              options.append(Choice(name=action, value=value))
      return options[:max_choices]
  async def priority_autocomplete(interaction: Interaction, current: str) -> list[Choice[int]]:
      """
      Autocomplete handler offering named presets for a pattern's priority.

      Parameters
      ----------
      interaction : Interaction
          the autocomplete interaction (unused)
      current : str
          the text the user has typed so far

      Returns
      -------
      The presets whose label contains the typed text (case-insensitive), so
      typing "hi" or "12" narrows the list. All presets are returned when
      nothing has been typed or when nothing matches.
      """
      presets = (
          ('very low', 50),
          ('low', 75),
          ('normal', 100),
          ('high', 125),
          ('very high', 150),
      )
      choices = [
          Choice(name=f'{label} ({value})', value=value)
          for label, value in presets
      ]
      # str() guards against the partial input arriving as a number.
      typed = str(current).strip().lower()
      if not typed:
          return choices
      matching = [choice for choice in choices if typed in choice.name]
      return matching or choices
  class PatternCog(BaseCog, name='Pattern Matching'):
      """
      Highly flexible cog for performing various actions on messages that match
      various criteria. Patterns can be defined by mods for each guild.
      """

      SETTING_PATTERNS = CogSetting('patterns', None, default_value=None)

      # The most recently constructed instance. Module-level autocomplete
      # handlers use it to reach per-guild patterns.
      shared: Optional['PatternCog'] = None

      def __init__(self, bot: Rocketbot):
          super().__init__(
              bot,
              config_prefix='patterns',
              short_description='Manages message pattern matching.',
              long_description='Patterns are a powerful but complex topic. See <https://git.rixafrix.com/ialbert/python-app-rocketbot/src/branch/main/docs/patterns.md> for full documentation.'
          )
          PatternCog.shared = self

      def get_patterns(self, guild: Guild) -> dict[str, PatternStatement]:
          """
          Returns a name -> PatternStatement lookup for the guild.

          The lookup is read from the guild's state under 'PatternCog.patterns'.
          If absent, it is built from the saved guild setting and stored there.
          Saved patterns that fail to decode are logged and skipped.
          Deprecation problems are logged but the pattern is still kept.
          """
          patterns: Optional[dict[str, PatternStatement]] = Storage.get_state_value(
              guild, 'PatternCog.patterns')
          if patterns is None:
              jsons: list[dict] = self.get_guild_setting(guild, self.SETTING_PATTERNS) or []
              pattern_list: list[PatternStatement] = []
              for pattern_json in jsons:
                  try:
                      ps = PatternStatement.from_json(pattern_json)
                      pattern_list.append(ps)
                      try:
                          ps.check_deprecations()
                      except PatternDeprecationError as e:
                          self.log(guild, f'Pattern {ps.name}: {e}')
                  except PatternError as e:
                      # The entry is already known to be malformed, so do not
                      # assume it has a "name" key while reporting the failure.
                      if isinstance(pattern_json, dict):
                          bad_name = pattern_json.get('name', '<unnamed>')
                      else:
                          bad_name = '<unnamed>'
                      self.log(guild, f'Error decoding pattern "{bad_name}": {e}')
              patterns = {p.name: p for p in pattern_list}
              Storage.set_state_value(guild, 'PatternCog.patterns', patterns)
          return patterns

      @classmethod
      def __save_patterns(cls,
              guild: Guild,
              patterns: dict[str, PatternStatement]) -> None:
          """Writes the guild's patterns to its settings in JSON form."""
          to_save: list[dict] = [ps.to_json() for ps in patterns.values()]
          cls.set_guild_setting(guild, cls.SETTING_PATTERNS, to_save)

      @classmethod
      def __get_last_matched(cls, guild: Guild, name: str) -> Optional[datetime]:
          """
          Returns the time recorded for the named pattern's last match in this
          guild, or None if none has been recorded.
          """
          last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched')
          if last_matched:
              return last_matched.get(name)
          return None

      @classmethod
      def __set_last_matched(cls, guild: Guild, name: str, time: datetime) -> None:
          """Records the time of the named pattern's latest match in this guild."""
          last_matched: dict[str, datetime] = Storage.get_state_value(guild, 'PatternCog.last_matched')
          if last_matched is None:
              last_matched = {}
              Storage.set_state_value(guild, 'PatternCog.last_matched', last_matched)
          last_matched[name] = time

      @staticmethod
      def __paginate(entries: list[str], limit: int = 2000) -> list[str]:
          """
          Packs entries, in order, into as few strings as possible without any
          string exceeding `limit` characters (Discord's message length limit by
          default). An entry longer than the limit on its own is cut to fit.
          """
          pages: list[str] = []
          page = ''
          for entry in entries:
              entry = entry[:limit]
              if page and len(page) + len(entry) > limit:
                  pages.append(page)
                  page = ''
              page += entry
          if page:
              pages.append(page)
          return pages

      @Cog.listener()
      async def on_message(self, message: Message) -> None:
          """
          Event listener. Checks a new message against the guild's patterns in
          descending priority order and triggers the actions of the first one
          that matches.
          """
          if message.author is None or \
                  message.author.bot or \
                  message.channel is None or \
                  message.guild is None or \
                  message.content is None or \
                  message.content == '':
              return
          if message.channel.permissions_for(message.author).ban_members:
              # Ignore mods
              return

          patterns = self.get_patterns(message.guild)
          for statement in sorted(patterns.values(), key=lambda s : s.priority, reverse=True):
              other_fields = {
                  'last_matched': self.__get_last_matched(message.guild, statement.name),
              }
              if statement.expression.matches(message, other_fields):
                  self.__set_last_matched(message.guild, statement.name, message.created_at)
                  await self.__trigger_actions(message, statement)
                  # Only the highest-priority matching pattern acts on a message.
                  break

      async def __trigger_actions(self,
              message: Message,
              statement: PatternStatement) -> None:
          """
          Carries out each action of a matched statement in order, then posts a
          summary message if a `modinfo` or `modwarn` action requested one.
          """
          context = PatternContext(message, statement)
          should_post_message = False
          message_type: int = BotMessage.TYPE_DEFAULT
          action_descriptions = []
          self.log(message.guild, f'Message from {message.author.name} matched ' + \
              f'pattern "{statement.name}"')
          for action in statement.actions:
              if action.action == 'ban':
                  # NOTE(review): newer discord.py releases deprecate
                  # delete_message_days in favour of delete_message_seconds -
                  # confirm against the pinned library version before switching.
                  await message.author.ban(
                      reason='Rocketbot: Message matched custom pattern named ' + \
                          f'"{statement.name}"',
                      delete_message_days=0)
                  context.is_banned = True
                  # A ban also counts as a kick.
                  context.is_kicked = True
                  action_descriptions.append('Author banned')
                  self.log(message.guild, f'{message.author.name} banned')
              elif action.action == 'delete':
                  await message.delete()
                  context.is_deleted = True
                  action_descriptions.append('Message deleted')
                  self.log(message.guild, f'{message.author.name}\'s message deleted')
              elif action.action == 'kick':
                  await message.author.kick(
                      reason='Rocketbot: Message matched custom pattern named ' + \
                          f'"{statement.name}"')
                  context.is_kicked = True
                  action_descriptions.append('Author kicked')
                  self.log(message.guild, f'{message.author.name} kicked')
              elif action.action == 'modinfo':
                  should_post_message = True
                  message_type = BotMessage.TYPE_INFO
                  action_descriptions.append('Message logged')
              elif action.action == 'modwarn':
                  # Suppress the post if this author was warned recently.
                  # NOTE(review): this overwrites a True set by an earlier
                  # `modinfo` action in the same statement - confirm intended.
                  should_post_message = not self.was_warned_recently(message.author)
                  message_type = BotMessage.TYPE_MOD_WARNING
                  action_descriptions.append('Mods alerted')
              elif action.action == 'reply':
                  await message.reply(
                      f'{action.arguments[0]}',
                      mention_author=False)
                  action_descriptions.append('Autoreplied')
                  self.log(message.guild, f'autoreplied to {message.author.name}')
          if should_post_message:
              bm = BotMessage(
                  message.guild,
                  f'User {message.author.name} tripped custom pattern ' + \
                  f'`{statement.name}` at {message.jump_url}.\n\n' + \
                  'Automatic actions taken:\n• ' + ('\n• '.join(action_descriptions)),
                  type=message_type,
                  context=context)
              self.record_warning(message.author)
              bm.quote = discordutils.remove_markdown(message.clean_content)
              await bm.set_reactions(BotMessageReaction.standard_set(
                  did_delete=context.is_deleted,
                  did_kick=context.is_kicked,
                  did_ban=context.is_banned))
              await self.post_message(bm)

      async def on_mod_react(self,
              bot_message: BotMessage,
              reaction: BotMessageReaction,
              reacted_by: Member) -> None:
          """
          Applies the delete, kick or ban chosen by reacting to a posted bot
          message, then refreshes that message's reactions to reflect the
          actions taken so far.
          """
          context: PatternContext = bot_message.context
          if reaction.emoji == CONFIG['trash_emoji']:
              await context.message.delete()
              context.is_deleted = True
          elif reaction.emoji == CONFIG['kick_emoji']:
              await context.message.author.kick(
                  reason='Rocketbot: Message matched custom pattern named ' + \
                      f'"{context.statement.name}". Kicked by {reacted_by.name}.')
              context.is_kicked = True
          elif reaction.emoji == CONFIG['ban_emoji']:
              # NOTE(review): see the deprecation note on delete_message_days in
              # the discord.py Member.ban documentation.
              await context.message.author.ban(
                  reason='Rocketbot: Message matched custom pattern named ' + \
                      f'"{context.statement.name}". Banned by {reacted_by.name}.',
                  delete_message_days=1)
              context.is_banned = True
          await bot_message.set_reactions(BotMessageReaction.standard_set(
              did_delete=context.is_deleted,
              did_kick=context.is_kicked,
              did_ban=context.is_banned))

      pattern = Group(
          name='pattern',
          description='Manages message pattern matching.',
          guild_only=True,
          default_permissions=MOD_PERMISSIONS,
          extras={
              'long_description': 'Patterns are a powerful but complex topic. '
                                  'See <https://git.rixafrix.com/ialbert/python-app-rocketbot/src/branch/main/docs/patterns.md> for full documentation.'
          },
      )

      @pattern.command(
          description='Adds or updates a custom pattern.',
          extras={
              'long_description': 'Patterns use a simplified expression language. Full '
                                  'documentation found here: '
                                  '<https://git.rixafrix.com/ialbert/python-app-rocketbot/src/branch/main/docs/patterns.md>',
          },
      )
      @autocomplete(
          name=pattern_name_autocomplete,
          # actions=action_autocomplete
      )
      async def add(
              self,
              interaction: Interaction,
              name: str,
              actions: str,
              expression: str
      ) -> None:
          """
          Adds a custom pattern.

          Parameters
          ----------
          interaction : Interaction
          name : str
              a name for the new or existing pattern
          actions : str
              actions to take when a message matches
          expression : str
              criteria for matching chat messages
          """
          # The compiler takes a single "<actions> if <expression>" statement.
          pattern_str = f'{actions} if {expression}'
          guild = interaction.guild
          try:
              statement = PatternCompiler.parse_statement(name, pattern_str)
              statement.check_deprecations()
              patterns = self.get_patterns(guild)
              # Reusing an existing name replaces that pattern.
              patterns[name] = statement
              self.__save_patterns(guild, patterns)
              await interaction.response.send_message(
                  f'{CONFIG["success_emoji"]} Pattern `{name}` added.',
                  ephemeral=True,
              )
          except PatternError as e:
              await interaction.response.send_message(
                  f'{CONFIG["failure_emoji"]} Error parsing statement. {e}',
                  ephemeral=True,
              )

      @pattern.command(
          description='Removes a custom pattern.',
          extras={
              'usage': '<pattern_name>',
          },
      )
      @autocomplete(name=pattern_name_autocomplete)
      async def remove(self, interaction: Interaction, name: str):
          """
          Removes a custom pattern.

          Parameters
          ----------
          interaction: Interaction
          name: str
              name of the pattern to remove
          """
          guild = interaction.guild
          patterns = self.get_patterns(guild)
          if patterns.get(name) is not None:
              del patterns[name]
              self.__save_patterns(guild, patterns)
              await interaction.response.send_message(
                  f'{CONFIG["success_emoji"]} Pattern `{name}` deleted.',
                  ephemeral=True,
              )
          else:
              await interaction.response.send_message(
                  f'{CONFIG["failure_emoji"]} No pattern named `{name}`.',
                  ephemeral=True,
              )

      # NOTE: this method name shadows the builtin `list` for the rest of the
      # class body. Do not use `list[...]` in the signature of anything defined
      # below this point in the class.
      @pattern.command(
          description='Lists all patterns.',
      )
      async def list(self, interaction: Interaction) -> None:
          # Lists every pattern in name order with its priority and source text.
          # The output is split across follow-up messages when it would exceed
          # Discord's per-message length limit.
          guild = interaction.guild
          patterns = self.get_patterns(guild)
          if len(patterns) == 0:
              await interaction.response.send_message(
                  'No patterns defined.',
                  ephemeral=True,
              )
              return
          entries = [
              f'Pattern `{name}` (priority={statement.priority}):\n```\n{statement.original}\n```\n'
              for name, statement in sorted(patterns.items())
          ]
          pages = self.__paginate(entries)
          # An interaction accepts one initial response; the rest are follow-ups.
          await interaction.response.send_message(pages[0], ephemeral=True)
          for page in pages[1:]:
              await interaction.followup.send(page, ephemeral=True)

      @pattern.command(
          description="Sets a pattern's priority level.",
          extras={
              'long_description': 'Messages are checked against patterns with the '
                                  'highest priority first. Patterns with the same '
                                  'priority may be checked in arbitrary order. Default '
                                  'priority is 100.',
          },
      )
      @autocomplete(name=pattern_name_autocomplete, priority=priority_autocomplete)
      async def setpriority(self, interaction: Interaction, name: str, priority: int) -> None:
          """
          Sets a pattern's priority level.

          Parameters
          ----------
          interaction: Interaction
          name: str
              the name of the pattern
          priority: int
              evaluation priority
          """
          guild = interaction.guild
          patterns = self.get_patterns(guild)
          statement = patterns.get(name)
          if statement is None:
              await interaction.response.send_message(
                  f'{CONFIG["failure_emoji"]} No such pattern `{name}`',
                  ephemeral=True,
              )
              return
          statement.priority = priority
          self.__save_patterns(guild, patterns)
          await interaction.response.send_message(
              f'{CONFIG["success_emoji"]} Priority for pattern `{name}` ' + \
              f'updated to `{priority}`.',
              ephemeral=True,
          )