import json import logging import os import threading import time import uuid from collections import deque from typing import Any, Dict, List, Optional from flask import Flask, jsonify, request, send_from_directory from inverters import INVERTERS from modbus_client import ModbusReader from mqtt_publisher import MqttPublisher logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) log = logging.getLogger(__name__) CONFIG_PATH = "/data/config.json" HA_OPTIONS_PATH = "/data/options.json" WEB_DIR = os.path.join(os.path.dirname(__file__), "web") app = Flask(__name__, static_folder=WEB_DIR) # ── State ──────────────────────────────────────────────────── class State: lock = threading.Lock() mqtt_cfg: Dict[str, Any] = {} inverters_cfg: List[Dict[str, Any]] = [] # {inv_id: {values, last_update, modbus_ok, poll_count}} inv_data: Dict[str, Dict[str, Any]] = {} _publisher: Optional[MqttPublisher] = None _threads: Dict[str, threading.Thread] = {} _stop_events: Dict[str, threading.Event] = {} # ── Config ─────────────────────────────────────────────────── def _defaults() -> Dict[str, Any]: return { "mqtt_broker": "core-mosquitto", "mqtt_port": 1883, "mqtt_user": "", "mqtt_pass": "", "inverters": [], } def load_config() -> Dict[str, Any]: cfg = _defaults() if os.path.exists(HA_OPTIONS_PATH): try: with open(HA_OPTIONS_PATH) as f: ha = json.load(f) for k in ("mqtt_broker", "mqtt_port", "mqtt_user", "mqtt_pass"): if k in ha: cfg[k] = ha[k] except Exception as e: log.warning("HA options Fehler: %s", e) if os.path.exists(CONFIG_PATH): try: with open(CONFIG_PATH) as f: cfg.update(json.load(f)) except Exception as e: log.warning("Config-Datei Fehler: %s", e) return cfg def save_config(): data = { "mqtt_broker": State.mqtt_cfg.get("mqtt_broker", ""), "mqtt_port": State.mqtt_cfg.get("mqtt_port", 1883), "mqtt_user": State.mqtt_cfg.get("mqtt_user", ""), "mqtt_pass": State.mqtt_cfg.get("mqtt_pass", ""), "inverters": State.inverters_cfg, } with open(CONFIG_PATH, "w") as f: json.dump(data, f, indent=2) # ── Poll Loop ───────────────────────────────────────────────── def _poll_loop(inv_cfg: Dict[str, Any], stop: threading.Event): inv_id = inv_cfg["id"] model_id = inv_cfg.get("inverter_model", "MIC_1500_TL_X") inverter = INVERTERS.get(model_id, INVERTERS["MIC_1500_TL_X"]) prefix = inv_cfg.get("mqtt_topic_prefix", f"growatt/{inv_id}") device_id = f"growatt_{inv_id}" interval = max(5, int(inv_cfg.get("update_interval", 30))) reader = ModbusReader( host=inv_cfg["modbus_ip"], port=int(inv_cfg.get("modbus_port", 502)), slave=int(inv_cfg.get("modbus_address", 1)), ) with State.lock: if _publisher: _publisher.register_inverter(inverter, device_id, prefix, inv_cfg.get("name", inverter.name)) log.info("[%s] Poll-Loop: %s @ %s:%s alle %ds", inv_id, inverter.name, inv_cfg["modbus_ip"], inv_cfg.get("modbus_port", 502), interval) while not stop.is_set(): t0 = time.time() values = reader.read(inverter) with State.lock: d = State.inv_data.setdefault(inv_id, {"poll_count": 0}) if values is not None: d["values"] = values d["last_update"] = time.time() d["modbus_ok"] = True d["poll_count"] = d.get("poll_count", 0) + 1 # History: (timestamp, value) pro Sensor, maximal 5 Minuten hist = d.setdefault("history", {}) now = time.time() for sid, val in values.items(): q = hist.setdefault(sid, deque(maxlen=300)) q.append((now, val)) if _publisher: _publisher.publish_data(values, prefix) _publisher.publish_status("online", prefix) else: d["modbus_ok"] = False if _publisher: _publisher.publish_status("offline", prefix) stop.wait(max(0.0, interval - (time.time() - t0))) reader.close() if _publisher: _publisher.publish_status("offline", prefix) log.info("[%s] Poll-Loop beendet", inv_id) def _start_inverter(inv_cfg: Dict[str, Any]): inv_id = inv_cfg["id"] _stop_inverter(inv_id) ev = threading.Event() _stop_events[inv_id] = ev t = threading.Thread(target=_poll_loop, args=(inv_cfg, ev), daemon=True, name=f"poll-{inv_id}") _threads[inv_id] = t t.start() def _stop_inverter(inv_id: str): if inv_id in _stop_events: _stop_events[inv_id].set() if inv_id in _threads: _threads[inv_id].join(timeout=15) _stop_events.pop(inv_id, None) _threads.pop(inv_id, None) def _restart_all(): global _publisher for inv_id in list(_threads.keys()): _stop_inverter(inv_id) if _publisher: _publisher.disconnect() _publisher = MqttPublisher( broker=State.mqtt_cfg.get("mqtt_broker", "core-mosquitto"), port=int(State.mqtt_cfg.get("mqtt_port", 1883)), user=State.mqtt_cfg.get("mqtt_user", ""), password=State.mqtt_cfg.get("mqtt_pass", ""), ) _publisher.connect() time.sleep(2) for inv_cfg in State.inverters_cfg: _start_inverter(inv_cfg) # ── REST API ────────────────────────────────────────────────── @app.get("/api/config") def api_get_config(): with State.lock: cfg = {**State.mqtt_cfg, "inverters": State.inverters_cfg} cfg.pop("mqtt_pass", None) cfg["mqtt_connected"] = _publisher.connected if _publisher else False return jsonify(cfg) @app.post("/api/config") def api_save_config(): data = request.get_json(force=True) or {} with State.lock: for k in ("mqtt_broker", "mqtt_port", "mqtt_user"): if k in data: State.mqtt_cfg[k] = data[k] if data.get("mqtt_pass"): State.mqtt_cfg["mqtt_pass"] = data["mqtt_pass"] save_config() threading.Thread(target=_restart_all, daemon=True).start() return jsonify({"ok": True}) @app.get("/api/inverters-config") def api_get_inverters(): with State.lock: return jsonify(State.inverters_cfg) @app.post("/api/inverters-config") def api_save_inverters(): data = request.get_json(force=True) or [] with State.lock: State.inverters_cfg = data save_config() threading.Thread(target=_restart_all, daemon=True).start() return jsonify({"ok": True}) @app.get("/api/data") def api_get_data(): with State.lock: result = {} for inv_cfg in State.inverters_cfg: inv_id = inv_cfg["id"] model_id = inv_cfg.get("inverter_model", "MIC_1500_TL_X") inverter = INVERTERS.get(model_id, INVERTERS["MIC_1500_TL_X"]) d = State.inv_data.get(inv_id, {}) cutoff = time.time() - 300 # letzte 5 Minuten raw_hist = d.get("history", {}) history = { sid: [v for (t, v) in q if t >= cutoff] for sid, q in raw_hist.items() } result[inv_id] = { "name": inv_cfg.get("name", inverter.name), "inverter_name": inverter.name, "values": d.get("values", {}), "history": history, "sensors": [ {"id": s.id, "name": s.name, "unit": s.unit, "icon": s.icon, "device_class": s.device_class} for s in inverter.sensors ], "last_update": d.get("last_update"), "modbus_ok": d.get("modbus_ok", False), "poll_count": d.get("poll_count", 0), } mqtt_ok = _publisher.connected if _publisher else False return jsonify({"inverters": result, "mqtt_ok": mqtt_ok}) @app.get("/api/inverter-models") def api_get_models(): return jsonify({ k: {"id": v.id, "name": v.name, "sensor_count": len(v.sensors)} for k, v in INVERTERS.items() }) @app.post("/api/new-id") def api_new_id(): return jsonify({"id": uuid.uuid4().hex[:8]}) @app.get("/") def index(): return send_from_directory(WEB_DIR, "index.html") @app.get("/") def static_files(filename): return send_from_directory(WEB_DIR, filename) # ── Startup ─────────────────────────────────────────────────── if __name__ == "__main__": cfg = load_config() with State.lock: State.mqtt_cfg = {k: cfg[k] for k in ("mqtt_broker", "mqtt_port", "mqtt_user", "mqtt_pass")} State.inverters_cfg = cfg.get("inverters", []) # Migration: single-inverter config → list if not State.inverters_cfg and cfg.get("modbus_ip"): State.inverters_cfg = [{ "id": uuid.uuid4().hex[:8], "name": cfg.get("inverter_model", "MIC_1500_TL_X").replace("_", " "), "modbus_ip": cfg["modbus_ip"], "modbus_port": cfg.get("modbus_port", 502), "modbus_address": cfg.get("modbus_address", 1), "inverter_model": cfg.get("inverter_model", "MIC_1500_TL_X"), "mqtt_topic_prefix": cfg.get("mqtt_topic_prefix", "growatt/shinelanx"), "update_interval": cfg.get("update_interval", 30), }] save_config() _restart_all() port = int(os.environ.get("INGRESS_PORT", "8099")) log.info("Web UI startet auf Port %d", port) app.run(host="0.0.0.0", port=port, threaded=True)