Feature: Kathrein Wallbox + EMS-Controller (v1.5.0)

- wallbox_client.py: WallboxReader FC03, EMS enable/set_current
- ems_controller.py: PV-Überschussladen + 4h-Timeout Zwangsladen bis 06:00
- inverters.py: KATHREIN_WALLBOX mit 18 Sensoren (Meter + EVSE + EMS)
- main.py: kathrein-Protokollzweig, _get_pv_surplus() aus laufenden Geräten

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
retr0
2026-04-28 12:02:59 +02:00
parent 8b65666e80
commit ac965dcfa6
5 changed files with 315 additions and 1 deletions
+124
View File
@@ -0,0 +1,124 @@
"""
EMS-Controller: PV-Überschussladen mit Deadline-Fallback.
Logik:
1. Wenn PV-Überschuss >= MIN_PV_POWER → Ladestrom = Überschuss / 230V
2. Wenn kein ausreichender Überschuss für > PV_TIMEOUT_H Stunden:
→ Zwangsladen mit Maximalstrom
→ solange bis Zielzeit (default 06:00) erreicht oder Auto voll
3. Auto nicht angeschlossen oder Laden beendet → EMS zurücksetzen
"""
import logging
import time
from datetime import datetime, timedelta
from typing import Optional
log = logging.getLogger(__name__)
# Konfiguration (Defaults, überschreibbar über EmsController.__init__)
MIN_PV_POWER_W = 1400 # Mindest-PV-Überschuss für Laden (6A × 230V)
PV_TIMEOUT_H = 4.0 # Stunden ohne PV-Überschuss → Zwangsladen
TARGET_HOUR = 6 # Zielstunde: Vollladung bis 06:00
POLL_INTERVAL_S = 30 # Sekunden zwischen EMS-Updates
PHASES = 3 # Anzahl der Phasen
VOLTAGE_V = 230 # Netzspannung
# Charging-State Werte (Kathrein Modbus)
STATE_IDLE = 0
STATE_CONNECTED = 1
STATE_CHARGING = 4
STATE_PAUSED = 5
STATE_COMPLETED = 6
class EmsController:
def __init__(
self,
min_pv_power: int = MIN_PV_POWER_W,
pv_timeout_h: float = PV_TIMEOUT_H,
target_hour: int = TARGET_HOUR,
phases: int = PHASES,
):
self.min_pv_power = min_pv_power
self.pv_timeout_h = pv_timeout_h
self.target_hour = target_hour
self.phases = phases
self._pv_ok_since: Optional[float] = None # Zeitpunkt letzter ausreichender PV
self._no_pv_since: Optional[float] = None # Zeitpunkt seit kein PV
self._forced_charging = False
self._ems_enabled = False
self._last_state = STATE_IDLE
def _next_target(self) -> datetime:
now = datetime.now()
t = now.replace(hour=self.target_hour, minute=0, second=0, microsecond=0)
if t <= now:
t += timedelta(days=1)
return t
def _surplus_to_ma(self, surplus_w: float) -> int:
"""PV-Überschuss in Watt → Ladestrom in mA (pro Phase)."""
current_a = surplus_w / (VOLTAGE_V * self.phases)
ma = int(current_a * 1000)
return max(6000, min(32000, ma))
def update(self, wallbox_reader, pv_surplus_w: float, charging_state: int) -> str:
"""
Wird vom Poll-Loop aufgerufen.
pv_surplus_w: positiv = PV exportiert ins Netz (verfügbar für Laden)
Gibt eine kurze Status-Beschreibung zurück.
"""
now = time.time()
# EMS einmalig aktivieren wenn Auto angesteckt
if charging_state in (STATE_CONNECTED, STATE_CHARGING, STATE_PAUSED):
if not self._ems_enabled:
if wallbox_reader.enable_ems():
self._ems_enabled = True
log.info("[EMS] EMS aktiviert")
else:
# Auto weg → alles zurücksetzen
if self._ems_enabled:
wallbox_reader.set_current(0)
self._ems_enabled = False
self._forced_charging = False
self._no_pv_since = None
log.info("[EMS] Auto abgesteckt — EMS zurückgesetzt")
return "kein Fahrzeug"
if charging_state == STATE_COMPLETED:
wallbox_reader.set_current(0)
return "vollgeladen"
# PV-Überschuss auswerten
has_pv = pv_surplus_w >= self.min_pv_power
if has_pv:
self._no_pv_since = None
self._forced_charging = False
ma = self._surplus_to_ma(pv_surplus_w)
wallbox_reader.set_current(ma)
return f"PV-Laden {ma // 1000:.1f}A ({pv_surplus_w:.0f}W)"
# Kein PV
if self._no_pv_since is None:
self._no_pv_since = now
no_pv_h = (now - self._no_pv_since) / 3600.0
if no_pv_h >= self.pv_timeout_h:
# Zwangsladen bis Zielzeit
target = self._next_target()
mins_left = (target - datetime.now()).total_seconds() / 60
if not self._forced_charging:
wallbox_reader.set_current(32000)
self._forced_charging = True
log.info("[EMS] Zwangsladen gestartet — %dmin bis %02d:00", int(mins_left), self.target_hour)
return f"Zwangsladen ({no_pv_h:.1f}h kein PV, {int(mins_left)}min bis {self.target_hour:02d}:00)"
# Warten auf PV
wallbox_reader.set_current(0)
remaining_min = int((self.pv_timeout_h - no_pv_h) * 60)
return f"warte auf PV ({remaining_min}min bis Zwangsladen)"