Module src.jsonid.file_processing
File processing functions.
Functions
async def analyse_data(path: str, strategy: list) -> list
Expand source code
async def analyse_data(path: str, strategy: list) -> list:
    """Process all objects at a given path and output the analysis.

    Handles three path shapes: a glob pattern (contains ``*``), a
    single file, or a directory tree. Always terminates the process:
    exit status 0 on success, 1 when the path is missing or the
    directory is empty.

    :param path: file path, directory path, or glob pattern.
    :param strategy: doctype strategy list forwarded to the analyser.
    """
    logger.debug("processing: %s", path)
    res = []
    if "*" in path:
        paths = await process_glob(path)
        res = await analyse_json(paths=paths, strategy=strategy)
        await output_analysis(res)
        sys.exit()
    if not os.path.exists(path):
        logger.error("path: '%s' does not exist", path)
        sys.exit(1)
    if os.path.isfile(path):
        res = await analyse_json(paths=[path], strategy=strategy)
        await output_analysis(res)
        # BUG FIX: this branch previously exited with status 1 even
        # though the single-file analysis succeeded; exit 0 like the
        # glob branch above and like process_data()'s file branch.
        sys.exit()
    paths = await create_manifest(path)
    if not paths:
        logger.info("no files in directory: %s", path)
        sys.exit(1)
    res = await analyse_json(paths=paths, strategy=strategy)
    await output_analysis(res)
    sys.exit()
async def analyse_json(paths: list[str], strategy: list)-
Expand source code
async def analyse_json(paths: list[str], strategy: list):
    """Analyse each object found at the given paths.

    Empty files, non-plaintext files, and files with no decodable
    data are skipped. Presentation-only keys are stripped from each
    analysis record before it is collected.
    """
    collected = []
    for file_path in paths:
        if os.path.getsize(file_path) == 0:
            logger.debug("%s is an empty file", file_path)
            continue
        base_obj = await identify_plaintext_bytestream(
            path=file_path,
            strategy=strategy,
            analyse=True,
        )
        if not base_obj.valid:
            logger.debug("%s: is not plaintext", file_path)
            continue
        if base_obj.data == "" or base_obj.data is None:
            continue
        record = await analysis.analyse_input(base_obj.data, base_obj.content_for_analysis)
        record["doctype"] = base_obj.doctype
        record["encoding"] = base_obj.encoding
        if base_obj.doctype == registry.DOCTYPE_JSONL:
            record["compression"] = base_obj.compression
        # Drop keys that are not wanted in the analysis output; order
        # matches the original pops so a missing key raises the same
        # KeyError first.
        for dropped in (
            "content_length",
            "depth",
            "heterogeneous_list_types",
            "line_warning",
            "top_level_types",
            "top_level_keys",
            "top_level_keys_count",
        ):
            record.pop(dropped)
        collected.append(record)
    return collected
async def create_manifest(path: str) ‑> list[str]-
Expand source code
async def create_manifest(path: str) -> list[str]:
    """Walk `path` recursively and return every file path found."""
    manifest = []
    for root, _, filenames in os.walk(path):
        for filename in filenames:
            full_path = os.path.join(root, filename)
            logger.debug(full_path)
            manifest.append(full_path)
    return manifest
def decode(content: str, strategy: list) ‑> tuple-
Expand source code
def decode(content: str, strategy: list) -> tuple:
    """Decode the given content stream.

    Tries each doctype named in `strategy` in a fixed order
    (JSON, JSONL, YAML, TOML) and returns a
    `(valid, decoded_data, doctype)` tuple, or `(False, None, None)`
    when nothing matches.
    """
    data = ""
    if registry.DOCTYPE_JSON in strategy:
        ok, decoded, doctype = _json_processing(content)
        if ok:
            return ok, decoded, doctype
    if registry.DOCTYPE_JSONL in strategy:
        ok, decoded, doctype = _jsonl_processing(content)
        if ok:
            return ok, decoded, doctype
    if registry.DOCTYPE_YAML in strategy:
        try:
            stripped = content.strip()
            # Only accept YAML that starts with the document header.
            if stripped[:3] != "---":
                raise TypeError
            data = yaml.load(stripped, Loader=Loader)
            if not isinstance(data, str):
                return True, data, registry.DOCTYPE_YAML
        except (
            yaml.scanner.ScannerError,
            yaml.parser.ParserError,
            yaml.reader.ReaderError,
            yaml.composer.ComposerError,
        ) as err:
            logger.debug("(decode) can't process: %s", err)
        except (TypeError, IndexError):
            # Document too short, or YAML without header is not supported.
            pass
    if registry.DOCTYPE_TOML in strategy:
        try:
            data = toml.loads(content)
            return True, data, registry.DOCTYPE_TOML
        except toml.TOMLDecodeError as err:
            logger.debug("(decode) can't process: %s", err)
    return False, None, None
def get_date_time() -> str
Expand source code
def get_date_time() -> str:
    """Return a UTC timestamp string for now(), formatted with version.UTC_TIME_FORMAT."""
    return datetime.datetime.now(timezone.utc).strftime(version.UTC_TIME_FORMAT)
async def identify_json(paths: list[str], strategy: list, binary: bool, agentout: bool)-
Expand source code
async def identify_json(paths: list[str], strategy: list, binary: bool, agentout: bool):
    """Identify objects.

    :param paths: files to identify.
    :param strategy: doctype strategy list passed to the identifier.
    :param binary: when True, empty and non-plaintext files are also
        reported; otherwise they are silently skipped.
    :param agentout: forwarded to the result formatter.
    """
    padding = _get_padding(paths=paths)
    # Idiom fix: the previous `for _, path in enumerate(paths)` never
    # used the index; iterate the paths directly.
    for path in paths:
        if os.path.getsize(path) == 0:
            logger.debug("%s is an empty file", path)
            base_obj = registry.BaseCharacteristics(empty=True)
            if binary:
                await process_result(
                    path=path,
                    base_obj=base_obj,
                    padding=padding,
                    agentout=agentout,
                )
            continue
        base_obj = await identify_plaintext_bytestream(
            path=path,
            strategy=strategy,
            analyse=False,
        )
        if not base_obj.valid:
            logger.debug("%s: is not plaintext", path)
            if binary:
                await process_result(
                    path=path,
                    base_obj=base_obj,
                    padding=padding,
                    agentout=agentout,
                )
            continue
        logger.debug("processing: %s (%s)", path, base_obj.doctype)
        await process_result(
            path=path,
            base_obj=base_obj,
            padding=padding,
            agentout=agentout,
        )
async def open_and_decode(path: str, strategy: list) ‑> BaseCharacteristics-
Expand source code
async def open_and_decode(path: str, strategy: list) -> registry.BaseCharacteristics:
    """Attempt to open a given file and decode it as JSON.

    Returns a BaseCharacteristics preset describing why identification
    stopped (empty, binary, bad compression, whitespace-only) or a
    `possible_id` preset carrying the content for identification.
    """
    content = None
    compression = None
    if not os.path.getsize(path):
        logger.debug("file is zero bytes: %s", path)
        return presets.no_id_empty()
    with open(path, "rb") as json_stream:
        first_chars = json_stream.read(FFB)
        if not await text_check(first_chars):
            if registry.DOCTYPE_JSONL not in strategy:
                return presets.no_id_binary()
            # If not text, check at least for compression. We might
            # have a compressed JSONL file.
            compression = await compressionlib.compress_check(first_chars)
            if not compression:
                return presets.no_id_binary()
        # Read the content whether we have compression or not.
        if compression:
            content = await compressionlib.decompress_stream(
                path=path, compression=compression
            )
            if not content:
                return presets.no_id_compression(compression=compression)
        else:
            content = first_chars + json_stream.read()
        # We have content, but it might only be whitespace.
        if not await whitespace_check(content):
            return presets.no_id_whitespace()
        # We have something we can try to identify.
        return presets.possible_id(
            content=content,
            compression=compression,
        )
async def output_analysis(res: list) ‑> None-
Expand source code
async def output_analysis(res: list) -> None:
    """Pretty-print each analysis record as indented JSON."""
    for record in res:
        print(json.dumps(record, indent=2))
async def process_data(path: str, strategy: list, binary: bool, agentout: bool)-
Expand source code
async def process_data(path: str, strategy: list, binary: bool, agentout: bool):
    """Process all objects at a given path.

    Dispatches on the path shape: glob pattern, single file, or
    directory tree. Glob and single-file runs exit the process with
    status 0 on completion; missing paths and empty directories exit
    with status 1.
    """
    logger.debug("processing: %s", path)
    # Glob patterns are expanded before the existence check, since a
    # pattern itself is not an existing path.
    if "*" in path:
        glob_paths = await process_glob(path)
        await identify_json(
            paths=glob_paths,
            strategy=strategy,
            binary=binary,
            agentout=agentout,
        )
        sys.exit(0)
    if not os.path.exists(path):
        logger.error("path: '%s' does not exist", path)
        sys.exit(1)
    if os.path.isfile(path):
        await identify_json(
            paths=[path],
            strategy=strategy,
            binary=binary,
            agentout=agentout,
        )
        sys.exit(0)
    manifest = await create_manifest(path)
    if not manifest:
        logger.info("no files in directory: %s", path)
        sys.exit(1)
    await identify_json(
        paths=manifest,
        strategy=strategy,
        binary=binary,
        agentout=agentout,
    )
async def process_glob(glob_path: str)-
Expand source code
async def process_glob(glob_path: str):
    """Process glob patterns provided by the user.

    Directories matched by the pattern are expanded recursively via
    create_manifest(); matched files are collected directly.
    """
    matched = []
    for entry in glob.glob(glob_path):
        if os.path.isdir(entry):
            matched += await create_manifest(entry)
        if os.path.isfile(entry):
            matched.append(entry)
    return matched
async def process_result(path: str,
base_obj: BaseCharacteristics,
padding: int,
agentout: bool)-
Expand source code
async def process_result(
    path: str,
    base_obj: registry.BaseCharacteristics,
    padding: int,
    agentout: bool,
):
    """Process something JSON/YAML/TOML.

    :param path: file path being reported on.
    :param base_obj: characteristics gathered during identification.
    :param padding: column padding for aligned output.
    :param agentout: forwarded to the output formatter.
    """
    # Consolidated: empty, binary, and invalid objects were three
    # byte-identical early-return branches; report them as-is.
    if base_obj.empty or base_obj.binary or not base_obj.valid:
        output.output_results(
            path=path,
            results=[base_obj],
            padding=padding,
            agentout=agentout,
        )
        return
    results = []
    # NB. every registered doctype currently uses the same matcher; the
    # membership test (formerly four identical if-branches) is kept so
    # an unrecognized doctype still yields an empty result list. These
    # switch-like ifs may be removed entirely before v1.0.0 if no
    # format ever needs custom processing.
    if base_obj.doctype in (
        registry.DOCTYPE_JSON,
        registry.DOCTYPE_JSONL,
        registry.DOCTYPE_YAML,
        registry.DOCTYPE_TOML,
    ):
        results = registry.matcher(base_obj)
    output.output_results(
        path=path,
        results=results,
        padding=padding,
        agentout=agentout,
    )
    return
async def text_check(chars: str) ‑> bool-
Expand source code
async def text_check(chars: bytes) -> bool:
    """Check the first characters of the file to figure out if the
    file is text.

    Return `True` if the file is text, i.e. no binary bytes are
    detected.

    via. https://stackoverflow.com/a/7392391

    :param chars: leading bytes of the file (read in binary mode).
    """
    text_chars = bytearray(
        {0, 7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7F}
    )
    # One C-level pass: translate() deletes every allowed byte, so any
    # remainder is a disallowed (binary) byte. Equivalent to the old
    # per-byte chr()/encode()/translate() loop, but far faster.
    return not chars.translate(None, text_chars)
Trueif the file is text, i.e. no binary bytes are detected. def version_header() ‑> str-
Expand source code
def version_header() -> str:
    """Output a formatted version header."""
    header = f"jsonid: {version.get_version()}\nscandate: {get_date_time()}"
    return header.strip()
async def whitespace_check(chars: str) ‑> bool-
Expand source code
async def whitespace_check(chars: str) -> bool:
    """Check whether the file only contains whitespace.

    Returns `False` when the content is empty or whitespace-only,
    `True` otherwise.

    NB. this check might take longer than needed.
    """
    return bool(chars.strip())
NB. this check might take longer than needed.
Classes
class NotJSONLError (*args, **kwargs)-
Expand source code
class NotJSONLError(Exception):
    """Raised when a byte stream cannot be processed as JSONL."""
Ancestors
- builtins.Exception
- builtins.BaseException