""" qBittorrent 客户端服务层 负责管理客户端连接信息和与 qBittorrent API 的交互 """ import json import os from typing import List, Dict, Optional, Any from concurrent.futures import ThreadPoolExecutor, as_completed import qbittorrentapi from config import Config class ClientService: """qBittorrent 客户端服务""" def __init__(self): self.config_path = Config.CLIENTS_CONFIG_PATH self._ensure_config_dir() def _ensure_config_dir(self): """确保配置目录存在""" os.makedirs(os.path.dirname(self.config_path), exist_ok=True) if not os.path.exists(self.config_path): with open(self.config_path, 'w', encoding='utf-8') as f: json.dump([], f) def get_clients(self) -> List[Dict[str, Any]]: """获取所有客户端配置""" try: with open(self.config_path, 'r', encoding='utf-8') as f: clients = json.load(f) # 移除密码字段,不返回给前端 return [{k: v for k, v in client.items() if k != 'password'} for client in clients] except (FileNotFoundError, json.JSONDecodeError): return [] def get_client_by_id(self, client_id: str) -> Optional[Dict[str, Any]]: """根据ID获取客户端配置""" clients = self._load_clients_with_password() return next((client for client in clients if client['id'] == client_id), None) def _load_clients_with_password(self) -> List[Dict[str, Any]]: """加载包含密码的客户端配置(内部使用)""" try: with open(self.config_path, 'r', encoding='utf-8') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return [] def add_client(self, client_data: Dict[str, Any]) -> Dict[str, Any]: """添加新客户端""" clients = self._load_clients_with_password() # 生成唯一ID import uuid client_id = str(uuid.uuid4()) client = { 'id': client_id, 'name': client_data['name'], 'host': client_data['host'], 'port': client_data['port'], 'username': client_data.get('username', ''), 'password': client_data.get('password', ''), 'enabled': client_data.get('enabled', True) } clients.append(client) self._save_clients(clients) # 返回不包含密码的客户端信息 return {k: v for k, v in client.items() if k != 'password'} def update_client(self, client_id: str, client_data: Dict[str, Any]) -> Optional[Dict[str, Any]]: """更新客户端配置""" clients = self._load_clients_with_password() for i, client in enumerate(clients): if client['id'] == client_id: # 更新字段 client.update({ 'name': client_data.get('name', client['name']), 'host': client_data.get('host', client['host']), 'port': client_data.get('port', client['port']), 'username': client_data.get('username', client['username']), 'enabled': client_data.get('enabled', client['enabled']) }) # 只有提供了密码才更新 if 'password' in client_data: client['password'] = client_data['password'] clients[i] = client self._save_clients(clients) # 返回不包含密码的客户端信息 return {k: v for k, v in client.items() if k != 'password'} return None def delete_client(self, client_id: str) -> bool: """删除客户端""" clients = self._load_clients_with_password() original_count = len(clients) clients = [client for client in clients if client['id'] != client_id] if len(clients) < original_count: self._save_clients(clients) return True return False def _save_clients(self, clients: List[Dict[str, Any]]): """保存客户端配置到文件""" with open(self.config_path, 'w', encoding='utf-8') as f: json.dump(clients, f, indent=2, ensure_ascii=False) def test_client_connection(self, client_id: str) -> Dict[str, Any]: """测试客户端连接""" client = self.get_client_by_id(client_id) if not client: return {'success': False, 'error': '客户端不存在'} try: qbt_client = self._create_qbt_client(client) # 尝试获取应用版本来测试连接 version = qbt_client.app.version return { 'success': True, 'version': version, 'web_api_version': qbt_client.app.web_api_version } except qbittorrentapi.LoginFailed: return {'success': False, 'error': '认证失败,请检查用户名和密码'} except qbittorrentapi.APIConnectionError: return {'success': False, 'error': '连接失败,请检查主机地址和端口'} except Exception as e: return {'success': False, 'error': f'连接错误: {str(e)}'} def _create_qbt_client(self, client: Dict[str, Any]) -> qbittorrentapi.Client: """创建 qBittorrent 客户端实例""" return qbittorrentapi.Client( host=client['host'], port=client['port'], username=client['username'], password=client['password'], REQUESTS_ARGS={'timeout': (5, 30)} # 连接超时5秒,读取超时30秒 ) def get_enabled_clients(self) -> List[Dict[str, Any]]: """获取所有启用的客户端""" clients = self._load_clients_with_password() return [client for client in clients if client.get('enabled', True)] def get_clients_with_connection(self) -> List[Dict[str, Any]]: """获取客户端列表并测试连接状态""" clients = self.get_clients() def test_connection(client): result = self.test_client_connection(client['id']) client['connected'] = result['success'] if result['success']: client['version'] = result.get('version') client['web_api_version'] = result.get('web_api_version') else: client['error'] = result.get('error') return client # 并发测试连接 with ThreadPoolExecutor(max_workers=10) as executor: future_to_client = {executor.submit(test_connection, client): client for client in clients} results = [] for future in as_completed(future_to_client): results.append(future.result()) return results