Module src.pronom_summary.pronom_summary
Provide PRONOM record parsing capabilities.
Expand source code
"""Provide PRONOM record parsing capabilities."""
import asyncio
import json
import multiprocessing
import xml.etree.ElementTree as etree
from pathlib import Path
from typing import Final, Union
from xml.etree.ElementTree import ParseError
try:
try:
from src.pronom_tools import pronom_tools
except ModuleNotFoundError:
from pronom_tools import pronom_tools
except ImportError:
# Module is likely being called from PRONOM tools and so doesn't
# require this import.
pass
# XML namespace map passed to ElementTree find()/findall() so that the
# "pro:" prefix used in search paths resolves to the PRONOM namespace.
NAMESPACES: Final[dict[str, str]] = {"pro": "http://pronom.nationalarchives.gov.uk"}

# Labels reported in the "description" field of a record summary.
DEPRECATED: Final[str] = "deprecated"
OUTLINE: Final[str] = "outline"
COMPLETE: Final[str] = "complete"
class PRONOMException(Exception):
    """Signal that a PRONOM record or signature file could not be
    processed.
    """
def summarize_container_xml(pronom_container_xml: Union[str, Path]) -> list[str]:
    """Return the PUIDs listed in a PRONOM container signature file.

    The ``Puid`` attribute is collected from every child of the root's
    ``FileFormatMappings`` element, in document order.

    :param pronom_container_xml: location of the container signature
        XML file, as a path object or a string.
    :return: one PUID string per mapping entry.
    :raises PRONOMException: if the file is not well-formed XML or has
        no ``FileFormatMappings`` element.
    :raises ProcessLookupError: if a mapping entry has no ``Puid``
        attribute.
    """
    try:
        tree = etree.parse(pronom_container_xml)
    except ParseError as err:
        raise PRONOMException(f"cannot parse xml: {pronom_container_xml}") from err
    mappings = tree.getroot().find("FileFormatMappings")
    if mappings is None:
        # find() returns None when the element is absent; iterating it
        # would otherwise fail with an unhelpful TypeError.
        raise PRONOMException(
            f"cannot find FileFormatMappings in {pronom_container_xml}"
        )
    try:
        return [item.attrib["Puid"] for item in mappings]
    except KeyError as err:
        raise ProcessLookupError(
            f"cannot find puid attrib in {pronom_container_xml}: {err}"
        ) from err
def get_puid(identifiers: list[etree.Element]) -> Union[str | None]:
"""Retrieve PUID from a list of identifiers"""
for identifier in identifiers:
identifier_type = identifier.find("pro:IdentifierType", NAMESPACES)
if identifier_type.text != "PUID":
continue
return identifier.find("pro:Identifier", NAMESPACES).text
return None
def process_desc(desc: str) -> str:
    """Classify a format description as outline, deprecated or
    complete.

    Matching is case-insensitive. The outline marker is tested first,
    then each deprecation phrase; a description matching none of them
    is reported as complete.
    """
    text = desc.lower()
    if "this is an outline record" in text:
        return OUTLINE
    deprecation_phrases = (
        "format deprecated",
        "puid deprecated",
        "puid is now deprecated",
        "deprecated in favour",
        "this format has been deprecated",
        "this puid has been deprecated",
    )
    if any(phrase in text for phrase in deprecation_phrases):
        return DEPRECATED
    return COMPLETE
def process_name_version(name: str, version: str) -> str:
"""Process PRONOM format names and irregularities."""
if version == "" or version is None:
return name.strip()
return f"{name.strip()} {version.strip()}"
def summarize_xml(pronom_xml: Path) -> dict:
    """Summarize a single PRONOM fmt XML record.

    The returned dictionary has four keys:

    * ``name``: the format name followed by its version, if any.
    * ``description``: the label returned by ``process_desc``.
    * ``signature``: ``True`` if the record contains an internal
      signature byte sequence value.
    * ``identifier``: the value returned by ``get_puid``.

    A PRONOMException is raised if the file is not well-formed XML or
    an element needed for the summary is missing.
    """
    try:
        tree = etree.parse(pronom_xml)
    except ParseError as err:
        raise PRONOMException(f"cannot parse xml: {pronom_xml}") from err
    root = tree.getroot()
    # Every field of interest lives below the same FileFormat element.
    file_format = "pro:report_format_detail/pro:FileFormat"
    name = root.find(f"{file_format}/pro:FormatName", NAMESPACES)
    version = root.find(f"{file_format}/pro:FormatVersion", NAMESPACES)
    desc = root.find(f"{file_format}/pro:FormatDescription", NAMESPACES)
    identifiers = root.findall(f"{file_format}/pro:FileFormatIdentifier", NAMESPACES)
    sig = root.find(
        f"{file_format}/pro:InternalSignature/pro:ByteSequence/pro:ByteSequenceValue",
        NAMESPACES,
    )
    try:
        # A missing element surfaces here as an AttributeError on None.
        # The PUID lookup sits inside the `try` so that its failures
        # are reported the same way as the others.
        return {
            "name": process_name_version(name.text, version.text).strip(),
            "description": process_desc(desc.text),
            "signature": sig is not None,
            "identifier": get_puid(identifiers),
        }
    except AttributeError as err:
        raise PRONOMException(f"cannot process {pronom_xml}") from err
async def parse_pronom(pronom_export: str, container_signature: str) -> list[dict]:
    """Parse PRONOM's records and container signature file and return a
    list of record summaries sorted by file format name.

    Every file found below ``pronom_export`` is summarized in a worker
    pool. A record whose PUID appears in the container signature file
    is marked as having a signature, whatever its own record says.

    NOTE: although declared ``async`` this coroutine never awaits; the
    pool work blocks until it is complete.
    """
    records = [
        path.resolve() for path in Path(pronom_export).glob("**/*") if path.is_file()
    ]
    with multiprocessing.Pool() as pool:
        pronom_summary = pool.map(summarize_xml, records)
    # A set gives constant-time membership tests in the loop below.
    container_puids = set(summarize_container_xml(container_signature))
    for item in pronom_summary:
        if item.get("identifier") in container_puids:
            item["signature"] = True
    return sorted(pronom_summary, key=lambda item: item["name"])
def main():
    """Run the summary from the command line.

    The value returned by ``pronom_tools.check_existing`` is handed to
    ``pronom_tools.download_container``, whose result is used as the
    container signature file. Records are read from the
    ``pronom-export`` directory and the summary is printed as JSON.
    """
    release = pronom_tools.check_existing()
    container = pronom_tools.download_container(rel=release)
    summary = asyncio.run(
        parse_pronom(pronom_export="pronom-export", container_signature=container)
    )
    print(json.dumps(summary, indent=2))
# Run the summary only when this file is executed as a script.
if __name__ == "__main__":
    main()
Functions
def get_puid(identifiers: list[xml.etree.ElementTree.Element]) ‑> Optional[str]
-
Retrieve PUID from a list of identifiers
Expand source code
def get_puid(identifiers: list[etree.Element]) -> Union[str | None]: """Retrieve PUID from a list of identifiers""" for identifier in identifiers: identifier_type = identifier.find("pro:IdentifierType", NAMESPACES) if identifier_type.text != "PUID": continue return identifier.find("pro:Identifier", NAMESPACES).text return None
def main()
-
Primary entry point for this script.
Expand source code
def main():
    """Run the summary from the command line.

    The value returned by ``pronom_tools.check_existing`` is handed to
    ``pronom_tools.download_container``, whose result is used as the
    container signature file. Records are read from the
    ``pronom-export`` directory and the summary is printed as JSON.
    """
    release = pronom_tools.check_existing()
    container = pronom_tools.download_container(rel=release)
    summary = asyncio.run(
        parse_pronom(pronom_export="pronom-export", container_signature=container)
    )
    print(json.dumps(summary, indent=2))
async def parse_pronom(pronom_export: str, container_signature: str) ‑> list[dict]
-
Parse PRONOM's records and container signature file and return a list of information we want to understand better sorted by file format name in alphabetical order.
Expand source code
async def parse_pronom(pronom_export: str, container_signature: str) -> list[dict]:
    """Parse PRONOM's records and container signature file and return a
    list of record summaries sorted by file format name.

    Every file found below ``pronom_export`` is summarized in a worker
    pool. A record whose PUID appears in the container signature file
    is marked as having a signature, whatever its own record says.

    NOTE: although declared ``async`` this coroutine never awaits; the
    pool work blocks until it is complete.
    """
    records = [
        path.resolve() for path in Path(pronom_export).glob("**/*") if path.is_file()
    ]
    with multiprocessing.Pool() as pool:
        pronom_summary = pool.map(summarize_xml, records)
    # A set gives constant-time membership tests in the loop below.
    container_puids = set(summarize_container_xml(container_signature))
    for item in pronom_summary:
        if item.get("identifier") in container_puids:
            item["signature"] = True
    return sorted(pronom_summary, key=lambda item: item["name"])
def process_desc(desc: str) ‑> str
-
Identify outline and deprecated records. Return "complete" if the description is neither.
Expand source code
def process_desc(desc: str) -> str:
    """Classify a format description as outline, deprecated or
    complete.

    Matching is case-insensitive. The outline marker is tested first,
    then each deprecation phrase; a description matching none of them
    is reported as complete.
    """
    text = desc.lower()
    if "this is an outline record" in text:
        return OUTLINE
    deprecation_phrases = (
        "format deprecated",
        "puid deprecated",
        "puid is now deprecated",
        "deprecated in favour",
        "this format has been deprecated",
        "this puid has been deprecated",
    )
    if any(phrase in text for phrase in deprecation_phrases):
        return DEPRECATED
    return COMPLETE
def process_name_version(name: str, version: str) ‑> str
-
Process PRONOM format names and irregularities.
Expand source code
def process_name_version(name: str, version: str) -> str: """Process PRONOM format names and irregularities.""" if version == "" or version is None: return name.strip() return f"{name.strip()} {version.strip()}"
def summarize_container_xml(pronom_container_xml: pathlib.Path) ‑> list[str]
-
Return information about the container signature file from PRONOM.
Expand source code
def summarize_container_xml(pronom_container_xml: Union[str, Path]) -> list[str]:
    """Return the PUIDs listed in a PRONOM container signature file.

    The ``Puid`` attribute is collected from every child of the root's
    ``FileFormatMappings`` element, in document order.

    :param pronom_container_xml: location of the container signature
        XML file, as a path object or a string.
    :return: one PUID string per mapping entry.
    :raises PRONOMException: if the file is not well-formed XML or has
        no ``FileFormatMappings`` element.
    :raises ProcessLookupError: if a mapping entry has no ``Puid``
        attribute.
    """
    try:
        tree = etree.parse(pronom_container_xml)
    except ParseError as err:
        raise PRONOMException(f"cannot parse xml: {pronom_container_xml}") from err
    mappings = tree.getroot().find("FileFormatMappings")
    if mappings is None:
        # find() returns None when the element is absent; iterating it
        # would otherwise fail with an unhelpful TypeError.
        raise PRONOMException(
            f"cannot find FileFormatMappings in {pronom_container_xml}"
        )
    try:
        return [item.attrib["Puid"] for item in mappings]
    except KeyError as err:
        raise ProcessLookupError(
            f"cannot find puid attrib in {pronom_container_xml}: {err}"
        ) from err
def summarize_xml(pronom_xml: list[pathlib.Path])
-
Summarize the fmt XML record.
If the record cannot be parsed correctly for any reason a PRONOMException is raised.
Expand source code
def summarize_xml(pronom_xml: Path) -> dict:
    """Summarize a single PRONOM fmt XML record.

    The returned dictionary has four keys:

    * ``name``: the format name followed by its version, if any.
    * ``description``: the label returned by ``process_desc``.
    * ``signature``: ``True`` if the record contains an internal
      signature byte sequence value.
    * ``identifier``: the value returned by ``get_puid``.

    A PRONOMException is raised if the file is not well-formed XML or
    an element needed for the summary is missing.
    """
    try:
        tree = etree.parse(pronom_xml)
    except ParseError as err:
        raise PRONOMException(f"cannot parse xml: {pronom_xml}") from err
    root = tree.getroot()
    # Every field of interest lives below the same FileFormat element.
    file_format = "pro:report_format_detail/pro:FileFormat"
    name = root.find(f"{file_format}/pro:FormatName", NAMESPACES)
    version = root.find(f"{file_format}/pro:FormatVersion", NAMESPACES)
    desc = root.find(f"{file_format}/pro:FormatDescription", NAMESPACES)
    identifiers = root.findall(f"{file_format}/pro:FileFormatIdentifier", NAMESPACES)
    sig = root.find(
        f"{file_format}/pro:InternalSignature/pro:ByteSequence/pro:ByteSequenceValue",
        NAMESPACES,
    )
    try:
        # A missing element surfaces here as an AttributeError on None.
        # The PUID lookup sits inside the `try` so that its failures
        # are reported the same way as the others.
        return {
            "name": process_name_version(name.text, version.text).strip(),
            "description": process_desc(desc.text),
            "signature": sig is not None,
            "identifier": get_puid(identifiers),
        }
    except AttributeError as err:
        raise PRONOMException(f"cannot process {pronom_xml}") from err
Classes
class PRONOMException (*args, **kwargs)
-
Exception to raise when there are errors with what is being processed.
Expand source code
class PRONOMException(Exception):
    """Signal that a PRONOM record or signature file could not be
    processed.
    """
Ancestors
- builtins.Exception
- builtins.BaseException