| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346 |
- """
- Cog for detecting spam messages posted in multiple channels.
- """
- import re
- from datetime import datetime, timedelta, timezone
- from typing import Optional
-
- from discord import Member, Message, utils as discordutils, TextChannel
- from discord.ext.commands import Cog
-
- from config import CONFIG
- from rocketbot.cogs.basecog import BaseCog, BotMessage, BotMessageReaction, CogSetting
- from rocketbot.collections import AgeBoundList, AgeBoundDict
- from rocketbot.storage import Storage
- from rocketbot.utils import str_from_timedelta
-
-
class SpamContext:
    """
    Mutable record of one member's suspected crossposting incident.

    Holds the offending member, the messages and channels involved, which of
    those messages have been removed, and which moderation actions (kick,
    ban, automatic ban) have been applied so far.
    """
    def __init__(self, member: Member) -> None:
        # Who the incident is about.
        self.member: Member = member

        # Timestamp of the incident.
        # NOTE(review): datetime.now() yields a naive local time; confirm it
        # is compatible with the timestamps this value is compared against.
        self.age: datetime = datetime.now()

        # Messages and channels involved in the incident.
        self.spam_messages: set[Message] = set()
        self.unique_channels: set[TextChannel] = set()
        self.deleted_messages: set[Message] = set()
        # Largest number of channels sharing one (near-)identical message.
        self.duplicate_count: int = 0

        # Moderation state.
        self.is_kicked: bool = False
        self.is_banned: bool = False
        self.is_autobanned: bool = False

        # Notification posted to moderators, once one exists.
        self.bot_message: Optional[BotMessage] = None
-
class CrossPostCog(BaseCog, name='Crosspost Detection'):
    """
    Detects a user posting in multiple channels in a short period
    of time: a common pattern for spammers.

    These used to be identical text, but more recent attacks have had small
    variations, such as different imgur URLs. It's reasonable to treat
    posting in many channels in a short period as suspicious on its own,
    regardless of whether they are identical.

    Repeated posts in the same channel aren't currently detected, as this can
    often be for a reason or due to retrying a failed post when connectivity
    is poor. Minimum message length can be enforced for detection.
    """
    SETTING_ENABLED = CogSetting('enabled', bool,
        brief='crosspost detection',
        description='Whether crosspost detection is enabled.')
    SETTING_WARN_COUNT = CogSetting('warncount', int,
        brief='number of messages to trigger a warning',
        description='The number of unique channels messages are '
            'posted in by the same user to trigger a mod warning. The '
            'messages need not be identical (see dupewarncount).',
        usage='<count:int>',
        min_value=2)
    SETTING_DUPE_WARN_COUNT = CogSetting('dupewarncount', int,
        brief='number of identical messages to trigger a warning',
        description='The number of unique channels identical messages are '
            'posted in by the same user to trigger a mod warning.',
        usage='<count:int>',
        min_value=2)
    SETTING_BAN_COUNT = CogSetting('bancount', int,
        brief='number of messages to trigger a ban',
        description='The number of unique channels messages are '
            'posted in by the same user to trigger an automatic ban. The '
            'messages need not be identical (see dupebancount). Set '
            'to a large value to effectively disable, e.g. 9999.',
        usage='<count:int>',
        min_value=2)
    SETTING_DUPE_BAN_COUNT = CogSetting('dupebancount', int,
        brief='number of identical messages to trigger a ban',
        description='The number of unique channels identical messages are '
            'posted in by the same user to trigger an automatic ban. Set '
            'to a large value to effectively disable, e.g. 9999.',
        usage='<count:int>',
        min_value=2)
    SETTING_MIN_LENGTH = CogSetting('minlength', int,
        brief='minimum message length',
        description='The minimum number of characters in a message to be '
            'checked for duplicates. This can help ignore common short '
            'messages like "lol" or a single emoji.',
        usage='<character_count:int>',
        min_value=1)
    SETTING_TIMESPAN = CogSetting('timespan', timedelta,
        brief='time window to look for dupe messages',
        description='The number of seconds of message history to look at '
            'when looking for duplicates. Shorter values are preferred, '
            'both to detect bots and avoid excessive memory usage.',
        usage='<seconds:int>',
        min_value=timedelta(seconds=1))

    STATE_KEY_RECENT_MESSAGES = "CrossPostCog.recent_messages"
    STATE_KEY_SPAM_CONTEXT = "CrossPostCog.spam_context"

    # URLs sometimes differ per spam message, so they are collapsed to a
    # placeholder before hashing. Compiled once instead of on every message.
    _URL_PATTERN = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')

    def __init__(self, bot):
        """Register the cog with ``bot`` and declare its per-guild settings."""
        super().__init__(
            bot,
            config_prefix='crosspost',
            short_description='Manages crosspost detection and handling.',
        )
        for setting in (
            CrossPostCog.SETTING_ENABLED,
            CrossPostCog.SETTING_WARN_COUNT,
            CrossPostCog.SETTING_DUPE_WARN_COUNT,
            CrossPostCog.SETTING_BAN_COUNT,
            CrossPostCog.SETTING_DUPE_BAN_COUNT,
            CrossPostCog.SETTING_MIN_LENGTH,
            CrossPostCog.SETTING_TIMESPAN,
        ):
            self.add_setting(setting)
        self.max_spam_contexts = 12

    def __timespan(self, guild) -> timedelta:
        """
        Return the configured detection window for ``guild`` as a timedelta.

        SETTING_TIMESPAN is declared with type ``timedelta``, but this cog
        previously wrapped the stored value in ``timedelta(seconds=...)``,
        which raises TypeError when the value already is a timedelta. Both
        representations are accepted here.
        NOTE(review): confirm which form get_guild_setting actually returns.
        """
        value = self.get_guild_setting(guild, self.SETTING_TIMESPAN)
        if isinstance(value, timedelta):
            return value
        return timedelta(seconds=value)

    def __compute_message_hash(self, m: Message) -> int:
        """
        Hash a message's content so near-identical spam compares equal.

        URLs are replaced by a fixed placeholder and a summary of each
        attachment (content type, size, dimensions) is appended.
        """
        to_hash = self._URL_PATTERN.sub('<url>', m.content)
        # Add attachment metadata
        to_hash += ''.join(
            f'\n[[ATT: ct={attachment.content_type} s={attachment.size} w={attachment.width} h={attachment.height}]]'
            for attachment in m.attachments)
        h = hash(to_hash)
        self.__trace(f"Message hash for {m.id} is {h}")
        return h

    async def __record_message(self, message: Message) -> None:
        """
        Record ``message`` and act if its author is now crossposting.

        Adds the message to the guild's recent-message list, counts the
        channels (and identical-message channels) the author has posted in
        within the configured window, and creates or refreshes a SpamContext
        once a warning threshold is reached.
        """
        if message.channel.permissions_for(message.author).ban_members:
            # User exempt from spam detection
            self.__trace("User exempt from crosspost checks")
            return

        min_length = self.get_guild_setting(message.guild, self.SETTING_MIN_LENGTH)
        if len(message.attachments) == 0 and len(message.content) < min_length:
            # Message too short to count towards spam total
            self.__trace(f"Message len {len(message.content)} < {min_length}")
            return

        # Get config
        max_age = self.__timespan(message.guild)
        warn_count: int = self.get_guild_setting(message.guild, self.SETTING_WARN_COUNT)
        dupe_warn_count: int = self.get_guild_setting(message.guild, self.SETTING_DUPE_WARN_COUNT)

        # Record message
        recent_messages: AgeBoundList[Message, datetime, timedelta] = \
            Storage.get_state_value(message.guild, self.STATE_KEY_RECENT_MESSAGES)
        if recent_messages is None:
            recent_messages = AgeBoundList(max_age, lambda index, message: message.created_at)
            Storage.set_state_value(message.guild, self.STATE_KEY_RECENT_MESSAGES, recent_messages)
        # Pick up changes to the timespan setting on an existing list
        recent_messages.max_age = max_age
        recent_messages.append(message)
        self.__trace(f"Recent messages now length {len(recent_messages)}")

        # Get all recent messages by user
        member_messages = [m for m in recent_messages if m.author.id == message.author.id]
        message_count = len(member_messages)
        self.__trace(f"Found {message_count} messages for {message.author.name}")
        if message_count < warn_count and message_count < dupe_warn_count:
            # Too few messages to reach either threshold; skip the hashing
            self.__trace(f"Bailing because message count {message_count} < warn count {warn_count} and < dupe warn count {dupe_warn_count}")
            return

        # Look for identical(ish) messages and unique channels
        hash_to_channels: dict[int, set[TextChannel]] = {}
        unique_channels: set[TextChannel] = set()
        max_duplicate_count = 0
        for m in member_messages:
            dupe_message_channels = hash_to_channels.setdefault(
                self.__compute_message_hash(m), set())
            dupe_message_channels.add(m.channel)
            unique_channels.add(m.channel)
            max_duplicate_count = max(max_duplicate_count, len(dupe_message_channels))
        channel_count = len(unique_channels)
        self.__trace(f"Found {len(hash_to_channels)} unique messages, {channel_count} unique channels, {max_duplicate_count} duplicated messages")
        if channel_count < warn_count and max_duplicate_count < dupe_warn_count:
            self.__trace(f"Bailing because channels {channel_count} < warn count {warn_count} and max dupes {max_duplicate_count} < dupe warn count {dupe_warn_count}")
            return

        # This person is a problem

        spam_lookup: AgeBoundDict[str, SpamContext, datetime, timedelta] = \
            Storage.get_state_value(message.guild, self.STATE_KEY_SPAM_CONTEXT)
        if spam_lookup is None:
            spam_lookup = AgeBoundDict(
                max_age,
                lambda key, context: context.age)
            Storage.set_state_value(message.guild, self.STATE_KEY_SPAM_CONTEXT, spam_lookup)
        key = f'{message.author.id}'
        context = spam_lookup.get(key)
        if context is not None and message.created_at - context.age > max_age:
            # Previous incident is older than the window: start a new one
            context = None
        if context is None:
            context = SpamContext(message.author)
            spam_lookup[key] = context
            self.log(message.guild,
                f'\u0007{message.author.name} ({message.author.id}) '
                f'posted messages in {channel_count} channels.')
        context.age = message.created_at
        context.duplicate_count = max_duplicate_count
        context.spam_messages.update(member_messages)
        context.unique_channels.update(unique_channels)
        await self.__update_from_context(context)

    async def __update_from_context(self, context: SpamContext):
        """
        Auto-ban the member if a ban threshold is reached, then refresh the
        moderator notification. Does nothing further when a ban threshold is
        reached but the member is already banned.
        """
        guild = context.member.guild
        ban_count = self.get_guild_setting(guild, self.SETTING_BAN_COUNT)
        dupe_ban_count = self.get_guild_setting(guild, self.SETTING_DUPE_BAN_COUNT)
        channel_count = len(context.unique_channels)
        if channel_count >= ban_count or context.duplicate_count >= dupe_ban_count:
            if context.is_banned:
                # Already banned. Nothing to update in the message.
                return
            max_age_str = str_from_timedelta(self.__timespan(guild))
            await context.member.ban(
                reason=f'Rocketbot: Posted in {channel_count} channels within {max_age_str} '
                    f'({context.duplicate_count} identical). Banned by {self.bot.user.name}.',
                delete_message_days=1)
            context.is_kicked = True
            context.is_banned = True
            context.is_autobanned = True
            # The ban was asked to purge a day of messages, so count them all
            context.deleted_messages |= context.spam_messages
            self.__log_ban(context, self.bot.user.name)
        await self.__update_message_from_context(context)

    async def __update_message_from_context(self, context: SpamContext) -> None:
        """
        Create or update the moderator notification describing ``context``.

        The notification is built on first use, quoting the earliest spam
        message, and posted once; later calls only change its text and
        reactions.
        """
        # Sort once; used for both the quoted message and the jump links
        ordered_messages = sorted(context.spam_messages, key=lambda m: m.created_at)
        first_spam_message: Message = ordered_messages[0]
        spam_count = len(context.spam_messages)
        channel_count = len(context.unique_channels)
        deleted_count = len(context.deleted_messages)
        duplicate_count = context.duplicate_count
        max_age_str = str_from_timedelta(self.__timespan(context.member.guild))
        message = context.bot_message
        if message is None:
            message_type: int = BotMessage.TYPE_INFO if self.was_warned_recently(context.member) \
                else BotMessage.TYPE_MOD_WARNING
            message = BotMessage(context.member.guild, '', message_type, context)
            message.quote = discordutils.remove_markdown(first_spam_message.clean_content)
            self.record_warning(context.member)
        if context.is_autobanned:
            text = f'User {context.member.mention} auto banned for ' \
                f'posting messages in {channel_count} channels within {max_age_str} ' \
                f'({duplicate_count} identical). Messages from past 24 hours deleted.'
            await message.set_reactions([])
            await message.set_text(text)
        else:
            body: str = f'User {context.member.mention} posted '
            if duplicate_count == channel_count:
                body += f'identical messages in {channel_count} channels within {max_age_str}.'
            elif duplicate_count == 1:
                body += f'**different** messages in {channel_count} channels within ' \
                    f'{max_age_str}. (Showing first one).'
            else:
                body += f'messages in {channel_count} channels within {max_age_str} ' \
                    f'({duplicate_count} are identical, showing first one).'
            # Link to the earliest messages only, to keep the text short
            max_links = 10
            for msg in ordered_messages[:max_links]:
                body += f'\n- {msg.jump_url}'
            if spam_count > max_links:
                body += f'\n- ...{spam_count - max_links} more...'
            await message.set_text(body)
            await message.set_reactions(BotMessageReaction.standard_set(
                did_delete=deleted_count >= spam_count,
                message_count=spam_count,
                did_kick=context.is_kicked,
                did_ban=context.is_banned))
        if context.bot_message is None:
            await self.post_message(message)
            context.bot_message = message

    async def on_mod_react(self,
            bot_message: BotMessage,
            reaction: BotMessageReaction,
            reacted_by: Member) -> None:
        """
        Apply the moderator action selected by reacting to a notification.

        The trash, kick and ban emoji from CONFIG respectively delete the
        recorded spam messages, kick the member, or ban the member. Other
        reactions, and bot messages without a context, are ignored.
        """
        # Function-scope import so this block needs no change to the
        # module's import list.
        from discord import NotFound

        context: SpamContext = bot_message.context
        if context is None:
            return

        channel_count = len(context.unique_channels)
        if reaction.emoji == CONFIG['trash_emoji']:
            # The set difference is a new set, so deleted_messages can be
            # mutated while iterating.
            for message in context.spam_messages - context.deleted_messages:
                try:
                    await message.delete()
                except NotFound:
                    # Already removed by someone else; keep going rather than
                    # abandoning the remaining messages.
                    pass
                context.deleted_messages.add(message)
            await self.__update_from_context(context)
            self.__log_deletion(context, reacted_by.name)
        elif reaction.emoji == CONFIG['kick_emoji']:
            await context.member.kick(
                reason=f'Rocketbot: Posted messages in {channel_count} '
                    f'channels. Kicked by {reacted_by.name}.')
            context.is_kicked = True
            await self.__update_from_context(context)
            self.__log_kick(context, reacted_by.name)
        elif reaction.emoji == CONFIG['ban_emoji']:
            await context.member.ban(
                reason=f'Rocketbot: Posted messages in {channel_count} '
                    f'channels. Banned by {reacted_by.name}.',
                delete_message_days=1)
            context.deleted_messages |= context.spam_messages
            context.is_kicked = True
            context.is_banned = True
            await self.__update_from_context(context)
            self.__log_ban(context, reacted_by.name)

    def __log_action(self, context: SpamContext, action: str, by_who: str) -> None:
        """Log that ``action`` (e.g. 'Banned') was taken on the member by ``by_who``."""
        max_age_str = str_from_timedelta(self.__timespan(context.member.guild))
        channel_count = len(context.unique_channels)
        duplicate_count = context.duplicate_count
        self.log(context.member.guild,
            f'{context.member.name} ({context.member.id}) posted '
            f'messages in {channel_count} channels within {max_age_str} '
            f'({duplicate_count} identical). {action} by {by_who}.')

    def __log_deletion(self, context: SpamContext, by_who: str) -> None:
        """Log that the member's spam messages were deleted by ``by_who``."""
        self.__log_action(context, 'Deleted', by_who)

    def __log_kick(self, context: SpamContext, by_who: str) -> None:
        """Log that the member was kicked by ``by_who``."""
        self.__log_action(context, 'Kicked', by_who)

    def __log_ban(self, context: SpamContext, by_who: str) -> None:
        """Log that the member was banned by ``by_who``."""
        self.__log_action(context, 'Banned', by_who)

    def __trace(self, message):
        """Debug hook; currently a no-op. Uncomment the print to trace."""
        # print(message)
        pass

    @Cog.listener()
    async def on_message(self, message: Message):
        """Event handler"""
        if message.author is None or \
                message.author.bot or \
                message.channel is None or \
                message.guild is None:
            return
        if not self.get_guild_setting(message.guild, self.SETTING_ENABLED):
            return
        self.__trace("--ON MESSAGE--")
        await self.__record_message(message)
|