Files
Qbit-Manager/backend/services/torrent_service.py
2025-07-18 15:46:07 +08:00

365 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
种子服务层
负责聚合多个客户端的种子数据和执行种子操作
"""
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import qbittorrentapi
from .client_service import ClientService
class TorrentService:
"""种子服务"""
def __init__(self):
self.client_service = ClientService()
def get_all_torrents(self, client_ids: Optional[List[str]] = None) -> Dict[str, Any]:
"""获取所有客户端的种子列表"""
clients = self.client_service.get_enabled_clients()
# 如果指定了客户端ID则只获取这些客户端的数据
if client_ids:
clients = [client for client in clients if client['id'] in client_ids]
if not clients:
return {
'torrents': [],
'global_stats': {
'download_speed': 0,
'upload_speed': 0,
'total_torrents': 0,
'active_torrents': 0,
'downloading': 0,
'seeding': 0,
'paused': 0
},
'clients_status': []
}
def fetch_client_data(client):
"""获取单个客户端的数据"""
try:
qbt_client = self.client_service._create_qbt_client(client)
# 获取种子列表
torrents = qbt_client.torrents.info()
# 获取全局统计信息
transfer_info = qbt_client.transfer_info()
# 为每个种子添加客户端信息
client_torrents = []
for torrent in torrents:
# TorrentDictionary 本身就是字典,直接转换
torrent_dict = dict(torrent)
torrent_dict['client_id'] = client['id']
torrent_dict['client_name'] = client['name']
client_torrents.append(torrent_dict)
return {
'success': True,
'client_id': client['id'],
'client_name': client['name'],
'torrents': client_torrents,
'stats': {
'download_speed': transfer_info.dl_info_speed,
'upload_speed': transfer_info.up_info_speed,
'total_torrents': len(torrents),
'downloading': len([t for t in torrents if t.state in ['downloading', 'stalledDL', 'metaDL']]),
'seeding': len([t for t in torrents if t.state in ['uploading', 'stalledUP']]),
'paused': len([t for t in torrents if 'paused' in t.state.lower()])
}
}
except Exception as e:
return {
'success': False,
'client_id': client['id'],
'client_name': client['name'],
'error': str(e),
'torrents': [],
'stats': {
'download_speed': 0,
'upload_speed': 0,
'total_torrents': 0,
'downloading': 0,
'seeding': 0,
'paused': 0
}
}
# 并发获取所有客户端数据
all_torrents = []
global_stats = {
'download_speed': 0,
'upload_speed': 0,
'total_torrents': 0,
'active_torrents': 0,
'downloading': 0,
'seeding': 0,
'paused': 0
}
clients_status = []
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_client = {executor.submit(fetch_client_data, client): client
for client in clients}
for future in as_completed(future_to_client):
result = future.result()
# 收集种子数据
all_torrents.extend(result['torrents'])
# 聚合统计数据
stats = result['stats']
global_stats['download_speed'] += stats['download_speed']
global_stats['upload_speed'] += stats['upload_speed']
global_stats['total_torrents'] += stats['total_torrents']
global_stats['downloading'] += stats['downloading']
global_stats['seeding'] += stats['seeding']
global_stats['paused'] += stats['paused']
# 记录客户端状态
clients_status.append({
'client_id': result['client_id'],
'client_name': result['client_name'],
'connected': result['success'],
'error': result.get('error'),
'stats': stats
})
# 计算活跃种子数
global_stats['active_torrents'] = global_stats['downloading'] + global_stats['seeding']
return {
'torrents': all_torrents,
'global_stats': global_stats,
'clients_status': clients_status
}
def pause_torrents(self, torrent_hashes: List[str], client_id: Optional[str] = None) -> Dict[str, Any]:
"""暂停种子"""
return self._execute_torrent_action('pause', torrent_hashes, client_id)
def resume_torrents(self, torrent_hashes: List[str], client_id: Optional[str] = None) -> Dict[str, Any]:
"""恢复种子"""
return self._execute_torrent_action('resume', torrent_hashes, client_id)
def delete_torrents(self, torrent_hashes: List[str], delete_files: bool = False, client_id: Optional[str] = None) -> Dict[str, Any]:
"""删除种子"""
return self._execute_torrent_action('delete', torrent_hashes, client_id, delete_files=delete_files)
def _execute_torrent_action(self, action: str, torrent_hashes: List[str], client_id: Optional[str] = None, **kwargs) -> Dict[str, Any]:
"""执行种子操作"""
if client_id:
# 对指定客户端执行操作
client = self.client_service.get_client_by_id(client_id)
if not client:
return {'success': False, 'error': '客户端不存在'}
return self._execute_action_on_client(client, action, torrent_hashes, **kwargs)
else:
# 对所有客户端执行操作
clients = self.client_service.get_enabled_clients()
results = []
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_client = {
executor.submit(self._execute_action_on_client, client, action, torrent_hashes, **kwargs): client
for client in clients
}
for future in as_completed(future_to_client):
client = future_to_client[future]
try:
result = future.result()
result['client_id'] = client['id']
result['client_name'] = client['name']
results.append(result)
except Exception as e:
results.append({
'success': False,
'client_id': client['id'],
'client_name': client['name'],
'error': str(e)
})
# 统计成功和失败的结果
successful = [r for r in results if r['success']]
failed = [r for r in results if not r['success']]
return {
'success': len(successful) > 0,
'results': results,
'summary': {
'total_clients': len(results),
'successful_clients': len(successful),
'failed_clients': len(failed)
}
}
def _execute_action_on_client(self, client: Dict[str, Any], action: str, torrent_hashes: List[str], **kwargs) -> Dict[str, Any]:
"""在指定客户端上执行操作"""
try:
qbt_client = self.client_service._create_qbt_client(client)
if action == 'pause':
qbt_client.torrents.pause(torrent_hashes=torrent_hashes)
elif action == 'resume':
qbt_client.torrents.resume(torrent_hashes=torrent_hashes)
elif action == 'delete':
delete_files = kwargs.get('delete_files', False)
qbt_client.torrents.delete(torrent_hashes=torrent_hashes, delete_files=delete_files)
else:
return {'success': False, 'error': f'不支持的操作: {action}'}
return {'success': True, 'message': f'操作 {action} 执行成功'}
except Exception as e:
return {'success': False, 'error': str(e)}
def get_torrent_details(self, torrent_hash: str, client_id: str) -> Optional[Dict[str, Any]]:
"""获取种子详细信息"""
client = self.client_service.get_client_by_id(client_id)
if not client:
return None
try:
qbt_client = self.client_service._create_qbt_client(client)
# 获取种子基本信息
torrents = qbt_client.torrents.info(torrent_hashes=torrent_hash)
if not torrents:
return None
torrent = torrents[0]
torrent_dict = dict(torrent)
# 获取种子文件列表
try:
files = qbt_client.torrents.files(torrent_hash=torrent_hash)
torrent_dict['files'] = [dict(file) for file in files]
except:
torrent_dict['files'] = []
# 获取 Tracker 信息
try:
trackers = qbt_client.torrents.trackers(torrent_hash=torrent_hash)
torrent_dict['trackers'] = [dict(tracker) for tracker in trackers]
except:
torrent_dict['trackers'] = []
# 获取 Peers 信息
try:
peers = qbt_client.torrents.peers(torrent_hash=torrent_hash)
torrent_dict['peers'] = [dict(peer) for peer in peers.values()]
except:
torrent_dict['peers'] = []
# 添加客户端信息
torrent_dict['client_id'] = client['id']
torrent_dict['client_name'] = client['name']
return torrent_dict
except Exception as e:
return None
def add_torrent_file(self, client_id: str, torrent_file_path: str, options: dict) -> dict:
"""添加种子文件"""
client = self.client_service.get_client_by_id(client_id)
if not client:
return {'success': False, 'error': '客户端不存在'}
try:
qbt_client = self.client_service._create_qbt_client(client)
# 准备添加选项
add_options = {}
if options.get('category'):
add_options['category'] = options['category']
if options.get('tags'):
add_options['tags'] = options['tags']
if options.get('save_path'):
add_options['savepath'] = options['save_path']
if options.get('paused'):
add_options['paused'] = 'true'
# 添加种子文件
with open(torrent_file_path, 'rb') as torrent_file:
result = qbt_client.torrents.add(torrent_files=torrent_file, **add_options)
return {
'success': True,
'message': '种子文件添加成功',
'result': result
}
except Exception as e:
return {'success': False, 'error': str(e)}
def add_magnet_link(self, client_id: str, magnet_link: str, options: dict) -> dict:
"""添加磁力链接"""
client = self.client_service.get_client_by_id(client_id)
if not client:
return {'success': False, 'error': '客户端不存在'}
try:
qbt_client = self.client_service._create_qbt_client(client)
# 准备添加选项
add_options = {}
if options.get('category'):
add_options['category'] = options['category']
if options.get('tags'):
add_options['tags'] = options['tags']
if options.get('save_path'):
add_options['savepath'] = options['save_path']
if options.get('paused'):
add_options['paused'] = 'true'
# 添加磁力链接
result = qbt_client.torrents.add(urls=magnet_link, **add_options)
return {
'success': True,
'message': '磁力链接添加成功',
'result': result
}
except Exception as e:
return {'success': False, 'error': str(e)}
def add_torrent_url(self, client_id: str, torrent_url: str, options: dict) -> dict:
"""添加种子URL"""
client = self.client_service.get_client_by_id(client_id)
if not client:
return {'success': False, 'error': '客户端不存在'}
try:
qbt_client = self.client_service._create_qbt_client(client)
# 准备添加选项
add_options = {}
if options.get('category'):
add_options['category'] = options['category']
if options.get('tags'):
add_options['tags'] = options['tags']
if options.get('save_path'):
add_options['savepath'] = options['save_path']
if options.get('paused'):
add_options['paused'] = 'true'
# 添加种子URL
result = qbt_client.torrents.add(urls=torrent_url, **add_options)
return {
'success': True,
'message': '种子URL添加成功',
'result': result
}
except Exception as e:
return {'success': False, 'error': str(e)}