| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- """
- Cog for handling most ungrouped commands and basic behaviors.
- """
- import re
- from datetime import datetime, timedelta, timezone
- from typing import Optional
-
- from discord import Message
- from discord.errors import DiscordException
- from discord.ext import commands
-
- from config import CONFIG
- from rocketbot.cogs.basecog import BaseCog, BotMessage
- from rocketbot.utils import timedelta_from_str, describe_timedelta
- from rocketbot.storage import ConfigKey, Storage
-
class GeneralCog(BaseCog, name='General'):
    """
    Cog for handling high-level bot functionality and commands. Should be the
    first cog added to the bot.

    Tracks gateway lifecycle events (connect, disconnect, ready, resumed) and
    provides the `testwarn`, `hello`, `shutdown`, and `deletemessages`
    commands.
    """

    # Matches a user mention (`<@123>` or `<@!123>`) or a bare numeric id.
    # Exactly one of the two capture groups is populated on a match.
    _MEMBER_ID_PATTERN = re.compile(r'<@!?([0-9]+)>|([0-9]+)')

    def __init__(self, bot: commands.Bot):
        super().__init__(bot)
        self.is_connected = False
        self.is_ready = False
        self.is_first_ready = True
        self.is_first_connect = True
        # Timestamp (UTC) of the most recent disconnect; None until the first one.
        self.last_disconnect_time: Optional[datetime] = None
        # Gaps no longer than this are not worth logging on reconnect/resume.
        self.noteworthy_disconnect_duration = timedelta(seconds=5)

    def _noteworthy_downtime(self) -> Optional[timedelta]:
        """
        Returns the time elapsed since the last recorded disconnect if it
        exceeds `noteworthy_disconnect_duration`. Returns None if no
        disconnect has been recorded or the gap was too short to mention.
        """
        if self.last_disconnect_time is None:
            return None
        downtime = datetime.now(timezone.utc) - self.last_disconnect_time
        if downtime > self.noteworthy_disconnect_duration:
            return downtime
        return None

    @commands.Cog.listener()
    async def on_connect(self):
        """
        Event handler. Logs the initial connection, or a reconnection that
        followed a noteworthy period of downtime.
        """
        if self.is_first_connect:
            self.log(None, 'Connected')
            self.is_first_connect = False
        else:
            downtime = self._noteworthy_downtime()
            if downtime is not None:
                self.log(None, f'Reconnected after {downtime.total_seconds()} seconds')
        # NOTE(review): nothing in this class resets is_connected on
        # disconnect -- confirm whether that is intended.
        self.is_connected = True

    @commands.Cog.listener()
    async def on_disconnect(self):
        """Event handler. Records when the disconnect happened."""
        # Deliberately not logged here: only noteworthy gaps are reported,
        # and that happens once the connection comes back.
        self.last_disconnect_time = datetime.now(timezone.utc)

    @commands.Cog.listener()
    async def on_ready(self):
        """
        Event handler. Marks the bot as ready and prints a separator line the
        first time it fires.
        """
        self.log(None, 'Bot done initializing')
        self.is_ready = True
        if self.is_first_ready:
            print('----------------------------------------------------------')
            self.is_first_ready = False

    @commands.Cog.listener()
    async def on_resumed(self):
        """
        Event handler. Logs a resumed session if it followed a noteworthy
        period of downtime.
        """
        downtime = self._noteworthy_downtime()
        if downtime is not None:
            self.log(None, f'Session resumed after {downtime.total_seconds()} seconds')

    @commands.command(
        brief='Posts a test warning',
        description='Tests whether a warning channel is configured for this '
            'guild by posting a test warning. If a mod mention is '
            'configured, that user/role will be tagged in the test warning.',
    )
    @commands.has_permissions(ban_members=True)
    @commands.guild_only()
    async def testwarn(self, context: commands.Context):
        """
        Command handler. Posts a test moderator warning, or replies with an
        error if the guild has no warning channel configured.
        """
        if Storage.get_config_value(context.guild, ConfigKey.WARNING_CHANNEL_ID) is None:
            await context.message.reply(
                f'{CONFIG["warning_emoji"]} No warning channel set!',
                mention_author=False)
            return
        bm = BotMessage(
            context.guild,
            f'Test warning message (requested by {context.author.name})',
            type=BotMessage.TYPE_MOD_WARNING)
        await self.post_message(bm)

    @commands.command(
        brief='Simple test reply',
        description='Replies to the command message. Useful to ensure the '
            'bot is working properly.',
    )
    async def hello(self, context: commands.Context):
        """Command handler. Replies to the invoking message with a greeting."""
        await context.message.reply(
            f'Hey, {context.author.name}!',
            mention_author=False)

    @commands.command(
        brief='Shuts down the bot',
        description='Causes the bot script to terminate. Only usable by a '
            'user with server admin permissions.',
    )
    @commands.has_permissions(administrator=True)
    @commands.guild_only()
    async def shutdown(self, context: commands.Context):
        """
        Command handler. Acknowledges the request with a reaction, then closes
        the bot.
        """
        try:
            await context.message.add_reaction('👋')
        except DiscordException:
            # The reaction is only a courtesy acknowledgement. Failing to add
            # it (e.g. missing permission, message already deleted) must not
            # prevent the shutdown itself.
            pass
        await self.bot.close()

    @commands.command(
        brief='Mass deletes messages',
        description='Deletes recent messages by the given user. The user '
            'can be either an @ mention or a numeric user ID. The age is '
            'a duration, such as "30s", "5m", "1h30m". Only the most '
            'recent 100 messages in each channel are searched.',
        usage='<user:id|mention> <age:timespan>'
    )
    @commands.has_permissions(manage_messages=True)
    @commands.guild_only()
    async def deletemessages(self, context: commands.Context, user: str, age: str) -> None:
        """
        Command handler. Deletes messages authored by `user` that are newer
        than `age`, searching the latest 100 messages of every text channel in
        the guild, then replies with the number of messages deleted.

        user -- an @ mention or a numeric user id
        age  -- a timespan string accepted by `timedelta_from_str`
        """
        member_id = self.__parse_member_id(user)
        if member_id is None:
            await context.message.reply(
                f'{CONFIG["failure_emoji"]} user must be a mention or numeric user id',
                mention_author=False)
            return
        try:
            age_delta: timedelta = timedelta_from_str(age)
        except ValueError:
            await context.message.reply(
                f'{CONFIG["failure_emoji"]} age must be a timespan, like "30s", "10m", "1h30m"',
                mention_author=False)
            return
        cutoff: datetime = datetime.now(timezone.utc) - age_delta

        def predicate(message: Message) -> bool:
            # member_id is a string of digits, so compare against the
            # stringified author id.
            return str(message.author.id) == member_id and message.created_at >= cutoff

        deleted_messages = []
        for channel in context.guild.text_channels:
            try:
                deleted_messages += await channel.purge(limit=100, check=predicate)
            except DiscordException:
                # XXX: Best effort. Channels the bot cannot purge are skipped
                # rather than checked for access up front.
                pass
        await context.message.reply(
            f'{CONFIG["success_emoji"]} Deleted {len(deleted_messages)} '
            f'messages by <@!{member_id}> from the past {describe_timedelta(age_delta)}.',
            mention_author=False)

    def __parse_member_id(self, arg: str) -> Optional[str]:
        """
        Extracts the numeric user id from `arg`, which may be a mention
        (`<@123>` or `<@!123>`) or a bare string of digits. Returns the id as
        a string of digits, or None if `arg` is in neither form. The whole
        argument must match; trailing characters, including a newline, are
        rejected.
        """
        m = self._MEMBER_ID_PATTERN.fullmatch(arg)
        if m is None:
            return None
        # Group 1 is the mention form, group 2 the bare-id form.
        return m.group(1) or m.group(2)
|