"""MQTT publisher for inverter readings, aggregates and discovery configs."""

import json
import logging
from typing import Dict, List, Optional, Tuple

import paho.mqtt.client as mqtt

from inverters import Inverter

log = logging.getLogger(__name__)

# Device identifier and base topic used for the combined (aggregate) sensors.
AGG_DEVICE_ID = "shinebridge_aggregate"
AGG_TOPIC = "shinebridge/aggregate"


class MqttPublisher:
    """Publish inverter state, status and sensor discovery configs via MQTT.

    Discovery configs are written (retained) below ``homeassistant/sensor/``
    and are re-sent for every registered inverter each time the client
    reports a successful connection.
    """

    def __init__(self, broker: str, port: int, user: str, password: str,
                 agg_meta: Optional[Dict] = None):
        """Create the MQTT client and wire up its connection callbacks.

        Args:
            broker: Host name or address of the MQTT broker.
            port: Broker port.
            user: User name; credentials are only set when this is truthy.
            password: Password passed along with ``user``.
            agg_meta: Mapping of aggregate sensor id to a metadata dict with
                the keys ``name``, ``unit``, ``state_class``, ``icon`` and
                optionally ``device_class``. ``None`` means no aggregates.
        """
        self._broker = broker
        self._port = port
        self._connected = False
        # Each entry is (inverter, device_id, topic_prefix, display_name).
        self._registered: List[Tuple] = []
        self._agg_meta: Dict = agg_meta or {}

        client = mqtt.Client(client_id="shinebridge_hub", clean_session=True)
        if user:
            client.username_pw_set(user, password)
        client.on_connect = self._on_connect
        client.on_disconnect = self._on_disconnect
        self._client = client

    def _on_connect(self, client, userdata, flags, rc):
        """Connection callback: on success, (re)publish all discovery configs."""
        if rc != 0:
            log.error("MQTT Verbindungsfehler rc=%d", rc)
            return

        self._connected = True
        log.info("MQTT verbunden: %s:%d", self._broker, self._port)
        for registration in self._registered:
            self._publish_discovery(*registration)
        if self._agg_meta:
            self._publish_aggregate_discovery()

    def _on_disconnect(self, client, userdata, rc):
        """Disconnection callback: clear the connected flag and log the code."""
        self._connected = False
        log.warning("MQTT getrennt rc=%d", rc)

    def connect(self) -> None:
        """Start an asynchronous connect and the client's network loop.

        Any exception raised while doing so is logged, not propagated.
        """
        try:
            self._client.connect_async(self._broker, self._port, keepalive=60)
            self._client.loop_start()
        except Exception as exc:
            log.error("MQTT connect fehlgeschlagen: %s", exc)

    def disconnect(self) -> None:
        """Stop the client's network loop, then disconnect from the broker."""
        self._client.loop_stop()
        self._client.disconnect()

    @property
    def connected(self) -> bool:
        """Whether the last connection callback reported a live connection."""
        return self._connected

    # ── Device discovery ─────────────────────────────────────

    def register_inverter(self, inverter: Inverter, device_id: str,
                          topic_prefix: str,
                          display_name: Optional[str] = None) -> None:
        """Remember an inverter and publish its discovery if connected.

        An existing registration with the same ``device_id`` is replaced.

        Args:
            inverter: Inverter whose sensors are to be announced.
            device_id: Unique device identifier used in topics and ids.
            topic_prefix: Base topic; state is expected at ``<prefix>/state``.
            display_name: Optional device name; falls back to ``inverter.name``.
        """
        others = [reg for reg in self._registered if reg[1] != device_id]
        self._registered = [
            *others,
            (inverter, device_id, topic_prefix, display_name),
        ]
        if self._connected:
            self._publish_discovery(inverter, device_id, topic_prefix,
                                    display_name)

    def unregister_inverter(self, device_id: str) -> None:
        """Drop the registration for ``device_id`` (no MQTT traffic is sent)."""
        self._registered = [
            reg for reg in self._registered if reg[1] != device_id
        ]

    def _publish_sensor_config(self, *, device_id, sensor_id, state_topic,
                               name, unit, state_class, icon, device_class,
                               device) -> None:
        """Build one sensor's discovery config and publish it retained, QoS 1.

        ``device_class`` is only included in the payload when it is truthy.
        """
        payload = {
            "name": name,
            "unique_id": f"{device_id}_{sensor_id}",
            "state_topic": state_topic,
            "value_template": f"{{{{ value_json.{sensor_id} }}}}",
            "unit_of_measurement": unit,
            "state_class": state_class,
            "icon": icon,
            "device": device,
        }
        if device_class:
            payload["device_class"] = device_class
        self._client.publish(
            f"homeassistant/sensor/{device_id}/{sensor_id}/config",
            json.dumps(payload),
            retain=True,
            qos=1,
        )

    def _publish_discovery(self, inverter: Inverter, device_id: str,
                           topic_prefix: str,
                           display_name: Optional[str] = None) -> None:
        """Publish a discovery config for every sensor of ``inverter``."""
        device = {
            "identifiers": [device_id],
            "name": display_name or inverter.name,
            "manufacturer": inverter.manufacturer,
            "model": inverter.name,
        }
        for sensor in inverter.sensors:
            self._publish_sensor_config(
                device_id=device_id,
                sensor_id=sensor.id,
                state_topic=f"{topic_prefix}/state",
                name=sensor.name,
                unit=sensor.unit,
                state_class=sensor.state_class,
                icon=sensor.icon,
                device_class=sensor.device_class,
                device=device,
            )
        log.info("MQTT Discovery: %d Sensoren für %s",
                 len(inverter.sensors), device_id)

    # ── Aggregate discovery ──────────────────────────────────

    def _publish_aggregate_discovery(self) -> None:
        """Publish a discovery config for every configured aggregate sensor."""
        device = {
            "identifiers": [AGG_DEVICE_ID],
            "name": "ShineBridge Gesamt",
            "manufacturer": "ShineBridge",
            "model": "Aggregat",
        }
        for sensor_id, meta in self._agg_meta.items():
            self._publish_sensor_config(
                device_id=AGG_DEVICE_ID,
                sensor_id=sensor_id,
                state_topic=f"{AGG_TOPIC}/state",
                name=meta["name"],
                unit=meta["unit"],
                state_class=meta["state_class"],
                icon=meta["icon"],
                device_class=meta.get("device_class"),
                device=device,
            )
        log.info("MQTT Discovery: %d Aggregat-Sensoren", len(self._agg_meta))

    # ── Publishing data ──────────────────────────────────────

    def publish_data(self, values: dict, topic_prefix: str) -> None:
        """Publish ``values`` as JSON to ``<topic_prefix>/state`` (retained).

        Does nothing while the client is not connected.
        """
        if self._connected:
            self._client.publish(f"{topic_prefix}/state", json.dumps(values),
                                 retain=True, qos=0)

    def publish_status(self, status: str, topic_prefix: str) -> None:
        """Publish ``status`` to ``<topic_prefix>/status`` (retained, QoS 1).

        Unlike the data publishers, this does not check the connected flag.
        """
        self._client.publish(f"{topic_prefix}/status", status,
                             retain=True, qos=1)

    def publish_aggregates(self, values: dict) -> None:
        """Publish aggregate values and mark the aggregate device online.

        Does nothing while disconnected or when ``values`` is empty.
        """
        if self._connected and values:
            self._client.publish(f"{AGG_TOPIC}/state", json.dumps(values),
                                 retain=True, qos=0)
            self._client.publish(f"{AGG_TOPIC}/status", "online",
                                 retain=True, qos=1)