#!/usr/bin/env python3
"""CLI: parse EDIFACT and output a concise business summary."""
from __future__ import annotations

import argparse
import json
import math
from pathlib import Path
from typing import Any

from edifact_parser import parse_edifact
# Static qualifier descriptions. qualifier_name() consults these tables first
# and only falls back to parser-provided semantic metadata for unknown codes.

# RFF reference qualifiers.
REFERENCE_QUALIFIER_NAMES = {
    "ADE": "Account related details",
    "CR": "Customer reference number",
    "PD": "Promotion deal number",
    "VA": "VAT registration number",
}

# DTM date/time qualifiers.
DATE_QUALIFIER_NAMES = {
    "137": "Document/message date",
    "63": "Delivery date/time, latest",
    "64": "Delivery date/time, earliest",
}

# QTY quantity qualifiers.
QTY_QUALIFIER_NAMES = {
    "21": "Ordered quantity",
}

# PRI price qualifiers.
PRICE_QUALIFIER_NAMES = {
    "AAA": "Calculation net",
}

# Qualifier codes tried, in this order, by pick_measure() when choosing the
# quantity / price that represents a line item.
PREFERRED_QTY_QUALIFIERS = ("21", "113", "194", "263")
PREFERRED_PRICE_QUALIFIERS = ("AAA",)
def safe_value(segment: dict[str, Any], element_index: int) -> str | None:
    """Fetch the scalar ``value`` of one element, addressed by 1-based position.

    Returns ``None`` when the position lies outside the segment's ``elements``
    list or when the stored ``value`` is not a string.
    """
    all_elements = segment.get("elements", [])
    if not 1 <= element_index <= len(all_elements):
        return None
    raw = all_elements[element_index - 1].get("value")
    if isinstance(raw, str):
        return raw
    return None
def safe_components(segment: dict[str, Any], element_index: int) -> list[str]:
    """List the string component values of one composite element (1-based position).

    An out-of-range position, or a ``components`` entry that is not a list,
    yields an empty list. Components whose ``value`` is not a string are
    skipped, so positions in the result can differ from positions in the
    underlying composite.
    """
    all_elements = segment.get("elements", [])
    if not 1 <= element_index <= len(all_elements):
        return []
    parts = all_elements[element_index - 1].get("components", [])
    if not isinstance(parts, list):
        return []
    candidates = (part.get("value") for part in parts)
    return [text for text in candidates if isinstance(text, str)]
def component_metadata(segment: dict[str, Any], element_index: int, component_index: int) -> dict[str, Any]:
    """Return the raw dict stored for one component, or ``{}`` when it cannot be reached.

    Both indexes are 1-based. An empty dict is returned when either index is
    out of range, when ``components`` is not a list, or when the addressed
    component is not itself a dict.
    """
    all_elements = segment.get("elements", [])
    if not 1 <= element_index <= len(all_elements):
        return {}
    parts = all_elements[element_index - 1].get("components", [])
    if not isinstance(parts, list) or not 1 <= component_index <= len(parts):
        return {}
    entry = parts[component_index - 1]
    if isinstance(entry, dict):
        return entry
    return {}
def qualifier_name(
    segment: dict[str, Any],
    element_index: int,
    code: str,
    fallback_names: dict[str, str],
) -> str:
    """Describe a qualifier code, preferring the static table over parser metadata.

    ``fallback_names`` is checked first. If it has no non-empty entry for
    ``code``, the ``semantic["qualifier_desc"]`` of the element's first
    component is used when it is a string. Otherwise ``""`` is returned.
    """
    static_name = fallback_names.get(code, "")
    if static_name:
        return static_name
    semantic = component_metadata(segment, element_index, 1).get("semantic", {})
    if not isinstance(semantic, dict):
        return ""
    described = semantic.get("qualifier_desc")
    return described if isinstance(described, str) else ""
def parse_decimal(value: str | None) -> float | None:
    """Best-effort numeric parse used by amount estimation.

    Accepts either ``.`` or ``,`` as the decimal mark, since EDIFACT permits
    both. Returns ``None`` for ``None``, an empty string, text that is not a
    number, and for non-finite results (``nan``, ``inf``, overflow), so a
    malformed value can never turn a computed total into NaN/Infinity.
    """
    if not value:
        return None
    try:
        # EDIFACT numbers carry no thousands separators, so every comma is a
        # decimal mark; input such as "1,234.56" still fails float() below.
        number = float(value.replace(",", "."))
    except ValueError:
        return None
    # float() happily parses "nan" and "inf"; those are not usable amounts.
    return number if math.isfinite(number) else None
def extract_dates(segments: list[dict[str, Any]]) -> list[dict[str, str]]:
    """Turn every DTM segment into a ``qualifier`` / ``qualifier_name`` / ``value`` record.

    The first two string components of element 1 are taken as qualifier and
    value. DTM segments with fewer than two string components are ignored.
    """
    collected: list[dict[str, str]] = []
    for seg in segments:
        if seg.get("id") == "DTM":
            parts = safe_components(seg, 1)
            if len(parts) >= 2:
                code, stamp = parts[0], parts[1]
                entry = {
                    "qualifier": code,
                    "qualifier_name": qualifier_name(seg, 1, code, DATE_QUALIFIER_NAMES),
                    "value": stamp,
                }
                collected.append(entry)
    return collected
def extract_references(segments: list[dict[str, Any]]) -> list[dict[str, str]]:
    """Turn every RFF segment into a ``qualifier`` / ``qualifier_name`` / ``value`` record.

    The first string component of element 1 is the qualifier; all remaining
    string components are joined with ``:`` to form the value (empty when
    there are none). RFF segments without any string component are ignored.
    """
    collected: list[dict[str, str]] = []
    for seg in segments:
        if seg.get("id") == "RFF":
            parts = safe_components(seg, 1)
            if parts:
                code, *rest = parts
                entry = {
                    "qualifier": code,
                    "qualifier_name": qualifier_name(seg, 1, code, REFERENCE_QUALIFIER_NAMES),
                    # Joining an empty remainder already yields "".
                    "value": ":".join(rest),
                }
                collected.append(entry)
    return collected
def extract_parties(segments: list[dict[str, Any]]) -> dict[str, dict[str, str]]:
    """Collect NAD party master data keyed by party qualifier (BY/SU/DP...).

    Each NAD segment with a non-empty qualifier (element 1) produces one
    entry with the keys ``id``, ``name``, ``street``, ``city``,
    ``postal_code`` and ``country``; missing values become ``""``. When the
    same qualifier occurs more than once, the last segment wins.

    The party id (element 2), name (element 4) and street (element 5) are
    read from whichever shape the element has: a scalar ``value`` or a list
    of ``components``. This mirrors how the BGM document code is read
    elsewhere in this module.
    """

    def element_text(segment: dict[str, Any], element_index: int) -> str:
        # Prefer the scalar value; otherwise join the non-empty components
        # (e.g. several name or street lines) with a single space.
        scalar = safe_value(segment, element_index)
        if scalar:
            return scalar
        return " ".join(part for part in safe_components(segment, element_index) if part)

    parties: dict[str, dict[str, str]] = {}
    for segment in segments:
        if segment.get("id") != "NAD":
            continue
        qualifier = safe_value(segment, 1)
        if not qualifier:
            continue
        id_components = safe_components(segment, 2)
        parties[qualifier] = {
            # First component is the party id; a bare id may arrive as a scalar.
            "id": id_components[0] if id_components else (safe_value(segment, 2) or ""),
            "name": element_text(segment, 4),
            "street": element_text(segment, 5),
            "city": safe_value(segment, 6) or "",
            "postal_code": safe_value(segment, 8) or "",
            "country": safe_value(segment, 9) or "",
        }
    return parties
def extract_currency(segments: list[dict[str, Any]]) -> str | None:
    """Return the currency from the first usable CUX segment, else ``None``.

    A CUX segment is usable when element 1 has at least two string
    components; the second one is returned.
    """
    for seg in segments:
        if seg.get("id") == "CUX":
            parts = safe_components(seg, 1)
            if len(parts) > 1:
                return parts[1]
    return None
def _item_identification(segment: dict[str, Any], element_index: int) -> tuple[str, str | None] | None:
    """Return ``(item id, id type)`` read from one item-number element, or ``None`` if absent."""
    parts = safe_components(segment, element_index)
    if parts:
        return parts[0], (parts[1] if len(parts) > 1 else None)
    # A bare item number may arrive as a scalar value instead of components;
    # the BGM document code in this module is read with the same fallback.
    scalar = safe_value(segment, element_index)
    return (scalar, None) if scalar else None


def extract_line_items(segments: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Build line-level facts by grouping segments under each LIN anchor.

    Every LIN segment opens a new line item with the keys ``line_no``,
    ``item_id``, ``item_id_type``, ``additional_item_ids``, ``quantities``
    and ``prices``. Subsequent PIA, QTY and PRI segments are attached to the
    open item until the next LIN, a UNS section-control segment, or the end
    of ``segments``. Segments seen while no item is open are ignored.
    """
    # Segment id -> (target list on the line item, static qualifier names).
    measure_targets = {
        "QTY": ("quantities", QTY_QUALIFIER_NAMES),
        "PRI": ("prices", PRICE_QUALIFIER_NAMES),
    }
    line_items: list[dict[str, Any]] = []
    current: dict[str, Any] | None = None
    for segment in segments:
        segment_id = segment.get("id")
        if segment_id == "LIN":
            if current is not None:
                line_items.append(current)
            item_id, item_id_type = _item_identification(segment, 3) or (None, None)
            current = {
                "line_no": safe_value(segment, 1),
                "item_id": item_id,
                "item_id_type": item_id_type,
                "additional_item_ids": [],
                "quantities": [],
                "prices": [],
            }
            continue
        if current is None:
            continue
        if segment_id == "UNS":
            # UNS separates message sections; anything after it is not
            # line-level data and must not be attributed to the last line.
            line_items.append(current)
            current = None
            continue
        if segment_id == "PIA":
            # PIA carries up to five item-number composites (elements 2-6).
            for element_index in range(2, 7):
                identification = _item_identification(segment, element_index)
                if identification is None:
                    continue
                extra_id, extra_type = identification
                current["additional_item_ids"].append(
                    {
                        "id": extra_id,
                        "id_type": extra_type or "",
                    }
                )
            continue
        if segment_id in measure_targets:
            bucket, names = measure_targets[segment_id]
            parts = safe_components(segment, 1)
            if len(parts) >= 2:
                current[bucket].append(
                    {
                        "qualifier": parts[0],
                        "qualifier_name": qualifier_name(segment, 1, parts[0], names),
                        "value": parts[1],
                    }
                )
    if current is not None:
        line_items.append(current)
    return line_items
def transaction_segments(transaction: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the transaction's segments as one list: start, body, then end.

    ``start_segment`` and ``end_segment`` are included only when they are
    dicts; the body comes from ``segments`` (empty when the key is absent).
    """
    head = transaction.get("start_segment")
    flattened: list[dict[str, Any]] = [head] if isinstance(head, dict) else []
    flattened.extend(transaction.get("segments", []))
    tail = transaction.get("end_segment")
    if isinstance(tail, dict):
        flattened.append(tail)
    return flattened
def pick_measure(entries: list[dict[str, Any]], preferred_qualifiers: tuple[str, ...]) -> dict[str, Any]:
    """Choose the entry that best matches the qualifier preference order.

    Qualifiers are tried in the order given; for each, the first entry with
    that ``qualifier`` wins. If none matches, the first entry is returned, or
    ``{}`` when ``entries`` is empty.
    """
    if not entries:
        return {}
    for wanted in preferred_qualifiers:
        match = next(filter(lambda entry: entry.get("qualifier") == wanted, entries), None)
        if match:
            return match
    return entries[0]
def summarize_transaction(transaction: dict[str, Any]) -> dict[str, Any]:
    """Condense one parsed transaction set into a flat business summary dict.

    Always present: ``message_ref``, ``message_type``, ``release``.
    Added only when data was found: ``document`` (from the first BGM),
    ``dates``, ``references``, ``parties``, ``currency``, ``line_items`` with
    ``line_item_count``, and ``estimated_total`` (sum of quantity * price
    over the lines where both parse as numbers, rounded to 2 decimals).
    """
    meta = transaction.get("meta", {})
    # NOTE(review): message_type is read from meta["version"] -- confirm this
    # is the parser field that actually carries the message type.
    summary: dict[str, Any] = {
        "message_ref": transaction.get("id"),
        "message_type": meta.get("version"),
        "release": meta.get("release"),
    }

    segments = transaction_segments(transaction)
    dates = extract_dates(segments)
    refs = extract_references(segments)
    parties = extract_parties(segments)
    currency = extract_currency(segments)
    line_items = extract_line_items(segments)

    # BGM carries the document identity (business doc code + number).
    bgm = next((seg for seg in segments if seg.get("id") == "BGM"), None)
    if bgm:
        code_parts = safe_components(bgm, 1)
        summary["document"] = {
            "code": code_parts[0] if code_parts else safe_value(bgm, 1),
            "number": safe_value(bgm, 2),
            "function_code": safe_value(bgm, 3),
        }

    # Optional sections are emitted only when non-empty, in this fixed order.
    optional_sections = (
        ("dates", dates),
        ("references", refs),
        ("parties", parties),
        ("currency", currency),
    )
    for key, payload in optional_sections:
        if payload:
            summary[key] = payload

    if not line_items:
        return summary

    summary["line_items"] = line_items
    summary["line_item_count"] = len(line_items)

    # Lightweight estimate: preferred quantity * preferred price per line.
    running_total = 0.0
    any_line_priced = False
    for item in line_items:
        qty_text = pick_measure(item.get("quantities", []), PREFERRED_QTY_QUALIFIERS).get("value")
        price_text = pick_measure(item.get("prices", []), PREFERRED_PRICE_QUALIFIERS).get("value")
        qty_num = parse_decimal(qty_text if isinstance(qty_text, str) else None)
        price_num = parse_decimal(price_text if isinstance(price_text, str) else None)
        if qty_num is None or price_num is None:
            continue
        running_total += qty_num * price_num
        any_line_priced = True
    if any_line_priced:
        summary["estimated_total"] = round(running_total, 2)
    return summary
def build_summary(parsed: dict[str, Any]) -> dict[str, Any]:
    """Summarize every transaction set found in the parsed document.

    Walks ``interchanges`` -> ``functional_groups`` -> ``transaction_sets``
    and returns ``{"interchange_count": ..., "messages": [...]}``, where each
    message is a transaction summary extended with an ``interchange`` dict.
    """
    interchanges = parsed.get("interchanges", [])
    messages: list[dict[str, Any]] = []
    for interchange in interchanges:
        meta = interchange.get("meta", {})
        groups = interchange.get("functional_groups", [])
        transactions = (tx for group in groups for tx in group.get("transaction_sets", []))
        for tx in transactions:
            entry = summarize_transaction(tx)
            # Interchange metadata from UNB header for traceability.
            entry["interchange"] = {
                "control_ref": meta.get("id"),
                "sender_id": meta.get("senderID"),
                "receiver_id": meta.get("receiverID"),
                "prepared_date": meta.get("date"),
                "prepared_time": meta.get("time"),
            }
            messages.append(entry)
    return {"interchange_count": len(interchanges), "messages": messages}
def as_text(value: Any) -> str:
    """Convert a cell value to the text shown in a table.

    ``None`` becomes ``""``. Floats are formatted with two decimals and then
    stripped of trailing zeros and a trailing dot. Everything else goes
    through ``str()``.
    """
    if value is None:
        return ""
    if not isinstance(value, float):
        return str(value)
    fixed = f"{value:.2f}"
    return fixed.rstrip("0").rstrip(".")
def render_table(headers: list[str], rows: list[list[Any]]) -> str:
    """Draw an ASCII table whose columns are as wide as their longest cell.

    Cells are converted with ``as_text``. When ``rows`` is empty, a single
    blank row is drawn so the table still has a body. Every row is expected
    to have exactly one cell per header.
    """
    cells = [[as_text(cell) for cell in row] for row in rows]

    widths = [len(title) for title in headers]
    for line in cells:
        for col, text in enumerate(line):
            if len(text) > widths[col]:
                widths[col] = len(text)

    def format_row(values: list[str]) -> str:
        padded = (values[col].ljust(widths[col]) for col in range(len(headers)))
        return "| " + " | ".join(padded) + " |"

    rule = "+" + "+".join("-" * (width + 2) for width in widths) + "+"
    out = [rule, format_row(headers), rule]
    if cells:
        out.extend(format_row(line) for line in cells)
    else:
        out.append(format_row([""] * len(headers)))
    out.append(rule)
    return "\n".join(out)
def render_message_tables(message: dict[str, Any], index: int) -> str:
    """Format one message summary as a titled sequence of ASCII tables.

    The output always starts with ``Message <index>`` and a field/value
    overview table. "References", "Parties", "Dates" and "Line Items"
    sections follow, each only when the summary holds data for it. Blocks
    are separated by a blank line.
    """
    document = message.get("document", {})
    interchange = message.get("interchange", {})
    overview = [
        ["message_ref", message.get("message_ref")],
        ["message_type", message.get("message_type")],
        ["release", message.get("release")],
        ["doc_code", document.get("code")],
        ["doc_number", document.get("number")],
        ["doc_function", document.get("function_code")],
        ["currency", message.get("currency")],
        ["line_item_count", message.get("line_item_count")],
        ["estimated_total", message.get("estimated_total")],
        ["sender_id", interchange.get("sender_id")],
        ["receiver_id", interchange.get("receiver_id")],
        ["prepared_date", interchange.get("prepared_date")],
    ]
    blocks: list[str] = [f"Message {index}", render_table(["field", "value"], overview)]

    def qualified_rows(entries: list[dict[str, Any]]) -> list[list[Any]]:
        # Shared row shape of the References and Dates tables.
        return [[e.get("qualifier"), e.get("qualifier_name"), e.get("value")] for e in entries]

    references = message.get("references", [])
    if references:
        blocks.append("References")
        blocks.append(render_table(["qualifier", "name", "value"], qualified_rows(references)))

    parties = message.get("parties", {})
    if parties:
        party_rows = [
            [role, info.get("id"), info.get("name"), info.get("city"), info.get("country")]
            for role, info in parties.items()
        ]
        blocks.append("Parties")
        blocks.append(render_table(["role", "id", "name", "city", "country"], party_rows))

    dates = message.get("dates", [])
    if dates:
        blocks.append("Dates")
        blocks.append(render_table(["qualifier", "name", "value"], qualified_rows(dates)))

    line_items = message.get("line_items", [])
    if line_items:
        line_rows: list[list[Any]] = []
        for item in line_items:
            chosen_qty = pick_measure(item.get("quantities", []), PREFERRED_QTY_QUALIFIERS)
            chosen_price = pick_measure(item.get("prices", []), PREFERRED_PRICE_QUALIFIERS)
            qty_text = chosen_qty.get("value", "")
            price_text = chosen_price.get("value", "")
            qty_num = parse_decimal(qty_text if isinstance(qty_text, str) else None)
            price_num = parse_decimal(price_text if isinstance(price_text, str) else None)
            # The amount cell stays blank unless both figures are numeric.
            if qty_num is None or price_num is None:
                amount = None
            else:
                amount = round(qty_num * price_num, 2)
            line_rows.append(
                [
                    item.get("line_no"),
                    item.get("item_id"),
                    chosen_qty.get("qualifier"),
                    chosen_qty.get("qualifier_name"),
                    qty_text,
                    chosen_price.get("qualifier"),
                    chosen_price.get("qualifier_name"),
                    price_text,
                    amount,
                ]
            )
        line_headers = ["line", "item_id", "qty_q", "qty_name", "qty", "pri_q", "pri_name", "price", "amount"]
        blocks.append("Line Items")
        blocks.append(render_table(line_headers, line_rows))

    return "\n\n".join(blocks)
def render_summary_table(summary: dict[str, Any]) -> str:
    """Produce the plain-text report covering every message in ``summary``.

    Messages are numbered from 1 and separated by a blank line. Without any
    messages the fixed text ``No messages found.`` is returned.
    """
    messages = summary.get("messages", [])
    if messages:
        rendered = (render_message_tables(msg, number) for number, msg in enumerate(messages, start=1))
        return "\n\n".join(rendered)
    return "No messages found."
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser: one input path plus ``-o/--output`` and ``--json``."""
    cli = argparse.ArgumentParser(description="Parse EDIFACT and output summary.")
    cli.add_argument(
        "input",
        help="Path to EDIFACT input file.",
    )
    cli.add_argument(
        "-o",
        "--output",
        help="Output path. Defaults to stdout.",
    )
    cli.add_argument(
        "--json",
        action="store_true",
        help="Output JSON instead of table text.",
    )
    return cli
def main() -> int:
    """CLI entry point: parse EDIFACT, build summary, write JSON or table text.

    The input is decoded as UTF-8 first. If that fails it is decoded as
    ISO-8859-1, a common EDIFACT character set; that decoding cannot fail and
    leaves the ASCII segment delimiters intact. An unreadable input file is
    reported as a usage error (exit status 2) instead of a traceback.

    Returns:
        ``0`` after the summary has been written to ``--output`` or stdout.
    """
    parser = build_parser()
    args = parser.parse_args()
    source = Path(args.input)
    try:
        try:
            document = source.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            # Not valid UTF-8: fall back to a single-byte decoding.
            document = source.read_text(encoding="latin-1")
    except OSError as exc:
        # parser.error() prints the message to stderr and exits with status 2.
        parser.error(f"cannot read {args.input}: {exc}")
    parsed = parse_edifact(document)
    summary = build_summary(parsed)
    if args.json:
        out_text = json.dumps(summary, ensure_ascii=False, indent=2)
    else:
        out_text = render_summary_table(summary)
    if args.output:
        Path(args.output).write_text(out_text + "\n", encoding="utf-8")
    else:
        print(out_text)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())