import os from datetime import datetime, timezone from typing import Any, Dict, Optional, Tuple from flask import Flask, jsonify, request try: from flask_cors import CORS except Exception: # pragma: no cover CORS = None # type: ignore from db import Database def _get_env(name: str, default: Optional[str] = None) -> str: value = os.environ.get(name) return value if value is not None else (default if default is not None else "") def _normalize_iso8601(timestamp_str: str) -> str: if not isinstance(timestamp_str, str) or not timestamp_str: return datetime.now(timezone.utc).isoformat() ts = timestamp_str.strip() if ts.endswith("Z"): ts = ts[:-1] + "+00:00" try: parsed = datetime.fromisoformat(ts) if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=timezone.utc) return parsed.astimezone(timezone.utc).isoformat() except Exception: return datetime.now(timezone.utc).isoformat() def _validate_reading_payload(payload: Dict[str, Any], partial: bool = False) -> Tuple[Optional[Dict[str, Any]], Optional[str]]: allowed_keys = {"device_id", "timestamp", "pressure", "voltage"} if not isinstance(payload, dict): return None, "Invalid JSON body" for key in payload.keys(): if key not in allowed_keys: return None, f"Unexpected field: {key}" if not partial: missing = [k for k in ["device_id", "pressure", "voltage"] if k not in payload] if missing: return None, f"Missing required fields: {', '.join(missing)}" result: Dict[str, Any] = {} if "device_id" in payload: if not isinstance(payload["device_id"], str) or not payload["device_id"].strip(): return None, "device_id must be a non-empty string" result["device_id"] = payload["device_id"].strip() if "timestamp" in payload: result["timestamp"] = _normalize_iso8601(str(payload["timestamp"])) elif not partial: result["timestamp"] = datetime.now(timezone.utc).isoformat() if "pressure" in payload: try: result["pressure"] = float(payload["pressure"]) except Exception: return None, "pressure must be a number" if "voltage" in payload: try: result["voltage"] = float(payload["voltage"]) except Exception: return None, "voltage must be a number" return result, None app = Flask(__name__) if CORS is not None: CORS(app, resources={r"/*": {"origins": "*"}}, supports_credentials=False) DB_PATH = _get_env("DB_PATH", os.path.join(os.path.dirname(__file__), "readings.db")) database = Database(DB_PATH) database.init_db() @app.get("/health") def health() -> Any: return jsonify({"ok": True}) @app.post("/data") def ingest_from_device() -> Any: payload = request.get_json(silent=True) or {} data, error = _validate_reading_payload(payload, partial=False) if error: return jsonify({"ok": False, "error": error}), 400 new_id = database.insert_reading(data) return jsonify({"ok": True, "id": new_id}), 201 @app.get("/api/readings") def list_readings() -> Any: device_id = request.args.get("device_id") start = request.args.get("start") end = request.args.get("end") sort = (request.args.get("sort") or "desc").lower() sort = "asc" if sort == "asc" else "desc" try: page = int(request.args.get("page", 1)) page_size = int(request.args.get("page_size", 50)) except Exception: return jsonify({"ok": False, "error": "page and page_size must be integers"}), 400 page = max(1, page) page_size = min(max(1, page_size), 500) filters: Dict[str, Any] = {} if device_id: filters["device_id"] = device_id if start: filters["start"] = _normalize_iso8601(start) if end: filters["end"] = _normalize_iso8601(end) # 可选压力范围过滤,用于图表或报警检索 min_p = request.args.get("min_pressure") max_p = request.args.get("max_pressure") if min_p is not None: try: filters["min_pressure"] = float(min_p) except Exception: return jsonify({"ok": False, "error": "min_pressure must be a number"}), 400 if max_p is not None: try: filters["max_pressure"] = float(max_p) except Exception: return jsonify({"ok": False, "error": "max_pressure must be a number"}), 400 total = database.count_readings(filters) items = database.list_readings(filters, sort=sort, limit=page_size, offset=(page - 1) * page_size) return jsonify({ "ok": True, "total": total, "page": page, "page_size": page_size, "items": items, }) @app.get("/api/readings/latest") def latest_reading() -> Any: device_id = request.args.get("device_id") row = database.get_latest_reading(device_id) if not row: return jsonify({"ok": False, "error": "Not found"}), 404 return jsonify({"ok": True, "item": row}) @app.get("/api/alarms") def list_alarms() -> Any: device_id = request.args.get("device_id") start = request.args.get("start") end = request.args.get("end") min_p = request.args.get("min_pressure") max_p = request.args.get("max_pressure") if min_p is None and max_p is None: return jsonify({"ok": False, "error": "min_pressure or max_pressure is required"}), 400 try: page = int(request.args.get("page", 1)) page_size = int(request.args.get("page_size", 50)) except Exception: return jsonify({"ok": False, "error": "page and page_size must be integers"}), 400 page = max(1, page) page_size = min(max(1, page_size), 500) filters: Dict[str, Any] = {} if device_id: filters["device_id"] = device_id if start: filters["start"] = _normalize_iso8601(start) if end: filters["end"] = _normalize_iso8601(end) if min_p is not None: try: filters["min_pressure"] = float(min_p) except Exception: return jsonify({"ok": False, "error": "min_pressure must be a number"}), 400 if max_p is not None: try: filters["max_pressure"] = float(max_p) except Exception: return jsonify({"ok": False, "error": "max_pressure must be a number"}), 400 total = database.count_readings(filters) items = database.list_readings(filters, sort="desc", limit=page_size, offset=(page - 1) * page_size) return jsonify({ "ok": True, "total": total, "page": page, "page_size": page_size, "items": items, }) @app.get("/api/readings/") def get_reading(reading_id: int) -> Any: row = database.get_reading(reading_id) if not row: return jsonify({"ok": False, "error": "Not found"}), 404 return jsonify({"ok": True, "item": row}) @app.post("/api/readings") def create_reading() -> Any: payload = request.get_json(silent=True) or {} data, error = _validate_reading_payload(payload, partial=False) if error: return jsonify({"ok": False, "error": error}), 400 new_id = database.insert_reading(data) return jsonify({"ok": True, "id": new_id}), 201 @app.put("/api/readings/") def update_reading(reading_id: int) -> Any: payload = request.get_json(silent=True) or {} data, error = _validate_reading_payload(payload, partial=True) if error: return jsonify({"ok": False, "error": error}), 400 if not data: return jsonify({"ok": False, "error": "Empty body"}), 400 updated = database.update_reading(reading_id, data) if not updated: return jsonify({"ok": False, "error": "Not found"}), 404 return jsonify({"ok": True}) @app.delete("/api/readings/") def delete_reading(reading_id: int) -> Any: deleted = database.delete_reading(reading_id) if not deleted: return jsonify({"ok": False, "error": "Not found"}), 404 return jsonify({"ok": True}) if __name__ == "__main__": host = _get_env("HOST", "0.0.0.0") try: port = int(_get_env("PORT", "5000")) except Exception: port = 5000 # 注意:根据用户要求,不在此处自动启动,只提供可运行入口 app.run(host=host, port=port)