#!/usr/bin/env python3
"""Read 2JCIE-BU once and POST to OData.
Endpoint (given): http://10.16.1.100:8016/odata/SensorReadings
No pip deps: uses Python stdlib only.
Env vars:
SENSOR_PORT default /dev/ttyUSB0
SENSOR_LOCATION default home_2f
ODATA_URL default http://10.16.1.100:8016/odata/SensorReadings
ODATA_TOKEN optional bearer token
ODATA_TIMEOUT_S default 10
"""
import json
import os
import sys
import urllib.request
import urllib.error
import importlib.util
from pathlib import Path
# Runtime configuration, read once from the environment at import time.
# Serial device the sensor is attached to.
PORT = os.environ.get("SENSOR_PORT", "/dev/ttyUSB0")
# Location label; also used as the fallback "Location" value in the payload.
LOCATION = os.environ.get("SENSOR_LOCATION", "home_2f")
# OData entity-set URL that readings are POSTed to.
ODATA_URL = os.environ.get(
    "ODATA_URL", "http://10.16.1.100:8016/odata/SensorReadings"
)
# Optional bearer token; an empty string means no Authorization header is sent.
ODATA_TOKEN = os.environ.get("ODATA_TOKEN", "").strip()
# HTTP timeout in seconds; float() raises ValueError on a non-numeric value.
TIMEOUT_S = float(os.environ.get("ODATA_TIMEOUT_S", "10"))
# Load the local sensor module by file path: its filename starts with a digit,
# so it cannot be named in a regular `import` statement.
_2JCIE_PATH = Path(__file__).with_name("2jciebu.py")
_spec = importlib.util.spec_from_file_location("_2jciebu", _2JCIE_PATH)
# Validate the spec BEFORE it is used. spec_from_file_location() may return
# None, and an `assert` would be stripped under `python -O`, so raise instead.
if _spec is None or _spec.loader is None:
    raise ImportError(f"cannot load sensor module from {_2JCIE_PATH}")
_mod = importlib.util.module_from_spec(_spec)
# Executes 2jciebu.py; a missing or broken file surfaces here as an exception.
_spec.loader.exec_module(_mod)
# Re-export the one function this script needs from the sensor module.
get_latest_reading = _mod.get_latest_reading
def _to_odata_payload(r: dict) -> dict:
    """Map our snake_case reading dict to the OData entity property names.

    The OData service appears to use PascalCase keys (e.g. Ts, Location,
    TemperatureC...).

    Args:
        r: Reading dict with snake_case keys. ``r["ts"]`` must provide a
            datetime-style ``isoformat(timespec=...)`` method.
            NOTE(review): presumably a timezone-aware UTC datetime -- confirm
            against the sensor module.

    Returns:
        A dict holding "Ts", "Location" and every measurement/flag property
        whose value in ``r`` is not None.

    Raises:
        ValueError: if ``r`` has no "ts" value.
    """
    ts = r.get("ts")
    if ts is None:
        raise ValueError("missing ts")
    # datetimeoffset(0) -> ISO8601 without fractional seconds, with Z.
    # timespec="seconds" is what actually drops the fraction (by truncation);
    # plain isoformat() emits microseconds whenever they are non-zero.
    # NOTE(review): a naive datetime produces no offset suffix at all here.
    ts_str = ts.isoformat(timespec="seconds").replace("+00:00", "Z")
    payload = {
        "Ts": ts_str,
        # Fall back to the configured location when the reading has none.
        "Location": r.get("location") or LOCATION,
        "TemperatureC": r.get("temperature_c"),
        "HumidityRh": r.get("humidity_rh"),
        "AmbientLight": r.get("ambient_light"),
        "PressureHpa": r.get("pressure_hpa"),
        "NoiseDb": r.get("noise_db"),
        "EtvocPpb": r.get("etvoc_ppb"),
        "Eco2Ppm": r.get("eco2_ppm"),
        "DiscomfortIndex": r.get("discomfort_index"),
        "HeatstrokeIndex": r.get("heatstroke_index"),
        "VibrationInfo": r.get("vibration_info"),
        "SiValue": r.get("si_value"),
        "PgaGal": r.get("pga_gal"),
        "SeismicIntensity": r.get("seismic_intensity"),
        "TemperatureFlag": r.get("temperature_flag"),
        "HumidityFlag": r.get("humidity_flag"),
        "AmbientLightFlag": r.get("ambient_light_flag"),
        "PressureFlag": r.get("pressure_flag"),
        "NoiseFlag": r.get("noise_flag"),
        "EtvocFlag": r.get("etvoc_flag"),
        "Eco2Flag": r.get("eco2_flag"),
        "DiscomfortFlag": r.get("discomfort_flag"),
        "HeatstrokeFlag": r.get("heatstroke_flag"),
        "SiFlag": r.get("si_flag"),
        "PgaFlag": r.get("pga_flag"),
        "SeismicIntensityFlag": r.get("seismic_intensity_flag"),
    }
    # Drop None values to avoid type issues on some OData backends
    return {k: v for k, v in payload.items() if v is not None}
def post_odata(payload: dict) -> tuple[int, str]:
    """POST *payload* as JSON to ODATA_URL and return ``(status, body)``.

    The request carries JSON ``Content-Type``/``Accept`` headers and, when
    ODATA_TOKEN is non-empty, an ``Authorization: Bearer`` header. TIMEOUT_S
    is passed to ``urlopen`` as the timeout.

    Args:
        payload: JSON-serialisable dict to send as the request body.

    Returns:
        The HTTP status code and the response body decoded as UTF-8 (invalid
        bytes replaced). A response raised by urllib as ``HTTPError`` is
        returned the same way instead of being raised.

    Raises:
        Only ``urllib.error.HTTPError`` is handled here; any other exception
        from ``urlopen`` (e.g. ``urllib.error.URLError``) propagates.
    """
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    if ODATA_TOKEN:
        headers["Authorization"] = f"Bearer {ODATA_TOKEN}"
    req = urllib.request.Request(ODATA_URL, data=body, headers=headers, method="POST")
    try:
        with urllib.request.urlopen(req, timeout=TIMEOUT_S) as resp:
            resp_body = resp.read().decode("utf-8", errors="replace")
            return resp.status, resp_body
    except urllib.error.HTTPError as e:
        # HTTPError doubles as the response object: read the error body, then
        # close it so the underlying response is not left open.
        try:
            err_body = e.read().decode("utf-8", errors="replace")
            return e.code, err_body
        finally:
            e.close()
def main():
    """Read the sensor once, POST the reading, and report the outcome.

    Prints "ok" on a 2xx response. Otherwise writes the status code and
    response body to stderr and exits with status 2.
    """
    reading = get_latest_reading(port=PORT, location=LOCATION)
    status, text = post_odata(_to_odata_payload(reading))
    # Guard clause: anything outside the 2xx range is treated as a failure.
    if not 200 <= status < 300:
        print(f"http {status}: {text}", file=sys.stderr)
        sys.exit(2)
    print("ok")


if __name__ == "__main__":
    main()