from discord import Guild, Message, PartialEmoji from discord.ext import commands from datetime import datetime, timedelta import math from rscollections import AgeBoundList, SizeBoundDict from cogs.basecog import BaseCog from storage import Storage class SpamContext: def __init__(self, member, message_hash): self.member = member self.message_hash = message_hash self.age = datetime.now() self.warning_message = None self.is_kicked = False self.is_banned = False self.messages = set() self.deleted_messages = set() class CrossPostCog(BaseCog): STATE_KEY_RECENT_MESSAGES = "crosspost_recent_messages" STATE_KEY_SPAM_CONTEXT = "crosspost_spam_context" CONFIG_KEY_WARN_COUNT = "crosspost_warn_count" CONFIG_KEY_BAN_COUNT = "crosspost_ban_count" CONFIG_KEY_MIN_MESSAGE_LENGTH = "crosspost_min_message_length" CONFIG_KEY_MESSAGE_AGE = "crosspost_message_age" MIN_WARN_COUNT = 2 MIN_BAN_COUNT = 2 MIN_MESSAGE_LENGTH = 0 MIN_TIME_SPAN = 1 def __init__(self, bot): super().__init__(bot) self.max_spam_contexts = 12 # Config def __warn_count(self, guild: Guild) -> int: return Storage.get_config_value(guild, self.CONFIG_KEY_WARN_COUNT) or 3 def __ban_count(self, guild: Guild) -> int: return Storage.get_config_value(guild, self.CONFIG_KEY_BAN_COUNT) or 9999 def __min_message_length(self, guild: Guild) -> int: return Storage.get_config_value(guild, self.CONFIG_KEY_MIN_MESSAGE_LENGTH) or 0 def __message_age_seconds(self, guild: Guild) -> int: return Storage.get_config_value(guild, self.CONFIG_KEY_MESSAGE_AGE) or 120 async def __record_message(self, message: Message) -> None: if message.author.permissions_in(message.channel).ban_members: # User exempt from spam detection return if len(message.content) < self.__min_message_length(message.guild): # Message too short to count towards spam total return max_age = timedelta(seconds=self.__message_age_seconds(message.guild)) recent_messages = Storage.get_state_value(message.guild, self.STATE_KEY_RECENT_MESSAGES) \ or AgeBoundList(max_age, lambda index, message : message.created_at) recent_messages.max_age = max_age recent_messages.append(message) Storage.set_state_value(message.guild, self.STATE_KEY_RECENT_MESSAGES, recent_messages) # Get all recent messages by user member_messages = [m for m in recent_messages if m.author.id == message.author.id] if len(member_messages) < self.__warn_count(message.guild): return # Look for repeats hash_to_count = {} max_count = 0 for m in member_messages: key = hash(m.content) count = (hash_to_count.get(key) or 0) + 1 hash_to_count[key] = count max_count = max(max_count, count) if max_count < self.__warn_count(message.guild): return # Handle the spam spam_lookup = Storage.get_state_value(message.guild, self.STATE_KEY_SPAM_CONTEXT) \ or SizeBoundDict(self.max_spam_contexts, lambda key, context : context.age) Storage.set_state_value(message.guild, self.STATE_KEY_SPAM_CONTEXT, spam_lookup) for message_hash, count in hash_to_count.items(): if count < self.__warn_count(message.guild): continue key = f'{message.author.id}|{message_hash}' context = spam_lookup.get(key) is_new = context is None if context is None: context = SpamContext(message.author, message_hash) spam_lookup[key] = context context.age = message.created_at for m in member_messages: if hash(m.content) == message_hash: context.messages.add(m) await self.__update_from_context(context) async def __update_from_context(self, context: SpamContext): content = next(iter(context.messages)).content if len(context.messages) >= self.__ban_count(context.member.guild): if not context.is_banned: await context.member.ban(reason='Posting same message repeatedly', delete_message_days=1) msg = f'User {context.member.mention} auto banned for ' + \ 'crosspost spamming.\n' + \ f'> {content}' if context.warning_message: await self.update_warn(context.warning_message, msg) await context.warning_message.clear_reaction('šŸ—‘') await context.warning_message.clear_reaction('šŸ‘¢') await context.warning_message.clear_reaction('🚫') else: context.warning_message = await self.warn(context.member.guild, msg) elif len(context.messages) >= self.__warn_count(context.member.guild): content = next(iter(context.messages)).content msg = f'User {context.member.mention} has posted the exact same ' + \ f'message {len(context.messages)} times.\n' + \ f'> {content}' + \ '\n' can_delete = len(context.messages) > len(context.deleted_messages) if can_delete: msg += '\nšŸ—‘ to delete messages' else: msg += '\nAll messages deleted' if not context.is_kicked: msg += '\nšŸ‘¢ to kick user' elif not context.is_banned: msg += '\nUser kicked' if context.is_banned: msg += '\nUser banned' else: msg += '\n🚫 to ban user' if context.warning_message: await self.update_warn(context.warning_message, msg) else: context.warning_message = await self.warn(context.member.guild, msg) self.listen_for_reactions_to(context.warning_message, context) if can_delete: await context.warning_message.add_reaction('šŸ—‘') else: await context.warning_message.clear_reaction('šŸ—‘') if not context.is_kicked: await context.warning_message.add_reaction('šŸ‘¢') else: await context.warning_message.clear_reaction('šŸ‘¢') if not context.is_banned: await context.warning_message.add_reaction('🚫') else: await context.warning_message.clear_reaction('🚫') async def __delete_messages(self, context: SpamContext) -> None: for message in context.messages - context.deleted_messages: await message.delete() context.deleted_messages.add(message) await self.__update_from_context(context) async def __kick(self, context: SpamContext) -> None: await context.member.kick(reason='Posting same message repeatedly') context.is_kicked = True await self.__update_from_context(context) async def __ban(self, context: SpamContext) -> None: await context.member.ban(reason='Posting same message repeatedly', delete_message_days=1) context.deleted_messages |= context.messages context.is_kicked = True context.is_banned = True await self.__update_from_context(context) async def on_mod_react(self, message: Message, emoji: PartialEmoji, context: SpamContext) -> None: if context is None: return if emoji.name == 'šŸ—‘': await self.__delete_messages(context) elif emoji.name == 'šŸ‘¢': await self.__kick(context) elif emoji.name == '🚫': await self.__ban(context) @commands.Cog.listener() async def on_message(self, message: Message): await self.__record_message(message) # -- Commands ----------------------------------------------------------- @commands.group( brief='Manages crosspost/repeated post detection and handling', ) @commands.has_permissions(ban_members=True) @commands.guild_only() async def crosspost(self, context: commands.Context): 'Command group' if context.invoked_subcommand is None: await context.send_help() @crosspost.command( name='setwarncount', brief='Sets the number of duplicate messages to trigger a mod warning', description='If the same user posts the exact same message ' + 'content this many times a warning will be posted to the mods,' + 'even if the messages are posted in different channels.', usage='', ) async def joinraid_setwarncount(self, context: commands.Context, warn_count: int): if not await self.validate_param(context, 'warn_count', warn_count, allowed_types=(int, ), min_value=self.MIN_WARN_COUNT): return Storage.set_config_value(context.guild, self.CONFIG_KEY_WARN_COUNT, warn_count) await context.message.reply(f'āœ… Mods will be warned if a user posts ' + f'the exact same message {warn_count} or more times within ' + f'{self.__message_age_seconds(context.guild)} seconds.', mention_author=False) @crosspost.command( name='getwarncount', brief='Returns the number of duplicate messages to trigger a mod warning', ) async def joinraid_getwarncount(self, context: commands.Context): await context.message.reply(f'ā„¹ļø Mods will be warned if a user posts ' + f'the exact same message {self.__warn_count(context.guild)} or more ' + f'times within {self.__message_age_seconds(context.guild)} seconds.', mention_author=False) @crosspost.command( name='setbancount', brief='Sets the number of duplicate messages to trigger an automatic ban', description='If the same user posts the exact same message ' + 'content this many times they will be automatically banned and the ' + 'mods will be alerted.', usage='', ) async def joinraid_setbancount(self, context: commands.Context, ban_count: int): if not await self.validate_param(context, 'ban_count', ban_count, allowed_types=(int, ), min_value=self.MIN_BAN_COUNT): return Storage.set_config_value(context.guild, self.CONFIG_KEY_BAN_COUNT, ban_count) await context.message.reply(f'āœ… Users will be banned if they post ' + f'the exact same message {ban_count} or more times within ' + f'{self.__message_age_seconds(context.guild)} seconds.', mention_author=False) @crosspost.command( name='getbancount', brief='Returns the number of duplicate messages to trigger an automatic ban', ) async def joinraid_getbancount(self, context: commands.Context): await context.message.reply(f'ā„¹ļø Users will be banned if they post ' + f'the exact same message {self.__ban_count(context.guild)} or more ' + f'times within {self.__message_age_seconds(context.guild)} seconds.', mention_author=False) @crosspost.command( name='setminlength', brief='Sets the minimum number of characters for a message to count toward spamming', description='Messages shorter than this number of characters will not ' + 'count toward spam counts. This helps prevent flagging common, ' + 'frequent, short responses like "lol". A value of 0 counts all messages.', usage='', ) async def joinraid_setminlength(self, context: commands.Context, min_length: int): if not await self.validate_param(context, 'min_length', min_length, allowed_types=(int, ), min_value=self.MIN_MESSAGE_LENGTH): return Storage.set_config_value(context.guild, self.CONFIG_KEY_MIN_MESSAGE_LENGTH, min_length) if min_length == 0: await context.message.reply(f'āœ… All messages will count against ' + 'spam counts, regardless of length.', mention_author=False) else: await context.message.reply(f'āœ… Only messages {min_length} ' + f'characters or longer will count against spam counts.', mention_author=False) @crosspost.command( name='getminlength', brief='Returns the number of duplicate messages to trigger an automatic ban', ) async def joinraid_getminlength(self, context: commands.Context): min_length = self.__min_message_length(context.guild) if min_length == 0: await context.message.reply(f'ā„¹ļø All messages will count against ' + 'spam counts, regardless of length.', mention_author=False) else: await context.message.reply(f'ā„¹ļø Only messages {min_length} ' + f'characters or longer will count against spam counts.', mention_author=False) @crosspost.command( name='settimewindow', brief='Sets the length of time recent messages are checked for duplicates', description='Repeated messages are only checked against recent ' + 'messages. This sets the length of that window, in seconds. Lower ' + 'values save memory and prevent false positives.', usage='', ) async def joinraid_settimewindow(self, context: commands.Context, seconds: int): if not await self.validate_param(context, 'seconds', seconds, allowed_types=(int, ), min_value=self.MIN_TIME_SPAN): return Storage.set_config_value(context.guild, self.CONFIG_KEY_MESSAGE_AGE, seconds) await context.message.reply(f'āœ… Only messages in the past {seconds} ' + f'seconds will be checked for duplicates.', mention_author=False) @crosspost.command( name='gettimewindow', brief='Returns the length of time recent messages are checked for duplicates', ) async def joinraid_gettimewindow(self, context: commands.Context): seconds = self.__message_age_seconds(context.guild) await context.message.reply(f'ā„¹ļø Only messages in the past {seconds} ' + 'seconds will be checked for duplicates.', mention_author=False)