"""
Cog for automatically kicking (and optionally banning) newly joined members.
"""
from datetime import datetime, timedelta

from discord import Guild, HTTPException, Member, Status
from discord.ext import commands, tasks
from discord.ext.tasks import Loop

from config import CONFIG
from rocketbot.cogs.basecog import BaseCog, BotMessage, CogSetting
from rocketbot.collections import AgeBoundDict
from rocketbot.storage import Storage


class AutoKickContext:
    """
    Tracks how many times, and when, a single member has been auto-kicked.
    """

    def __init__(self, member: Member, first_kick: datetime):
        self.member = member
        self.first_kick = first_kick
        self.last_kick = first_kick
        self.kick_count = 1

    def record_kick(self, time: datetime):
        """Registers one more kick of this member at the given time."""
        self.last_kick = time
        self.kick_count += 1


class StatusCheckContext:
    """
    A newly joined member waiting for a delayed status check, along with
    the local time at which they were queued.
    """

    def __init__(self, member: Member):
        self.member = member
        self.joined_at = datetime.now()


class AutoKickCog(BaseCog, name='Auto Kick'):
    """
    Cog for automatically kicking ALL new joins. For temporary use during
    join raids.
    """
    SETTING_ENABLED = CogSetting(
        'enabled', bool,
        brief='autokick',
        description='Whether this cog is enabled for a guild.')
    SETTING_BAN_COUNT = CogSetting(
        'bancount', int,
        brief='number of repeat kicks before a ban',
        description='The number of times a user can join and be kicked '
                    'before the next rejoin will result in a ban. A value of 0 '
                    'disables this feature (only kick, never ban).',
        usage='',
        min_value=0)
    SETTING_OFFLINE_ONLY = CogSetting(
        'offlineonly', bool,
        brief='whether to only kick users whose status is offline',
        description='Compromised accounts may have a status of offline. '
                    'If this setting is enabled, the user\'s status will be '
                    'checked a few seconds after joining. If it is offline '
                    'they will be kicked.',
        usage='')

    STATE_KEY_RECENT_KICKS = "AutoKickCog.recent_joins"

    def __init__(self, bot):
        super().__init__(bot)
        self.add_setting(AutoKickCog.SETTING_ENABLED)
        self.add_setting(AutoKickCog.SETTING_BAN_COUNT)
        self.add_setting(AutoKickCog.SETTING_OFFLINE_ONLY)
        # StatusCheckContext entries awaiting their delayed status check
        self.status_check_members = []
        timer: Loop = self.status_check_timer
        timer.start()

    def cog_unload(self):
        """Cancels the status check timer when the cog is removed."""
        timer: Loop = self.status_check_timer
        timer.cancel()
        # Hand back whatever the parent implementation returns so that a
        # caller which awaits the result of cog_unload still can.
        return super().cog_unload()

    @commands.group(
        brief='Automatically kicks all new users as soon as they join',
    )
    @commands.has_permissions(ban_members=True)
    @commands.guild_only()
    async def autokick(self, context: commands.Context):
        """Auto-kick"""
        if context.invoked_subcommand is None:
            await context.send_help()

    @commands.Cog.listener()
    async def on_member_join(self, member: Member) -> None:
        """
        Event handler. If the cog is enabled for the guild, either kicks the
        new member right away or, in offline-only mode, queues them for a
        delayed status check.
        """
        guild: Guild = member.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        if self.get_guild_setting(guild, self.SETTING_OFFLINE_ONLY):
            self.log(guild, f'New member {member.name} status is {member.status}')
            self.status_check_members.append(StatusCheckContext(member))
            return
        await self.__kick_or_ban_if_needed(member)

    @tasks.loop(seconds=5.0)
    async def status_check_timer(self):
        """
        Checks status of new members shortly after joining to see if they
        go offline. Members queued less than 5 seconds ago are put back in
        the queue for the next pass.
        """
        # Take ownership of the current queue; anything that has to wait
        # longer is re-queued below.
        contexts = self.status_check_members.copy()
        self.status_check_members = []
        now = datetime.now()
        for context in contexts:
            member: Member = context.member
            guild: Guild = member.guild
            if now - context.joined_at < timedelta(seconds=5.0):
                # Too soon, check again later
                self.status_check_members.append(context)
                continue
            if member.status != Status.offline:
                # Online, ignore
                self.log(guild, f'{member.name} status is {member.status}. Not kicking.')
                continue
            self.log(guild, f'{member.name} went offline 5s later')
            try:
                await self.__kick_or_ban_if_needed(member)
            except HTTPException as error:
                # An exception escaping this task would end the loop for
                # good and drop the members still pending in this pass, so
                # a failed kick/ban (missing permissions, member already
                # gone, ...) is logged and the pass continues.
                self.log(guild, f'Failed to kick or ban {member.name}: {error}')

    async def __kick_or_ban_if_needed(self, member: Member):
        """
        Kicks the member, or bans them once their recorded kick count
        exceeds the guild's ban count setting (a setting of 0 never bans),
        then posts a bot message and a log entry describing the action.
        """
        guild: Guild = member.guild
        recent_kicks: AgeBoundDict = Storage.get_state_value(
            guild, AutoKickCog.STATE_KEY_RECENT_KICKS)
        if recent_kicks is None:
            # NOTE(review): presumably entries age out one hour after the
            # member's last kick - confirm against AgeBoundDict.
            recent_kicks = AgeBoundDict(
                timedelta(seconds=3600),
                lambda i, kick_context: kick_context.last_kick)
            Storage.set_state_value(guild, self.STATE_KEY_RECENT_KICKS, recent_kicks)

        context: AutoKickContext = recent_kicks.get(member.id)
        if context is None:
            context = AutoKickContext(member, datetime.now())
            recent_kicks[member.id] = context
        else:
            context.record_kick(datetime.now())

        max_kick_count: int = self.get_guild_setting(guild, self.SETTING_BAN_COUNT)
        disable_help = f'To disable this feature: `{CONFIG["command_prefix"]}autokick disable`.'
        ban_help = (f'To configure ban threshold: `{CONFIG["command_prefix"]}autokick '
                    'setbancount #` (0 to disable).')

        if max_kick_count > 0 and context.kick_count > max_kick_count:
            await member.ban(
                reason=f'Rocketbot: Ban after {context.kick_count} joins',
                delete_message_days=0)
            msg = BotMessage(
                guild,
                text=f'Banned {member.mention} ({member.id}) after {context.kick_count} joins. '
                     + disable_help + ' ' + ban_help,
                type=BotMessage.TYPE_INFO,
                context=None)
            await self.post_message(msg)
            self.log(guild, f'Banned {member.name} after {context.kick_count} joins')
        else:
            await member.kick(reason='Rocketbot: Autokick enabled.')
            kick_ordinal = AutoKickCog.ordinal(context.kick_count)
            msg = BotMessage(
                guild,
                text=f'Autokicked {member.mention} ({member.id}) '
                     f'({kick_ordinal} time). '
                     + disable_help + ' ' + ban_help,
                type=BotMessage.TYPE_INFO,
                context=None)
            await self.post_message(msg)
            self.log(guild, f'Autokicked {member.name} ({kick_ordinal} time)')

    @staticmethod
    def ordinal(val: int) -> str:
        """Formats an integer with an ordinal suffix (English only)"""
        last_two = val % 100
        suffix = 'th'
        # 10 through 20 always take "th" (11th, 12th, 13th, ...)
        if last_two < 10 or last_two > 20:
            suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(val % 10, 'th')
        return f'{val}{suffix}'