| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928 |
- """
- Cog for logging notable guild events (channel, invite, member, message and
- role changes) to a logging channel.
- """
- import difflib
- import re
- from collections.abc import Sequence
- from datetime import datetime, timedelta, timezone
- from typing import Callable, Optional, Tuple, Union
-
- from discord import (
- AuditLogAction,
- AuditLogEntry,
- Emoji,
- Guild,
- GuildSticker,
- Invite,
- Member,
- Message,
- RawBulkMessageDeleteEvent,
- RawMessageDeleteEvent,
- RawMessageUpdateEvent,
- Role,
- Thread,
- User,
- )
- from discord.abc import GuildChannel
- from discord.ext import tasks
- from discord.ext.commands import Cog
- from discord.utils import escape_markdown
-
- from rocketbot.cogs.basecog import BaseCog, BotMessage, CogSetting
- from rocketbot.storage import Storage
- from rocketbot.utils import dump_stacktrace
-
-
class BufferedMessageEditEvent:
    """
    A single message edit held in a guild's event buffer until the next flush.

    `before` is None when the pre-edit message is not available (the raw edit
    listener buffers events that way); `data` optionally carries the raw
    gateway payload for the edit.
    """

    def __init__(self, guild: Guild, channel: GuildChannel, before: Optional[Message], after: Message, data = None) -> None:
        # Where the edit happened.
        self.guild, self.channel = guild, channel
        # Message state on either side of the edit.
        self.before, self.after = before, after
        # Raw payload, if the caller supplied one.
        self.data = data
-
class BufferedMessageDeleteEvent:
    """
    A single message deletion held in a guild's event buffer until the next
    flush.

    `message` is the deleted message when one is available, otherwise None.
    `author` mirrors `message.author` and is None whenever `message` is None.
    """

    def __init__(self, guild: Guild, channel: GuildChannel, message_id: int, message: Optional[Message] = None) -> None:
        self.guild, self.channel = guild, channel
        self.message_id = message_id
        self.message = message
        # Derive the author up front so events can be grouped by it even
        # when the message itself is missing.
        if message is None:
            self.author = None
        else:
            self.author = message.author
-
class LoggingCog(BaseCog, name='Logging'):
    """
    Cog for logging notable events to a designated logging channel.

    Most listeners build a Markdown description of the event and post it as a
    `BotMessage` of type `TYPE_LOG`. Message edits and deletes are not posted
    immediately: they are collected per guild in `Storage` state and written
    out by the `flush_buffers` task every few seconds so that bursts can be
    summarized in a single log message.
    """
    SETTING_ENABLED = CogSetting(
        'enabled',
        bool,
        default_value=False,
        brief='logging',
        description='Whether this module is enabled for a guild.',
    )

    # Storage state key holding a guild's pending events, as a dict mapping
    # event type ('edit' / 'delete') to a list of buffered event objects.
    STATE_EVENT_BUFFER = 'LoggingCog.eventBuffer'

    def __init__(self, bot):
        super().__init__(
            bot,
            config_prefix='logging',
            short_description='Manages event logging.',
        )
        self.add_setting(LoggingCog.SETTING_ENABLED)
        # Guilds that currently have buffered events waiting to be flushed.
        # Created before the loop is started because the loop reads it.
        self.buffered_guilds: set[Guild] = set()
        self.flush_buffers.start()

    def cog_unload(self) -> None:
        """Stops the periodic buffer flush when the cog is unloaded."""
        self.flush_buffers.cancel()

    # Events - Channels

    @Cog.listener()
    async def on_guild_channel_delete(self, channel: GuildChannel) -> None:
        """
        Called whenever a guild channel is deleted.

        Note that you can get the guild from guild.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_guild_channel_delete
        """
        guild = channel.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        text = f'Channel **{channel.name}** deleted.'
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    @Cog.listener()
    async def on_guild_channel_create(self, channel: GuildChannel) -> None:
        """
        Called whenever a guild channel is created.

        Note that you can get the guild from guild.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_guild_channel_create
        """
        guild = channel.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        text = f'Channel **{channel.name}** created. {channel.mention}'
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    @Cog.listener()
    async def on_guild_channel_update(self, before: GuildChannel, after: GuildChannel) -> None:
        """
        Called whenever a guild channel is updated. e.g. changed name, topic,
        permissions.

        Logs name, category, role, permission overwrite and position changes.
        Nothing is logged when none of those differ.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_guild_channel_update
        """
        guild = after.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        changes = []
        if after.name != before.name:
            changes.append(f'Name: `{before.name}` -> `{after.name}`')
        if after.category != before.category:
            changes.append(f'Category: {before.category.name if before.category else None}' + \
                f' -> {after.category.name if after.category else None}')
        if after.changed_roles != before.changed_roles:
            changes.append('Roles changed')
        if after.overwrites != before.overwrites:
            changes.append('Permission overwrites changed')
        if after.position != before.position:
            changes.append(f'Position: {before.position} -> {after.position}')

        if len(changes) == 0:
            return
        text = f'Channel **{before.name}** updated. Changes:\n'
        text += '* ' + '\n* '.join(changes)
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    # Events - Guilds

    @Cog.listener()
    async def on_guild_available(self, guild: Guild) -> None:
        """Not logged."""
        pass

    @Cog.listener()
    async def on_guild_unavailable(self, guild: Guild) -> None:
        """Not logged."""
        pass

    @Cog.listener()
    async def on_guild_update(self, before: Guild, after: Guild) -> None:
        """Not logged."""
        pass

    @Cog.listener()
    async def on_guild_emojis_update(self, guild: Guild, before: Sequence[Emoji], after: Sequence[Emoji]) -> None:
        """Not logged."""
        pass

    @Cog.listener()
    async def on_guild_stickers_update(self, guild: Guild, before: Sequence[GuildSticker], after: Sequence[GuildSticker]) -> None:
        """Not logged."""
        pass

    @Cog.listener()
    async def on_invite_create(self, invite: Invite) -> None:
        """
        Called when an `Invite` is created. You must have manage_channels to receive this.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_invite_create
        """
        guild = invite.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        # The inviter is optional; describing a missing one would raise, so
        # handle it the same way on_invite_delete does.
        if invite.inviter:
            text = f'Invite code `{invite.code}` created by {self.__describe_user(invite.inviter)}. '
        else:
            text = f'Invite code `{invite.code}` created. '
        if invite.max_age == 0:
            text += "Doesn't expire."
        else:
            text += f'Expires in {invite.max_age} seconds.'
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    @Cog.listener()
    async def on_invite_delete(self, invite: Invite) -> None:
        """
        Called when an `Invite` is deleted. You must have manage_channels to receive this.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_invite_delete
        """
        guild = invite.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        if invite.inviter:
            text = f'Invite code `{invite.code}` deleted. Originally created by {self.__describe_user(invite.inviter)}.'
        else:
            text = f'Invite code `{invite.code}` deleted.'
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    # Events - Members

    @Cog.listener()
    async def on_member_join(self, member: Member) -> None:
        """
        Called when a Member joins a Guild.

        Lists the member's notable flags. If any of the flags marked as
        noteworthy below is set, the message is posted as a
        `TYPE_MOD_WARNING` with a mention of the member instead of as a
        plain `TYPE_LOG`.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_member_join
        """
        guild = member.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        text = f'Member joined server: {self.__describe_user(member)}.'
        flags = []
        noteworthy = False

        if member.flags.did_rejoin:
            flags.append('Rejoined this server')

        # Informational flags: listed, but don't escalate the message.
        if member.public_flags.active_developer:
            flags.append('Is an active developer')
        if member.public_flags.hypesquad:
            flags.append('Is a HypeSquad Events member')
        if member.public_flags.hypesquad_bravery:
            flags.append('Is a HypeSquad Bravery member')
        if member.public_flags.hypesquad_brilliance:
            flags.append('Is a HypeSquad Brilliance member')
        if member.public_flags.hypesquad_balance:
            flags.append('Is a HypeSquad Balance member')
        if member.public_flags.early_supporter:
            flags.append('Is an early supporter')

        # Noteworthy flags: any of these escalates to a mod warning.
        if member.public_flags.spammer:
            flags.append('**Is flagged as a spammer**')
            noteworthy = True
        if member.public_flags.discord_certified_moderator:
            flags.append('**Is a Discord Certified Moderator**')
            noteworthy = True
        if member.public_flags.early_verified_bot_developer:
            flags.append('**Is a verified bot developer**')
            noteworthy = True
        if member.public_flags.verified_bot:
            flags.append('**Is a verified bot**')
            noteworthy = True
        if member.public_flags.bug_hunter or member.public_flags.bug_hunter_level_2:
            flags.append('**Is a bug hunter**')
            noteworthy = True
        if member.public_flags.system:
            flags.append('**Is a Discord system user**')
            noteworthy = True
        if member.public_flags.staff:
            flags.append('**Is Discord staff**')
            noteworthy = True
        if member.public_flags.partner:
            flags.append('**Is a Discord partner**')
            noteworthy = True

        if len(flags) > 0:
            text += '\n* ' + '\n* '.join(flags)
        if noteworthy:
            text += f'\n\nLink: {member.mention}'
            bot_message = BotMessage(guild, text, BotMessage.TYPE_MOD_WARNING)
        else:
            bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    @Cog.listener()
    async def on_member_remove(self, member: Member) -> None:
        """
        Called when a Member leaves a Guild.

        If the guild or member could not be found in the internal cache this event
        will not be called, you may use on_raw_member_remove() instead.

        A recent matching kick entry in the audit log turns the message from
        "left" into "kicked", including who kicked and why when known.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_member_remove
        """
        guild = member.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        is_kick = False
        kicker = None
        kick_reason = None
        entry = await self.__find_audit_entry(member, AuditLogAction.kick)
        if entry:
            is_kick = True
            kicker = entry.user
            kick_reason = entry.reason

        if is_kick:
            if kicker and kicker != member:
                text = f'Member kicked from the server: {self.__describe_user(member)} by **{kicker.name}**'
            else:
                text = f'Member kicked from the server: {self.__describe_user(member)}'
        else:
            text = f'Member left server: {self.__describe_user(member)}'
        if kick_reason:
            text += f'\nReason: "{kick_reason}"'
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    @Cog.listener()
    async def on_member_update(self, before: Member, after: Member) -> None:
        """
        Called when a Member updates their profile.

        This is called when one or more of the following things change:
        * nickname
        * roles
        * pending
        * timeout
        * guild avatar
        * flags

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_member_update
        """
        guild = after.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        changes = []
        if after.nick != before.nick:
            changes.append(f'Nick: `{before.nick}` -> `{after.nick}`')
        if after.roles != before.roles:
            added_role_names = []
            removed_role_names = []
            for role in before.roles:
                if role not in after.roles:
                    removed_role_names.append(role.name)
            for role in after.roles:
                if role not in before.roles:
                    added_role_names.append(role.name)
            if len(removed_role_names) > 0:
                changes.append(f'Removed roles: ~~**{"**~~, ~~**".join(removed_role_names)}**~~')
            if len(added_role_names) > 0:
                changes.append(f'Added roles: **{"**, **".join(added_role_names)}**')
        if after.pending != before.pending:
            pass  # not that interesting and probably noisy
        if after.timed_out_until != before.timed_out_until:
            if after.timed_out_until:
                # timed_out_until is timezone-aware, so "now" must be aware
                # too; mixing it with a naive datetime raises TypeError.
                delta = after.timed_out_until - datetime.now(timezone.utc)
                changes.append(f'Timed out for `{delta}`')
            elif before.timed_out_until:
                changes.append('Timeout cleared')
        before_guild_avatar = before.guild_avatar.url if before.guild_avatar else None
        after_guild_avatar = after.guild_avatar.url if after.guild_avatar else None
        if after_guild_avatar != before_guild_avatar:
            changes.append(f'Guild avatar: <{before_guild_avatar}> -> <{after_guild_avatar}>')
        if after.flags != before.flags:
            flag_changes = []
            for (name, after_value) in iter(after.flags):
                before_value = getattr(before.flags, name)
                if after_value != before_value:
                    flag_changes.append(f'`{name}` = `{before_value}` -> `{after_value}`')
            if len(flag_changes) > 0:
                changes.append(f'Flag changes: {", ".join(flag_changes)}')

        if len(changes) == 0:
            return
        text = f'Details for member {self.__describe_user(before)} changed:\n'
        text += '* ' + ('\n* '.join(changes))
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    @Cog.listener()
    async def on_user_update(self, before: User, after: User) -> None:
        """
        Called when a User updates their profile.

        This is called when one or more of the following things change:
        * avatar
        * username
        * discriminator

        A user is not tied to one guild, so the change is logged in every
        guild shared with the bot that has logging enabled.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_user_update
        """
        if hasattr(after, 'guild'):
            candidate_guilds = [after.guild]
        else:
            # Plain User objects have no `guild`; fall back to the guilds the
            # user shares with the bot so the event is not silently dropped.
            candidate_guilds = list(getattr(after, 'mutual_guilds', None) or [])
        guilds = [g for g in candidate_guilds if self.get_guild_setting(g, self.SETTING_ENABLED)]
        if len(guilds) == 0:
            return
        changes = []
        before_avatar_url = before.avatar.url if before.avatar else None
        after_avatar_url = after.avatar.url if after.avatar else None
        if after_avatar_url != before_avatar_url:
            changes.append(f'Avatar URL: <{before_avatar_url}> -> <{after_avatar_url}>')
        if after.name != before.name:
            changes.append(f'Username: `{before.name}` -> `{after.name}`')
        if len(changes) == 0:
            return
        text = f'Details for user {self.__describe_user(before)} changed:\n'
        text += '* ' + '\n* '.join(changes)
        for guild in guilds:
            bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
            await bot_message.update()

    @Cog.listener()
    async def on_member_ban(self, guild: Guild, user: Union[User, Member]) -> None:
        """
        Called when user gets banned from a Guild.

        The banning moderator and reason are included when a recent matching
        audit log entry is found.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_member_ban
        """
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        banner = None
        ban_reason = None
        # NOTE(review): the lookup resolves the guild from `user.guild`, so it
        # finds nothing when `user` is a plain User rather than a Member.
        entry = await self.__find_audit_entry(user, AuditLogAction.ban)
        if entry:
            banner = entry.user
            ban_reason = entry.reason
        if banner:
            text = f'Member {self.__describe_user(user)} banned by **{banner.name}**.'
            if ban_reason:
                text += f'\nReason: "{ban_reason}"'
        else:
            text = f'Member {self.__describe_user(user)} banned.'
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    async def __find_audit_entry(self, user: Union[User, Member], action: AuditLogAction, max_age: int = 10) -> Optional[AuditLogEntry]:
        """
        Searches the audit log for the most recent entry of a given type for a
        given user. Intended for finding the relevant entry for a ban/kick that
        just occurred.

        Returns None when `user` carries no guild, or when no entry for that
        action targeting the user was created within the last `max_age`
        seconds.
        """
        guild = getattr(user, 'guild', None)
        if not guild:
            return None
        now = datetime.now(timezone.utc)
        # Ask only for entries of the wanted action instead of filtering the
        # whole log client-side.
        async for entry in guild.audit_logs(action=action):
            age_seconds = (now - entry.created_at).total_seconds()
            if age_seconds > max_age:
                # Entries arrive newest first, so everything after this one
                # is older still.
                break
            # Compare ids rather than objects so a target that is not a full
            # User/Member instance still matches.
            if entry.action == action and getattr(entry.target, 'id', None) == user.id:
                return entry
        return None

    @Cog.listener()
    async def on_member_unban(self, guild: Guild, user: Union[User, Member]) -> None:
        """
        Called when a User gets unbanned from a Guild.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_member_unban
        """
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        text = f'Member {self.__describe_user(user)} unbanned'
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    # Events - Messages

    def __buffer_event(self, guild: Guild, event_type: str, event) -> None:
        """
        Appends `event` to the guild's buffer for `event_type` ('edit' or
        'delete') and marks the guild as needing a flush.
        """
        buffers: dict[str, list] = Storage.get_state_value(guild, self.STATE_EVENT_BUFFER)
        if buffers is None:
            buffers = {}
            Storage.set_state_value(guild, self.STATE_EVENT_BUFFER, buffers)
        if buffers.get(event_type) is None:
            buffers[event_type] = [ event ]
        else:
            buffers[event_type].append(event)
        self.buffered_guilds.add(guild)

    @tasks.loop(seconds=3.0)
    async def flush_buffers(self) -> None:
        """
        Periodically writes out buffered edit/delete events for every guild
        that has any pending.
        """
        if len(self.buffered_guilds) == 0:
            return
        # Snapshot and clear first so events arriving during the awaits below
        # are picked up by the next run.
        guilds = set(self.buffered_guilds)
        self.buffered_guilds.clear()
        for guild in guilds:
            # Handle failures per guild: the pending set is already cleared,
            # so aborting the loop would strand the remaining guilds' events.
            try:
                await self.__flush_buffers_for_guild(guild)
            except Exception as e:
                dump_stacktrace(e)

    async def __flush_buffers_for_guild(self, guild: Guild) -> None:
        """
        Takes the guild's buffered events out of storage and hands each event
        type to its flush routine.
        """
        buffers: dict[str, list] = Storage.get_state_value(guild, self.STATE_EVENT_BUFFER)
        if buffers is None:
            return
        Storage.set_state_value(guild, self.STATE_EVENT_BUFFER, None)
        for event_type, buffer in buffers.items():
            if event_type == 'edit':
                await self.__flush_edit_buffers(guild, buffer)
            elif event_type == 'delete':
                await self.__flush_delete_buffers(guild, buffer)

    @flush_buffers.before_loop
    async def before_flush_buffers_start(self) -> None:
        """Delays the first flush until the bot is ready."""
        await self.bot.wait_until_ready()

    @Cog.listener()
    async def on_message(self, message: Message) -> None:
        """
        Called when a Message is created and sent. Not logged.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_message
        """

    @Cog.listener()
    async def on_message_edit(self, before: Message, after: Message) -> None:
        """
        Called when a Message receives an update event. If the message is not
        found in the internal message cache, then these events will not be
        called. Messages might not be in cache if the message is too old or the
        client is participating in high traffic guilds.

        If this occurs increase the max_messages parameter or use the
        on_raw_message_edit() event instead.

        The following non-exhaustive cases trigger this event:
        * A message has been pinned or unpinned.
        * The message content has been changed.
        * The message has received an embed.
            * For performance reasons, the embed server does not do this in a
              “consistent” manner.
        * The message’s embeds were suppressed or unsuppressed.
        * A call message has received an update to its participants or ending time.

        The edit is buffered rather than logged immediately; the bot's own
        messages are ignored.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_message_edit
        """
        guild = after.guild
        if guild is None:
            return  # no guild (e.g. a direct message), nowhere to log
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        if after.author.id == self.bot.user.id:
            return
        channel = after.channel

        self.__buffer_event(guild, 'edit', BufferedMessageEditEvent(guild, channel, before, after))

    @Cog.listener()
    async def on_raw_message_edit(self, payload: RawMessageUpdateEvent) -> None:
        """
        Called when a message is edited. Unlike on_message_edit(), this is called
        regardless of the state of the internal message cache.

        If the message is found in the message cache, it can be accessed via
        RawMessageUpdateEvent.cached_message. The cached message represents the
        message before it has been edited. For example, if the content of a
        message is modified and triggers the on_raw_message_edit() coroutine,
        the RawMessageUpdateEvent.cached_message will return a Message object
        that represents the message before the content was modified.

        Due to the inherently raw nature of this event, the data parameter
        coincides with the raw data given by the gateway.

        Since the data payload can be partial, care must be taken when accessing
        stuff in the dictionary. One example of a common case of partial data is
        when the 'content' key is inaccessible. This denotes an “embed” only
        edit, which is an edit in which only the embeds are updated by the
        Discord embed server.

        Only uncached edits are buffered here, with no "before" message.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_raw_message_edit
        """
        if payload.cached_message:
            return  # already handled by on_message_edit
        if payload.guild_id is None:
            return  # not a guild message; fetching a guild by None would fail
        guild = self.bot.get_guild(payload.guild_id) or await self.bot.fetch_guild(payload.guild_id)
        if not guild:
            return
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        channel = guild.get_channel(payload.channel_id) or await guild.fetch_channel(payload.channel_id)
        if not channel:
            return

        self.__buffer_event(guild, 'edit', BufferedMessageEditEvent(
            guild, channel, None, payload.message, payload.data))

    async def __flush_edit_buffers(self, guild: Guild, events: list[BufferedMessageEditEvent]) -> None:
        """
        Logs a batch of buffered edits.

        Edits that have a "before" message and whose message was created less
        than a day ago are "simple". Up to three simple edits are logged
        individually with full detail; if there are more, the entire batch is
        summarized instead. All remaining edits go into one summary message
        listing at most ten of them.
        """
        simple_edits: list[BufferedMessageEditEvent] = []
        complex_edits: list[BufferedMessageEditEvent] = []
        old_cutoff = timedelta(days=1)
        now = datetime.now(timezone.utc)
        for event in events:
            if event.before is not None and (now - event.after.created_at) < old_cutoff:
                simple_edits.append(event)
            else:
                complex_edits.append(event)
        if len(simple_edits) <= 3:
            # A small number of edits with full details. Log them individually.
            for event in simple_edits:
                await self.__handle_complete_edit_event(event)
        else:
            complex_edits = events
        if len(complex_edits) > 0:
            # These messages are not cached, too old, or too numerous
            text = 'Multiple messages edited' if len(complex_edits) > 1 else 'Message edited'
            for event in complex_edits[:10]:
                text += f'\n- {event.after.jump_url} by {event.after.author.name} ' + \
                    f'first posted <t:{int(event.after.created_at.timestamp())}:f>'
            if len(complex_edits) > 10:
                text += f'\n- ...{len(complex_edits) - 10} more...'
            bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG, suppress_embeds=True)
            await bot_message.update()

    async def __handle_complete_edit_event(self, event: BufferedMessageEditEvent) -> None:
        """
        Logs a single edit for which both the before and after messages are
        known, showing a content diff plus attachment and embed changes.

        Edits that change neither content nor attachments and at most add
        embeds to a message that had none are skipped.
        """
        before = event.before
        after = event.after
        guild = after.guild

        content_changed = (after.content != before.content)
        attachments_changed = (after.attachments != before.attachments)
        embeds_changed = (after.embeds != before.embeds)
        embeds_add_only = len(before.embeds or []) == 0 and len(after.embeds or []) > 0

        if not content_changed and not attachments_changed and (not embeds_changed or embeds_add_only):
            # Most likely an embed being asynchronously populated by server
            return
        if content_changed:
            (before_markdown, after_markdown) = self.__diff(self.__quote_markdown(before.content),
                                                            self.__quote_markdown(after.content))
        else:
            before_markdown = self.__quote_markdown(before.content)
            after_markdown = before_markdown if len(before.content.strip()) == 0 else '> _<content unchanged>_'
        # Parenthesized `(x or [])`: without the parentheses `not in` binds
        # tighter than `or`, so the empty-list fallback never takes effect.
        if attachments_changed:
            if len(before.attachments or []) > 0:
                for attachment in before.attachments:
                    before_markdown += f'\n> * 📎 {attachment.url}'
                    if attachment not in (after.attachments or []):
                        before_markdown += ' (removed)'
            else:
                before_markdown += '\n> * _<no attachments>_'
            if len(after.attachments or []) > 0:
                for attachment in after.attachments:
                    after_markdown += f'\n> * 📎 {attachment.url}'
                    if attachment not in (before.attachments or []):
                        after_markdown += ' (added)'
            else:
                after_markdown += '\n> * _<no attachments>_'
        if embeds_changed:
            if len(before.embeds or []) > 0:
                for embed in before.embeds:
                    before_markdown += f'\n> * 🔗 {embed.url}'
                    if embed not in (after.embeds or []):
                        before_markdown += ' (removed)'
            else:
                before_markdown += '\n> * _<no embeds>_'
            if len(after.embeds or []) > 0:
                for embed in after.embeds:
                    after_markdown += f'\n> * 🔗 {embed.url}'
                    if embed not in (before.embeds or []):
                        after_markdown += ' (added)'
            else:
                after_markdown += '\n> * _<no embeds>_'
        text = f'Message {after.jump_url} edited by {self.__describe_user(after.author)}.\n' + \
            f'Original:\n{before_markdown}\n' + \
            f'Updated:\n{after_markdown}'
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG, suppress_embeds=True)
        await bot_message.update()

    @Cog.listener()
    async def on_raw_message_delete(self, payload: RawMessageDeleteEvent) -> None:
        """
        Called when a message is deleted. Unlike on_message_delete(), this is
        called regardless of the message being in the internal message cache or not.

        If the message is found in the message cache, it can be accessed via
        RawMessageDeleteEvent.cached_message

        The delete is buffered rather than logged immediately; deletions of
        the bot's own (cached) messages are ignored.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_raw_message_delete
        """
        message = payload.cached_message
        if message and message.author.id == self.bot.user.id:
            return
        if payload.guild_id is None:
            return  # not a guild message; fetching a guild by None would fail
        guild = (message.guild if message else None) or \
            self.bot.get_guild(payload.guild_id) or \
            await self.bot.fetch_guild(payload.guild_id)
        if guild is None:
            return
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        channel = (message.channel if message else None) or \
            self.bot.get_channel(payload.channel_id) or \
            await guild.fetch_channel(payload.channel_id)
        if channel is None:
            return
        self.__buffer_event(guild, 'delete', BufferedMessageDeleteEvent(guild, channel, payload.message_id, message))

    @Cog.listener()
    async def on_raw_bulk_message_delete(self, payload: RawBulkMessageDeleteEvent) -> None:
        """
        Called when a bulk delete is triggered. Unlike on_bulk_message_delete(),
        this is called regardless of the messages being in the internal message
        cache or not.

        If the messages are found in the message cache, they can be accessed via
        RawBulkMessageDeleteEvent.cached_messages

        One delete event is buffered per message id.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_raw_bulk_message_delete
        """
        if payload.guild_id is None:
            return  # not a guild event; fetching a guild by None would fail
        guild = self.bot.get_guild(payload.guild_id) or await self.bot.fetch_guild(payload.guild_id)
        if not guild:
            return
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        channel = guild.get_channel(payload.channel_id) or await guild.fetch_channel(payload.channel_id)
        # Index cached messages by id once instead of rescanning the cached
        # list for every deleted id.
        cached_by_id = {cached_message.id: cached_message for cached_message in payload.cached_messages}
        for message_id in payload.message_ids:
            message = cached_by_id.get(message_id)
            self.__buffer_event(guild, 'delete', BufferedMessageDeleteEvent(guild, channel, message_id, message))

    async def __flush_delete_buffers(self, guild: Guild, events: list[BufferedMessageDeleteEvent]) -> None:
        """
        Logs a batch of buffered deletes.

        Deletes that carry the deleted message are "simple". Up to three
        simple deletes are logged individually with full content; if there
        are more, the entire batch is summarized instead. The summary groups
        deletes by author (and by channel when one author's deletes span
        several) and is capped at ten rows.
        """
        simple_deletes: list[BufferedMessageDeleteEvent] = []
        complex_deletes: list[BufferedMessageDeleteEvent] = []
        for event in events:
            if event.message is not None:
                simple_deletes.append(event)
            else:
                complex_deletes.append(event)
        if len(simple_deletes) <= 3:
            # Small number of deletes with complete info
            for event in simple_deletes:
                await self.__handle_complete_delete_event(event)
        else:
            complex_deletes = events
        if len(complex_deletes) > 0:
            messages_per_author: dict[Optional[User], list[BufferedMessageDeleteEvent]] = self.__groupby(complex_deletes, lambda e: e.author)
            text = 'Multiple messages deleted' if len(complex_deletes) > 1 else 'Message deleted'
            row_count = 0
            for author, messages in messages_per_author.items():
                row_count += 1
                if row_count > 10:
                    break
                count = len(messages)
                text += f'\n- {count} {"message" if count == 1 else "messages"} by {author.mention if author else "unavailable user"}'
                if count == 1:
                    text += f' in {messages[0].channel.mention}'
                else:
                    messages_by_channel: dict[GuildChannel, list[BufferedMessageDeleteEvent]] = self.__groupby(messages, lambda e: e.channel)
                    if len(messages_by_channel) == 1:
                        text += f' in {messages[0].channel.mention}'
                    else:
                        for channel, ch_messages in messages_by_channel.items():
                            row_count += 1
                            if row_count > 10:
                                break
                            ch_count = len(ch_messages)
                            text += f'\n - {ch_count} in {channel.mention}'
            if row_count > 10:
                # Start the truncation marker on its own row; without the
                # newline it was glued onto the end of the previous row.
                text += '\n- ...more omitted...'
            bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG, suppress_embeds=True)
            await bot_message.update()

    async def __handle_complete_delete_event(self, event: BufferedMessageDeleteEvent) -> None:
        """
        Logs a single delete whose message is known, quoting its content and
        listing its attachment and embed URLs.
        """
        message: Message = event.message
        text = f'Message by {self.__describe_user(message.author)} deleted from {message.channel.mention}. ' + \
            f'Markdown:\n{self.__quote_markdown(message.content)}'
        for attachment in message.attachments or []:
            text += f'\n> * 📎 {attachment.url}'
        for embed in message.embeds or []:
            text += f'\n> * 🔗 {embed.url}'
        bot_message = BotMessage(message.guild, text, BotMessage.TYPE_LOG, suppress_embeds=True)
        await bot_message.update()

    # Events - Roles

    @Cog.listener()
    async def on_guild_role_create(self, role: Role) -> None:
        """
        Called when a Guild creates a new Role.

        To get the guild it belongs to, use Role.guild.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_guild_role_create
        """
        guild = role.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        text = f'Role created: **{role.name}**'
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    @Cog.listener()
    async def on_guild_role_delete(self, role: Role) -> None:
        """
        Called when a Guild deletes a Role.

        To get the guild it belongs to, use Role.guild.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_guild_role_delete
        """
        guild = role.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        text = f'Role removed: **{role.name}**'
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    @Cog.listener()
    async def on_guild_role_update(self, before: Role, after: Role) -> None:
        """
        Called when a Role is changed guild-wide.

        Logs changes to name, hoist, position, emoji, mentionability,
        permissions, color and icons. Nothing is logged when none differ.

        https://discordpy.readthedocs.io/en/stable/api.html#discord.on_guild_role_update
        """
        guild = after.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        changes = []
        if after.name != before.name:
            changes.append(f'Name: `{before.name}` -> `{after.name}`')
        if after.hoist != before.hoist:
            changes.append(f'Hoisted: `{before.hoist}` -> `{after.hoist}`')
        if after.position != before.position:
            changes.append(f'Position: `{before.position}` -> `{after.position}`')
        if after.unicode_emoji != before.unicode_emoji:
            changes.append(f'Emoji: {before.unicode_emoji} -> {after.unicode_emoji}')
        if after.mentionable != before.mentionable:
            changes.append(f'Mentionable: `{before.mentionable}` -> `{after.mentionable}`')
        if after.permissions != before.permissions:
            changes.append('Permissions edited')
        if after.color != before.color:
            changes.append('Color edited')
        before_icon_url = before.icon.url if before.icon else None
        after_icon_url = after.icon.url if after.icon else None
        if after_icon_url != before_icon_url:
            changes.append(f'Icon: <{before_icon_url}> -> <{after_icon_url}>')
        before_icon_url = before.display_icon.url if before.display_icon else None
        after_icon_url = after.display_icon.url if after.display_icon else None
        if after_icon_url != before_icon_url:
            changes.append(f'Display icon: <{before_icon_url}> -> <{after_icon_url}>')

        if len(changes) == 0:
            return
        text = f'Role **{after.name}** updated. Changes:\n'
        text += '* ' + '\n* '.join(changes)
        bot_message = BotMessage(guild, text, BotMessage.TYPE_LOG)
        await bot_message.update()

    # Events - Threads

    @Cog.listener()
    async def on_thread_create(self, thread: Thread) -> None:
        """Not logged."""
        pass

    @Cog.listener()
    async def on_thread_update(self, before: Thread, after: Thread) -> None:
        """Not logged."""
        pass

    @Cog.listener()
    async def on_thread_delete(self, thread: Thread) -> None:
        """Not logged."""
        pass


    # ------------------------------------------------------------------------
    def __quote_markdown(self, s: str) -> str:
        """
        Returns `s` with its Markdown escaped and every line prefixed with
        `> `, or a "<no content>" placeholder when `s` is blank.
        """
        if len(s.strip()) == 0:
            return '> _<no content>_'
        return '> ' + escape_markdown(s).replace('\n', '\n> ')

    def __describe_user(self, user: Union[User, Member]) -> str:
        """
        Standardized Markdown describing a user or member.
        """
        return f'**{user.name}** ({user.display_name} {user.id})'

    def __diff(self, a: str, b: str) -> Tuple[str, str]:
        """
        Character-level diff of two strings rendered as Markdown.

        Returns `(markdown_a, markdown_b)`, where runs of characters found
        only in `a` are wrapped in `~~` in the first string and runs found
        only in `b` are wrapped in `**` in the second. Each URL is treated
        as one indivisible unit so it is never marked up mid-link.
        """
        # URLs don't work well in the diffs. Replace them with private use characters, one per unique URL.
        preserved_sequences = []
        def sub_token(match: re.Match) -> str:
            seq = match.group(0)
            sequence_index = len(preserved_sequences)
            if seq in preserved_sequences:
                sequence_index = preserved_sequences.index(seq)
            else:
                preserved_sequences.append(seq)
            return chr(0xe000 + sequence_index)
        url_regex = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
        a = re.sub(url_regex, sub_token, a)
        b = re.sub(url_regex, sub_token, b)

        deletion_start = '~~'
        deletion_end = '~~'
        addition_start = '**'
        addition_end = '**'
        markdown_a = ''
        markdown_b = ''
        # Whether a deletion / addition span is currently open in each output.
        a_open = False
        b_open = False
        # ndiff over two strings compares character by character; each result
        # is a two-character operation prefix followed by the character.
        for s in difflib.ndiff(a, b):
            operation = s[0]
            content = s[2:]
            # Close a span as soon as the run of that operation ends.
            if operation != '-' and a_open:
                markdown_a += deletion_end
                a_open = False
            if operation != '+' and b_open:
                markdown_b += addition_end
                b_open = False
            if operation == ' ':
                markdown_a += content
                markdown_b += content
            elif operation == '-':
                if not a_open:
                    markdown_a += deletion_start
                    a_open = True
                markdown_a += content
            elif operation == '+':
                if not b_open:
                    markdown_b += addition_start
                    b_open = True
                markdown_b += content
        if a_open:
            markdown_a += deletion_end
        if b_open:
            markdown_b += addition_end

        # Sub URLs back in
        def unsub_token(match: re.Match) -> str:
            char = match.group(0)
            index = ord(char) - 0xe000
            if 0 <= index < len(preserved_sequences):
                return preserved_sequences[index]
            return char
        markdown_a = re.sub(r'[\ue000-\uefff]', unsub_token, markdown_a)
        markdown_b = re.sub(r'[\ue000-\uefff]', unsub_token, markdown_b)

        return markdown_a, markdown_b

    # NOTE(review): `any` in the annotation below is the builtin function,
    # not `typing.Any`; left unchanged because `Any` is not imported here.
    def __groupby(self, a_list: list, grouper: Callable[[any], any]) -> dict:
        """
        itertools.groupby just less annoying.

        Returns a dict mapping each key produced by `grouper` to the list of
        elements of `a_list` that produced it, in their original order.
        """
        d = {}
        for elem in a_list:
            d.setdefault(grouper(elem), []).append(elem)
        return d
|