""" EMS-Controller: PV-Überschussladen mit Deadline-Fallback. Logik: 1. Wenn PV-Überschuss >= MIN_PV_POWER → Ladestrom = Überschuss / 230V 2. Wenn kein ausreichender Überschuss für > PV_TIMEOUT_H Stunden: → Zwangsladen mit Maximalstrom → solange bis Zielzeit (default 06:00) erreicht oder Auto voll 3. Auto nicht angeschlossen oder Laden beendet → EMS zurücksetzen """ import logging import time from datetime import datetime, timedelta from typing import Optional log = logging.getLogger(__name__) # Konfiguration (Defaults, überschreibbar über EmsController.__init__) MIN_PV_POWER_W = 1400 # Mindest-PV-Überschuss für Laden (6A × 230V) PV_TIMEOUT_H = 4.0 # Stunden ohne PV-Überschuss → Zwangsladen TARGET_HOUR = 6 # Zielstunde: Vollladung bis 06:00 POLL_INTERVAL_S = 30 # Sekunden zwischen EMS-Updates PHASES = 3 # Anzahl der Phasen VOLTAGE_V = 230 # Netzspannung # Charging-State Werte (Kathrein Modbus) STATE_IDLE = 0 STATE_CONNECTED = 1 STATE_CHARGING = 4 STATE_PAUSED = 5 STATE_COMPLETED = 6 class EmsController: def __init__( self, min_pv_power: int = MIN_PV_POWER_W, pv_timeout_h: float = PV_TIMEOUT_H, target_hour: int = TARGET_HOUR, phases: int = PHASES, ): self.min_pv_power = min_pv_power self.pv_timeout_h = pv_timeout_h self.target_hour = target_hour self.phases = phases self._pv_ok_since: Optional[float] = None # Zeitpunkt letzter ausreichender PV self._no_pv_since: Optional[float] = None # Zeitpunkt seit kein PV self._forced_charging = False self._ems_enabled = False self._last_state = STATE_IDLE def _next_target(self) -> datetime: now = datetime.now() t = now.replace(hour=self.target_hour, minute=0, second=0, microsecond=0) if t <= now: t += timedelta(days=1) return t def _surplus_to_ma(self, surplus_w: float, wallbox_w: float = 0.0) -> int: """Gesamte verfügbare PV-Leistung → Ladestrom in mA (pro Phase). surplus_w: noch ins Netz eingespeister Überschuss wallbox_w: aktuell von der Wallbox gezogene Leistung Summe = gesamt verfügbar für Laden.""" total_w = surplus_w + wallbox_w current_a = total_w / (VOLTAGE_V * self.phases) ma = int(current_a * 1000) return max(6000, min(32000, ma)) def update(self, wallbox_reader, pv_surplus_w: float, charging_state: int, wallbox_power_w: float = 0.0) -> str: """ Wird vom Poll-Loop aufgerufen. pv_surplus_w: positiv = PV exportiert ins Netz (verbleibender Überschuss) wallbox_power_w: aktuell von der Wallbox gezogene Leistung (aus Meter-Daten) Gibt eine kurze Status-Beschreibung zurück. """ now = time.time() # EMS einmalig aktivieren wenn Auto angesteckt if charging_state in (STATE_CONNECTED, STATE_CHARGING, STATE_PAUSED): if not self._ems_enabled: if wallbox_reader.enable_ems(): self._ems_enabled = True log.info("[EMS] EMS aktiviert") else: # Auto weg → alles zurücksetzen if self._ems_enabled: wallbox_reader.set_current(0) self._ems_enabled = False self._forced_charging = False self._no_pv_since = None log.info("[EMS] Auto abgesteckt — EMS zurückgesetzt") return "kein Fahrzeug" if charging_state == STATE_COMPLETED: wallbox_reader.set_current(0) return "vollgeladen" # PV-Überschuss auswerten has_pv = pv_surplus_w >= self.min_pv_power if has_pv: self._no_pv_since = None self._forced_charging = False ma = self._surplus_to_ma(pv_surplus_w, wallbox_power_w) total_w = pv_surplus_w + wallbox_power_w wallbox_reader.set_current(ma, self.phases) return f"PV-Laden {ma / 1000:.1f}A ({total_w:.0f}W, Überschuss {pv_surplus_w:.0f}W)" # Kein PV if self._no_pv_since is None: self._no_pv_since = now no_pv_h = (now - self._no_pv_since) / 3600.0 if no_pv_h >= self.pv_timeout_h: # Zwangsladen bis Zielzeit target = self._next_target() mins_left = (target - datetime.now()).total_seconds() / 60 if not self._forced_charging: wallbox_reader.set_current(32000, self.phases) self._forced_charging = True log.info("[EMS] Zwangsladen gestartet — %dmin bis %02d:00", int(mins_left), self.target_hour) return f"Zwangsladen ({no_pv_h:.1f}h kein PV, {int(mins_left)}min bis {self.target_hour:02d}:00)" # Warten auf PV wallbox_reader.set_current(0) remaining_min = int((self.pv_timeout_h - no_pv_h) * 60) return f"warte auf PV ({remaining_min}min bis Zwangsladen)"