| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- import argparse
- import re
- import platform
- import subprocess
- from tempfile import TemporaryDirectory
- from typing import List, Union
- from enum import Enum
- from email.utils import parsedate
- from email.parser import BytesParser
- from email.message import EmailMessage
- from zipfile import ZipFile, ZipInfo
- import sys
- import os
-
class BooleanOperator(Enum):
    """Selects how a BooleanFilter combines the results of its sub-filters."""

    # Every sub-filter has to match.
    and_op = 1
    # A single matching sub-filter is enough.
    or_op = 2
-
class Filter:
    """Base class for message filters.

    Subclasses override `matches` (parsed message) and, where they can work
    on undecoded text, `matches_raw`. The base implementations raise
    NotImplementedError; a string cannot be raised in Python 3.
    """

    def matches(self, message: EmailMessage) -> bool:
        """Return True if the given email message matches this filter's criteria.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError(
            f'{type(self).__name__} does not implement matches()')

    def matches_raw(self, raw_content: str) -> bool:
        """Return True if the given raw, unparsed email content matches this filter's criteria.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError(
            f'{type(self).__name__} does not implement matches_raw()')
-
class BodyKeywordFilter(Filter):
    """Simple substring search filter for the text parts of a message.

    Each `text/*` part is searched twice: once in its serialized form
    (headers plus still-encoded body, which is all that was searched
    historically) and once in its decoded body, so that keywords inside
    base64 or quoted-printable parts are found as well.
    """

    def __init__(self, keyword: str, case_sensitive: bool = False):
        self.keyword: str = keyword
        self.case_sensitive: bool = case_sensitive

    def _contains(self, haystack: str) -> bool:
        """Return True if the keyword occurs in `haystack`, honouring case sensitivity."""
        if self.case_sensitive:
            return self.keyword in haystack
        return self.keyword.lower() in haystack.lower()

    @staticmethod
    def _decoded_text(part) -> str:
        """Return the transfer-decoded body of a message part as text ('' if none)."""
        payload = part.get_payload(decode=True)
        if not payload:
            return ''
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='replace')
        except LookupError:
            # The message declares a charset Python does not know.
            return payload.decode('utf-8', errors='replace')

    def matches(self, message: EmailMessage) -> bool:
        """Return True if any text part of `message` contains the keyword."""
        for part in message.walk():
            if part.get_content_maintype() != 'text':
                continue
            if self._contains(part.as_string()):
                return True
            if self._contains(self._decoded_text(part)):
                return True
        return False

    def matches_raw(self, raw_content: str) -> bool:
        """Return True if the undecoded message text contains the keyword."""
        return self._contains(raw_content)
-
class HeaderFilter(Filter):
    """Matches a value in an email header. Can search one header or multiple.

    Header names are case-insensitive; the value comparison is a
    case-insensitive substring test. A message matches when any occurrence
    of any of the listed headers contains the value.
    """

    def __init__(self, headers: Union[str, List[str]], value: str):
        self.headers: List[str] = [headers] if isinstance(headers, str) else list(headers)
        self.value = value

    def matches(self, message: EmailMessage) -> bool:
        """Return True if any listed header of the parsed message contains the value."""
        needle = self.value.lower()
        for header in self.headers:
            # get_all() also sees repeated headers; get() returns only the first.
            for val in message.get_all(header, []):
                # str() because a header value may not be a plain string.
                if needle in str(val).lower():
                    return True
        return False

    def matches_raw(self, raw_content: str) -> bool:
        """Best-effort header match on undecoded message text.

        Only the header section (everything before the first blank line) is
        inspected. Folded header lines are unfolded first; encoded words are
        not decoded.
        """
        header_text = re.split(r'\r?\n\r?\n', raw_content, maxsplit=1)[0]
        unfolded = re.sub(r'\r?\n[ \t]+', ' ', header_text)
        needle = self.value.lower()
        wanted = tuple(h.lower() + ':' for h in self.headers)
        for line in unfolded.splitlines():
            lowered = line.lower()
            if lowered.startswith(wanted) and needle in lowered.split(':', 1)[1]:
                return True
        return False
-
class BooleanFilter(Filter):
    """Combines other filters with OR/AND logic.

    With `and_op` every sub-filter must match (an empty list matches); with
    `or_op` at least one must match (an empty list does not match).
    Evaluation stops at the first sub-filter that decides the outcome.
    """

    def __init__(self, operator: BooleanOperator, subfilters: list):
        self.operator = operator
        # Materialise the sub-filters: a one-shot iterable such as map() would
        # be used up by the first message and leave later ones unfiltered.
        self.subfilters: List[Filter] = list(subfilters)

    def _combine(self, results) -> bool:
        """Fold an iterable of sub-filter results according to the operator."""
        if self.operator == BooleanOperator.and_op:
            return all(results)
        if self.operator == BooleanOperator.or_op:
            return any(results)
        return False

    def matches(self, message: EmailMessage) -> bool:
        """Return the combined result of the sub-filters for a parsed message."""
        return self._combine(sub.matches(message) for sub in self.subfilters)

    def matches_raw(self, raw_content: str) -> bool:
        """Return the combined result of the sub-filters for raw message text."""
        return self._combine(sub.matches_raw(raw_content) for sub in self.subfilters)
-
# Script-wide state shared by the functions below through `global` statements.

# Counters: matches written overall, matches in the zip currently being
# searched, and number of zip archives visited.
result_count = 0
zip_result_count = 0
zip_count = 0

# Parsed command-line options; None until parse_arguments() has run.
args: argparse.Namespace = None
# Root filter applied to every message; None until construct_filter() has run.
filter = None
-
def clean_filename(original: str) -> str:
    """Strip every character that is not on the filename whitelist.

    Letters, digits, spaces and the punctuation ``. ! , ( ) [ ] _ -`` are
    kept; everything else is removed without a replacement.
    """
    disallowed = re.compile(r'[^a-zA-Z0-9 \.!,\(\)\[\]_-]+')
    return disallowed.sub('', original)
-
def filename_from_email(email: EmailMessage) -> str:
    """Creates a safe filename to save the given email to.

    The name has the form ``<date> - <subject>.eml``. The date comes from the
    Date header, formatted as ``YYYY-MM-DDTHH.MM.SS``; an all-zero placeholder
    is used when the header is missing or cannot be parsed. The subject is
    sanitised with clean_filename() and cut to 50 characters; if it is
    missing, or nothing is left after sanitising, ``(no subject)`` is used.
    """
    stamp = '0000-00-00T00.00.00'
    date_str = email.get('Date', None)
    if date_str is not None:
        # str() because a header value may not be a plain string.
        parsed_date = parsedate(str(date_str))
        if parsed_date is not None:
            year, month, day, hour, minute, second = parsed_date[:6]
            stamp = f'{year:04}-{month:02}-{day:02}T{hour:02}.{minute:02}.{second:02}'

    subject = email.get('Subject')
    title = ''
    if subject is not None:
        title = clean_filename(str(subject))[0:50].strip()
    if not title:
        title = '(no subject)'

    return f'{stamp} - {title}.eml'
-
def walk_directory(dir: str) -> None:
    """Spiders a directory looking for subdirectories and email zip archives.

    Every entry whose name ends in ``.zip`` (case-insensitive) is counted in
    the global `zip_count` and handed to process_zip_file(); every
    subdirectory is searched recursively.
    """
    global zip_count
    for f in os.listdir(dir):
        full_path = os.path.join(dir, f)
        # Test the full path: the bare name would be resolved against the
        # working directory, not against `dir`.
        if os.path.isdir(full_path):
            walk_directory(full_path)
        elif f.lower().endswith('.zip'):
            zip_count += 1
            process_zip_file(full_path)
-
def process_zip_file(zip_path: str) -> None:
    """Processes a zip file of email messages.

    Each non-directory entry is parsed as an email and passed to
    search_content(). When parsing or matching fails, or when raw mode was
    requested on the command line, the entry's bytes go to
    search_raw_content() instead. An archive that cannot be read is reported
    and skipped.
    """
    global zip_result_count
    from zipfile import BadZipFile

    print('Searching ' + zip_path + '...')
    zip_result_count = 0
    raw_mode = getattr(args, 'raw', False)
    try:
        with ZipFile(zip_path, mode='r') as archive:
            for entry in archive.infolist():
                if entry.is_dir():
                    continue
                data = archive.read(entry)
                if raw_mode:
                    search_raw_content(data, zip_path, entry)
                    continue
                try:
                    email = BytesParser().parsebytes(data)
                    search_content(email, zip_path, entry)
                except UnicodeError:
                    print('Unicode error in message. Searching raw content.', file=sys.stderr)
                    search_raw_content(data, zip_path, entry)
                except Exception:
                    # Exception, not a bare except, so Ctrl-C still stops the run.
                    print('Error parsing message. Searching raw content.', file=sys.stderr)
                    search_raw_content(data, zip_path, entry)
    except BadZipFile:
        print(f'Skipping unreadable zip file {zip_path}', file=sys.stderr)
    if zip_result_count > 0:
        print(f"\t{zip_result_count} results in zip")
-
def search_content(email: EmailMessage, zip_path: str, entry: ZipInfo) -> None:
    """Processes an email message in a zip file.

    If the message matches the global filter it is written to the output
    directory (`args.output`, created on demand) under the name chosen by
    filename_from_email(), and both result counters are incremented.
    `zip_path` and `entry` are accepted but not used.
    """
    global result_count, zip_result_count
    if not filter.matches(email):
        return
    # Serialize before opening the file so a failure leaves no empty file.
    payload = email.as_bytes()
    os.makedirs(args.output, exist_ok=True)
    with open(os.path.join(args.output, filename_from_email(email)), 'wb') as f:
        f.write(payload)
    # Count only after the message has actually been written.
    result_count += 1
    zip_result_count += 1
-
def search_raw_content(raw_bytes: bytes, zip_path: str, entry: ZipInfo) -> None:
    """Searches an unparsed message and saves it if it matches.

    The bytes are decoded as UTF-8 when valid, otherwise as ISO-8859-1
    (which accepts any byte sequence), and tested with the global filter's
    matches_raw(). A matching message is written unchanged to `args.output`
    under the sanitised base name of its zip entry, with ``.eml`` appended
    if missing, and both result counters are incremented.
    `zip_path` is accepted but not used.
    """
    global result_count, zip_result_count
    try:
        content = raw_bytes.decode('utf-8')
    except UnicodeDecodeError:
        content = raw_bytes.decode('iso-8859-1')

    try:
        matched = filter.matches_raw(content)
    except (NotImplementedError, TypeError):
        # A sub-filter without a raw implementation signals that by raising;
        # skip this message instead of aborting the whole search.
        print('Filter does not support raw search. Skipping.', file=sys.stderr)
        return
    if not matched:
        return

    name = clean_filename(os.path.basename(entry.filename)) or 'message'
    if not name.lower().endswith('.eml'):
        name += '.eml'
    os.makedirs(args.output, exist_ok=True)
    with open(os.path.join(args.output, name), 'wb') as f:
        f.write(raw_bytes)
    result_count += 1
    zip_result_count += 1
-
def _parse_date_argument(parser, value, label):
    """Validate a YYYY-MM-DD option and return it as [year, month, day].

    Returns None when the option was not given. Calls parser.error(), which
    exits, when the text is malformed or is not a real calendar date.
    """
    from datetime import date

    if value is None:
        return None
    m = re.fullmatch(r'([0-9]{4})-([0-9]{2})-([0-9]{2})', value)
    if m is None:
        parser.error(f'{label} date must be in YYYY-MM-DD format (e.g. 2015-03-28)')
    parts = [int(group) for group in m.groups()]
    try:
        date(*parts)
    except ValueError:
        parser.error(f'{label} date \'{value}\' is not a valid calendar date')
    return parts


def parse_arguments():
    """Parses command-line arguments to `args`.

    After this call the global `args` holds: `keywords` as a flat list of
    strings, `before`/`after` as ``[year, month, day]`` lists or None, `dir`
    as a list of existing directories (default ``['.']``) and `output` as an
    existing directory (a new temporary directory when not given). Invalid
    input ends the program through parser.error().
    """
    global args
    from tempfile import mkdtemp

    parser = argparse.ArgumentParser(
        prog='search.py',
        description='Searches a directory of zipped email messages. '
                    'Messages are assumed to be stored one per file within the zip files (Maildir format). '
                    'Input directories are searched recursively for any zip files contained within.',
        epilog='Raw mode will skip parsing each email message and treat them like simple text files. '
               'The headers and body are all searched together without decoding. '
               'Arguments for searching individual fields will be ignored. '
               'This option exists for messages with encoding errors that prevent them from being '
               'parsed correctly. '
               'Note that various escaping/encoding schemes commonly used in email messages, such '
               'as base64, may cause keywords to not be found despite being in the decoded message '
               'because only the raw encoded content is searched. '
               'Use this option as a last resort.'
    )
    # No action='append' here: combined with nargs='+' it would wrap the
    # keyword list in a second list.
    parser.add_argument(
        'keywords',
        nargs='+',
        help='one or more phrases to search for in the message body'
    )
    parser.add_argument(
        '--any',
        default=False,
        action='store_true',
        help='matches messages containing any of the given search phrases (default requires all phrases appear in a message)'
    )
    parser.add_argument(
        '-d', '--dir',
        action='append',
        help='directory(s) to search for email zip archives (default is working directory)'
    )
    parser.add_argument(
        '-o', '--output',
        help='directory to copy matching messages to (default is a temp directory)'
    )
    parser.add_argument(
        '-c', '--casesensitive',
        default=False,
        action='store_true',
        help='search case-sensitively (default is case-insensitive)'
    )
    parser.add_argument(
        '-f', '--from',
        help='email address of sender'
    )
    parser.add_argument(
        '-t', '--to',
        help='email address of recipient (searches to:, cc:, bcc: fields)'
    )
    parser.add_argument(
        '-s', '--subject',
        help='searches subject field'
    )
    parser.add_argument(
        '-a', '--after',
        metavar='YYYY-MM-DD',
        help='date to search on or after'
    )
    parser.add_argument(
        '-b', '--before',
        metavar='YYYY-MM-DD',
        help='date to search on or before'
    )
    parser.add_argument(
        '-r', '--raw',
        default=False,
        action='store_true',
        help='searches raw email content (see below)'
    )
    args = parser.parse_args()

    args.before = _parse_date_argument(parser, args.before, 'before')
    args.after = _parse_date_argument(parser, args.after, 'after')

    if args.raw:
        header_options = (
            getattr(args, 'from'),
            getattr(args, 'to'),
            args.subject,
            args.before,
            args.after,
        )
        if any(option is not None for option in header_options):
            print('Warning: Cannot search header fields in raw mode. Ignoring.', file=sys.stderr)

    if args.dir is None:
        args.dir = ['.']
    else:
        for d in args.dir:
            if not os.path.isdir(d):
                parser.error(f'search path \'{d}\' does not exist or is not a directory')

    if args.output is None:
        # mkdtemp() leaves the directory in place; a TemporaryDirectory object
        # removes its directory again as soon as the object is finalised.
        args.output = mkdtemp(prefix='Email search results (id ', suffix=')')
    elif not os.path.isdir(args.output):
        parser.error(f'output path \'{args.output}\' does not exist or is not a directory')
-
def construct_filter():
    """Builds the global `filter` from the parsed command-line options.

    The keyword filters are combined with OR when `--any` was given and with
    AND otherwise. Outside raw mode, the optional sender, recipient and
    subject filters are then AND-ed with that keyword group.
    """
    global filter

    # Accept both a flat keyword list and a list of lists.
    keywords: List[str] = []
    for item in args.keywords:
        if isinstance(item, str):
            keywords.append(item)
        else:
            keywords.extend(item)

    # A real list, not map(): the filters are reused for every message.
    keyword_filters = [
        BodyKeywordFilter(k, case_sensitive=args.casesensitive) for k in keywords
    ]
    keyword_operator = BooleanOperator.or_op if args.any else BooleanOperator.and_op
    criteria: List[Filter] = [BooleanFilter(keyword_operator, keyword_filters)]

    # Raw mode announces that header options are ignored, so leave them out.
    if not args.raw:
        sender = getattr(args, 'from')
        recipient = getattr(args, 'to')
        if sender is not None:
            criteria.append(HeaderFilter('from', sender))
        if recipient is not None:
            criteria.append(HeaderFilter(['to', 'cc', 'bcc'], recipient))
        if args.subject is not None:
            criteria.append(HeaderFilter('subject', args.subject))
    # TODO: Dates
    filter = BooleanFilter(BooleanOperator.and_op, criteria)
-
def handle_results():
    """Final logic after all searching is completed.

    With at least one result, the output directory is opened in the file
    browser on macOS and Windows (its path is printed on other systems) and
    the total is reported. Otherwise the program exits with status 2 when no
    zip file was found, or status 1 when zip files were searched but nothing
    matched.
    """
    if result_count > 0:
        system = platform.system()
        if system == 'Darwin':
            subprocess.call(['open', args.output])
        elif system == 'Windows':
            subprocess.call(['explorer.exe', args.output])
        else:
            # No opener is launched here, so say where the results are.
            print(f'Results saved to {args.output}')
        print(f'Found {result_count} result(s) total')
    elif zip_count == 0:
        print('No zip files found')
        sys.exit(2)
    else:
        print('No results')
        sys.exit(1)
-
# Main logic
# Guarded so that importing this file does not start a search.
if __name__ == '__main__':
    parse_arguments()
    construct_filter()
    # --dir may be given several times; search each directory in turn.
    for start_path in args.dir:
        walk_directory(start_path)
    handle_results()
|