"""Search zip archives of email messages for keywords.

Walks a directory tree, parses every entry of every ``.zip`` file it finds as
an email message, and writes each message that matches the active filter to an
output directory as an ``.eml`` file.

Usage::

    script.py [-c] [-d START_PATH] [-o OUTPUT_PATH] "keyword [keyword ...]"

Exit codes set by this script: 1 = no results, 2 = no zip files found,
3 = no filter specified, 4 = too many arguments.
"""

import os
import platform
import re
import subprocess
import sys
from email.message import EmailMessage
from email.parser import BytesParser
from email.utils import parsedate
from enum import Enum
from tempfile import TemporaryDirectory
from typing import List, Union
from zipfile import BadZipFile, ZipFile, ZipInfo


class BooleanOperator(Enum):
    """How a BooleanFilter combines the results of its subfilters."""

    and_op = 1
    or_op = 2


class Filter:
    """Base class for message filters."""

    def matches(self, message: EmailMessage) -> bool:
        """Return True when ``message`` satisfies this filter.

        Raises:
            NotImplementedError: always; subclasses must override this method.
        """
        # Raising a bare string is itself a TypeError in Python 3, so raise a
        # real exception type instead.
        raise NotImplementedError("Not implemented")


class BodyKeywordFilter(Filter):
    """Simple substring search filter."""

    def __init__(self, keyword: str, case_sensitive: bool = False):
        self.keyword: str = keyword
        self.case_sensitive: bool = case_sensitive

    def matches(self, message: EmailMessage) -> bool:
        """Return True if any ``text/*`` part of ``message`` contains the keyword.

        The search runs over ``part.as_string()``, i.e. the serialized part, so
        the part's own headers are searched along with its body.
        """
        # NOTE(review): as_string() keeps the part's transfer encoding, so
        # base64/quoted-printable bodies are presumably searched in encoded
        # form - confirm whether decoded matching is wanted.
        needle = self.keyword if self.case_sensitive else self.keyword.lower()
        for part in message.walk():
            if part.get_content_maintype() != 'text':
                continue
            haystack = part.as_string()
            if not self.case_sensitive:
                haystack = haystack.lower()
            if needle in haystack:
                return True
        return False


class HeaderFilter(Filter):
    """Matches a value in an email header. Can search one header or multiple.

    Header names are case-insensitive; the value match is a case-insensitive
    substring test.
    """

    def __init__(self, headers: Union[str, List[str]], value: str):
        self.headers: List[str] = [headers] if isinstance(headers, str) else headers
        self.value = value

    def matches(self, message: EmailMessage) -> bool:
        """Return True if any of the configured headers contains the value."""
        wanted = self.value.lower()
        for header in self.headers:
            val = message.get(header, None)
            if val is None:
                continue
            # str(): the legacy (compat32) policy can hand back a Header
            # object, which has no .lower(), for non-ASCII header values.
            if wanted in str(val).lower():
                return True
        return False


class BooleanFilter(Filter):
    """Combines other filters with OR/AND logic."""

    def __init__(self, operator: BooleanOperator, subfilters: list):
        self.operator = operator
        self.subfilters: List[Filter] = subfilters

    def matches(self, message: EmailMessage) -> bool:
        """Return the AND/OR combination of all subfilter results.

        With no subfilters, AND yields True and OR yields False. Evaluation
        short-circuits on the first deciding subfilter.
        """
        results = (subfilter.matches(message) for subfilter in self.subfilters)
        if self.operator == BooleanOperator.and_op:
            return all(results)
        if self.operator == BooleanOperator.or_op:
            return any(results)
        return False


# --- Script state, filled in by parse_arguments() and updated while searching.
start_path = '.'
# Only the generated *name* is kept. The TemporaryDirectory object itself is
# not retained, so the directory it creates is cleaned up when that object is
# finalized; search_content() creates output_path again on the first match.
output_path = TemporaryDirectory(prefix='Email search results (id ', suffix=')').name
filter = None  # active Filter; the name shadows the builtin but is module API
case_sensitive = False
result_count = 0      # matches across all archives
zip_result_count = 0  # matches within the archive currently being processed
zip_count = 0         # number of .zip files encountered


def clean_filename(original: str) -> str:
    """Returns a scrubbed string with safe filename characters."""
    return re.sub(r'[^a-zA-Z0-9 \.!,\(\)\[\]_-]+', '', original)


def filename_from_email(email: EmailMessage) -> str:
    """Creates a safe filename to save the given email to.

    The name has the form ``YYYY-MM-DDTHH.MM.SS - <subject>.eml``. A missing
    or unparseable ``Date`` header yields an all-zero timestamp, and a missing
    ``Subject`` yields ``(no subject)``. The cleaned subject is limited to 50
    characters.
    """
    date_str = email.get('Date', None)
    # str(): header values may be Header objects under the compat32 policy,
    # which parsedate() and re.sub() cannot process.
    parsed_date = parsedate(str(date_str)) if date_str is not None else None
    if parsed_date is not None:
        year, month, day, hour, minute, second = parsed_date[:6]
        prefix = (f'{year:04}-{month:02}-{day:02}'
                  f'T{hour:02}.{minute:02}.{second:02}'
                  ' - ')
    else:
        prefix = '0000-00-00T00.00.00 - '

    subject = email.get('Subject')
    if subject is not None:
        title = clean_filename(str(subject))[0:50].strip()
    else:
        title = '(no subject)'
    return prefix + title + '.eml'


def walk_directory(dir: str) -> None:
    """Spiders a directory looking for subdirectories and email zip archives.

    Every ``.zip`` file (case-insensitive extension) is counted in
    ``zip_count`` and passed to process_zip_file(); every subdirectory is
    walked recursively.
    """
    global zip_count
    for name in os.listdir(dir):
        full_path = os.path.join(dir, name)
        # Test the full path: a bare entry name would be resolved against the
        # current working directory and nested folders would be missed.
        if os.path.isdir(full_path):
            walk_directory(full_path)
        elif name.lower().endswith('.zip'):
            zip_count += 1
            process_zip_file(full_path)


def process_zip_file(zip_path: str) -> None:
    """Processes a zip file of email messages.

    Each non-directory entry is parsed as an email and handed to
    search_content(). Entries that cannot be read or parsed are reported and
    skipped; an archive that cannot be opened is reported and skipped as well.
    """
    global zip_result_count
    print('Searching ' + zip_path + '...')
    zip_result_count = 0
    try:
        archive = ZipFile(zip_path, mode='r')
    except (BadZipFile, OSError):
        # One broken archive should not abort the whole search.
        print('Could not open zip file. Skipping.')
        return
    with archive:
        for entry in archive.infolist():
            if entry.is_dir():
                continue
            try:
                data = archive.read(entry)
                email = BytesParser().parsebytes(data)
                search_content(email, zip_path, entry)
            except UnicodeError:
                print('Unicode error in message. Skipping.')
            except Exception:
                # Not a bare except: Ctrl-C and sys.exit() must still work.
                print('Error reading message')
    if zip_result_count > 0:
        print(f"\t{zip_result_count} results in zip")


def search_content(email: EmailMessage, zip_path: str, entry: ZipInfo) -> None:
    """Processes an email message in a zip file.

    If the message matches the global ``filter`` it is written to
    ``output_path`` (created on demand) and both result counters are
    incremented. ``zip_path`` and ``entry`` are currently unused.
    """
    global result_count, zip_result_count
    if not filter.matches(email):
        return
    os.makedirs(output_path, exist_ok=True)
    target = os.path.join(output_path, filename_from_email(email))
    with open(target, 'wb') as f:
        result_count += 1
        zip_result_count += 1
        f.write(email.as_bytes())


def parse_arguments():
    """Parses the command-line arguments.

    Recognised arguments: ``-d PATH`` (start directory), ``-o PATH`` (output
    directory), ``-c`` (case-sensitive search) and one positional argument
    holding space-separated keywords, all of which must match.

    Raises:
        ValueError: on an unknown ``-`` option, or when ``-d``/``-o`` is the
            last argument and has no value.
        SystemExit: with code 4 when more than one keyword argument is given.
    """
    global filter
    global start_path
    global output_path
    global case_sensitive

    expect = None    # name of the option whose value is expected next
    keywords = None  # words of the positional keyword argument
    for arg in sys.argv[1:]:
        if arg.startswith('-'):
            if arg == '-d':
                expect = 'start_path'
            elif arg == '-o':
                expect = 'output_path'
            elif arg == '-c':
                case_sensitive = True
            else:
                raise ValueError(f'Unknown argument {arg}')
        elif expect == 'start_path':
            start_path = arg
            expect = None
        elif expect == 'output_path':
            output_path = arg
            expect = None
        elif filter is None and keywords is None:
            words = [word.strip() for word in arg.split(' ') if word.strip()]
            if words:
                keywords = words
        else:
            print('Too many arguments')
            sys.exit(4)

    if expect is not None:
        raise ValueError(f'Missing value for {expect}')

    # Build the filter only after every flag has been seen, so that -c takes
    # effect regardless of where it appears relative to the keywords.
    if keywords is not None:
        filter = BooleanFilter(
            BooleanOperator.and_op,
            [BodyKeywordFilter(word, case_sensitive) for word in keywords],
        )


def validate_arguments():
    """Exits with code 3 when no search filter was supplied."""
    if filter is None:
        print('No filter specified')
        sys.exit(3)


def handle_results():
    """Final logic after all searching is completed.

    Prints the total and, on macOS and Windows, opens the output folder.
    Exits with code 2 when no zip files were found and 1 when zip files were
    searched without any match.
    """
    if result_count > 0:
        if platform.system() == 'Darwin':
            subprocess.call(['open', output_path])
        elif platform.system() == 'Windows':
            subprocess.call(['explorer.exe', output_path])
        print(f'Found {result_count} result(s) total')
    elif zip_count == 0:
        print('No zip files found')
        sys.exit(2)
    else:
        print('No results')
        sys.exit(1)


def main() -> None:
    """Runs the search using the command-line arguments."""
    parse_arguments()
    validate_arguments()
    walk_directory(start_path)
    handle_results()


# Main logic
if __name__ == '__main__':
    main()