# Python source file (365 lines, 14 KiB)
"""
|
||
种子服务层
|
||
负责聚合多个客户端的种子数据和执行种子操作
|
||
"""

import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List, Optional

import qbittorrentapi

from .client_service import ClientService


class TorrentService:
    """Aggregate torrent data across qBittorrent clients and run torrent actions.

    Client lookup and connection creation are delegated to ``ClientService``;
    this class orchestrates the calls and shapes the results into plain
    dictionaries. Methods report failures through the returned dictionary
    (``success`` / ``error``) or ``None`` instead of raising.
    """

    _logger = logging.getLogger(__name__)

    # Upper bound on worker threads used when talking to several clients at once.
    _MAX_WORKERS = 10

    # Torrent states counted as "downloading" / "seeding" in the statistics.
    _DOWNLOADING_STATES = frozenset({'downloading', 'stalledDL', 'metaDL'})
    _SEEDING_STATES = frozenset({'uploading', 'stalledUP'})
    # Case-insensitive substrings marking a halted torrent. qBittorrent 4.x
    # reports pausedDL/pausedUP; 5.x renamed those states to stoppedDL/stoppedUP.
    _PAUSED_MARKERS = ('paused', 'stopped')

    # Statistics keys summed across clients ('active_torrents' is derived).
    _SUMMED_STATS = (
        'download_speed',
        'upload_speed',
        'total_torrents',
        'downloading',
        'seeding',
        'paused',
    )

    _SUPPORTED_ACTIONS = ('pause', 'resume', 'delete')

    def __init__(self):
        self.client_service = ClientService()

    # ------------------------------------------------------------------
    # Listing
    # ------------------------------------------------------------------

    @staticmethod
    def _empty_stats(include_active: bool = False) -> Dict[str, int]:
        """Return a zeroed statistics dict; global stats also carry 'active_torrents'."""
        stats = {'download_speed': 0, 'upload_speed': 0, 'total_torrents': 0}
        if include_active:
            stats['active_torrents'] = 0
        stats.update({'downloading': 0, 'seeding': 0, 'paused': 0})
        return stats

    def get_all_torrents(self, client_ids: Optional[List[str]] = None) -> Dict[str, Any]:
        """Collect the torrent lists of all enabled clients.

        Args:
            client_ids: Optional list of client ids; when given (and non-empty)
                only enabled clients with one of these ids are queried.

        Returns:
            A dict with ``torrents`` (every torrent as a dict, tagged with
            ``client_id`` / ``client_name``), ``global_stats`` (sums over all
            clients plus ``active_torrents`` = downloading + seeding) and
            ``clients_status`` (one entry per queried client with its
            connection result and per-client stats).
        """
        clients = self.client_service.get_enabled_clients()

        if client_ids:
            wanted = set(client_ids)
            clients = [client for client in clients if client['id'] in wanted]

        global_stats = self._empty_stats(include_active=True)
        if not clients:
            return {'torrents': [], 'global_stats': global_stats, 'clients_status': []}

        all_torrents = []
        clients_status = []

        # Query the clients concurrently; each call is network bound.
        worker_count = min(self._MAX_WORKERS, len(clients))
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            futures = [executor.submit(self._fetch_client_data, client) for client in clients]

            for future in as_completed(futures):
                # _fetch_client_data converts failures into a result dict.
                result = future.result()
                stats = result['stats']

                all_torrents.extend(result['torrents'])
                for key in self._SUMMED_STATS:
                    global_stats[key] += stats[key]

                clients_status.append({
                    'client_id': result['client_id'],
                    'client_name': result['client_name'],
                    'connected': result['success'],
                    'error': result.get('error'),
                    'stats': stats,
                })

        global_stats['active_torrents'] = global_stats['downloading'] + global_stats['seeding']

        return {
            'torrents': all_torrents,
            'global_stats': global_stats,
            'clients_status': clients_status,
        }

    def _fetch_client_data(self, client: Dict[str, Any]) -> Dict[str, Any]:
        """Fetch torrents and transfer statistics of one client.

        Never raises: any error is logged and reported as ``success: False``
        with zeroed stats so that one unreachable client does not break the
        aggregation.
        """
        try:
            qbt_client = self.client_service._create_qbt_client(client)
            torrents = qbt_client.torrents.info()
            transfer_info = qbt_client.transfer_info()

            client_torrents = []
            for torrent in torrents:
                # Torrent entries are dict-like; copy them into plain dicts.
                torrent_dict = dict(torrent)
                torrent_dict['client_id'] = client['id']
                torrent_dict['client_name'] = client['name']
                client_torrents.append(torrent_dict)

            states = [str(entry.get('state') or '') for entry in client_torrents]

            return {
                'success': True,
                'client_id': client['id'],
                'client_name': client['name'],
                'torrents': client_torrents,
                'stats': {
                    'download_speed': transfer_info['dl_info_speed'],
                    'upload_speed': transfer_info['up_info_speed'],
                    'total_torrents': len(client_torrents),
                    'downloading': sum(state in self._DOWNLOADING_STATES for state in states),
                    'seeding': sum(state in self._SEEDING_STATES for state in states),
                    'paused': sum(
                        any(marker in state.lower() for marker in self._PAUSED_MARKERS)
                        for state in states
                    ),
                },
            }
        except Exception as exc:
            self._logger.warning("Failed to fetch torrents from client %s: %s", client['id'], exc)
            return {
                'success': False,
                'client_id': client['id'],
                'client_name': client['name'],
                'error': str(exc),
                'torrents': [],
                'stats': self._empty_stats(),
            }

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------

    def pause_torrents(self, torrent_hashes: List[str], client_id: Optional[str] = None) -> Dict[str, Any]:
        """Pause the given torrents on one client, or on all enabled clients."""
        return self._execute_torrent_action('pause', torrent_hashes, client_id)

    def resume_torrents(self, torrent_hashes: List[str], client_id: Optional[str] = None) -> Dict[str, Any]:
        """Resume the given torrents on one client, or on all enabled clients."""
        return self._execute_torrent_action('resume', torrent_hashes, client_id)

    def delete_torrents(self, torrent_hashes: List[str], delete_files: bool = False, client_id: Optional[str] = None) -> Dict[str, Any]:
        """Delete the given torrents; ``delete_files`` also removes downloaded data."""
        return self._execute_torrent_action('delete', torrent_hashes, client_id, delete_files=delete_files)

    def _execute_torrent_action(self, action: str, torrent_hashes: List[str], client_id: Optional[str] = None, **kwargs) -> Dict[str, Any]:
        """Run ``action`` on one client or fan it out to every enabled client.

        With ``client_id`` the single-client result dict is returned as is.
        Without it, the result contains ``results`` (one dict per client,
        tagged with ``client_id`` / ``client_name``), a ``summary`` with
        client counts, and ``success`` which is True when at least one
        client succeeded.
        """
        if client_id:
            client = self.client_service.get_client_by_id(client_id)
            if not client:
                return {'success': False, 'error': '客户端不存在'}
            return self._execute_action_on_client(client, action, torrent_hashes, **kwargs)

        clients = self.client_service.get_enabled_clients()
        results = []

        with ThreadPoolExecutor(max_workers=self._MAX_WORKERS) as executor:
            future_to_client = {
                executor.submit(self._execute_action_on_client, client, action, torrent_hashes, **kwargs): client
                for client in clients
            }

            for future in as_completed(future_to_client):
                client = future_to_client[future]
                try:
                    result = future.result()
                except Exception as exc:
                    result = {'success': False, 'error': str(exc)}
                result['client_id'] = client['id']
                result['client_name'] = client['name']
                results.append(result)

        successful = [entry for entry in results if entry['success']]

        return {
            'success': bool(successful),
            'results': results,
            'summary': {
                'total_clients': len(results),
                'successful_clients': len(successful),
                'failed_clients': len(results) - len(successful),
            },
        }

    def _execute_action_on_client(self, client: Dict[str, Any], action: str, torrent_hashes: List[str], **kwargs) -> Dict[str, Any]:
        """Run ``action`` ('pause', 'resume' or 'delete') on a single client.

        ``delete`` honours the ``delete_files`` keyword (default False).
        Returns ``{'success': True, 'message': ...}`` or
        ``{'success': False, 'error': ...}``; never raises.
        """
        # Reject unknown actions before opening a connection to the client.
        if action not in self._SUPPORTED_ACTIONS:
            return {'success': False, 'error': f'不支持的操作: {action}'}

        try:
            torrents_api = self.client_service._create_qbt_client(client).torrents

            if action == 'pause':
                torrents_api.pause(torrent_hashes=torrent_hashes)
            elif action == 'resume':
                torrents_api.resume(torrent_hashes=torrent_hashes)
            else:
                torrents_api.delete(
                    torrent_hashes=torrent_hashes,
                    delete_files=kwargs.get('delete_files', False),
                )
        except Exception as exc:
            return {'success': False, 'error': str(exc)}

        return {'success': True, 'message': f'操作 {action} 执行成功'}

    # ------------------------------------------------------------------
    # Details
    # ------------------------------------------------------------------

    def _best_effort_list(self, label: str, fetch: Callable[[], List[Any]]) -> List[Any]:
        """Return ``fetch()``; on any error log it and fall back to an empty list."""
        try:
            return fetch()
        except Exception as exc:
            self._logger.debug("Could not load torrent %s: %s", label, exc)
            return []

    def get_torrent_details(self, torrent_hash: str, client_id: str) -> Optional[Dict[str, Any]]:
        """Return detailed information about one torrent.

        The result is the torrent's info dict extended with ``files``,
        ``trackers`` and ``peers`` lists (each falls back to ``[]`` when it
        cannot be loaded) plus ``client_id`` / ``client_name``.

        Returns:
            The details dict, or ``None`` when the client is unknown, the
            torrent is not found, or the client cannot be queried.
        """
        client = self.client_service.get_client_by_id(client_id)
        if not client:
            return None

        try:
            qbt_client = self.client_service._create_qbt_client(client)

            torrents = qbt_client.torrents.info(torrent_hashes=torrent_hash)
            if not torrents:
                return None

            torrent_dict = dict(torrents[0])

            def load_peers() -> List[Dict[str, Any]]:
                # Peer data is served by the sync endpoint; the peer entries
                # are nested under the 'peers' key of the response.
                response = qbt_client.sync.torrent_peers(torrent_hash=torrent_hash)
                return [dict(peer) for peer in (response.get('peers') or {}).values()]

            torrent_dict['files'] = self._best_effort_list(
                'files',
                lambda: [dict(entry) for entry in qbt_client.torrents.files(torrent_hash=torrent_hash)],
            )
            torrent_dict['trackers'] = self._best_effort_list(
                'trackers',
                lambda: [dict(entry) for entry in qbt_client.torrents.trackers(torrent_hash=torrent_hash)],
            )
            torrent_dict['peers'] = self._best_effort_list('peers', load_peers)

            torrent_dict['client_id'] = client['id']
            torrent_dict['client_name'] = client['name']

            return torrent_dict

        except Exception as exc:
            self._logger.warning(
                "Failed to load details of torrent %s from client %s: %s",
                torrent_hash, client['id'], exc,
            )
            return None

    # ------------------------------------------------------------------
    # Adding torrents
    # ------------------------------------------------------------------

    @staticmethod
    def _build_add_options(options: Optional[dict]) -> Dict[str, Any]:
        """Translate caller options into keyword arguments for ``torrents.add``.

        Only truthy values are forwarded: ``category``, ``tags``,
        ``save_path`` (sent as ``savepath``) and ``paused`` (sent as the
        string ``'true'``).
        """
        options = options or {}
        add_options = {}
        for source_key, api_key in (('category', 'category'), ('tags', 'tags'), ('save_path', 'savepath')):
            value = options.get(source_key)
            if value:
                add_options[api_key] = value
        if options.get('paused'):
            # NOTE(review): this is the raw WebUI form field; qBittorrent 5.x
            # appears to have renamed it to 'stopped' -- confirm whether the
            # library's own keyword for this should be used instead.
            add_options['paused'] = 'true'
        return add_options

    def _add_to_client(self, client_id: str, options: Optional[dict], success_message: str,
                       submit: Callable[[Any, Dict[str, Any]], Any]) -> dict:
        """Shared flow of the ``add_*`` methods.

        Looks up the client, connects, calls ``submit(qbt_client, add_options)``
        and converts the outcome into a result dict. Never raises.
        """
        client = self.client_service.get_client_by_id(client_id)
        if not client:
            return {'success': False, 'error': '客户端不存在'}

        try:
            qbt_client = self.client_service._create_qbt_client(client)
            result = submit(qbt_client, self._build_add_options(options))
        except Exception as exc:
            return {'success': False, 'error': str(exc)}

        # The add call signals rejection with the literal string "Fails."
        # instead of raising, so it must be checked explicitly.
        if result == 'Fails.':
            return {'success': False, 'error': 'qBittorrent 拒绝了添加请求', 'result': result}

        return {'success': True, 'message': success_message, 'result': result}

    def add_torrent_file(self, client_id: str, torrent_file_path: str, options: dict) -> dict:
        """Add a torrent from a local ``.torrent`` file to the given client."""
        def submit(qbt_client, add_options):
            with open(torrent_file_path, 'rb') as torrent_file:
                return qbt_client.torrents.add(torrent_files=torrent_file, **add_options)

        return self._add_to_client(client_id, options, '种子文件添加成功', submit)

    def add_magnet_link(self, client_id: str, magnet_link: str, options: dict) -> dict:
        """Add a torrent from a magnet link to the given client."""
        return self._add_to_client(
            client_id, options, '磁力链接添加成功',
            lambda qbt_client, add_options: qbt_client.torrents.add(urls=magnet_link, **add_options),
        )

    def add_torrent_url(self, client_id: str, torrent_url: str, options: dict) -> dict:
        """Add a torrent from a URL pointing at a ``.torrent`` file to the given client."""
        return self._add_to_client(
            client_id, options, '种子URL添加成功',
            lambda qbt_client, add_options: qbt_client.torrents.add(urls=torrent_url, **add_options),
        )