#!/usr/bin/env python3
"""Standalone EDIFACT parser extracted from vscode-edi-support behavior."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
# Separator characters used when the document does not start with a UNA
# segment; parse_separators() copies this table and overrides entries from
# the UNA string when one is present.
DEFAULT_SEPARATORS = {
"segment_separator": "'",
"data_element_separator": "+",
"component_element_separator": ":",
"release_character": "?",
}
# Built-in descriptions for the service segments, keyed by segment ID.
# They are used to annotate the UNA segment and the interchange / group
# start and end segments, and as a fallback for transaction segments that a
# loaded release schema does not describe. Entries under "Elements" are
# matched to parsed elements by position (first entry -> element index 1).
CONTROL_SEGMENT_SCHEMAS: dict[str, dict[str, Any]] = {
"UNA": {
"Desc": "Delimiter String Advice",
"Purpose": "To define separators used in the interchange.",
"Elements": [
{"Id": "UNA01", "Desc": "Component data element separator"},
{"Id": "UNA02", "Desc": "Data element separator"},
{"Id": "UNA03", "Desc": "Decimal notification"},
{"Id": "UNA04", "Desc": "Release indicator"},
{"Id": "UNA05", "Desc": "Reserved"},
],
},
"UNB": {
"Desc": "Interchange header",
"Purpose": "To start, identify and specify an interchange.",
"Elements": [
{"Id": "S001", "Desc": "Syntax identifier"},
{"Id": "S002", "Desc": "Interchange sender"},
{"Id": "S003", "Desc": "Interchange recipient"},
{"Id": "S004", "Desc": "Date/time of preparation"},
{"Id": "0020", "Desc": "Interchange control reference"},
{"Id": "S005", "Desc": "Recipient's reference/password"},
{"Id": "0026", "Desc": "Application reference"},
{"Id": "0029", "Desc": "Processing priority code"},
{"Id": "0031", "Desc": "Acknowledgement request"},
{"Id": "0032", "Desc": "Communications agreement ID"},
{"Id": "0035", "Desc": "Test indicator"},
],
},
"UNG": {
"Desc": "Functional group header",
"Purpose": "To head, identify and specify a functional group.",
"Elements": [
{"Id": "0038", "Desc": "Message group identification"},
{"Id": "S006", "Desc": "Application sender identification"},
{"Id": "S007", "Desc": "Application recipient identification"},
{"Id": "S004", "Desc": "Date/time of preparation"},
{"Id": "0048", "Desc": "Group reference number"},
{"Id": "0051", "Desc": "Controlling agency"},
{"Id": "S008", "Desc": "Message version"},
{"Id": "0058", "Desc": "Application password"},
],
},
"UNE": {
"Desc": "Functional group trailer",
"Purpose": "To end and check the completeness of a group.",
"Elements": [
{"Id": "0060", "Desc": "Group control count"},
{"Id": "0048", "Desc": "Group reference number"},
],
},
"UNZ": {
"Desc": "Interchange trailer",
"Purpose": "To end and check the completeness of an interchange.",
"Elements": [
{"Id": "0036", "Desc": "Interchange control count"},
{"Id": "0020", "Desc": "Interchange control reference"},
],
},
}
class SchemaStore:
    """Lazy, caching loader for EDIFACT schema JSON files.

    Files are looked up as ``<schema_root>/<release>/<release>.json`` and
    ``<schema_root>/<release>/<release>_versions.json``. Every lookup result,
    including failures, is cached, so each file is read at most once per
    store. Unreadable, undecodable or malformed files are treated as missing
    rather than raising.
    """

    def __init__(self, schema_root: str | Path | None) -> None:
        """Resolve *schema_root* and start with empty caches."""
        self.schema_root = resolve_schema_root(schema_root)
        # release -> parsed "<release>.json" (None when unavailable)
        self._release_cache: dict[str, dict[str, Any] | None] = {}
        # (release, version) -> entry from "DocumentTypes" (None when unavailable)
        self._version_doc_cache: dict[tuple[str, str], dict[str, Any] | None] = {}
        # release -> parsed "<release>_versions.json" (None when unavailable);
        # kept so that looking up several versions of one release parses the
        # file only once.
        self._versions_file_cache: dict[str, dict[str, Any] | None] = {}

    def _release_dir(self, release: str) -> Path | None:
        """Return the directory for *release*, or ``None`` if it cannot be used.

        The release string is taken from the parsed document, so it is only
        accepted when it is a single path component; anything that could
        escape ``schema_root`` is rejected.
        """
        if self.schema_root is None:
            return None
        if release in {".", ".."} or any(ch in release for ch in "/\\\x00"):
            return None
        return self.schema_root / release

    @staticmethod
    def _read_json_object(path: Path) -> dict[str, Any] | None:
        """Load *path* as JSON and return it only if the root is an object."""
        try:
            with path.open(encoding="utf-8") as handle:
                data = json.load(handle)
        except (OSError, ValueError):
            # OSError covers missing/unreadable files; ValueError covers both
            # json.JSONDecodeError and UnicodeDecodeError.
            return None
        return data if isinstance(data, dict) else None

    def load_release(self, release: str | None) -> dict[str, Any] | None:
        """Return the parsed release schema for *release*, or ``None``.

        ``None`` is returned for an empty release, a missing schema root, or a
        schema file that is absent, unreadable or not a JSON object.
        """
        if not release:
            return None
        if release not in self._release_cache:
            directory = self._release_dir(release)
            self._release_cache[release] = (
                None
                if directory is None
                else self._read_json_object(directory / f"{release}.json")
            )
        return self._release_cache[release]

    def load_version_doc(self, release: str | None, version: str | None) -> dict[str, Any] | None:
        """Return the ``DocumentTypes`` entry keyed ``<release>_<version>``.

        ``None`` is returned when either argument is empty, when the versions
        file is unavailable, or when the entry is missing or not an object.
        """
        if not release or not version:
            return None
        cache_key = (release, version)
        if cache_key in self._version_doc_cache:
            return self._version_doc_cache[cache_key]

        if release not in self._versions_file_cache:
            directory = self._release_dir(release)
            self._versions_file_cache[release] = (
                None
                if directory is None
                else self._read_json_object(directory / f"{release}_versions.json")
            )
        raw_versions = self._versions_file_cache[release]

        version_doc: dict[str, Any] | None = None
        if raw_versions is not None:
            document_types = raw_versions.get("DocumentTypes")
            if isinstance(document_types, dict):
                candidate = document_types.get(f"{release}_{version}")
                if isinstance(candidate, dict):
                    version_doc = candidate

        self._version_doc_cache[cache_key] = version_doc
        return version_doc
def parse_edifact(
    document: str,
    schema_root: str | Path | None = None,
    semantic: bool = True,
) -> dict[str, Any]:
    """Parse an EDIFACT document into an interchange / group / transaction tree.

    Args:
        document: Raw EDIFACT text, optionally starting with a UNA segment.
        schema_root: Schema directory handed to ``SchemaStore``; only used
            when ``semantic`` is true.
        semantic: When true, the result is annotated in place by
            ``apply_semantic_enrichment``.

    Returns:
        A dict with ``separators``, ``separators_segment`` (the UNA segment or
        ``None``), ``interchanges`` and ``all_segments`` (every parsed segment
        in document order). Segments that appear outside an explicit envelope
        are collected under implicitly created interchanges, groups and
        transactions whose ``start_segment`` is ``None``.
    """
    separators = parse_separators(document)
    slices = split_segments(
        document,
        separators["segment_separator"],
        separators["release_character"],
    )
    maybe_segments = (parse_segment(text, offset, separators) for text, offset in slices)
    parsed_segments: list[dict[str, Any]] = [item for item in maybe_segments if item is not None]

    result: dict[str, Any] = {
        "separators": separators,
        "separators_segment": None,
        "interchanges": [],
        "all_segments": parsed_segments,
    }

    # Innermost open envelope at each nesting level; None means nothing is open.
    levels = ("interchange", "group", "transaction")
    open_level: dict[str, dict[str, Any] | None] = dict.fromkeys(levels)

    def close_from(level: str) -> None:
        """Mark *level* and every level nested inside it as closed."""
        for name in levels[levels.index(level):]:
            open_level[name] = None

    def open_interchange(meta: dict[str, Any], start_segment: dict[str, Any] | None) -> dict[str, Any]:
        node = {
            "meta": meta,
            "id": meta.get("id"),
            "start_segment": start_segment,
            "end_segment": None,
            "functional_groups": [],
        }
        result["interchanges"].append(node)
        close_from("interchange")
        open_level["interchange"] = node
        return node

    def current_interchange() -> dict[str, Any]:
        node = open_level["interchange"]
        return open_interchange({}, None) if node is None else node

    def open_group(meta: dict[str, Any], start_segment: dict[str, Any] | None) -> dict[str, Any]:
        parent = current_interchange()
        node = {
            "meta": meta,
            "id": meta.get("id"),
            # A group without a UNG header is a placeholder container.
            "is_fake": start_segment is None,
            "start_segment": start_segment,
            "end_segment": None,
            "transaction_sets": [],
        }
        parent["functional_groups"].append(node)
        close_from("group")
        open_level["group"] = node
        return node

    def current_group() -> dict[str, Any]:
        node = open_level["group"]
        return open_group({}, None) if node is None else node

    def open_transaction(meta: dict[str, Any], start_segment: dict[str, Any] | None) -> dict[str, Any]:
        parent = current_group()
        node = {
            "meta": meta,
            "id": meta.get("id"),
            "start_segment": start_segment,
            "end_segment": None,
            "segments": [],
        }
        parent["transaction_sets"].append(node)
        open_level["transaction"] = node
        return node

    def current_transaction() -> dict[str, Any]:
        node = open_level["transaction"]
        return open_transaction({}, None) if node is None else node

    for segment in parsed_segments:
        tag = segment["id"]
        if tag == "UNA":
            result["separators_segment"] = segment
        elif tag == "UNB":
            open_interchange(parse_interchange_meta(segment), segment)
        elif tag == "UNZ":
            current_interchange()["end_segment"] = segment
            close_from("interchange")
        elif tag == "UNG":
            open_group(parse_functional_group_meta(segment), segment)
        elif tag == "UNE":
            current_group()["end_segment"] = segment
            close_from("group")
        elif tag == "UNH":
            open_transaction(parse_transaction_meta(segment), segment)
        elif tag == "UNT":
            current_transaction()["end_segment"] = segment
            close_from("transaction")
        else:
            # Ordinary data segment: belongs to the transaction currently open.
            current_transaction()["segments"].append(segment)

    if semantic:
        apply_semantic_enrichment(result, SchemaStore(schema_root))
    return result
def parse_separators(document: str) -> dict[str, str]:
    """Return the separator characters in effect for *document*.

    Starts from a copy of ``DEFAULT_SEPARATORS``. If the whitespace-stripped
    document begins with ``UNA``, the text up to the first ``UNB`` (or the
    whole text when there is none) is read positionally: offset 3 is the
    component separator, 4 the data element separator, 6 the release
    character and 8 the segment separator. Offsets beyond the end of that
    text keep their default.
    """
    separators = dict(DEFAULT_SEPARATORS)
    text = document.strip()
    if not text.startswith("UNA"):
        return separators

    advice = text.partition("UNB")[0]
    offsets = (
        ("segment_separator", 8),
        ("data_element_separator", 4),
        ("component_element_separator", 3),
        ("release_character", 6),
    )
    for name, offset in offsets:
        if offset < len(advice):
            separators[name] = advice[offset]
    return separators
def split_segments(document: str, segment_separator: str, release_character: str) -> list[tuple[str, int]]:
    """Cut *document* into raw segments at every unescaped segment separator.

    Returns ``(text, start_offset)`` pairs in document order. Each text
    includes its trailing separator; any remainder after the last separator
    is returned as a final piece without one. An empty document yields an
    empty list.
    """
    # Offsets just past each unescaped separator, i.e. where the next piece starts.
    cut_points = [
        position + 1
        for position, char in enumerate(document)
        if char == segment_separator and is_unescaped(document, position, release_character)
    ]
    starts = [0, *cut_points]
    pieces = [(document[begin:end], begin) for begin, end in zip(starts, cut_points)]

    tail_start = starts[-1]
    if tail_start < len(document):
        pieces.append((document[tail_start:], tail_start))
    return pieces
def parse_segment(
    raw_segment: str,
    start_index: int,
    separators: dict[str, str],
) -> dict[str, Any] | None:
    """Turn one raw segment slice into a segment dict.

    Args:
        raw_segment: Segment text as cut from the document, possibly with
            surrounding whitespace and a trailing segment separator.
        start_index: Offset of ``raw_segment`` within the document.
        separators: Separator table as returned by ``parse_separators``.

    Returns:
        ``None`` for a blank slice or one without a segment ID; otherwise a
        dict with ``id``, ``raw`` (whitespace-stripped text), ``start_index``
        (offset of the first non-whitespace character), ``end_index`` (offset
        of the last character before the ending delimiter), ``length``,
        ``ending_delimiter`` and ``elements``.
    """
    segment_str = raw_segment.strip()
    if not segment_str:
        return None

    # Skip leading whitespace so start_index points at the segment ID.
    leading_blank = len(raw_segment) - len(raw_segment.lstrip())
    start_index += leading_blank

    terminator = separators["segment_separator"]
    ending_delimiter = terminator if segment_str.endswith(terminator) else ""
    content = segment_str[:-1] if ending_delimiter else segment_str

    segment_id = get_segment_id(content)
    if not segment_id:
        return None

    elements: list[dict[str, Any]]
    if segment_id == "UNA" and len(segment_str) == 9:
        # A full UNA string is positional: the five characters after the ID
        # are each reported as their own element.
        elements = [
            {"index": position, "value": char}
            for position, char in enumerate(segment_str[3:8], start=1)
        ]
    else:
        elements = parse_elements(content, segment_id, separators)

    return {
        "id": segment_id,
        "raw": segment_str,
        "start_index": start_index,
        "end_index": start_index + len(segment_str) - len(ending_delimiter) - 1,
        "length": len(segment_str),
        "ending_delimiter": ending_delimiter,
        "elements": elements,
    }
def get_segment_id(content: str) -> str:
    """Return the leading run of alphanumeric or ``_`` characters of *content*.

    The result is empty when *content* is empty or starts with any other
    character.
    """
    for position, char in enumerate(content):
        if not (char.isalnum() or char == "_"):
            return content[:position]
    return content
def parse_elements(content: str, segment_id: str, separators: dict[str, str]) -> list[dict[str, Any]]:
    """Split the part of *content* after the segment ID into element dicts.

    Returns an empty list unless the ID is immediately followed by the data
    element separator. Each element has a 1-based ``index`` and its raw
    ``value``; a non-empty value that splits into more than one piece on the
    component separator also gets a ``components`` list of 1-based
    ``index``/``value`` dicts. Values are stored as found in the text.
    """
    element_sep = separators["data_element_separator"]
    component_sep = separators["component_element_separator"]
    release = separators["release_character"]

    tail = content[len(segment_id):]
    if not tail or not tail.startswith(element_sep):
        return []

    payload = tail[1:]
    values = [""] if payload == "" else split_unescaped(payload, element_sep, release)

    elements: list[dict[str, Any]] = []
    for position, value in enumerate(values, start=1):
        element: dict[str, Any] = {"index": position, "value": value}
        parts = split_unescaped(value, component_sep, release) if value else []
        if len(parts) > 1:
            element["components"] = [
                {"index": part_position, "value": part}
                for part_position, part in enumerate(parts, start=1)
            ]
        elements.append(element)
    return elements
def split_unescaped(value: str, delimiter: str, release_character: str) -> list[str]:
    """Split *value* at every occurrence of *delimiter* that is not escaped.

    The pieces exclude the delimiters themselves; release characters are left
    in place. An empty delimiter returns ``[value]`` unchanged, and the result
    always contains at least one string.
    """
    if delimiter == "":
        return [value]

    pieces: list[str] = []
    piece_start = 0
    for position, char in enumerate(value):
        if char == delimiter and is_unescaped(value, position, release_character):
            pieces.append(value[piece_start:position])
            piece_start = position + 1
    pieces.append(value[piece_start:])
    return pieces
def is_unescaped(value: str, index: int, release_character: str) -> bool:
    """Report whether the character at *index* is free of a release escape.

    A character is escaped only when it is preceded by an odd number of
    consecutive release characters: in ``??'`` the first ``?`` releases the
    second, so the ``'`` is a real separator, while in ``?'`` it is escaped.
    Looking at the single preceding character alone gets the first case wrong.

    Args:
        value: Text being scanned.
        index: Position of the candidate delimiter in *value*.
        release_character: The release (escape) character; when empty,
            nothing can be escaped.

    Returns:
        ``True`` when the character at *index* acts as a delimiter.
    """
    if index <= 0 or not release_character:
        return True

    # Count the unbroken run of release characters directly before index.
    run_length = 0
    cursor = index - 1
    while cursor >= 0 and value[cursor] == release_character:
        run_length += 1
        cursor -= 1
    return run_length % 2 == 0
def _composite_values(element: dict[str, Any]) -> list[Any]:
    """Return the component values of *element* in order.

    An element without a ``components`` list is treated as a composite with a
    single component equal to its non-empty ``value``.
    """
    components = element.get("components")
    if components:
        return [component["value"] for component in components]
    value = element.get("value")
    return [value] if value else []


def parse_interchange_meta(segment: dict[str, Any]) -> dict[str, Any]:
    """Extract sender, receiver, date/time and control reference from a UNB segment.

    Elements are read by position: the second holds sender ID and qualifier,
    the third receiver ID and qualifier, the fourth date and time, and the
    fifth the interchange ID. A sender or receiver given without a qualifier
    (so the element has no ``components`` list) still yields its ID. Keys are
    only present when the corresponding data exists.

    Args:
        segment: Parsed segment dict with an ``elements`` list.

    Returns:
        A dict with any of ``senderID``, ``senderQualifer``, ``receiverID``,
        ``receiverQualifer``, ``date``, ``time`` and ``id``.
    """
    elements = segment["elements"]
    meta: dict[str, Any] = {}

    # (element position, keys for its first and second component)
    composite_keys = (
        (1, ("senderID", "senderQualifer")),
        (2, ("receiverID", "receiverQualifer")),
        (3, ("date", "time")),
    )
    for position, keys in composite_keys:
        if position < len(elements):
            # zip stops at the shorter side, so missing components add no key.
            meta.update(zip(keys, _composite_values(elements[position])))

    if len(elements) > 4:
        meta["id"] = elements[4]["value"]
    return meta
def parse_functional_group_meta(segment: dict[str, Any]) -> dict[str, Any]:
    """Extract date, time and group ID from a UNG segment.

    The first two components of the fourth element become ``date`` and
    ``time``; the value of the fifth element becomes ``id``. Keys are only
    present when the corresponding data exists.
    """
    elements = segment["elements"]
    meta: dict[str, Any] = {}
    if len(elements) > 3:
        parts = elements[3].get("components", [])
        for key, part in zip(("date", "time"), parts):
            meta[key] = part["value"]
    if len(elements) > 4:
        meta["id"] = elements[4]["value"]
    return meta
def parse_transaction_meta(segment: dict[str, Any]) -> dict[str, Any]:
    """Extract message reference, type and release from a UNH segment.

    The first element's value becomes ``id``. From the second element, the
    first component becomes ``version``; when at least three components
    exist, the second and third are concatenated into ``release`` (``None``
    values count as empty strings).
    """
    elements = segment["elements"]
    meta: dict[str, Any] = {}
    if elements:
        meta["id"] = elements[0]["value"]

    identifier = elements[1].get("components", []) if len(elements) > 1 else []
    if len(identifier) > 0:
        meta["version"] = identifier[0]["value"]
    if len(identifier) > 2:
        first_part, second_part = (
            "" if part["value"] is None else part["value"] for part in identifier[1:3]
        )
        meta["release"] = f"{first_part}{second_part}"
    return meta
def resolve_schema_root(schema_root: str | Path | None) -> Path | None:
    """Locate the EDIFACT schema directory.

    An explicit *schema_root* is user-expanded and returned if it exists,
    otherwise ``None`` is returned without trying any fallback. With no
    explicit root, a fixed list of locations relative to this file and to the
    current working directory is probed in order and the first existing one
    is returned, or ``None`` when none exists.
    """
    if schema_root:
        explicit = Path(schema_root).expanduser()
        if explicit.exists():
            return explicit
        return None

    module_dir = Path(__file__).resolve().parent
    project_dir = module_dir.parent
    working_dir = Path.cwd()
    source_tail = Path("src") / "schemas" / "edifact"
    search_order = (
        module_dir / "schemas" / "edifact",
        project_dir / source_tail,
        project_dir / "vscode-edi-support" / source_tail,
        working_dir / source_tail,
        working_dir / "vscode-edi-support" / source_tail,
    )
    return next((path for path in search_order if path.exists()), None)
def apply_semantic_enrichment(result: dict[str, Any], schema_store: SchemaStore) -> None:
    """Annotate a parse result in place with schema descriptions.

    Stores a ``semantic_enrichment`` summary on *result* (including a warning
    when the store has no schema root), annotates the UNA segment and every
    interchange and group start/end segment from the built-in control
    schemas, and enriches each transaction set through *schema_store*.
    """
    root = schema_store.schema_root
    warnings: list[str] = []
    if root is None:
        warnings.append(
            "Schema root not found. Put EDIFACT schema files under the default schema paths."
        )
    result["semantic_enrichment"] = {
        "enabled": True,
        "schema_root": str(root) if root else None,
        "warnings": warnings,
    }

    boundaries = ("start_segment", "end_segment")
    annotate_control_segment(result.get("separators_segment"))
    for interchange in result.get("interchanges", []):
        for boundary in boundaries:
            annotate_control_segment(interchange.get(boundary))
        for group in interchange.get("functional_groups", []):
            for boundary in boundaries:
                annotate_control_segment(group.get(boundary))
            for transaction in group.get("transaction_sets", []):
                enrich_transaction_semantics(transaction, schema_store)
def annotate_control_segment(segment: dict[str, Any] | None) -> None:
    """Annotate *segment* from ``CONTROL_SEGMENT_SCHEMAS`` when it has an entry.

    Does nothing for a missing segment or an ID without a built-in schema.
    No qualifier table is used for these segments.
    """
    schema = CONTROL_SEGMENT_SCHEMAS.get(segment.get("id", "")) if segment else None
    if schema:
        annotate_segment_with_schema(segment, schema=schema, qualifiers=None)
def enrich_transaction_semantics(transaction: dict[str, Any], schema_store: SchemaStore) -> None:
    """Annotate one transaction set and its segments in place.

    Uses the transaction's ``release`` and ``version`` metadata to load the
    release schema and version document from *schema_store*. A ``semantic``
    summary is stored on the transaction when there is anything to report,
    and ``version_desc`` is added to its metadata when the message type has a
    description. Each segment (start, body, end) is annotated from the
    release schema, falling back to the built-in control schemas; when a
    release schema is loaded, segments found in neither are flagged with
    ``is_invalid_segment``.
    """
    meta = transaction.get("meta", {})
    release, version = meta.get("release"), meta.get("version")
    release_schema = schema_store.load_release(release)
    version_doc = schema_store.load_version_doc(release, version)

    summary: dict[str, Any] = {}
    if release:
        summary["release"] = release
        summary["release_schema_loaded"] = release_schema is not None
    if version:
        summary["document_type"] = version
        if release_schema:
            description = lookup_qualifier_desc(release_schema, "Message type identifier", version)
            if description:
                summary["document_type_desc"] = description
                meta["version_desc"] = description
        if release:
            summary["version_schema_found"] = version_doc is not None
    if version_doc:
        summary["transaction_set"] = {
            "segment_count": len(version_doc.get("TransactionSet", [])),
        }
    if summary:
        transaction["semantic"] = summary

    if release_schema:
        segment_schemas = release_schema.get("Segments", {})
        qualifiers = release_schema.get("Qualifiers", {})
    else:
        segment_schemas, qualifiers = {}, None

    # Header, body and trailer in document order; absent envelope segments are skipped.
    ordered: list[dict[str, Any]] = []
    header = transaction.get("start_segment")
    if header:
        ordered.append(header)
    ordered += transaction.get("segments", [])
    trailer = transaction.get("end_segment")
    if trailer:
        ordered.append(trailer)

    for segment in ordered:
        tag = segment.get("id", "")
        release_level = segment_schemas.get(tag)
        control_level = CONTROL_SEGMENT_SCHEMAS.get(tag)
        if release_level:
            segment["is_invalid_segment"] = False
            annotate_segment_with_schema(segment, schema=release_level, qualifiers=qualifiers)
        elif control_level:
            annotate_segment_with_schema(segment, schema=control_level, qualifiers=qualifiers)
        elif release_schema is not None:
            segment["is_invalid_segment"] = True
def annotate_segment_with_schema(
    segment: dict[str, Any],
    schema: dict[str, Any],
    qualifiers: dict[str, Any] | None,
) -> None:
    """Attach schema descriptions to *segment* and its elements in place.

    The segment gets a ``semantic`` dict with the schema's ``Desc`` and
    ``Purpose`` (omitting missing ones). Each element is paired with the
    schema element at position ``index - 1``; elements without a matching
    schema entry are left untouched.
    """
    summary = {"desc": schema.get("Desc"), "purpose": schema.get("Purpose")}
    segment["semantic"] = compact_dict(summary)

    element_schemas = schema.get("Elements") or []
    for element in segment.get("elements", []):
        slot = element.get("index", 0) - 1
        if 0 <= slot < len(element_schemas):
            annotate_element_with_schema(element, element_schemas[slot], qualifiers)
def annotate_element_with_schema(
    element: dict[str, Any],
    element_schema: dict[str, Any],
    qualifiers: dict[str, Any] | None,
) -> None:
    """Attach schema details to *element* and its components in place.

    Stores a ``semantic`` dict built from the schema fields (omitting missing
    ones) plus the qualifier description for the element's value, then
    annotates each component with the schema component at position
    ``index - 1``.
    """
    raw_value = element.get("value")
    qualifier_ref = element_schema.get("QualifierRef")

    # Output key -> schema key, in the order the keys appear in the result.
    field_map = (
        ("id", "Id"),
        ("name", "Name"),
        ("desc", "Desc"),
        ("definition", "Definition"),
        ("data_type", "DataType"),
        ("required", "Required"),
        ("min_length", "MinLength"),
        ("max_length", "MaxLength"),
    )
    details: dict[str, Any] = {
        out_key: element_schema.get(schema_key) for out_key, schema_key in field_map
    }
    details["qualifier_ref"] = qualifier_ref
    details["qualifier_desc"] = lookup_qualifier_desc_from_table(
        qualifiers=qualifiers,
        qualifier_ref=qualifier_ref,
        value=raw_value,
    )
    element["semantic"] = compact_dict(details)

    component_schemas = element_schema.get("Components") or []
    if component_schemas and "components" not in element and raw_value not in (None, ""):
        # Match vscode parser behavior: composite elements can still have a single component
        # even when no component separator appears in source text.
        element["components"] = [{"index": 1, "value": raw_value}]

    for component in element.get("components", []):
        slot = component.get("index", 0) - 1
        if 0 <= slot < len(component_schemas):
            annotate_component_with_schema(component, component_schemas[slot], qualifiers)
def annotate_component_with_schema(
    component: dict[str, Any],
    component_schema: dict[str, Any],
    qualifiers: dict[str, Any] | None,
) -> None:
    """Attach schema details to *component* in place.

    Stores a ``semantic`` dict built from the schema fields (omitting missing
    ones) plus the qualifier description for the component's value.
    """
    qualifier_ref = component_schema.get("QualifierRef")

    # Output key -> schema key, in the order the keys appear in the result.
    field_map = (
        ("id", "Id"),
        ("name", "Name"),
        ("desc", "Desc"),
        ("definition", "Definition"),
        ("data_type", "DataType"),
        ("required", "Required"),
        ("min_length", "MinLength"),
        ("max_length", "MaxLength"),
    )
    details: dict[str, Any] = {
        out_key: component_schema.get(schema_key) for out_key, schema_key in field_map
    }
    details["qualifier_ref"] = qualifier_ref
    details["qualifier_desc"] = lookup_qualifier_desc_from_table(
        qualifiers=qualifiers,
        qualifier_ref=qualifier_ref,
        value=component.get("value"),
    )
    component["semantic"] = compact_dict(details)
def lookup_qualifier_desc(
    release_schema: dict[str, Any],
    qualifier_ref: str,
    value: str | None,
) -> str | None:
    """Look up the description of *value* in a release schema's qualifier table.

    Reads the ``Qualifiers`` mapping of *release_schema* (empty when absent)
    and delegates to ``lookup_qualifier_desc_from_table``.
    """
    return lookup_qualifier_desc_from_table(
        release_schema.get("Qualifiers", {}),
        qualifier_ref,
        value,
    )
def lookup_qualifier_desc_from_table(
    qualifiers: dict[str, Any] | None,
    qualifier_ref: str | None,
    value: str | None,
) -> str | None:
    """Return the description of code *value* under *qualifier_ref*, if any.

    ``None`` is returned when the table or reference is empty, when *value*
    is ``None``, when the referenced entry is not a dict, or when the
    description found is not a string.
    """
    if value is None or not (qualifiers and qualifier_ref):
        return None
    code_table = qualifiers.get(qualifier_ref)
    description = code_table.get(value) if isinstance(code_table, dict) else None
    if isinstance(description, str):
        return description
    return None
def compact_dict(value: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *value* without the entries whose value is ``None``.

    Other falsy values such as ``0``, ``""`` or ``False`` are kept, and key
    order is preserved.
    """
    kept: dict[str, Any] = {}
    for key, item in value.items():
        if item is None:
            continue
        kept[key] = item
    return kept