diff --git a/cardinal_pythonlib/openxml/grep_in_openxml.py b/cardinal_pythonlib/openxml/grep_in_openxml.py index 2d1c0ff..9c0a03b 100644 --- a/cardinal_pythonlib/openxml/grep_in_openxml.py +++ b/cardinal_pythonlib/openxml/grep_in_openxml.py @@ -37,44 +37,219 @@ """ from argparse import ArgumentParser +from enum import Enum import logging import multiprocessing import os import re from sys import argv, getdefaultencoding, stdin -from typing import Pattern +from typing import Optional, Union +from xml.etree import ElementTree from zipfile import BadZipFile, ZipFile import zlib from rich_argparse import RawDescriptionRichHelpFormatter from cardinal_pythonlib.logs import ( - BraceStyleAdapter, main_only_quicksetup_rootlogger, ) from cardinal_pythonlib.fileops import gen_filenames -log = BraceStyleAdapter(logging.getLogger(__name__)) +log = logging.getLogger(__name__) + + +class GrepSearchSubstrate(Enum): + XML_TEXT = 1 + RAW_TEXT = 2 + INNER_FILENAME = 3 + + +class GrepReportContent(Enum): + CONTENTS_MATCHING = 1 + CONTENTS_NOT_MATCHING = 2 + FILENAMES_MATCHING = 3 + FILENAMES_NOT_MATCHING = 4 + + +class GrepMode: + def __init__( + self, + pattern: str, + ignore_case: bool = False, + search_mode: Optional[GrepSearchSubstrate] = None, + search_raw_text: bool = False, + search_inner_filename: bool = False, + report_mode: Optional[GrepReportContent] = None, + report_invert_match: bool = False, + report_files_with_matches: bool = False, + report_files_without_match: bool = False, + display_no_filename: bool = False, + display_inner_filename: bool = False, + ) -> None: + """ + Args: + pattern: + What pattern to search for? + ignore_case: + Use a case-insensitive search. + + search_mode: + Where to search? Specify an enum-based search mode directly. + search_raw_text: + Boolean flag alternative to search_mode. Search raw text + (rather than the default of XML node text)? (Cannot be combined + with search_mode, or search_inner_filename.) 
+ search_inner_filename: + Boolean flag alternative to search_mode. Search inner filename + (rather than the default of XML node text)? (Cannot be combined + with search_mode, or search_raw_text.) + + report_mode: + How to report? Specify an enum-based report mode directly. + report_invert_match: + Boolean flag alternative to report_mode. Inverts grep-like + behaviour, reporting lines that do not match. (Cannot be + combined with report_files_without_match or + report_files_with_matches.) + report_files_with_matches: + Boolean flag alternative to report_mode. Show filenames of + files with matches. (Cannot be combined with + report_invert_match or report_files_without_match.) + report_files_without_match: + Boolean flag alternative to report_mode. Show filenames of + files without matches. (Cannot be combined with + report_invert_match or report_files_with_matches.) + + display_no_filename: + For hits, omit the filename of the OpenXML (ZIP) file. + display_inner_filename: + For hits, show the filenames of inner files, within each + OpenXML (ZIP) file.
+ """ + # self.search_mode: what to search + if search_mode is not None: + if search_raw_text or search_inner_filename: + raise ValueError( + "Can't specify search_raw_text or search_inner_filename " + "if you specify search_mode" + ) + self.search_mode = search_mode + else: + if search_raw_text and search_inner_filename: + raise ValueError( + "Can't specify both 'search_raw_text' and " + "'search_inner_filename' options" + ) + if search_raw_text: + self.search_mode = GrepSearchSubstrate.RAW_TEXT + elif search_inner_filename: + self.search_mode = GrepSearchSubstrate.INNER_FILENAME + else: + # Default if nothing is specified + self.search_mode = GrepSearchSubstrate.XML_TEXT + + self.invert_match = report_invert_match + + # self.regex: what to search for + self.pattern = pattern + self.ignore_case = ignore_case + if self.use_byte_regex: + # Create a regex for type: bytes + encoding = getdefaultencoding() + final_pattern = pattern.encode(encoding) + else: + # Create a regex for type: str + final_pattern = pattern + flags = re.IGNORECASE if ignore_case else 0 + self.regex = re.compile(final_pattern, flags) + + # self.report_mode: what to report + n_report_booleans = sum( + [ + report_invert_match, + report_files_with_matches, + report_files_without_match, + ] + ) + if report_mode is not None: + if n_report_booleans > 0: + raise ValueError( + "Can't specify report_invert_match, " + "report_files_with_matches, or report_files_without_match " + "if you specify report_mode" + ) + self.report_mode = report_mode + else: + if n_report_booleans > 1: + raise ValueError( + "Specify at most one of: report_invert_match, " + "report_files_with_matches, report_files_without_match" + ) + if report_invert_match: + self.report_mode = GrepReportContent.CONTENTS_NOT_MATCHING + elif report_files_with_matches: + self.report_mode = GrepReportContent.FILENAMES_MATCHING + elif report_files_without_match: + self.report_mode = GrepReportContent.FILENAMES_NOT_MATCHING + else: + # default +
self.report_mode = GrepReportContent.CONTENTS_MATCHING + + self.display_no_filename = display_no_filename + self.display_inner_filename = display_inner_filename + + def __repr__(self) -> str: + return ( + f"GrepMode(pattern={self.pattern!r}, " + f"ignore_case={self.ignore_case}, " + f"search_mode={self.search_mode}, " + f"report_mode={self.report_mode}, " + f"display_no_filename={self.display_no_filename}, " + f"display_inner_filename={self.display_inner_filename})" + ) + + def __str__(self) -> str: + return repr(self) + + @property + def use_byte_regex(self) -> bool: + return self.search_mode == GrepSearchSubstrate.RAW_TEXT + + @property + def report_hit_lines(self) -> bool: + return self.report_mode == GrepReportContent.CONTENTS_MATCHING + + @property + def report_miss_lines(self) -> bool: + return self.report_mode == GrepReportContent.CONTENTS_NOT_MATCHING + + @property + def report_files_with_matches(self) -> bool: + return self.report_mode == GrepReportContent.FILENAMES_MATCHING + + @property + def report_files_without_match(self) -> bool: + return self.report_mode == GrepReportContent.FILENAMES_NOT_MATCHING def report_hit_filename( - zipfilename: str, contentsfilename: str, show_inner_file: bool + zipfilename: str, inner_filename: str, display_inner_filename: bool ) -> None: """ For "hits": prints either the ``.zip`` filename, or the ``.zip`` filename and the inner filename. Args: - zipfilename: filename of the ``.zip`` file - contentsfilename: filename of the inner file - show_inner_file: if ``True``, show both; if ``False``, show just the - ``.zip`` filename - - Returns: - + zipfilename: + Filename of the outer OpenXML/zip file. + inner_filename: + Filename of the inner file. + display_inner_filename: + If True, show both outer and inner filename; if False, show just + the outer (OpenXML/zip) filename. 
""" - if show_inner_file: - print(f"{zipfilename} [{contentsfilename}]") + if display_inner_filename: + print(f"{zipfilename} [{inner_filename}]") else: print(zipfilename) @@ -87,117 +262,212 @@ def report_miss_filename(zipfilename: str) -> None: def report_line( - zipfilename: str, contentsfilename: str, line: str, show_inner_file: bool + zipfilename: str, + inner_filename: str, + line: Union[bytes, str], + display_no_filename: bool, + display_inner_filename: bool, ) -> None: """ Prints a line from a file, with the ``.zip`` filename and optionally also the inner filename. Args: - zipfilename: filename of the ``.zip`` file - contentsfilename: filename of the inner file - line: the line from the inner file - show_inner_file: if ``True``, show both filenames; if ``False``, show - just the ``.zip`` filename + zipfilename: + Filename of the ``.zip`` file. + inner_filename: + Filename of the inner file. + line: + The line from the inner file. + display_no_filename: + Skip display of the outer filename. + display_inner_filename: + (Only applicable if display_no_filename is False.) If True, show + both outer and inner filename; if False, show just the outer + (OpenXML/zip) filename. """ - if show_inner_file: - print(f"{zipfilename} [{contentsfilename}]: {line}") + if display_no_filename: + print(line) + elif display_inner_filename: + print(f"{zipfilename} [{inner_filename}]: {line}") else: print(f"{zipfilename}: {line}") -def parse_zip( - zipfilename: str, - regex: Pattern, - invert_match: bool, - files_with_matches: bool, - files_without_match: bool, - grep_inner_file_name: bool, - show_inner_file: bool, -) -> None: +def parse_zip(zipfilename: str, mode: GrepMode) -> None: """ Implement a "grep within an OpenXML file" for a single OpenXML file, which is by definition a ``.zip`` file. Args: - zipfilename: name of the OpenXML (zip) file - regex: regular expression to match - invert_match: find files that do NOT match, instead of ones that do?
- files_with_matches: show filenames of files with a match? - files_without_match: show filenames of files with no match? - grep_inner_file_name: search the names of "inner" files, rather than - their contents? - show_inner_file: show the names of the "inner" files, not just the - "outer" (OpenXML) file? + zipfilename: + Name of the OpenXML (zip) file. + mode: + Object configuring grep-type mode. """ - assert not (files_without_match and files_with_matches) - report_lines = (not files_without_match) and (not files_with_matches) - report_hit_lines = report_lines and not invert_match - report_miss_lines = report_lines and invert_match - log.debug("Checking ZIP: " + zipfilename) + log.debug(f"Checking OpenXML ZIP: {zipfilename}") + + # Cache for speed: + search_mode = mode.search_mode + regex_search = mode.regex.search + report_files_with_matches = mode.report_files_with_matches + report_hit_lines = mode.report_hit_lines + report_miss_lines = mode.report_miss_lines + display_no_filename = mode.display_no_filename + display_inner_filename = mode.display_inner_filename + + # Local data: found_in_zip = False - try: - with ZipFile(zipfilename, "r") as zf: - for contentsfilename in zf.namelist(): - log.debug("... checking file: " + contentsfilename) - if grep_inner_file_name: - found_in_filename = bool(regex.search(contentsfilename)) - found_in_zip = found_in_zip or found_in_filename - if files_with_matches and found_in_zip: - report_hit_filename( - zipfilename, contentsfilename, show_inner_file - ) - return - if (report_hit_lines and found_in_filename) or ( - report_miss_lines and not found_in_filename - ): - report_line( - zipfilename, - contentsfilename, - contentsfilename, - show_inner_file, - ) - else: + # Have we found something in this zip file? May be used for early abort. + + def _report( + _found_locally: bool, + _innerfilename: str, + _to_report: Union[bytes, str], + ) -> bool: + """ + Reporting function. 
This gets called more often than you might think, + including for lines that do not need reporting, but this is to simplify + the handling of "invert_match" (which may require all non-match lines + to be reported). + + Arguments: + _found_locally: + Have we found a match in a current line? + _innerfilename: + The name of the inner file we are currently searching. + _to_report: + The text (usually a line, possibly the inner filename) that + should be reported, if we report something. It might be + matching text, or non-matching text. + + Returns: + Are we done for this ZIP file (should the outer function return)? + """ + if report_files_with_matches and found_in_zip: + report_hit_filename( + zipfilename=zipfilename, + inner_filename=_innerfilename, + display_inner_filename=display_inner_filename, + ) + return True + if (report_hit_lines and _found_locally) or ( + report_miss_lines and not _found_locally + ): + report_line( + zipfilename=zipfilename, + inner_filename=_innerfilename, + line=_to_report, + display_no_filename=display_no_filename, + display_inner_filename=display_inner_filename, + ) + return False + + def _search_inner_file(zf: ZipFile, innerfilename: str) -> bool: + """ + Deal with a single inner file. + + Arguments: + zf: + zip file + innerfilename: + inner filename + + Returns: + Are we done for this ZIP file (should the outer function return)? + """ + nonlocal found_in_zip + if search_mode == GrepSearchSubstrate.INNER_FILENAME: + # ----------------------------------------------------------------- + # Search the (inner) filename + # ----------------------------------------------------------------- + # log.debug("... ... 
searching filename") + found_in_filename = bool(regex_search(innerfilename)) + found_in_zip |= found_in_filename + done = _report( + _found_locally=found_in_filename, + _innerfilename=innerfilename, + _to_report=innerfilename, + ) + return done + + if search_mode == GrepSearchSubstrate.RAW_TEXT: + # ----------------------------------------------------------------- + # Search textually, line by line + # --------------------------------------------------------- + # log.debug("... ... searching plain text") + try: + with zf.open(innerfilename, "r") as file: + try: + for line in file.readlines(): + # "line" is of type "bytes" + found_in_line = bool(regex_search(line)) + found_in_zip |= found_in_line + done = _report( + _found_locally=found_in_line, + _innerfilename=innerfilename, + _to_report=line, + ) + if done: + return True + except EOFError: + pass + except RuntimeError as e: + log.warning( + f"RuntimeError whilst processing {zipfilename} " + f"[{innerfilename}]: probably encrypted contents; " + f"error was {e!r}" + ) + else: + # ----------------------------------------------------------------- + # Search the text contents of XML + # ----------------------------------------------------------------- + # log.debug("... ... 
searching XML contents") + try: + with zf.open(innerfilename, "r") as file: + data_str = file.read() try: - with zf.open(contentsfilename, "r") as file: - try: - for line in file.readlines(): - # log.debug("line: {!r}", line) - found_in_line = bool(regex.search(line)) - found_in_zip = ( - found_in_zip or found_in_line - ) - if files_with_matches and found_in_zip: - report_hit_filename( - zipfilename, - contentsfilename, - show_inner_file, - ) - return - if ( - report_hit_lines and found_in_line - ) or ( - report_miss_lines and not found_in_line - ): - report_line( - zipfilename, - contentsfilename, - line, - show_inner_file, - ) - except EOFError: - pass - except RuntimeError as e: - log.warning( - "RuntimeError whilst processing {} [{}]: probably " - "encrypted contents; error was {!r}", - zipfilename, - contentsfilename, - e, + tree = ElementTree.fromstring(data_str) + except ElementTree.ParseError: + log.debug( + f"... ... skipping (not XML): " f"{innerfilename}" + ) + return False + for elem in tree.iter(): + line = elem.text + if not line: + continue + found_in_line = bool(regex_search(line)) + found_in_zip |= found_in_line + done = _report( + _found_locally=found_in_line, + _innerfilename=innerfilename, + _to_report=line, ) - except (zlib.error, BadZipFile) as e: - log.debug("Invalid zip: {}; error was {!r}", zipfilename, e) - if files_without_match and not found_in_zip: + if done: + return True + except RuntimeError as e: + log.warning( + f"RuntimeError whilst processing {zipfilename} " + f"[{innerfilename}]: probably encrypted contents; " + f"error was {e!r}" + ) + return False + + # Process the zip file + try: + with ZipFile(zipfilename, "r") as _zf: + # Iterate through inner files + for _innerfilename in _zf.namelist(): + log.debug(f"... 
checking inner file: {_innerfilename}") + zip_done = _search_inner_file(_zf, _innerfilename) + if zip_done: + return + except (zlib.error, BadZipFile) as exc: + log.warning(f"Invalid zip: {zipfilename}; error was {exc!r}") + except IsADirectoryError: + log.warning(f"Skipping directory: {zipfilename}") + if mode.report_files_without_match and not found_in_zip: report_miss_filename(zipfilename) @@ -227,7 +497,7 @@ def main() -> None: CHAINING. Note that you can chain. For example, to find both "Laurel" and "Hardy" in DOC/DOCX documents, in case-insensitive fashion: - find . -type f -name "*.doc*" -exec {exe_name} -l -i "laurel" {{}} \; | {exe_name} -x -l -i "hardy" + find . -type f -iname "*.doc*" -exec {exe_name} -l -i "laurel" {{}} \; | {exe_name} -x -l -i "hardy" """, # noqa: E501 ) parser.add_argument("pattern", help="Regular expression pattern to apply.") @@ -255,7 +525,11 @@ def main() -> None: "--ignore_case", "-i", action="store_true", help="Ignore case" ) parser.add_argument( - "--invert_match", "-v", action="store_true", help="Invert match" + "--invert_match", + "-v", + action="store_true", + help="Invert match (show content lines not matching the search " + "pattern)", ) parser.add_argument( "--files_with_matches", @@ -275,9 +549,20 @@ def main() -> None: help="Search the NAMES of the inner files, not their contents.", ) parser.add_argument( - "--show_inner_file", + "--grep_raw_text", action="store_true", - help="For hits, show the filenames of inner files, within each ZIP.", + help="Search the raw text, not the XML node text contents.", + ) + parser.add_argument( + "--no_filename", + action="store_true", + help="For hits, omit the filename of the OpenXML file.", + ) + parser.add_argument( + "--show_inner_filename", + action="store_true", + help="For hits, show the filenames of inner files, within each " + "OpenXML (ZIP) file. 
Ignored if --no_filename is true.", ) parser.add_argument( "--nprocesses", @@ -289,13 +574,23 @@ def main() -> None: "--verbose", action="store_true", help="Verbose output" ) args = parser.parse_args() - main_only_quicksetup_rootlogger( - level=logging.DEBUG if args.verbose else logging.INFO + + if args.grep_raw_text and args.grep_inner_file_name: + raise ValueError( + "Can't specify both --grep_raw_text and --grep_inner_file_name" + ) + n_report_booleans = sum( + [ + args.invert_match, + args.files_with_matches, + args.files_without_match, + ] ) - if args.files_with_matches and args.files_without_match: + if n_report_booleans > 1: raise ValueError( - "Can't specify both --files_with_matches (-l) and " - "--files_without_match (-L)!" + "Specify at most one of --invert_match (-v), " + "--files_with_matches (-l), " + "--files_without_match (-L)" ) if bool(args.filenames_from_stdin) == bool(args.filename): raise ValueError( @@ -303,45 +598,55 @@ def main() -> None: "command line, but not both" ) - # Compile regular expression - if args.grep_inner_file_name: - final_pattern = args.pattern - else: - encoding = getdefaultencoding() - final_pattern = args.pattern.encode(encoding) - flags = re.IGNORECASE if args.ignore_case else 0 - log.debug( - "Using regular expression {!r} with flags {!r}", final_pattern, flags + main_only_quicksetup_rootlogger( + level=logging.DEBUG if args.verbose else logging.INFO ) - regex = re.compile(final_pattern, flags) - - # Set up pool for parallel processing - pool = multiprocessing.Pool(processes=args.nprocesses) + mode = GrepMode( + pattern=args.pattern, + ignore_case=args.ignore_case, + search_raw_text=args.grep_raw_text, + search_inner_filename=args.grep_inner_file_name, + report_invert_match=args.invert_match, + report_files_with_matches=args.files_with_matches, + report_files_without_match=args.files_without_match, + display_no_filename=args.no_filename, + display_inner_filename=args.show_inner_filename, + ) + log.debug(f"Mode: {mode}") 
# Iterate through files - parse_kwargs = dict( - regex=regex, - invert_match=args.invert_match, - files_with_matches=args.files_with_matches, - files_without_match=args.files_without_match, - grep_inner_file_name=args.grep_inner_file_name, - show_inner_file=args.show_inner_file, - ) + # - Common arguments + common_kwargs = dict(mode=mode) + # - Filenames, as iterator if args.filenames_from_stdin: - for line in stdin.readlines(): - zipfilename = line.strip() - parallel_kwargs = {"zipfilename": zipfilename} - parallel_kwargs.update(**parse_kwargs) - pool.apply_async(parse_zip, [], parallel_kwargs) + line_it = (line.strip() for line in stdin.readlines()) + zipfilename_it = filter(None, line_it) # remove any blanks else: - for zipfilename in gen_filenames( + zipfilename_it = gen_filenames( starting_filenames=args.filename, recursive=args.recursive - ): - parallel_kwargs = {"zipfilename": zipfilename} - parallel_kwargs.update(**parse_kwargs) - pool.apply_async(parse_zip, [], parallel_kwargs) - pool.close() - pool.join() + ) + # - Combined arguments, as iterator + arg_it = ( + dict(zipfilename=zipfilename, **common_kwargs) + for zipfilename in zipfilename_it + ) + if args.nprocesses == 1: + # Force serial processing (useful for debugging). + for kwargs in arg_it: + parse_zip(**kwargs) + else: + # Set up pool for parallel processing + pool = multiprocessing.Pool(processes=args.nprocesses) + # Launch in parallel + jobs = [pool.apply_async(parse_zip, [], kwargs) for kwargs in arg_it] + # Stop entry to the pool (close) and wait for children (join). + # See https://stackoverflow.com/questions/38271547/. + pool.close() + pool.join() + # Collect results, re-raising any exceptions. (Otherwise they will be + # invisible.) See https://stackoverflow.com/questions/6728236/. 
+ for j in jobs: + j.get() if __name__ == "__main__": diff --git a/cardinal_pythonlib/version_string.py b/cardinal_pythonlib/version_string.py index 3b58af4..e832389 100644 --- a/cardinal_pythonlib/version_string.py +++ b/cardinal_pythonlib/version_string.py @@ -31,5 +31,5 @@ """ -VERSION_STRING = "2.1.1" +VERSION_STRING = "2.1.2" # Use semantic versioning: https://semver.org/ diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 30bfcca..ba425c9 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -908,3 +908,9 @@ Quick links: - Add support for Outlook ``.msg`` files with attachments processed by supported document converters (``.docx``, ``.pdf``, ``.odt`` etc.) to :func:`cardinal_pythonlib.extract_text.document_to_text`. + +**2.1.2 (IN PROGRESS)** + +- ``cardinalpythonlib_grep_in_openxml``: new facility to search XML node text + (rather than raw file text), and this is now the default. Also, behind the + scenes, exceptions in subprocesses are now reported.