root / edi
Newer
Older
edi / edifact_summary.py
@smdhz smdhz 5 days ago 15 KB RECADV
#!/usr/bin/env python3
"""CLI: parse EDIFACT and output a concise business summary."""

from __future__ import annotations

import argparse
import json
import math
import sys
from pathlib import Path
from typing import Any

from edifact_parser import parse_edifact

# Display names for the qualifier codes of RFF segments. qualifier_name()
# looks a code up here first and only consults the parser-provided semantic
# metadata when the code is not listed.
REFERENCE_QUALIFIER_NAMES = {
    "ADE": "Account related details",
    "CR": "Customer reference number",
    "PD": "Promotion deal number",
    "VA": "VAT registration number",
}

# Display names for the qualifier codes of DTM segments (see extract_dates).
DATE_QUALIFIER_NAMES = {
    "137": "Document/message date",
    "63": "Delivery date/time, latest",
    "64": "Delivery date/time, earliest",
}

# Display names for the qualifier codes of QTY segments (see extract_line_items).
QTY_QUALIFIER_NAMES = {
    "21": "Ordered quantity",
}

# Display names for the qualifier codes of PRI segments (see extract_line_items).
PRICE_QUALIFIER_NAMES = {
    "AAA": "Calculation net",
}

# Qualifier codes handed to pick_measure() when choosing the quantity/price
# used for amounts. Order matters: the first code that has a matching entry
# wins; when none match, pick_measure() returns the first entry available.
PREFERRED_QTY_QUALIFIERS = ("21", "113", "194", "263")
PREFERRED_PRICE_QUALIFIERS = ("AAA",)


def safe_value(segment: dict[str, Any], element_index: int) -> str | None:
    elements = segment.get("elements", [])
    idx = element_index - 1
    if idx < 0 or idx >= len(elements):
        return None
    value = elements[idx].get("value")
    return value if isinstance(value, str) else None


def safe_components(segment: dict[str, Any], element_index: int) -> list[str]:
    """Return the string component values of one element of *segment*.

    Args:
        segment: Parsed segment dict; its ``elements`` list is inspected.
        element_index: 1-based position of the element.

    Returns:
        The ``value`` of every component that is a dict holding a string
        value, in original order. Components without a string value are
        skipped, so the result may be shorter than the component list.
        An empty list is returned when the position is out of range or the
        element/components are not in the expected shape.
    """
    elements = segment.get("elements", [])
    idx = element_index - 1
    if idx < 0 or idx >= len(elements):
        return []
    element = elements[idx]
    if not isinstance(element, dict):
        return []
    components = element.get("components", [])
    if not isinstance(components, list):
        return []
    # Same tolerance as component_metadata(): a component that is not a dict
    # is ignored instead of raising AttributeError on ``.get``.
    return [
        component["value"]
        for component in components
        if isinstance(component, dict) and isinstance(component.get("value"), str)
    ]


def component_metadata(segment: dict[str, Any], element_index: int, component_index: int) -> dict[str, Any]:
    """Return the raw dict of one component of one element of *segment*.

    Args:
        segment: Parsed segment dict; its ``elements`` list is inspected.
        element_index: 1-based position of the element.
        component_index: 1-based position of the component in that element.

    Returns:
        The component dict, or an empty dict when either position is out of
        range, the components are not a list, or the component is not a dict.
    """
    elements = segment.get("elements", [])
    if not 1 <= element_index <= len(elements):
        return {}
    components = elements[element_index - 1].get("components", [])
    if not isinstance(components, list) or not 1 <= component_index <= len(components):
        return {}
    entry = components[component_index - 1]
    if isinstance(entry, dict):
        return entry
    return {}


def qualifier_name(
    segment: dict[str, Any],
    element_index: int,
    code: str,
    fallback_names: dict[str, str],
) -> str:
    """Resolve a human-readable name for a qualifier code.

    The *fallback_names* table is consulted first. Only when it has no
    non-empty entry for *code* is the ``semantic.qualifier_desc`` of the
    element's first component used.

    Returns:
        The resolved name, or an empty string when neither source has one.
    """
    known = fallback_names.get(code, "")
    if known:
        return known
    semantic = component_metadata(segment, element_index, 1).get("semantic", {})
    if not isinstance(semantic, dict):
        return ""
    described = semantic.get("qualifier_desc")
    return described if isinstance(described, str) else ""


def parse_decimal(value: str | None) -> float | None:
    if value is None or value == "":
        return None
    try:
        return float(value)
    except ValueError:
        return None


def extract_dates(segments: list[dict[str, Any]]) -> list[dict[str, str]]:
    """Collect qualifier/value pairs from the DTM segments in *segments*.

    The first two string components of element 1 are read as qualifier and
    value; DTM segments with fewer than two are ignored.

    Returns:
        One dict per usable DTM segment with the keys ``qualifier``,
        ``qualifier_name`` and ``value``, in segment order.
    """
    collected: list[dict[str, str]] = []
    dtm_segments = (seg for seg in segments if seg.get("id") == "DTM")
    for seg in dtm_segments:
        parts = safe_components(seg, 1)
        if len(parts) >= 2:
            code, stamp = parts[0], parts[1]
            collected.append(
                {
                    "qualifier": code,
                    "qualifier_name": qualifier_name(seg, 1, code, DATE_QUALIFIER_NAMES),
                    "value": stamp,
                }
            )
    return collected


def extract_references(segments: list[dict[str, Any]]) -> list[dict[str, str]]:
    """Collect the references carried by the RFF segments in *segments*.

    The first string component of element 1 is the qualifier; all remaining
    string components are joined with ``:`` to form the value.

    Returns:
        One dict per RFF segment that has at least one component, with the
        keys ``qualifier``, ``qualifier_name`` and ``value``.
    """
    collected: list[dict[str, str]] = []
    for seg in segments:
        if seg.get("id") == "RFF":
            parts = safe_components(seg, 1)
            if parts:
                code, *remainder = parts
                collected.append(
                    {
                        "qualifier": code,
                        "qualifier_name": qualifier_name(seg, 1, code, REFERENCE_QUALIFIER_NAMES),
                        # Joining an empty remainder yields "", the value used
                        # when only the qualifier is present.
                        "value": ":".join(remainder),
                    }
                )
    return collected


def extract_parties(segments: list[dict[str, Any]]) -> dict[str, dict[str, str]]:
    """Index the NAD segments in *segments* by their qualifier (element 1).

    Returns:
        A mapping from qualifier to a dict with the keys ``id``, ``name``,
        ``street``, ``city``, ``postal_code`` and ``country``. Missing
        fields are empty strings. A later NAD segment with the same
        qualifier replaces an earlier one.
    """
    # Output field -> 1-based NAD element position it is read from.
    text_fields = (
        ("name", 4),
        ("street", 5),
        ("city", 6),
        ("postal_code", 8),
        ("country", 9),
    )
    by_role: dict[str, dict[str, str]] = {}
    for seg in segments:
        if seg.get("id") != "NAD":
            continue
        role = safe_value(seg, 1)
        if not role:
            continue
        identifiers = safe_components(seg, 2)
        record = {"id": identifiers[0] if identifiers else ""}
        for field, position in text_fields:
            record[field] = safe_value(seg, position) or ""
        by_role[role] = record
    return by_role


def extract_currency(segments: list[dict[str, Any]]) -> str | None:
    """Return the currency from the first usable CUX segment.

    The second string component of element 1 is taken as the currency. CUX
    segments with fewer than two components are skipped.

    Returns:
        The currency string, or ``None`` when no CUX segment provides one.
    """
    for seg in segments:
        if seg.get("id") == "CUX":
            parts = safe_components(seg, 1)
            if len(parts) >= 2:
                return parts[1]
    return None


def extract_line_items(segments: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Group LIN segments and their following detail segments into line items.

    Every LIN segment opens a new item. PIA, QTY and PRI segments that follow
    are attached to the most recently opened item; anything seen before the
    first LIN is ignored.

    Returns:
        One dict per LIN segment with the keys ``line_no``, ``item_id``,
        ``item_id_type``, ``additional_item_ids``, ``quantities`` and
        ``prices``.
    """

    def qualified_entry(seg: dict[str, Any], names: dict[str, str]) -> dict[str, Any] | None:
        # QTY and PRI share a layout: qualifier and value are the first two
        # string components of element 1.
        parts = safe_components(seg, 1)
        if len(parts) < 2:
            return None
        return {
            "qualifier": parts[0],
            "qualifier_name": qualifier_name(seg, 1, parts[0], names),
            "value": parts[1],
        }

    items: list[dict[str, Any]] = []
    open_item: dict[str, Any] | None = None

    for seg in segments:
        tag = seg.get("id")

        if tag == "LIN":
            # A new LIN closes the item collected so far.
            if open_item is not None:
                items.append(open_item)
            ident = safe_components(seg, 3)
            open_item = {
                "line_no": safe_value(seg, 1),
                "item_id": ident[0] if ident else None,
                "item_id_type": ident[1] if len(ident) > 1 else None,
                "additional_item_ids": [],
                "quantities": [],
                "prices": [],
            }
        elif open_item is None:
            # Detail segments ahead of the first LIN belong to no item.
            continue
        elif tag == "PIA":
            extra = safe_components(seg, 2)
            if extra:
                open_item["additional_item_ids"].append(
                    {
                        "id": extra[0],
                        "id_type": extra[1] if len(extra) > 1 else "",
                    }
                )
        elif tag == "QTY":
            entry = qualified_entry(seg, QTY_QUALIFIER_NAMES)
            if entry is not None:
                open_item["quantities"].append(entry)
        elif tag == "PRI":
            entry = qualified_entry(seg, PRICE_QUALIFIER_NAMES)
            if entry is not None:
                open_item["prices"].append(entry)

    # Flush the item that was still open when the segments ran out.
    if open_item is not None:
        items.append(open_item)

    return items


def transaction_segments(transaction: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten a transaction into one ordered segment list.

    Returns:
        ``start_segment`` (when it is a dict), followed by everything in
        ``segments``, followed by ``end_segment`` (when it is a dict).
    """
    head = transaction.get("start_segment")
    tail = transaction.get("end_segment")

    ordered: list[dict[str, Any]] = [head] if isinstance(head, dict) else []
    ordered.extend(transaction.get("segments", []))
    if isinstance(tail, dict):
        ordered.append(tail)
    return ordered


def pick_measure(entries: list[dict[str, Any]], preferred_qualifiers: tuple[str, ...]) -> dict[str, Any]:
    """Choose one entry from *entries*, favouring the preferred qualifiers.

    The qualifiers are tried in the given order; for each, only the first
    entry carrying that qualifier is considered.

    Returns:
        The first preferred match, else the first entry, else an empty dict.
    """
    for wanted in preferred_qualifiers:
        for entry in entries:
            if entry.get("qualifier") == wanted:
                if entry:
                    return entry
                # Only the first match per qualifier counts; move on.
                break
    if entries:
        return entries[0]
    return {}


def summarize_transaction(transaction: dict[str, Any]) -> dict[str, Any]:
    """Build the summary dict for a single parsed transaction.

    ``message_ref``, ``message_type`` and ``release`` are always present.
    The sections ``document``, ``dates``, ``references``, ``parties``,
    ``currency``, ``line_items``/``line_item_count`` and ``estimated_total``
    are added only when the corresponding data was found.

    ``estimated_total`` is the sum of quantity * price over the line items
    for which both numbers could be parsed, rounded to two decimals.
    """
    meta = transaction.get("meta", {})
    summary: dict[str, Any] = {
        "message_ref": transaction.get("id"),
        "message_type": meta.get("version"),
        "release": meta.get("release"),
    }

    segments = transaction_segments(transaction)

    header = next((seg for seg in segments if seg.get("id") == "BGM"), None)
    if header:
        name_parts = safe_components(header, 1)
        summary["document"] = {
            # Use the first component when there is one, else the plain value.
            "code": name_parts[0] if name_parts else safe_value(header, 1),
            "number": safe_value(header, 2),
            "function_code": safe_value(header, 3),
        }

    optional_sections = (
        ("dates", extract_dates(segments)),
        ("references", extract_references(segments)),
        ("parties", extract_parties(segments)),
        ("currency", extract_currency(segments)),
    )
    for key, payload in optional_sections:
        if payload:
            summary[key] = payload

    line_items = extract_line_items(segments)
    if not line_items:
        return summary

    summary["line_items"] = line_items
    summary["line_item_count"] = len(line_items)

    running_total = 0.0
    any_priced = False
    for item in line_items:
        qty_text = pick_measure(item.get("quantities", []), PREFERRED_QTY_QUALIFIERS).get("value")
        price_text = pick_measure(item.get("prices", []), PREFERRED_PRICE_QUALIFIERS).get("value")
        quantity = parse_decimal(qty_text if isinstance(qty_text, str) else None)
        unit_price = parse_decimal(price_text if isinstance(price_text, str) else None)
        if quantity is None or unit_price is None:
            continue
        running_total += quantity * unit_price
        any_priced = True
    if any_priced:
        summary["estimated_total"] = round(running_total, 2)

    return summary


def build_summary(parsed: dict[str, Any]) -> dict[str, Any]:
    """Summarise every transaction found in the parsed document.

    Walks ``interchanges`` -> ``functional_groups`` -> ``transaction_sets``
    and tags each transaction summary with fields taken from its
    interchange's ``meta``.

    Returns:
        A dict with ``interchange_count`` and the list of ``messages``.
    """
    interchanges = parsed.get("interchanges", [])
    messages: list[dict[str, Any]] = []

    for interchange in interchanges:
        envelope = interchange.get("meta", {})
        transactions = (
            tx
            for group in interchange.get("functional_groups", [])
            for tx in group.get("transaction_sets", [])
        )
        for tx in transactions:
            entry = summarize_transaction(tx)
            entry["interchange"] = {
                "control_ref": envelope.get("id"),
                "sender_id": envelope.get("senderID"),
                "receiver_id": envelope.get("receiverID"),
                "prepared_date": envelope.get("date"),
                "prepared_time": envelope.get("time"),
            }
            messages.append(entry)

    return {"interchange_count": len(interchanges), "messages": messages}


def as_text(value: Any) -> str:
    """Render a table cell value as text.

    ``None`` becomes an empty string. Floats are formatted with two decimals
    and then stripped of trailing zeros and a trailing decimal point.
    Everything else goes through ``str()``.
    """
    if value is None:
        return ""
    if not isinstance(value, float):
        return str(value)
    fixed = format(value, ".2f")
    return fixed.rstrip("0").rstrip(".")


def render_table(headers: list[str], rows: list[list[Any]]) -> str:
    """Render *rows* under *headers* as a bordered plain-text table.

    Cells are converted with ``as_text`` and left-justified to the widest
    entry of their column. When *rows* is empty a single blank row is shown.

    Returns:
        The table as one string with lines separated by ``\\n``.
    """
    cells = [[as_text(value) for value in row] for row in rows]

    widths = [len(title) for title in headers]
    for row in cells:
        for col, text in enumerate(row):
            if len(text) > widths[col]:
                widths[col] = len(text)

    def format_row(texts: list[str]) -> str:
        padded = (texts[col].ljust(width) for col, width in enumerate(widths))
        return "| " + " | ".join(padded) + " |"

    rule = "+" + "+".join("-" * (width + 2) for width in widths) + "+"
    # An empty table still shows one row of blanks between the borders.
    body = [format_row(row) for row in cells] or [format_row([""] * len(widths))]

    return "\n".join([rule, format_row(headers), rule, *body, rule])


def render_message_tables(message: dict[str, Any], index: int) -> str:
    """Render one message summary as a series of titled text tables.

    Args:
        message: A message summary dict as produced by summarize_transaction
            and extended by build_summary.
        index: Number shown in the ``Message N`` heading.

    Returns:
        The heading and the field table, followed by the References,
        Parties, Dates and Line Items sections that have data, separated by
        blank lines.
    """
    document = message.get("document", {})
    envelope = message.get("interchange", {})

    blocks: list[str] = [f"Message {index}"]

    def add_section(title: str, headers: list[str], rows: list[list[Any]]) -> None:
        blocks.append(title)
        blocks.append(render_table(headers, rows))

    def qualifier_rows(entries: list[dict[str, Any]]) -> list[list[Any]]:
        # References and dates share the qualifier / name / value layout.
        return [[e.get("qualifier"), e.get("qualifier_name"), e.get("value")] for e in entries]

    overview = [
        ["message_ref", message.get("message_ref")],
        ["message_type", message.get("message_type")],
        ["release", message.get("release")],
        ["doc_code", document.get("code")],
        ["doc_number", document.get("number")],
        ["doc_function", document.get("function_code")],
        ["currency", message.get("currency")],
        ["line_item_count", message.get("line_item_count")],
        ["estimated_total", message.get("estimated_total")],
        ["sender_id", envelope.get("sender_id")],
        ["receiver_id", envelope.get("receiver_id")],
        ["prepared_date", envelope.get("prepared_date")],
    ]
    blocks.append(render_table(["field", "value"], overview))

    references = message.get("references", [])
    if references:
        add_section("References", ["qualifier", "name", "value"], qualifier_rows(references))

    parties = message.get("parties", {})
    if parties:
        party_rows = [
            [role, info.get("id"), info.get("name"), info.get("city"), info.get("country")]
            for role, info in parties.items()
        ]
        add_section("Parties", ["role", "id", "name", "city", "country"], party_rows)

    dates = message.get("dates", [])
    if dates:
        add_section("Dates", ["qualifier", "name", "value"], qualifier_rows(dates))

    line_items = message.get("line_items", [])
    if line_items:
        item_rows: list[list[Any]] = []
        for item in line_items:
            qty_entry = pick_measure(item.get("quantities", []), PREFERRED_QTY_QUALIFIERS)
            price_entry = pick_measure(item.get("prices", []), PREFERRED_PRICE_QUALIFIERS)
            qty_text = qty_entry.get("value", "")
            price_text = price_entry.get("value", "")
            qty_number = parse_decimal(qty_text if isinstance(qty_text, str) else None)
            price_number = parse_decimal(price_text if isinstance(price_text, str) else None)
            # The amount column stays blank unless both numbers parsed.
            line_amount: float | None
            if qty_number is None or price_number is None:
                line_amount = None
            else:
                line_amount = round(qty_number * price_number, 2)
            item_rows.append(
                [
                    item.get("line_no"),
                    item.get("item_id"),
                    qty_entry.get("qualifier"),
                    qty_entry.get("qualifier_name"),
                    qty_text,
                    price_entry.get("qualifier"),
                    price_entry.get("qualifier_name"),
                    price_text,
                    line_amount,
                ]
            )
        add_section(
            "Line Items",
            ["line", "item_id", "qty_q", "qty_name", "qty", "pri_q", "pri_name", "price", "amount"],
            item_rows,
        )

    return "\n\n".join(blocks)


def render_summary_table(summary: dict[str, Any]) -> str:
    """Render every message of *summary* as text tables.

    Returns:
        The per-message renderings (numbered from 1) separated by blank
        lines, or ``"No messages found."`` when there are no messages.
    """
    messages = summary.get("messages", [])
    if not messages:
        return "No messages found."
    rendered: list[str] = []
    for position, message in enumerate(messages, start=1):
        rendered.append(render_message_tables(message, position))
    return "\n\n".join(rendered)


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser.

    Defines the positional ``input`` path, the optional ``-o/--output``
    path and the ``--json`` flag.
    """
    cli = argparse.ArgumentParser(description="Parse EDIFACT and output summary.")
    argument_specs = (
        (("input",), {"help": "Path to EDIFACT input file."}),
        (("-o", "--output"), {"help": "Output path. Defaults to stdout."}),
        (("--json",), {"action": "store_true", "help": "Output JSON instead of table text."}),
    )
    for flags, options in argument_specs:
        cli.add_argument(*flags, **options)
    return cli


def main() -> int:
    """Run the CLI: read the input file, summarise it and emit the result.

    The input is decoded as UTF-8. If that fails, it is re-read as
    ISO-8859-1 and a warning is written to stderr. The summary is written as
    JSON (``--json``) or as text tables, to ``--output`` or to stdout.

    Returns:
        The process exit status; 0 on normal completion.
    """
    args = build_parser().parse_args()

    source = Path(args.input)
    try:
        document = source.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        # Interchanges using the ISO 8859-1 repertoire (syntax identifier
        # UNOC) are not valid UTF-8. ISO-8859-1 maps every byte, so this
        # second read cannot fail on decoding.
        print(
            f"warning: {source} is not valid UTF-8; decoding as ISO-8859-1",
            file=sys.stderr,
        )
        document = source.read_text(encoding="iso-8859-1")

    parsed = parse_edifact(document)
    summary = build_summary(parsed)

    if args.json:
        out_text = json.dumps(summary, ensure_ascii=False, indent=2)
    else:
        out_text = render_summary_table(summary)

    if args.output:
        Path(args.output).write_text(out_text + "\n", encoding="utf-8")
    else:
        print(out_text)

    return 0


# Script entry point: exit the process with the status returned by main().
if __name__ == "__main__":
    raise SystemExit(main())