from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional

from discord import Guild, Message, NotFound, PartialEmoji
from discord.ext import commands

from cogs.basecog import BaseCog
from rscollections import AgeBoundList, SizeBoundDict
from storage import Storage


class SpamContext:
    """Mutable moderation state for one member repeating one message.

    A context is keyed by (member id, content hash) in the per-guild spam
    lookup and records what has been done about the repeats so far.
    """

    def __init__(self, member, message_hash):
        # Member who posted the repeated message.
        self.member = member
        # hash() of the repeated message content.
        self.message_hash = message_hash
        # Timestamp used to order contexts; overwritten with the timestamp
        # of the newest offending message each time the context is touched.
        self.age = datetime.now()
        # Moderator-facing warning message, or None until one is posted.
        self.warning_message = None
        self.is_kicked = False
        self.is_banned = False
        # Every message seen with this content, and the subset already
        # deleted (or assumed deleted as a side effect of a ban).
        self.messages = set()
        self.deleted_messages = set()


class CrossPostCog(BaseCog):
    """Detect members posting the same message repeatedly.

    Recent messages are tracked per guild. Once a member repeats the same
    content often enough a warning with reaction controls (delete / kick /
    ban) is posted, and past a higher threshold the member is banned
    automatically.
    """

    STATE_KEY_RECENT_MESSAGES = "crosspost_recent_messages"
    STATE_KEY_SPAM_CONTEXT = "crosspost_spam_context"

    # Reaction controls, written as escapes so the file's encoding cannot
    # corrupt them.
    EMOJI_DELETE = '\U0001F5D1'  # wastebasket
    EMOJI_KICK = '\U0001F462'    # boot
    EMOJI_BAN = '\U0001F6AB'     # no-entry sign

    ACTION_REASON = 'Posting same message repeatedly'

    def __init__(self, bot):
        super().__init__(bot)
        self.max_recent_message_age = timedelta(minutes=2)
        self.max_spam_contexts = 12
        self.warn_messages_per_user = 3
        self.ban_messages_per_user = 5
        self.min_message_length = 10

    async def __record_message(self, message: Message) -> None:
        """Track *message* and react if its author is repeating themselves."""
        if message.guild is None:
            # State is stored per guild and every action targets a guild
            # member, so there is nothing to do for direct messages.
            return
        if message.author.permissions_in(message.channel).ban_members:
            # User exempt from spam detection
            return
        if len(message.content) < self.min_message_length:
            # Short messages ("ok", "thanks") repeat legitimately, and
            # attachment-only messages all share the same empty content.
            return

        guild = message.guild
        recent_messages = Storage.get_state_value(guild, self.STATE_KEY_RECENT_MESSAGES) \
            or AgeBoundList(self.max_recent_message_age, lambda index, item: item.created_at)
        recent_messages.append(message)
        Storage.set_state_value(guild, self.STATE_KEY_RECENT_MESSAGES, recent_messages)

        # Get all recent messages by user
        author_id = message.author.id
        member_messages = [m for m in recent_messages if m.author.id == author_id]
        if len(member_messages) < self.warn_messages_per_user:
            return

        # Group the author's recent messages by content hash and keep only
        # the groups large enough to count as repeats.
        messages_by_hash = defaultdict(list)
        for m in member_messages:
            messages_by_hash[hash(m.content)].append(m)
        repeated = {
            message_hash: duplicates
            for message_hash, duplicates in messages_by_hash.items()
            if len(duplicates) >= self.warn_messages_per_user
        }
        if not repeated:
            return

        # Handle the spam
        spam_lookup = Storage.get_state_value(guild, self.STATE_KEY_SPAM_CONTEXT) \
            or SizeBoundDict(self.max_spam_contexts, lambda key, context: context.age)
        Storage.set_state_value(guild, self.STATE_KEY_SPAM_CONTEXT, spam_lookup)

        for message_hash, duplicates in repeated.items():
            key = f'{author_id}|{message_hash}'
            context = spam_lookup.get(key)
            if context is None:
                context = SpamContext(message.author, message_hash)
                # Stamp the age before inserting so the lookup's age
                # callback never sees the constructor's placeholder value.
                context.age = message.created_at
                spam_lookup[key] = context
            else:
                context.age = message.created_at
            context.messages.update(duplicates)
            await self.__update_from_context(context)

    async def __update_from_context(self, context: SpamContext):
        """Bring the warning message and member state in line with *context*."""
        count = len(context.messages)
        # NOTE(review): both comparisons are strict, so with the defaults
        # the warning appears at the 4th repeat and the auto-ban at the
        # 6th, while __record_message already creates a context at the
        # 3rd. Confirm whether `>=` was intended before changing this.
        if count > self.ban_messages_per_user and not context.is_banned:
            await self.__auto_ban(context)
        elif count > self.warn_messages_per_user:
            await self.__refresh_warning(context)

    async def __apply_ban(self, context: SpamContext) -> None:
        """Ban the member and record the resulting state on *context*."""
        await context.member.ban(reason=self.ACTION_REASON, delete_message_days=1)
        # The ban is asked to purge the last day of the member's messages,
        # so the tracked messages are treated as deleted.
        context.deleted_messages |= context.messages
        context.is_kicked = True
        context.is_banned = True

    async def __auto_ban(self, context: SpamContext) -> None:
        """Ban the member without moderator input and report it."""
        await self.__apply_ban(context)

        content = next(iter(context.messages)).content
        msg = f'User {context.member.mention} auto banned for ' + \
            'crosspost spamming.\n' + \
            f'> {content}'

        if context.warning_message:
            await self.update_warn(context.warning_message, msg)
            # Nothing is left for a moderator to do, so drop the controls.
            for emoji in (self.EMOJI_DELETE, self.EMOJI_KICK, self.EMOJI_BAN):
                await context.warning_message.clear_reaction(emoji)
        else:
            context.warning_message = await self.warn(context.member.guild, msg)

    async def __refresh_warning(self, context: SpamContext) -> None:
        """Post or update the warning and its reaction controls."""
        content = next(iter(context.messages)).content
        can_delete = len(context.messages) > len(context.deleted_messages)

        lines = [
            f'User {context.member.mention} has posted the exact same '
            f'message {len(context.messages)} times.',
            f'> {content}',
            '',
        ]
        if can_delete:
            lines.append(f'{self.EMOJI_DELETE} to delete messages')
        else:
            lines.append('All messages deleted')
        if not context.is_kicked:
            lines.append(f'{self.EMOJI_KICK} to kick user')
        elif not context.is_banned:
            lines.append('User kicked')
        if context.is_banned:
            lines.append('User banned')
        else:
            lines.append(f'{self.EMOJI_BAN} to ban user')
        msg = '\n'.join(lines)

        if context.warning_message:
            await self.update_warn(context.warning_message, msg)
        else:
            context.warning_message = await self.warn(context.member.guild, msg)
            self.listen_for_reactions_to(context.warning_message)

        # Offer a control only while its action is still available.
        controls = (
            (self.EMOJI_DELETE, can_delete),
            (self.EMOJI_KICK, not context.is_kicked),
            (self.EMOJI_BAN, not context.is_banned),
        )
        for emoji, available in controls:
            if available:
                await context.warning_message.add_reaction(emoji)
            else:
                await context.warning_message.clear_reaction(emoji)

    def __context_for_warning_message(self, message: Message) -> Optional[SpamContext]:
        """Return the context whose warning is *message*, or None."""
        spam_lookup = Storage.get_state_value(message.guild, self.STATE_KEY_SPAM_CONTEXT)
        if spam_lookup is None:
            return None
        for _, context in spam_lookup.items():
            warning = context.warning_message
            # Contexts below the warning threshold have no warning yet.
            if warning is not None and warning.id == message.id:
                return context
        return None

    async def on_mod_react(self, message: Message, emoji: PartialEmoji) -> None:
        """Run the action a moderator picked by reacting to a warning.

        Presumably invoked by BaseCog for messages registered through
        listen_for_reactions_to -- confirm against BaseCog.
        """
        context = self.__context_for_warning_message(message)
        if context is None:
            return
        if emoji.name == self.EMOJI_DELETE:
            await self.__delete_messages(context)
        elif emoji.name == self.EMOJI_KICK:
            await self.__kick(context)
        elif emoji.name == self.EMOJI_BAN:
            await self.__ban(context)

    async def __delete_messages(self, context: SpamContext) -> None:
        """Delete every tracked message that has not been deleted yet."""
        for message in context.messages - context.deleted_messages:
            try:
                await message.delete()
            except NotFound:
                # Already removed by someone else; still counts as deleted
                # and must not stop the remaining deletions.
                pass
            context.deleted_messages.add(message)
        await self.__update_from_context(context)

    async def __kick(self, context: SpamContext) -> None:
        """Kick the member and refresh the warning."""
        await context.member.kick(reason=self.ACTION_REASON)
        context.is_kicked = True
        await self.__update_from_context(context)

    async def __ban(self, context: SpamContext) -> None:
        """Ban the member on a moderator's request and refresh the warning."""
        await self.__apply_ban(context)
        await self.__update_from_context(context)

    @commands.Cog.listener()
    async def on_message(self, message: Message):
        """Feed every incoming message into the repeat detector."""
        await self.__record_message(message)