Module src.jsonid.file_processing

File processing functions.

Functions

async def analyse_data(path: str, strategy: list) ‑> list
Expand source code
async def analyse_data(path: str, strategy: list) -> list:
    """Process all objects at a given path."""
    logger.debug("processing: %s", path)
    res = []
    if "*" in path:
        paths = await process_glob(path)
        res = await analyse_json(paths=paths, strategy=strategy)
        await output_analysis(res)
        sys.exit()
    if not os.path.exists(path):
        logger.error("path: '%s' does not exist", path)
        sys.exit(1)
    if os.path.isfile(path):
        res = await analyse_json(paths=[path], strategy=strategy)
        await output_analysis(res)
        sys.exit(1)
    paths = await create_manifest(path)
    if not paths:
        logger.info("no files in directory: %s", path)
        sys.exit(1)
    res = await analyse_json(paths=paths, strategy=strategy)
    await output_analysis(res)
    sys.exit()

Process all objects at a given path.

async def analyse_json(paths: list[str], strategy: list)
Expand source code
async def analyse_json(paths: list[str], strategy: list):
    """Analyse a JSON object."""
    analysis_res = []
    for path in paths:
        if os.path.getsize(path) == 0:
            logger.debug("'%s' is an empty file")
            continue
        base_obj = await identify_plaintext_bytestream(
            path=path,
            strategy=strategy,
            analyse=True,
        )
        if not base_obj.valid:
            logger.debug("%s: is not plaintext", path)
            continue
        if base_obj.data == "" or base_obj.data is None:
            continue
        res = await analysis.analyse_input(base_obj.data, base_obj.content)
        res["doctype"] = base_obj.doctype
        res["encoding"] = base_obj.encoding
        analysis_res.append(res)
    return analysis_res

Analyse a JSON object.

async def create_manifest(path: str) ‑> list[str]
Expand source code
async def create_manifest(path: str) -> list[str]:
    """Get a list of paths to process."""
    paths = []
    for root, _, files in os.walk(path):
        for file in files:
            file_path = os.path.join(root, file)
            logger.debug(file_path)
            paths.append(file_path)
    return paths

Get a list of paths to process.

def decode(content: str, strategy: list)
Expand source code
def decode(content: str, strategy: list):
    """Decode the given content stream."""
    data = ""
    if "JSON" in strategy:
        try:
            data = json.loads(content)
            return True, data, registry.DOCTYPE_JSON
        except json.decoder.JSONDecodeError as err:
            logger.debug("(decode) can't process: %s", err)
    if "YAML" in strategy:
        try:
            if content.strip()[:3] != "---":
                raise TypeError
            data = yaml.load(content.strip(), Loader=Loader)
            if not isinstance(data, str):
                return True, data, registry.DOCTYPE_YAML
        except (
            yaml.scanner.ScannerError,
            yaml.parser.ParserError,
            yaml.reader.ReaderError,
            yaml.composer.ComposerError,
        ) as err:
            logger.debug("(decode) can't process: %s", err)
        except (TypeError, IndexError):
            # Document too short, or YAML without header is not supported.
            pass
    if "TOML" in strategy:
        try:
            data = toml.loads(content)
            return True, data, registry.DOCTYPE_TOML
        except toml.TOMLDecodeError as err:
            logger.debug("(decode) can't process: %s", err)
    return False, None, None

Decode the given content stream.

def get_date_time() ‑> str
Expand source code
def get_date_time() -> str:
    """Return a datetime string for now(),"""
    return datetime.datetime.now(timezone.utc).strftime(version.UTC_TIME_FORMAT)

Return a datetime string for now(),

async def identify_json(paths: list[str], strategy: list, binary: bool, simple: bool)
Expand source code
async def identify_json(paths: list[str], strategy: list, binary: bool, simple: bool):
    """Identify objects."""
    for idx, path in enumerate(paths):
        if os.path.getsize(path) == 0:
            logger.debug("'%s' is an empty file")
            if binary:
                logger.warning("report on binary object...")
            continue
        base_obj = await identify_plaintext_bytestream(
            path=path,
            strategy=strategy,
            analyse=False,
        )
        if not base_obj.valid:
            logger.debug("%s: is not plaintext", path)
            if binary:
                logger.warning("report on binary object...")
            continue
        if base_obj.data == "" or base_obj.data is None:
            continue
        logger.debug("processing: %s (%s)", path, base_obj.doctype)
        await process_result(
            idx, path, base_obj.data, base_obj.doctype, base_obj.encoding, simple
        )

Identify objects.

async def output_analysis(res: list) ‑> None
Expand source code
async def output_analysis(res: list) -> None:
    """Format the output of the analysis."""
    for item in res:
        print(json.dumps(item, indent=2))

Format the output of the analysis.

async def process_data(path: str, strategy: list, binary: bool, simple: bool)
Expand source code
async def process_data(path: str, strategy: list, binary: bool, simple: bool):
    """Process all objects at a given path."""
    logger.debug("processing: %s", path)
    if "*" in path:
        paths = await process_glob(path)
        await identify_json(paths, strategy, binary, simple)
        sys.exit(0)
    if not os.path.exists(path):
        logger.error("path: '%s' does not exist", path)
        sys.exit(1)
    if os.path.isfile(path):
        await identify_json([path], strategy, binary, simple)
        sys.exit(0)
    paths = await create_manifest(path)
    if not paths:
        logger.info("no files in directory: %s", path)
        sys.exit(1)
    await identify_json(paths, strategy, binary, simple)

Process all objects at a given path.

async def process_glob(glob_path: str)
Expand source code
async def process_glob(glob_path: str):
    """Process glob patterns provided by the user."""
    paths = []
    for path in glob.glob(glob_path):
        if os.path.isdir(path):
            paths = paths + await create_manifest(path)
        if os.path.isfile(path):
            paths.append(path)
    return paths

Process glob patterns provided by the user.

async def process_result(idx: int, path: str, data: Any, doctype: str, encoding: str, simple: bool)
Expand source code
async def process_result(
    idx: int, path: str, data: Any, doctype: str, encoding: str, simple: bool
):
    """Process something JSON/YAML/TOML"""
    res = []
    if doctype == registry.DOCTYPE_JSON:
        res = registry.matcher(data, encoding=encoding)
    if doctype == registry.DOCTYPE_YAML:
        res = [registry.YAML_ONLY]
    if doctype == registry.DOCTYPE_TOML:
        res = [registry.TOML_ONLY]
    if simple:
        for item in res:
            name_ = item.name[0]["@en"]
            version_ = item.version
            if version_ is not None:
                name_ = f"{name_}: {version_}"
            print(
                json.dumps(
                    {
                        "identifier": item.identifier,
                        "filename": os.path.basename(path),
                        "encoding": item.encoding,
                    }
                )
            )
        return
    if idx == 0:
        print("---")
        print(version_header())
        print("---")
    print(f"file: {path}")
    for item in res:
        print(item)
    print("---")
    return

Process something JSON/YAML/TOML

async def text_check(chars: str) ‑> bool
Expand source code
async def text_check(chars: str) -> bool:
    """Check the first characters of the file to figure out if the
    file is text. Return `True` if the file is text, i.e. no binary
    bytes are detected.

    via. https://stackoverflow.com/a/7392391
    """
    text_chars = bytearray(
        {0, 7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7F}
    )
    for char in chars:
        is_binary = bool(chr(char).encode().translate(None, text_chars))
        if is_binary is True:
            return False
    return True

Check the first characters of the file to figure out if the file is text. Return True if the file is text, i.e. no binary bytes are detected.

via. https://stackoverflow.com/a/7392391

def version_header() ‑> str
Expand source code
def version_header() -> str:
    """Output a formatted version header."""
    return f"""jsonid: {version.get_version()}
scandate: {get_date_time()}""".strip()

Output a formatted version header.

async def whitespace_check(chars: str) ‑> bool
Expand source code
async def whitespace_check(chars: str) -> bool:
    """Check whether the file only contains whitespace.

    NB. this check might take longer than needed.
    """
    if not chars.strip():
        return False
    return True

Check whether the file only contains whitespace.

NB. this check might take longer than needed.

Classes

class BaseCharacteristics (valid: bool = False,
data: Any | None = None,
doctype: str | None = None,
encoding: str | None = None,
content: str | None = None)
Expand source code
@dataclass
class BaseCharacteristics:
    """BaseCharacteristics wraps information about the base object
    for ease of moving it through the code to where we need it.
    """

    # valid describes whether or not the object has been parsed
    # correctly.
    valid: bool = False
    # data represents the Data as parsed by the utility.
    data: Union[Any, None] = None
    # doctype describes the object type we have identified.
    doctype: Union[str, None] = None
    # encoding describes the character encoding of the object.
    encoding: Union[str, None] = None
    # content is the string/byte data that was the original object and
    # is used in the structural analysis of the object.
    content: Union[str, None] = None

BaseCharacteristics wraps information about the base object for ease of moving it through the code to where we need it.

Instance variables

var content : str | None
var data : Any | None
var doctype : str | None
var encoding : str | None
var valid : bool