Files
Shinebridge/haos-addon/src/modbus_client.py
T
retr0 33c6a15644 Feature: Eastron SDM-630 Support + Float32 Decode (v1.1.5)
- inverters.py: data_type Feld in Sensor (uint16/uint32/float32)
  + SDM-630 Definition (16 Sensoren: U/I/P L1-L3, PF, Freq, kWh)
  + read_ranges: [(0, 76)] — alle Sensoren in einem Batch
- modbus_client.py: Float32 IEEE 754 Decode via struct.unpack
  (SDM-630 liefert Floats, Growatt liefert skalierte Integer)
- index.html: "Wechselrichter" → "Gerät" — Add-on unterstützt
  jetzt beliebige Modbus-Geräte, nicht nur Wechselrichter

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-26 17:07:14 +02:00

85 lines
2.9 KiB
Python

import logging
import struct
import time
from typing import Dict, Optional
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
from inverters import Inverter, Sensor
log = logging.getLogger(__name__)
class ModbusReader:
    """Read input registers from a Modbus TCP device and decode them.

    The TCP connection is opened lazily by :meth:`read` and reused across
    calls. Any failed read closes the connection, so the following call
    starts with a fresh connect attempt.
    """

    def __init__(self, host: str, port: int, slave: int, timeout: float = 10.0):
        """Store the connection parameters; no connection is opened here.

        Args:
            host: Host handed to ``ModbusTcpClient``.
            port: TCP port handed to ``ModbusTcpClient``.
            slave: Unit id sent with every read request.
            timeout: Timeout value handed to ``ModbusTcpClient``.
        """
        self.host = host
        self.port = port
        self.slave = slave
        self.timeout = timeout
        self._client: Optional[ModbusTcpClient] = None

    def _connect(self) -> bool:
        """Make sure a connected client exists.

        Returns:
            ``True`` if a connected client is available, ``False`` if the
            connect attempt failed (in which case no client is kept).
        """
        if self._client and self._client.connected:
            return True
        # A client that exists but lost its connection is closed explicitly
        # before being replaced, so its socket is not left dangling.
        self._disconnect()
        client = ModbusTcpClient(self.host, port=self.port, timeout=self.timeout)
        if not client.connect():
            log.error("Modbus TCP Verbindung fehlgeschlagen: %s:%d", self.host, self.port)
            # Release whatever the failed client may still hold.
            client.close()
            return False
        self._client = client
        log.info("Modbus TCP verbunden: %s:%d", self.host, self.port)
        return True

    def _disconnect(self):
        """Close and forget the current client, if there is one."""
        if self._client:
            self._client.close()
            self._client = None

    def read(self, inverter: Inverter) -> Optional[Dict[str, float]]:
        """Read every register range of ``inverter`` and decode its sensors.

        Args:
            inverter: Device description providing ``read_ranges`` (an
                iterable of ``(start, length)`` pairs) and ``sensors``.

        Returns:
            Mapping of sensor id to decoded value, or ``None`` if the
            connection could not be established or any range failed to
            read. On a failed read the connection is closed.
        """
        if not self._connect():
            return None
        # Batch-read all register ranges into one address -> raw word map.
        reg_cache: Dict[int, int] = {}
        for start, length in inverter.read_ranges:
            try:
                # ``count`` is passed by keyword: newer pymodbus releases no
                # longer accept it positionally, older ones accept both.
                result = self._client.read_input_registers(
                    start, count=length, slave=self.slave
                )
            except ModbusException as e:
                log.error("Modbus Ausnahme: %s", e)
                self._disconnect()
                return None
            if result.isError():
                log.error("FC04 Fehler bei Reg %d+%d: %s", start, length, result)
                self._disconnect()
                return None
            # Register i of the response belongs to address start + i.
            reg_cache.update(enumerate(result.registers, start))
        return _extract_sensors(inverter.sensors, reg_cache)

    def close(self):
        """Close the connection; a later :meth:`read` reconnects."""
        self._disconnect()
def _extract_sensors(sensors: list, regs: Dict[int, int]) -> Dict[str, float]:
values: Dict[str, float] = {}
for s in sensors:
if s.reg not in regs:
log.warning("Register %d fehlt in Antwort (%s)", s.reg, s.id)
continue
data_type = getattr(s, "data_type", "uint16")
if data_type == "float32":
if s.reg + 1 not in regs:
log.warning("Register %d+1 fehlt (%s)", s.reg, s.id)
continue
raw_val = struct.unpack(">f", struct.pack(">HH", regs[s.reg], regs[s.reg + 1]))[0]
elif s.count == 2:
if s.reg + 1 not in regs:
log.warning("Register %d+1 fehlt (%s)", s.reg, s.id)
continue
raw_val = ((regs[s.reg] << 16) | regs[s.reg + 1]) * s.scale
else:
raw_val = regs[s.reg] * s.scale
values[s.id] = round(raw_val, 3)
return values