"""
Cog for handling most ungrouped commands and basic behaviors.
"""
import re
from datetime import datetime, timedelta, timezone
from typing import Optional

from discord import Message
from discord.errors import DiscordException
from discord.ext import commands

from config import CONFIG
from rocketbot.bot import Rocketbot
from rocketbot.cogs.basecog import BaseCog, BotMessage
from rocketbot.utils import timedelta_from_str, describe_timedelta, dump_stacktrace
from rocketbot.storage import ConfigKey, Storage

# Matches a user mention (``<@123>`` or ``<@!123>``) or a bare numeric id.
# Compiled once at import time; used with ``fullmatch`` so the whole argument
# must match (a plain ``$`` anchor would also accept a trailing newline).
_MEMBER_ID_PATTERN = re.compile(r'<@!?([0-9]+)>|([0-9]+)')


class GeneralCog(BaseCog, name='General'):
    """
    Cog for handling high-level bot functionality and commands. Should be the
    first cog added to the bot.
    """

    def __init__(self, bot: Rocketbot):
        super().__init__(bot, None)
        # Connection state flags, maintained by the gateway event listeners below.
        self.is_connected = False
        self.is_ready = False
        self.is_first_ready = True
        self.is_first_connect = True
        # Start of the current outage (UTC), or None while connected.
        self.last_disconnect_time: Optional[datetime] = None
        # Outages shorter than this are not worth a log line.
        self.noteworthy_disconnect_duration = timedelta(seconds=5)

    def _end_outage(self) -> Optional[timedelta]:
        """
        Marks the connection as restored and returns how long it was down.

        Clears `last_disconnect_time` so the next outage is timed from its own
        first disconnect. Returns None if no disconnect had been recorded.
        """
        outage_start = self.last_disconnect_time
        self.last_disconnect_time = None
        self.is_connected = True
        if outage_start is None:
            return None
        return datetime.now(timezone.utc) - outage_start

    @commands.Cog.listener()
    async def on_connect(self):
        """
        Event handler. Logs the first connection, and any later reconnection
        that follows an outage longer than `noteworthy_disconnect_duration`.
        """
        outage = self._end_outage()
        if self.is_first_connect:
            self.is_first_connect = False
            self.log(None, 'Connected')
        elif outage is not None and outage > self.noteworthy_disconnect_duration:
            self.log(None, f'Reconnected after {outage.total_seconds()} seconds')

    @commands.Cog.listener()
    async def on_disconnect(self):
        """
        Event handler. Records when the current outage began.

        discord.py may dispatch this event repeatedly without an intervening
        connect (for example on each failed reconnect attempt), so only the
        first disconnect of an outage is recorded; otherwise the reported
        outage duration would be measured from the last failed attempt.
        """
        self.is_connected = False
        if self.last_disconnect_time is None:
            self.last_disconnect_time = datetime.now(timezone.utc)
        # Individual disconnects are intentionally not logged; noteworthy
        # outages are reported when the connection comes back.

    @commands.Cog.listener()
    async def on_ready(self):
        """
        Event handler. Marks the bot ready and syncs the application command
        tree, logging the name of each synced command.
        """
        self.log(None, 'Bot done initializing')
        self.is_ready = True
        # NOTE(review): discord.py does not guarantee this event fires only
        # once, so the sync below may run again after a reconnect - confirm
        # that is intended.
        try:
            synced_commands = await self.bot.tree.sync()
            for command in synced_commands:
                self.log(None, f'Synced command: {command.name}')
        except Exception as e:
            # A failed sync is reported but must not break the ready handler.
            dump_stacktrace(e)
        if self.is_first_ready:
            # Visual separator printed once, after the first ready event.
            print('----------------------------------------------------------')
            self.is_first_ready = False

    @commands.Cog.listener()
    async def on_resumed(self):
        """
        Event handler. Logs a resumed session if the preceding outage was
        longer than `noteworthy_disconnect_duration`.
        """
        outage = self._end_outage()
        if outage is not None and outage > self.noteworthy_disconnect_duration:
            self.log(None, f'Session resumed after {outage.total_seconds()} seconds')

    @commands.command(
        brief='Posts a test warning',
        description='Tests whether a warning channel is configured for this '
            'guild by posting a test warning. If a mod mention is '
            'configured, that user/role will be tagged in the test warning.',
    )
    @commands.has_permissions(ban_members=True)
    @commands.guild_only()
    async def testwarn(self, context: commands.Context):
        """
        Command handler. Posts a test mod warning, or replies with an error if
        the guild has no warning channel configured.
        """
        if Storage.get_config_value(context.guild, ConfigKey.WARNING_CHANNEL_ID) is None:
            await context.message.reply(
                f'{CONFIG["warning_emoji"]} No warning channel set!',
                mention_author=False)
            return
        bm = BotMessage(
            context.guild,
            f'Test warning message (requested by {context.author.name})',
            type=BotMessage.TYPE_MOD_WARNING)
        await self.post_message(bm)

    @commands.command(
        brief='Simple test reply',
        description='Replies to the command message. Useful to ensure the '
            'bot is working properly.',
    )
    async def hello(self, context: commands.Context):
        """Command handler. Replies to the invoking message with a greeting."""
        await context.message.reply(
            f'Hey, {context.author.name}!',
            mention_author=False)

    @commands.command(
        brief='Shuts down the bot',
        description='Causes the bot script to terminate. Only usable by a '
            'user with server admin permissions.',
    )
    @commands.has_permissions(administrator=True)
    @commands.guild_only()
    async def shutdown(self, context: commands.Context):
        """Command handler. Acknowledges with a reaction, then closes the bot."""
        await context.message.add_reaction('👋')
        await self.bot.close()

    @commands.command(
        brief='Mass deletes messages',
        description='Deletes recent messages by the given user. The user '
            'can be either an @ mention or a numeric user ID. The age is '
            'a duration, such as "30s", "5m", "1h30m". Only the most '
            'recent 100 messages in each channel are searched.',
        usage='<user> <age>'
    )
    @commands.has_permissions(manage_messages=True)
    @commands.guild_only()
    async def deletemessages(self, context, user: str, age: str) -> None:
        """
        Command handler. Deletes the given user's messages newer than `age`
        from every text channel in the guild, then replies with the count.

        `user` is a mention or numeric user id; `age` is a timespan string
        parsed by `timedelta_from_str`. An invalid argument produces a failure
        reply and nothing is deleted.
        """
        member_id = self.__parse_member_id(user)
        if member_id is None:
            await context.message.reply(
                f'{CONFIG["failure_emoji"]} user must be a mention or numeric user id',
                mention_author=False)
            return
        try:
            age_delta: timedelta = timedelta_from_str(age)
        except ValueError:
            await context.message.reply(
                f'{CONFIG["failure_emoji"]} age must be a timespan, like "30s", "10m", "1h30m"',
                mention_author=False)
            return
        cutoff: datetime = datetime.now(timezone.utc) - age_delta

        def predicate(message: Message) -> bool:
            # member_id is a string, so compare against the stringified author id.
            return str(message.author.id) == member_id and message.created_at >= cutoff

        deleted_messages = []
        for channel in context.guild.text_channels:
            try:
                deleted_messages += await channel.purge(limit=100, check=predicate)
            except DiscordException:
                # XXX: Deliberate best-effort. Channels that cannot be purged
                # (e.g. access errors) are skipped instead of checking access
                # up front.
                pass
        await context.message.reply(
            f'{CONFIG["success_emoji"]} Deleted {len(deleted_messages)} '
            f'messages by <@!{member_id}> from the past {describe_timedelta(age_delta)}.',
            mention_author=False)

    def __parse_member_id(self, arg: str) -> Optional[str]:
        """
        Extracts the numeric user id, as a string, from a mention
        (`<@id>` / `<@!id>`) or a bare run of digits. Returns None if `arg`
        is neither.
        """
        m = _MEMBER_ID_PATTERN.fullmatch(arg)
        if m is None:
            return None
        # Exactly one of the two alternatives captured the digits.
        return m.group(1) or m.group(2)