import json import logging import os import threading import time import uuid from collections import defaultdict, deque from typing import Any, Dict, List, Optional from flask import Flask, jsonify, request, send_from_directory from inverters import INVERTERS import history from modbus_client import ModbusReader from goodwe_client import GoodweReader from wallbox_client import WallboxReader from ems_controller import EmsController from mqtt_publisher import MqttPublisher logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) log = logging.getLogger(__name__) CONFIG_PATH = "/data/config.json" HA_OPTIONS_PATH = "/data/options.json" WEB_DIR = os.path.join(os.path.dirname(__file__), "web") app = Flask(__name__, static_folder=WEB_DIR) # ── Aggregation ─────────────────────────────────────────────── # Welche Sensor-IDs fließen in welchen Aggregat-Bucket (Summe, außer AGG_AVG) AGG_SENSOR_IDS: Dict[str, List[str]] = { "total_pv_power": ["pv_power", "pv1_power", "pv2_power", "ppv"], "total_ac_power": ["ac_power", "ac_power_total"], "total_energy_today": ["energy_today", "e_day"], "total_energy_total": ["energy_total", "e_total"], "grid_power": ["total_power", "active_power"], "grid_import_kwh": ["import_kwh", "e_total_imp"], "grid_export_kwh": ["export_kwh", "e_total_exp"], "bat_charge_power": ["bat_charge_power"], "bat_discharge_power": ["bat_discharge_power"], "bat_charge_total": ["bat_charge_total", "e_bat_charge_total"], "bat_discharge_total": ["bat_discharge_total", "e_bat_discharge_total"], "bat_soc": ["bat_soc", "battery_soc"], } AGG_AVG = {"bat_soc"} AGGREGATE_META: Dict[str, Dict[str, str]] = { "total_pv_power": {"name": "PV Gesamtleistung", "unit": "W", "device_class": "power", "state_class": "measurement", "icon": "mdi:solar-power"}, "total_ac_power": {"name": "AC Gesamtleistung", "unit": "W", "device_class": "power", "state_class": "measurement", "icon": "mdi:flash"}, "total_energy_today": {"name": "Energie Heute Gesamt", "unit": "kWh", "device_class": "energy", "state_class": "total_increasing", "icon": "mdi:solar-power"}, "total_energy_total": {"name": "Energie Gesamt", "unit": "kWh", "device_class": "energy", "state_class": "total_increasing", "icon": "mdi:solar-power"}, "grid_power": {"name": "Netzleistung", "unit": "W", "device_class": "power", "state_class": "measurement", "icon": "mdi:transmission-tower"}, "grid_import_kwh": {"name": "Netzbezug Gesamt", "unit": "kWh", "device_class": "energy", "state_class": "total_increasing", "icon": "mdi:transmission-tower-import"}, "grid_export_kwh": {"name": "Einspeisung Gesamt", "unit": "kWh", "device_class": "energy", "state_class": "total_increasing", "icon": "mdi:transmission-tower-export"}, "bat_charge_power": {"name": "Batterie Ladeleistung Ges.", "unit": "W", "device_class": "power", "state_class": "measurement", "icon": "mdi:battery-plus"}, "bat_discharge_power": {"name": "Batterie Entladeleistung Ges.","unit": "W", "device_class": "power", "state_class": "measurement", "icon": "mdi:battery-minus"}, "bat_charge_total": {"name": "Batterie Ladung Gesamt", "unit": "kWh", "device_class": "energy", "state_class": "total_increasing", "icon": "mdi:battery-plus"}, "bat_discharge_total": {"name": "Batterie Entladung Gesamt", "unit": "kWh", "device_class": "energy", "state_class": "total_increasing", "icon": "mdi:battery-minus"}, "bat_soc": {"name": "Batterie Ladezustand Ø", "unit": "%", "device_class": "battery", "state_class": "measurement", "icon": "mdi:battery"}, } # ── State ──────────────────────────────────────────────────── class State: lock = threading.Lock() mqtt_cfg: Dict[str, Any] = {} inverters_cfg: List[Dict[str, Any]] = [] inv_data: Dict[str, Dict[str, Any]] = {} _publisher: Optional[MqttPublisher] = None _threads: Dict[str, threading.Thread] = {} _stop_events: Dict[str, threading.Event] = {} # ── Config ─────────────────────────────────────────────────── def _defaults() -> Dict[str, Any]: return { "mqtt_broker": "core-mosquitto", "mqtt_port": 1883, "mqtt_user": "", "mqtt_pass": "", "inverters": [], } def load_config() -> Dict[str, Any]: cfg = _defaults() if os.path.exists(HA_OPTIONS_PATH): try: with open(HA_OPTIONS_PATH) as f: ha = json.load(f) for k in ("mqtt_broker", "mqtt_port", "mqtt_user", "mqtt_pass"): if k in ha: cfg[k] = ha[k] except Exception as e: log.warning("HA options Fehler: %s", e) if os.path.exists(CONFIG_PATH): try: with open(CONFIG_PATH) as f: cfg.update(json.load(f)) except Exception as e: log.warning("Config-Datei Fehler: %s", e) return cfg def save_config(): data = { "mqtt_broker": State.mqtt_cfg.get("mqtt_broker", ""), "mqtt_port": State.mqtt_cfg.get("mqtt_port", 1883), "mqtt_user": State.mqtt_cfg.get("mqtt_user", ""), "mqtt_pass": State.mqtt_cfg.get("mqtt_pass", ""), "inverters": State.inverters_cfg, } with open(CONFIG_PATH, "w") as f: json.dump(data, f, indent=2) # ── Aggregation ─────────────────────────────────────────────── def _compute_aggregates() -> Dict[str, float]: buckets: Dict[str, List[float]] = defaultdict(list) with State.lock: for inv_cfg in State.inverters_cfg: inv_id = inv_cfg["id"] d = State.inv_data.get(inv_id, {}) if not d.get("modbus_ok") or not d.get("values"): continue values = d["values"] for agg_id, sensor_ids in AGG_SENSOR_IDS.items(): for sid in sensor_ids: if sid in values: buckets[agg_id].append(values[sid]) result: Dict[str, float] = {} for agg_id, vals in buckets.items(): if vals: result[agg_id] = round( sum(vals) / len(vals) if agg_id in AGG_AVG else sum(vals), 3, ) return result # ── EMS Hilfsfunktionen ─────────────────────────────────────── def _get_pv_surplus() -> float: """PV-Überschuss in Watt aus laufenden Geräten ermitteln. Goodwe: active_power negativ = Einspeisung (Überschuss). Growatt: power_to_grid positiv = Einspeisung. """ surplus = 0.0 with State.lock: for inv_cfg in State.inverters_cfg: d = State.inv_data.get(inv_cfg["id"], {}) if not d.get("modbus_ok") or not d.get("values"): continue v = d["values"] # Goodwe: active_power < 0 bedeutet Einspeisung if "active_power" in v: surplus += max(0.0, -v["active_power"]) # Growatt if "power_to_grid" in v: surplus += max(0.0, v["power_to_grid"]) return surplus # ── Poll Loop ───────────────────────────────────────────────── def _poll_loop(inv_cfg: Dict[str, Any], stop: threading.Event): inv_id = inv_cfg["id"] model_id = inv_cfg.get("inverter_model", "MIC_1500_TL_X") inverter = INVERTERS.get(model_id, INVERTERS["MIC_1500_TL_X"]) prefix = inv_cfg.get("mqtt_topic_prefix", f"growatt/{inv_id}") device_id = f"growatt_{inv_id}" try: interval = max(5, int(inv_cfg.get("update_interval", 30))) port = int(inv_cfg.get("modbus_port", 502)) slave = int(inv_cfg.get("modbus_address", 1)) except (ValueError, TypeError) as e: log.error("[%s] Ungültige Konfiguration: %s", inv_id, e) return host = inv_cfg["modbus_ip"] ems: Optional[EmsController] = None if inverter.protocol == "goodwe_udp": reader = GoodweReader(host=host, family=inverter.goodwe_family) log.info("[%s] Poll-Loop: %s @ %s (Goodwe UDP/8899) alle %ds", inv_id, inverter.name, host, interval) elif inverter.protocol == "kathrein": reader = WallboxReader(host=host, port=port) ems = EmsController( min_pv_power=inv_cfg.get("ems_min_pv", 1400), pv_timeout_h=inv_cfg.get("ems_timeout", 4.0), target_hour=inv_cfg.get("ems_target_hour", 6), phases=inv_cfg.get("ems_phases", 3), ) log.info("[%s] Poll-Loop: %s @ %s:%s (Kathrein EMS) alle %ds", inv_id, inverter.name, host, port, interval) else: reader = ModbusReader(host=host, port=port, slave=slave) log.info("[%s] Poll-Loop: %s @ %s:%s alle %ds", inv_id, inverter.name, host, port, interval) with State.lock: if _publisher: _publisher.register_inverter(inverter, device_id, prefix, inv_cfg.get("name", inverter.name)) # History aus DB in die In-Memory-Deque laden hist_data = history.load_recent(inv_id, limit=300) with State.lock: d = State.inv_data.setdefault(inv_id, {"poll_count": 0}) hist = d.setdefault("history", {}) for sid, points in hist_data.items(): q = hist.setdefault(sid, deque(maxlen=300)) for pt in points: q.append(pt) log.info("[%s] %d Sensoren aus DB geladen", inv_id, len(hist_data)) while not stop.is_set(): t0 = time.time() values = reader.read(inverter) # EMS: PV-Überschuss aus anderen Geräten holen und Ladestrom regeln if ems is not None and values is not None: pv_surplus = _get_pv_surplus() # 0x0060 manchmal nicht lesbar → 1 (EV Connected) annehmen, # damit EMS aktiviert; Wallbox ignoriert Befehle wenn kein Auto da charging_state = int(values.get("charging_state", 1)) ems_status = ems.update(reader, pv_surplus, charging_state) values["ems_status_code"] = float(charging_state) log.info("[%s] EMS: %s | PV-Überschuss: %.0fW", inv_id, ems_status, pv_surplus) with State.lock: d = State.inv_data.setdefault(inv_id, {"poll_count": 0}) if values is not None: d["values"] = values d["last_update"] = time.time() d["modbus_ok"] = True d["poll_count"] = d.get("poll_count", 0) + 1 hist = d.setdefault("history", {}) now = time.time() for sid, val in values.items(): q = hist.setdefault(sid, deque(maxlen=300)) q.append((now, val)) history.write_batch(inv_id, now, values) if _publisher: _publisher.publish_data(values, prefix) _publisher.publish_status("online", prefix) else: d["modbus_ok"] = False if _publisher: _publisher.publish_status("offline", prefix) # Aggregate nach jedem erfolgreichen Poll neu berechnen und publizieren if values is not None and _publisher: agg = _compute_aggregates() if agg: _publisher.publish_aggregates(agg) stop.wait(max(0.0, interval - (time.time() - t0))) reader.close() if _publisher: _publisher.publish_status("offline", prefix) log.info("[%s] Poll-Loop beendet", inv_id) def _start_inverter(inv_cfg: Dict[str, Any]): inv_id = inv_cfg["id"] _stop_inverter(inv_id) ev = threading.Event() _stop_events[inv_id] = ev t = threading.Thread(target=_poll_loop, args=(inv_cfg, ev), daemon=True, name=f"poll-{inv_id}") _threads[inv_id] = t t.start() def _stop_inverter(inv_id: str): if inv_id in _stop_events: _stop_events[inv_id].set() if inv_id in _threads: _threads[inv_id].join(timeout=15) _stop_events.pop(inv_id, None) _threads.pop(inv_id, None) def _restart_all(): global _publisher for inv_id in list(_threads.keys()): _stop_inverter(inv_id) if _publisher: _publisher.disconnect() _publisher = MqttPublisher( broker=State.mqtt_cfg.get("mqtt_broker", "core-mosquitto"), port=int(State.mqtt_cfg.get("mqtt_port", 1883)), user=State.mqtt_cfg.get("mqtt_user", ""), password=State.mqtt_cfg.get("mqtt_pass", ""), agg_meta=AGGREGATE_META, ) _publisher.connect() time.sleep(2) for inv_cfg in State.inverters_cfg: _start_inverter(inv_cfg) # ── REST API ────────────────────────────────────────────────── @app.get("/api/config") def api_get_config(): with State.lock: cfg = {**State.mqtt_cfg, "inverters": State.inverters_cfg} cfg.pop("mqtt_pass", None) cfg["mqtt_connected"] = _publisher.connected if _publisher else False return jsonify(cfg) @app.post("/api/config") def api_save_config(): data = request.get_json(force=True) or {} with State.lock: for k in ("mqtt_broker", "mqtt_port", "mqtt_user"): if k in data: State.mqtt_cfg[k] = data[k] if data.get("mqtt_pass"): State.mqtt_cfg["mqtt_pass"] = data["mqtt_pass"] save_config() threading.Thread(target=_restart_all, daemon=True).start() return jsonify({"ok": True}) @app.get("/api/inverters-config") def api_get_inverters(): with State.lock: return jsonify(State.inverters_cfg) @app.post("/api/inverters-config") def api_save_inverters(): data = request.get_json(force=True) or [] if not isinstance(data, list): return jsonify({"error": "invalid"}), 400 for inv in data: if not isinstance(inv, dict): return jsonify({"error": "invalid"}), 400 model_id = inv.get("inverter_model") if model_id not in INVERTERS: return jsonify({"error": f"unknown model: {model_id}"}), 400 inverter_def = INVERTERS[model_id] if inverter_def.protocol == "modbus": port = inv.get("modbus_port", 502) if not isinstance(port, int) or not (1 <= port <= 65535): return jsonify({"error": "invalid port"}), 400 addr = inv.get("modbus_address", 1) if not isinstance(addr, int) or not (1 <= addr <= 247): return jsonify({"error": "invalid modbus address (1-247)"}), 400 with State.lock: State.inverters_cfg = data save_config() threading.Thread(target=_restart_all, daemon=True).start() return jsonify({"ok": True}) @app.get("/api/data") def api_get_data(): with State.lock: result = {} for inv_cfg in State.inverters_cfg: inv_id = inv_cfg["id"] model_id = inv_cfg.get("inverter_model", "MIC_1500_TL_X") inverter = INVERTERS.get(model_id, INVERTERS["MIC_1500_TL_X"]) d = State.inv_data.get(inv_id, {}) cutoff = time.time() - 300 raw_hist = d.get("history", {}) history = { sid: [v for (t, v) in q if t >= cutoff] for sid, q in raw_hist.items() } result[inv_id] = { "name": inv_cfg.get("name", inverter.name), "inverter_name": inverter.name, "values": d.get("values", {}), "history": history, "sensors": [ {"id": s.id, "name": s.name, "unit": s.unit, "icon": s.icon, "device_class": s.device_class} for s in inverter.sensors ], "last_update": d.get("last_update"), "modbus_ok": d.get("modbus_ok", False), "poll_count": d.get("poll_count", 0), } mqtt_ok = _publisher.connected if _publisher else False aggregates = _compute_aggregates() return jsonify({"inverters": result, "mqtt_ok": mqtt_ok, "aggregates": aggregates}) @app.get("/api/history") def api_get_history(): inv_id = request.args.get("inv_id", "") sensor_id = request.args.get("sensor_id", "") hours = min(float(request.args.get("hours", "24")), 24 * 7) if not inv_id or not sensor_id: return jsonify({"error": "inv_id and sensor_id required"}), 400 to_ts = time.time() from_ts = to_ts - hours * 3600 rows = history.query(inv_id, sensor_id, from_ts, to_ts) return jsonify([{"ts": t, "value": v} for t, v in rows]) @app.get("/api/inverter-models") def api_get_models(): return jsonify({ k: {"id": v.id, "name": v.name, "sensor_count": len(v.sensors)} for k, v in INVERTERS.items() }) @app.post("/api/new-id") def api_new_id(): return jsonify({"id": uuid.uuid4().hex[:8]}) @app.get("/api/export-config") def api_export_config(): with State.lock: data = { "shinebridge_export": True, "version": 1, "exported_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "mqtt": { "broker": State.mqtt_cfg.get("mqtt_broker", ""), "port": State.mqtt_cfg.get("mqtt_port", 1883), "user": State.mqtt_cfg.get("mqtt_user", ""), }, "inverters": State.inverters_cfg, } from flask import Response filename = f"shinebridge-config-{time.strftime('%Y%m%d-%H%M%S')}.json" return Response( json.dumps(data, indent=2, ensure_ascii=False), mimetype="application/json", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) @app.post("/api/import-config") def api_import_config(): data = request.get_json(force=True) or {} if not data.get("shinebridge_export"): return jsonify({"error": "Keine gültige ShineBridge-Export-Datei"}), 400 inverters = data.get("inverters", []) if not isinstance(inverters, list): return jsonify({"error": "inverters ungültig"}), 400 for inv in inverters: if not inv.get("modbus_ip"): return jsonify({"error": f"modbus_ip fehlt in Gerät {inv.get('name', '?')}"}), 400 if inv.get("inverter_model") not in INVERTERS: return jsonify({"error": f"Unbekanntes Modell: {inv.get('inverter_model')}"}), 400 with State.lock: if mqtt := data.get("mqtt"): if mqtt.get("broker"): State.mqtt_cfg["mqtt_broker"] = mqtt["broker"] if mqtt.get("port"): State.mqtt_cfg["mqtt_port"] = int(mqtt["port"]) if mqtt.get("user"): State.mqtt_cfg["mqtt_user"] = mqtt["user"] State.inverters_cfg = inverters save_config() threading.Thread(target=_restart_all, daemon=True).start() return jsonify({"ok": True, "inverters": len(inverters)}) @app.get("/") def index(): return send_from_directory(WEB_DIR, "index.html") @app.get("/") def static_files(filename): return send_from_directory(WEB_DIR, filename) # ── Startup ─────────────────────────────────────────────────── if __name__ == "__main__": history.init_db() cfg = load_config() with State.lock: State.mqtt_cfg = {k: cfg[k] for k in ("mqtt_broker", "mqtt_port", "mqtt_user", "mqtt_pass")} State.inverters_cfg = cfg.get("inverters", []) if not State.inverters_cfg and cfg.get("modbus_ip"): State.inverters_cfg = [{ "id": uuid.uuid4().hex[:8], "name": cfg.get("inverter_model", "MIC_1500_TL_X").replace("_", " "), "modbus_ip": cfg["modbus_ip"], "modbus_port": cfg.get("modbus_port", 502), "modbus_address": cfg.get("modbus_address", 1), "inverter_model": cfg.get("inverter_model", "MIC_1500_TL_X"), "mqtt_topic_prefix": cfg.get("mqtt_topic_prefix", "growatt/shinelanx"), "update_interval": cfg.get("update_interval", 30), }] save_config() _restart_all() port = int(os.environ.get("INGRESS_PORT", "8099")) log.info("Web UI startet auf Port %d", port) app.run(host="0.0.0.0", port=port, threaded=True)