| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295 |
- """
- Cog for handling most ungrouped commands and basic behaviors.
- """
- from datetime import datetime, timedelta, timezone
- from typing import Optional, Union
-
- from discord import Interaction, Message, User, Permissions
- from discord.app_commands import Command, Group, command, default_permissions, guild_only, Transform, rename, Choice, \
- autocomplete
- from discord.errors import DiscordException
- from discord.ext.commands import Cog
-
- from config import CONFIG
- from rocketbot.bot import Rocketbot
- from rocketbot.cogs.basecog import BaseCog, BotMessage
- from rocketbot.utils import describe_timedelta, TimeDeltaTransformer, dump_stacktrace
- from rocketbot.storage import ConfigKey, Storage
-
async def command_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
    """
    Autocomplete callback that suggests top-level command names.

    Offers the commands the invoking user is permitted to use whose names
    start with the text typed so far. Any error is reported through
    `dump_stacktrace` and results in an empty suggestion list instead of
    propagating.

    Parameters
    ----------
    interaction: Interaction
        The autocomplete interaction. Its `permissions` decide which
        commands are offered.
    current: str
        The partial text entered so far, with or without a leading slash.

    Returns
    -------
    list[Choice[str]]
        At most 25 choices, sorted by command name, each named and valued
        as `/<command name>`.
    """
    # Discord rejects autocomplete responses carrying more than 25 choices.
    max_choices = 25
    try:
        cog = GeneralCog.shared
        if cog is None:
            # The cog has not been constructed yet; nothing to suggest.
            return []
        prefix = current.strip().removeprefix('/').strip().lower()
        cmds = cog.get_command_list(interaction.permissions)
        # An empty prefix matches every name, so no special case is needed.
        matching = [name for name in sorted(cmds) if name.startswith(prefix)]
        return [
            Choice(name=f'/{name}', value=f'/{name}')
            for name in matching[:max_choices]
        ]
    except Exception as e:
        # Exception rather than BaseException: task cancellation and
        # interpreter shutdown must be allowed to propagate.
        dump_stacktrace(e)
        return []
-
async def subcommand_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
    """
    Autocomplete callback that suggests subcommand names.

    Reads the `command` option already entered in the same interaction and,
    if it names a command group visible to the invoking user, offers that
    group's subcommands whose names start with the text typed so far. Any
    error is reported through `dump_stacktrace` and results in an empty
    suggestion list instead of propagating.

    Parameters
    ----------
    interaction: Interaction
        The autocomplete interaction. Its `namespace` supplies the chosen
        command and its `permissions` decide which entries are offered.
    current: str
        The partial subcommand text entered so far.

    Returns
    -------
    list[Choice[str]]
        At most 25 choices, sorted by subcommand name. Empty when no
        command has been chosen or the chosen command is not a group.
    """
    # Discord rejects autocomplete responses carrying more than 25 choices.
    max_choices = 25
    try:
        cog = GeneralCog.shared
        if cog is None:
            # The cog has not been constructed yet; nothing to suggest.
            return []
        try:
            cmd_name = interaction.namespace['command']
        except KeyError:
            # The `command` option has not been filled in yet. That is an
            # ordinary state while typing, not an error worth a stack trace.
            return []
        if not isinstance(cmd_name, str):
            return []
        cmd_name = cmd_name.removeprefix('/')
        user_permissions = interaction.permissions
        cmd = cog.get_command_list(user_permissions).get(cmd_name)
        if not isinstance(cmd, Group):
            # Covers both an unknown name and a plain command without subcommands.
            print(f'No command found named {cmd_name}')
            return []
        subcmds = cog.get_subcommand_list(cmd, user_permissions)
        prefix = current.lower().strip()
        # An empty prefix matches every name, so no special case is needed.
        matching = [name for name in sorted(subcmds) if name.startswith(prefix)]
        return [
            Choice(name=name, value=name)
            for name in matching[:max_choices]
        ]
    except Exception as e:
        # Exception rather than BaseException: task cancellation and
        # interpreter shutdown must be allowed to propagate.
        dump_stacktrace(e)
        return []
-
def can_use_command(cmd: Union[Group, Command], user_permissions: Optional[Permissions]) -> bool:
    """
    Decide whether a user holding `user_permissions` may use `cmd`.

    Returns False when no permissions are supplied. Otherwise the command is
    usable if it declares no default permissions, or if its default
    permissions are a subset of the user's permissions.
    """
    if user_permissions is None:
        return False
    required = cmd.default_permissions
    if required is None:
        # No default permissions declared on the command.
        return True
    return required.is_subset(user_permissions)
-
class GeneralCog(BaseCog, name='General'):
    """
    Cog for handling high-level bot functionality and commands. Should be the
    first cog added to the bot.
    """

    # Most recently constructed instance. The module-level autocomplete
    # callbacks use it to reach the command lists.
    shared: Optional['GeneralCog'] = None

    def __init__(self, bot: Rocketbot):
        super().__init__(
            bot,
            config_prefix=None,
            name='',
            short_description='',
        )
        # NOTE(review): set to True in on_connect but never reset on
        # disconnect within this class - confirm whether that is intended.
        self.is_connected = False
        self.is_first_connect = True
        # UTC time of the most recent disconnect, or None if there has been none.
        self.last_disconnect_time: Optional[datetime] = None
        # Outages no longer than this are not logged on reconnect/resume.
        self.noteworthy_disconnect_duration = timedelta(seconds=5)
        GeneralCog.shared = self

    def __time_since_disconnect(self) -> Optional[timedelta]:
        """Returns the time elapsed since the last recorded disconnect, or None if there was none."""
        if not self.last_disconnect_time:
            return None
        return datetime.now(timezone.utc) - self.last_disconnect_time

    @Cog.listener()
    async def on_connect(self):
        """Event handler"""
        if self.is_first_connect:
            self.log(None, 'Connected')
            self.is_first_connect = False
        else:
            disconnect_duration = self.__time_since_disconnect()
            if disconnect_duration is not None and disconnect_duration > self.noteworthy_disconnect_duration:
                self.log(None, f'Reconnected after {disconnect_duration.total_seconds()} seconds')
        self.is_connected = True

    @Cog.listener()
    async def on_disconnect(self):
        """Event handler"""
        # Remember when the connection dropped so the reconnect/resume
        # handlers can report how long the outage lasted.
        self.last_disconnect_time = datetime.now(timezone.utc)

    @Cog.listener()
    async def on_resumed(self):
        """Event handler"""
        disconnect_duration = self.__time_since_disconnect()
        if disconnect_duration is not None and disconnect_duration > self.noteworthy_disconnect_duration:
            self.log(None, f'Session resumed after {disconnect_duration.total_seconds()} seconds')

    @command(
        description='Posts a test warning',
        extras={
            'long_description': 'Tests whether a warning channel is configured for this '
                                'guild by posting a test warning. If a mod mention is '
                                'configured, that user/role will be tagged in the test warning.',
        },
    )
    @guild_only()
    @default_permissions(ban_members=True)
    async def test_warn(self, interaction: Interaction):
        """Command handler"""
        if Storage.get_config_value(interaction.guild, ConfigKey.WARNING_CHANNEL_ID) is None:
            await interaction.response.send_message(
                f'{CONFIG["warning_emoji"]} No warning channel set!',
                ephemeral=True,
            )
            return
        bm = BotMessage(
            interaction.guild,
            f'Test warning message (requested by {interaction.user.name})',
            type=BotMessage.TYPE_MOD_WARNING)
        await self.post_message(bm)
        await interaction.response.send_message(
            'Warning issued',
            ephemeral=True,
        )

    @command(
        description='Simple test reply',
        extras={
            'long_description': 'Replies to the command message. Useful to ensure the '
                                'bot is working properly.',
        },
    )
    async def hello(self, interaction: Interaction):
        """Command handler"""
        await interaction.response.send_message(
            f'Hey, {interaction.user.name}!',
            ephemeral=True,
        )

    @command(
        description='Shuts down the bot',
        extras={
            'long_description': 'Causes the bot script to terminate. Only usable by a '
                                'user with server admin permissions.',
        },
    )
    @guild_only()
    @default_permissions(administrator=True)
    async def shutdown(self, interaction: Interaction):
        """Command handler"""
        # Acknowledge first; once the bot is closed no reply can be sent.
        await interaction.response.send_message('👋', ephemeral=True)
        await self.bot.close()

    @command(
        description='Mass deletes messages',
        extras={
            'long_description': 'Deletes recent messages by the given user. The user '
                                'can be either an @ mention or a numeric user ID. The age is '
                                'a duration, such as "30s", "5m", "1h30m". Only the most '
                                'recent 100 messages in each channel are searched.',
            'usage': '<user:id|mention> <age:timespan>',
        },
    )
    @guild_only()
    @default_permissions(manage_messages=True)
    async def delete_messages(self, interaction: Interaction, user: User, age: Transform[timedelta, TimeDeltaTransformer]) -> None:
        """Command handler"""
        # Purging every text channel can take longer than Discord allows for
        # an initial response, so acknowledge now and report via a followup.
        await interaction.response.defer(ephemeral=True, thinking=True)
        member_id = user.id
        cutoff: datetime = datetime.now(timezone.utc) - age

        def predicate(message: Message) -> bool:
            # Compare ids as ints. Comparing str(id) to an int never matches,
            # which previously meant no message was ever selected.
            return message.author.id == member_id and message.created_at >= cutoff

        deleted_messages = []
        for channel in interaction.guild.text_channels:
            try:
                deleted_messages += await channel.purge(limit=100, check=predicate)
            except DiscordException:
                # XXX: Sloppily glossing over access errors instead of checking access.
                # Best effort by design: a channel that cannot be purged is skipped.
                continue
        await interaction.followup.send(
            f'{CONFIG["success_emoji"]} Deleted {len(deleted_messages)} '
            f'messages by {user.mention} from the past {describe_timedelta(age)}.',
            ephemeral=True,
        )

    # NOTE: no explicit description is passed to @command below, so discord.py
    # takes the command and parameter descriptions from this docstring. Treat
    # its wording and layout as user-facing text.
    @command(name='help')
    @guild_only()
    @rename(command_name='command', subcommand_name='subcommand')
    @autocomplete(command_name=command_autocomplete, subcommand_name=subcommand_autocomplete)
    async def help_command(self, interaction: Interaction, command_name: Optional[str] = None, subcommand_name: Optional[str] = None) -> None:
        """
        Shows help for using commands and subcommands.

        `/help` will show a list of top-level commands.

        `/help /<command_name>` will show help about a specific command or
        list a command's subcommands.

        `/help /<command_name> <subcommand_name>` will show help about a
        specific subcommand.

        Parameters
        ----------
        interaction: Interaction
        command_name: Optional[str]
            Optional name of a command to get specific help for. With or without the leading slash.
        subcommand_name: Optional[str]
            Optional name of a subcommand to get specific help for.
        """
        print(f'help_command(interaction, {command_name}, {subcommand_name})')
        if command_name is None:
            await self.__send_general_help(interaction)
            return

        command_name = command_name.removeprefix('/')
        cmds: list[Command] = self.bot.tree.get_commands()
        cmd = next((c for c in cmds if c.name == command_name), None)
        if cmd is None:
            # This call must be awaited, otherwise the reply is never sent.
            await interaction.response.send_message(
                f'Command `{command_name}` not found!',
                ephemeral=True,
            )
            return
        if subcommand_name is None:
            await self.__send_command_help(interaction, cmd)
            return

        if not isinstance(cmd, Group):
            await self.__send_command_help(interaction, cmd, addendum=f'{CONFIG["warning_emoji"]} Command does not have subcommands. Showing help for base command.')
            return
        grp: Group = cmd
        subcmd: Optional[Command] = next((c for c in grp.commands if c.name == subcommand_name), None)
        if subcmd is None:
            await self.__send_command_help(interaction, cmd, addendum=f'{CONFIG["warning_emoji"]} Command `/{command_name}` does not have a subcommand "{subcommand_name}". Showing help for base command.')
            return
        await self.__send_subcommand_help(interaction, grp, subcmd)

    def get_command_list(self, permissions: Optional[Permissions] = None) -> dict[str, Union[Command, Group]]:
        """Returns the bot's top-level commands usable with `permissions`, keyed by command name."""
        return {cmd.name: cmd for cmd in self.bot.tree.get_commands() if can_use_command(cmd, permissions)}

    def get_subcommand_list(self, cmd: Group, permissions: Optional[Permissions] = None) -> dict[str, Command]:
        """Returns the subcommands of group `cmd` usable with `permissions`, keyed by subcommand name."""
        return {subcmd.name: subcmd for subcmd in cmd.commands if can_use_command(subcmd, permissions)}

    def __format_parameters(self, params) -> str:
        """Returns a markdown "Parameters" section for `params`, or an empty string if there are none."""
        if not params:
            return ''
        return '\n\n### Parameters:' + ''.join(
            f'\n- `{param.name}`: {param.description}' for param in params
        )

    async def __send_general_help(self, interaction: Interaction) -> None:
        """Replies ephemerally with the list of top-level commands the user may use."""
        lines = ['## :information_source: Commands']
        lines.extend(
            f'- `/{cmd_name}`: {cmd.description}'
            for cmd_name, cmd in sorted(self.get_command_list(interaction.permissions).items())
        )
        await interaction.response.send_message(
            '\n'.join(lines),
            ephemeral=True,
        )

    async def __send_command_help(self, interaction: Interaction, command_or_group: Union[Command, Group], addendum: Optional[str] = None) -> None:
        """
        Replies ephemerally with help for one command or command group.

        A group lists the subcommands the user may use; a plain command lists
        its parameters. `addendum`, if given, is shown above the help text.
        """
        text = ''
        if addendum is not None:
            text += addendum + '\n\n'
        text += f'## :information_source: Command Help\n`/{command_or_group.name}`\n\n{command_or_group.description}'
        if isinstance(command_or_group, Group):
            subcmds: dict[str, Command] = self.get_subcommand_list(command_or_group, permissions=interaction.permissions)
            if subcmds:
                text += '\n\n### Subcommands:'
                text += ''.join(
                    f'\n- `{subcmd_name}`: {subcmd.description}'
                    for subcmd_name, subcmd in sorted(subcmds.items())
                )
        else:
            text += self.__format_parameters(command_or_group.parameters)
        await interaction.response.send_message(text, ephemeral=True)

    async def __send_subcommand_help(self, interaction: Interaction, group: Group, subcommand: Command) -> None:
        """Replies ephemerally with help for one subcommand of `group`, including its parameters."""
        # The description appears once; it was previously printed twice.
        text = f'## :information_source: Subcommand Help\n`/{group.name} {subcommand.name}`\n\n{subcommand.description}'
        text += self.__format_parameters(subcommand.parameters)
        await interaction.response.send_message(text, ephemeral=True)
|