Experimental Discord bot written in Python
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. """
  2. Base cog class and helper classes.
  3. """
  4. from datetime import datetime, timedelta, timezone
  5. from typing import Optional
  6. import discord
  7. from discord import (
  8. Guild,
  9. Interaction,
  10. Member,
  11. Message,
  12. RawReactionActionEvent,
  13. TextChannel,
  14. )
  15. from discord.abc import GuildChannel
  16. from discord.app_commands import AppCommandError
  17. from discord.app_commands.errors import CommandInvokeError
  18. from discord.ext.commands import Cog
  19. from config import CONFIG
  20. from rocketbot.bot import Rocketbot
  21. from rocketbot.botmessage import BotMessage, BotMessageReaction
  22. from rocketbot.cogsetting import CogSetting
  23. from rocketbot.collections import AgeBoundDict
  24. from rocketbot.storage import Storage
  25. from rocketbot.utils import bot_log, dump_stacktrace
  class WarningContext:
      """
      Pairs a guild member with the time at which mods were last warned about
      them.
      """

      def __init__(self, member: Member, warn_time: datetime):
          # `member` is who the warning was about; `last_warned` starts out as
          # the supplied warning time.
          self.member, self.last_warned = member, warn_time
  class BaseCog(Cog):
      """
      Superclass for all Rocketbot cogs. Provides lots of conveniences for
      common tasks.
      """

      # Guild state key under which the recent-warning records are stored.
      # Declared after the docstring so the string above really is the class
      # docstring (a string following an assignment is just an expression).
      STATE_KEY_RECENT_WARNINGS = "BaseCog.recent_warnings"

      def __init__(
          self,
          bot: Rocketbot,
          config_prefix: Optional[str],
          short_description: str,
          long_description: Optional[str] = None,
      ):
          """
          Parameters
          ----------
          bot: Rocketbot
          config_prefix: str
              Prefix to show on variables in /set and /get commands to namespace
              configuration variables. E.g. if config_prefix is "foo", a config
              variable named "bar" in that cog will show as "foo.bar". If None,
              config variable acts as a top-level variable with no prefix.
          short_description: str
              Stored on the instance as `short_description`.
          long_description: Optional[str]
              Stored on the instance as `long_description`. May be None.
          """
          self.bot: Rocketbot = bot
          self.are_settings_setup: bool = False
          self.settings: list[CogSetting] = []
          self.config_prefix: Optional[str] = config_prefix
          self.short_description: str = short_description
          self.long_description: Optional[str] = long_description

      async def cog_app_command_error(self, interaction: Interaction, error: AppCommandError) -> None:
          """
          Dumps the stack trace of a failed app command and reports a short
          summary of the failure back to the invoking user as an ephemeral
          message.
          """
          if isinstance(error, CommandInvokeError):
              # Report the underlying exception rather than the wrapper.
              error = error.original
          dump_stacktrace(error)
          message = (
              f"\nException: {error.__class__.__name__}, "
              f"Command: {interaction.command.qualified_name if interaction.command else None}, "
              f"User: {interaction.user}, "
              f"Time: {discord.utils.format_dt(interaction.created_at, style='F')}"
          )
          try:
              await interaction.response.send_message(f"An error occurred: {message}", ephemeral=True)
          except discord.InteractionResponded:
              # The interaction was already answered, so use a followup instead.
              await interaction.followup.send(f"An error occurred: {message}", ephemeral=True)

      @property
      def basecogs(self) -> list['BaseCog']:
          """
          List of BaseCog instances, sorted by qualified name. Cogs that do not
          inherit from BaseCog are omitted.
          """
          return [
              bcog
              for bcog in sorted(self.bot.cogs.values(), key=lambda c: c.qualified_name)
              if isinstance(bcog, BaseCog)
          ]

      @property
      def basecog_map(self) -> dict[str, 'BaseCog']:
          """
          Map of qualified names to BaseCog instances. Cogs that do not inherit
          from BaseCog are omitted.
          """
          return {
              qname: bcog
              for qname, bcog in self.bot.cogs.items()
              if isinstance(bcog, BaseCog)
          }

      # Config

      def add_setting(self, setting: CogSetting) -> None:
          """
          Called by a subclass in __init__ to register a mod-configurable
          guild setting. A "get" and "set" command will be generated. If the
          setting is named "enabled" (exactly) then "enable" and "disable"
          commands will be created instead which set the setting to True/False.

          If the cog has a command group it will be detected automatically and
          the commands added to that. Otherwise, the commands will be added at
          the top level.

          Changes to settings can be detected by overriding `on_setting_updated`.
          """
          self.settings.append(setting)

      @classmethod
      def get_guild_setting(cls,
                            guild: Optional[Guild],
                            setting: CogSetting,
                            use_cog_default_if_not_set: bool = True):
          """
          Returns the configured value for a setting for the given guild. If no
          setting is configured the default for the cog will be returned,
          unless the optional `use_cog_default_if_not_set` is `False`, then
          `None` will be returned.
          """
          if guild:
              # Values are namespaced by the concrete cog class name.
              key = f'{cls.__name__}.{setting.name}'
              value = Storage.get_config_value(guild, key)
              if value is not None:
                  return value
          if use_cog_default_if_not_set:
              return setting.default_value
          return None

      @classmethod
      def set_guild_setting(cls,
                            guild: Guild,
                            setting: CogSetting,
                            new_value) -> None:
          """
          Manually sets a setting for the given guild. BaseCog creates "get" and
          "set" commands for guild administrators to configure values themselves,
          but this method can be used for hidden settings from code. A ValueError
          will be raised if the new value does not pass validation specified in
          the CogSetting.
          """
          setting.validate_value(new_value)
          key = f'{cls.__name__}.{setting.name}'
          Storage.set_config_value(guild, key, new_value)

      # NOTE(review): the listener decorator below is commented out and the
      # double-underscore name is mangled to `_BaseCog__on_ready`, so nothing
      # in this file invokes this method -- confirm where settings get set up.
      # @commands.Cog.listener()
      async def __on_ready(self):
          """Event listener. Sets up this cog's settings once."""
          if not self.are_settings_setup:
              self.are_settings_setup = True
              CogSetting.set_up_all(self, self.bot, self.settings)

      async def on_setting_updated(self, guild: Guild, setting: CogSetting) -> None:
          """
          Subclass override point for being notified when a CogSetting is edited.
          """

      # Warning squelch

      def was_warned_recently(self, member: Member) -> bool:
          """
          Tests if a given member was included in a mod warning message recently.
          Used to suppress redundant messages. Should be checked before pinging
          mods for relatively minor warnings about single users, but warnings
          about larger threats involving several members (e.g. join raids) should
          issue warnings regardless. Call record_warning or record_warnings after
          triggering a mod warning.
          """
          recent_warns: Optional[AgeBoundDict[int, WarningContext, datetime, timedelta]] = \
              Storage.get_state_value(member.guild, BaseCog.STATE_KEY_RECENT_WARNINGS)
          if recent_warns is None:
              return False
          context: Optional[WarningContext] = recent_warns.get(member.id)
          if context is None:
              return False
          squelch_warning_seconds: int = CONFIG['squelch_warning_seconds']
          # NOTE(review): naive local time here (matching record_warning), whereas
          # __bot_messages uses UTC-aware datetimes -- confirm this is intended.
          return datetime.now() - context.last_warned < timedelta(seconds=squelch_warning_seconds)

      def record_warning(self, member: Member):
          """
          Records that mods have been warned about a member and do not need to be
          warned about them again for a short while.
          """
          recent_warns: Optional[AgeBoundDict[int, WarningContext, datetime, timedelta]] = \
              Storage.get_state_value(member.guild, BaseCog.STATE_KEY_RECENT_WARNINGS)
          if recent_warns is None:
              # First warning for this guild: create the store and persist it.
              recent_warns = AgeBoundDict(timedelta(seconds=CONFIG['squelch_warning_seconds']),
                                          lambda i, context0: context0.last_warned)
              Storage.set_state_value(member.guild, BaseCog.STATE_KEY_RECENT_WARNINGS, recent_warns)
          context: Optional[WarningContext] = recent_warns.get(member.id)
          if context is None:
              context = WarningContext(member, datetime.now())
              recent_warns[member.id] = context
          else:
              context.last_warned = datetime.now()

      def record_warnings(self, members: list[Member]):
          """
          Records that mods have been warned about some members and do not need to
          be warned about them again for a short while.
          """
          for member in members:
              self.record_warning(member)

      # Bot message handling

      @classmethod
      def __bot_messages(cls, guild: Guild) -> AgeBoundDict[int, BotMessage, datetime, timedelta]:
          """
          Returns the guild's 'bot_messages' state value, creating and storing a
          new AgeBoundDict if none exists yet.
          """
          bm: Optional[AgeBoundDict[int, BotMessage, datetime, timedelta]] = \
              Storage.get_state_value(guild, 'bot_messages')
          if bm is None:
              # Fallback timestamp for messages whose message_sent_at() is falsy.
              far_future = datetime.now(timezone.utc) + timedelta(days=1000)
              bm = AgeBoundDict(timedelta(seconds=600),
                                lambda k, v: v.message_sent_at() or far_future)
              Storage.set_state_value(guild, 'bot_messages', bm)
          return bm

      async def post_message(self, message: BotMessage) -> bool:
          """
          Posts a BotMessage to a guild. Returns whether it was successful. If
          the caller wants to listen to reactions they should be added before
          calling this method. Listen to reactions by overriding `on_mod_react`.
          """
          message.source_cog = self
          await message.update()
          return message.is_sent()

      @Cog.listener()
      async def on_raw_reaction_add(self, payload: RawReactionActionEvent):
          """
          Event handler. Forwards a reaction to `on_mod_react` when it was added
          to a tracked BotMessage owned by this cog, the emoji is an enabled
          reaction for that message, and the reacting member has the
          `ban_members` permission in the channel. Anything else is ignored.
          """
          # Avoid any unnecessary requests. Gets called for every reaction
          # multiplied by every active cog.
          if payload.user_id == self.bot.user.id:
              # Ignore bot's own reactions
              return
          if payload.guild_id is None:
              # Reaction in a DM. Bail out before the lookups below, otherwise
              # fetch_guild(None) would issue a doomed HTTP request and raise.
              return

          guild: Optional[Guild] = self.bot.get_guild(payload.guild_id)
          if guild is None:
              try:
                  guild = await self.bot.fetch_guild(payload.guild_id)
              except discord.NotFound:
                  # Guild no longer exists or is not accessible to the bot
                  return

          # NOTE(review): this reads via Storage.get_bot_messages while
          # __bot_messages uses the 'bot_messages' state key -- confirm both
          # refer to the same store.
          guild_messages: dict[int, BotMessage] = Storage.get_bot_messages(guild)
          bot_message = guild_messages.get(payload.message_id)
          if bot_message is None:
              # Unknown message (expired or was never tracked)
              return
          if self is not bot_message.source_cog:
              # Belongs to a different cog
              return

          reaction = bot_message.reaction_for_emoji(payload.emoji)
          if reaction is None or not reaction.is_enabled:
              # Can't use this reaction with this message
              return

          g_channel: Optional[GuildChannel] = guild.get_channel(payload.channel_id)
          if g_channel is None:
              try:
                  g_channel = await guild.fetch_channel(payload.channel_id)
              except discord.NotFound:
                  # Channel deleted?
                  return
          if not isinstance(g_channel, TextChannel):
              return
          channel: TextChannel = g_channel

          member: Optional[Member] = payload.member
          if member is None:
              return
          if not channel.permissions_for(member).ban_members:
              # Not a mod (could make permissions configurable per BotMessageReaction some day)
              return

          try:
              message: Message = await channel.fetch_message(payload.message_id)
          except discord.NotFound:
              # Message deleted? fetch_message raises rather than returning None.
              return
          if message.author.id != self.bot.user.id:
              # Bot didn't author this
              return

          await self.on_mod_react(bot_message, reaction, member)

      async def on_mod_react(self,
                             bot_message: BotMessage,
                             reaction: BotMessageReaction,
                             reacted_by: Member) -> None:
          """
          Subclass override point for receiving mod reactions to bot messages sent
          via `post_message()`.
          """

      # Helpers

      @classmethod
      def log(cls, guild: Optional[Guild], message) -> None:
          """
          Writes a message to the console. Intended for significant events only.
          """
          bot_log(guild, cls, message)