import calendar
import datetime
import json
import logging
import os
import threading
import time
import uuid
from collections import defaultdict, deque
from typing import Any, Dict, List, Optional

from flask import Flask, jsonify, request, send_from_directory

from inverters import INVERTERS
import history
from modbus_client import ModbusReader
from goodwe_client import GoodweReader
from wallbox_client import WallboxReader
from ems_controller import EmsController
from mqtt_publisher import MqttPublisher
from surplus_devices import SurplusDeviceController

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
log = logging.getLogger(__name__)

CONFIG_PATH = "/data/config.json"
HA_OPTIONS_PATH = "/data/options.json"
WEB_DIR = os.path.join(os.path.dirname(__file__), "web")

app = Flask(__name__, static_folder=WEB_DIR)

# ── Aggregation ───────────────────────────────────────────────
# Which sensor IDs feed which aggregate bucket. For every inverter the first
# listed sensor ID that is present contributes one value; buckets are summed
# unless listed in AGG_AVG or AGG_FIRST.
AGG_SENSOR_IDS: Dict[str, List[str]] = {
    "total_pv_power": ["pv_power", "pv1_power", "pv2_power", "ppv"],
    "total_ac_power": ["ac_power", "ac_power_total"],
    "total_energy_today": ["energy_today", "e_day"],
    "total_energy_total": ["energy_total", "e_total"],
    "grid_power": ["grid_power"],
    "grid_import_kwh": ["import_kwh", "e_total_imp"],
    "grid_export_kwh": ["export_kwh", "e_total_exp"],
    "bat_charge_power": ["bat_charge_power"],
    "bat_discharge_power": ["bat_discharge_power"],
    "bat_charge_total": ["bat_charge_total", "e_bat_charge_total"],
    "bat_discharge_total": ["bat_discharge_total", "e_bat_discharge_total"],
    "bat_soc": ["bat_soc", "battery_soc"],
}
# Buckets that are averaged instead of summed.
AGG_AVG = {"bat_soc"}
# Single measurements at the grid connection point: never summed across
# devices. The value of the first reporting inverter (configuration order) is
# used, so the device with the real grid meter should be listed first
# (original note: the Goodwe CT clamp takes precedence over proxy values).
AGG_FIRST = {"grid_power"}

AGGREGATE_META: Dict[str, Dict[str, str]] = {
    "total_pv_power": {
        "name": "PV Gesamtleistung", "unit": "W", "device_class": "power",
        "state_class": "measurement", "icon": "mdi:solar-power",
    },
    "total_ac_power": {
        "name": "AC Gesamtleistung", "unit": "W", "device_class": "power",
        "state_class": "measurement", "icon": "mdi:flash",
    },
    "total_energy_today": {
        "name": "Energie Heute Gesamt", "unit": "kWh", "device_class": "energy",
        "state_class": "total_increasing", "icon": "mdi:solar-power",
    },
    "total_energy_total": {
        "name": "Energie Gesamt", "unit": "kWh", "device_class": "energy",
        "state_class": "total_increasing", "icon": "mdi:solar-power",
    },
    "grid_power": {
        "name": "Netzleistung", "unit": "W", "device_class": "power",
        "state_class": "measurement", "icon": "mdi:transmission-tower",
    },
    "grid_import_kwh": {
        "name": "Netzbezug Gesamt", "unit": "kWh", "device_class": "energy",
        "state_class": "total_increasing", "icon": "mdi:transmission-tower-import",
    },
    "grid_export_kwh": {
        "name": "Einspeisung Gesamt", "unit": "kWh", "device_class": "energy",
        "state_class": "total_increasing", "icon": "mdi:transmission-tower-export",
    },
    "bat_charge_power": {
        "name": "Batterie Ladeleistung Ges.", "unit": "W", "device_class": "power",
        "state_class": "measurement", "icon": "mdi:battery-plus",
    },
    "bat_discharge_power": {
        "name": "Batterie Entladeleistung Ges.", "unit": "W", "device_class": "power",
        "state_class": "measurement", "icon": "mdi:battery-minus",
    },
    "bat_charge_total": {
        "name": "Batterie Ladung Gesamt", "unit": "kWh", "device_class": "energy",
        "state_class": "total_increasing", "icon": "mdi:battery-plus",
    },
    "bat_discharge_total": {
        "name": "Batterie Entladung Gesamt", "unit": "kWh", "device_class": "energy",
        "state_class": "total_increasing", "icon": "mdi:battery-minus",
    },
    "bat_soc": {
        "name": "Batterie Ladezustand Ø", "unit": "%", "device_class": "battery",
        "state_class": "measurement", "icon": "mdi:battery",
    },
}


# ── State ────────────────────────────────────────────────────
class State:
    """Process-wide shared state; guard reads/writes with ``State.lock``."""

    # Non-reentrant: never call a function that takes the lock while holding it.
    lock = threading.Lock()
    # Scalar settings (MQTT connection, prices, billing, tariff).
    mqtt_cfg: Dict[str, Any] = {}
    # One config dict per polled device.
    inverters_cfg: List[Dict[str, Any]] = []
    # Per inverter id: latest values, history deques, status counters.
    inv_data: Dict[str, Dict[str, Any]] = {}
    surplus_devices_cfg: List[Dict[str, Any]] = []
    z2m_base: str = "zigbee2mqtt"
    # Devices reported on the Zigbee2MQTT bridge/devices topic.
    z2m_devices: List[Dict[str, Any]] = []


_publisher: Optional[MqttPublisher] = None
_surplus_ctrl: Optional[SurplusDeviceController] = None
_surplus_stop: threading.Event = threading.Event()
_surplus_thread: Optional[threading.Thread] = None
_threads: Dict[str, threading.Thread] = {}
_stop_events: Dict[str, threading.Event] = {}
# Serialises _restart_all(), which is started from several request handlers.
_restart_lock = threading.Lock()


# ── Config ───────────────────────────────────────────────────
def _defaults() -> Dict[str, Any]:
    """Return a fresh dict with the default value of every config key."""
    return {
        "mqtt_broker": "core-mosquitto",
        "mqtt_port": 1883,
        "mqtt_user": "",
        "mqtt_pass": "",
        "price_import": 0.30,
        "price_export": 0.08,
        "billing_day": 1,
        "billing_month": 1,
        "tariff_type": "fixed",
        "spot_country": "de",
        "spot_markup": 0.0,
        "spot_chart": True,
        "inverters": [],
        "surplus_devices": [],
        "z2m_base": "zigbee2mqtt",
    }


def load_config() -> Dict[str, Any]:
    """Build the effective configuration.

    Starts from ``_defaults()``, overlays the MQTT keys found in
    ``HA_OPTIONS_PATH`` and finally everything stored in ``CONFIG_PATH``.
    Unreadable or malformed files are logged and skipped.
    """
    cfg = _defaults()
    if os.path.exists(HA_OPTIONS_PATH):
        try:
            with open(HA_OPTIONS_PATH, encoding="utf-8") as f:
                ha = json.load(f)
            if not isinstance(ha, dict):
                raise ValueError("kein JSON-Objekt")
            for k in ("mqtt_broker", "mqtt_port", "mqtt_user", "mqtt_pass"):
                if k in ha:
                    cfg[k] = ha[k]
        except Exception as e:
            log.warning("HA options Fehler: %s", e)
    if os.path.exists(CONFIG_PATH):
        try:
            with open(CONFIG_PATH, encoding="utf-8") as f:
                stored = json.load(f)
            if not isinstance(stored, dict):
                raise ValueError("kein JSON-Objekt")
            cfg.update(stored)
        except Exception as e:
            log.warning("Config-Datei Fehler: %s", e)
    return cfg


def save_config():
    """Persist the current ``State`` configuration to ``CONFIG_PATH``.

    Does not take ``State.lock`` itself, so it may be called while the lock
    is held. The file is written to a temporary sibling and moved into place
    so an interrupted write cannot leave a truncated config behind.
    """
    settings = State.mqtt_cfg
    data = {
        "mqtt_broker": settings.get("mqtt_broker", ""),
        "mqtt_port": settings.get("mqtt_port", 1883),
        "mqtt_user": settings.get("mqtt_user", ""),
        "mqtt_pass": settings.get("mqtt_pass", ""),
        "price_import": settings.get("price_import", 0.30),
        "price_export": settings.get("price_export", 0.08),
        "billing_day": settings.get("billing_day", 1),
        "billing_month": settings.get("billing_month", 1),
        "tariff_type": settings.get("tariff_type", "fixed"),
        "spot_country": settings.get("spot_country", "de"),
        "spot_markup": settings.get("spot_markup", 0.0),
        "spot_chart": settings.get("spot_chart", True),
        "inverters": State.inverters_cfg,
        "surplus_devices": State.surplus_devices_cfg,
        "z2m_base": State.z2m_base,
    }
    tmp_path = f"{CONFIG_PATH}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, CONFIG_PATH)


# ── Aggregation ───────────────────────────────────────────────
def _compute_aggregates(allow_stale: bool = False) -> Dict[str, float]:
    """Combine the latest values of all inverters into aggregate buckets.

    Args:
        allow_stale: When False, inverters whose last poll failed
            (``modbus_ok`` falsy) are ignored; when True their last known
            values are still used.

    Returns:
        Mapping of aggregate id to value rounded to 3 decimals. Buckets
        without any contributing value are omitted.

    Acquires ``State.lock``; must not be called while the lock is held.
    """
    buckets: Dict[str, List[float]] = defaultdict(list)
    with State.lock:
        for inv_cfg in State.inverters_cfg:
            inv_id = inv_cfg.get("id")
            if inv_id is None:
                continue
            d = State.inv_data.get(inv_id, {})
            values = d.get("values")
            if not values:
                continue
            if not allow_stale and not d.get("modbus_ok"):
                continue
            for agg_id, sensor_ids in AGG_SENSOR_IDS.items():
                # Only the first matching sensor ID per inverter counts.
                for sid in sensor_ids:
                    if sid in values:
                        buckets[agg_id].append(values[sid])
                        break

    result: Dict[str, float] = {}
    for agg_id, vals in buckets.items():
        if not vals:
            continue
        if agg_id in AGG_AVG:
            v = sum(vals) / len(vals)
        elif agg_id in AGG_FIRST:
            v = vals[0]  # take the first value instead of summing
        else:
            v = sum(vals)
        result[agg_id] = round(v, 3)
    return result


# ── EMS helpers ───────────────────────────────────────────────
def _get_pv_surplus() -> float:
    """Return the PV surplus in watts.

    Uses the ``grid_power`` aggregate (negative = feeding into the grid),
    including stale values; never returns a negative number.
    """
    agg = _compute_aggregates(allow_stale=True)
    return max(0.0, -agg.get("grid_power", 0.0))


# ── Poll loop ─────────────────────────────────────────────────
def _poll_loop(inv_cfg: Dict[str, Any], stop: threading.Event):
    """Poll one device until ``stop`` is set.

    Each cycle reads the device, derives ``grid_power`` for proxy meters,
    optionally runs the EMS controller (Kathrein wallbox), stores the values
    in ``State.inv_data`` and the history DB and publishes them via MQTT.

    Args:
        inv_cfg: Device configuration; needs ``id`` and ``modbus_ip``.
        stop: Event that terminates the loop when set.
    """
    inv_id = inv_cfg["id"]
    model_id = inv_cfg.get("inverter_model", "MIC_1500_TL_X")
    inverter = INVERTERS.get(model_id, INVERTERS["MIC_1500_TL_X"])
    prefix = inv_cfg.get("mqtt_topic_prefix", f"growatt/{inv_id}")
    device_id = f"growatt_{inv_id}"

    try:
        interval = max(5, int(inv_cfg.get("update_interval", 30)))
        port = int(inv_cfg.get("modbus_port", 502))
        slave = int(inv_cfg.get("modbus_address", 1))
        host = inv_cfg["modbus_ip"]
    except (KeyError, ValueError, TypeError) as e:
        log.error("[%s] Ungültige Konfiguration: %s", inv_id, e)
        return

    ems: Optional[EmsController] = None

    if inverter.protocol == "goodwe_udp":
        reader = GoodweReader(host=host, family=inverter.goodwe_family)
        log.info("[%s] Poll-Loop: %s @ %s (Goodwe UDP/8899) alle %ds",
                 inv_id, inverter.name, host, interval)
    elif inverter.protocol == "kathrein":
        reader = WallboxReader(host=host, port=port)
        ems = EmsController(
            min_pv_power=inv_cfg.get("ems_min_pv", 1400),
            pv_timeout_h=inv_cfg.get("ems_timeout", 4.0),
            target_hour=inv_cfg.get("ems_target_hour", 6),
            phases=inv_cfg.get("ems_phases", 3),
        )
        log.info("[%s] Poll-Loop: %s @ %s:%s (Kathrein EMS) alle %ds",
                 inv_id, inverter.name, host, port, interval)
    else:
        reader = ModbusReader(host=host, port=port, slave=slave)
        log.info("[%s] Poll-Loop: %s @ %s:%s alle %ds",
                 inv_id, inverter.name, host, port, interval)

    try:
        with State.lock:
            pub = _publisher
            if pub:
                pub.register_inverter(inverter, device_id, prefix,
                                      inv_cfg.get("name", inverter.name))

        # Load persisted history from the DB into the in-memory deques.
        hist_data = history.load_recent(inv_id, limit=300)
        with State.lock:
            d = State.inv_data.setdefault(inv_id, {"poll_count": 0})
            hist = d.setdefault("history", {})
            for sid, points in hist_data.items():
                hist.setdefault(sid, deque(maxlen=300)).extend(points)
        log.info("[%s] %d Sensoren aus DB geladen", inv_id, len(hist_data))

        while not stop.is_set():
            t0 = time.time()
            try:
                values = reader.read(inverter)
            except Exception:
                # An unexpected reader error must not kill the poll thread;
                # treat it like a failed poll and retry next cycle.
                log.exception("[%s] Lesen fehlgeschlagen", inv_id)
                values = None

            # Growatt proxy: derive grid_power from power_to_grid
            # (only the export direction is known).
            if values and "grid_power" not in values and "power_to_grid" in values:
                values["grid_power"] = -values["power_to_grid"]

            # SDM-630 proxy: total_power is the grid power
            # (positive = import, negative = export).
            if (values and "grid_power" not in values
                    and "import_kwh" in values and "total_power" in values):
                values["grid_power"] = values["total_power"]

            # EMS: take the PV surplus of the other devices and regulate
            # the charging current.
            if ems is not None and values is not None and inv_cfg.get("ems_enabled", True):
                pv_surplus = _get_pv_surplus()
                # Register 0x0060 is sometimes unreadable → assume 1
                # (EV connected) so the EMS activates; the wallbox ignores
                # commands when no car is present.
                charging_state = int(values.get("charging_state", 1))
                wallbox_power = values.get("total_power", 0.0)
                ems_status = ems.update(reader, pv_surplus, charging_state, wallbox_power)
                # NOTE(review): stores the charging state, not ems_status —
                # confirm this is the intended status code.
                values["ems_status_code"] = float(charging_state)
                log.info("[%s] EMS: %s | PV-Überschuss: %.0fW",
                         inv_id, ems_status, pv_surplus)

            now = time.time()
            # Only the in-memory bookkeeping happens under the lock; DB and
            # MQTT I/O below run without it so API requests are not blocked.
            with State.lock:
                d = State.inv_data.setdefault(inv_id, {"poll_count": 0})
                if values is not None:
                    d["values"] = values
                    d["last_update"] = now
                    d["modbus_ok"] = True
                    d["poll_count"] = d.get("poll_count", 0) + 1
                    hist = d.setdefault("history", {})
                    for sid, val in values.items():
                        hist.setdefault(sid, deque(maxlen=300)).append((now, val))
                else:
                    d["modbus_ok"] = False

            pub = _publisher
            if values is not None:
                history.write_batch(inv_id, now, values)
                if pub:
                    pub.publish_data(values, prefix)
                    pub.publish_status("online", prefix)
                    # Recompute and publish aggregates after every
                    # successful poll.
                    agg = _compute_aggregates()
                    if agg:
                        pub.publish_aggregates(agg)
            elif pub:
                pub.publish_status("offline", prefix)

            stop.wait(max(0.0, interval - (time.time() - t0)))
    finally:
        reader.close()
        pub = _publisher
        if pub:
            pub.publish_status("offline", prefix)
        log.info("[%s] Poll-Loop beendet", inv_id)


def _start_inverter(inv_cfg: Dict[str, Any]):
    """(Re)start the poll thread for ``inv_cfg``, stopping any previous one."""
    inv_id = inv_cfg["id"]
    _stop_inverter(inv_id)
    ev = threading.Event()
    _stop_events[inv_id] = ev
    t = threading.Thread(target=_poll_loop, args=(inv_cfg, ev),
                         daemon=True, name=f"poll-{inv_id}")
    _threads[inv_id] = t
    t.start()


def _stop_inverter(inv_id: str):
    """Signal the poll thread of ``inv_id`` to stop and wait up to 15 s."""
    stop_event = _stop_events.pop(inv_id, None)
    if stop_event is not None:
        stop_event.set()
    thread = _threads.pop(inv_id, None)
    if thread is not None:
        thread.join(timeout=15)


def _surplus_loop(stop: threading.Event):
    """Feed the current PV surplus to the surplus controller every 30 s."""
    while not stop.wait(30):
        ctrl = _surplus_ctrl
        if ctrl is None:
            continue
        try:
            ctrl.update(_get_pv_surplus())
        except Exception:
            # Keep the loop alive; a single failed update is retried
            # on the next tick.
            log.exception("Überschuss-Regelung fehlgeschlagen")


def _start_surplus_loop():
    """Stop a running surplus loop (if any) and start a fresh one."""
    global _surplus_stop, _surplus_thread
    _surplus_stop.set()
    if _surplus_thread and _surplus_thread.is_alive():
        _surplus_thread.join(timeout=5)
    _surplus_stop = threading.Event()
    _surplus_thread = threading.Thread(
        target=_surplus_loop, args=(_surplus_stop,), daemon=True, name="surplus-loop"
    )
    _surplus_thread.start()


def _restart_all():
    """Tear down and rebuild publisher, surplus controller and poll threads.

    Serialised by ``_restart_lock`` because several request handlers start
    this function in background threads. Acquires ``State.lock`` briefly, so
    it must not be called while that lock is held.
    """
    global _publisher, _surplus_ctrl
    with _restart_lock:
        for inv_id in list(_threads.keys()):
            _stop_inverter(inv_id)
        if _publisher:
            _publisher.disconnect()
        _publisher = MqttPublisher(
            broker=State.mqtt_cfg.get("mqtt_broker", "core-mosquitto"),
            port=int(State.mqtt_cfg.get("mqtt_port", 1883)),
            user=State.mqtt_cfg.get("mqtt_user", ""),
            password=State.mqtt_cfg.get("mqtt_pass", ""),
            agg_meta=AGGREGATE_META,
        )
        _publisher.connect()
        time.sleep(2)

        with State.lock:
            devices = State.surplus_devices_cfg
            z2m_base = State.z2m_base
            inverters = list(State.inverters_cfg)
        _surplus_ctrl = SurplusDeviceController(_publisher, z2m_base)
        _surplus_ctrl.set_config(devices, z2m_base)
        _start_surplus_loop()

        def _on_z2m_devices(topic, payload):
            """Store the device list announced on <z2m_base>/bridge/devices."""
            try:
                devices_raw = json.loads(payload)
                if not isinstance(devices_raw, list):
                    return
                parsed = [
                    {
                        "friendly_name": d.get("friendly_name", ""),
                        "description": (d.get("definition") or {}).get("description", ""),
                        "type": d.get("type", ""),
                    }
                    for d in devices_raw
                    if d.get("friendly_name") and d.get("friendly_name") != "Coordinator"
                ]
                with State.lock:
                    State.z2m_devices = parsed
                log.info("Z2M: %d Geräte empfangen", len(parsed))
            except Exception as e:
                log.warning("Z2M bridge/devices Fehler: %s", e)

        _publisher.subscribe(f"{z2m_base}/bridge/devices", _on_z2m_devices)

        for inv_cfg in inverters:
            _start_inverter(inv_cfg)


def _clamped_date(year: int, month: int, day: int) -> datetime.date:
    """Return ``date(year, month, day)`` with month/day clamped into range.

    Avoids ``ValueError`` for combinations such as 31 February or 29 February
    in a non-leap year by falling back to the last day of the month.
    """
    month = min(max(int(month), 1), 12)
    last_day = calendar.monthrange(year, month)[1]
    return datetime.date(year, month, min(max(int(day), 1), last_day))


# ── REST API ──────────────────────────────────────────────────
@app.get("/api/config")
def api_get_config():
    """Return settings and inverter list (without the MQTT password)."""
    with State.lock:
        cfg = {**State.mqtt_cfg, "inverters": State.inverters_cfg}
    cfg.pop("mqtt_pass", None)
    pub = _publisher
    cfg["mqtt_connected"] = pub.connected if pub else False
    return jsonify(cfg)


@app.post("/api/config")
def api_save_config():
    """Validate and store scalar settings, then restart all workers.

    All values are converted before anything is written, so an invalid
    request (HTTP 400) never leaves a half-applied configuration behind.
    An empty ``mqtt_pass`` keeps the stored password.
    """
    data = request.get_json(force=True) or {}
    if not isinstance(data, dict):
        return jsonify({"error": "invalid"}), 400

    updates: Dict[str, Any] = {}
    try:
        for k in ("mqtt_broker", "mqtt_user"):
            if k in data:
                updates[k] = data[k]
        if "mqtt_port" in data:
            updates["mqtt_port"] = int(data["mqtt_port"])
        if data.get("mqtt_pass"):
            updates["mqtt_pass"] = data["mqtt_pass"]
        for k in ("price_import", "price_export", "spot_markup"):
            if k in data:
                updates[k] = float(data[k])
        for k in ("billing_day", "billing_month"):
            if k in data:
                updates[k] = int(data[k])
        for k in ("tariff_type", "spot_country"):
            if k in data:
                updates[k] = str(data[k])
        if "spot_chart" in data:
            updates["spot_chart"] = bool(data["spot_chart"])
    except (TypeError, ValueError):
        return jsonify({"error": "invalid value"}), 400

    if not 1 <= updates.get("mqtt_port", 1883) <= 65535:
        return jsonify({"error": "invalid port"}), 400
    if not 1 <= updates.get("billing_day", 1) <= 31:
        return jsonify({"error": "invalid billing_day"}), 400
    if not 1 <= updates.get("billing_month", 1) <= 12:
        return jsonify({"error": "invalid billing_month"}), 400

    with State.lock:
        State.mqtt_cfg.update(updates)
        save_config()
    threading.Thread(target=_restart_all, daemon=True).start()
    return jsonify({"ok": True})


@app.get("/api/period-energy")
def api_period_energy():
    """Return energy, cost and savings figures for the current billing periods.

    For the running month and billing year the consumption of each energy
    aggregate since the period start is taken from the history module and
    priced with the fixed import price or, for the ``spot`` tariff, with the
    average spot price plus markup (ct/kWh).
    """
    agg = _compute_aggregates(allow_stale=True)
    with State.lock:
        settings = dict(State.mqtt_cfg)
    price_import = float(settings.get("price_import", 0.30))
    price_export = float(settings.get("price_export", 0.08))
    billing_day = int(settings.get("billing_day", 1))
    billing_month = int(settings.get("billing_month", 1))
    tariff_type = str(settings.get("tariff_type", "fixed"))
    spot_country = str(settings.get("spot_country", "de"))
    spot_markup = float(settings.get("spot_markup", 0.0))  # ct/kWh

    result = {
        "price_import": price_import,
        "price_export": price_export,
        "tariff_type": tariff_type,
        "spot_chart": bool(settings.get("spot_chart", True)),
    }

    MONTHS_DE = ["", "Jan", "Feb", "Mär", "Apr", "Mai", "Jun",
                 "Jul", "Aug", "Sep", "Okt", "Nov", "Dez"]
    one_day = datetime.timedelta(days=1)
    now_ts = time.time()
    for period_type in ("monthly", "yearly"):
        key = history.period_key(period_type, billing_day, billing_month)
        entry = {}

        # Period start, needed for the label and the spot price query.
        if period_type == "monthly":
            period_start = datetime.date.fromisoformat(key + "-01")
            entry["label"] = f"{MONTHS_DE[period_start.month]} {period_start.year}"
        else:
            period_start = datetime.date.fromisoformat(key)
            period_end = _clamped_date(period_start.year + 1, billing_month, billing_day)
            start_label = period_start.strftime("%d.%m.%Y")
            end_label = (period_end - one_day).strftime("%d.%m.%Y")
            entry["label"] = f"{start_label} – {end_label}"
        pd_start_ts = datetime.datetime.combine(period_start, datetime.time.min).timestamp()

        for agg_id in ("grid_import_kwh", "grid_export_kwh",
                       "bat_discharge_total", "bat_charge_total", "total_energy_total"):
            cur = agg.get(agg_id)
            if cur is None:
                continue
            history.save_period_start_if_new(agg_id, period_type, key, cur)
            val = history.get_period_consumption(agg_id, period_type, key, cur)
            if val is not None:
                entry[agg_id] = round(val, 2)

        # Effective import price (fixed price or spot market price).
        eff_price = price_import
        if tariff_type == "spot":
            avg_ct = _get_avg_spot_price(pd_start_ts, now_ts, spot_country)
            if avg_ct is not None:
                eff_price = (avg_ct + spot_markup) / 100
                entry["spot_avg_ct"] = avg_ct
                entry["spot_markup_ct"] = spot_markup

        entry["effective_price"] = round(eff_price, 4)
        if "grid_import_kwh" in entry:
            entry["import_cost"] = round(entry["grid_import_kwh"] * eff_price, 2)
        if "grid_export_kwh" in entry:
            entry["export_revenue"] = round(entry["grid_export_kwh"] * price_export, 2)

        # Self-consumption savings: total PV − export = everything consumed
        # on site (direct use + detour via battery). If PV is unavailable,
        # fall back to the battery discharge only.
        pv_total = entry.get("total_energy_total")
        grid_exp = entry.get("grid_export_kwh")
        bat_dch = entry.get("bat_discharge_total")
        if pv_total is not None and grid_exp is not None:
            savings = round(max(0.0, pv_total - grid_exp), 2)
            entry["savings_kwh"] = savings
            entry["savings_eur"] = round(savings * eff_price, 2)
        elif bat_dch is not None:
            entry["savings_kwh"] = bat_dch
            entry["savings_eur"] = round(bat_dch * eff_price, 2)

        result[period_type] = entry

    return jsonify(result)


@app.get("/api/z2m-devices")
def api_z2m_devices():
    """Return the Zigbee2MQTT devices received so far."""
    with State.lock:
        return jsonify(State.z2m_devices)


@app.get("/api/surplus-devices")
def api_get_surplus_devices():
    """Return surplus device config, controller states and current surplus."""
    with State.lock:
        devices = State.surplus_devices_cfg
        z2m_base = State.z2m_base
    ctrl = _surplus_ctrl
    states = ctrl.get_states() if ctrl else {}
    surplus_w = _get_pv_surplus()
    return jsonify({"devices": devices, "z2m_base": z2m_base,
                    "states": states, "surplus_w": surplus_w})


@app.post("/api/surplus-devices")
def api_save_surplus_devices():
    """Validate and store the surplus device list and Zigbee2MQTT base topic.

    Devices without an ``id`` receive a generated one.
    """
    data = request.get_json(force=True) or {}
    if not isinstance(data, dict):
        return jsonify({"error": "invalid"}), 400
    devices = data.get("devices", [])
    z2m_base = str(data.get("z2m_base", "zigbee2mqtt")).strip() or "zigbee2mqtt"
    if not isinstance(devices, list):
        return jsonify({"error": "invalid"}), 400
    for dev in devices:
        if not isinstance(dev, dict) or not dev.get("z2m_name"):
            return jsonify({"error": "z2m_name fehlt"}), 400
        if "id" not in dev:
            dev["id"] = uuid.uuid4().hex[:8]
    with State.lock:
        State.surplus_devices_cfg = devices
        State.z2m_base = z2m_base
        save_config()
    ctrl = _surplus_ctrl
    if ctrl:
        ctrl.set_config(devices, z2m_base)
    return jsonify({"ok": True})


@app.get("/api/inverters-config")
def api_get_inverters():
    """Return the configured inverters."""
    with State.lock:
        return jsonify(State.inverters_cfg)


@app.post("/api/inverters-config")
def api_save_inverters():
    """Validate and store the inverter list, then restart all workers.

    Every entry needs a known ``inverter_model`` and a ``modbus_ip``; Modbus
    devices additionally get their port and address range-checked. Entries
    without an ``id`` receive a generated one because the poll loop, the
    aggregation and ``/api/data`` all index by it.
    """
    data = request.get_json(force=True) or []
    if not isinstance(data, list):
        return jsonify({"error": "invalid"}), 400
    for inv in data:
        if not isinstance(inv, dict):
            return jsonify({"error": "invalid"}), 400
        model_id = inv.get("inverter_model")
        if model_id not in INVERTERS:
            return jsonify({"error": f"unknown model: {model_id}"}), 400
        if not inv.get("modbus_ip"):
            return jsonify({"error": "modbus_ip missing"}), 400
        inverter_def = INVERTERS[model_id]
        if inverter_def.protocol == "modbus":
            port = inv.get("modbus_port", 502)
            if not isinstance(port, int) or not (1 <= port <= 65535):
                return jsonify({"error": "invalid port"}), 400
            addr = inv.get("modbus_address", 1)
            if not isinstance(addr, int) or not (1 <= addr <= 247):
                return jsonify({"error": "invalid modbus address (1-247)"}), 400
        if not inv.get("id"):
            inv["id"] = uuid.uuid4().hex[:8]
    with State.lock:
        State.inverters_cfg = data
        save_config()
    threading.Thread(target=_restart_all, daemon=True).start()
    return jsonify({"ok": True})


@app.get("/api/data")
def api_get_data():
    """Return live values, 5-minute history and status for every inverter."""
    cutoff = time.time() - 300
    result = {}
    with State.lock:
        for inv_cfg in State.inverters_cfg:
            inv_id = inv_cfg.get("id")
            if inv_id is None:
                continue
            model_id = inv_cfg.get("inverter_model", "MIC_1500_TL_X")
            inverter = INVERTERS.get(model_id, INVERTERS["MIC_1500_TL_X"])
            d = State.inv_data.get(inv_id, {})
            # Named `recent` so the imported `history` module is not shadowed.
            recent = {
                sid: [v for (t, v) in q if t >= cutoff]
                for sid, q in d.get("history", {}).items()
            }
            result[inv_id] = {
                "name": inv_cfg.get("name", inverter.name),
                "inverter_name": inverter.name,
                "values": dict(d.get("values", {})),
                "history": recent,
                "sensors": [
                    {"id": s.id, "name": s.name, "unit": s.unit,
                     "icon": s.icon, "device_class": s.device_class}
                    for s in inverter.sensors
                ],
                "last_update": d.get("last_update"),
                "modbus_ok": d.get("modbus_ok", False),
                "poll_count": d.get("poll_count", 0),
            }
    pub = _publisher
    mqtt_ok = pub.connected if pub else False
    # Must run after the lock is released: _compute_aggregates locks itself.
    aggregates = _compute_aggregates()
    return jsonify({"inverters": result, "mqtt_ok": mqtt_ok, "aggregates": aggregates})


@app.get("/api/history")
def api_get_history():
    """Return stored history points of one sensor.

    Query parameters: ``inv_id``, ``sensor_id`` (both required) and ``hours``
    (default 24, limited to 0..168). Invalid input yields HTTP 400.
    """
    inv_id = request.args.get("inv_id", "")
    sensor_id = request.args.get("sensor_id", "")
    if not inv_id or not sensor_id:
        return jsonify({"error": "inv_id and sensor_id required"}), 400
    try:
        hours = float(request.args.get("hours", "24"))
    except ValueError:
        return jsonify({"error": "hours must be a number"}), 400
    if hours != hours:  # NaN
        return jsonify({"error": "hours must be a number"}), 400
    hours = max(0.0, min(hours, 24 * 7))
    to_ts = time.time()
    from_ts = to_ts - hours * 3600
    rows = history.query(inv_id, sensor_id, from_ts, to_ts)
    return jsonify([{"ts": t, "value": v} for t, v in rows])


@app.get("/api/inverter-models")
def api_get_models():
    """Return id, name and sensor count of every supported device model."""
    return jsonify({
        k: {"id": v.id, "name": v.name, "sensor_count": len(v.sensors)}
        for k, v in INVERTERS.items()
    })


@app.post("/api/new-id")
def api_new_id():
    """Return a fresh random 8-character hex id."""
    return jsonify({"id": uuid.uuid4().hex[:8]})


@app.get("/api/export-config")
def api_export_config():
    """Send MQTT settings (without password) and inverters as a JSON download."""
    from flask import Response

    with State.lock:
        data = {
            "shinebridge_export": True,
            "version": 1,
            "exported_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "mqtt": {
                "broker": State.mqtt_cfg.get("mqtt_broker", ""),
                "port": State.mqtt_cfg.get("mqtt_port", 1883),
                "user": State.mqtt_cfg.get("mqtt_user", ""),
            },
            "inverters": State.inverters_cfg,
        }
        body = json.dumps(data, indent=2, ensure_ascii=False)
    filename = f"shinebridge-config-{time.strftime('%Y%m%d-%H%M%S')}.json"
    return Response(
        body,
        mimetype="application/json",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )


@app.post("/api/import-config")
def api_import_config():
    """Import a file produced by ``/api/export-config`` and restart workers.

    The payload is fully validated before any state is changed. Inverters
    without an ``id`` receive a generated one.
    """
    data = request.get_json(force=True) or {}
    if not isinstance(data, dict) or not data.get("shinebridge_export"):
        return jsonify({"error": "Keine gültige ShineBridge-Export-Datei"}), 400

    inverters = data.get("inverters", [])
    if not isinstance(inverters, list):
        return jsonify({"error": "inverters ungültig"}), 400

    for inv in inverters:
        if not isinstance(inv, dict):
            return jsonify({"error": "inverters ungültig"}), 400
        if not inv.get("modbus_ip"):
            return jsonify({"error": f"modbus_ip fehlt in Gerät {inv.get('name', '?')}"}), 400
        if inv.get("inverter_model") not in INVERTERS:
            return jsonify({"error": f"Unbekanntes Modell: {inv.get('inverter_model')}"}), 400
        if not inv.get("id"):
            inv["id"] = uuid.uuid4().hex[:8]

    mqtt_updates: Dict[str, Any] = {}
    if mqtt := data.get("mqtt"):
        if not isinstance(mqtt, dict):
            return jsonify({"error": "mqtt ungültig"}), 400
        if mqtt.get("broker"):
            mqtt_updates["mqtt_broker"] = mqtt["broker"]
        if mqtt.get("port"):
            try:
                mqtt_updates["mqtt_port"] = int(mqtt["port"])
            except (TypeError, ValueError):
                return jsonify({"error": "mqtt ungültig"}), 400
        if mqtt.get("user"):
            mqtt_updates["mqtt_user"] = mqtt["user"]

    with State.lock:
        State.mqtt_cfg.update(mqtt_updates)
        State.inverters_cfg = inverters
        save_config()

    threading.Thread(target=_restart_all, daemon=True).start()
    return jsonify({"ok": True, "inverters": len(inverters)})


_hist_spot_cache: Dict[str, Any] = {}  # key → {"ts", "avg_ct"}
_HIST_SPOT_TTL = 3600  # seconds a cached average stays valid


def _get_avg_spot_price(start_ts: float, end_ts: float, country: str = "de") -> Optional[float]:
    """Return the average spot price (ct/kWh) for a time span, or None on error.

    Queries ``api.awattar.<country>`` and caches the result for one hour,
    keyed by country and the start/end hour. Expired cache entries are
    dropped on every refresh because the key changes each hour.

    Args:
        start_ts: Start of the span (epoch seconds).
        end_ts: End of the span (epoch seconds).
        country: Two-letter suffix of the API host; anything else is
            rejected since it is interpolated into the request URL.
    """
    import urllib.request as _ur

    country = str(country).strip().lower()
    if not (len(country) == 2 and country.isascii() and country.isalpha()):
        log.warning("Historischer Spot-Preis Fehler: %s", f"ungültiges Land {country!r}")
        return None

    cache_key = f"{country}-{int(start_ts//3600)}-{int(end_ts//3600)}"
    now = time.time()
    cached = _hist_spot_cache.get(cache_key)
    if cached and now - cached["ts"] < _HIST_SPOT_TTL:
        return cached["avg_ct"]
    try:
        url = (f"https://api.awattar.{country}/v1/marketdata"
               f"?start={int(start_ts*1000)}&end={int(end_ts*1000)}")
        with _ur.urlopen(url, timeout=10) as r:
            raw = json.loads(r.read())
        # marketprice / 10 converts the API value to ct/kWh
        # (presumably EUR/MWh — TODO confirm against the API docs).
        prices = [d["marketprice"] / 10 for d in raw.get("data", [])]
        if not prices:
            return None
        avg = round(sum(prices) / len(prices), 2)
        for stale_key, item in list(_hist_spot_cache.items()):
            if now - item["ts"] >= _HIST_SPOT_TTL:
                _hist_spot_cache.pop(stale_key, None)
        _hist_spot_cache[cache_key] = {"ts": now, "avg_ct": avg}
        return avg
    except Exception as e:
        log.warning("Historischer Spot-Preis Fehler: %s", e)
        return None


_spot_cache: Dict[str, Any] = {"ts": 0.0, "data": []}
_SPOT_TTL = 900  # 15 minutes


@app.get("/api/spot-price")
def api_spot_price():
    """Return current spot market prices, cached for ``_SPOT_TTL`` seconds.

    On a fetch error the last cached data is returned with ``ok: False``.
    """
    import urllib.request as _ur

    global _spot_cache
    now = time.time()
    if now - _spot_cache["ts"] < _SPOT_TTL and _spot_cache["data"]:
        return jsonify({"ok": True, "data": _spot_cache["data"]})
    try:
        with _ur.urlopen("https://api.awattar.de/v1/marketdata", timeout=8) as r:
            raw = json.loads(r.read())
        entries = [
            {"ts": int(d["start_timestamp"] // 1000), "price": round(d["marketprice"] / 10, 2)}
            for d in raw.get("data", [])
        ]
        _spot_cache = {"ts": now, "data": entries}
        return jsonify({"ok": True, "data": entries})
    except Exception as e:
        log.warning("Spot-Price API Fehler: %s", e)
        return jsonify({"ok": False, "data": _spot_cache.get("data", [])})


@app.get("/")
def index():
    """Serve the web UI entry page."""
    return send_from_directory(WEB_DIR, "index.html")


# The route needs a path variable: registered on "/" it would duplicate
# `index` and never receive the `filename` argument.
@app.get("/<path:filename>")
def static_files(filename):
    """Serve any other file of the web UI from ``WEB_DIR``."""
    return send_from_directory(WEB_DIR, filename)


# ── Startup ───────────────────────────────────────────────────
if __name__ == "__main__":
    # Startup: initialise the history DB, load the configuration into State,
    # start publisher/poll threads and finally run the web server.
    history.init_db()
    cfg = load_config()
    with State.lock:
        # Copy every scalar setting, not just the MQTT keys: save_config()
        # and the API read prices, billing and tariff settings from
        # State.mqtt_cfg, so leaving them out would reset them to defaults
        # on every restart.
        State.mqtt_cfg = {
            k: cfg[k]
            for k in (
                "mqtt_broker", "mqtt_port", "mqtt_user", "mqtt_pass",
                "price_import", "price_export",
                "billing_day", "billing_month",
                "tariff_type", "spot_country", "spot_markup", "spot_chart",
            )
            if k in cfg
        }
        State.inverters_cfg = cfg.get("inverters", [])
        State.surplus_devices_cfg = cfg.get("surplus_devices", [])
        State.z2m_base = cfg.get("z2m_base", "zigbee2mqtt")
        # Migrate a legacy single-inverter config (top-level modbus_ip) into
        # the inverter list and persist it.
        if not State.inverters_cfg and cfg.get("modbus_ip"):
            State.inverters_cfg = [{
                "id": uuid.uuid4().hex[:8],
                "name": cfg.get("inverter_model", "MIC_1500_TL_X").replace("_", " "),
                "modbus_ip": cfg["modbus_ip"],
                "modbus_port": cfg.get("modbus_port", 502),
                "modbus_address": cfg.get("modbus_address", 1),
                "inverter_model": cfg.get("inverter_model", "MIC_1500_TL_X"),
                "mqtt_topic_prefix": cfg.get("mqtt_topic_prefix", "growatt/shinelanx"),
                "update_interval": cfg.get("update_interval", 30),
            }]
            save_config()

    # Must run outside State.lock: _restart_all acquires the lock itself.
    _restart_all()

    port = int(os.environ.get("INGRESS_PORT", "8099"))
    log.info("Web UI startet auf Port %d", port)
    app.run(host="0.0.0.0", port=port, threaded=True)