179 lines
7.0 KiB
Python
179 lines
7.0 KiB
Python
"""
|
||
qBittorrent 客户端服务层
|
||
负责管理客户端连接信息和与 qBittorrent API 的交互
|
||
"""
|
||
import json
|
||
import os
|
||
from typing import List, Dict, Optional, Any
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
import qbittorrentapi
|
||
from config import Config
|
||
|
||
|
||
class ClientService:
    """Service for managing qBittorrent client configurations.

    Client entries are persisted as a JSON list in the file named by
    ``Config.CLIENTS_CONFIG_PATH``. Each entry is a dict with the keys
    ``id``, ``name``, ``host``, ``port``, ``username``, ``password`` and
    ``enabled``. Methods that return data intended for the front end strip
    the ``password`` key; ``get_client_by_id`` and ``get_enabled_clients``
    return the full stored entries.
    """

    # Keys of a stored entry that update_client() is allowed to overwrite.
    _UPDATABLE_FIELDS = ('name', 'host', 'port', 'username', 'enabled', 'password')

    # Upper bound on concurrent connection tests.
    _MAX_WORKERS = 10

    def __init__(self):
        """Read the config path from ``Config`` and make sure the file exists."""
        self.config_path = Config.CLIENTS_CONFIG_PATH
        self._ensure_config_dir()

    @staticmethod
    def _without_password(client: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *client* without its ``password`` key."""
        return {key: value for key, value in client.items() if key != 'password'}

    def _ensure_config_dir(self):
        """Create the config directory and an empty config file if missing."""
        directory = os.path.dirname(self.config_path)
        # dirname() is '' for a bare file name, and os.makedirs('') raises.
        if directory:
            os.makedirs(directory, exist_ok=True)
        if not os.path.exists(self.config_path):
            self._save_clients([])

    def get_clients(self) -> List[Dict[str, Any]]:
        """Return every client entry with the password removed.

        Returns:
            A list of client dicts without the ``password`` key; empty when
            the config file is missing or unreadable.
        """
        return [self._without_password(client)
                for client in self._load_clients_with_password()]

    def get_client_by_id(self, client_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored entry (password included) whose id matches.

        Args:
            client_id: Value to compare against each entry's ``id``.

        Returns:
            The first matching entry, or ``None`` when there is none.
        """
        clients = self._load_clients_with_password()
        return next(
            (client for client in clients if client.get('id') == client_id),
            None,
        )

    def _load_clients_with_password(self) -> List[Dict[str, Any]]:
        """Load the stored entries including passwords (internal use).

        Returns:
            The parsed list, or an empty list when the file is missing, is
            not valid JSON, or does not contain a JSON list.
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return []
        # A valid JSON document that is not a list is treated like a corrupt
        # file, so callers can always iterate the result.
        return data if isinstance(data, list) else []

    def add_client(self, client_data: Dict[str, Any]) -> Dict[str, Any]:
        """Append a new client entry and persist it.

        Args:
            client_data: Must contain ``name``, ``host`` and ``port``.
                ``username`` and ``password`` default to ``''`` and
                ``enabled`` defaults to ``True``.

        Returns:
            The new entry, including its generated ``id``, without the
            password.

        Raises:
            KeyError: If ``name``, ``host`` or ``port`` is missing.
        """
        import uuid

        # Build the entry first so a missing required key fails before any I/O.
        client = {
            'id': str(uuid.uuid4()),
            'name': client_data['name'],
            'host': client_data['host'],
            'port': client_data['port'],
            'username': client_data.get('username', ''),
            'password': client_data.get('password', ''),
            'enabled': client_data.get('enabled', True),
        }

        clients = self._load_clients_with_password()
        clients.append(client)
        self._save_clients(clients)

        return self._without_password(client)

    def update_client(self, client_id: str, client_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Update the fields of an existing client entry.

        Only keys present in *client_data* are changed; in particular the
        stored password is kept unless a new ``password`` is supplied.

        Args:
            client_id: Id of the entry to update.
            client_data: New values for any of ``name``, ``host``, ``port``,
                ``username``, ``enabled`` and ``password``.

        Returns:
            The updated entry without the password, or ``None`` when no
            entry has the given id.
        """
        clients = self._load_clients_with_password()

        for client in clients:
            if client.get('id') != client_id:
                continue

            for field in self._UPDATABLE_FIELDS:
                if field in client_data:
                    client[field] = client_data[field]

            self._save_clients(clients)
            return self._without_password(client)

        return None

    def delete_client(self, client_id: str) -> bool:
        """Remove the entry with the given id.

        Returns:
            ``True`` when an entry was removed and the file rewritten,
            ``False`` when no entry matched.
        """
        clients = self._load_clients_with_password()
        remaining = [client for client in clients if client.get('id') != client_id]

        if len(remaining) == len(clients):
            return False

        self._save_clients(remaining)
        return True

    def _save_clients(self, clients: List[Dict[str, Any]]):
        """Write the client entries to the config file.

        The data is written to a sibling temporary file which then replaces
        the config file, so an interrupted write does not leave a truncated
        config behind.
        """
        tmp_path = f'{self.config_path}.tmp'
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(clients, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, self.config_path)
        except Exception:
            # Best-effort cleanup of the partial temp file; the original
            # error is the one worth reporting.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    def test_client_connection(self, client_id: str) -> Dict[str, Any]:
        """Test the connection to the client with the given id.

        Returns:
            ``{'success': True, 'version': ..., 'web_api_version': ...}`` on
            success, otherwise ``{'success': False, 'error': <message>}``.
        """
        client = self.get_client_by_id(client_id)
        if not client:
            return {'success': False, 'error': '客户端不存在'}
        return self._probe_connection(client)

    def _probe_connection(self, client: Dict[str, Any]) -> Dict[str, Any]:
        """Try to reach the qBittorrent instance described by *client*.

        Returns the same result dict shape as ``test_client_connection``;
        every exception is converted into a failure result.
        """
        try:
            qbt_client = self._create_qbt_client(client)
            # Fetch the application version to verify the connection works.
            version = qbt_client.app.version
            return {
                'success': True,
                'version': version,
                'web_api_version': qbt_client.app.web_api_version,
            }
        except qbittorrentapi.LoginFailed:
            return {'success': False, 'error': '认证失败,请检查用户名和密码'}
        except qbittorrentapi.APIConnectionError:
            return {'success': False, 'error': '连接失败,请检查主机地址和端口'}
        except Exception as e:
            return {'success': False, 'error': f'连接错误: {str(e)}'}

    def _create_qbt_client(self, client: Dict[str, Any]) -> "qbittorrentapi.Client":
        """Build a ``qbittorrentapi.Client`` from a stored client entry.

        Args:
            client: Stored entry; ``host`` and ``port`` are required,
                ``username`` and ``password`` default to ``''``.
        """
        return qbittorrentapi.Client(
            host=client['host'],
            port=client['port'],
            username=client.get('username', ''),
            password=client.get('password', ''),
            REQUESTS_ARGS={'timeout': (5, 30)},  # 5 s connect timeout, 30 s read timeout
        )

    def get_enabled_clients(self) -> List[Dict[str, Any]]:
        """Return the stored entries (passwords included) that are enabled.

        An entry without an ``enabled`` key counts as enabled.
        """
        clients = self._load_clients_with_password()
        return [client for client in clients if client.get('enabled', True)]

    def get_clients_with_connection(self) -> List[Dict[str, Any]]:
        """Return all clients annotated with their connection status.

        Each returned dict is the password-free client entry plus
        ``connected`` and either ``version``/``web_api_version`` (on
        success) or ``error`` (on failure). Connections are tested
        concurrently, and the result keeps the order of the config file.
        """
        # Load once, with credentials, instead of re-reading the file for
        # every client being tested.
        clients = self._load_clients_with_password()
        if not clients:
            return []

        def check(client: Dict[str, Any]) -> Dict[str, Any]:
            result = self._probe_connection(client)
            status = self._without_password(client)
            status['connected'] = result['success']
            if result['success']:
                status['version'] = result.get('version')
                status['web_api_version'] = result.get('web_api_version')
            else:
                status['error'] = result.get('error')
            return status

        workers = min(self._MAX_WORKERS, len(clients))
        with ThreadPoolExecutor(max_workers=workers) as executor:
            # map() yields results in submission order, unlike as_completed().
            return list(executor.map(check, clients))
|