"""SQLite-backed history store.

Keeps three tables in the database at ``DB_PATH``:

* ``measurements``  - time series of sensor values per inverter id,
* ``period_starts`` - meter reading stored once per (agg_id, period_type,
  period_key),
* ``tariff_days``   - one row per day with consumption and price data.

All database access goes through one shared connection and is serialised by
the module-level ``_lock``.
"""

import datetime
import logging
import sqlite3
import threading
import time
from typing import Dict, List, Optional, Tuple

log = logging.getLogger(__name__)

DB_PATH = "/data/history.db"
# Default age (in days) after which `cleanup_old` deletes measurements.
RETENTION_DAYS = 7

# Non-reentrant lock guarding every use of the shared connection.
_lock = threading.Lock()
# `Optional[...]` rather than `X | None`: module-level annotations are
# evaluated at import time and the `|` form fails on Python < 3.10.
_conn: Optional[sqlite3.Connection] = None


def _get_conn() -> sqlite3.Connection:
    """Return the shared connection, opening and configuring it on first use.

    Every caller in this module invokes this while holding ``_lock``.
    """
    global _conn
    if _conn is None:
        conn = sqlite3.connect(DB_PATH, check_same_thread=False)
        try:
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA synchronous=NORMAL")
        except sqlite3.Error:
            # Do not cache a connection whose setup did not complete.
            conn.close()
            raise
        _conn = conn
    return _conn


def init_db():
    """Create the tables and index if missing, then purge old measurements."""
    with _lock:
        conn = _get_conn()
        with conn:  # commit on success, roll back on error
            conn.execute("""
                CREATE TABLE IF NOT EXISTS measurements (
                    inv_id    TEXT NOT NULL,
                    sensor_id TEXT NOT NULL,
                    ts        REAL NOT NULL,
                    value     REAL NOT NULL
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_inv_sensor_ts
                ON measurements(inv_id, sensor_id, ts)
            """)
            # Period starts: kWh meter reading at the start of each
            # billing period.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS period_starts (
                    agg_id      TEXT NOT NULL,
                    period_type TEXT NOT NULL,
                    period_key  TEXT NOT NULL,
                    value       REAL NOT NULL,
                    PRIMARY KEY (agg_id, period_type, period_key)
                )
            """)
            # Tariff days: daily consumption plus price comparison.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS tariff_days (
                    date      TEXT PRIMARY KEY,
                    kwh       REAL NOT NULL,
                    spot_ct   REAL,
                    fixed_ct  REAL NOT NULL,
                    markup_ct REAL NOT NULL DEFAULT 0
                )
            """)
    # Must run after `_lock` is released: `cleanup_old` acquires the same
    # non-reentrant lock.
    cleanup_old()
    log.info("History DB initialisiert: %s", DB_PATH)


def _billing_start(year: int, billing_month: int, billing_day: int) -> datetime.date:
    """Return the billing date in `year`, using day 1 if the day is invalid."""
    try:
        return datetime.date(year, billing_month, billing_day)
    except ValueError:
        return datetime.date(year, billing_month, 1)


def period_key(period_type: str, billing_day: int = 1, billing_month: int = 1,
               *, today: Optional[datetime.date] = None) -> str:
    """Return the key identifying the current period.

    Args:
        period_type: ``"monthly"`` yields ``YYYY-MM``; any other value is
            treated as the annual billing period.
        billing_day: Day of month of the annual billing date.
        billing_month: Month of the annual billing date.
        today: Reference date; defaults to ``datetime.date.today()``.

    Returns:
        ``YYYY-MM`` for monthly periods, otherwise the ISO date of the most
        recent billing date that is not after `today`.

    Raises:
        ValueError: If `billing_month` is not a valid month.
    """
    if today is None:
        today = datetime.date.today()
    if period_type == "monthly":
        return today.strftime("%Y-%m")
    # Annual period: starts at the most recent billing date.
    start = _billing_start(today.year, billing_month, billing_day)
    if today < start:
        start = _billing_start(today.year - 1, billing_month, billing_day)
    return start.isoformat()  # e.g. "2025-04-01"


def save_period_start_if_new(agg_id: str, period_type: str, key: str, current_value: float):
    """Store the start value only if no row exists yet for this period."""
    with _lock:
        conn = _get_conn()
        with conn:  # commit on success, roll back on error
            conn.execute("""
                INSERT OR IGNORE INTO period_starts(agg_id, period_type, period_key, value)
                VALUES (?, ?, ?, ?)
            """, (agg_id, period_type, key, current_value))


def get_period_consumption(agg_id: str, period_type: str, key: str,
                           current_value: float) -> Optional[float]:
    """Return consumption since the period start, clamped at 0.0.

    Returns None when no start value has been stored for the period.
    """
    with _lock:
        row = _get_conn().execute("""
            SELECT value FROM period_starts
            WHERE agg_id=? AND period_type=? AND period_key=?
        """, (agg_id, period_type, key)).fetchone()
    if row is None:
        return None
    return max(0.0, current_value - row[0])


def write_batch(inv_id: str, ts: float, values: Dict[str, float]):
    """Insert one measurement row per sensor in `values`, all at time `ts`.

    The batch is atomic: if any row fails, none of the batch is stored and
    the exception propagates.
    """
    rows = [(inv_id, sid, ts, val) for sid, val in values.items()]
    with _lock:
        conn = _get_conn()
        # Without the rollback a failed batch would leave an open
        # transaction that the next unrelated commit would persist.
        with conn:
            conn.executemany(
                "INSERT INTO measurements(inv_id, sensor_id, ts, value) VALUES(?,?,?,?)",
                rows,
            )


def load_recent(inv_id: str, limit: int = 300) -> Dict[str, List[Tuple[float, float]]]:
    """Return the latest `limit` points per sensor for `inv_id`.

    Intended for filling the in-memory deque at startup. Each sensor maps to
    a list of ``(ts, value)`` tuples in ascending `ts` order.
    """
    with _lock:
        rows = _get_conn().execute("""
            SELECT sensor_id, ts, value FROM (
                SELECT sensor_id, ts, value,
                       ROW_NUMBER() OVER (PARTITION BY sensor_id ORDER BY ts DESC) AS rn
                FROM measurements
                WHERE inv_id = ?
            ) WHERE rn <= ?
            ORDER BY ts ASC
        """, (inv_id, limit)).fetchall()
    result: Dict[str, List[Tuple[float, float]]] = {}
    for sensor_id, ts, value in rows:
        result.setdefault(sensor_id, []).append((ts, value))
    return result


def query(inv_id: str, sensor_id: str, from_ts: float, to_ts: float) -> List[Tuple[float, float]]:
    """Return ``(ts, value)`` rows with `from_ts` <= ts <= `to_ts`, ascending.

    Time-window query for the history API and the planned diagnostics tab.
    """
    with _lock:
        rows = _get_conn().execute("""
            SELECT ts, value FROM measurements
            WHERE inv_id = ? AND sensor_id = ? AND ts BETWEEN ? AND ?
            ORDER BY ts ASC
        """, (inv_id, sensor_id, from_ts, to_ts)).fetchall()
    return rows


def save_daily_tariff_snapshot(date_iso: str, prev_kwh: float, cur_kwh: float,
                               spot_ct: Optional[float], fixed_ct: float, markup_ct: float):
    """Store consumption and price data for a completed day.

    Consumption is ``cur_kwh - prev_kwh`` clamped at 0.0 and rounded to four
    decimals. An existing row for `date_iso` is left untouched.
    """
    delta = max(0.0, cur_kwh - prev_kwh)
    with _lock:
        conn = _get_conn()
        with conn:  # commit on success, roll back on error
            conn.execute("""
                INSERT OR IGNORE INTO tariff_days(date, kwh, spot_ct, fixed_ct, markup_ct)
                VALUES(?, ?, ?, ?, ?)
            """, (date_iso, round(delta, 4), spot_ct, fixed_ct, markup_ct))


def get_tariff_days(from_date: str, to_date: str) -> List[Tuple]:
    """Return all tariff days in [from_date, to_date], ordered by date.

    Each row is ``(date, kwh, spot_ct, fixed_ct, markup_ct)``.
    """
    with _lock:
        return _get_conn().execute("""
            SELECT date, kwh, spot_ct, fixed_ct, markup_ct
            FROM tariff_days
            WHERE date >= ? AND date <= ?
            ORDER BY date ASC
        """, (from_date, to_date)).fetchall()


def get_daily_kwh_start(date_iso: str) -> Optional[float]:
    """Return the stored kWh start-of-day value from ``period_starts``.

    Returns None when no value is stored for `date_iso`.
    """
    with _lock:
        row = _get_conn().execute("""
            SELECT value FROM period_starts
            WHERE agg_id='tariff' AND period_type='daily' AND period_key=?
        """, (date_iso,)).fetchone()
    return row[0] if row else None


def save_daily_kwh_start(date_iso: str, kwh: float):
    """Store the kWh reading as start-of-day value; an existing row is kept."""
    with _lock:
        conn = _get_conn()
        with conn:  # commit on success, roll back on error
            conn.execute("""
                INSERT OR IGNORE INTO period_starts(agg_id, period_type, period_key, value)
                VALUES('tariff', 'daily', ?, ?)
            """, (date_iso, kwh))


def cleanup_old(days: int = RETENTION_DAYS):
    """Delete measurements older than `days` days and log how many went."""
    cutoff = time.time() - days * 86400
    with _lock:
        conn = _get_conn()
        with conn:  # commit on success, roll back on error
            deleted = conn.execute(
                "DELETE FROM measurements WHERE ts < ?", (cutoff,)
            ).rowcount
    if deleted:
        log.info("History: %d alte Einträge gelöscht (>%d Tage)", deleted, days)