| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- from datetime import datetime, timedelta
- from typing import cast
-
- from discord import Guild, Member, Status
- from discord.ext import commands, tasks
- from discord.ext.tasks import Loop
-
- from config import CONFIG
- from rocketbot.cogs.basecog import BaseCog, BotMessage, CogSetting
- from rocketbot.collections import AgeBoundDict
- from rocketbot.storage import Storage
- from rocketbot.utils import bot_log
-
class AutoKickContext:
    """
    Running tally of automatic kicks for a single member.

    Constructing an instance counts as the first kick; every later kick is
    registered through record_kick().
    """
    def __init__(self, member: Member, first_kick: datetime):
        self.member = member
        self.kick_count = 1
        # Only one kick so far, so the first and last timestamps coincide.
        self.first_kick = self.last_kick = first_kick

    def record_kick(self, time: datetime):
        """Register one more kick of this member, which happened at `time`."""
        self.kick_count = self.kick_count + 1
        self.last_kick = time
-
class StatusCheckContext:
    """
    Pairs a member with the local time at which this record was created.
    """
    def __init__(self, member: Member):
        # Timestamp is taken at construction time (naive local clock).
        self.joined_at = datetime.now()
        self.member = member
-
class AutoKickCog(BaseCog, name='Auto Kick'):
    """
    Cog for automatically kicking ALL new joins. For temporary use during join raids.
    """
    # NOTE: the class docstring above and the `autokick` command docstring are
    # left untouched on purpose; discord.py surfaces them as help text.
    #
    # Flow: on_member_join either kicks/bans immediately, or (in "offline only"
    # mode) queues the member so status_check_timer can look at their presence
    # a few seconds later and kick/ban only those that show as offline.

    SETTING_ENABLED = CogSetting('enabled', bool,
            brief='autokick',
            description='Whether this cog is enabled for a guild.')
    SETTING_BAN_COUNT = CogSetting('bancount', int,
            brief='number of repeat kicks before a ban',
            description='The number of times a user can join and be kicked ' +
                'before the next rejoin will result in a ban. A value of 0 ' +
                'disables this feature (only kick, never ban).',
            usage='<count:int>',
            min_value=0)
    SETTING_OFFLINE_ONLY = CogSetting('offlineonly', bool,
            brief='whether to only kick users whose status is offline',
            description='Compromised accounts may have a status of offline. ' +
                'If this setting is enabled, the user\'s status will be ' +
                'checked a few seconds after joining. If it is offline ' +
                'they will be kicked.',
            usage='<true|false>')

    # Per-guild state key under which the recent-kick history is stored.
    STATE_KEY_RECENT_KICKS = "AutoKickCog.recent_joins"

    def __init__(self, bot):
        """Registers the cog settings and starts the periodic status check."""
        super().__init__(bot)
        self.add_setting(AutoKickCog.SETTING_ENABLED)
        self.add_setting(AutoKickCog.SETTING_BAN_COUNT)
        self.add_setting(AutoKickCog.SETTING_OFFLINE_ONLY)
        # StatusCheckContext entries waiting for their delayed status check.
        self.status_check_members = []
        timer: Loop = cast(Loop, self.status_check_timer)
        timer.start()

    @commands.group(
        brief='Automatically kicks all new users as soon as they join',
    )
    @commands.has_permissions(ban_members=True)
    @commands.guild_only()
    async def autokick(self, context: commands.Context):
        'Auto-kick'
        # Bare group invocation without a subcommand: show the help text.
        if context.invoked_subcommand is None:
            await context.send_help()

    @commands.Cog.listener()
    async def on_member_join(self, member: Member) -> None:
        """
        Event handler for a member joining a guild.

        Does nothing when the cog is disabled for the guild. In offline-only
        mode the member is queued for a delayed status check; otherwise the
        member is kicked (or banned) right away.
        """
        guild: Guild = member.guild
        if not self.get_guild_setting(guild, self.SETTING_ENABLED):
            return
        if self.get_guild_setting(guild, self.SETTING_OFFLINE_ONLY):
            self.log(guild, f'New member {member.name} status is {member.status}')
            self.status_check_members.append(StatusCheckContext(member))
            return
        # The helper is a coroutine function: without `await` the call only
        # creates a coroutine object and the kick/ban never executes.
        await self.__kick_or_ban_if_needed(member)

    @tasks.loop(seconds=5.0)
    async def status_check_timer(self):
        """
        Checks status of new members shortly after joining to see if they go offline.

        Entries younger than 5 seconds are put back in the queue for the next
        pass. Members that are not offline are dropped from the queue; offline
        members are kicked or banned.
        """
        # Function-scope import keeps this block self-contained; `discord` is
        # already a dependency of this module.
        from discord import HTTPException

        # Swap the queue out first so joins that arrive while we are awaiting
        # below land in the fresh list instead of the one being iterated.
        pending = self.status_check_members.copy()
        self.status_check_members = []
        now = datetime.now()
        for context in pending:
            member: Member = context.member
            guild: Guild = member.guild
            if now - context.joined_at < timedelta(seconds=5.0):
                # Too soon, check again later
                self.status_check_members.append(context)
                continue
            if member.status != Status.offline:
                # Online, ignore
                self.log(guild, f'{member.name} status is {member.status}. Not kicking.')
                continue
            self.log(guild, f'{member.name} went offline 5s later')
            try:
                await self.__kick_or_ban_if_needed(member)
            except HTTPException as error:
                # An unhandled exception would stop this task loop for good
                # and silently drop the rest of the batch, so a failed
                # kick/ban (e.g. the member already left) is logged instead.
                self.log(guild, f'Failed to kick or ban {member.name}: {error}')

    async def __kick_or_ban_if_needed(self, member: Member):
        """
        Kicks `member`, or bans them once they exceed the guild's ban count.

        Updates the guild's recent-kick history for the member, then either
        bans (when the ban count setting is positive and the member's kick
        count is above it) or kicks, and posts and logs a message describing
        what was done.
        """
        guild: Guild = member.guild
        recent_kicks: AgeBoundDict = Storage.get_state_value(guild, AutoKickCog.STATE_KEY_RECENT_KICKS)
        if recent_kicks is None:
            # First use for this guild: create the history, keyed by member id
            # and aged by each entry's most recent kick time.
            recent_kicks = AgeBoundDict(timedelta(seconds=3600), lambda i, context : context.last_kick)
            Storage.set_state_value(guild, self.STATE_KEY_RECENT_KICKS, recent_kicks)
        context: AutoKickContext = recent_kicks.get(member.id)
        if context is None:
            # Creating the context already counts as kick number one.
            context = AutoKickContext(member, datetime.now())
            recent_kicks[member.id] = context
        else:
            context.record_kick(datetime.now())
        max_kick_count: int = self.get_guild_setting(guild, self.SETTING_BAN_COUNT)
        disable_help = f'To disable this feature: `{CONFIG["command_prefix"]}autokick disable`.'
        ban_help = f'To configure ban threshold: `{CONFIG["command_prefix"]}autokick ' + \
            'setbancount #` (0 to disable).'
        if max_kick_count > 0 and context.kick_count > max_kick_count:
            await member.ban(reason=f'Rocketbot: Ban after {context.kick_count} joins',
                delete_message_days=0)
            msg = BotMessage(guild,
                text=f'Banned {member.mention} ({member.id}) after {context.kick_count} joins. ' +
                    disable_help + ' ' + ban_help,
                type=BotMessage.TYPE_INFO,
                context=None)
            await self.post_message(msg)
            self.log(guild, f'Banned {member.name} after {context.kick_count} joins')
        else:
            await member.kick(reason='Rocketbot: Autokick enabled.')
            msg = BotMessage(guild,
                text=f'Autokicked {member.mention} ({member.id}) ' +
                    f'({AutoKickCog.ordinal(context.kick_count)} time). ' +
                    disable_help + ' ' + ban_help,
                type=BotMessage.TYPE_INFO,
                context=None)
            await self.post_message(msg)
            self.log(guild, f'Autokicked {member.name} ' +
                f'({AutoKickCog.ordinal(context.kick_count)} time)')

    @staticmethod
    def ordinal(val: int) -> str:
        'Formats an integer with an ordinal suffix (English only)'
        # 11, 12 and 13 (also 111, 212, ...) are irregular and take "th"
        # even though they end in 1, 2 or 3.
        if val % 100 in (11, 12, 13):
            return f'{val}th'
        last_digit = val % 10
        if last_digit == 1:
            return f'{val}st'
        if last_digit == 2:
            return f'{val}nd'
        if last_digit == 3:
            return f'{val}rd'
        return f'{val}th'
|