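# Source: SensorReadings / 2jciebu.py
# Repository-viewer metadata captured with the file (not code):
#   navigation links "Newer" / "Older"; @fabre, 28 Jan, 6 KB, commit "INIT"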
import serial
import time
from datetime import datetime, timezone
import sys

# Display-rule value placed in the LED setting command: LED normally off.
DISPLAY_RULE_NORMALLY_OFF = 0x00

# Display-rule value placed in the LED setting command: LED normally on.
DISPLAY_RULE_NORMALLY_ON = 0x01


def s16(value: int) -> int:
    """Reinterpret the low 16 bits of ``value`` as a signed 16-bit integer.

    Bits above bit 15 are ignored; a set bit 15 makes the result negative.
    """
    low_word = value & 0xFFFF
    if low_word & 0x8000:
        # Sign bit set: shift the word down into the negative half of the range.
        return low_word - 0x10000
    return low_word


def calc_crc(buf, length):
    """Compute the CRC-16 (Modbus) over the first ``length`` items of ``buf``.

    The register starts at 0xFFFF; each input byte is XORed in and then
    shifted out one bit at a time, applying 0xA001 whenever the bit shifted
    out is 1.

    Returns:
        A two-element ``bytearray``: low CRC byte first, then high byte.
    """
    register = 0xFFFF
    for position in range(length):
        register ^= buf[position]
        for _bit in range(8):
            if register & 1:
                register = (register >> 1) ^ 0xA001
            else:
                register >>= 1
    low_byte, high_byte = register & 0x00FF, register >> 8
    return bytearray((low_byte, high_byte))


def _u16_le(data: bytes, offset: int) -> int:
    return int.from_bytes(data[offset : offset + 2], byteorder="little", signed=False)


def _s16_le(data: bytes, offset: int) -> int:
    return int.from_bytes(data[offset : offset + 2], byteorder="little", signed=True)


def _u32_le(data: bytes, offset: int) -> int:
    return int.from_bytes(data[offset : offset + 4], byteorder="little", signed=False)


def parse_latest_data_long(data: bytes) -> dict:
    """Decode a 'Latest data Long' response into a dict of named values.

    Byte offsets are the ones used by the original script. Measurement
    values are returned as floats (scaled where the code divides), the
    vibration info and all ``*_flag`` entries as ints.

    Raises:
        ValueError: if ``data`` holds fewer than 56 bytes.
    """
    if len(data) < 56:
        raise ValueError(f"response too short: {len(data)} bytes (need >= 56)")

    reading = {}

    # Measurements: little-endian fields at offsets 8..27.
    reading["temperature_c"] = _s16_le(data, 8) / 100.0
    reading["humidity_rh"] = _u16_le(data, 10) / 100.0
    reading["ambient_light"] = float(_u16_le(data, 12))
    reading["pressure_hpa"] = _u32_le(data, 14) / 1000.0
    reading["noise_db"] = _u16_le(data, 18) / 100.0
    reading["etvoc_ppb"] = float(_u16_le(data, 20))
    reading["eco2_ppm"] = float(_u16_le(data, 22))
    reading["discomfort_index"] = _u16_le(data, 24) / 100.0
    reading["heatstroke_index"] = _s16_le(data, 26) / 100.0

    # Vibration block: one info byte followed by three 16-bit values.
    reading["vibration_info"] = int(data[28])
    reading["si_value"] = _u16_le(data, 29) / 10.0
    reading["pga_gal"] = _u16_le(data, 31) / 10.0
    reading["seismic_intensity"] = _u16_le(data, 33) / 1000.0

    # Nine consecutive 16-bit flags, starting at offset 35.
    wide_flags = (
        "temperature_flag",
        "humidity_flag",
        "ambient_light_flag",
        "pressure_flag",
        "noise_flag",
        "etvoc_flag",
        "eco2_flag",
        "discomfort_flag",
        "heatstroke_flag",
    )
    for index, key in enumerate(wide_flags):
        reading[key] = _u16_le(data, 35 + 2 * index)

    # Three single-byte flags at offsets 53, 54 and 55.
    narrow_flags = ("si_flag", "pga_flag", "seismic_intensity_flag")
    for position, key in enumerate(narrow_flags, start=53):
        reading[key] = int(data[position])

    return reading


def print_latest_data(data: bytes) -> None:
    """Parse ``data`` and print each value on its own ``label:value`` line.

    The output starts with a blank line and the current local time, then
    lists every field returned by ``parse_latest_data_long`` in a fixed order.
    """
    stamp = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
    values = parse_latest_data_long(data)

    # (printed label, key in the parsed dict), in output order.
    rows = (
        ("Temperature:", "temperature_c"),
        ("Relative humidity:", "humidity_rh"),
        ("Ambient light:", "ambient_light"),
        ("Barometric pressure:", "pressure_hpa"),
        ("Sound noise:", "noise_db"),
        ("eTVOC:", "etvoc_ppb"),
        ("eCO2:", "eco2_ppm"),
        ("Discomfort index:", "discomfort_index"),
        ("Heat stroke:", "heatstroke_index"),
        ("Vibration information:", "vibration_info"),
        ("SI value:", "si_value"),
        ("PGA:", "pga_gal"),
        ("Seismic intensity:", "seismic_intensity"),
        ("Temperature flag:", "temperature_flag"),
        ("Relative humidity flag:", "humidity_flag"),
        ("Ambient light flag:", "ambient_light_flag"),
        ("Barometric pressure flag:", "pressure_flag"),
        ("Sound noise flag:", "noise_flag"),
        ("eTVOC flag:", "etvoc_flag"),
        ("eCO2 flag:", "eco2_flag"),
        ("Discomfort index flag:", "discomfort_flag"),
        ("Heat stroke flag:", "heatstroke_flag"),
        ("SI value flag:", "si_flag"),
        ("PGA flag:", "pga_flag"),
        ("Seismic intensity flag:", "seismic_intensity_flag"),
    )

    print("")
    print("Time measured:" + stamp)
    for label, key in rows:
        print(label + str(values[key]))


def _read_available(ser: serial.Serial, timeout_s: float = 1.0) -> bytes:
    """Read whatever is available on the serial buffer, waiting briefly for more."""
    deadline = time.time() + timeout_s
    buf = bytearray()
    last_len = -1
    while time.time() < deadline:
        n = ser.in_waiting
        if n:
            buf += ser.read(n)
        # Stop if buffer stops growing for a short while
        if len(buf) == last_len:
            break
        last_len = len(buf)
        time.sleep(0.05)
    return bytes(buf)


def _build_led_command(display_rule: int, red: int, green: int, blue: int) -> bytearray:
    """Build the LED setting frame, with its CRC appended.

    The fixed bytes are the ones the original script sends. The three colour
    bytes are presumably in R, G, B order (the original labels ``0, 255, 0``
    as green) - confirm against the device manual.
    """
    frame = bytearray([
        0x52, 0x42, 0x0A, 0x00, 0x02, 0x11, 0x51, display_rule, 0x00, red, green, blue
    ])
    frame += calc_crc(frame, len(frame))
    return frame


def get_latest_reading(
    port: str = "/dev/ttyUSB0",
    location: str | None = None,
    led: bool = True,
) -> dict:
    """Take a single reading from a 2JCIE-BU sensor and return it as a dict.

    Opens ``port`` at 115200 baud (8 data bits, no parity), optionally turns
    the LED green for the duration of the read, requests 'Latest data Long',
    and parses the reply. The port is always closed before returning.

    Args:
        port: Serial device path.
        location: If not ``None``, stored in the result under ``"location"``.
        led: When true, the LED is switched on before the request and off
            again afterwards.

    Returns:
        The dict from ``parse_latest_data_long`` plus ``"ts"`` (timezone-aware
        UTC ``datetime`` truncated to whole seconds) and, if given,
        ``"location"``. The original describes it as ready for DB insert.

    Raises:
        ValueError: if the sensor reply is shorter than the parser requires.
        Exception: whatever pyserial raises when opening or using the port.
    """
    ser = serial.Serial(port, 115200, serial.EIGHTBITS, serial.PARITY_NONE, timeout=1)
    try:
        if led:
            # LED On (green)
            ser.write(_build_led_command(DISPLAY_RULE_NORMALLY_ON, 0, 255, 0))
            time.sleep(0.1)
            # Consume the reply to the LED command so it is not mistaken for data.
            _read_available(ser, 0.3)

        # Drop any stale bytes (a late LED reply, earlier traffic) so the
        # parser's fixed offsets line up with the start of the data reply.
        ser.reset_input_buffer()

        # Get Latest data Long
        request = bytearray([0x52, 0x42, 0x05, 0x00, 0x01, 0x21, 0x50])
        request += calc_crc(request, len(request))
        ser.write(request)
        time.sleep(0.1)
        reading = parse_latest_data_long(_read_available(ser, 1.0))

        reading["ts"] = datetime.now(timezone.utc).replace(microsecond=0)
        if location is not None:
            reading["location"] = location

        return reading

    finally:
        # Cleanup is best-effort on purpose: a failure here must not mask an
        # exception (or a successful result) from the read above.
        try:
            if led:
                # LED Off
                ser.write(_build_led_command(DISPLAY_RULE_NORMALLY_OFF, 0, 0, 0))
                # Give the write time to go out before the port is closed.
                time.sleep(0.1)
        except Exception:
            pass
        try:
            ser.close()
        except Exception:
            pass


if __name__ == "__main__":
    # Script entry point: print one reading, or report the failure and exit 1.
    try:
        print(get_latest_reading(location="home_2f"))
    except Exception as exc:
        print(f"error: {exc}")
        sys.exit(1)