Feature: Persistente History via SQLite (v1.3.0)

Sensorwerte werden in /data/history.db (SQLite, WAL-Modus) persistiert
und überleben damit Add-on-Neustarts. Beim Start werden die letzten 300
Messpunkte pro Sensor in die In-Memory-Deque geladen, sodass Sparklines
sofort Daten zeigen. Retention: 7 Tage. Neue API: GET /api/history.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
retr0
2026-04-27 11:44:11 +02:00
parent 9913dccfa9
commit a3e87da943
3 changed files with 124 additions and 3 deletions
+97
View File
@@ -0,0 +1,97 @@
import logging
import sqlite3
import threading
import time
from typing import Dict, List, Tuple
log = logging.getLogger(__name__)
DB_PATH = "/data/history.db"
RETENTION_DAYS = 7
_lock = threading.Lock()
_conn: sqlite3.Connection | None = None
def _get_conn() -> sqlite3.Connection:
global _conn
if _conn is None:
_conn = sqlite3.connect(DB_PATH, check_same_thread=False)
_conn.execute("PRAGMA journal_mode=WAL")
_conn.execute("PRAGMA synchronous=NORMAL")
return _conn
def init_db():
with _lock:
c = _get_conn()
c.execute("""
CREATE TABLE IF NOT EXISTS measurements (
inv_id TEXT NOT NULL,
sensor_id TEXT NOT NULL,
ts REAL NOT NULL,
value REAL NOT NULL
)
""")
c.execute("""
CREATE INDEX IF NOT EXISTS idx_inv_sensor_ts
ON measurements(inv_id, sensor_id, ts)
""")
c.commit()
cleanup_old()
log.info("History DB initialisiert: %s", DB_PATH)
def write_batch(inv_id: str, ts: float, values: Dict[str, float]):
rows = [(inv_id, sid, ts, val) for sid, val in values.items()]
with _lock:
_get_conn().executemany(
"INSERT INTO measurements(inv_id, sensor_id, ts, value) VALUES(?,?,?,?)",
rows,
)
_get_conn().commit()
def load_recent(inv_id: str, limit: int = 300) -> Dict[str, List[Tuple[float, float]]]:
"""Letzte `limit` Messpunkte pro Sensor — zum Befüllen der In-Memory-Deque beim Start."""
with _lock:
rows = _get_conn().execute("""
SELECT sensor_id, ts, value
FROM (
SELECT sensor_id, ts, value,
ROW_NUMBER() OVER (PARTITION BY sensor_id ORDER BY ts DESC) AS rn
FROM measurements
WHERE inv_id = ?
)
WHERE rn <= ?
ORDER BY ts ASC
""", (inv_id, limit)).fetchall()
result: Dict[str, List[Tuple[float, float]]] = {}
for sensor_id, ts, value in rows:
result.setdefault(sensor_id, []).append((ts, value))
return result
def query(inv_id: str, sensor_id: str,
from_ts: float, to_ts: float) -> List[Tuple[float, float]]:
"""Zeitfenster-Abfrage für die History-API und den späteren Diagnose-Tab."""
with _lock:
rows = _get_conn().execute("""
SELECT ts, value FROM measurements
WHERE inv_id = ? AND sensor_id = ? AND ts BETWEEN ? AND ?
ORDER BY ts ASC
""", (inv_id, sensor_id, from_ts, to_ts)).fetchall()
return rows
def cleanup_old(days: int = RETENTION_DAYS):
cutoff = time.time() - days * 86400
with _lock:
c = _get_conn()
deleted = c.execute(
"DELETE FROM measurements WHERE ts < ?", (cutoff,)
).rowcount
c.commit()
if deleted:
log.info("History: %d alte Einträge gelöscht (>%d Tage)", deleted, days)