Newer
Older
SensorReadings / ReadSensors.py
@fabre fabre on 28 Jan 3 KB INIT
#!/usr/bin/env python3
"""Read 2JCIE-BU once and POST to OData.

Endpoint (given): http://10.16.1.100:8016/odata/SensorReadings

No pip deps: uses Python stdlib only.

Env vars:
  SENSOR_PORT      default /dev/ttyUSB0
  SENSOR_LOCATION  default home_2f
  ODATA_URL        default http://10.16.1.100:8016/odata/SensorReadings
  ODATA_TOKEN      optional bearer token
  ODATA_TIMEOUT_S  default 10
"""

import json
import os
import sys
import urllib.request
import urllib.error

import importlib.util
from pathlib import Path

# Runtime configuration, read once from the environment at import time.
# Each setting falls back to the literal default given here when unset.
PORT = os.environ.get("SENSOR_PORT", "/dev/ttyUSB0")
LOCATION = os.environ.get("SENSOR_LOCATION", "home_2f")
ODATA_URL = os.environ.get(
    "ODATA_URL",
    "http://10.16.1.100:8016/odata/SensorReadings",
)
# Surrounding whitespace is stripped so a padded or blank value is normalized.
ODATA_TOKEN = os.environ.get("ODATA_TOKEN", "").strip()
# Parsed as a float; a non-numeric value raises ValueError at import time.
TIMEOUT_S = float(os.environ.get("ODATA_TIMEOUT_S", "10"))

# Load the local sensor module by file path: its filename starts with a digit,
# so it cannot be named in a regular ``import`` statement.
_2JCIE_PATH = Path(__file__).with_name("2jciebu.py")
_spec = importlib.util.spec_from_file_location("_2jciebu", _2JCIE_PATH)
# Validate the spec *before* it is used to build the module. An explicit raise
# (unlike ``assert``) is not stripped when Python runs with ``-O``.
if _spec is None or _spec.loader is None:
    raise ImportError(f"cannot load sensor module from {_2JCIE_PATH}")
_mod = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_mod)
get_latest_reading = _mod.get_latest_reading


def _to_odata_payload(r: dict) -> dict:
    """Map our snake_case reading dict to the OData entity property names.

    The OData service appears to use PascalCase keys (e.g. Ts, Location, TemperatureC...).
    """
    ts = r.get("ts")
    if ts is None:
        raise ValueError("missing ts")

    # datetimeoffset(0) -> ISO8601 without fractional seconds, with Z
    ts_str = ts.isoformat().replace("+00:00", "Z")

    payload = {
        "Ts": ts_str,
        "Location": r.get("location") or LOCATION,
        "TemperatureC": r.get("temperature_c"),
        "HumidityRh": r.get("humidity_rh"),
        "AmbientLight": r.get("ambient_light"),
        "PressureHpa": r.get("pressure_hpa"),
        "NoiseDb": r.get("noise_db"),
        "EtvocPpb": r.get("etvoc_ppb"),
        "Eco2Ppm": r.get("eco2_ppm"),
        "DiscomfortIndex": r.get("discomfort_index"),
        "HeatstrokeIndex": r.get("heatstroke_index"),
        "VibrationInfo": r.get("vibration_info"),
        "SiValue": r.get("si_value"),
        "PgaGal": r.get("pga_gal"),
        "SeismicIntensity": r.get("seismic_intensity"),
        "TemperatureFlag": r.get("temperature_flag"),
        "HumidityFlag": r.get("humidity_flag"),
        "AmbientLightFlag": r.get("ambient_light_flag"),
        "PressureFlag": r.get("pressure_flag"),
        "NoiseFlag": r.get("noise_flag"),
        "EtvocFlag": r.get("etvoc_flag"),
        "Eco2Flag": r.get("eco2_flag"),
        "DiscomfortFlag": r.get("discomfort_flag"),
        "HeatstrokeFlag": r.get("heatstroke_flag"),
        "SiFlag": r.get("si_flag"),
        "PgaFlag": r.get("pga_flag"),
        "SeismicIntensityFlag": r.get("seismic_intensity_flag"),
    }

    # Drop None values to avoid type issues on some OData backends
    return {k: v for k, v in payload.items() if v is not None}


def post_odata(payload: dict) -> tuple[int, str]:
    """POST *payload* as JSON to ``ODATA_URL`` and return ``(status, body)``.

    A bearer ``Authorization`` header is added only when ``ODATA_TOKEN`` is
    non-empty. HTTP error responses are not raised: their status code and
    body are returned just like a successful response.

    Args:
        payload: JSON-serializable dict to send as the request body.

    Returns:
        The HTTP status code and the response body decoded as UTF-8
        (undecodable bytes are replaced).

    Raises:
        urllib.error.URLError: For failures other than an HTTP error status;
            only ``HTTPError`` is caught here.
    """
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    if ODATA_TOKEN:
        headers["Authorization"] = f"Bearer {ODATA_TOKEN}"

    req = urllib.request.Request(ODATA_URL, data=body, headers=headers, method="POST")
    try:
        with urllib.request.urlopen(req, timeout=TIMEOUT_S) as resp:
            resp_body = resp.read().decode("utf-8", errors="replace")
            return resp.status, resp_body
    except urllib.error.HTTPError as e:
        # HTTPError doubles as the response object; close it once the body
        # has been read so the underlying stream is released.
        try:
            err_body = e.read().decode("utf-8", errors="replace")
            return e.code, err_body
        finally:
            e.close()


def main() -> None:
    """Read the sensor once, POST the reading, and report the outcome.

    Prints ``ok`` on a 2xx response. On any other HTTP status the status and
    body are written to stderr and the process exits with status 2. If the
    request itself fails with an ``OSError``, a one-line message is written
    to stderr and the process exits with status 1.
    """
    r = get_latest_reading(port=PORT, location=LOCATION)
    payload = _to_odata_payload(r)
    try:
        code, body = post_odata(payload)
    except OSError as e:
        # urllib.error.URLError and socket timeouts are OSError subclasses.
        # Report them like HTTP errors instead of dumping a traceback; exit
        # status 1 matches what the uncaught exception produced before.
        print(f"post failed: {e}", file=sys.stderr)
        sys.exit(1)
    if 200 <= code < 300:
        print("ok")
        return
    print(f"http {code}: {body}", file=sys.stderr)
    sys.exit(2)


# Run the read-and-post cycle only when executed as a script, not on import.
if __name__ == "__main__":
    main()