Experimental Discord bot written in Python
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. """
  2. Cog for handling most ungrouped commands and basic behaviors.
  3. """
  4. import re
  5. from datetime import datetime, timedelta, timezone
  6. from typing import Optional
  7. from discord import Message
  8. from discord.errors import DiscordException
  9. from discord.ext import commands
  10. from config import CONFIG
  11. from rocketbot.cogs.basecog import BaseCog, BotMessage
  12. from rocketbot.utils import timedelta_from_str, describe_timedelta
  13. from rocketbot.storage import ConfigKey, Storage
  14. class GeneralCog(BaseCog, name='General'):
  15. """
  16. Cog for handling high-level bot functionality and commands. Should be the
  17. first cog added to the bot.
  18. """
  19. def __init__(self, bot: commands.Bot):
  20. super().__init__(bot)
  21. self.is_connected = False
  22. self.is_ready = False
  23. self.is_first_ready = True
  24. self.is_first_connect = True
  25. @commands.Cog.listener()
  26. async def on_connect(self):
  27. 'Event handler'
  28. if self.is_first_connect:
  29. self.log(None, 'Connected')
  30. self.is_first_connect = False
  31. else:
  32. self.log(None, 'Reconnected')
  33. self.is_connected = True
  34. @commands.Cog.listener()
  35. async def on_disconnect(self):
  36. 'Event handler'
  37. self.log(None, 'Disconnected')
  38. @commands.Cog.listener()
  39. async def on_ready(self):
  40. 'Event handler'
  41. self.log(None, 'Bot done initializing')
  42. self.is_ready = True
  43. if self.is_first_ready:
  44. print('----------------------------------------------------------')
  45. self.is_first_ready = False
  46. @commands.Cog.listener()
  47. async def on_resumed(self):
  48. 'Event handler'
  49. self.log(None, 'Session resumed')
  50. @commands.command(
  51. brief='Posts a test warning',
  52. description='Tests whether a warning channel is configured for this ' + \
  53. 'guild by posting a test warning. If a mod mention is ' + \
  54. 'configured, that user/role will be tagged in the test warning.',
  55. )
  56. @commands.has_permissions(ban_members=True)
  57. @commands.guild_only()
  58. async def testwarn(self, context):
  59. 'Command handler'
  60. if Storage.get_config_value(context.guild, ConfigKey.WARNING_CHANNEL_ID) is None:
  61. await context.message.reply(
  62. f'{CONFIG["warning_emoji"]} No warning channel set!',
  63. mention_author=False)
  64. else:
  65. bm = BotMessage(
  66. context.guild,
  67. f'Test warning message (requested by {context.author.name})',
  68. type=BotMessage.TYPE_MOD_WARNING)
  69. await self.post_message(bm)
  70. @commands.command(
  71. brief='Simple test reply',
  72. description='Replies to the command message. Useful to ensure the ' + \
  73. 'bot is working properly.',
  74. )
  75. async def hello(self, context):
  76. 'Command handler'
  77. await context.message.reply(
  78. f'Hey, {context.author.name}!',
  79. mention_author=False)
  80. @commands.command(
  81. brief='Shuts down the bot',
  82. description='Causes the bot script to terminate. Only usable by a ' + \
  83. 'user with server admin permissions.',
  84. )
  85. @commands.has_permissions(administrator=True)
  86. @commands.guild_only()
  87. async def shutdown(self, context: commands.Context):
  88. 'Command handler'
  89. await context.message.add_reaction('👋')
  90. await self.bot.close()
  91. @commands.command(
  92. brief='Mass deletes messages',
  93. description='Deletes recent messages by the given user. The user ' +
  94. 'can be either an @ mention or a numeric user ID. The age is ' +
  95. 'a duration, such as "30s", "5m", "1h30m". Only the most ' +
  96. 'recent 100 messages in each channel are searched.',
  97. usage='<user:id|mention> <age:timespan>'
  98. )
  99. @commands.has_permissions(manage_messages=True)
  100. @commands.guild_only()
  101. async def deletemessages(self, context, user: str, age: str) -> None:
  102. 'Command handler'
  103. member_id = self.__parse_member_id(user)
  104. if member_id is None:
  105. await context.message.reply(
  106. f'{CONFIG["failure_emoji"]} user must be a mention or numeric user id',
  107. mention_author=False)
  108. return
  109. try:
  110. age_delta: timedelta = timedelta_from_str(age)
  111. except ValueError:
  112. await context.message.reply(
  113. f'{CONFIG["failure_emoji"]} age must be a timespan, like "30s", "10m", "1h30m"',
  114. mention_author=False)
  115. return
  116. cutoff: datetime = datetime.now(timezone.utc) - age_delta
  117. def predicate(message: Message) -> bool:
  118. return str(message.author.id) == member_id and message.created_at >= cutoff
  119. deleted_messages = []
  120. for channel in context.guild.text_channels:
  121. try:
  122. deleted_messages += await channel.purge(limit=100, check=predicate)
  123. except DiscordException:
  124. # XXX: Sloppily glossing over access errors instead of checking access
  125. pass
  126. await context.message.reply(
  127. f'{CONFIG["success_emoji"]} Deleted {len(deleted_messages)} ' + \
  128. f'messages by <@!{member_id}> from the past {describe_timedelta(age_delta)}.',
  129. mention_author=False)
  130. def __parse_member_id(self, arg: str) -> Optional[str]:
  131. p = re.compile('^<@!?([0-9]+)>$')
  132. m = p.match(arg)
  133. if m:
  134. return m.group(1)
  135. p = re.compile('^([0-9]+)$')
  136. m = p.match(arg)
  137. if m:
  138. return m.group(1)
  139. return None