import serial
import time
from datetime import datetime, timezone
import sys
# LED display-rule values; get_latest_reading() places one of these in the
# rule byte of the LED command frame it sends.
# Rule 0: LED normally off.
DISPLAY_RULE_NORMALLY_OFF = 0
# Rule 1: LED normally on.
DISPLAY_RULE_NORMALLY_ON = 1
def s16(value: int) -> int:
    """Reinterpret the low 16 bits of *value* as a signed 16-bit integer.

    Bits above bit 15 are ignored.
    """
    low16 = value & 0xFFFF
    if low16 & 0x8000:
        # Sign bit set: map 0x8000..0xFFFF onto -32768..-1.
        return low16 - 0x10000
    return low16
def calc_crc(buf, length):
    """Compute the CRC-16 (Modbus) of the first *length* items of *buf*.

    Uses initial value 0xFFFF and the reflected polynomial 0xA001, processing
    each byte least-significant bit first.

    Returns:
        A two-byte ``bytearray`` holding the CRC low byte followed by the
        high byte.
    """
    register = 0xFFFF
    for index in range(length):
        register ^= buf[index]
        for _bit in range(8):
            carry = register & 1
            register >>= 1
            if carry:
                register ^= 0xA001
    return bytearray((register & 0x00FF, register >> 8))
def _u16_le(data: bytes, offset: int) -> int:
return int.from_bytes(data[offset : offset + 2], byteorder="little", signed=False)
def _s16_le(data: bytes, offset: int) -> int:
return int.from_bytes(data[offset : offset + 2], byteorder="little", signed=True)
def _u32_le(data: bytes, offset: int) -> int:
return int.from_bytes(data[offset : offset + 4], byteorder="little", signed=False)
def parse_latest_data_long(data: bytes) -> dict:
    """Decode a 'Latest data Long' response into a dict of named values.

    All multi-byte fields are little-endian and read at fixed byte offsets
    (the first field starts at offset 8, the last is the single byte at
    offset 55). Scaled fields are divided by the constant shown next to
    them; flag fields are returned as plain integers.

    Args:
        data: Raw response bytes; at least 56 bytes are required.

    Returns:
        A dict with measurement values (floats, except ``vibration_info``)
        followed by the per-measurement flag integers.

    Raises:
        ValueError: If *data* is shorter than 56 bytes.
    """
    size = len(data)
    if size < 56:
        raise ValueError(f"response too short: {size} bytes (need >= 56)")

    parsed = {}

    # Measurement values, offsets 8..34.
    parsed["temperature_c"] = _s16_le(data, 8) / 100.0
    parsed["humidity_rh"] = _u16_le(data, 10) / 100.0
    parsed["ambient_light"] = float(_u16_le(data, 12))
    parsed["pressure_hpa"] = _u32_le(data, 14) / 1000.0
    parsed["noise_db"] = _u16_le(data, 18) / 100.0
    parsed["etvoc_ppb"] = float(_u16_le(data, 20))
    parsed["eco2_ppm"] = float(_u16_le(data, 22))
    parsed["discomfort_index"] = _u16_le(data, 24) / 100.0
    parsed["heatstroke_index"] = _s16_le(data, 26) / 100.0
    parsed["vibration_info"] = int(data[28])
    parsed["si_value"] = _u16_le(data, 29) / 10.0
    parsed["pga_gal"] = _u16_le(data, 31) / 10.0
    parsed["seismic_intensity"] = _u16_le(data, 33) / 1000.0

    # Nine consecutive 16-bit flag words at offsets 35, 37, ..., 51.
    word_flags = (
        "temperature_flag",
        "humidity_flag",
        "ambient_light_flag",
        "pressure_flag",
        "noise_flag",
        "etvoc_flag",
        "eco2_flag",
        "discomfort_flag",
        "heatstroke_flag",
    )
    for slot, key in enumerate(word_flags):
        parsed[key] = _u16_le(data, 35 + 2 * slot)

    # Three single-byte flags at offsets 53, 54 and 55.
    byte_flags = ("si_flag", "pga_flag", "seismic_intensity_flag")
    for position, key in enumerate(byte_flags, start=53):
        parsed[key] = int(data[position])

    return parsed
def print_latest_data(data: bytes) -> None:
    """Parse *data* and print every value, one ``label:value`` line per field.

    A blank line and the current local time are printed first. Parsing is
    delegated to ``parse_latest_data_long``, so its ``ValueError`` for short
    input propagates before anything is printed.
    """
    stamp = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
    values = parse_latest_data_long(data)

    # (printed label, key in the parsed dict), in output order.
    rows = (
        ("Temperature:", "temperature_c"),
        ("Relative humidity:", "humidity_rh"),
        ("Ambient light:", "ambient_light"),
        ("Barometric pressure:", "pressure_hpa"),
        ("Sound noise:", "noise_db"),
        ("eTVOC:", "etvoc_ppb"),
        ("eCO2:", "eco2_ppm"),
        ("Discomfort index:", "discomfort_index"),
        ("Heat stroke:", "heatstroke_index"),
        ("Vibration information:", "vibration_info"),
        ("SI value:", "si_value"),
        ("PGA:", "pga_gal"),
        ("Seismic intensity:", "seismic_intensity"),
        ("Temperature flag:", "temperature_flag"),
        ("Relative humidity flag:", "humidity_flag"),
        ("Ambient light flag:", "ambient_light_flag"),
        ("Barometric pressure flag:", "pressure_flag"),
        ("Sound noise flag:", "noise_flag"),
        ("eTVOC flag:", "etvoc_flag"),
        ("eCO2 flag:", "eco2_flag"),
        ("Discomfort index flag:", "discomfort_flag"),
        ("Heat stroke flag:", "heatstroke_flag"),
        ("SI value flag:", "si_flag"),
        ("PGA flag:", "pga_flag"),
        ("Seismic intensity flag:", "seismic_intensity_flag"),
    )

    print("")
    print("Time measured:" + stamp)
    for label, key in rows:
        print(label + str(values[key]))
def _read_available(ser: serial.Serial, timeout_s: float = 1.0) -> bytes:
"""Read whatever is available on the serial buffer, waiting briefly for more."""
deadline = time.time() + timeout_s
buf = bytearray()
last_len = -1
while time.time() < deadline:
n = ser.in_waiting
if n:
buf += ser.read(n)
# Stop if buffer stops growing for a short while
if len(buf) == last_len:
break
last_len = len(buf)
time.sleep(0.05)
return bytes(buf)
def _led_command(display_rule: int, red: int, green: int, blue: int) -> bytearray:
    """Build the LED-setting command frame, CRC included, for a rule and RGB colour."""
    frame = bytearray([
        0x52, 0x42, 0x0A, 0x00, 0x02, 0x11, 0x51, display_rule, 0x00, red, green, blue,
    ])
    frame += calc_crc(frame, len(frame))
    return frame


def _checked_frame(data: bytes) -> bytes:
    """Return the leading frame in *data* after verifying header, length and CRC.

    Assumes responses use the same framing as the commands built in this
    file: bytes 0x52 0x42, a little-endian 16-bit count of the bytes that
    follow (CRC included), then the payload and a two-byte CRC over
    everything before it. NOTE(review): confirm against the device manual.

    Raises:
        ValueError: If the header is missing, the frame is incomplete, or
            the CRC does not match.
    """
    if len(data) < 4 or data[:2] != b"\x52\x42":
        raise ValueError(
            f"unexpected response header: {bytes(data[:4]).hex()} ({len(data)} bytes received)"
        )
    announced = int.from_bytes(data[2:4], "little")
    total = 4 + announced
    if announced < 2 or len(data) < total:
        raise ValueError(
            f"incomplete response frame: {len(data)} bytes received, {total} announced"
        )
    frame = bytes(data[:total])
    if frame[-2:] != bytes(calc_crc(frame, total - 2)):
        raise ValueError("response CRC mismatch")
    return frame


def get_latest_reading(port: str = "/dev/ttyUSB0", location: str | None = None, led: bool = True) -> dict:
    """Single-shot read from 2JCIE-BU and return a dict ready for DB insert.

    Opens *port* at 115200 baud (8 data bits, no parity), optionally switches
    the LED to green while measuring, requests 'Latest data Long', verifies
    the response frame and parses it. The LED is switched off again and the
    port closed on every exit path.

    Args:
        port: Serial device to open.
        location: If not ``None``, stored in the result under ``"location"``.
        led: Whether to light the LED during the read.

    Returns:
        The dict from ``parse_latest_data_long`` plus ``"ts"`` (current UTC
        time, microseconds dropped) and, when given, ``"location"``.

    Raises:
        ValueError: If the response is missing, truncated, fails its CRC,
            or is too short to parse.
    """
    ser = serial.Serial(port, 115200, serial.EIGHTBITS, serial.PARITY_NONE, timeout=1)
    try:
        # Discard bytes left over from an earlier exchange so they cannot be
        # mistaken for the start of this response.
        ser.reset_input_buffer()

        if led:
            # LED On (green)
            ser.write(_led_command(DISPLAY_RULE_NORMALLY_ON, 0, 255, 0))
            time.sleep(0.1)
            # Drain the reply to the LED command before issuing the data request.
            _read_available(ser, 0.3)

        # Get Latest data Long
        command = bytearray([0x52, 0x42, 0x05, 0x00, 0x01, 0x21, 0x50])
        command += calc_crc(command, len(command))
        ser.write(command)
        time.sleep(0.1)

        frame = _checked_frame(_read_available(ser, 1.0))
        reading = parse_latest_data_long(frame)
        reading["ts"] = datetime.now(timezone.utc).replace(microsecond=0)
        if location is not None:
            reading["location"] = location
        return reading
    finally:
        # Cleanup is best-effort: a failure here must not replace the result
        # or mask an exception raised above.
        try:
            if led:
                # LED Off
                ser.write(_led_command(DISPLAY_RULE_NORMALLY_OFF, 0, 0, 0))
                time.sleep(0.1)
        except Exception:
            pass
        try:
            ser.close()
        except Exception:
            pass
# Script entry point: take one reading and print it; on any failure print the
# error and exit with status 1.
if __name__ == "__main__":
    try:
        latest = get_latest_reading(location="home_2f")
        print(latest)
    except Exception as exc:
        print(f"error: {exc}")
        raise SystemExit(1)