| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- import weakref
-
- from datetime import datetime, timedelta
- from discord import Guild, Member
- from discord.ext import commands
-
- from config import CONFIG
- from rocketbot.cogs.basecog import BaseCog, BotMessage, BotMessageReaction, CogSetting
- from rocketbot.collections import AgeBoundList
- from rocketbot.storage import Storage
- from rocketbot.utils import timedelta_from_str
-
- class JoinAgeQueryContext:
- """
- Data about a join age query
- """
- def __init__(self, join_members: list, timespan: str):
- self.join_members = list(join_members)
- self.timespan = timespan
- self.kicked_members = set()
- self.banned_members = set()
- self.results_message_ref = None
-
- class JoinAgeCog(BaseCog, name='Join Age'):
- """
- Cog for finding users by when they joined.
- """
- SETTING_ENABLED = CogSetting('enabled', bool,
- brief='join age',
- description='Whether this cog is enabled for a guild.')
- SETTING_JOIN_TIME = CogSetting('jointime', float,
- brief='maximum length of time to track new joins',
- description='The number of seconds of join history to maintain.',
- usage='<seconds:float>',
- min_value=1.0)
-
- STATE_KEY_RECENT_JOINS = "JoinAgeCog.recent_joins"
-
- def __init__(self, bot):
- super().__init__(bot)
- self.add_setting(JoinAgeCog.SETTING_ENABLED)
- self.add_setting(JoinAgeCog.SETTING_JOIN_TIME)
-
- @commands.group(
- brief='Tracks recently joined users with options to mass kick or ban',
- )
- @commands.has_permissions(ban_members=True)
- @commands.guild_only()
- async def joinage(self, context: commands.Context):
- 'Join age tracking'
- if context.invoked_subcommand is None:
- await context.send_help()
-
- @joinage.command(
- brief='Queries for users who joined in the past span of time',
- description='Searches for users who joined the server recently. ' + \
- 'Can use time spans like 30s, 5m, 1h, 7d, etc.',
- usage='<time_period>'
- )
- async def search(self, context: commands.Context, timespan: str):
- 'Command handler'
- guild: Guild = context.guild
- recent_joins: AgeBoundList = Storage.get_state_value(guild, self.STATE_KEY_RECENT_JOINS)
- if recent_joins is None:
- max_age: timedelta = timedelta(seconds=self.get_guild_setting(guild, self.SETTING_JOIN_TIME))
- recent_joins = AgeBoundList(max_age, lambda i, member : member.joined_at)
- Storage.set_state_value(guild, self.STATE_KEY_RECENT_JOINS, recent_joins)
- results: list = []
- ts: timedelta = timedelta_from_str(timespan)
- cutoff: datetime = datetime.utcnow() - ts
- for member in recent_joins:
- if member.joined_at > cutoff:
- results.append(member)
- ctx = JoinAgeQueryContext(results, timespan)
- msg = BotMessage(guild,
- text='',
- type=BotMessage.TYPE_INFO,
- context=ctx)
- ctx.results_message_ref = weakref.ref(msg)
- await self.__update_results_message(ctx)
- await self.post_message(msg)
-
- async def on_mod_react(self,
- bot_message: BotMessage,
- reaction: BotMessageReaction,
- reacted_by: Member) -> None:
- guild: Guild = bot_message.guild
- ctx: JoinAgeQueryContext = bot_message.context
- if reaction.emoji == CONFIG['kick_emoji']:
- to_kick = set(ctx.join_members) - ctx.kicked_members
- for member in to_kick:
- await member.kick(
- reason=f'Rocketbot: Mass kick based on join age, by {reacted_by.name}.')
- ctx.kicked_members |= to_kick
- await self.__update_results_message(ctx)
- self.log(guild, f'Users kicked by {reacted_by.name}.')
- elif reaction.emoji == CONFIG['ban_emoji']:
- to_ban = set(ctx.join_members) - ctx.banned_members
- for member in to_ban:
- await member.ban(
- reason=f'Rocketbot: Mass ban based on join age, by {reacted_by.name}.',
- delete_message_days=0)
- ctx.banned_members |= to_ban
- await self.__update_results_message(ctx)
- self.log(guild, f'Users banned by {reacted_by.name}')
-
- @commands.Cog.listener()
- async def on_member_join(self, member: Member) -> None:
- 'Event handler'
- guild: Guild = member.guild
- if not self.get_guild_setting(guild, self.SETTING_ENABLED):
- return
- recent_joins: AgeBoundList = Storage.get_state_value(guild, self.STATE_KEY_RECENT_JOINS)
- if recent_joins is None:
- max_age: timedelta = timedelta(seconds=self.get_guild_setting(guild, self.SETTING_JOIN_TIME))
- recent_joins = AgeBoundList(max_age, lambda i, member : member.joined_at)
- Storage.set_state_value(guild, self.STATE_KEY_RECENT_JOINS, recent_joins)
- recent_joins.append(member)
-
- async def __update_results_message(self, context: JoinAgeQueryContext) -> None:
- if context.results_message_ref is None:
- return
- bot_message = context.results_message_ref()
- if bot_message is None:
- return
- text = f'The following members joined in the last {context.timespan}\n\n'
- if len(context.join_members) > 0:
- max_members = CONFIG['max_members_per_message']
- for member in context.join_members[:max_members]:
- text += '\n• '
- if member in context.banned_members:
- text += f'~~{member.mention} ({member.id})~~ - banned'
- elif member in context.kicked_members:
- text += f'~~{member.mention} ({member.id})~~ - kicked'
- else:
- text += f'{member.mention} ({member.id})'
- if len(context.join_members) > max_members:
- text += f'\n• {len(context.join_members) - max_members} more'
- else:
- text += 'No members found. If the bot was recently restarted ' + \
- 'or JoinAgeCog just enabled, only new joins will be tracked.'
- await bot_message.set_text(text)
- if len(context.join_members) > 0:
- member_count = len(context.join_members)
- kick_count = len(context.kicked_members)
- ban_count = len(context.banned_members)
- await bot_message.set_reactions(BotMessageReaction.standard_set(
- did_kick=kick_count >= member_count,
- did_ban=ban_count >= member_count,
- user_count=member_count))
|