| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
from datetime import datetime, timezone
from enum import IntEnum

from discord import Guild, Intents, Member, Message, PartialEmoji, RawReactionActionEvent
from discord.ext import commands

from cogs.base import BaseCog
from storage import Storage
-
class JoinRecord:
    """
    Data object containing details about a guild join event.

    Attributes:
        member: The member object that joined.
        join_time: When the member joined. Taken from ``member.joined_at``
            when that is set, otherwise the current UTC time at construction.
        is_kicked: Flag recording that this member has been kicked.
        is_banned: Flag recording that this member has been banned.
    """
    def __init__(self, member: "Member"):
        self.member = member
        # discord.py reports joined_at in UTC, so the fallback is an aware UTC
        # timestamp rather than naive local time.
        self.join_time = member.joined_at or datetime.now(timezone.utc)
        self.is_kicked = False
        self.is_banned = False

    @staticmethod
    def _as_utc(moment: datetime) -> datetime:
        """
        Returns the given datetime unchanged if it is timezone-aware, otherwise
        returns a copy tagged as UTC (naive values are assumed to be UTC).
        """
        if moment.tzinfo is None or moment.utcoffset() is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    def age_seconds(self, now: datetime) -> float:
        """
        Returns the age of this join in seconds from the given "now" time.

        Both timestamps are normalized to aware UTC first so that mixing a
        naive and an aware datetime does not raise TypeError. The result is
        negative if "now" is earlier than the join time.
        """
        elapsed = self._as_utc(now) - self._as_utc(self.join_time)
        return float(elapsed.total_seconds())
-
class RaidPhase(IntEnum):
    """
    Enum of phases in a JoinRaid. Phases progress monotonically.

    Members are IntEnum values, so they still compare equal to (and order
    like) the plain integers 0-3.
    """
    # No raid detected yet; the initial phase of a JoinRaid.
    NONE = 0
    # The join that first pushed the recent-join count over the limit.
    JUST_STARTED = 1
    # A further join arrived while the join rate was still over the limit.
    CONTINUING = 2
    # The join rate dropped back under the limit, or the raid was handled.
    ENDED = 3
-
class JoinRaid:
    """
    Tracks recent joins to a guild to detect join raids, where a large number of automated users
    all join at the same time.

    A raid is detected when strictly more than ``max_join_count`` tracked joins
    are no older than ``max_age_seconds`` at the time a new join is handled.
    """
    def __init__(self):
        # JoinRecord objects in the order they were handled (newest last).
        self.joins = []
        self.phase = RaidPhase.NONE
        # datetime when the raid started, or None.
        self.raid_start_time = None
        # Message posted to Discord to warn of the raid. Convenience property managed
        # by caller. Ignored by this class.
        self.warning_message = None

    def handle_join(self,
                    member: Member,
                    now: datetime,
                    max_age_seconds: float,
                    max_join_count: int) -> None:
        """
        Processes a new member join to a guild and detects join raids. Updates
        self.phase and self.raid_start_time properties.

        Args:
            member: The member who just joined.
            now: The current time, used as the reference for record ages.
            max_age_seconds: Joins older than this are not counted as recent.
            max_join_count: A raid is flagged when the recent join count
                exceeds this value.
        """
        print(f'handle_join({member.name}) start')
        # Reuse an existing record for this user (a re-join), moving it to the
        # end of the list with a refreshed join time.
        join = None
        for i, elem in enumerate(self.joins):
            if elem.member.id == member.id:
                print(f'Member {member.name} already in join list at index {i}. Removing.')
                join = self.joins.pop(i)
                join.join_time = now
                break  # safe: the list is not iterated again after the pop
        if join is None:
            join = JoinRecord(member)
        # Add new record to end
        self.joins.append(join)
        # Check raid status and do upkeep
        self.__process_joins(now, max_age_seconds, max_join_count)
        print(f'handle_join({member.name}) end')

    def __process_joins(self,
                        now: datetime,
                        max_age_seconds: float,
                        max_join_count: int) -> None:
        """
        Processes self.joins after each addition, detects raids, updates self.phase,
        and throws out unneeded records.

        Old records are only discarded while no raid has been detected
        (phase NONE); once a raid starts every record is retained so that
        kick_all/ban_all can act on all of them.
        """
        print('__process_joins {')
        recent_count: int = 0
        should_cull: bool = self.phase == RaidPhase.NONE
        kept = []
        for join in self.joins:
            # Index this record has in the list once earlier culls are applied.
            i = len(kept)
            age: float = join.age_seconds(now)
            is_old: bool = age > max_age_seconds
            if not is_old:
                recent_count += 1
                print(f'- {i}. {join.member.name} is {age}s old - recent_count={recent_count}')
            if is_old and should_cull:
                print(f'- {i}. {join.member.name} is {age}s old - too old, removing')
            else:
                print(f'- {i}. {join.member.name} is {age}s old - moving on to next')
                kept.append(join)
        # Replace contents in place so the list object itself is unchanged.
        self.joins[:] = kept

        is_raid = recent_count > max_join_count
        print(f'- is_raid {is_raid}')
        if is_raid:
            if self.phase == RaidPhase.NONE:
                self.phase = RaidPhase.JUST_STARTED
                self.raid_start_time = now
                print('- Phase moved to JUST_STARTED. Recording raid start time.')
            elif self.phase == RaidPhase.JUST_STARTED:
                self.phase = RaidPhase.CONTINUING
                print('- Phase moved to CONTINUING.')
        elif self.phase in (RaidPhase.JUST_STARTED, RaidPhase.CONTINUING):
            # The join rate dropped back under the limit: the raid is over.
            self.phase = RaidPhase.ENDED
            print('- Phase moved to ENDED.')

        # Undo join add if the raid is over
        if self.phase == RaidPhase.ENDED and len(self.joins) > 0:
            last = self.joins.pop(-1)
            print(f'- Popping last join for {last.member.name}')
        print('} __process_joins')

    async def kick_all(self,
                       reason: str = "Part of join raid") -> list[Member]:
        """
        Kicks all users in this join raid. Skips users who have already been
        flagged as having been kicked or banned. Returns a List of Members
        who were newly kicked. Also moves the raid to the ENDED phase.
        """
        kicks = []
        for join in self.joins:
            if join.is_kicked or join.is_banned:
                continue
            await join.member.kick(reason=reason)
            join.is_kicked = True
            kicks.append(join.member)
        self.phase = RaidPhase.ENDED
        return kicks

    async def ban_all(self,
                      reason: str = "Part of join raid",
                      delete_message_days: int = 0) -> list[Member]:
        """
        Bans all users in this join raid. Skips users who have already been
        flagged as having been banned. Users who were previously kicked can
        still be banned. Returns a List of Members who were newly banned.
        Also moves the raid to the ENDED phase.
        """
        bans = []
        for join in self.joins:
            if join.is_banned:
                continue
            await join.member.ban(reason=reason, delete_message_days=delete_message_days)
            join.is_banned = True
            bans.append(join.member)
        self.phase = RaidPhase.ENDED
        return bans
-
class JoinRaidCog(BaseCog):
    """
    Cog for monitoring member joins and detecting potential bot raids.
    """
    # The short string docstrings on the command callbacks below are left as
    # they are: discord.py uses a command callback's docstring as the command's
    # help text when no explicit help= is passed, so they are user-visible.

    def __init__(self, bot):
        self.bot = bot

    # Root of the "joinraid" command group; restricted to guild channels and
    # to users holding the ban_members permission.
    @commands.group(
        brief='Manages join raid detection and handling',
    )
    @commands.has_permissions(ban_members=True)
    @commands.guild_only()
    async def joinraid(self, context: commands.Context):
        'Command group'
        if context.invoked_subcommand is None:
            # NOTE(review): with no argument send_help() shows help for the
            # whole bot; pass context.command to show only this group's help.
            # Confirm which is intended.
            await context.send_help()

    @joinraid.command(
        name='enable',
        brief='Enables join raid detection',
        description='Join raid detection is off by default.',
    )
    async def joinraid_enable(self, context: commands.Context):
        'Command handler'
        # TODO
        pass

    @joinraid.command(
        name='disable',
        brief='Disables join raid detection',
        description='Join raid detection is off by default.',
    )
    async def joinraid_disable(self, context: commands.Context):
        'Command handler'
        # TODO
        pass

    # NOTE(review): the description says a raid is detected when the count
    # "meets or exceeds" the maximum, but JoinRaid compares with a strict
    # "greater than". Confirm which is intended before implementing.
    @joinraid.command(
        name='setrate',
        brief='Sets the rate of joins which triggers a warning to mods',
        description='Each time a member joins, the join records from the ' +
            'previous _x_ seconds are counted up, where _x_ is the number of ' +
            'seconds configured by this command. If that count meets or ' +
            'exceeds the maximum join count configured by this command then ' +
            'a raid is detected and a warning is issued to the mods.',
        usage='<join_count> <seconds>',
    )
    async def joinraid_setrate(self, context: commands.Context,
                               joinCount: int,
                               seconds: int):
        'Command handler'
        # TODO
        pass

    @joinraid.command(
        name='getrate',
        brief='Shows the rate of joins which triggers a warning to mods',
    )
    async def joinraid_getrate(self, context: commands.Context):
        'Command handler'
        # TODO
        pass

    @commands.Cog.listener()
    async def on_raw_reaction_add(self, payload: RawReactionActionEvent):
        'Event handler'
        # TODO
        pass

    @commands.Cog.listener()
    async def on_member_join(self, member: Member) -> None:
        'Event handler'
        # TODO
        pass

    async def __send_warning(self) -> None:
        # Looks up the cog registered as 'ConfigCog'; does nothing if it is
        # not loaded.
        config = self.bot.get_cog('ConfigCog')
        if config is None:
            return
        # TODO: the original code stopped at an unfinished `config.` attribute
        # access here (a syntax error that prevented the module from loading).
        # Implement sending the raid warning using the config cog.
|