163 lines
6.0 KiB
Python
163 lines
6.0 KiB
Python
import os
|
|
import sqlite3
|
|
from contextlib import contextmanager
|
|
from typing import Any, Dict, Generator, Iterable, List, Optional
|
|
|
|
|
|
def _ensure_dir(path: str) -> None:
|
|
directory = os.path.dirname(os.path.abspath(path))
|
|
if directory and not os.path.exists(directory):
|
|
os.makedirs(directory, exist_ok=True)
|
|
|
|
|
|
class Database:
|
|
def __init__(self, db_path: str) -> None:
|
|
self.db_path = os.path.abspath(db_path)
|
|
_ensure_dir(self.db_path)
|
|
|
|
@contextmanager
|
|
def _connect(self) -> Generator[sqlite3.Connection, None, None]:
|
|
conn = sqlite3.connect(self.db_path, timeout=30.0, isolation_level=None)
|
|
try:
|
|
conn.row_factory = sqlite3.Row
|
|
yield conn
|
|
finally:
|
|
conn.close()
|
|
|
|
def init_db(self) -> None:
|
|
with self._connect() as conn:
|
|
conn.execute("PRAGMA journal_mode=WAL;")
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS readings (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
device_id TEXT NOT NULL,
|
|
timestamp TEXT NOT NULL,
|
|
pressure REAL NOT NULL,
|
|
voltage REAL NOT NULL,
|
|
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
|
);
|
|
"""
|
|
)
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_readings_device_id ON readings(device_id);")
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_readings_timestamp ON readings(timestamp);")
|
|
|
|
def insert_reading(self, data: Dict[str, Any]) -> int:
|
|
with self._connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
INSERT INTO readings(device_id, timestamp, pressure, voltage)
|
|
VALUES (?, ?, ?, ?)
|
|
""",
|
|
(
|
|
data["device_id"],
|
|
data["timestamp"],
|
|
float(data["pressure"]),
|
|
float(data["voltage"]),
|
|
),
|
|
)
|
|
return int(cursor.lastrowid)
|
|
|
|
def get_reading(self, reading_id: int) -> Optional[Dict[str, Any]]:
|
|
with self._connect() as conn:
|
|
row = conn.execute("SELECT * FROM readings WHERE id = ?", (reading_id,)).fetchone()
|
|
return dict(row) if row else None
|
|
|
|
def list_readings(
|
|
self,
|
|
filters: Dict[str, Any],
|
|
*,
|
|
sort: str = "desc",
|
|
limit: int = 50,
|
|
offset: int = 0,
|
|
) -> List[Dict[str, Any]]:
|
|
where: List[str] = []
|
|
params: List[Any] = []
|
|
if "device_id" in filters:
|
|
where.append("device_id = ?")
|
|
params.append(filters["device_id"])
|
|
if "start" in filters:
|
|
where.append("timestamp >= ?")
|
|
params.append(filters["start"])
|
|
if "end" in filters:
|
|
where.append("timestamp <= ?")
|
|
params.append(filters["end"])
|
|
if "min_pressure" in filters:
|
|
where.append("pressure >= ?")
|
|
params.append(float(filters["min_pressure"]))
|
|
if "max_pressure" in filters:
|
|
where.append("pressure <= ?")
|
|
params.append(float(filters["max_pressure"]))
|
|
|
|
where_sql = (" WHERE " + " AND ".join(where)) if where else ""
|
|
order_sql = " ORDER BY timestamp ASC" if sort == "asc" else " ORDER BY timestamp DESC"
|
|
limit_sql = " LIMIT ? OFFSET ?"
|
|
|
|
with self._connect() as conn:
|
|
rows = conn.execute(
|
|
f"SELECT * FROM readings{where_sql}{order_sql}{limit_sql}",
|
|
(*params, int(limit), int(offset)),
|
|
).fetchall()
|
|
return [dict(r) for r in rows]
|
|
|
|
def count_readings(self, filters: Dict[str, Any]) -> int:
|
|
where: List[str] = []
|
|
params: List[Any] = []
|
|
if "device_id" in filters:
|
|
where.append("device_id = ?")
|
|
params.append(filters["device_id"])
|
|
if "start" in filters:
|
|
where.append("timestamp >= ?")
|
|
params.append(filters["start"])
|
|
if "end" in filters:
|
|
where.append("timestamp <= ?")
|
|
params.append(filters["end"])
|
|
if "min_pressure" in filters:
|
|
where.append("pressure >= ?")
|
|
params.append(float(filters["min_pressure"]))
|
|
if "max_pressure" in filters:
|
|
where.append("pressure <= ?")
|
|
params.append(float(filters["max_pressure"]))
|
|
where_sql = (" WHERE " + " AND ".join(where)) if where else ""
|
|
with self._connect() as conn:
|
|
row = conn.execute(f"SELECT COUNT(1) as c FROM readings{where_sql}", params).fetchone()
|
|
return int(row["c"]) if row else 0
|
|
|
|
def update_reading(self, reading_id: int, data: Dict[str, Any]) -> bool:
|
|
fields: List[str] = []
|
|
params: List[Any] = []
|
|
for key in ("device_id", "timestamp", "pressure", "voltage"):
|
|
if key in data:
|
|
fields.append(f"{key} = ?")
|
|
params.append(data[key])
|
|
if not fields:
|
|
return False
|
|
params.append(reading_id)
|
|
with self._connect() as conn:
|
|
cur = conn.execute(
|
|
f"UPDATE readings SET {', '.join(fields)} WHERE id = ?",
|
|
params,
|
|
)
|
|
return cur.rowcount > 0
|
|
|
|
def delete_reading(self, reading_id: int) -> bool:
|
|
with self._connect() as conn:
|
|
cur = conn.execute("DELETE FROM readings WHERE id = ?", (reading_id,))
|
|
return cur.rowcount > 0
|
|
|
|
def get_latest_reading(self, device_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
|
|
where = []
|
|
params: List[Any] = []
|
|
if device_id:
|
|
where.append("device_id = ?")
|
|
params.append(device_id)
|
|
where_sql = (" WHERE " + " AND ".join(where)) if where else ""
|
|
with self._connect() as conn:
|
|
row = conn.execute(
|
|
f"SELECT * FROM readings{where_sql} ORDER BY timestamp DESC LIMIT 1",
|
|
params,
|
|
).fetchone()
|
|
return dict(row) if row else None
|
|
|
|
|