| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498 |
- """
- Cog for handling most ungrouped commands and basic behaviors.
- """
- import re
- from datetime import datetime, timedelta, timezone
- from typing import Optional, Union
-
- from discord import Interaction, Message, User, Permissions, AppCommandType
- from discord.app_commands import Command, Group, command, default_permissions, guild_only, Transform, Choice, \
- autocomplete
- from discord.errors import DiscordException
- from discord.ext.commands import Cog
-
- from config import CONFIG
- from rocketbot.bot import Rocketbot
- from rocketbot.cogs.basecog import BaseCog, BotMessage
- from rocketbot.utils import describe_timedelta, TimeDeltaTransformer, dump_stacktrace, MOD_PERMISSIONS
- from rocketbot.storage import ConfigKey, Storage
-
-
- trivial_words = {
- 'a', 'an', 'and', 'are', "aren't", 'as', 'by', 'can', 'for', 'have', 'if', 'in',
- 'is', 'it', 'its', "it's", 'not', 'of', 'on', 'or', 'than', 'that', 'the', 'then',
- 'there', 'them', 'they', "they're", 'this', 'to', 'when', 'with',
- }
-
- async def command_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
- choices: list[Choice] = []
- try:
- if current.startswith('/'):
- current = current[1:]
- current = current.lower().strip()
- user_permissions = interaction.permissions
- cmds = GeneralCog.shared.get_command_list(user_permissions)
- return [
- Choice(name=f'/{cmdname} command', value=f'cmd:{cmdname}')
- for cmdname in sorted(cmds.keys())
- if len(current) == 0 or current in cmdname
- ][:25]
- except BaseException as e:
- dump_stacktrace(e)
- return choices
-
- async def subcommand_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
- try:
- current = current.lower().strip()
- cmd_name = interaction.namespace['topic']
- cmd = GeneralCog.shared.object_for_help_symbol(cmd_name)
- if not isinstance(cmd, Group):
- return []
- user_permissions = interaction.permissions
- if cmd is None or not isinstance(cmd, Group):
- print(f'No command found named {cmd_name}')
- return []
- grp = cmd
- subcmds = GeneralCog.shared.get_subcommand_list(grp, user_permissions)
- if subcmds is None:
- print(f'Subcommands for {cmd_name} was None')
- return []
- return [
- Choice(name=f'{subcmd_name} subcommand', value=f'subcmd:{cmd_name}.{subcmd_name}')
- for subcmd_name in sorted(subcmds.keys())
- if len(current) == 0 or current in subcmd_name
- ][:25]
- except BaseException as e:
- dump_stacktrace(e)
- return []
-
- async def cog_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
- try:
- current = current.lower().strip()
- return [
- Choice(name=f'⚙ {cog.qualified_name} module', value=f'cog:{cog.qualified_name}')
- for cog in sorted(GeneralCog.shared.bot.cogs.values(), key=lambda c: c.qualified_name)
- if isinstance(cog, BaseCog) and
- can_use_cog(cog, interaction.permissions) and
- (len(cog.get_commands()) > 0 or len(cog.settings) > 0) and \
- (len(current) == 0 or current in cog.qualified_name.lower())
- ]
- except BaseException as e:
- dump_stacktrace(e)
- return []
-
- async def topic_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
- command_choices = await command_autocomplete(interaction, current)
- cog_choices = await cog_autocomplete(interaction, current)
- return (command_choices + cog_choices)[:25]
-
- async def subtopic_autocomplete(interaction: Interaction, current: str) -> list[Choice[str]]:
- subcommand_choices = await subcommand_autocomplete(interaction, current)
- return subcommand_choices[:25]
-
- def can_use_command(cmd: Union[Group, Command], user_permissions: Optional[Permissions]) -> bool:
- if user_permissions is None:
- return False
- if cmd.parent and not can_use_command(cmd.parent, user_permissions):
- return False
- return cmd.default_permissions is None or cmd.default_permissions.is_subset(user_permissions)
-
- def can_use_cog(cog: BaseCog, user_permissions: Optional[Permissions]) -> bool:
- return user_permissions is not None and MOD_PERMISSIONS.is_subset(user_permissions)
-
- class GeneralCog(BaseCog, name='General'):
- """
- Cog for handling high-level bot functionality and commands. Should be the
- first cog added to the bot.
- """
-
- shared: Optional['GeneralCog'] = None
-
- def __init__(self, bot: Rocketbot):
- super().__init__(
- bot,
- config_prefix=None,
- short_description='',
- )
- self.is_connected = False
- self.is_first_connect = True
- self.last_disconnect_time: Optional[datetime] = None
- self.noteworthy_disconnect_duration = timedelta(seconds=5)
- GeneralCog.shared = self
-
- @Cog.listener()
- async def on_connect(self):
- """Event handler"""
- if self.is_first_connect:
- self.log(None, 'Connected')
- self.is_first_connect = False
- else:
- disconnect_duration = datetime.now(
- timezone.utc) - self.last_disconnect_time if self.last_disconnect_time else None
- if disconnect_duration is not None and disconnect_duration > self.noteworthy_disconnect_duration:
- self.log(None, f'Reconnected after {disconnect_duration.total_seconds()} seconds')
- self.is_connected = True
-
- @Cog.listener()
- async def on_disconnect(self):
- """Event handler"""
- self.last_disconnect_time = datetime.now(timezone.utc)
- # self.log(None, 'Disconnected')
-
- @Cog.listener()
- async def on_resumed(self):
- """Event handler"""
- disconnect_duration = datetime.now(timezone.utc) - self.last_disconnect_time if self.last_disconnect_time else None
- if disconnect_duration is not None and disconnect_duration > self.noteworthy_disconnect_duration:
- self.log(None, f'Session resumed after {disconnect_duration.total_seconds()} seconds')
-
- @command(
- description='Posts a test warning',
- extras={
- 'long_description': 'Tests whether a warning channel is configured for this ' + \
- 'guild by posting a test warning. If a mod mention is ' + \
- 'configured, that user/role will be tagged in the test warning.',
- },
- )
- @guild_only()
- @default_permissions(ban_members=True)
- async def test_warn(self, interaction: Interaction):
- """Command handler"""
- if Storage.get_config_value(interaction.guild, ConfigKey.WARNING_CHANNEL_ID) is None:
- await interaction.response.send_message(
- f'{CONFIG["warning_emoji"]} No warning channel set!',
- ephemeral=True,
- )
- else:
- bm = BotMessage(
- interaction.guild,
- f'Test warning message (requested by {interaction.user.name})',
- type=BotMessage.TYPE_MOD_WARNING)
- await self.post_message(bm)
- await interaction.response.send_message(
- 'Warning issued',
- ephemeral=True,
- )
-
- @command(
- description='Simple test reply',
- extras={
- 'long_description': 'Replies to the command message. Useful to ensure the ' + \
- 'bot is working properly.',
- },
- )
- async def hello(self, interaction: Interaction):
- """Command handler"""
- await interaction.response.send_message(
- f'Hey, {interaction.user.name}!',
- ephemeral=True,
- )
-
- @command(
- description='Shuts down the bot',
- extras={
- 'long_description': 'Causes the bot script to terminate. Only usable by a ' + \
- 'user with server admin permissions.',
- },
- )
- @guild_only()
- @default_permissions(administrator=True)
- async def shutdown(self, interaction: Interaction):
- """Command handler"""
- await interaction.response.send_message('👋', ephemeral=True)
- await self.bot.close()
-
- @command(
- description='Mass deletes messages',
- extras={
- 'long_description': 'Deletes recent messages by the given user. The user ' +
- 'can be either an @ mention or a numeric user ID. The age is ' +
- 'a duration, such as "30s", "5m", "1h30m". Only the most ' +
- 'recent 100 messages in each channel are searched.',
- 'usage': '<user:id|mention> <age:timespan>',
- },
- )
- @guild_only()
- @default_permissions(manage_messages=True)
- async def delete_messages(self, interaction: Interaction, user: User, age: Transform[timedelta, TimeDeltaTransformer]) -> None:
- """Command handler"""
- member_id = user.id
- cutoff: datetime = datetime.now(timezone.utc) - age
- def predicate(message: Message) -> bool:
- return str(message.author.id) == member_id and message.created_at >= cutoff
- deleted_messages = []
- for channel in interaction.guild.text_channels:
- try:
- deleted_messages += await channel.purge(limit=100, check=predicate)
- except DiscordException:
- # XXX: Sloppily glossing over access errors instead of checking access
- pass
- await interaction.response.send_message(
- f'{CONFIG["success_emoji"]} Deleted {len(deleted_messages)} ' + \
- f'messages by {user.mention}> from the past {describe_timedelta(age)}.',
- ephemeral=True,
- )
-
- def __create_help_index(self) -> None:
- if getattr(self, 'obj_index', None) is not None:
- return
- self.obj_index: dict[str, Union[Command, Group, BaseCog]] = {}
- self.keyword_index: dict[str, set[Union[Command, Group, BaseCog]]] = {}
-
- def add_text_to_index(obj, text: str):
- words = [
- word
- for word in re.split(r"[^a-zA-Z']+", text.lower())
- if len(word) > 1 and word not in trivial_words
- ]
- for word in words:
- matches = self.keyword_index.get(word, set())
- matches.add(obj)
- self.keyword_index[word] = matches
-
- for cmd in self.bot.tree.get_commands(type=AppCommandType.chat_input):
- key = f'cmd:{cmd.name}'
- self.obj_index[key] = cmd
- add_text_to_index(cmd, cmd.name)
- if cmd.description:
- add_text_to_index(cmd, cmd.description)
- if isinstance(cmd, Group):
- for subcmd in cmd.commands:
- key = f'subcmd:{cmd.name}.{subcmd.name}'
- self.obj_index[key] = subcmd
- add_text_to_index(subcmd, subcmd.name)
- if subcmd.description:
- add_text_to_index(subcmd, subcmd.description)
- for cog_qname, cog in self.bot.cogs.items():
- if not isinstance(cog, BaseCog):
- continue
- key = f'cog:{cog_qname}'
- self.obj_index[key] = cog
- add_text_to_index(cog, cog.qualified_name)
- if cog.description:
- add_text_to_index(cog, cog.description)
- print(self.obj_index.keys())
- print(self.keyword_index.keys())
-
- def object_for_help_symbol(self, symbol: str) -> Optional[Union[Command, Group, BaseCog]]:
- self.__create_help_index()
- return self.obj_index.get(symbol, None)
-
- @command(name='help')
- @guild_only()
- @autocomplete(topic=topic_autocomplete, subtopic=subtopic_autocomplete)
- async def help_command(self, interaction: Interaction, topic: Optional[str] = None, subtopic: Optional[str] = None) -> None:
- """
- Shows help for using commands and subcommands and configuring modules.
-
- `/help` will show a list of top-level topics.
-
- `/help /<command_name>` will show help about a specific command or
- list a command's subcommands.
-
- `/help /<command_name> <subcommand_name>` will show help about a
- specific subcommand.
-
- `/help <module_name>` will show help about configuring a module.
-
- `/help <keywords>` will do a text search for topics.
-
- Parameters
- ----------
- interaction: Interaction
- topic: Optional[str]
- Optional topic to get specific help for. Getting help on a command can optionally start with a leading slash.
- subtopic: Optional[str]
- Optional subtopic to get specific help for.
- """
- print(f'help_command(interaction, {topic}, {subtopic})')
-
- # General help
- if topic is None:
- await self.__send_general_help(interaction)
- return
-
- # Specific object reference
- obj = self.object_for_help_symbol(subtopic) if subtopic else self.object_for_help_symbol(topic)
- if obj:
- await self.__send_object_help(interaction, obj)
- return
-
- # Text search
- keywords = [
- word
- for word in re.split(r"[^a-zA-Z']+", topic.lower())
- if len(word) > 0 and word not in trivial_words
- ]
- matching_objects_set = None
- for keyword in keywords:
- objs = self.keyword_index.get(keyword, None)
- if objs is not None:
- if matching_objects_set is None:
- matching_objects_set = objs
- else:
- matching_objects_set = matching_objects_set & objs
- accessible_objects = [
- obj
- for obj in matching_objects_set or {}
- if ((isinstance(obj, Command) or isinstance(obj, Group)) and can_use_command(obj, interaction.permissions)) or \
- (isinstance(obj, BaseCog) and can_use_cog(obj, interaction.permissions))
- ]
- await self.__send_keyword_help(interaction, accessible_objects)
-
- async def __send_object_help(self, interaction: Interaction, obj: Union[Command, Group, BaseCog]) -> None:
- if isinstance(obj, Command):
- if obj.parent:
- await self.__send_subcommand_help(interaction, obj.parent, obj)
- else:
- await self.__send_command_help(interaction, obj)
- return
- if isinstance(obj, Group):
- await self.__send_command_help(interaction, obj)
- return
- if isinstance(obj, BaseCog):
- await self.__send_cog_help(interaction, obj)
- return
- print(f'No help for object {obj}')
- await interaction.response.send_message(
- f'{CONFIG["failure_emoji"]} Failed to get help info.',
- ephemeral=True,
- delete_after=10,
- )
-
- def get_command_list(self, permissions: Optional[Permissions] = None) -> dict[str, Union[Command, Group]]:
- return { cmd.name: cmd for cmd in self.bot.tree.get_commands() if can_use_command(cmd, permissions) }
-
- def get_subcommand_list(self, cmd: Group, permissions: Optional[Permissions] = None) -> dict[str, Command]:
- return {
- subcmd.name: subcmd
- for subcmd in cmd.commands
- if can_use_command(subcmd, permissions)
- } if can_use_command(cmd, permissions) else {}
-
- async def __send_general_help(self, interaction: Interaction) -> None:
- user_permissions: Permissions = interaction.permissions
- all_commands = sorted(self.get_command_list(user_permissions).items())
- all_cog_tuples = [
- cog_tuple
- for cog_tuple in sorted(self.bot.cogs.items())
- if isinstance(cog_tuple[1], BaseCog) and \
- can_use_cog(cog_tuple[1], user_permissions) and \
- (len(cog_tuple[1].settings) > 0)
- ]
-
- text = f'## :information_source: Help'
- if len(all_commands) + len(all_cog_tuples) == 0:
- text = 'Nothing available for your permissions!'
-
- if len(all_commands) > 0:
- text += '\n### Commands'
- text += '\nType `/help /commandname` for more information.'
- for cmd_name, cmd in sorted(self.get_command_list(user_permissions).items()):
- text += f'\n- `/{cmd_name}`: {cmd.description}'
- if isinstance(cmd, Group):
- subcommand_count = len(cmd.commands)
- text += f' ({subcommand_count} subcommands)'
-
- if len(all_cog_tuples) > 0:
- text += '\n### Module Configuration'
- for cog_name, cog in all_cog_tuples:
- has_enabled = next((s for s in cog.settings if s.name == 'enabled'), None) is not None
- text += f'\n- **{cog_name}**: {cog.short_description}'
- if has_enabled:
- text += f'\n - `/enable {cog.config_prefix}` or `/disable {cog.config_prefix}`'
- for setting in cog.settings:
- if setting.name == 'enabled':
- continue
- text += f'\n - `/get` or `/set {cog.config_prefix}_{setting.name}`'
- await interaction.response.send_message(
- text,
- ephemeral=True,
- )
-
- async def __send_keyword_help(self, interaction: Interaction, matching_objects: Optional[list[Union[Command, Group, BaseCog]]]) -> None:
- matching_commands = [
- cmd
- for cmd in matching_objects or []
- if isinstance(cmd, Command) or isinstance(cmd, Group)
- ]
- matching_cogs = [
- cog
- for cog in matching_objects or []
- if isinstance(cog, BaseCog)
- ]
- if len(matching_commands) + len(matching_cogs) == 0:
- await interaction.response.send_message(
- f'{CONFIG["failure_emoji"]} No available help topics found.',
- ephemeral=True,
- delete_after=10,
- )
- return
- if len(matching_objects) == 1:
- obj = matching_objects[0]
- await self.__send_object_help(interaction, obj)
- return
-
- text = '## :information_source: Matching Help Topics'
- if len(matching_commands) > 0:
- text += '\n### Commands'
- for cmd in matching_commands:
- if cmd.parent:
- text += f'\n- `/{cmd.parent.name} {cmd.name}`'
- else:
- text += f'\n- `/{cmd.name}`'
- if len(matching_cogs) > 0:
- text += '\n### Cogs'
- for cog in matching_cogs:
- text += f'\n- {cog.qualified_name}'
- await interaction.response.send_message(
- text,
- ephemeral=True,
- )
-
- async def __send_command_help(self, interaction: Interaction, command_or_group: Union[Command, Group], addendum: Optional[str] = None) -> None:
- text = ''
- if addendum is not None:
- text += addendum + '\n\n'
- text += f'## :information_source: Command Help\n`/{command_or_group.name}`\n\n{command_or_group.description}'
- if isinstance(command_or_group, Group):
- subcmds: dict[str, Command] = self.get_subcommand_list(command_or_group, permissions=interaction.permissions)
- if len(subcmds) > 0:
- text += '\n\n### Subcommands:'
- for subcmd_name, subcmd in sorted(subcmds.items()):
- text += f'\n- `{subcmd_name}`: {subcmd.description}'
- else:
- params = command_or_group.parameters
- if len(params) > 0:
- text += '\n\n### Parameters:'
- for param in params:
- text += f'\n- `{param.name}`: {param.description}'
- await interaction.response.send_message(text, ephemeral=True)
-
- async def __send_subcommand_help(self, interaction: Interaction, group: Group, subcommand: Command) -> None:
- text = f'## :information_source: Subcommand Help\n`/{group.name} {subcommand.name}`\n\n{subcommand.description}\n\n{subcommand.description}'
- params = subcommand.parameters
- if len(params) > 0:
- text += '\n\n### Parameters:'
- for param in params:
- text += f'\n- `{param.name}`: {param.description}'
- await interaction.response.send_message(text, ephemeral=True)
-
- async def __send_cog_help(self, interaction: Interaction, cog: BaseCog) -> None:
- text = f'## :information_source: Module Help\n{cog.qualified_name}'
- if cog.description is not None:
- text += f'\n{cog.description}'
- settings = cog.settings
- if len(settings) > 0:
- text += '\n### Configuration'
- enabled_setting = next((s for s in settings if s.name == 'enabled'), None)
- if enabled_setting is not None:
- text += f'\n- `/enable {cog.config_prefix}` or `/disable {cog.config_prefix}`'
- for setting in sorted(settings, key=lambda s: s.name):
- if setting.name == 'enabled':
- continue
- text += f'\n- `/get` or `/set {cog.config_prefix}_{setting.name}` - {setting.description}'
- await interaction.response.send_message(
- text,
- ephemeral=True,
- )
|