"""Script for searching email messages in a collection of zip files of raw email message files.

Input directories are walked recursively; every ``*.zip`` file found is opened and each
entry is parsed as one email message.  Messages matching the filters built from the
command line are copied into an output directory.
"""

import argparse
import datetime
import os
import platform
import re
import subprocess
import sys
from email.message import EmailMessage
from email.parser import BytesParser
from email.utils import parsedate
from enum import Enum
from tempfile import TemporaryDirectory, mkdtemp
from typing import List, Optional, Union
from zipfile import BadZipFile, ZipFile


class BooleanOperator(Enum):
    """Boolean combinatory operator enum."""
    and_op = 1
    or_op = 2


class Filter:
    """Base class for message filters."""

    def matches(self, message: EmailMessage) -> bool:
        """Returns true if the given email message matches this filter's criteria."""
        raise AssertionError("Not implemented")

    def matches_raw(self, raw_content: str) -> bool:
        """Returns true if the given raw, unparsed email content matches this filter's criteria."""
        raise AssertionError("Not implemented")

    def supports_raw(self) -> bool:
        """Whether this filter supports raw messages at least partially."""
        return False


class BodyKeywordFilter(Filter):
    """Simple substring search filter."""

    def __init__(self, keyword: str, case_sensitive: bool = False):
        self.keyword: str = keyword
        self.case_sensitive: bool = case_sensitive

    def _contains(self, text: str) -> bool:
        """Returns true if `text` contains the keyword, honouring the case-sensitivity flag."""
        if self.case_sensitive:
            return self.keyword in text
        return self.keyword.lower() in text.lower()

    def matches(self, message: EmailMessage) -> bool:
        # NOTE: the serialized form of each text part is searched, so a body stored with a
        # content-transfer-encoding (base64, quoted-printable) is searched in encoded form.
        for part in message.walk():
            if part.get_content_maintype() == 'text' and self._contains(part.as_string()):
                return True
        return False

    def matches_raw(self, raw_content: str) -> bool:
        return self._contains(raw_content)

    def supports_raw(self) -> bool:
        return True


class HeaderFilter(Filter):
    """Matches a value in an email header. Can search one header or multiple.

    Header names are case-insensitive; the value is matched case-insensitively as a substring.
    """

    def __init__(self, headers: Union[str, List[str]], value: str):
        self.headers: List[str] = [headers] if isinstance(headers, str) else headers
        self.value = value

    def matches(self, message: EmailMessage) -> bool:
        needle = self.value.lower()
        for header in self.headers:
            val = message.get(header, None)
            if val is None:
                continue
            # The compat32 policy may hand back a Header object instead of a str for
            # headers containing raw 8-bit data; str() makes both cases searchable.
            if needle in str(val).lower():
                return True
        return False


class BooleanFilter(Filter):
    """Combines other filters with OR/AND logic."""

    def __init__(self, operator: BooleanOperator, subfilters: list):
        self.operator = operator
        self.subfilters: List[Filter] = subfilters

    def _combine(self, results) -> bool:
        """Folds an iterable of sub-results with this filter's operator (short-circuiting)."""
        if self.operator == BooleanOperator.and_op:
            return all(results)
        if self.operator == BooleanOperator.or_op:
            return any(results)
        return False

    def matches(self, message: EmailMessage) -> bool:
        return self._combine(subfilter.matches(message) for subfilter in self.subfilters)

    def matches_raw(self, raw_content: str) -> bool:
        # Only sub-filters that can inspect raw text take part; calling matches_raw() on
        # the others would raise the base class's "Not implemented" assertion.
        raw_capable = [subfilter for subfilter in self.subfilters if subfilter.supports_raw()]
        if self.subfilters and not raw_capable:
            return False
        return self._combine(subfilter.matches_raw(raw_content) for subfilter in raw_capable)

    def supports_raw(self) -> bool:
        return any(subfilter.supports_raw() for subfilter in self.subfilters)


class DateFilter(Filter):
    """Filters messages based on the date field.

    For each message with a parseable date field, the given comparator is called with the
    tuple returned by `email.utils.parsedate` (year, month, day, hour, minute, second, ...).
    The comparator must return a bool of whether to match the given date or not.
    """

    def __init__(self, comparator):
        self.comparator = comparator

    def matches(self, message: EmailMessage) -> bool:
        date_str = message.get('date', None)
        if date_str is None:
            return False
        date_elems = parsedate(str(date_str))
        if date_elems is None:
            return False
        return self.comparator(date_elems)


class Options:
    """Parsed command-line options."""

    def __init__(self):
        self.keywords: List[str] = []
        self.any: bool = False
        self.dir: List[str] = []
        self.output: Optional[str] = None
        self.casesensitive: bool = False
        # 'from' is a Python keyword, so these two can only be set/read via setattr/getattr.
        setattr(self, 'from', None)
        setattr(self, 'to', None)
        self.subject: Optional[str] = None
        self.before: Optional[List[int]] = None
        self.after: Optional[List[int]] = None


# Module-level state shared by the functions below.
args: Options = Options()
message_filter: Optional[Filter] = None
result_count = 0        # matches saved so far, across all zip files
zip_result_count = 0    # matches saved from the zip file currently being processed
zip_count = 0           # zip files encountered
parser: Optional[argparse.ArgumentParser] = None


def compare_dates(a: List[int], b: List[int]) -> int:
    """Compares two sequence representations of date-times (year, month, day, hour, min, sec).

    Only the first six elements are considered; a missing element is treated as -1, i.e.
    it sorts before any real value.

    Returns -1 if a < b, 1 if a > b, and 0 if they are equal.
    """
    for i in range(6):
        a_elem = a[i] if i < len(a) else -1
        b_elem = b[i] if i < len(b) else -1
        if a_elem < b_elem:
            return -1
        if a_elem > b_elem:
            return 1
    return 0


def clean_filename(original: str) -> str:
    """Returns a scrubbed string with safe filename characters."""
    return re.sub(r'[^a-zA-Z0-9 \.!,\(\)\[\]_-]+', '', original)


def filename_from_email(email: EmailMessage) -> str:
    """Creates a safe filename to save the given email to.

    The name has the form ``<date>T<time> - <result number> - <subject>.eml``; a zeroed
    timestamp is used when the message has no parseable date.
    """
    stamp = '0000-00-00T00.00.00'
    date_str = email.get('date', None)
    if date_str is not None:
        parsed_date = parsedate(str(date_str))
        if parsed_date is not None:
            stamp = f'{parsed_date[0]:04}-{parsed_date[1]:02}-{parsed_date[2]:02}' + \
                    f'T{parsed_date[3]:02}.{parsed_date[4]:02}.{parsed_date[5]:02}'

    subject = email.get('subject')
    if subject is not None:
        # str(): the header may be a Header object rather than a plain string.
        title = clean_filename(str(subject))[0:50].strip()
    else:
        title = '(no subject)'

    return f'{stamp} - {result_count:04} - {title}.eml'


def walk_directory(directory: str) -> None:
    """Spiders a directory looking for subdirectories and email zip archives."""
    global zip_count
    for f in os.listdir(directory):
        full_path = os.path.join(directory, f)
        if os.path.isdir(full_path):
            # Checked first so a directory that happens to be named "*.zip" is descended
            # into instead of being opened as an archive.
            walk_directory(full_path)
        elif f.lower().endswith('.zip'):
            zip_count += 1
            process_zip_file(full_path)


def process_zip_file(zip_path: str) -> None:
    """Processes a zip file of email messages.

    Each non-directory entry is parsed and run through the message filter.  If handling
    the parsed message fails, the raw bytes are searched instead when the filter allows it.
    An unreadable archive is reported and skipped.
    """
    global zip_result_count
    print('Searching ' + zip_path + '...')
    zip_result_count = 0
    try:
        with ZipFile(zip_path, mode='r') as z:
            for entry in z.infolist():
                if entry.is_dir():
                    continue
                data = z.read(entry)
                try:
                    # A local parser: the module-level `parser` is the argparse parser.
                    search_content(BytesParser().parsebytes(data))
                except Exception:
                    if message_filter.supports_raw():
                        search_raw_content(data)
                    else:
                        print('Message cannot be parsed. Skipping.')
    except BadZipFile as err:
        print(f'Cannot read zip file {zip_path}: {err}. Skipping.', file=sys.stderr)
    if zip_result_count > 0:
        print(f"\t{zip_result_count} results in zip")


def search_content(email: EmailMessage) -> None:
    """Processes an email message in a zip file."""
    if message_filter.matches(email):
        save_message(email)


def search_raw_content(raw_bytes: bytes) -> None:
    """Searches an unparsed email message."""
    # iso-8859-1 maps every byte value, so it must come last as the catch-all.
    encodings = ['ascii', 'utf-8', 'iso-8859-1']
    content = None
    for encoding in encodings:
        try:
            content = raw_bytes.decode(encoding)
            break
        except UnicodeDecodeError:
            continue
    if content is None:
        print('Cannot decode email bytes. Skipping message.', file=sys.stderr)
        return
    print('Could not parse message. Searching raw content.', file=sys.stderr)
    if message_filter.matches_raw(content):
        save_raw_message(raw_bytes)


def save_message(email: EmailMessage) -> None:
    """Saves a matching message to the results directory."""
    global result_count, zip_result_count
    # Serialize before touching the filesystem so a failure leaves no empty file behind.
    payload = email.as_bytes()
    os.makedirs(args.output, exist_ok=True)
    with open(os.path.join(args.output, filename_from_email(email)), 'wb') as f:
        result_count += 1
        zip_result_count += 1
        f.write(payload)


def save_raw_message(content: bytes) -> None:
    """Saves an unparseable matching message to the results directory."""
    global result_count, zip_result_count
    os.makedirs(args.output, exist_ok=True)
    filename = f'unparseable-match-{result_count:04}.eml'
    with open(os.path.join(args.output, filename), 'wb') as f:
        result_count += 1
        zip_result_count += 1
        f.write(content)


def parse_arguments():
    """Parses command-line arguments to `args`."""
    global args, parser
    parser = argparse.ArgumentParser(
        prog='search.py',
        description='Searches a directory of zipped email messages. ' + \
                    'Messages are assumed to be stored one per file within the zip files (Maildir format). ' + \
                    'Input directories are searched recursively for any zip files contained within.'
    )
    parser.add_argument(
        'keywords', action='append', nargs='*',
        help='one or more phrases to search for in the message body'
    )
    parser.add_argument(
        '--any', default=False, action='store_true',
        help='matches messages containing any of the given search phrases (default requires ' + \
             'all phrases appear in a message)'
    )
    parser.add_argument(
        '-d', '--dir', action='append',
        help='directory(s) to search for email zip archives (default is working directory)'
    )
    parser.add_argument(
        '-o', '--output',
        help='directory to copy matching messages to (default is a temp directory)'
    )
    parser.add_argument(
        '-c', '--casesensitive', default=False, action='store_true',
        help='search case-sensitively (default is case-insensitive)'
    )
    parser.add_argument(
        '-f', '--from', metavar='sender-email',
        help='email address of sender'
    )
    parser.add_argument(
        '-t', '--to', metavar='recipient-email',
        help='email address of recipient (searches to:, cc:, bcc: fields)'
    )
    parser.add_argument(
        '-s', '--subject',
        help='searches subject field'
    )
    parser.add_argument(
        '-a', '--after', metavar='YYYY-MM-DD',
        help='date to search on or after'
    )
    parser.add_argument(
        '-b', '--before', metavar='YYYY-MM-DD',
        help='date to search on or before'
    )
    args = parser.parse_args()


def _parse_date_argument(value: str, label: str) -> List[int]:
    """Converts a YYYY-MM-DD string to [year, month, day], or exits via `parser.error`."""
    m = re.fullmatch(r'([0-9]{4})-([0-9]{2})-([0-9]{2})', value)
    if m is not None:
        parts = [int(group) for group in m.groups()]
        try:
            datetime.date(*parts)  # rejects impossible dates such as 2015-13-45
        except ValueError:
            pass
        else:
            return parts
    parser.error(f'{label} date must be in YYYY-MM-DD format (e.g. 2015-03-28)')


def validate_arguments():
    """Validate and parse special field types"""
    # action='append' combined with nargs='*' stores the positional list as a single
    # element of an outer list, so flatten it (and tolerate it being absent).
    args.keywords = [keyword for group in (args.keywords or []) for keyword in group]

    if args.before is not None:
        args.before = _parse_date_argument(args.before, 'before')
    if args.after is not None:
        args.after = _parse_date_argument(args.after, 'after')

    if args.dir is None:
        args.dir = ['.']
    else:
        for d in args.dir:
            if not os.path.isdir(d):
                parser.error(f'search path \'{d}\' does not exist or is not a directory')

    if args.output is None:
        # mkdtemp() leaves the directory in place; a discarded TemporaryDirectory object
        # would delete it again whenever its finalizer runs.
        args.output = mkdtemp(prefix='Email search results (id ', suffix=')')
    elif not os.path.isdir(args.output):
        parser.error(f'output path \'{args.output}\' does not exist or is not a directory')


def construct_filter():
    """Sets `message_filter` from parsed command line arguments and returns it.

    All criteria (keywords, from, to, subject, before, after) are AND-ed together; the
    keywords themselves are OR-ed when `--any` was given and AND-ed otherwise.
    """
    global message_filter
    criteria: List[Filter] = []

    keyword_filters = []
    for k in args.keywords:
        k = k.strip()
        if len(k) > 0:
            keyword_filters.append(BodyKeywordFilter(k, case_sensitive=args.casesensitive))
    if len(keyword_filters) > 0:
        op = BooleanOperator.or_op if args.any else BooleanOperator.and_op
        criteria.append(BooleanFilter(op, keyword_filters))

    if getattr(args, 'from') is not None:
        criteria.append(HeaderFilter('from', getattr(args, 'from')))
    if getattr(args, 'to') is not None:
        criteria.append(HeaderFilter(['to', 'cc', 'bcc'], getattr(args, 'to')))
    if args.subject is not None:
        criteria.append(HeaderFilter('subject', args.subject))

    # Compare calendar days only.  The command-line dates carry no time, and
    # compare_dates() treats missing elements as -1, so comparing a full timestamp
    # against them would exclude every message sent on the `--before` day itself.
    if args.before is not None:
        before = args.before
        criteria.append(DateFilter(lambda d: compare_dates(list(d[:3]), before) <= 0))
    if args.after is not None:
        after = args.after
        criteria.append(DateFilter(lambda d: compare_dates(list(d[:3]), after) >= 0))

    if len(criteria) == 0:
        parser.error('No filters specified')

    message_filter = BooleanFilter(BooleanOperator.and_op, criteria)
    return message_filter


def handle_results():
    """Final logic after all searching is completed.

    Exits with status 2 when no zip files were found and 1 when nothing matched.
    """
    if result_count > 0:
        if platform.system() == 'Darwin':
            subprocess.call(['open', args.output])
        elif platform.system() == 'Windows':
            subprocess.call(['explorer.exe', args.output])
        print(f'Found {result_count} result(s) total')
        # No file browser is opened on other platforms, so always say where results are.
        print(f'Results saved to {args.output}')
    elif zip_count == 0:
        print('No zip files found')
        sys.exit(2)
    else:
        print('No results')
        sys.exit(1)


def main() -> None:
    """Main logic: parse arguments, build the filter, search every directory, report."""
    parse_arguments()
    validate_arguments()
    construct_filter()
    for path in args.dir:
        walk_directory(path)
    handle_results()


if __name__ == '__main__':
    main()