| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- from discord import Guild, Message
- from discord.ext import commands
- from datetime import timedelta
-
- from cogs.basecog import BaseCog, BotMessage
- from storage import Storage
-
- class Criterion:
- def __init__(self, type, **kwargs):
- self.type = type
- if type == 'contains':
- text = kwargs['text']
- self.text = text
- self.test = lambda m : text.lower() in m.content.lower()
- elif type == 'joinage':
- min = kwargs['min']
- self.min = min
- self.test = lambda m : m.created_at - m.author.joined_at < min
- else:
- raise RuntimeError(f'Unknown criterion type "{type}"')
-
- def matches(self, message: Message) -> bool:
- return self.test(message)
-
- @classmethod
- def decode(cls, val: dict):
- type = val['type']
- if type == 'contains':
- return Criterion(type, text=val['text'])
- elif type == 'joinage':
- return Criterion(type, min=timedelta(seconds=val['min']))
-
- class Pattern:
- def __init__(self, criteria: list, action: str, must_match_all: bool = True):
- self.criteria = criteria
- self.action = action
- self.must_match_all = must_match_all
-
- def matches(self, message: Message) -> bool:
- for criterion in self.criteria:
- crit_matches = criterion.matches(message)
- if crit_matches and not self.must_match_all:
- return True
- if not crit_matches and self.must_match_all:
- return False
- return self.must_match_all
-
- @classmethod
- def decode(cls, val: dict):
- match_all = val.get('must_match_all')
- action = val.get('action')
- encoded_criteria = val.get('criteria')
- criteria = []
- for ec in encoded_criteria:
- criteria.append(Criterion.decode(ec))
- return Pattern(criteria, action, match_all if isinstance(match_all, bool) else True)
-
- class PatternCog(BaseCog):
- def __init__(self, bot):
- super().__init__(bot)
-
- def __patterns(self, guild: Guild) -> list:
- patterns = Storage.get_state_value(guild, 'pattern_patterns')
- if patterns is None:
- patterns_encoded = Storage.get_config_value(guild, 'pattern_patterns')
- if patterns_encoded:
- patterns = []
- for pe in patterns_encoded:
- patterns.append(Pattern.decode(pe))
- Storage.set_state_value(guild, 'pattern_patterns', patterns)
- return patterns
-
- @commands.Cog.listener()
- async def on_message(self, message: Message) -> None:
- if message.guild is None or message.content is None or message.channel is None:
- return
- if message.author.id == self.bot.user.id:
- # Ignore self
- return
- if message.author.permissions_in(message.channel).ban_members:
- # Ignore mods
- return
- patterns = self.__patterns(message.guild)
- for pattern in patterns:
- if pattern.matches(message):
- msg = None
- if pattern.action == 'delete':
- await message.delete()
- msg = f'Message from {message.author.mention} matched ' + \
- 'banned pattern. Deleted.'
- elif pattern.action == 'kick':
- await message.delete()
- await message.author.kick(reason='Rocketbot: Message matched banned pattern')
- msg = f'Message from {message.author.mention} matched ' + \
- 'banned pattern. Message deleted and user kicked.'
- elif pattern.action == 'ban':
- await message.delete()
- await message.author.ban(reason='Rocketbot: Message matched banned pattern')
- msg = f'Message from {message.author.mention} matched ' + \
- 'banned pattern. Message deleted and user banned.'
- if msg:
- m = BotMessage(message.guild,
- text = msg,
- type = BotMessage.TYPE_MOD_WARNING)
- m.quote = message.content
- await self.post_message(m)
- break
|