HAOS Add-on: MVP + NuttX Binary + .gitignore
- haos-addon/: vollständiges HA Add-on (config.yaml, Dockerfile, build.yaml) - Python Backend: pymodbus Modbus TCP → paho-mqtt MQTT Discovery - Unterstützte Modelle: MIC 1500/2000 TL-X, SPH 5000 TL3, MOD 6000 TL3 - Web UI: Wechselrichter-Auswahl, Modbus/MQTT-Konfig, Live-Sensor-Grid (dark theme) - MQTT HA Discovery für alle Sensoren mit device_class, state_class, icon - ShineLAN-X/releases/nuttx-mbusd-shinelanx.bin: NuttX Firmware (ohne DFU, 0x08000000) - .gitignore: Logs, MQTT-JSON, shinelanx-modbus/ ausgeschlossen Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,77 @@
|
||||
import logging
|
||||
import time
|
||||
from typing import Dict, Optional
|
||||
|
||||
from pymodbus.client import ModbusTcpClient
|
||||
from pymodbus.exceptions import ModbusException
|
||||
|
||||
from inverters import Inverter, Sensor
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ModbusReader:
|
||||
def __init__(self, host: str, port: int, slave: int, timeout: float = 10.0):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.slave = slave
|
||||
self.timeout = timeout
|
||||
self._client: Optional[ModbusTcpClient] = None
|
||||
|
||||
def _connect(self) -> bool:
|
||||
if self._client and self._client.connected:
|
||||
return True
|
||||
self._client = ModbusTcpClient(self.host, port=self.port, timeout=self.timeout)
|
||||
if not self._client.connect():
|
||||
log.error("Modbus TCP Verbindung fehlgeschlagen: %s:%d", self.host, self.port)
|
||||
self._client = None
|
||||
return False
|
||||
log.info("Modbus TCP verbunden: %s:%d", self.host, self.port)
|
||||
return True
|
||||
|
||||
def _disconnect(self):
|
||||
if self._client:
|
||||
self._client.close()
|
||||
self._client = None
|
||||
|
||||
def read(self, inverter: Inverter) -> Optional[Dict[str, float]]:
|
||||
if not self._connect():
|
||||
return None
|
||||
|
||||
# Batch-read aller Register-Bereiche
|
||||
reg_cache: Dict[int, int] = {}
|
||||
for start, length in inverter.read_ranges:
|
||||
try:
|
||||
result = self._client.read_input_registers(start, length, slave=self.slave)
|
||||
if result.isError():
|
||||
log.error("FC04 Fehler bei Reg %d+%d: %s", start, length, result)
|
||||
self._disconnect()
|
||||
return None
|
||||
for i, val in enumerate(result.registers):
|
||||
reg_cache[start + i] = val
|
||||
except ModbusException as e:
|
||||
log.error("Modbus Ausnahme: %s", e)
|
||||
self._disconnect()
|
||||
return None
|
||||
|
||||
return _extract_sensors(inverter.sensors, reg_cache)
|
||||
|
||||
def close(self):
|
||||
self._disconnect()
|
||||
|
||||
|
||||
def _extract_sensors(sensors: list, regs: Dict[int, int]) -> Dict[str, float]:
|
||||
values: Dict[str, float] = {}
|
||||
for s in sensors:
|
||||
if s.reg not in regs:
|
||||
log.warning("Register %d fehlt in Antwort (%s)", s.reg, s.id)
|
||||
continue
|
||||
if s.count == 2:
|
||||
if s.reg + 1 not in regs:
|
||||
log.warning("Register %d (high word) fehlt (%s)", s.reg + 1, s.id)
|
||||
continue
|
||||
raw = (regs[s.reg] << 16) | regs[s.reg + 1]
|
||||
else:
|
||||
raw = regs[s.reg]
|
||||
values[s.id] = round(raw * s.scale, 3)
|
||||
return values
|
||||
Reference in New Issue
Block a user