Files
Shinebridge/tools/shinediag/diagnose.py
T
retr0 fe1bdb057d Feature: ShineDiag mobiles Dashboard mit Canvas-Charts und SQLite-History (v2.0)
- Auto-Polling alle 30s im Hintergrund (Thread + Event)
- SQLite-Persistenz in /var/lib/shinediag/history.db, 7-Tage-Retention
- 4 Tabs: Dashboard, Diagramme, Sensoren, Rohdaten
- Große Metrikkarten für Spannung (L1/L2/L3), Frequenz, Leistung
- Canvas-Liniendiagramme (kein externes JS) mit 1h/6h/24h/7d-Auswahl
- localStorage für Verbindungskonfig, Auto-Reconnect beim Seitenaufruf
- Retina-fähiges Canvas (DPR-aware)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 12:15:05 +02:00

279 lines
9.2 KiB
Python

#!/usr/bin/env python3
"""ShineDiag — Vor-Ort-Diagnose für Growatt-Wechselrichter via ShineLAN-X."""
import json
import logging
import os
import sqlite3
import sys
import threading
import time
from typing import Dict, List, Optional
from flask import Flask, jsonify, request, send_from_directory
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../haos-addon/src"))
from inverters import INVERTERS
from modbus_client import ModbusReader
# Process-wide logging: timestamped lines at INFO level.
logging.basicConfig(
    format="%(asctime)s %(levelname)s: %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)

# Directory with the static web UI, located next to this script.
WEB_DIR = os.path.join(os.path.dirname(__file__), "web")
# SQLite history file; can be overridden with the DB_PATH environment variable.
DB_PATH = os.getenv("DB_PATH", "/var/lib/shinediag/history.db")
# Seconds between two polls; can be overridden with POLL_SEC.
POLL_SEC = int(os.getenv("POLL_SEC", "30"))

app = Flask(__name__, static_folder=WEB_DIR)
# ── SQLite ────────────────────────────────────────────────────
# Lock held by db_write/db_query/db_cleanup around every use of the connection.
_db_lock = threading.Lock()
# Shared connection, created by the first call to _db(); None until then.
_db_conn: Optional[sqlite3.Connection] = None
def _db() -> sqlite3.Connection:
    """Return the shared SQLite connection, creating it on first use.

    On the first call the parent directory of ``DB_PATH`` is created (when the
    path has one), the database is opened with ``check_same_thread=False``,
    WAL journaling and ``synchronous=NORMAL`` are enabled, and the
    ``measurements`` table plus its lookup index are created if missing.

    The connection is stored in ``_db_conn`` only after the whole setup
    succeeded, so a failed initialisation is retried on the next call instead
    of leaving a connection without schema cached.

    Raises:
        OSError: if the database directory cannot be created.
        sqlite3.Error: if opening or initialising the database fails.
    """
    global _db_conn
    if _db_conn is not None:
        return _db_conn

    # dirname() is "" for a bare file name or ":memory:", and makedirs("")
    # raises FileNotFoundError, so only create a directory when there is one.
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS measurements (
                inv_key TEXT NOT NULL,
                sensor_id TEXT NOT NULL,
                ts REAL NOT NULL,
                value REAL NOT NULL
            )""")
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_inv_sensor_ts
            ON measurements(inv_key, sensor_id, ts)""")
        conn.commit()
    except Exception:
        # Do not cache (or leak) a half-initialised connection.
        conn.close()
        raise

    _db_conn = conn
    return _db_conn
def db_write(inv_key: str, ts: float, values: Dict[str, float]):
    """Store one poll result as one ``measurements`` row per sensor value.

    Args:
        inv_key: Key of the inverter connection the values belong to.
        ts: Timestamp stored with every row.
        values: Mapping of sensor id to measured value.
    """
    insert_sql = "INSERT INTO measurements(inv_key,sensor_id,ts,value) VALUES(?,?,?,?)"
    batch = []
    for sensor_id, reading in values.items():
        batch.append((inv_key, sensor_id, ts, reading))
    # Insert and commit under the DB lock so the batch lands as one unit.
    with _db_lock:
        conn = _db()
        conn.executemany(insert_sql, batch)
        conn.commit()
def db_query(inv_key: str, sensor_ids: List[str],
             from_ts: float, to_ts: float) -> Dict[str, List]:
    """Fetch the stored history of several sensors for one inverter.

    Args:
        inv_key: Key of the inverter connection to read.
        sensor_ids: Sensor ids to return; each one gets an entry in the result.
        from_ts: Lower bound of the time window (inclusive).
        to_ts: Upper bound of the time window (inclusive).

    Returns:
        Mapping of sensor id to a list of ``{"ts": ..., "v": ...}`` points in
        ascending time order; sensors without rows map to an empty list.
    """
    series: Dict[str, List] = {}
    for wanted in sensor_ids:
        series[wanted] = []

    # One "?" per requested sensor id for the IN (...) list.
    marks = ",".join(["?"] * len(sensor_ids))
    sql = f"""
        SELECT sensor_id, ts, value FROM measurements
        WHERE inv_key=? AND sensor_id IN ({marks})
        AND ts BETWEEN ? AND ?
        ORDER BY ts ASC
    """
    params = [inv_key] + sensor_ids + [from_ts, to_ts]

    with _db_lock:
        cursor = _db().execute(sql, params)
        fetched = cursor.fetchall()

    for sensor_id, stamp, reading in fetched:
        bucket = series.get(sensor_id)
        if bucket is not None:
            bucket.append({"ts": stamp, "v": reading})
    return series
def db_cleanup(days: int = 7):
    """Delete measurements older than *days* days and log how many were removed."""
    oldest_kept = time.time() - days * 86400
    with _db_lock:
        conn = _db()
        cursor = conn.execute("DELETE FROM measurements WHERE ts<?", (oldest_kept,))
        removed = cursor.rowcount
        conn.commit()
    # Stay quiet when nothing was deleted.
    if removed:
        log.info("DB: %d alte Einträge gelöscht", removed)
# ── Poll-State ────────────────────────────────────────────────
# Lock used by the poll worker and the Flask handlers around the state below.
_state_lock = threading.Lock()
# Background poll thread; None while no poll is running.
_poll_thread: Optional[threading.Thread] = None
# Set to ask the poll thread to leave its loop.
_poll_stop = threading.Event()
_conn_cfg: Dict = {}  # current connection config
_last_values: Dict = {}  # values of the last successful poll
_last_ts: float = 0.0  # start time (time.time()) of the last successful poll
_last_ok: bool = False  # whether the most recent poll returned values
_inv_key: str = ""  # "ip:port:slave:model" key under which history rows are stored
def _poll_worker():
    """Poll the configured inverter every ``POLL_SEC`` seconds until stopped.

    Thread target used by ``_start_poll``. Each cycle reads the inverter,
    publishes the outcome in ``_last_values`` / ``_last_ts`` / ``_last_ok`` and,
    when values were returned, appends them to the history database. The loop
    ends once ``_poll_stop`` is set; the Modbus reader is always closed on exit.

    A failing read or database write is logged and counted as a failed cycle
    instead of terminating the thread.
    """
    global _last_values, _last_ts, _last_ok
    # Snapshot config and history key once, under the lock, so this thread
    # keeps labelling its samples with the connection it was started for.
    with _state_lock:
        cfg = _conn_cfg.copy()
        inv_key = _inv_key
    inverter = INVERTERS[cfg["model"]]
    reader = ModbusReader(cfg["ip"], int(cfg["port"]), int(cfg["slave"]), timeout=5.0)
    log.info("Poll-Thread gestartet: %s @ %s alle %ds", inverter.name, cfg["ip"], POLL_SEC)
    try:
        while not _poll_stop.is_set():
            t0 = time.time()
            try:
                values = reader.read(inverter)
            except Exception:
                # Thread boundary: one bad cycle must not end polling for good.
                log.exception("Modbus-Abfrage fehlgeschlagen")
                values = None
            with _state_lock:
                if values:
                    _last_values = values
                    _last_ts = t0
                    _last_ok = True
                else:
                    _last_ok = False
            if values:
                # Written outside _state_lock so status requests never wait
                # for disk I/O.
                try:
                    db_write(inv_key, t0, values)
                except sqlite3.Error:
                    log.exception("DB-Schreiben fehlgeschlagen")
            # Sleep for the rest of the interval; wakes early when stop is set.
            _poll_stop.wait(max(0.0, POLL_SEC - (time.time() - t0)))
    finally:
        reader.close()
        log.info("Poll-Thread beendet")
def _start_poll(cfg: dict):
    """Stop any running poll, reset the shared state and start polling *cfg*.

    Args:
        cfg: Connection config with the keys ``ip``, ``port``, ``slave`` and
            ``model``.
    """
    global _poll_thread, _conn_cfg, _inv_key, _last_values, _last_ts, _last_ok
    _stop_poll()
    with _state_lock:
        _conn_cfg = cfg
        # History rows are keyed by the full connection tuple.
        _inv_key = "{}:{}:{}:{}".format(
            cfg["ip"], cfg["port"], cfg["slave"], cfg["model"])
        _last_values, _last_ts, _last_ok = {}, 0.0, False
    # Re-arm the stop flag before the new worker checks it.
    _poll_stop.clear()
    worker = threading.Thread(name="poll", target=_poll_worker, daemon=True)
    _poll_thread = worker
    worker.start()
def _stop_poll():
    """Signal the poll thread to stop, wait up to 10 s for it, then forget it."""
    global _poll_thread
    _poll_stop.set()
    worker = _poll_thread
    if worker is not None and worker.is_alive():
        worker.join(timeout=10)
    _poll_thread = None
# ── Flask ─────────────────────────────────────────────────────
@app.get("/")
def index():
    """Serve the web UI entry page from ``WEB_DIR``."""
    page = "index.html"
    return send_from_directory(WEB_DIR, page)
@app.get("/api/models")
def api_models():
    """List all known inverter models with id, name, manufacturer and sensor count."""
    catalogue = {}
    for key, inv in INVERTERS.items():
        catalogue[key] = {
            "id": inv.id,
            "name": inv.name,
            "manufacturer": inv.manufacturer,
            "sensor_count": len(inv.sensors),
        }
    return jsonify(catalogue)
@app.post("/api/connect")
def api_connect():
    """Validate a connection request and (re)start background polling.

    Expects a JSON object with ``ip`` and ``model`` (required) plus optional
    ``port`` (default 502) and ``slave`` (default 1). Invalid input is answered
    with a 400 JSON error instead of an unhandled exception. On success the
    poll thread is restarted with the new config and old history rows are
    purged.
    """
    body = request.get_json(force=True) or {}
    if not isinstance(body, dict):
        return jsonify({"error": "JSON-Objekt erwartet"}), 400
    for field in ("ip", "model"):
        if not body.get(field):
            return jsonify({"error": f"'{field}' fehlt"}), 400
    if not isinstance(body["ip"], str):
        return jsonify({"error": "'ip' ungültig"}), 400
    # The isinstance check also keeps unhashable JSON values (lists, objects)
    # away from the dict lookup, which would raise TypeError.
    if not isinstance(body["model"], str) or body["model"] not in INVERTERS:
        return jsonify({"error": "Unbekanntes Modell"}), 400
    try:
        port = int(body.get("port", 502))
        slave = int(body.get("slave", 1))
    except (TypeError, ValueError, OverflowError):
        return jsonify({"error": "'port' und 'slave' müssen ganze Zahlen sein"}), 400
    if not 1 <= port <= 65535:
        return jsonify({"error": "'port' außerhalb 1..65535"}), 400
    if not 0 <= slave <= 255:
        return jsonify({"error": "'slave' außerhalb 0..255"}), 400
    cfg = {
        "ip": body["ip"],
        "port": port,
        "slave": slave,
        "model": body["model"],
    }
    _start_poll(cfg)
    db_cleanup()
    return jsonify({"ok": True})
@app.post("/api/disconnect")
def api_disconnect():
    """Stop the background poll and acknowledge."""
    _stop_poll()
    ack = {"ok": True}
    return jsonify(ack)
@app.get("/api/status")
def api_status():
    """Report connection state, the last polled values and sensor metadata.

    The shared poll state is copied under ``_state_lock``; the sensor list is
    then derived from the configured model outside the lock.
    """
    with _state_lock:
        worker = _poll_thread
        payload = {
            "connected": worker is not None and worker.is_alive(),
            "modbus_ok": _last_ok,
            "last_ts": _last_ts,
            "cfg": _conn_cfg.copy(),
            "values": _last_values.copy(),
        }

    # No config yet (or an unknown model) yields no sensors and an empty name.
    inverter = INVERTERS.get(payload["cfg"].get("model", ""))
    if inverter:
        sensor_meta = []
        for sensor in inverter.sensors:
            sensor_meta.append({
                "id": sensor.id,
                "name": sensor.name,
                "unit": sensor.unit,
                "device_class": sensor.device_class,
                "icon": sensor.icon,
            })
        inv_name = inverter.name
    else:
        sensor_meta = []
        inv_name = ""

    payload["sensors"] = sensor_meta
    payload["inv_name"] = inv_name
    return jsonify(payload)
@app.get("/api/chart-data")
def api_chart_data():
    """Return stored history for the requested sensors of the current inverter.

    Query parameters:
        sensors: Comma-separated sensor ids (required, at least one non-empty).
        hours: Size of the look-back window in hours (default 24, capped at
            7 days). Must be a positive number.

    Responds with 400 for missing sensors or an invalid ``hours`` value. When
    no inverter has been connected yet, every requested sensor maps to an
    empty list.
    """
    sensors = request.args.get("sensors", "")
    sensor_ids = [s.strip() for s in sensors.split(",") if s.strip()]
    # Also covers inputs such as "," that contain only separators.
    if not sensor_ids:
        return jsonify({"error": "sensors required"}), 400

    try:
        hours = float(request.args.get("hours", "24"))
    except ValueError:
        return jsonify({"error": "hours must be a number"}), 400
    # "not hours > 0" is also true for NaN, which would otherwise slip
    # through min() and end up as a timestamp bound.
    if not hours > 0:
        return jsonify({"error": "hours must be positive"}), 400
    hours = min(hours, 24 * 7)

    to_ts = time.time()
    from_ts = to_ts - hours * 3600
    with _state_lock:
        key = _inv_key
    if not key:
        return jsonify({sid: [] for sid in sensor_ids})
    return jsonify(db_query(key, sensor_ids, from_ts, to_ts))
@app.post("/api/scan")
def api_scan():
    """One-off scan without polling — for quick diagnosis.

    Expects a JSON object with optional ``model`` (default ``MIC_1500_TL_X``),
    ``ip`` (default ``10.0.0.100``), ``port`` (default 502) and ``slave``
    (default 1). Responds with 400 for an unknown model or non-integer
    ``port``/``slave``, 502 when the read returns ``None``, and otherwise the
    value of every sensor of the model.
    """
    body = request.get_json(force=True) or {}
    if not isinstance(body, dict):
        return jsonify({"error": "JSON-Objekt erwartet"}), 400
    model_id = body.get("model", "MIC_1500_TL_X")
    # Non-string values (lists, objects) cannot be a model key and could be
    # unhashable, so they are treated as unknown.
    inverter = INVERTERS.get(model_id) if isinstance(model_id, str) else None
    if not inverter:
        return jsonify({"error": f"Unbekanntes Modell: {model_id}"}), 400
    try:
        port = int(body.get("port", 502))
        slave = int(body.get("slave", 1))
    except (TypeError, ValueError, OverflowError):
        return jsonify({"error": "'port' und 'slave' müssen ganze Zahlen sein"}), 400

    reader = ModbusReader(body.get("ip", "10.0.0.100"), port, slave, timeout=5.0)
    try:
        values = reader.read(inverter)
    finally:
        # Release the connection even when the read raises.
        reader.close()
    if values is None:
        return jsonify({"ok": False, "error": "Keine Verbindung"}), 502
    return jsonify({"ok": True, "model": inverter.name,
                    "sensors": [{"id": s.id, "name": s.name, "value": values.get(s.id),
                                 "unit": s.unit} for s in inverter.sensors]})
@app.post("/api/raw")
def api_raw():
    """Read a raw range of input registers and return them in several formats.

    Expects a JSON object with optional ``start`` (default 0, clamped to
    0..65535), ``count`` (default 100, clamped to 1..125), ``ip`` (default
    ``10.0.0.100``), ``port`` (default 502) and ``slave`` (default 1).
    Responds with 400 for non-integer numbers and 502 when the connection or
    the read fails. Each register is returned with its address, hex value,
    unsigned value and 16-bit two's-complement signed value.
    """
    body = request.get_json(force=True) or {}
    if not isinstance(body, dict):
        return jsonify({"ok": False, "error": "JSON-Objekt erwartet"}), 400
    try:
        # Register addresses are 16-bit in Modbus, so keep start in range.
        start = min(0xFFFF, max(0, int(body.get("start", 0))))
        count = min(125, max(1, int(body.get("count", 100))))
        port = int(body.get("port", 502))
        slave = int(body.get("slave", 1))
    except (TypeError, ValueError, OverflowError):
        return jsonify({"ok": False,
                        "error": "'start', 'count', 'port' und 'slave' müssen ganze Zahlen sein"}), 400

    from pymodbus.client import ModbusTcpClient
    client = ModbusTcpClient(body.get("ip", "10.0.0.100"), port=port, timeout=5.0)
    # connect() sits inside the try so the client is closed on every path,
    # including a failed connection attempt.
    try:
        if not client.connect():
            return jsonify({"ok": False, "error": "Verbindung fehlgeschlagen"}), 502
        r = client.read_input_registers(start, count, slave=slave)
        if r.isError():
            return jsonify({"ok": False, "error": str(r)}), 502
        return jsonify({"ok": True, "registers": [
            {"addr": start + i, "hex": f"0x{v:04X}",
             "dec": v, "signed": v if v < 0x8000 else v - 0x10000}
            for i, v in enumerate(r.registers)
        ]})
    finally:
        client.close()
if __name__ == "__main__":
    # Open (and if necessary create) the history database before serving.
    _db()
    listen_port = int(os.getenv("PORT", "80"))
    log.info("ShineDiag auf Port %d", listen_port)
    app.run(host="0.0.0.0", port=listen_port, threaded=True)