#!/usr/bin/env python3 """ShineDiag — Vor-Ort-Diagnose für Growatt-Wechselrichter via ShineLAN-X.""" import json import logging import os import sqlite3 import sys import threading import time from typing import Dict, List, Optional from flask import Flask, jsonify, request, send_from_directory sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../haos-addon/src")) from inverters import INVERTERS from modbus_client import ModbusReader logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") log = logging.getLogger(__name__) WEB_DIR = os.path.join(os.path.dirname(__file__), "web") DB_PATH = os.environ.get("DB_PATH", "/var/lib/shinediag/history.db") POLL_SEC = int(os.environ.get("POLL_SEC", "30")) app = Flask(__name__, static_folder=WEB_DIR) # ── SQLite ──────────────────────────────────────────────────── _db_lock = threading.Lock() _db_conn: Optional[sqlite3.Connection] = None def _db() -> sqlite3.Connection: global _db_conn if _db_conn is None: os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) _db_conn = sqlite3.connect(DB_PATH, check_same_thread=False) _db_conn.execute("PRAGMA journal_mode=WAL") _db_conn.execute("PRAGMA synchronous=NORMAL") _db_conn.execute(""" CREATE TABLE IF NOT EXISTS measurements ( inv_key TEXT NOT NULL, sensor_id TEXT NOT NULL, ts REAL NOT NULL, value REAL NOT NULL )""") _db_conn.execute(""" CREATE INDEX IF NOT EXISTS idx_inv_sensor_ts ON measurements(inv_key, sensor_id, ts)""") _db_conn.commit() return _db_conn def db_write(inv_key: str, ts: float, values: Dict[str, float]): rows = [(inv_key, sid, ts, v) for sid, v in values.items()] with _db_lock: _db().executemany( "INSERT INTO measurements(inv_key,sensor_id,ts,value) VALUES(?,?,?,?)", rows) _db().commit() def db_query(inv_key: str, sensor_ids: List[str], from_ts: float, to_ts: float) -> Dict[str, List]: result = {sid: [] for sid in sensor_ids} placeholders = ",".join("?" * len(sensor_ids)) with _db_lock: rows = _db().execute(f""" SELECT sensor_id, ts, value FROM measurements WHERE inv_key=? AND sensor_id IN ({placeholders}) AND ts BETWEEN ? AND ? ORDER BY ts ASC """, [inv_key] + sensor_ids + [from_ts, to_ts]).fetchall() for sid, ts, val in rows: if sid in result: result[sid].append({"ts": ts, "v": val}) return result def db_cleanup(days: int = 7): cutoff = time.time() - days * 86400 with _db_lock: n = _db().execute("DELETE FROM measurements WHERE ts