import logging import struct import time from typing import Dict, Optional from pymodbus.client import ModbusTcpClient from pymodbus.exceptions import ModbusException from inverters import Inverter, Sensor log = logging.getLogger(__name__) class ModbusReader: def __init__(self, host: str, port: int, slave: int, timeout: float = 10.0): self.host = host self.port = port self.slave = slave self.timeout = timeout self._client: Optional[ModbusTcpClient] = None def _connect(self) -> bool: if self._client and self._client.connected: return True self._client = ModbusTcpClient(self.host, port=self.port, timeout=self.timeout) if not self._client.connect(): log.error("Modbus TCP Verbindung fehlgeschlagen: %s:%d", self.host, self.port) self._client = None return False log.info("Modbus TCP verbunden: %s:%d", self.host, self.port) return True def _disconnect(self): if self._client: self._client.close() self._client = None def read(self, inverter: Inverter) -> Optional[Dict[str, float]]: if not self._connect(): return None # Batch-read aller Register-Bereiche reg_cache: Dict[int, int] = {} for start, length in inverter.read_ranges: try: result = self._client.read_input_registers(start, length, slave=self.slave) if result.isError(): log.error("FC04 Fehler bei Reg %d+%d: %s", start, length, result) self._disconnect() return None for i, val in enumerate(result.registers): reg_cache[start + i] = val except ModbusException as e: log.error("Modbus Ausnahme: %s", e) self._disconnect() return None return _extract_sensors(inverter.sensors, reg_cache) def close(self): self._disconnect() def _extract_sensors(sensors: list, regs: Dict[int, int]) -> Dict[str, float]: values: Dict[str, float] = {} for s in sensors: if s.reg not in regs: log.warning("Register %d fehlt in Antwort (%s)", s.reg, s.id) continue data_type = getattr(s, "data_type", "uint16") if data_type == "float32": if s.reg + 1 not in regs: log.warning("Register %d+1 fehlt (%s)", s.reg, s.id) continue raw_val = struct.unpack(">f", struct.pack(">HH", regs[s.reg], regs[s.reg + 1]))[0] elif s.count == 2: if s.reg + 1 not in regs: log.warning("Register %d+1 fehlt (%s)", s.reg, s.id) continue raw_val = ((regs[s.reg] << 16) | regs[s.reg + 1]) * s.scale else: raw_val = regs[s.reg] * s.scale values[s.id] = round(raw_val, 3) # SDM-630: register 0x0030 enthält je nach Firmware-Variante den Phasenwinkel statt # der Gesamtwirkleistung → sicherer aus den Phasenwerten ableiten if {"power_l1", "power_l2", "power_l3"} <= values.keys(): values["total_power"] = round( values["power_l1"] + values["power_l2"] + values["power_l3"], 3 ) return values