Module src.jsonid.registry

JSON registry processor.

Functions

def build_identifier(registry_entry: RegistryEntry,
base_obj: BaseCharacteristics) ‑> RegistryEntry
Expand source code
def build_identifier(
    registry_entry: registry_class.RegistryEntry,
    base_obj: BaseCharacteristics,
) -> registry_class.RegistryEntry:
    """Create a match object to return to the caller.

    The match object is a deep copy of ``registry_entry`` carrying the
    encoding recorded on ``base_obj`` and a MIME value selected as
    follows:

    * the core MIME type(s) for the doctype, e.g. yaml, json, etc.;
    * replaced by the compression value when one is set;
    * for JSONL, borrowing from MIME types, each core MIME type with
      ``+<suffix>`` appended, where the suffix is the part of the
      compression value after the first "/".
    """
    identified = copy.deepcopy(registry_entry)
    identified.encoding = base_obj.encoding
    core_mime = _get_core(base_obj.doctype).mime
    compression = base_obj.compression
    identified.mime = compression if compression else core_mime
    if base_obj.doctype != DOCTYPE_JSONL:
        return identified
    try:
        subtype = compression.split("/")[1]
        identified.mime = [f"{mime}+{subtype}" for mime in core_mime]
    except AttributeError:
        # No compression recorded (e.g. None), so there is nothing to
        # split: keep the MIME value selected above.
        pass
    return identified

Create a match object to return to the caller. For the identifier and borrowing from MIMETypes build a hierarchical identifier using the registry identifier and the doctype, e.g. yaml, json, etc.

def get_additional(data: dict | list | float | int) ‑> str
Expand source code
def get_additional(data: Union[dict, list, float, int]) -> str:
    """Return additional characterization information about the JSON
    we encountered.

    The result is one of the module's ``TYPE_*`` constants, chosen from
    the Python type of ``data``:

    * ``bool`` -> ``TYPE_BOOL``
    * ``dict`` -> ``TYPE_DICT`` (empty or not)
    * ``list`` -> ``TYPE_LIST`` (empty or not)
    * ``float`` -> ``TYPE_FLOAT`` (including ``0.0``)
    * ``int`` -> ``TYPE_INT`` (including ``0``)
    * any other falsy value, e.g. ``None`` -> ``TYPE_NONE``
    * anything else -> ``TYPE_ERR``
    """

    # pylint: disable=R0911

    # Type is checked before truthiness so that numeric zero is reported
    # as a number and not as a null value. bool is tested ahead of int
    # because bool is a subclass of int.
    if isinstance(data, bool):
        return TYPE_BOOL
    if isinstance(data, dict):
        return TYPE_DICT
    if isinstance(data, list):
        return TYPE_LIST
    if isinstance(data, float):
        return TYPE_FLOAT
    if isinstance(data, int):
        return TYPE_INT
    if not data:
        return TYPE_NONE
    return TYPE_ERR

Return additional characterization information about the JSON we encountered.

def matcher(base_obj: BaseCharacteristics) ‑> list
Expand source code
def matcher(base_obj: BaseCharacteristics) -> list:
    """Matcher for registry objects.

    If ``base_obj.data`` is still a string it is decoded as JSON and
    stored back on ``base_obj``; data that cannot be decoded is logged
    and produces an empty list.

    Every registry entry whose markers match contributes one identifier
    built with ``build_identifier``. If nothing matches, or the first
    match equals ``NIL_ENTRY``, a single generic entry for the doctype
    is returned instead, annotated with the result of
    ``get_additional`` and, for JSON, YAML and TOML, the analysed
    depth.
    """
    logger.debug("type: '%s'", type(base_obj.data))
    if isinstance(base_obj.data, str):
        try:
            base_obj.data = json.loads(base_obj.data)
        except json.decoder.JSONDecodeError as err:
            logger.error("unprocessable data: %s", err)
            return []
    reg = registry_data.registry()
    matches = []
    for idx, registry_entry in enumerate(reg):
        try:
            logger.debug("processing registry entry: %s", idx)
            match = process_markers(registry_entry, base_obj.data)
            if not match:
                continue
            if registry_entry in matches:
                continue
            match_obj = build_identifier(registry_entry, base_obj)
            matches.append(match_obj)
        except TypeError as err:
            # The data was not of a shape the markers could be applied
            # to; treat the entry as a non-match.
            logger.debug("%s", err)
            continue
    if not matches or matches[0] == NIL_ENTRY:
        additional = get_additional(base_obj.data)
        # The *_ONLY entries are shared module-level templates. Work on
        # a copy so the per-object values assigned below do not leak
        # into the templates and from there into later calls. A shallow
        # copy is enough as attributes are only rebound, not mutated.
        if base_obj.doctype == DOCTYPE_JSON:
            res_obj = copy.copy(JSON_ONLY)
            res_obj.depth = analysis.analyse_depth(base_obj.data)
        elif base_obj.doctype == DOCTYPE_JSONL:
            # NB. JSONL does not have a depth calculation we can
            # use at this point in the analysis. This can only be
            # output via the analysis switch.
            res_obj = copy.copy(JSONL_ONLY)
        elif base_obj.doctype == DOCTYPE_YAML:
            res_obj = copy.copy(YAML_ONLY)
            res_obj.depth = analysis.analyse_depth(base_obj.data)
        elif base_obj.doctype == DOCTYPE_TOML:
            res_obj = copy.copy(TOML_ONLY)
            res_obj.depth = analysis.analyse_depth(base_obj.data)
        else:
            res_obj = registry_class.RegistryEntry()
        res_obj.additional = additional
        res_obj.encoding = base_obj.encoding
        res_obj = build_identifier(res_obj, base_obj)
        return [res_obj]
    logger.debug(matches)
    return matches

Matcher for registry objects.

def process_markers(registry_entry: RegistryEntry,
data: dict) ‑> bool
Expand source code
def process_markers(registry_entry: registry_class.RegistryEntry, data: dict) -> bool:
    """Run through the markers for an entry in the registry.
    Attempt to exit early if there isn't a match.

    When ``data`` is a list, the first marker must carry an index key;
    ``data`` is then replaced by the result of
    ``registry_matchers.at_index``. If the key is missing the entry
    does not match.

    Each marker is then evaluated against the top-level data, after an
    optional goto step. Every check whose key is present in the marker
    must succeed; a check whose key is absent is skipped. Returns True
    only if no check fails.
    """
    if isinstance(data, list):
        for marker in registry_entry.markers:
            try:
                _ = marker[registry_matchers.MARKER_INDEX]
                data = registry_matchers.at_index(marker, data)
            except KeyError:
                return False
            # Only the first marker is consulted for the index.
            break

    # Marker key -> matcher pairs, in the order they are evaluated.
    checks = (
        (registry_matchers.MARKER_CONTAINS, registry_matchers.contains_match),
        (registry_matchers.MARKER_STARTSWITH, registry_matchers.startswith_match),
        (registry_matchers.MARKER_ENDSWITH, registry_matchers.endswith_match),
        (registry_matchers.MARKER_IS, registry_matchers.is_match),
        (registry_matchers.MARKER_IS_TYPE, registry_matchers.is_type),
        (registry_matchers.MARKER_REGEX, registry_matchers.regex_match),
        (registry_matchers.MARKER_KEY_EXISTS, registry_matchers.key_exists_match),
        (registry_matchers.MARKER_KEY_NO_EXIST, registry_matchers.key_no_exist_match),
    )

    root = data  # every marker starts from the top-level object
    for marker in registry_entry.markers:
        scope = root
        try:
            _ = marker[registry_matchers.MARKER_GOTO]
            scope = registry_matchers.at_goto(marker, scope)
        except KeyError as err:
            logger.debug("following through: %s", err)
        for marker_key, check in checks:
            try:
                _ = marker[marker_key]
                if not check(marker, scope):
                    return False
            except KeyError as err:
                # A KeyError here means the check is not applied to
                # this marker; move on to the next one.
                logger.debug("following through: %s", err)
    return True

Run through the markers for an entry in the registry. Attempt to exit early if there isn't a match.

Classes

class BaseCharacteristics (valid: bool = False,
data: Any | None = None,
doctype: str | None = None,
encoding: str | None = None,
content_for_analysis: str | None = None,
compression: str | None = None,
binary: bool = False,
text: bool = False,
empty: bool = False,
only_whitespace: bool = False)
Expand source code
@dataclass
class BaseCharacteristics:
    """BaseCharacteristics wraps information about the base object
    for ease of moving it through the code to where we need it.

    NB. one consideration is what to do with the term `valid` here. It
    is doing more work than necessary: it covers both "not valid text"
    and "not a valid object type", i.e. JSON, YAML, TOML, etc. It's
    probably too broad and might cause inconsistent results. We should
    observe `binary` output and make sure it works as expected.
    """

    # Too many instance attributes.
    # pylint: disable=R0902

    # valid describes whether or not the object has been parsed
    # correctly.
    valid: bool = False
    # data represents the data as parsed by the utility.
    data: Union[Any, None] = None
    # doctype describes the object type we have identified.
    doctype: Union[str, None] = None
    # encoding describes the character encoding of the object.
    encoding: Union[str, None] = None
    # content_for_analysis is the string/byte data that was the
    # original object and is used in the structural analysis of
    # the object.
    content_for_analysis: Union[str, None] = None
    # compression describes whether or not the object was originally
    # compressed before identification. (JSONL only)
    compression: Union[str, None] = None
    # binary is True when the content is binary content.
    binary: bool = False
    # text is True when the content is text. NB. This may be redundant
    # in the fullness of time, but to begin with we will try to be as
    # explicit as possible to ensure the accuracy of the output.
    text: bool = False
    # empty is True when the file is empty.
    empty: bool = False
    # only_whitespace is True when the file only contains whitespace.
    only_whitespace: bool = False

BaseCharacteristics wraps information about the base object for ease of moving it through the code to where we need it.

NB. one consideration is what to do with the term valid here. It is doing more work than necessary: it covers both "not valid text" and "not a valid object type", i.e. JSON, YAML, TOML, etc. It's probably too broad and might cause inconsistent results. We should observe binary output and make sure it works as expected.

Instance variables

var binary : bool
var compression : str | None
var content_for_analysis : str | None
var data : Any | None
var doctype : str | None
var empty : bool
var encoding : str | None
var only_whitespace : bool
var text : bool
var valid : bool
class IdentificationFailure (*args, **kwargs)
Expand source code
class IdentificationFailure(Exception):
    """Exception to raise when identification fails."""

Raise when identification fails.

Ancestors

  • builtins.Exception
  • builtins.BaseException