Files
Shinebridge/haos-addon/src/main.py
T
retr0 15c0ede72e v1.8.18: MQTT rc=5 Fehlerhandling, Port-Absicherung, Flash-Wizard + NuttX OTA
MQTT:
- rc=5 (Not Authorized) stoppt Reconnect-Loop via _auth_failed Flag
- Fehlermeldung im MQTT-Einstellungen-Banner sichtbar

Sicherheit:
- /api/* nur über HAOS-Ingress (X-Ingress-Path) oder Loopback erreichbar

Flash-Wizard (Baustelle B):
- Neuer Tab "Flash" mit IP-Eingabe und OTA-Modus-Erkennung
- OTA: integrierte oder eigene Firmware via POST /api/flash/update auf Stick
- Fortschrittsbalken + Polling bis Stick nach Reset wieder online
- ST-Link-Erstflash-Anleitung (Pinout, st-flash Kommando)
- Firmware-Binaries im Docker-Image unter /firmware/

NuttX OTA (Baustelle A, shinelanx-modbus):
- ota_http.c: Zwei-Phasen OTA für STM32F103 Single-Bank Flash
  Stage 1: Firmware in Staging-Bereich (obere Flashhälfte) schreiben
  Stage 2: .ramfuncs aus SRAM heraus — Staging → App-Bereich kopieren, Reset
- ota_http.h, Makefile und main.c entsprechend erweitert
- ld.script.dfu: .ramfuncs in .data Section → Ausführung aus SRAM

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-05 19:45:40 +02:00

974 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import logging
import os
import re
import threading
import time
import uuid
from collections import defaultdict, deque
from typing import Any, Dict, List, Optional
from flask import Flask, jsonify, request, send_from_directory
from inverters import INVERTERS
import history
from modbus_client import ModbusReader
from goodwe_client import GoodweReader
from wallbox_client import WallboxReader
from ems_controller import EmsController
from mqtt_publisher import MqttPublisher
from surplus_devices import SurplusDeviceController
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
log = logging.getLogger(__name__)
CONFIG_PATH = "/data/config.json"
HA_OPTIONS_PATH = "/data/options.json"
WEB_DIR = os.path.join(os.path.dirname(__file__), "web")
app = Flask(__name__, static_folder=WEB_DIR)
@app.before_request
def _check_ingress():
# Static files and the root page are always served (no API data inside)
if request.path == "/" or not request.path.startswith("/api/"):
return None
# Requests routed through HAOS ingress proxy carry this header
if request.headers.get("X-Ingress-Path"):
return None
# Allow loopback for local debugging / health checks
if request.remote_addr in ("127.0.0.1", "::1"):
return None
return jsonify({"error": "Zugriff nur über die HAOS-Oberfläche erlaubt"}), 403
# ── Aggregation ───────────────────────────────────────────────
# Welche Sensor-IDs fließen in welchen Aggregat-Bucket (Summe, außer AGG_AVG)
AGG_SENSOR_IDS: Dict[str, List[str]] = {
"total_pv_power": ["pv_power", "pv1_power", "pv2_power", "ppv"],
"total_ac_power": ["ac_power", "ac_power_total"],
"total_energy_today": ["energy_today", "e_day"],
"total_energy_total": ["energy_total", "e_total"],
"grid_power": ["grid_power"],
"grid_import_kwh": ["import_kwh", "e_total_imp"],
"grid_export_kwh": ["export_kwh", "e_total_exp"],
"bat_charge_power": ["bat_charge_power"],
"bat_discharge_power": ["bat_discharge_power"],
"bat_charge_total": ["bat_charge_total", "e_bat_charge_total"],
"bat_discharge_total": ["bat_discharge_total", "e_bat_discharge_total"],
"bat_soc": ["bat_soc", "battery_soc"],
}
AGG_AVG = {"bat_soc"}
# Einzelmessungen am Netzanschlusspunkt — nicht über Geräte summieren,
# sondern den besten Wert nehmen (Goodwe CT-Klemme hat Vorrang vor Proxy)
AGG_FIRST = {"grid_power"}
AGGREGATE_META: Dict[str, Dict[str, str]] = {
"total_pv_power": {"name": "PV Gesamtleistung", "unit": "W", "device_class": "power", "state_class": "measurement", "icon": "mdi:solar-power"},
"total_ac_power": {"name": "AC Gesamtleistung", "unit": "W", "device_class": "power", "state_class": "measurement", "icon": "mdi:flash"},
"total_energy_today": {"name": "Energie Heute Gesamt", "unit": "kWh", "device_class": "energy", "state_class": "total_increasing", "icon": "mdi:solar-power"},
"total_energy_total": {"name": "Energie Gesamt", "unit": "kWh", "device_class": "energy", "state_class": "total_increasing", "icon": "mdi:solar-power"},
"grid_power": {"name": "Netzleistung", "unit": "W", "device_class": "power", "state_class": "measurement", "icon": "mdi:transmission-tower"},
"grid_import_kwh": {"name": "Netzbezug Gesamt", "unit": "kWh", "device_class": "energy", "state_class": "total_increasing", "icon": "mdi:transmission-tower-import"},
"grid_export_kwh": {"name": "Einspeisung Gesamt", "unit": "kWh", "device_class": "energy", "state_class": "total_increasing", "icon": "mdi:transmission-tower-export"},
"bat_charge_power": {"name": "Batterie Ladeleistung Ges.", "unit": "W", "device_class": "power", "state_class": "measurement", "icon": "mdi:battery-plus"},
"bat_discharge_power": {"name": "Batterie Entladeleistung Ges.","unit": "W", "device_class": "power", "state_class": "measurement", "icon": "mdi:battery-minus"},
"bat_charge_total": {"name": "Batterie Ladung Gesamt", "unit": "kWh", "device_class": "energy", "state_class": "total_increasing", "icon": "mdi:battery-plus"},
"bat_discharge_total": {"name": "Batterie Entladung Gesamt", "unit": "kWh", "device_class": "energy", "state_class": "total_increasing", "icon": "mdi:battery-minus"},
"bat_soc": {"name": "Batterie Ladezustand Ø", "unit": "%", "device_class": "battery", "state_class": "measurement", "icon": "mdi:battery"},
}
# ── State ────────────────────────────────────────────────────
class State:
lock = threading.Lock()
mqtt_cfg: Dict[str, Any] = {}
inverters_cfg: List[Dict[str, Any]] = []
inv_data: Dict[str, Dict[str, Any]] = {}
surplus_devices_cfg: List[Dict[str, Any]] = []
z2m_base: str = "zigbee2mqtt"
z2m_devices: List[Dict[str, Any]] = []
last_tariff_snapshot_date: str = ""
_publisher: Optional[MqttPublisher] = None
_surplus_ctrl: Optional[SurplusDeviceController] = None
_surplus_stop: threading.Event = threading.Event()
_surplus_thread: Optional[threading.Thread] = None
_threads: Dict[str, threading.Thread] = {}
_stop_events: Dict[str, threading.Event] = {}
# ── Config ───────────────────────────────────────────────────
def _defaults() -> Dict[str, Any]:
return {
"mqtt_broker": "core-mosquitto",
"mqtt_port": 1883,
"mqtt_user": "",
"mqtt_pass": "",
"price_import": 0.30,
"price_export": 0.08,
"billing_day": 1,
"billing_month": 1,
"tariff_type": "fixed",
"spot_country": "de",
"spot_markup": 0.0,
"spot_chart": True,
"billing_tracker_enabled": False,
"monthly_rate_eur": 0.0,
"grundpreis_eur_per_month": 0.0,
"inverters": [],
"surplus_devices": [],
"z2m_base": "zigbee2mqtt",
}
def _load_json_safe(path: str) -> Optional[Dict]:
"""Lädt eine JSON-Datei; gibt None zurück wenn fehlend oder korrupt."""
try:
with open(path) as f:
return json.load(f)
except Exception as e:
log.warning("JSON-Ladefehler %s: %s", path, e)
return None
def load_config() -> Dict[str, Any]:
cfg = _defaults()
# MQTT-Grundeinstellungen aus HAOS-Options (überschreibbar durch config.json)
if os.path.exists(HA_OPTIONS_PATH):
ha = _load_json_safe(HA_OPTIONS_PATH) or {}
for k in ("mqtt_broker", "mqtt_port", "mqtt_user", "mqtt_pass"):
if k in ha:
cfg[k] = ha[k]
# Eigene persistente Config — Hauptdatei, dann Backup als Fallback
loaded = _load_json_safe(CONFIG_PATH)
if loaded is None and os.path.exists(CONFIG_PATH + ".bak"):
log.warning("config.json korrupt — lade Backup")
loaded = _load_json_safe(CONFIG_PATH + ".bak")
if loaded:
cfg.update(loaded)
return cfg
def save_config():
data = {
"mqtt_broker": State.mqtt_cfg.get("mqtt_broker", ""),
"mqtt_port": State.mqtt_cfg.get("mqtt_port", 1883),
"mqtt_user": State.mqtt_cfg.get("mqtt_user", ""),
"mqtt_pass": State.mqtt_cfg.get("mqtt_pass", ""),
"price_import": State.mqtt_cfg.get("price_import", 0.30),
"price_export": State.mqtt_cfg.get("price_export", 0.08),
"billing_day": State.mqtt_cfg.get("billing_day", 1),
"billing_month": State.mqtt_cfg.get("billing_month", 1),
"tariff_type": State.mqtt_cfg.get("tariff_type", "fixed"),
"spot_country": State.mqtt_cfg.get("spot_country", "de"),
"spot_markup": State.mqtt_cfg.get("spot_markup", 0.0),
"spot_chart": State.mqtt_cfg.get("spot_chart", True),
"billing_tracker_enabled": State.mqtt_cfg.get("billing_tracker_enabled", False),
"monthly_rate_eur": State.mqtt_cfg.get("monthly_rate_eur", 0.0),
"grundpreis_eur_per_month": State.mqtt_cfg.get("grundpreis_eur_per_month", 0.0),
"inverters": State.inverters_cfg,
"surplus_devices": State.surplus_devices_cfg,
"z2m_base": State.z2m_base,
}
# Backup der letzten guten Config anlegen
if os.path.exists(CONFIG_PATH):
try:
os.replace(CONFIG_PATH, CONFIG_PATH + ".bak")
except OSError:
pass
# Atomarer Write: erst .tmp schreiben, dann umbenennen
tmp = CONFIG_PATH + ".tmp"
with open(tmp, "w") as f:
json.dump(data, f, indent=2)
os.replace(tmp, CONFIG_PATH)
# ── Aggregation ───────────────────────────────────────────────
def _compute_aggregates(allow_stale: bool = False) -> Dict[str, float]:
buckets: Dict[str, List[float]] = defaultdict(list)
with State.lock:
for inv_cfg in State.inverters_cfg:
inv_id = inv_cfg["id"]
d = State.inv_data.get(inv_id, {})
if not d.get("values"):
continue
if not allow_stale and not d.get("modbus_ok"):
continue
values = d["values"]
for agg_id, sensor_ids in AGG_SENSOR_IDS.items():
for sid in sensor_ids:
if sid in values:
buckets[agg_id].append(values[sid])
break
result: Dict[str, float] = {}
for agg_id, vals in buckets.items():
if vals:
if agg_id in AGG_AVG:
v = sum(vals) / len(vals)
elif agg_id in AGG_FIRST:
v = vals[0] # ersten (besten) Wert nehmen, nicht summieren
else:
v = sum(vals)
result[agg_id] = round(v, 3)
return result
# ── EMS Hilfsfunktionen ───────────────────────────────────────
def _get_pv_surplus() -> float:
"""PV-Überschuss in Watt. Nutzt grid_power-Aggregat (negativ = Einspeisung)."""
agg = _compute_aggregates(allow_stale=True)
return max(0.0, -agg.get("grid_power", 0.0))
# ── Poll Loop ─────────────────────────────────────────────────
def _run_daily_tariff_snapshot():
import datetime
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
today_iso = today.isoformat()
yesterday_iso = yesterday.isoformat()
agg = _compute_aggregates(allow_stale=True)
cur_kwh = agg.get("grid_import_kwh")
if cur_kwh is None:
return
history.save_daily_kwh_start(today_iso, cur_kwh)
prev_kwh = history.get_daily_kwh_start(yesterday_iso)
if prev_kwh is None:
return
fixed_ct = float(State.mqtt_cfg.get("price_import", 0.30)) * 100
markup_ct = float(State.mqtt_cfg.get("spot_markup", 0.0))
country = str(State.mqtt_cfg.get("spot_country", "de"))
y_start = datetime.datetime.combine(yesterday, datetime.time.min).timestamp()
y_end = datetime.datetime.combine(today, datetime.time.min).timestamp()
spot_ct = _get_avg_spot_price(y_start, y_end, country)
history.save_daily_tariff_snapshot(yesterday_iso, prev_kwh, cur_kwh, spot_ct, fixed_ct, markup_ct)
log.info("Tariftag %s gespeichert: %.3f kWh, spot=%.2f ct, fixed=%.2f ct",
yesterday_iso, max(0, cur_kwh - prev_kwh), spot_ct or 0, fixed_ct)
def _poll_loop(inv_cfg: Dict[str, Any], stop: threading.Event):
inv_id = inv_cfg["id"]
model_id = inv_cfg.get("inverter_model", "MIC_1500_TL_X")
inverter = INVERTERS.get(model_id, INVERTERS["MIC_1500_TL_X"])
prefix = inv_cfg.get("mqtt_topic_prefix", f"growatt/{inv_id}")
device_id = f"growatt_{inv_id}"
try:
interval = max(5, int(inv_cfg.get("update_interval", 30)))
port = int(inv_cfg.get("modbus_port", 502))
slave = int(inv_cfg.get("modbus_address", 1))
except (ValueError, TypeError) as e:
log.error("[%s] Ungültige Konfiguration: %s", inv_id, e)
return
host = inv_cfg["modbus_ip"]
ems: Optional[EmsController] = None
if inverter.protocol == "goodwe_udp":
reader = GoodweReader(host=host, family=inverter.goodwe_family)
log.info("[%s] Poll-Loop: %s @ %s (Goodwe UDP/8899) alle %ds",
inv_id, inverter.name, host, interval)
elif inverter.protocol == "kathrein":
reader = WallboxReader(host=host, port=port)
ems = EmsController(
min_pv_power=inv_cfg.get("ems_min_pv", 1400),
pv_timeout_h=inv_cfg.get("ems_timeout", 4.0),
target_hour=inv_cfg.get("ems_target_hour", 6),
phases=inv_cfg.get("ems_phases", 3),
)
log.info("[%s] Poll-Loop: %s @ %s:%s (Kathrein EMS) alle %ds",
inv_id, inverter.name, host, port, interval)
else:
reader = ModbusReader(host=host, port=port, slave=slave)
log.info("[%s] Poll-Loop: %s @ %s:%s alle %ds",
inv_id, inverter.name, host, port, interval)
with State.lock:
if _publisher:
_publisher.register_inverter(inverter, device_id, prefix,
inv_cfg.get("name", inverter.name))
# History aus DB in die In-Memory-Deque laden
hist_data = history.load_recent(inv_id, limit=300)
with State.lock:
d = State.inv_data.setdefault(inv_id, {"poll_count": 0})
hist = d.setdefault("history", {})
for sid, points in hist_data.items():
q = hist.setdefault(sid, deque(maxlen=300))
for pt in points:
q.append(pt)
log.info("[%s] %d Sensoren aus DB geladen", inv_id, len(hist_data))
while not stop.is_set():
t0 = time.time()
values = reader.read(inverter)
# Growatt-Proxy: grid_power aus power_to_grid (nur Einspeisung bekannt)
if values and "grid_power" not in values and "power_to_grid" in values:
values["grid_power"] = -values["power_to_grid"]
# SDM-630-Proxy: total_power = Netzleistung (positiv=Bezug, negativ=Einspeisung)
if values and "grid_power" not in values and "import_kwh" in values and "total_power" in values:
values["grid_power"] = values["total_power"]
# EMS: PV-Überschuss aus anderen Geräten holen und Ladestrom regeln
if ems is not None and values is not None and inv_cfg.get("ems_enabled", True):
pv_surplus = _get_pv_surplus()
# 0x0060 manchmal nicht lesbar → 1 (EV Connected) annehmen,
# damit EMS aktiviert; Wallbox ignoriert Befehle wenn kein Auto da
charging_state = int(values.get("charging_state", 1))
wallbox_power = values.get("total_power", 0.0)
ems_status = ems.update(reader, pv_surplus, charging_state, wallbox_power)
values["ems_status_code"] = float(charging_state)
log.info("[%s] EMS: %s | PV-Überschuss: %.0fW", inv_id, ems_status, pv_surplus)
with State.lock:
d = State.inv_data.setdefault(inv_id, {"poll_count": 0})
if values is not None:
d["values"] = values
d["last_update"] = time.time()
d["modbus_ok"] = True
d["poll_count"] = d.get("poll_count", 0) + 1
hist = d.setdefault("history", {})
now = time.time()
for sid, val in values.items():
q = hist.setdefault(sid, deque(maxlen=300))
q.append((now, val))
history.write_batch(inv_id, now, values)
if _publisher:
_publisher.publish_data(values, prefix)
_publisher.publish_status("online", prefix)
else:
d["modbus_ok"] = False
if _publisher:
_publisher.publish_status("offline", prefix)
# Aggregate nach jedem erfolgreichen Poll neu berechnen und publizieren
if values is not None and _publisher:
agg = _compute_aggregates()
if agg:
_publisher.publish_aggregates(agg)
# Täglicher Tarif-Snapshot (einmal pro Tag, beim ersten Poll nach Mitternacht)
if values is not None:
import datetime
today_iso = datetime.date.today().isoformat()
if State.last_tariff_snapshot_date != today_iso:
State.last_tariff_snapshot_date = today_iso
threading.Thread(target=_run_daily_tariff_snapshot, daemon=True).start()
stop.wait(max(0.0, interval - (time.time() - t0)))
reader.close()
if _publisher:
_publisher.publish_status("offline", prefix)
log.info("[%s] Poll-Loop beendet", inv_id)
def _start_inverter(inv_cfg: Dict[str, Any]):
inv_id = inv_cfg["id"]
_stop_inverter(inv_id)
ev = threading.Event()
_stop_events[inv_id] = ev
t = threading.Thread(target=_poll_loop, args=(inv_cfg, ev), daemon=True, name=f"poll-{inv_id}")
_threads[inv_id] = t
t.start()
def _stop_inverter(inv_id: str):
if inv_id in _stop_events:
_stop_events[inv_id].set()
if inv_id in _threads:
_threads[inv_id].join(timeout=15)
_stop_events.pop(inv_id, None)
_threads.pop(inv_id, None)
def _surplus_loop(stop: threading.Event):
while not stop.wait(30):
ctrl = _surplus_ctrl
if ctrl is None:
continue
surplus = _get_pv_surplus()
ctrl.update(surplus)
def _start_surplus_loop():
global _surplus_stop, _surplus_thread
_surplus_stop.set()
if _surplus_thread and _surplus_thread.is_alive():
_surplus_thread.join(timeout=5)
_surplus_stop = threading.Event()
_surplus_thread = threading.Thread(
target=_surplus_loop, args=(_surplus_stop,), daemon=True, name="surplus-loop"
)
_surplus_thread.start()
def _restart_all():
global _publisher, _surplus_ctrl
for inv_id in list(_threads.keys()):
_stop_inverter(inv_id)
if _publisher:
_publisher.disconnect()
_publisher = MqttPublisher(
broker=State.mqtt_cfg.get("mqtt_broker", "core-mosquitto"),
port=int(State.mqtt_cfg.get("mqtt_port", 1883)),
user=State.mqtt_cfg.get("mqtt_user", ""),
password=State.mqtt_cfg.get("mqtt_pass", ""),
agg_meta=AGGREGATE_META,
)
_publisher.connect()
time.sleep(2)
with State.lock:
devices = State.surplus_devices_cfg
z2m_base = State.z2m_base
_surplus_ctrl = SurplusDeviceController(_publisher, z2m_base)
_surplus_ctrl.set_config(devices, z2m_base)
_start_surplus_loop()
def _on_z2m_devices(topic, payload):
try:
devices_raw = json.loads(payload)
if not isinstance(devices_raw, list):
return
parsed = [
{
"friendly_name": d.get("friendly_name", ""),
"description": (d.get("definition") or {}).get("description", ""),
"type": d.get("type", ""),
}
for d in devices_raw
if d.get("friendly_name") and d.get("friendly_name") != "Coordinator"
]
with State.lock:
State.z2m_devices = parsed
log.info("Z2M: %d Geräte empfangen", len(parsed))
except Exception as e:
log.warning("Z2M bridge/devices Fehler: %s", e)
_publisher.subscribe(f"{z2m_base}/bridge/devices", _on_z2m_devices)
for inv_cfg in State.inverters_cfg:
_start_inverter(inv_cfg)
# ── REST API ──────────────────────────────────────────────────
@app.get("/api/config")
def api_get_config():
with State.lock:
cfg = {**State.mqtt_cfg, "inverters": State.inverters_cfg}
cfg.pop("mqtt_pass", None)
cfg["mqtt_connected"] = _publisher.connected if _publisher else False
cfg["mqtt_error"] = _publisher.last_error if _publisher else None
return jsonify(cfg)
@app.post("/api/config")
def api_save_config():
data = request.get_json(force=True) or {}
with State.lock:
for k in ("mqtt_broker", "mqtt_port", "mqtt_user"):
if k in data:
State.mqtt_cfg[k] = data[k]
if data.get("mqtt_pass"):
State.mqtt_cfg["mqtt_pass"] = data["mqtt_pass"]
for k in ("price_import", "price_export", "spot_markup"):
if k in data:
State.mqtt_cfg[k] = float(data[k])
for k in ("billing_day", "billing_month"):
if k in data:
State.mqtt_cfg[k] = int(data[k])
for k in ("tariff_type", "spot_country"):
if k in data:
State.mqtt_cfg[k] = str(data[k])
if "spot_chart" in data:
State.mqtt_cfg["spot_chart"] = bool(data["spot_chart"])
if "billing_tracker_enabled" in data:
State.mqtt_cfg["billing_tracker_enabled"] = bool(data["billing_tracker_enabled"])
for k in ("monthly_rate_eur", "grundpreis_eur_per_month"):
if k in data:
State.mqtt_cfg[k] = float(data[k])
save_config()
threading.Thread(target=_restart_all, daemon=True).start()
return jsonify({"ok": True})
@app.get("/api/period-energy")
def api_period_energy():
import datetime
agg = _compute_aggregates(allow_stale=True)
price_import = float(State.mqtt_cfg.get("price_import", 0.30))
price_export = float(State.mqtt_cfg.get("price_export", 0.08))
billing_day = int(State.mqtt_cfg.get("billing_day", 1))
billing_month = int(State.mqtt_cfg.get("billing_month", 1))
tariff_type = str(State.mqtt_cfg.get("tariff_type", "fixed"))
spot_country = str(State.mqtt_cfg.get("spot_country", "de"))
spot_markup = float(State.mqtt_cfg.get("spot_markup", 0.0)) # ct/kWh
result = {
"price_import": price_import,
"price_export": price_export,
"tariff_type": tariff_type,
"spot_chart": bool(State.mqtt_cfg.get("spot_chart", True)),
}
MONTHS_DE = ["","Jan","Feb","Mär","Apr","Mai","Jun","Jul","Aug","Sep","Okt","Nov","Dez"]
now_ts = time.time()
for period_type in ("monthly", "yearly"):
key = history.period_key(period_type, billing_day, billing_month)
entry = {}
# Perioden-Startzeitpunkt für Spot-Preisabfrage
if period_type == "monthly":
pd = datetime.date.fromisoformat(key + "-01")
entry["label"] = f"{MONTHS_DE[pd.month]} {pd.year}"
else:
pd = datetime.date.fromisoformat(key)
end_d = datetime.date(pd.year + 1, billing_month, billing_day) if billing_month != 1 or billing_day != 1 \
else datetime.date(pd.year + 1, 1, 1)
entry["label"] = f"{pd.strftime('%d.%m.%Y')} {(end_d - datetime.timedelta(days=1)).strftime('%d.%m.%Y')}"
pd_start_ts = datetime.datetime.combine(pd, datetime.time.min).timestamp()
for agg_id in ("grid_import_kwh", "grid_export_kwh",
"bat_discharge_total", "bat_charge_total",
"total_energy_total"):
cur = agg.get(agg_id)
if cur is None:
continue
history.save_period_start_if_new(agg_id, period_type, key, cur)
val = history.get_period_consumption(agg_id, period_type, key, cur)
if val is not None:
entry[agg_id] = round(val, 2)
# Effektiver Importpreis (Festpreis oder Börsenpreis)
eff_price = price_import
if tariff_type == "spot":
avg_ct = _get_avg_spot_price(pd_start_ts, now_ts, spot_country)
if avg_ct is not None:
eff_price = (avg_ct + spot_markup) / 100
entry["spot_avg_ct"] = avg_ct
entry["spot_markup_ct"] = spot_markup
entry["effective_price"]= round(eff_price, 4)
if "grid_import_kwh" in entry:
entry["import_cost"] = round(entry["grid_import_kwh"] * eff_price, 2)
if "grid_export_kwh" in entry:
entry["export_revenue"] = round(entry["grid_export_kwh"] * price_export, 2)
# Eigenverbrauch-Ersparnis: PV_gesamt Einspeisung = alles was selbst verbraucht wurde
# (Direktverbrauch + Batterie-Umweg). Fällt PV nicht verfügbar: nur Batterie-Entladung.
pv_total = entry.get("total_energy_total")
grid_exp = entry.get("grid_export_kwh")
bat_dch = entry.get("bat_discharge_total")
if pv_total is not None and grid_exp is not None:
savings = round(max(0.0, pv_total - grid_exp), 2)
entry["savings_kwh"] = savings
entry["savings_eur"] = round(savings * eff_price, 2)
elif bat_dch is not None:
entry["savings_kwh"] = bat_dch
entry["savings_eur"] = round(bat_dch * eff_price, 2)
result[period_type] = entry
if State.mqtt_cfg.get("billing_tracker_enabled", False):
monthly_rate = float(State.mqtt_cfg.get("monthly_rate_eur", 0.0))
grundpreis = float(State.mqtt_cfg.get("grundpreis_eur_per_month", 0.0))
yr_key = history.period_key("yearly", billing_day, billing_month)
yr_start = datetime.date.fromisoformat(yr_key)
today = datetime.date.today()
# Anzahl geleisteter Abschlagszahlungen (ganze Monate seit Periodenstart)
months_diff = (today.year - yr_start.year) * 12 + (today.month - yr_start.month)
if today.day >= yr_start.day:
months_diff += 1
payments_made = max(0, months_diff)
total_paid = round(payments_made * monthly_rate, 2)
grundpreis_total= round(payments_made * grundpreis, 2)
energy_cost = result.get("yearly", {}).get("import_cost", 0.0)
total_cost = round(energy_cost + grundpreis_total, 2)
nachzahlung = round(total_cost - total_paid, 2)
result["billing_tracker"] = {
"monthly_rate_eur": monthly_rate,
"grundpreis_eur_per_month": grundpreis,
"payments_made": payments_made,
"total_paid_eur": total_paid,
"grundpreis_total_eur": grundpreis_total,
"energy_cost_eur": energy_cost,
"total_cost_eur": total_cost,
"nachzahlung_eur": nachzahlung,
}
return jsonify(result)
@app.get("/api/finance")
def api_finance():
import datetime
billing_day = int(State.mqtt_cfg.get("billing_day", 1))
billing_month = int(State.mqtt_cfg.get("billing_month", 1))
yr_key = history.period_key("yearly", billing_day, billing_month)
yr_start = datetime.date.fromisoformat(yr_key)
today = datetime.date.today()
rows = history.get_tariff_days(yr_key, today.isoformat())
days = []
fixed_total = 0.0
spot_total = 0.0
spot_available = 0
for date_iso, kwh, spot_ct, fixed_ct, markup_ct in rows:
fixed_day = round(kwh * fixed_ct / 100, 4)
spot_day = round(kwh * ((spot_ct + markup_ct) / 100), 4) if spot_ct is not None else None
fixed_total += fixed_day
if spot_day is not None:
spot_total += spot_day
spot_available += 1
days.append({
"date": date_iso,
"kwh": round(kwh, 3),
"fixed_eur": fixed_day,
"spot_eur": spot_day,
"spot_ct": spot_ct,
"fixed_ct": fixed_ct,
})
result = {
"period_start": yr_key,
"days": days,
"fixed_total_eur": round(fixed_total, 2),
"spot_total_eur": round(spot_total, 2) if spot_available else None,
"spot_days": spot_available,
"total_days": len(days),
"savings_eur": round(fixed_total - spot_total, 2) if spot_available else None,
}
return jsonify(result)
@app.get("/api/z2m-devices")
def api_z2m_devices():
with State.lock:
return jsonify(State.z2m_devices)
@app.get("/api/surplus-devices")
def api_get_surplus_devices():
with State.lock:
devices = State.surplus_devices_cfg
z2m_base = State.z2m_base
states = _surplus_ctrl.get_states() if _surplus_ctrl else {}
surplus_w = _get_pv_surplus()
return jsonify({"devices": devices, "z2m_base": z2m_base, "states": states, "surplus_w": surplus_w})
@app.post("/api/surplus-devices")
def api_save_surplus_devices():
data = request.get_json(force=True) or {}
devices = data.get("devices", [])
z2m_base = str(data.get("z2m_base", "zigbee2mqtt")).strip() or "zigbee2mqtt"
if not isinstance(devices, list):
return jsonify({"error": "invalid"}), 400
for dev in devices:
if not isinstance(dev, dict) or not dev.get("z2m_name"):
return jsonify({"error": "z2m_name fehlt"}), 400
if "id" not in dev:
dev["id"] = uuid.uuid4().hex[:8]
with State.lock:
State.surplus_devices_cfg = devices
State.z2m_base = z2m_base
save_config()
if _surplus_ctrl:
_surplus_ctrl.set_config(devices, z2m_base)
return jsonify({"ok": True})
@app.get("/api/inverters-config")
def api_get_inverters():
with State.lock:
return jsonify(State.inverters_cfg)
@app.post("/api/inverters-config")
def api_save_inverters():
data = request.get_json(force=True) or []
if not isinstance(data, list):
return jsonify({"error": "invalid"}), 400
for inv in data:
if not isinstance(inv, dict):
return jsonify({"error": "invalid"}), 400
model_id = inv.get("inverter_model")
if model_id not in INVERTERS:
return jsonify({"error": f"unknown model: {model_id}"}), 400
inverter_def = INVERTERS[model_id]
if inverter_def.protocol == "modbus":
port = inv.get("modbus_port", 502)
if not isinstance(port, int) or not (1 <= port <= 65535):
return jsonify({"error": "invalid port"}), 400
addr = inv.get("modbus_address", 1)
if not isinstance(addr, int) or not (1 <= addr <= 247):
return jsonify({"error": "invalid modbus address (1-247)"}), 400
with State.lock:
State.inverters_cfg = data
save_config()
threading.Thread(target=_restart_all, daemon=True).start()
return jsonify({"ok": True})
@app.get("/api/data")
def api_get_data():
with State.lock:
result = {}
for inv_cfg in State.inverters_cfg:
inv_id = inv_cfg["id"]
model_id = inv_cfg.get("inverter_model", "MIC_1500_TL_X")
inverter = INVERTERS.get(model_id, INVERTERS["MIC_1500_TL_X"])
d = State.inv_data.get(inv_id, {})
cutoff = time.time() - 300
raw_hist = d.get("history", {})
history = {
sid: [v for (t, v) in q if t >= cutoff]
for sid, q in raw_hist.items()
}
result[inv_id] = {
"name": inv_cfg.get("name", inverter.name),
"inverter_name": inverter.name,
"values": d.get("values", {}),
"history": history,
"sensors": [
{"id": s.id, "name": s.name, "unit": s.unit,
"icon": s.icon, "device_class": s.device_class}
for s in inverter.sensors
],
"last_update": d.get("last_update"),
"modbus_ok": d.get("modbus_ok", False),
"poll_count": d.get("poll_count", 0),
}
mqtt_ok = _publisher.connected if _publisher else False
aggregates = _compute_aggregates()
return jsonify({"inverters": result, "mqtt_ok": mqtt_ok, "aggregates": aggregates})
@app.get("/api/history")
def api_get_history():
inv_id = request.args.get("inv_id", "")
sensor_id = request.args.get("sensor_id", "")
hours = min(float(request.args.get("hours", "24")), 24 * 7)
if not inv_id or not sensor_id:
return jsonify({"error": "inv_id and sensor_id required"}), 400
to_ts = time.time()
from_ts = to_ts - hours * 3600
rows = history.query(inv_id, sensor_id, from_ts, to_ts)
return jsonify([{"ts": t, "value": v} for t, v in rows])
@app.get("/api/inverter-models")
def api_get_models():
return jsonify({
k: {"id": v.id, "name": v.name, "sensor_count": len(v.sensors)}
for k, v in INVERTERS.items()
})
@app.post("/api/new-id")
def api_new_id():
return jsonify({"id": uuid.uuid4().hex[:8]})
@app.get("/api/export-config")
def api_export_config():
with State.lock:
data = {
"shinebridge_export": True,
"version": 1,
"exported_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"mqtt": {
"broker": State.mqtt_cfg.get("mqtt_broker", ""),
"port": State.mqtt_cfg.get("mqtt_port", 1883),
"user": State.mqtt_cfg.get("mqtt_user", ""),
},
"inverters": State.inverters_cfg,
}
from flask import Response
filename = f"shinebridge-config-{time.strftime('%Y%m%d-%H%M%S')}.json"
return Response(
json.dumps(data, indent=2, ensure_ascii=False),
mimetype="application/json",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
@app.post("/api/import-config")
def api_import_config():
data = request.get_json(force=True) or {}
if not data.get("shinebridge_export"):
return jsonify({"error": "Keine gültige ShineBridge-Export-Datei"}), 400
inverters = data.get("inverters", [])
if not isinstance(inverters, list):
return jsonify({"error": "inverters ungültig"}), 400
for inv in inverters:
if not inv.get("modbus_ip"):
return jsonify({"error": f"modbus_ip fehlt in Gerät {inv.get('name', '?')}"}), 400
if inv.get("inverter_model") not in INVERTERS:
return jsonify({"error": f"Unbekanntes Modell: {inv.get('inverter_model')}"}), 400
with State.lock:
if mqtt := data.get("mqtt"):
if mqtt.get("broker"):
State.mqtt_cfg["mqtt_broker"] = mqtt["broker"]
if mqtt.get("port"):
State.mqtt_cfg["mqtt_port"] = int(mqtt["port"])
if mqtt.get("user"):
State.mqtt_cfg["mqtt_user"] = mqtt["user"]
State.inverters_cfg = inverters
save_config()
threading.Thread(target=_restart_all, daemon=True).start()
return jsonify({"ok": True, "inverters": len(inverters)})
_hist_spot_cache: Dict[str, Any] = {} # key → {"ts", "avg_ct"}
def _get_avg_spot_price(start_ts: float, end_ts: float, country: str = "de") -> Optional[float]:
"""Durchschnittlicher EPEX-SPOT-Preis (ct/kWh) für einen Zeitraum. None bei Fehler."""
import urllib.request as _ur
cache_key = f"{country}-{int(start_ts//3600)}-{int(end_ts//3600)}"
now = time.time()
cached = _hist_spot_cache.get(cache_key)
if cached and now - cached["ts"] < 3600:
return cached["avg_ct"]
try:
url = (f"https://api.awattar.{country}/v1/marketdata"
f"?start={int(start_ts*1000)}&end={int(end_ts*1000)}")
with _ur.urlopen(url, timeout=10) as r:
raw = json.loads(r.read())
prices = [d["marketprice"] / 10 for d in raw.get("data", [])]
if not prices:
return None
avg = round(sum(prices) / len(prices), 2)
_hist_spot_cache[cache_key] = {"ts": now, "avg_ct": avg}
return avg
except Exception as e:
log.warning("Historischer Spot-Preis Fehler: %s", e)
return None
_spot_cache: Dict[str, Any] = {"ts": 0.0, "data": []}
_SPOT_TTL = 900 # 15 Minuten
@app.get("/api/spot-price")
def api_spot_price():
import urllib.request as _ur
global _spot_cache
now = time.time()
if now - _spot_cache["ts"] < _SPOT_TTL and _spot_cache["data"]:
return jsonify({"ok": True, "data": _spot_cache["data"]})
try:
with _ur.urlopen("https://api.awattar.de/v1/marketdata", timeout=8) as r:
raw = json.loads(r.read())
entries = [
{"ts": int(d["start_timestamp"] // 1000), "price": round(d["marketprice"] / 10, 2)}
for d in raw.get("data", [])
]
_spot_cache = {"ts": now, "data": entries}
return jsonify({"ok": True, "data": entries})
except Exception as e:
log.warning("Spot-Price API Fehler: %s", e)
return jsonify({"ok": False, "data": _spot_cache.get("data", [])})
_IP_RE = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$')
@app.get("/api/flash/probe")
def api_flash_probe():
import urllib.request as _ur
ip = request.args.get("ip", "").strip()
if not ip or not _IP_RE.match(ip):
return jsonify({"ota": False, "error": "invalid ip"}), 400
try:
with _ur.urlopen(f"http://{ip}/", timeout=5) as r:
data = json.loads(r.read().decode())
return jsonify({"ota": bool(data.get("ota")), "info": data})
except Exception:
return jsonify({"ota": False})
@app.get("/api/flash/firmware")
def api_flash_firmware():
fw_dir = "/firmware"
try:
files = sorted(f for f in os.listdir(fw_dir) if f.endswith(".bin"))
return jsonify({"files": files})
except Exception:
return jsonify({"files": []})
@app.post("/api/flash/update")
def api_flash_update():
import urllib.request as _ur
ip = request.args.get("ip", "").strip()
fw_name = request.args.get("fw", "").strip()
if not ip or not _IP_RE.match(ip):
return jsonify({"ok": False, "error": "invalid ip"}), 400
try:
if fw_name:
fw_path = os.path.join("/firmware", os.path.basename(fw_name))
with open(fw_path, "rb") as f:
data = f.read()
else:
data = request.get_data()
if len(data) < 256:
return jsonify({"ok": False, "error": "firmware too small"}), 400
req = _ur.Request(
f"http://{ip}/update", data=data, method="POST",
headers={"Content-Type": "application/octet-stream",
"Content-Length": str(len(data))})
with _ur.urlopen(req, timeout=90) as r:
return jsonify({"ok": True, "response": r.read().decode(errors="replace")})
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 502
@app.post("/api/flash/reboot")
def api_flash_reboot():
import urllib.request as _ur
ip = request.args.get("ip", "").strip()
if not ip or not _IP_RE.match(ip):
return jsonify({"ok": False, "error": "invalid ip"}), 400
try:
req = _ur.Request(f"http://{ip}/reboot", data=b"", method="POST")
with _ur.urlopen(req, timeout=10) as r:
return jsonify({"ok": True})
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 502
@app.get("/")
def index():
return send_from_directory(WEB_DIR, "index.html")
@app.get("/<path:filename>")
def static_files(filename):
return send_from_directory(WEB_DIR, filename)
# ── Startup ───────────────────────────────────────────────────
if __name__ == "__main__":
history.init_db()
cfg = load_config()
with State.lock:
State.mqtt_cfg = {k: cfg[k] for k in
("mqtt_broker", "mqtt_port", "mqtt_user", "mqtt_pass")}
State.inverters_cfg = cfg.get("inverters", [])
State.surplus_devices_cfg = cfg.get("surplus_devices", [])
State.z2m_base = cfg.get("z2m_base", "zigbee2mqtt")
if not State.inverters_cfg and cfg.get("modbus_ip"):
State.inverters_cfg = [{
"id": uuid.uuid4().hex[:8],
"name": cfg.get("inverter_model", "MIC_1500_TL_X").replace("_", " "),
"modbus_ip": cfg["modbus_ip"],
"modbus_port": cfg.get("modbus_port", 502),
"modbus_address": cfg.get("modbus_address", 1),
"inverter_model": cfg.get("inverter_model", "MIC_1500_TL_X"),
"mqtt_topic_prefix": cfg.get("mqtt_topic_prefix", "growatt/shinelanx"),
"update_interval": cfg.get("update_interval", 30),
}]
save_config()
_restart_all()
port = int(os.environ.get("INGRESS_PORT", "8099"))
log.info("Web UI startet auf Port %d", port)
app.run(host="0.0.0.0", port=port, threaded=True)