#!/usr/bin/env python3 """测试通知发送脚本.""" import json import sys from datetime import datetime import requests def test_notification(server_url: str, api_key: str): """测试发送通知.""" # 测试简化版 API simple_test_data = { "api_key": api_key, "content": "这是一条测试短信,用于验证通知转发功能是否正常工作。", "sender": "测试发送者" } try: print(f"🔄 正在向 {server_url} 发送测试通知...") # 发送请求到简化版 API response = requests.post( f"{server_url}/notify/simple", json=simple_test_data, headers={"Content-Type": "application/json"}, timeout=10 ) # 检查响应 if response.status_code == 200: result = response.json() if result.get("success"): print("✅ 测试通知发送成功!") print(f"📱 请检查你的 Android 设备是否收到通知") else: print(f"❌ 通知发送失败: {result.get('message')}") return False else: print(f"❌ HTTP 请求失败: {response.status_code}") print(f"响应内容: {response.text}") return False except requests.exceptions.ConnectionError: print(f"❌ 无法连接到服务器 {server_url}") print("请确认服务器正在运行且地址正确") return False except requests.exceptions.Timeout: print("❌ 请求超时") return False except Exception as e: print(f"❌ 发生错误: {e}") return False return True def test_server_status(server_url: str): """测试服务器状态.""" try: print(f"🔄 检查服务器状态...") # 健康检查 health_response = requests.get(f"{server_url}/health", timeout=5) if health_response.status_code == 200: print("✅ 服务器健康检查通过") else: print(f"⚠️ 健康检查失败: {health_response.status_code}") # 状态信息 status_response = requests.get(f"{server_url}/status", timeout=5) if status_response.status_code == 200: status = status_response.json() print(f"📊 服务器状态:") print(f" 版本: {status.get('version')}") print(f" 运行时间: {status.get('uptime', 0):.2f} 秒") print(f" 已发送通知: {status.get('notifications_sent', 0)}") print(f" 失败通知: {status.get('notifications_failed', 0)}") else: print(f"⚠️ 获取状态信息失败: {status_response.status_code}") except Exception as e: print(f"❌ 检查服务器状态时出错: {e}") return False return True def main(): """主函数.""" if len(sys.argv) < 3: print("使用方法: python test_notification.py ") print("示例: python test_notification.py http://localhost:8000 your-api-key") sys.exit(1) server_url = sys.argv[1].rstrip('/') api_key = sys.argv[2] print("SMS Forwarder 通知测试") print("=" * 30) # 测试服务器状态 if not test_server_status(server_url): print("❌ 服务器状态检查失败") sys.exit(1) print() # 测试通知发送 if test_notification(server_url, api_key): print("\n🎉 所有测试通过!") print("如果你的 Android 设备收到了通知,说明配置正确。") else: print("\n❌ 测试失败") print("请检查:") print("1. 服务器配置是否正确") print("2. API 密钥是否正确") print("3. 推送服务是否正确配置") sys.exit(1) if __name__ == "__main__": main()