Module src.jsonid.registry
JSON registry processor.
Functions
def build_identifier(registry_entry: RegistryEntry,
base_obj: BaseCharacteristics) -> RegistryEntry
Expand source code
def build_identifier(
    registry_entry: registry_class.RegistryEntry,
    base_obj: BaseCharacteristics,
) -> registry_class.RegistryEntry:
    """Create a match object to return to the caller.

    For the identifier, and borrowing from MIMETypes, build a
    hierarchical identifier using the registry identifier and the
    doctype, e.g. yaml, json, etc.
    """
    result = copy.deepcopy(registry_entry)
    result.encoding = base_obj.encoding
    core = _get_core(base_obj.doctype)
    result.mime = core.mime
    if base_obj.compression:
        # A compressed object reports the compression MIME instead.
        result.mime = base_obj.compression
    if base_obj.doctype == DOCTYPE_JSONL:
        try:
            # e.g. "application/gzip" -> "gzip", appended as a suffix
            # to each of the core MIME types.
            compression_suffix = base_obj.compression.split("/")[1]
            result.mime = [f"{mime}+{compression_suffix}" for mime in core.mime]
        except AttributeError:
            # No compression recorded (None has no .split); keep mime as-is.
            pass
    return result
def get_additional(data: dict | list | float | int) -> str
Expand source code
def get_additional(data: Union[dict, list, float, int]) -> str:
    """Return additional characterization information about the JSON
    we encountered.

    :param data: the parsed document object.
    :returns: one of the TYPE_* constants naming the value's type.
    """
    # pylint: disable=R0911
    if not data:
        # Falsy values still need a precise type: False, empty
        # containers, and numeric zero are all valid JSON documents.
        if data is False:
            return TYPE_BOOL
        if isinstance(data, list):
            return TYPE_LIST
        if isinstance(data, dict):
            return TYPE_DICT
        # Fix: 0 and 0.0 previously fell through and were reported
        # as TYPE_NONE; report their real numeric types instead.
        if isinstance(data, float):
            return TYPE_FLOAT
        if isinstance(data, int):
            return TYPE_INT
        return TYPE_NONE
    if isinstance(data, dict):
        return TYPE_DICT
    if isinstance(data, list):
        return TYPE_LIST
    if isinstance(data, float):
        return TYPE_FLOAT
    if isinstance(data, int):
        # bool is a subclass of int, so True must be reported as bool.
        if data is True:
            return TYPE_BOOL
        return TYPE_INT
    return TYPE_ERR
def matcher(base_obj: BaseCharacteristics) -> list
Expand source code
def matcher(base_obj: BaseCharacteristics) -> list:
    """Matcher for registry objects.

    Runs every registry entry's markers against the parsed data and
    collects the matching entries. When nothing specific matches, a
    generic per-doctype entry is returned instead.
    """
    logger.debug("type: '%s'", type(base_obj.data))
    if isinstance(base_obj.data, str):
        # Data may still be an unparsed JSON string; parse it first.
        try:
            base_obj.data = json.loads(base_obj.data)
        except json.decoder.JSONDecodeError as err:
            logger.error("unprocessable data: %s", err)
            return []
    matches = []
    for idx, registry_entry in enumerate(registry_data.registry()):
        try:
            logger.debug("processing registry entry: %s", idx)
            if not process_markers(registry_entry, base_obj.data):
                continue
            if registry_entry in matches:
                continue
            matches.append(build_identifier(registry_entry, base_obj))
        except TypeError as err:
            logger.debug("%s", err)
            continue
    if not matches or matches[0] == NIL_ENTRY:
        # Nothing specific matched: fall back to the generic entry for
        # the doctype we already established.
        fallback = registry_class.RegistryEntry()
        if base_obj.doctype == DOCTYPE_JSON:
            fallback = JSON_ONLY
            fallback.depth = analysis.analyse_depth(base_obj.data)
        elif base_obj.doctype == DOCTYPE_JSONL:
            # NB. JSONL does not have a depth calculation we can
            # use at this point in the analysis. This can only be
            # output via the analysis switch.
            fallback = JSONL_ONLY
        elif base_obj.doctype == DOCTYPE_YAML:
            fallback = YAML_ONLY
            fallback.depth = analysis.analyse_depth(base_obj.data)
        elif base_obj.doctype == DOCTYPE_TOML:
            fallback = TOML_ONLY
            fallback.depth = analysis.analyse_depth(base_obj.data)
        fallback.additional = get_additional(base_obj.data)
        fallback.encoding = base_obj.encoding
        fallback = build_identifier(fallback, base_obj)
        return [fallback]
    logger.debug(matches)
    return matches
def process_markers(registry_entry: RegistryEntry,
data: dict) -> bool
Expand source code
def process_markers(
    registry_entry: registry_class.RegistryEntry, data: dict
) -> bool:
    """Run through the markers for an entry in the registry.

    Each marker is a dict whose keys select the matching strategies
    to apply (contains, startswith, regex, etc.). Attempt to exit
    early if there isn't a match.

    :param registry_entry: registry entry whose markers are evaluated.
    :param data: parsed document data to match the markers against.
    :returns: True when every marker matches, False otherwise.
    """
    # NB. original pragma read "R0911,R0912.R0915" — the period made
    # the disable list malformed.
    # pylint: disable=R0911,R0912,R0915
    if isinstance(data, list):
        # A list document can only be matched via an index marker on
        # the first marker; otherwise there is nothing to match.
        for marker in registry_entry.markers:
            try:
                _ = marker[registry_matchers.MARKER_INDEX]
                data = registry_matchers.at_index(marker, data)
                break
            except KeyError:
                return False
    # (marker key, matcher function) pairs, applied in the original
    # fixed order; every strategy present in a marker must match.
    checks = (
        (registry_matchers.MARKER_CONTAINS, registry_matchers.contains_match),
        (registry_matchers.MARKER_STARTSWITH, registry_matchers.startswith_match),
        (registry_matchers.MARKER_ENDSWITH, registry_matchers.endswith_match),
        (registry_matchers.MARKER_IS, registry_matchers.is_match),
        (registry_matchers.MARKER_IS_TYPE, registry_matchers.is_type),
        (registry_matchers.MARKER_REGEX, registry_matchers.regex_match),
        (registry_matchers.MARKER_KEY_EXISTS, registry_matchers.key_exists_match),
        (registry_matchers.MARKER_KEY_NO_EXIST, registry_matchers.key_no_exist_match),
    )
    top_level_pointer = data  # ensure we're always looking at top-level dict
    for marker in registry_entry.markers:
        data = top_level_pointer
        try:
            _ = marker[registry_matchers.MARKER_GOTO]
            data = registry_matchers.at_goto(marker, data)
        except KeyError as err:
            logger.debug("following through: %s", err)
        for marker_key, match_fn in checks:
            # The match call stays inside the try so a KeyError raised
            # by the matcher itself is swallowed, as in the original.
            try:
                _ = marker[marker_key]
                if not match_fn(marker, data):
                    return False
            except KeyError as err:
                logger.debug("following through: %s", err)
    return True
Classes
class BaseCharacteristics (valid: bool = False,
data: Any | None = None,
doctype: str | None = None,
encoding: str | None = None,
content_for_analysis: str | None = None,
compression: str | None = None,
binary: bool = False,
text: bool = False,
empty: bool = False,
only_whitespace: bool = False)-
Expand source code
@dataclass
class BaseCharacteristics:
    """BaseCharacteristics wraps information about the base object for
    ease of moving it through the code to where we need it.

    NB. one consideration is what to do with the term `valid` here.
    It is doing more work than necessary. It is both, not valid text,
    and not valid object type, i.e. JSON, YAML, TOML etc. It's probably
    too broad and might cause inconsistent results. We should observe
    `binary` output and make sure it works as expected.
    """

    # Too many instance attributes.
    # pylint: disable=R0902

    # valid: whether or not the object has been parsed correctly.
    valid: bool = False
    # data: the data as parsed by the utility.
    data: Union[Any, None] = None
    # doctype: the object type we have identified.
    doctype: Union[str, None] = None
    # encoding: the character encoding of the object.
    encoding: Union[str, None] = None
    # content_for_analysis: the string/byte data that was the original
    # object; used in the structural analysis of the object.
    content_for_analysis: Union[str, None] = None
    # compression: whether or not the object was originally compressed
    # before identification. (JSONL only)
    compression: Union[str, None] = None
    # binary: content is binary content.
    binary: bool = False
    # text: content is text. NB. this may be redundant in the fullness
    # of time, but to begin with we try to be as explicit as possible
    # to ensure the accuracy of the output.
    text: bool = False
    # empty: file is empty.
    empty: bool = False
    # only_whitespace: file only contains whitespace.
    only_whitespace: bool = False
NB. one consideration is what to do with the term
`valid` here. It is doing more work than necessary. It is both, not valid text, and not valid object type, i.e. JSON, YAML, TOML etc. It's probably too broad and might cause inconsistent results. We should observe `binary` output and make sure it works as expected.

Instance variables
var binary : boolvar compression : str | Nonevar content_for_analysis : str | Nonevar data : Any | Nonevar doctype : str | Nonevar empty : boolvar encoding : str | Nonevar only_whitespace : boolvar text : boolvar valid : bool
class IdentificationFailure (*args, **kwargs)-
Expand source code
class IdentificationFailure(Exception):
    """Raise when identification fails."""
Ancestors
- builtins.Exception
- builtins.BaseException