Experimental Discord bot written in Python
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

joinagecog.py 5.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import weakref
  2. from datetime import datetime, timedelta
  3. from discord import Guild, Member
  4. from discord.ext import commands
  5. from config import CONFIG
  6. from rocketbot.cogs.basecog import BaseCog, BotMessage, BotMessageReaction, CogSetting
  7. from rocketbot.collections import AgeBoundList
  8. from rocketbot.storage import Storage
  9. from rocketbot.utils import timedelta_from_str
  10. class JoinAgeQueryContext:
  11. """
  12. Data about a join age query
  13. """
  14. def __init__(self, join_members: list, timespan: str):
  15. self.join_members = list(join_members)
  16. self.timespan = timespan
  17. self.kicked_members = set()
  18. self.banned_members = set()
  19. self.results_message_ref = None
  20. class JoinAgeCog(BaseCog, name='Join Age'):
  21. """
  22. Cog for finding users by when they joined.
  23. """
  24. SETTING_ENABLED = CogSetting('enabled', bool,
  25. brief='join age',
  26. description='Whether this cog is enabled for a guild.')
  27. SETTING_JOIN_TIME = CogSetting('jointime', float,
  28. brief='maximum length of time to track new joins',
  29. description='The number of seconds of join history to maintain.',
  30. usage='<seconds:float>',
  31. min_value=1.0)
  32. STATE_KEY_RECENT_JOINS = "JoinAgeCog.recent_joins"
  33. def __init__(self, bot):
  34. super().__init__(bot)
  35. self.add_setting(JoinAgeCog.SETTING_ENABLED)
  36. self.add_setting(JoinAgeCog.SETTING_JOIN_TIME)
  37. @commands.group(
  38. brief='Tracks recently joined users with options to mass kick or ban',
  39. )
  40. @commands.has_permissions(ban_members=True)
  41. @commands.guild_only()
  42. async def joinage(self, context: commands.Context):
  43. 'Join age tracking'
  44. if context.invoked_subcommand is None:
  45. await context.send_help()
  46. @joinage.command(
  47. brief='Queries for users who joined in the past span of time',
  48. description='Searches for users who joined the server recently. ' + \
  49. 'Can use time spans like 30s, 5m, 1h, 7d, etc.',
  50. usage='<time_period>'
  51. )
  52. async def search(self, context: commands.Context, timespan: str):
  53. 'Command handler'
  54. guild: Guild = context.guild
  55. recent_joins: AgeBoundList = Storage.get_state_value(guild, self.STATE_KEY_RECENT_JOINS)
  56. if recent_joins is None:
  57. max_age: timedelta = timedelta(seconds=self.get_guild_setting(guild, self.SETTING_JOIN_TIME))
  58. recent_joins = AgeBoundList(max_age, lambda i, member : member.joined_at)
  59. Storage.set_state_value(guild, self.STATE_KEY_RECENT_JOINS, recent_joins)
  60. results: list = []
  61. ts: timedelta = timedelta_from_str(timespan)
  62. cutoff: datetime = datetime.utcnow() - ts
  63. for member in recent_joins:
  64. if member.joined_at > cutoff:
  65. results.append(member)
  66. ctx = JoinAgeQueryContext(results, timespan)
  67. msg = BotMessage(guild,
  68. text='',
  69. type=BotMessage.TYPE_INFO,
  70. context=ctx)
  71. ctx.results_message_ref = weakref.ref(msg)
  72. await self.__update_results_message(ctx)
  73. await self.post_message(msg)
  74. async def on_mod_react(self,
  75. bot_message: BotMessage,
  76. reaction: BotMessageReaction,
  77. reacted_by: Member) -> None:
  78. guild: Guild = bot_message.guild
  79. ctx: JoinAgeQueryContext = bot_message.context
  80. if reaction.emoji == CONFIG['kick_emoji']:
  81. to_kick = set(ctx.join_members) - ctx.kicked_members
  82. for member in to_kick:
  83. await member.kick(
  84. reason=f'Rocketbot: Mass kick based on join age, by {reacted_by.name}.')
  85. ctx.kicked_members |= to_kick
  86. await self.__update_results_message(ctx)
  87. self.log(guild, f'Users kicked by {reacted_by.name}.')
  88. elif reaction.emoji == CONFIG['ban_emoji']:
  89. to_ban = set(ctx.join_members) - ctx.banned_members
  90. for member in to_ban:
  91. await member.ban(
  92. reason=f'Rocketbot: Mass ban based on join age, by {reacted_by.name}.',
  93. delete_message_days=0)
  94. ctx.banned_members |= to_ban
  95. await self.__update_results_message(ctx)
  96. self.log(guild, f'Users banned by {reacted_by.name}')
  97. @commands.Cog.listener()
  98. async def on_member_join(self, member: Member) -> None:
  99. 'Event handler'
  100. guild: Guild = member.guild
  101. if not self.get_guild_setting(guild, self.SETTING_ENABLED):
  102. return
  103. recent_joins: AgeBoundList = Storage.get_state_value(guild, self.STATE_KEY_RECENT_JOINS)
  104. if recent_joins is None:
  105. max_age: timedelta = timedelta(seconds=self.get_guild_setting(guild, self.SETTING_JOIN_TIME))
  106. recent_joins = AgeBoundList(max_age, lambda i, member : member.joined_at)
  107. Storage.set_state_value(guild, self.STATE_KEY_RECENT_JOINS, recent_joins)
  108. recent_joins.append(member)
  109. async def __update_results_message(self, context: JoinAgeQueryContext) -> None:
  110. if context.results_message_ref is None:
  111. return
  112. bot_message = context.results_message_ref()
  113. if bot_message is None:
  114. return
  115. text = f'The following members joined in the last {context.timespan}\n\n'
  116. if len(context.join_members) > 0:
  117. for member in context.join_members:
  118. text += '\n• '
  119. if member in context.banned_members:
  120. text += f'~~{member.mention} ({member.id})~~ - banned'
  121. elif member in context.kicked_members:
  122. text += f'~~{member.mention} ({member.id})~~ - kicked'
  123. else:
  124. text += f'{member.mention} ({member.id})'
  125. else:
  126. text += 'No members found. If the bot was recently restarted ' + \
  127. 'or JoinAgeCog just enabled, only new joins will be tracked.'
  128. await bot_message.set_text(text)
  129. if len(context.join_members) > 0:
  130. member_count = len(context.join_members)
  131. kick_count = len(context.kicked_members)
  132. ban_count = len(context.banned_members)
  133. await bot_message.set_reactions(BotMessageReaction.standard_set(
  134. did_kick=kick_count >= member_count,
  135. did_ban=ban_count >= member_count,
  136. user_count=member_count))