Files
2025-09-09 15:00:30 +08:00

163 lines
6.0 KiB
Python

import os
import sqlite3
from contextlib import contextmanager
from typing import Any, Dict, Generator, Iterable, List, Optional
def _ensure_dir(path: str) -> None:
directory = os.path.dirname(os.path.abspath(path))
if directory and not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
class Database:
def __init__(self, db_path: str) -> None:
self.db_path = os.path.abspath(db_path)
_ensure_dir(self.db_path)
@contextmanager
def _connect(self) -> Generator[sqlite3.Connection, None, None]:
conn = sqlite3.connect(self.db_path, timeout=30.0, isolation_level=None)
try:
conn.row_factory = sqlite3.Row
yield conn
finally:
conn.close()
def init_db(self) -> None:
with self._connect() as conn:
conn.execute("PRAGMA journal_mode=WAL;")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS readings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
pressure REAL NOT NULL,
voltage REAL NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_readings_device_id ON readings(device_id);")
conn.execute("CREATE INDEX IF NOT EXISTS idx_readings_timestamp ON readings(timestamp);")
def insert_reading(self, data: Dict[str, Any]) -> int:
with self._connect() as conn:
cursor = conn.execute(
"""
INSERT INTO readings(device_id, timestamp, pressure, voltage)
VALUES (?, ?, ?, ?)
""",
(
data["device_id"],
data["timestamp"],
float(data["pressure"]),
float(data["voltage"]),
),
)
return int(cursor.lastrowid)
def get_reading(self, reading_id: int) -> Optional[Dict[str, Any]]:
with self._connect() as conn:
row = conn.execute("SELECT * FROM readings WHERE id = ?", (reading_id,)).fetchone()
return dict(row) if row else None
def list_readings(
self,
filters: Dict[str, Any],
*,
sort: str = "desc",
limit: int = 50,
offset: int = 0,
) -> List[Dict[str, Any]]:
where: List[str] = []
params: List[Any] = []
if "device_id" in filters:
where.append("device_id = ?")
params.append(filters["device_id"])
if "start" in filters:
where.append("timestamp >= ?")
params.append(filters["start"])
if "end" in filters:
where.append("timestamp <= ?")
params.append(filters["end"])
if "min_pressure" in filters:
where.append("pressure >= ?")
params.append(float(filters["min_pressure"]))
if "max_pressure" in filters:
where.append("pressure <= ?")
params.append(float(filters["max_pressure"]))
where_sql = (" WHERE " + " AND ".join(where)) if where else ""
order_sql = " ORDER BY timestamp ASC" if sort == "asc" else " ORDER BY timestamp DESC"
limit_sql = " LIMIT ? OFFSET ?"
with self._connect() as conn:
rows = conn.execute(
f"SELECT * FROM readings{where_sql}{order_sql}{limit_sql}",
(*params, int(limit), int(offset)),
).fetchall()
return [dict(r) for r in rows]
def count_readings(self, filters: Dict[str, Any]) -> int:
where: List[str] = []
params: List[Any] = []
if "device_id" in filters:
where.append("device_id = ?")
params.append(filters["device_id"])
if "start" in filters:
where.append("timestamp >= ?")
params.append(filters["start"])
if "end" in filters:
where.append("timestamp <= ?")
params.append(filters["end"])
if "min_pressure" in filters:
where.append("pressure >= ?")
params.append(float(filters["min_pressure"]))
if "max_pressure" in filters:
where.append("pressure <= ?")
params.append(float(filters["max_pressure"]))
where_sql = (" WHERE " + " AND ".join(where)) if where else ""
with self._connect() as conn:
row = conn.execute(f"SELECT COUNT(1) as c FROM readings{where_sql}", params).fetchone()
return int(row["c"]) if row else 0
def update_reading(self, reading_id: int, data: Dict[str, Any]) -> bool:
fields: List[str] = []
params: List[Any] = []
for key in ("device_id", "timestamp", "pressure", "voltage"):
if key in data:
fields.append(f"{key} = ?")
params.append(data[key])
if not fields:
return False
params.append(reading_id)
with self._connect() as conn:
cur = conn.execute(
f"UPDATE readings SET {', '.join(fields)} WHERE id = ?",
params,
)
return cur.rowcount > 0
def delete_reading(self, reading_id: int) -> bool:
with self._connect() as conn:
cur = conn.execute("DELETE FROM readings WHERE id = ?", (reading_id,))
return cur.rowcount > 0
def get_latest_reading(self, device_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
where = []
params: List[Any] = []
if device_id:
where.append("device_id = ?")
params.append(device_id)
where_sql = (" WHERE " + " AND ".join(where)) if where else ""
with self._connect() as conn:
row = conn.execute(
f"SELECT * FROM readings{where_sql} ORDER BY timestamp DESC LIMIT 1",
params,
).fetchone()
return dict(row) if row else None