import os import sqlite3 from contextlib import contextmanager from typing import Any, Dict, Generator, Iterable, List, Optional def _ensure_dir(path: str) -> None: directory = os.path.dirname(os.path.abspath(path)) if directory and not os.path.exists(directory): os.makedirs(directory, exist_ok=True) class Database: def __init__(self, db_path: str) -> None: self.db_path = os.path.abspath(db_path) _ensure_dir(self.db_path) @contextmanager def _connect(self) -> Generator[sqlite3.Connection, None, None]: conn = sqlite3.connect(self.db_path, timeout=30.0, isolation_level=None) try: conn.row_factory = sqlite3.Row yield conn finally: conn.close() def init_db(self) -> None: with self._connect() as conn: conn.execute("PRAGMA journal_mode=WAL;") conn.execute( """ CREATE TABLE IF NOT EXISTS readings ( id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, timestamp TEXT NOT NULL, pressure REAL NOT NULL, voltage REAL NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_readings_device_id ON readings(device_id);") conn.execute("CREATE INDEX IF NOT EXISTS idx_readings_timestamp ON readings(timestamp);") def insert_reading(self, data: Dict[str, Any]) -> int: with self._connect() as conn: cursor = conn.execute( """ INSERT INTO readings(device_id, timestamp, pressure, voltage) VALUES (?, ?, ?, ?) """, ( data["device_id"], data["timestamp"], float(data["pressure"]), float(data["voltage"]), ), ) return int(cursor.lastrowid) def get_reading(self, reading_id: int) -> Optional[Dict[str, Any]]: with self._connect() as conn: row = conn.execute("SELECT * FROM readings WHERE id = ?", (reading_id,)).fetchone() return dict(row) if row else None def list_readings( self, filters: Dict[str, Any], *, sort: str = "desc", limit: int = 50, offset: int = 0, ) -> List[Dict[str, Any]]: where: List[str] = [] params: List[Any] = [] if "device_id" in filters: where.append("device_id = ?") params.append(filters["device_id"]) if "start" in filters: where.append("timestamp >= ?") params.append(filters["start"]) if "end" in filters: where.append("timestamp <= ?") params.append(filters["end"]) if "min_pressure" in filters: where.append("pressure >= ?") params.append(float(filters["min_pressure"])) if "max_pressure" in filters: where.append("pressure <= ?") params.append(float(filters["max_pressure"])) where_sql = (" WHERE " + " AND ".join(where)) if where else "" order_sql = " ORDER BY timestamp ASC" if sort == "asc" else " ORDER BY timestamp DESC" limit_sql = " LIMIT ? OFFSET ?" with self._connect() as conn: rows = conn.execute( f"SELECT * FROM readings{where_sql}{order_sql}{limit_sql}", (*params, int(limit), int(offset)), ).fetchall() return [dict(r) for r in rows] def count_readings(self, filters: Dict[str, Any]) -> int: where: List[str] = [] params: List[Any] = [] if "device_id" in filters: where.append("device_id = ?") params.append(filters["device_id"]) if "start" in filters: where.append("timestamp >= ?") params.append(filters["start"]) if "end" in filters: where.append("timestamp <= ?") params.append(filters["end"]) if "min_pressure" in filters: where.append("pressure >= ?") params.append(float(filters["min_pressure"])) if "max_pressure" in filters: where.append("pressure <= ?") params.append(float(filters["max_pressure"])) where_sql = (" WHERE " + " AND ".join(where)) if where else "" with self._connect() as conn: row = conn.execute(f"SELECT COUNT(1) as c FROM readings{where_sql}", params).fetchone() return int(row["c"]) if row else 0 def update_reading(self, reading_id: int, data: Dict[str, Any]) -> bool: fields: List[str] = [] params: List[Any] = [] for key in ("device_id", "timestamp", "pressure", "voltage"): if key in data: fields.append(f"{key} = ?") params.append(data[key]) if not fields: return False params.append(reading_id) with self._connect() as conn: cur = conn.execute( f"UPDATE readings SET {', '.join(fields)} WHERE id = ?", params, ) return cur.rowcount > 0 def delete_reading(self, reading_id: int) -> bool: with self._connect() as conn: cur = conn.execute("DELETE FROM readings WHERE id = ?", (reading_id,)) return cur.rowcount > 0 def get_latest_reading(self, device_id: Optional[str] = None) -> Optional[Dict[str, Any]]: where = [] params: List[Any] = [] if device_id: where.append("device_id = ?") params.append(device_id) where_sql = (" WHERE " + " AND ".join(where)) if where else "" with self._connect() as conn: row = conn.execute( f"SELECT * FROM readings{where_sql} ORDER BY timestamp DESC LIMIT 1", params, ).fetchone() return dict(row) if row else None