| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- from discord import Guild, Member, Message
- from discord.ext import commands
- import re
- from datetime import timedelta
-
- from cogs.basecog import BaseCog, BotMessage, BotMessageReaction, CogSetting
- from config import CONFIG
- from storage import Storage
-
class URLSpamContext:
    """
    Per-incident state for one flagged message: the message itself plus
    flags recording which moderation actions have already been carried out.
    """

    def __init__(self, spam_message: Message):
        # The message that triggered the detection.
        self.spam_message = spam_message
        # Nothing has been done yet; each flag is set to True once the
        # corresponding action (delete / kick / ban) has been applied.
        self.is_deleted = self.is_kicked = self.is_banned = False
-
class URLSpamCog(BaseCog):
    """
    Detects users posting URLs who just joined recently: a common spam pattern.
    Can be configured to take immediate action or just warn the mods.
    """

    # Per-guild settings; registered with the base cog in __init__.
    SETTING_ENABLED = CogSetting('enabled', bool,
        brief='URL spam detection',
        description='Whether URLs posted soon after joining are flagged.')
    SETTING_ACTION = CogSetting('action', str,
        brief='action to take on spam',
        description='The action to take on detected URL spam.',
        enum_values={'nothing', 'modwarn', 'delete', 'kick', 'ban'})
    SETTING_JOIN_AGE = CogSetting('joinage', float,
        brief='seconds since member joined',
        description=(
            'The minimum seconds since the user joined the '
            'server before they can post URLs. URLs posted by users '
            'who joined too recently will be flagged. Keep in mind '
            'many servers have a minimum 10 minute cooldown before '
            'new members can say anything. Setting to 0 effectively '
            'disables URL spam detection.'),
        usage='<seconds:int>',
        min_value=0)

    # Compiled once at class creation instead of on every incoming message.
    __URL_PATTERN = re.compile(r'http(?:s)?://[^\s]+')

    def __init__(self, bot):
        """Initializes the cog and registers its three settings."""
        super().__init__(bot)
        self.add_setting(URLSpamCog.SETTING_ENABLED)
        self.add_setting(URLSpamCog.SETTING_ACTION)
        self.add_setting(URLSpamCog.SETTING_JOIN_AGE)

    # NOTE: the docstring below is left untouched on purpose; discord.py uses
    # a command callback's docstring as the command's help text.
    @commands.group(
        brief='Manages URL spam detection',
    )
    @commands.has_permissions(ban_members=True)
    @commands.guild_only()
    async def urlspam(self, context: commands.Context):
        'Command group'
        if context.invoked_subcommand is None:
            await context.send_help()

    @commands.Cog.listener()
    async def on_message(self, message: Message):
        """
        Flags a message containing a URL when its author joined the guild
        less than the configured number of seconds before posting it.

        Depending on the guild's `action` setting the message is deleted
        and/or the author kicked or banned, and a bot message summarizing
        the incident is posted. Bot authors, messages outside a guild,
        guilds with detection disabled and the 'nothing' action are ignored.
        """
        if (message.author is None
                or message.author.bot
                or message.guild is None
                or message.channel is None
                or message.content is None):
            return
        if not self.get_guild_setting(message.guild, self.SETTING_ENABLED):
            return

        action = self.get_guild_setting(message.guild, self.SETTING_ACTION)
        join_seconds = self.get_guild_setting(message.guild, self.SETTING_JOIN_AGE)
        min_join_age = timedelta(seconds=join_seconds)
        if action == 'nothing':
            return
        if not self.__contains_url(message.content):
            return

        # The author may be a plain User (no `joined_at` attribute) and a
        # Member's `joined_at` may be None. Without a join time the age
        # cannot be computed, so the message is not treated as spam.
        joined_at = getattr(message.author, 'joined_at', None)
        if joined_at is None:
            return

        join_age = message.created_at - joined_at
        if join_age >= min_join_age:
            return
        join_age_str = self.__format_timedelta(join_age)

        context = URLSpamContext(message)
        needs_attention = False
        # Shared start of every log line written below.
        log_prefix = (
            f'New user {message.author.name} '
            f'({message.author.id}) posted URL {join_age_str} after '
            'joining. ')
        if action == 'modwarn':
            needs_attention = True
            self.log(message.guild, log_prefix + 'Mods alerted.')
        elif action == 'delete':
            await message.delete()
            context.is_deleted = True
            self.log(message.guild, log_prefix + 'Message deleted.')
        elif action == 'kick':
            await message.delete()
            context.is_deleted = True
            await message.author.kick(
                reason=f'Rocketbot: Posted a link {join_age_str} after joining')
            context.is_kicked = True
            self.log(message.guild, log_prefix + 'User kicked.')
        elif action == 'ban':
            await message.author.ban(
                reason=f'Rocketbot: User posted a link {join_age_str} after joining',
                delete_message_days=1)
            # The ban request also asks for the last day of the user's
            # messages to be deleted, so every action counts as done.
            context.is_deleted = True
            context.is_kicked = True
            context.is_banned = True
            self.log(message.guild, log_prefix + 'User banned.')

        bm = BotMessage(
            message.guild,
            f'User {message.author.mention} posted a URL '
            f'{join_age_str} after joining.',
            type=BotMessage.TYPE_MOD_WARNING if needs_attention else BotMessage.TYPE_INFO,
            context=context)
        bm.quote = message.content
        # Reactions are derived from what has already been done.
        await bm.set_reactions(BotMessageReaction.standard_set(
            did_delete=context.is_deleted,
            did_kick=context.is_kicked,
            did_ban=context.is_banned))
        await self.post_message(bm)

    async def on_mod_react(self,
            bot_message: BotMessage,
            reaction: BotMessageReaction,
            reacted_by: Member) -> None:
        """
        Carries out the moderation action matching `reaction` on the spam
        message attached to `bot_message`, then refreshes the bot message's
        reactions. Actions already recorded as done in the context are not
        repeated; unrecognized emoji are ignored.

        NOTE(review): presumably invoked by BaseCog when a moderator reacts
        to a message posted via `post_message` -- confirm against BaseCog.
        """
        context: URLSpamContext = bot_message.context
        if context is None:
            return
        sm: Message = context.spam_message
        if reaction.emoji == CONFIG['trash_emoji']:
            if not context.is_deleted:
                await sm.delete()
                context.is_deleted = True
                self.log(sm.guild, f'URL spam by {sm.author.name} deleted '
                    f'by {reacted_by.name}')
        elif reaction.emoji == CONFIG['kick_emoji']:
            # Kicking also removes the offending message if it is still there.
            if not context.is_deleted:
                await sm.delete()
                context.is_deleted = True
            if not context.is_kicked:
                await sm.author.kick(
                    reason=f'Rocketbot: Kicked for URL spam by {reacted_by.name}')
                context.is_kicked = True
                self.log(sm.guild, f'URL spammer {sm.author.name} kicked '
                    f'by {reacted_by.name}')
        elif reaction.emoji == CONFIG['ban_emoji']:
            if not context.is_banned:
                await sm.author.ban(
                    reason=f'Rocketbot: Banned for URL spam by {reacted_by.name}',
                    delete_message_days=1)
                context.is_deleted = True
                context.is_kicked = True
                context.is_banned = True
                self.log(sm.guild, f'URL spammer {sm.author.name} banned '
                    f'by {reacted_by.name}')
        else:
            return
        await bot_message.set_reactions(BotMessageReaction.standard_set(
            did_delete=context.is_deleted,
            did_kick=context.is_kicked,
            did_ban=context.is_banned))

    @classmethod
    def __contains_url(cls, text: str) -> bool:
        """Returns True if `text` contains an http:// or https:// URL."""
        return cls.__URL_PATTERN.search(text) is not None

    @classmethod
    def __format_timedelta(cls, timespan: timedelta) -> str:
        """
        Renders `timespan` as text such as '3 minutes 12 seconds', keeping
        only the two most significant units. Negative spans are rendered as
        '0 seconds'.
        """
        # A negative timedelta normalizes to days=-1 plus a positive seconds
        # remainder, which would otherwise render as e.g. '23 hours 59 minutes'.
        if timespan < timedelta(0):
            timespan = timedelta(0)
        d = timespan.days
        h = timespan.seconds // 3600
        m = (timespan.seconds // 60) % 60
        s = timespan.seconds % 60
        parts = []
        # Once a larger unit is shown, every smaller unit is listed too
        # (even if zero) so the text reads contiguously.
        if d > 0:
            parts.append(f'{d} days')
        if d > 0 or h > 0:
            parts.append(f'{h} hours')
        if d > 0 or h > 0 or m > 0:
            parts.append(f'{m} minutes')
        parts.append(f'{s} seconds')
        # Limit the precision to the two most significant elements
        return ' '.join(parts[:2])
|