Experimental Discord bot written in Python
Nelze vybrat více než 25 témat. Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

basecog.py 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. """
  2. Base cog class and helper classes.
  3. """
  4. from datetime import datetime, timedelta, timezone
  5. from typing import Optional
  6. import discord
  7. from discord import Guild, Interaction, Member, Message, RawReactionActionEvent, TextChannel
  8. from discord.abc import GuildChannel
  9. from discord.app_commands import AppCommandError
  10. from discord.app_commands.errors import CommandInvokeError
  11. from discord.ext.commands import Cog
  12. from config import CONFIG
  13. from rocketbot.bot import Rocketbot
  14. from rocketbot.botmessage import BotMessage, BotMessageReaction
  15. from rocketbot.cogsetting import CogSetting
  16. from rocketbot.collections import AgeBoundDict
  17. from rocketbot.storage import Storage
  18. from rocketbot.utils import bot_log, dump_stacktrace
  19. class WarningContext:
  20. def __init__(self, member: Member, warn_time: datetime):
  21. self.member = member
  22. self.last_warned = warn_time
  23. class BaseCog(Cog):
  24. STATE_KEY_RECENT_WARNINGS = "BaseCog.recent_warnings"
  25. """
  26. Superclass for all Rocketbot cogs. Provides lots of conveniences for
  27. common tasks.
  28. """
  29. def __init__(
  30. self,
  31. bot: Rocketbot,
  32. config_prefix: Optional[str],
  33. short_description: str,
  34. long_description: Optional[str] = None,
  35. ):
  36. """
  37. Parameters
  38. ----------
  39. bot: Rocketbot
  40. config_prefix: str
  41. Prefix to show on variables in /set and /get commands to namespace
  42. configuration variables. E.g. if config_prefix is "foo", a config
  43. variable named "bar" in that cog will show as "foo.bar". If None,
  44. config variable acts as a top-level variable with no prefix.
  45. """
  46. self.bot: Rocketbot = bot
  47. self.are_settings_setup: bool = False
  48. self.settings: list[CogSetting] = []
  49. self.config_prefix: Optional[str] = config_prefix
  50. self.short_description: str = short_description
  51. self.long_description: str = long_description
  52. async def cog_app_command_error(self, interaction: Interaction, error: AppCommandError) -> None:
  53. if isinstance(error, CommandInvokeError):
  54. error = error.original
  55. dump_stacktrace(error)
  56. message = f"\nException: {error.__class__.__name__}, "\
  57. f"Command: {interaction.command.qualified_name if interaction.command else None}, "\
  58. f"User: {interaction.user}, "\
  59. f"Time: {discord.utils.format_dt(interaction.created_at, style='F')}"
  60. try:
  61. await interaction.response.send_message(f"An error occurred: {message}", ephemeral=True)
  62. except discord.InteractionResponded:
  63. await interaction.followup.send(f"An error occurred: {message}", ephemeral=True)
  64. @property
  65. def basecogs(self) -> list['BaseCog']:
  66. """
  67. List of BaseCog instances. Cogs that do not inherit from BaseCog are omitted.
  68. """
  69. return [
  70. bcog
  71. for bcog in self.bot.cogs.values()
  72. if isinstance(bcog, BaseCog)
  73. ]
  74. @property
  75. def basecog_map(self) -> dict[str, 'BaseCog']:
  76. """
  77. Map of qualified names to BaseCog instances. Cogs that do not inherit
  78. from BaseCog are omitted.
  79. """
  80. return {
  81. qname: bcog
  82. for qname, bcog in self.bot.cogs.items()
  83. if isinstance(bcog, BaseCog)
  84. }
  85. # Config
  86. @classmethod
  87. def get_cog_default(cls, key: str):
  88. """
  89. Convenience method for getting a cog configuration default from
  90. `CONFIG['cogs'][<cog_name>][<key>]`. These values are used for
  91. CogSettings when no guild-specific value is configured yet.
  92. """
  93. cogs: dict = CONFIG['cog_defaults']
  94. cog = cogs.get(cls.__name__)
  95. if cog is None:
  96. return None
  97. return cog.get(key)
  98. def add_setting(self, setting: CogSetting) -> None:
  99. """
  100. Called by a subclass in __init__ to register a mod-configurable
  101. guild setting. A "get" and "set" command will be generated. If the
  102. setting is named "enabled" (exactly) then "enable" and "disable"
  103. commands will be created instead which set the setting to True/False.
  104. If the cog has a command group it will be detected automatically and
  105. the commands added to that. Otherwise, the commands will be added at
  106. the top level.
  107. Changes to settings can be detected by overriding `on_setting_updated`.
  108. """
  109. self.settings.append(setting)
  110. @classmethod
  111. def get_guild_setting(cls,
  112. guild: Guild,
  113. setting: CogSetting,
  114. use_cog_default_if_not_set: bool = True):
  115. """
  116. Returns the configured value for a setting for the given guild. If no
  117. setting is configured the default for the cog will be returned,
  118. unless the optional `use_cog_default_if_not_set` is `False`, then
  119. `None` will be returned.
  120. """
  121. key = f'{cls.__name__}.{setting.name}'
  122. value = Storage.get_config_value(guild, key)
  123. if value is None and use_cog_default_if_not_set:
  124. value = cls.get_cog_default(setting.name)
  125. return value
  126. @classmethod
  127. def set_guild_setting(cls,
  128. guild: Guild,
  129. setting: CogSetting,
  130. new_value) -> None:
  131. """
  132. Manually sets a setting for the given guild. BaseCog creates "get" and
  133. "set" commands for guild administrators to configure values themselves,
  134. but this method can be used for hidden settings from code. A ValueError
  135. will be raised if the new value does not pass validation specified in
  136. the CogSetting.
  137. """
  138. setting.validate_value(new_value)
  139. key = f'{cls.__name__}.{setting.name}'
  140. Storage.set_config_value(guild, key, new_value)
  141. # @commands.Cog.listener()
  142. async def __on_ready(self):
  143. """Event listener"""
  144. if not self.are_settings_setup:
  145. self.are_settings_setup = True
  146. CogSetting.set_up_all(self, self.bot, self.settings)
  147. async def on_setting_updated(self, guild: Guild, setting: CogSetting) -> None:
  148. """
  149. Subclass override point for being notified when a CogSetting is edited.
  150. """
  151. # Warning squelch
  152. def was_warned_recently(self, member: Member) -> bool:
  153. """
  154. Tests if a given member was included in a mod warning message recently.
  155. Used to suppress redundant messages. Should be checked before pinging
  156. mods for relatively minor warnings about single users, but warnings
  157. about larger threats involving several members (e.g. join raids) should
  158. issue warnings regardless. Call record_warning or record_warnings after
  159. triggering a mod warning.
  160. """
  161. recent_warns: AgeBoundDict[int, WarningContext, datetime, timedelta] = Storage.get_state_value(member.guild,
  162. BaseCog.STATE_KEY_RECENT_WARNINGS)
  163. if recent_warns is None:
  164. return False
  165. context: WarningContext = recent_warns.get(member.id)
  166. if context is None:
  167. return False
  168. squelch_warning_seconds: int = CONFIG['squelch_warning_seconds']
  169. return datetime.now() - context.last_warned < timedelta(seconds=squelch_warning_seconds)
  170. def record_warning(self, member: Member):
  171. """
  172. Records that mods have been warned about a member and do not need to be
  173. warned about them again for a short while.
  174. """
  175. recent_warns: AgeBoundDict[int, WarningContext, datetime, timedelta] = Storage.get_state_value(member.guild,
  176. BaseCog.STATE_KEY_RECENT_WARNINGS)
  177. if recent_warns is None:
  178. recent_warns = AgeBoundDict(timedelta(seconds=CONFIG['squelch_warning_seconds']),
  179. lambda i, context0 : context0.last_warned)
  180. Storage.set_state_value(member.guild, BaseCog.STATE_KEY_RECENT_WARNINGS, recent_warns)
  181. context: WarningContext = recent_warns.get(member.id)
  182. if context is None:
  183. context = WarningContext(member, datetime.now())
  184. recent_warns[member.id] = context
  185. else:
  186. context.last_warned = datetime.now()
  187. def record_warnings(self, members: list):
  188. """
  189. Records that mods have been warned about some members and do not need to
  190. be warned about them again for a short while.
  191. """
  192. for member in members:
  193. self.record_warning(member)
  194. # Bot message handling
  195. @classmethod
  196. def __bot_messages(cls, guild: Guild) -> AgeBoundDict[int, BotMessage, datetime, timedelta]:
  197. bm: AgeBoundDict[int, BotMessage, datetime, timedelta] = Storage.get_state_value(guild, 'bot_messages')
  198. if bm is None:
  199. far_future = datetime.now(timezone.utc) + timedelta(days=1000)
  200. bm = AgeBoundDict(timedelta(seconds=600),
  201. lambda k, v : v.message_sent_at() or far_future)
  202. Storage.set_state_value(guild, 'bot_messages', bm)
  203. return bm
  204. async def post_message(self, message: BotMessage) -> bool:
  205. """
  206. Posts a BotMessage to a guild. Returns whether it was successful. If
  207. the caller wants to listen to reactions they should be added before
  208. calling this method. Listen to reactions by overriding `on_mod_react`.
  209. """
  210. message.source_cog = self
  211. await message.update()
  212. return message.is_sent()
  213. @Cog.listener()
  214. async def on_raw_reaction_add(self, payload: RawReactionActionEvent):
  215. """Event handler"""
  216. # Avoid any unnecessary requests. Gets called for every reaction
  217. # multiplied by every active cog.
  218. if payload.user_id == self.bot.user.id:
  219. # Ignore bot's own reactions
  220. return
  221. guild: Guild = self.bot.get_guild(payload.guild_id) or await self.bot.fetch_guild(payload.guild_id)
  222. if guild is None:
  223. # Possibly a DM
  224. return
  225. guild_messages: dict[int, BotMessage] = Storage.get_bot_messages(guild)
  226. bot_message = guild_messages.get(payload.message_id)
  227. if bot_message is None:
  228. # Unknown message (expired or was never tracked)
  229. return
  230. if self is not bot_message.source_cog:
  231. # Belongs to a different cog
  232. return
  233. reaction = bot_message.reaction_for_emoji(payload.emoji)
  234. if reaction is None or not reaction.is_enabled:
  235. # Can't use this reaction with this message
  236. return
  237. g_channel: GuildChannel = guild.get_channel(payload.channel_id) or await guild.fetch_channel(payload.channel_id)
  238. if g_channel is None:
  239. # Possibly a DM
  240. return
  241. if not isinstance(g_channel, TextChannel):
  242. return
  243. channel: TextChannel = g_channel
  244. member: Member = payload.member
  245. if member is None:
  246. return
  247. if not channel.permissions_for(member).ban_members:
  248. # Not a mod (could make permissions configurable per BotMessageReaction some day)
  249. return
  250. message: Message = await channel.fetch_message(payload.message_id)
  251. if message is None:
  252. # Message deleted?
  253. return
  254. if message.author.id != self.bot.user.id:
  255. # Bot didn't author this
  256. return
  257. await self.on_mod_react(bot_message, reaction, member)
  258. async def on_mod_react(self,
  259. bot_message: BotMessage,
  260. reaction: BotMessageReaction,
  261. reacted_by: Member) -> None:
  262. """
  263. Subclass override point for receiving mod reactions to bot messages sent
  264. via `post_message()`.
  265. """
  266. # Helpers
  267. @classmethod
  268. def log(cls, guild: Optional[Guild], message) -> None:
  269. """
  270. Writes a message to the console. Intended for significant events only.
  271. """
  272. bot_log(guild, cls, message)