# Source: edi/edifact_to_yaml.py (initial commit by fabre).
#!/usr/bin/env python3
"""Standalone EDIFACT parser and CLI (single-file, YAML-first output)."""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any


# EDIFACT service characters per the syntax defaults; used when the document
# carries no UNA (service string advice) segment.
DEFAULT_SEPARATORS: dict[str, str] = {
    "segment_separator": "'",
    "data_element_separator": "+",
    "component_element_separator": ":",
    "release_character": "?",
}

# Keys that define the parse tree's structure; keep_semantic_and_value()
# recurses only into these when producing the compact (non --all) output.
STRUCTURE_KEYS: set[str] = {
    "interchanges",
    "functional_groups",
    "transaction_sets",
    "start_segment",
    "end_segment",
    "segments",
    "elements",
    "components",
}

# Semantic annotation keys dropped from the compact (non --all) output.
EXCLUDED_SEMANTIC_KEYS: set[str] = {
    "data_type",
    "required",
    "min_length",
    "max_length",
    "qualifier_ref",
}


# Built-in schemas for EDIFACT service (control) segments, used to annotate
# envelope segments even when no release schema file is available. Each entry
# mirrors the release-schema shape: "Desc", "Purpose", and an ordered
# "Elements" list (position i describes element index i+1).
CONTROL_SEGMENT_SCHEMAS: dict[str, dict[str, Any]] = {
    "UNA": {
        "Desc": "Delimiter String Advice",
        "Purpose": "To define separators used in the interchange.",
        "Elements": [
            {"Id": "UNA01", "Desc": "Component data element separator"},
            {"Id": "UNA02", "Desc": "Data element separator"},
            {"Id": "UNA03", "Desc": "Decimal notification"},
            {"Id": "UNA04", "Desc": "Release indicator"},
            {"Id": "UNA05", "Desc": "Reserved"},
        ],
    },
    "UNB": {
        "Desc": "Interchange header",
        "Purpose": "To start, identify and specify an interchange.",
        "Elements": [
            {"Id": "S001", "Desc": "Syntax identifier"},
            {"Id": "S002", "Desc": "Interchange sender"},
            {"Id": "S003", "Desc": "Interchange recipient"},
            {"Id": "S004", "Desc": "Date/time of preparation"},
            {"Id": "0020", "Desc": "Interchange control reference"},
            {"Id": "S005", "Desc": "Recipient's reference/password"},
            {"Id": "0026", "Desc": "Application reference"},
            {"Id": "0029", "Desc": "Processing priority code"},
            {"Id": "0031", "Desc": "Acknowledgement request"},
            {"Id": "0032", "Desc": "Communications agreement ID"},
            {"Id": "0035", "Desc": "Test indicator"},
        ],
    },
    "UNG": {
        "Desc": "Functional group header",
        "Purpose": "To head, identify and specify a functional group.",
        "Elements": [
            {"Id": "0038", "Desc": "Message group identification"},
            {"Id": "S006", "Desc": "Application sender identification"},
            {"Id": "S007", "Desc": "Application recipient identification"},
            {"Id": "S004", "Desc": "Date/time of preparation"},
            {"Id": "0048", "Desc": "Group reference number"},
            {"Id": "0051", "Desc": "Controlling agency"},
            {"Id": "S008", "Desc": "Message version"},
            {"Id": "0058", "Desc": "Application password"},
        ],
    },
    "UNE": {
        "Desc": "Functional group trailer",
        "Purpose": "To end and check the completeness of a group.",
        "Elements": [
            {"Id": "0060", "Desc": "Group control count"},
            {"Id": "0048", "Desc": "Group reference number"},
        ],
    },
    "UNZ": {
        "Desc": "Interchange trailer",
        "Purpose": "To end and check the completeness of an interchange.",
        "Elements": [
            {"Id": "0036", "Desc": "Interchange control count"},
            {"Id": "0020", "Desc": "Interchange control reference"},
        ],
    },
}


class SchemaStore:
    """Lazy, cached loader for EDIFACT schema JSON files under a schema root."""

    def __init__(self, schema_root: str | Path | None) -> None:
        self.schema_root = resolve_schema_root(schema_root)
        # Both caches store None for "looked up but missing/unreadable" so
        # failed lookups are not retried.
        self._release_cache: dict[str, dict[str, Any] | None] = {}
        self._version_doc_cache: dict[tuple[str, str], dict[str, Any] | None] = {}

    def load_release(self, release: str | None) -> dict[str, Any] | None:
        """Return the release-level schema (e.g. D96A.json), or None."""
        if not release:
            return None
        if release in self._release_cache:
            return self._release_cache[release]

        loaded: dict[str, Any] | None = None
        if self.schema_root:
            schema_path = self.schema_root / release / f"{release}.json"
            if schema_path.exists():
                try:
                    loaded = json.loads(schema_path.read_text(encoding="utf-8"))
                except (OSError, json.JSONDecodeError):
                    # Unreadable/corrupt schema: cache the failure as None.
                    loaded = None
        self._release_cache[release] = loaded
        return loaded

    def load_version_doc(self, release: str | None, version: str | None) -> dict[str, Any] | None:
        """Return the per-document-type schema for (release, version), or None."""
        if not release or not version:
            return None

        cache_key = (release, version)
        if cache_key in self._version_doc_cache:
            return self._version_doc_cache[cache_key]

        doc: dict[str, Any] | None = None
        if self.schema_root:
            versions_path = self.schema_root / release / f"{release}_versions.json"
            if versions_path.exists():
                try:
                    raw_versions = json.loads(versions_path.read_text(encoding="utf-8"))
                except (OSError, json.JSONDecodeError):
                    raw_versions = None
                if raw_versions is not None:
                    # Document types are keyed like "D96A_ORDERS".
                    doc = raw_versions.get("DocumentTypes", {}).get(f"{release}_{version}")
        self._version_doc_cache[cache_key] = doc
        return doc


def parse_edifact(
    document: str,
    schema_root: str | Path | None = None,
    semantic: bool = True,
) -> dict[str, Any]:
    """Parse an EDIFACT document into an interchange/group/transaction tree.

    Separators come from a leading UNA segment when present, otherwise the
    EDIFACT defaults. Segments outside explicit UNB/UNG/UNH envelopes are
    placed inside lazily-created ("fake") containers so nothing is dropped.
    When ``semantic`` is true the tree is additionally annotated with
    schema-derived descriptions (best effort; a missing schema root only
    adds a warning).
    """
    separators = parse_separators(document)
    segment_slices = split_segments(document, separators["segment_separator"], separators["release_character"])

    parsed_segments: list[dict[str, Any]] = []
    for raw_segment, start_index in segment_slices:
        segment = parse_segment(raw_segment, start_index, separators)
        if segment is not None:
            parsed_segments.append(segment)

    result: dict[str, Any] = {
        "separators": separators,
        "separators_segment": None,
        "interchanges": [],
        "all_segments": parsed_segments,
    }

    # Builder state: the envelope levels currently open while walking segments.
    active_interchange: dict[str, Any] | None = None
    active_group: dict[str, Any] | None = None
    active_transaction: dict[str, Any] | None = None

    def new_interchange(meta: dict[str, Any], start_segment: dict[str, Any] | None) -> dict[str, Any]:
        # Container for one UNB..UNZ envelope.
        return {
            "meta": meta,
            "id": meta.get("id"),
            "start_segment": start_segment,
            "end_segment": None,
            "functional_groups": [],
        }

    def new_group(meta: dict[str, Any], start_segment: dict[str, Any] | None) -> dict[str, Any]:
        # Container for one UNG..UNE group. "is_fake" marks groups synthesized
        # for messages that appear without an explicit UNG.
        return {
            "meta": meta,
            "id": meta.get("id"),
            "is_fake": start_segment is None,
            "start_segment": start_segment,
            "end_segment": None,
            "transaction_sets": [],
        }

    def new_transaction(meta: dict[str, Any], start_segment: dict[str, Any] | None) -> dict[str, Any]:
        # Container for one UNH..UNT message.
        return {
            "meta": meta,
            "id": meta.get("id"),
            "start_segment": start_segment,
            "end_segment": None,
            "segments": [],
        }

    def ensure_interchange() -> dict[str, Any]:
        # Lazily open a synthetic interchange for segments seen outside UNB..UNZ.
        nonlocal active_interchange
        if active_interchange is None:
            active_interchange = new_interchange({}, None)
            result["interchanges"].append(active_interchange)
        return active_interchange

    def ensure_group() -> dict[str, Any]:
        # Lazily open a synthetic ("fake") group for messages without UNG.
        nonlocal active_group
        interchange = ensure_interchange()
        if active_group is None:
            active_group = new_group({}, None)
            interchange["functional_groups"].append(active_group)
        return active_group

    def ensure_transaction() -> dict[str, Any]:
        # Lazily open a synthetic transaction for segments outside UNH..UNT.
        nonlocal active_transaction
        group = ensure_group()
        if active_transaction is None:
            active_transaction = new_transaction({}, None)
            group["transaction_sets"].append(active_transaction)
        return active_transaction

    for segment in parsed_segments:
        segment_id = segment["id"]

        # Envelope (service) segments open/close levels; anything else is a
        # body segment appended to the current transaction.
        if segment_id == "UNA":
            result["separators_segment"] = segment
            continue

        if segment_id == "UNB":
            interchange = new_interchange(parse_interchange_meta(segment), segment)
            result["interchanges"].append(interchange)
            active_interchange = interchange
            active_group = None
            active_transaction = None
            continue

        if segment_id == "UNZ":
            interchange = ensure_interchange()
            interchange["end_segment"] = segment
            active_interchange = None
            active_group = None
            active_transaction = None
            continue

        if segment_id == "UNG":
            interchange = ensure_interchange()
            group = new_group(parse_functional_group_meta(segment), segment)
            interchange["functional_groups"].append(group)
            active_group = group
            active_transaction = None
            continue

        if segment_id == "UNE":
            group = ensure_group()
            group["end_segment"] = segment
            active_group = None
            active_transaction = None
            continue

        if segment_id == "UNH":
            group = ensure_group()
            transaction = new_transaction(parse_transaction_meta(segment), segment)
            group["transaction_sets"].append(transaction)
            active_transaction = transaction
            continue

        if segment_id == "UNT":
            transaction = ensure_transaction()
            transaction["end_segment"] = segment
            active_transaction = None
            continue

        ensure_transaction()["segments"].append(segment)

    if semantic:
        schema_store = SchemaStore(schema_root)
        apply_semantic_enrichment(result, schema_store)

    return result


def parse_separators(document: str) -> dict[str, str]:
    """Extract service characters from a leading UNA segment.

    Documents that do not begin with "UNA" (ignoring surrounding whitespace)
    get a copy of DEFAULT_SEPARATORS.
    """
    separators = dict(DEFAULT_SEPARATORS)
    text = document.strip()
    if not text.startswith("UNA"):
        return separators

    # Only inspect the service-string advice before the interchange header.
    una = text.split("UNB", 1)[0] if "UNB" in text else text

    # UNA layout: "UNA" then component(3) data(4) decimal(5) release(6)
    # reserved(7) segment terminator(8).
    for key, offset in (
        ("component_element_separator", 3),
        ("data_element_separator", 4),
        ("release_character", 6),
        ("segment_separator", 8),
    ):
        if len(una) > offset:
            separators[key] = una[offset]

    return separators


def split_segments(document: str, segment_separator: str, release_character: str) -> list[tuple[str, int]]:
    """Split raw text into (segment_text, start_offset) chunks.

    Separator characters escaped by the release character do not end a
    segment. Each chunk keeps its trailing separator; a final unterminated
    chunk is returned as-is.
    """
    if not document:
        return []

    chunks: list[tuple[str, int]] = []
    begin = 0
    for pos, ch in enumerate(document):
        if ch != segment_separator or not is_unescaped(document, pos, release_character):
            continue
        chunks.append((document[begin : pos + 1], begin))
        begin = pos + 1

    # Trailing text without a final separator still forms a segment.
    if begin < len(document):
        chunks.append((document[begin:], begin))

    return chunks


def parse_segment(
    raw_segment: str,
    start_index: int,
    separators: dict[str, str],
) -> dict[str, Any] | None:
    """Parse one raw segment chunk into a segment dict.

    Returns None for blank chunks or chunks with no leading segment tag.
    ``start_index``/``end_index`` are offsets into the original document;
    ``end_index`` points at the last content character before the terminator.
    """
    if not raw_segment.strip():
        return None

    # Advance start_index past leading whitespace that is trimmed away.
    left_trimmed = raw_segment.lstrip()
    start_index += len(raw_segment) - len(left_trimmed)
    if not left_trimmed:
        return None

    segment_str = left_trimmed.rstrip()
    if not segment_str:
        return None

    segment_separator = separators["segment_separator"]
    ending_delimiter = segment_separator if segment_str.endswith(segment_separator) else ""
    # Content excludes the trailing segment terminator, when present.
    content = segment_str[:-1] if ending_delimiter else segment_str
    segment_id = get_segment_id(content)
    if not segment_id:
        return None

    end_index = start_index + len(segment_str) - len(ending_delimiter) - 1

    elements: list[dict[str, Any]]
    if segment_id == "UNA" and len(segment_str) == 9:
        # UNA is positional, not delimited: the five service characters sit
        # at offsets 3-7 after the "UNA" tag (offset 8 is the terminator).
        elements = [
            {
                "index": index + 1,
                "value": segment_str[index + 3],
            }
            for index in range(5)
        ]
    else:
        elements = parse_elements(content, segment_id, separators)

    return {
        "id": segment_id,
        "raw": segment_str,
        "start_index": start_index,
        "end_index": end_index,
        "length": len(segment_str),
        "ending_delimiter": ending_delimiter,
        "elements": elements,
    }


def get_segment_id(content: str) -> str:
    """Return the leading run of identifier characters (alnum or '_')."""
    for pos, ch in enumerate(content):
        if not (ch.isalnum() or ch == "_"):
            return content[:pos]
    return content


def parse_elements(content: str, segment_id: str, separators: dict[str, str]) -> list[dict[str, Any]]:
    """Split a segment body into element dicts with 1-based indexes.

    An element whose value contains more than one component additionally
    carries a "components" list with the same index/value shape.
    """
    data_sep = separators["data_element_separator"]
    comp_sep = separators["component_element_separator"]
    release = separators["release_character"]

    tail = content[len(segment_id) :]
    if not tail or not tail.startswith(data_sep):
        return []

    payload = tail[1:]
    values = split_unescaped(payload, data_sep, release)
    if payload == "":
        # A lone trailing data separator means one empty element.
        values = [""]

    parsed: list[dict[str, Any]] = []
    for position, raw_value in enumerate(values, start=1):
        entry: dict[str, Any] = {"index": position, "value": raw_value}

        parts = split_unescaped(raw_value, comp_sep, release)
        if raw_value and len(parts) > 1:
            entry["components"] = [
                {"index": component_position, "value": component_value}
                for component_position, component_value in enumerate(parts, start=1)
            ]

        parsed.append(entry)

    return parsed


def split_unescaped(value: str, delimiter: str, release_character: str) -> list[str]:
    """Split *value* on unescaped occurrences of *delimiter*.

    The release character that escapes a delimiter is kept in the output.
    An empty delimiter yields the whole value as a single part.
    """
    if delimiter == "":
        return [value]

    pieces: list[str] = []
    chunk: list[str] = []
    for pos, ch in enumerate(value):
        if ch == delimiter and is_unescaped(value, pos, release_character):
            pieces.append("".join(chunk))
            chunk = []
            continue
        chunk.append(ch)
    pieces.append("".join(chunk))
    return pieces


def is_unescaped(value: str, index: int, release_character: str) -> bool:
    """True when the character at *index* is not escaped by the release char."""
    if index > 0 and release_character:
        return value[index - 1] != release_character
    # Start of string, or no release character configured: never escaped.
    return True


def parse_interchange_meta(segment: dict[str, Any]) -> dict[str, Any]:
    """Extract sender/receiver/date/control-reference info from a UNB segment."""
    elements = segment["elements"]
    meta: dict[str, Any] = {}

    def components_of(position: int) -> list[dict[str, Any]]:
        # Composite elements carry a "components" list; simple ones do not.
        if len(elements) > position:
            return elements[position].get("components", [])
        return []

    # NOTE: the "Qualifer" spelling is preserved — downstream consumers key on it.
    sender = components_of(1)
    if sender:
        meta["senderID"] = sender[0]["value"]
    if len(sender) > 1:
        meta["senderQualifer"] = sender[1]["value"]

    receiver = components_of(2)
    if receiver:
        meta["receiverID"] = receiver[0]["value"]
    if len(receiver) > 1:
        meta["receiverQualifer"] = receiver[1]["value"]

    prepared = components_of(3)
    if prepared:
        meta["date"] = prepared[0]["value"]
    if len(prepared) > 1:
        meta["time"] = prepared[1]["value"]

    if len(elements) > 4:
        meta["id"] = elements[4]["value"]

    return meta


def parse_functional_group_meta(segment: dict[str, Any]) -> dict[str, Any]:
    """Extract preparation date/time and group reference from a UNG segment."""
    elements = segment["elements"]
    meta: dict[str, Any] = {}

    if len(elements) >= 4:
        stamp = elements[3].get("components", [])
        if stamp:
            meta["date"] = stamp[0]["value"]
        if len(stamp) >= 2:
            meta["time"] = stamp[1]["value"]

    if len(elements) >= 5:
        meta["id"] = elements[4]["value"]

    return meta


def parse_transaction_meta(segment: dict[str, Any]) -> dict[str, Any]:
    """Extract message reference, type ("version") and release from a UNH segment.

    The release key (e.g. "D96A") is the concatenation of the message
    version and release components, matching the schema file naming.
    """
    elements = segment["elements"]
    meta: dict[str, Any] = {}

    if elements:
        meta["id"] = elements[0]["value"]

    if len(elements) >= 2:
        identifier = elements[1].get("components", [])
        if identifier:
            meta["version"] = identifier[0]["value"]
        if len(identifier) >= 3:
            left = identifier[1]["value"]
            if left is None:
                left = ""
            right = identifier[2]["value"]
            if right is None:
                right = ""
            meta["release"] = f"{left}{right}"

    return meta


def resolve_schema_root(schema_root: str | Path | None) -> Path | None:
    if schema_root:
        candidate = Path(schema_root).expanduser()
        return candidate if candidate.exists() else None

    here = Path(__file__).resolve()
    candidates = [
        here.parent / "schemas" / "edifact",
        here.parent.parent / "src" / "schemas" / "edifact",
        here.parent.parent / "vscode-edi-support" / "src" / "schemas" / "edifact",
        Path.cwd() / "src" / "schemas" / "edifact",
        Path.cwd() / "vscode-edi-support" / "src" / "schemas" / "edifact",
    ]
    for candidate in candidates:
        if candidate.exists():
            return candidate
    return None


def apply_semantic_enrichment(result: dict[str, Any], schema_store: SchemaStore) -> None:
    """Annotate the parse tree in place with schema-derived descriptions."""
    root = schema_store.schema_root
    info: dict[str, Any] = {
        "enabled": True,
        "schema_root": str(root) if root else None,
        "warnings": [],
    }
    if root is None:
        info["warnings"].append(
            "Schema root not found. Put EDIFACT schema files under the default schema paths."
        )
    result["semantic_enrichment"] = info

    annotate_control_segment(result.get("separators_segment"))

    for interchange in result.get("interchanges", []):
        for envelope_key in ("start_segment", "end_segment"):
            annotate_control_segment(interchange.get(envelope_key))

        for group in interchange.get("functional_groups", []):
            for envelope_key in ("start_segment", "end_segment"):
                annotate_control_segment(group.get(envelope_key))

            for transaction in group.get("transaction_sets", []):
                enrich_transaction_semantics(transaction, schema_store)


def annotate_control_segment(segment: dict[str, Any] | None) -> None:
    """Attach built-in UNA/UNB/UNG/UNE/UNZ schema info to a control segment."""
    if segment:
        schema = CONTROL_SEGMENT_SCHEMAS.get(segment.get("id", ""))
        if schema:
            # Control segments have no release-specific qualifier tables.
            annotate_segment_with_schema(segment, schema=schema, qualifiers=None)


def enrich_transaction_semantics(transaction: dict[str, Any], schema_store: SchemaStore) -> None:
    """Annotate one transaction set with release/version schema information.

    Adds a "semantic" summary on the transaction and annotates UNH, body
    and UNT segments in place. Body segments unknown to a loaded release
    schema are flagged with ``is_invalid_segment``.
    """
    meta = transaction.get("meta", {})
    release = meta.get("release")
    version = meta.get("version")

    release_schema = schema_store.load_release(release)
    version_doc = schema_store.load_version_doc(release, version)

    tx_semantic: dict[str, Any] = {}
    if release:
        tx_semantic["release"] = release
        tx_semantic["release_schema_loaded"] = release_schema is not None
    if version:
        tx_semantic["document_type"] = version
    if version and release_schema:
        # e.g. "ORDERS" -> "Purchase order message" from the qualifier table.
        message_type_desc = lookup_qualifier_desc(release_schema, "Message type identifier", version)
        if message_type_desc:
            tx_semantic["document_type_desc"] = message_type_desc
            meta["version_desc"] = message_type_desc
    if release and version:
        tx_semantic["version_schema_found"] = version_doc is not None
    if version_doc:
        tx_semantic["transaction_set"] = {
            "segment_count": len(version_doc.get("TransactionSet", [])),
        }
    if tx_semantic:
        transaction["semantic"] = tx_semantic

    segment_schemas = release_schema.get("Segments", {}) if release_schema else {}
    qualifiers = release_schema.get("Qualifiers", {}) if release_schema else None

    # Walk UNH, body segments, and UNT in document order.
    segments: list[dict[str, Any]] = []
    if transaction.get("start_segment"):
        segments.append(transaction["start_segment"])
    segments.extend(transaction.get("segments", []))
    if transaction.get("end_segment"):
        segments.append(transaction["end_segment"])

    for segment in segments:
        segment_schema = segment_schemas.get(segment.get("id", ""))
        fallback_schema = CONTROL_SEGMENT_SCHEMAS.get(segment.get("id", ""))

        if segment_schema:
            segment["is_invalid_segment"] = False
            annotate_segment_with_schema(segment, schema=segment_schema, qualifiers=qualifiers)
        elif fallback_schema:
            # Service segments (e.g. UNH/UNT) fall back to the built-in
            # schemas; note they never get an is_invalid_segment flag.
            annotate_segment_with_schema(segment, schema=fallback_schema, qualifiers=qualifiers)
        elif release_schema is not None:
            # Only flag unknown segments when a release schema was available
            # to check against.
            segment["is_invalid_segment"] = True

def annotate_segment_with_schema(
    segment: dict[str, Any],
    schema: dict[str, Any],
    qualifiers: dict[str, Any] | None,
) -> None:
    """Attach the segment-level description and annotate each element."""
    segment["semantic"] = compact_dict(
        {"desc": schema.get("Desc"), "purpose": schema.get("Purpose")}
    )

    element_schemas = schema.get("Elements") or []
    for element in segment.get("elements", []):
        # Element indexes are 1-based; the schema list is 0-based.
        schema_pos = element.get("index", 0) - 1
        if 0 <= schema_pos < len(element_schemas):
            annotate_element_with_schema(element, element_schemas[schema_pos], qualifiers)


def annotate_element_with_schema(
    element: dict[str, Any],
    element_schema: dict[str, Any],
    qualifiers: dict[str, Any] | None,
) -> None:
    """Attach schema metadata to an element and recurse into its components."""
    qualifier_ref = element_schema.get("QualifierRef")
    element["semantic"] = compact_dict(
        {
            "id": element_schema.get("Id"),
            "name": element_schema.get("Name"),
            "desc": element_schema.get("Desc"),
            "definition": element_schema.get("Definition"),
            "data_type": element_schema.get("DataType"),
            "required": element_schema.get("Required"),
            "min_length": element_schema.get("MinLength"),
            "max_length": element_schema.get("MaxLength"),
            "qualifier_ref": qualifier_ref,
            "qualifier_desc": lookup_qualifier_desc_from_table(
                qualifiers=qualifiers,
                qualifier_ref=qualifier_ref,
                value=element.get("value"),
            ),
        }
    )

    component_schemas = element_schema.get("Components") or []
    raw_value = element.get("value")
    if component_schemas and "components" not in element and raw_value not in (None, ""):
        # Match vscode parser behavior: composite elements can still have a
        # single component even when no component separator appears in source.
        element["components"] = [{"index": 1, "value": raw_value}]

    for component in element.get("components", []):
        # Component indexes are 1-based; the schema list is 0-based.
        schema_pos = component.get("index", 0) - 1
        if 0 <= schema_pos < len(component_schemas):
            annotate_component_with_schema(component, component_schemas[schema_pos], qualifiers)


def annotate_component_with_schema(
    component: dict[str, Any],
    component_schema: dict[str, Any],
    qualifiers: dict[str, Any] | None,
) -> None:
    """Attach schema metadata (and any qualifier description) to a component."""
    qualifier_ref = component_schema.get("QualifierRef")
    component["semantic"] = compact_dict(
        {
            "id": component_schema.get("Id"),
            "name": component_schema.get("Name"),
            "desc": component_schema.get("Desc"),
            "definition": component_schema.get("Definition"),
            "data_type": component_schema.get("DataType"),
            "required": component_schema.get("Required"),
            "min_length": component_schema.get("MinLength"),
            "max_length": component_schema.get("MaxLength"),
            "qualifier_ref": qualifier_ref,
            "qualifier_desc": lookup_qualifier_desc_from_table(
                qualifiers=qualifiers,
                qualifier_ref=qualifier_ref,
                value=component.get("value"),
            ),
        }
    )


def lookup_qualifier_desc(
    release_schema: dict[str, Any],
    qualifier_ref: str,
    value: str | None,
) -> str | None:
    """Resolve *value*'s description via the release schema's qualifier tables."""
    return lookup_qualifier_desc_from_table(
        release_schema.get("Qualifiers", {}), qualifier_ref, value
    )


def lookup_qualifier_desc_from_table(
    qualifiers: dict[str, Any] | None,
    qualifier_ref: str | None,
    value: str | None,
) -> str | None:
    if not qualifiers or not qualifier_ref or value is None:
        return None
    qualifier_map = qualifiers.get(qualifier_ref)
    if not isinstance(qualifier_map, dict):
        return None
    desc = qualifier_map.get(value)
    return desc if isinstance(desc, str) else None


def compact_dict(value: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *value* with None-valued entries removed."""
    compacted: dict[str, Any] = {}
    for key, item in value.items():
        if item is not None:
            compacted[key] = item
    return compacted


def is_yaml_scalar(value: Any) -> bool:
    """True for values rendered inline in YAML output (None/bool/number/str)."""
    if value is None:
        return True
    return isinstance(value, (bool, int, float, str))


# Plain scalars a YAML 1.1/1.2 parser would read back as booleans or null
# rather than strings, so they must be emitted quoted.
_YAML_RESERVED_KEYS = {"true", "false", "yes", "no", "on", "off", "null", "~"}


def format_yaml_key(key: str) -> str:
    """Render a mapping key for YAML output.

    Plain (unquoted) form is used only for identifier-like keys that a YAML
    parser would read back as the same string. Anything else — including keys
    YAML would resolve to booleans, null, or integers (e.g. "true", "no",
    "123") — is emitted as a JSON-quoted string, which is valid YAML
    double-quoted style.
    """
    stripped = key.replace("_", "").replace("-", "")
    # Empty keys short-circuit on isalnum(), so key[0] is never indexed.
    plain = stripped.isalnum() and not key[0].isspace()
    if plain and key.lower() not in _YAML_RESERVED_KEYS and not stripped.isdigit():
        return key
    return json.dumps(key, ensure_ascii=False)


def format_yaml_scalar(value: Any) -> str:
    """Render a scalar for YAML output (JSON-compatible forms are valid YAML)."""
    if value is None:
        return "null"
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    # Strings (and any other fallthrough) use JSON encoding, which YAML
    # accepts as a double-quoted scalar.
    return json.dumps(value, ensure_ascii=False)


def to_yaml(value: Any, indent: int = 0) -> str:
    """Serialize *value* as block-style YAML, nesting by two spaces."""
    pad = " " * indent

    def render_mapping(mapping: dict[Any, Any]) -> str:
        if not mapping:
            return f"{pad}{{}}"
        out: list[str] = []
        for key, child in mapping.items():
            label = format_yaml_key(str(key))
            if is_yaml_scalar(child):
                out.append(f"{pad}{label}: {format_yaml_scalar(child)}")
            else:
                out.append(f"{pad}{label}:")
                out.append(to_yaml(child, indent + 2))
        return "\n".join(out)

    def render_sequence(items: list[Any]) -> str:
        if not items:
            return f"{pad}[]"
        out: list[str] = []
        for child in items:
            if is_yaml_scalar(child):
                out.append(f"{pad}- {format_yaml_scalar(child)}")
            else:
                out.append(f"{pad}-")
                out.append(to_yaml(child, indent + 2))
        return "\n".join(out)

    if isinstance(value, dict):
        return render_mapping(value)
    if isinstance(value, list):
        return render_sequence(value)
    return f"{pad}{format_yaml_scalar(value)}"


def keep_semantic_and_value(value: Any) -> Any | None:
    """Prune the parse tree to structural keys plus semantic/value payloads.

    Returns None when nothing survives so callers can drop empty branches.
    """
    if isinstance(value, list):
        kept_items = []
        for item in value:
            pruned = keep_semantic_and_value(item)
            if pruned not in (None, {}, []):
                kept_items.append(pruned)
        return kept_items or None

    if not isinstance(value, dict):
        # Scalars are only kept via an enclosing "value"/"semantic" key.
        return None

    kept: dict[str, Any] = {}
    for key, child in value.items():
        if key == "semantic" and isinstance(child, dict):
            trimmed = {
                k: v
                for k, v in child.items()
                if k not in EXCLUDED_SEMANTIC_KEYS and v is not None
            }
            if trimmed:
                kept[key] = trimmed
        elif key in {"semantic", "value"}:
            kept[key] = child
        elif key in STRUCTURE_KEYS:
            pruned = keep_semantic_and_value(child)
            if pruned not in (None, {}, []):
                kept[key] = pruned
    return kept or None


def build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser (YAML output by default)."""
    parser = argparse.ArgumentParser(description="Parse EDIFACT and output YAML (default).")
    parser.add_argument("input", help="Path to EDIFACT input file.")
    parser.add_argument("-o", "--output", help="Output path. Defaults to stdout.")
    parser.add_argument("--json", action="store_true", help="Output JSON instead of YAML.")
    parser.add_argument(
        "--all",
        action="store_true",
        help="Output full structure (default only keeps semantic and value fields).",
    )
    return parser


def main() -> int:
    """CLI entry point: parse the input file and emit YAML (or JSON).

    Returns 0 on success (propagated to the shell via SystemExit).
    """
    args = build_parser().parse_args()

    document = Path(args.input).read_text(encoding="utf-8")
    parsed = parse_edifact(document)
    if not args.all:
        # Default output keeps only semantic/value fields of the tree.
        parsed = keep_semantic_and_value(parsed) or {}

    if args.json:
        rendered = json.dumps(parsed, ensure_ascii=False, indent=2)
    else:
        rendered = to_yaml(parsed)

    if args.output:
        Path(args.output).write_text(rendered + "\n", encoding="utf-8")
    else:
        print(rendered)

    return 0


if __name__ == "__main__":
    # Propagate main()'s exit code to the shell.
    raise SystemExit(main())