""" 种子服务层 负责聚合多个客户端的种子数据和执行种子操作 """ from typing import List, Dict, Any, Optional from concurrent.futures import ThreadPoolExecutor, as_completed import qbittorrentapi from .client_service import ClientService class TorrentService: """种子服务""" def __init__(self): self.client_service = ClientService() def get_all_torrents(self, client_ids: Optional[List[str]] = None) -> Dict[str, Any]: """获取所有客户端的种子列表""" clients = self.client_service.get_enabled_clients() # 如果指定了客户端ID,则只获取这些客户端的数据 if client_ids: clients = [client for client in clients if client['id'] in client_ids] if not clients: return { 'torrents': [], 'global_stats': { 'download_speed': 0, 'upload_speed': 0, 'total_torrents': 0, 'active_torrents': 0, 'downloading': 0, 'seeding': 0, 'paused': 0 }, 'clients_status': [] } def fetch_client_data(client): """获取单个客户端的数据""" try: qbt_client = self.client_service._create_qbt_client(client) # 获取种子列表 torrents = qbt_client.torrents.info() # 获取全局统计信息 transfer_info = qbt_client.transfer_info() # 为每个种子添加客户端信息 client_torrents = [] for torrent in torrents: # TorrentDictionary 本身就是字典,直接转换 torrent_dict = dict(torrent) torrent_dict['client_id'] = client['id'] torrent_dict['client_name'] = client['name'] client_torrents.append(torrent_dict) return { 'success': True, 'client_id': client['id'], 'client_name': client['name'], 'torrents': client_torrents, 'stats': { 'download_speed': transfer_info.dl_info_speed, 'upload_speed': transfer_info.up_info_speed, 'total_torrents': len(torrents), 'downloading': len([t for t in torrents if t.state in ['downloading', 'stalledDL', 'metaDL']]), 'seeding': len([t for t in torrents if t.state in ['uploading', 'stalledUP']]), 'paused': len([t for t in torrents if 'paused' in t.state.lower()]) } } except Exception as e: return { 'success': False, 'client_id': client['id'], 'client_name': client['name'], 'error': str(e), 'torrents': [], 'stats': { 'download_speed': 0, 'upload_speed': 0, 'total_torrents': 0, 'downloading': 0, 'seeding': 0, 'paused': 0 } } # 并发获取所有客户端数据 all_torrents = [] global_stats = { 'download_speed': 0, 'upload_speed': 0, 'total_torrents': 0, 'active_torrents': 0, 'downloading': 0, 'seeding': 0, 'paused': 0 } clients_status = [] with ThreadPoolExecutor(max_workers=10) as executor: future_to_client = {executor.submit(fetch_client_data, client): client for client in clients} for future in as_completed(future_to_client): result = future.result() # 收集种子数据 all_torrents.extend(result['torrents']) # 聚合统计数据 stats = result['stats'] global_stats['download_speed'] += stats['download_speed'] global_stats['upload_speed'] += stats['upload_speed'] global_stats['total_torrents'] += stats['total_torrents'] global_stats['downloading'] += stats['downloading'] global_stats['seeding'] += stats['seeding'] global_stats['paused'] += stats['paused'] # 记录客户端状态 clients_status.append({ 'client_id': result['client_id'], 'client_name': result['client_name'], 'connected': result['success'], 'error': result.get('error'), 'stats': stats }) # 计算活跃种子数 global_stats['active_torrents'] = global_stats['downloading'] + global_stats['seeding'] return { 'torrents': all_torrents, 'global_stats': global_stats, 'clients_status': clients_status } def pause_torrents(self, torrent_hashes: List[str], client_id: Optional[str] = None) -> Dict[str, Any]: """暂停种子""" return self._execute_torrent_action('pause', torrent_hashes, client_id) def resume_torrents(self, torrent_hashes: List[str], client_id: Optional[str] = None) -> Dict[str, Any]: """恢复种子""" return self._execute_torrent_action('resume', torrent_hashes, client_id) def delete_torrents(self, torrent_hashes: List[str], delete_files: bool = False, client_id: Optional[str] = None) -> Dict[str, Any]: """删除种子""" return self._execute_torrent_action('delete', torrent_hashes, client_id, delete_files=delete_files) def _execute_torrent_action(self, action: str, torrent_hashes: List[str], client_id: Optional[str] = None, **kwargs) -> Dict[str, Any]: """执行种子操作""" if client_id: # 对指定客户端执行操作 client = self.client_service.get_client_by_id(client_id) if not client: return {'success': False, 'error': '客户端不存在'} return self._execute_action_on_client(client, action, torrent_hashes, **kwargs) else: # 对所有客户端执行操作 clients = self.client_service.get_enabled_clients() results = [] with ThreadPoolExecutor(max_workers=10) as executor: future_to_client = { executor.submit(self._execute_action_on_client, client, action, torrent_hashes, **kwargs): client for client in clients } for future in as_completed(future_to_client): client = future_to_client[future] try: result = future.result() result['client_id'] = client['id'] result['client_name'] = client['name'] results.append(result) except Exception as e: results.append({ 'success': False, 'client_id': client['id'], 'client_name': client['name'], 'error': str(e) }) # 统计成功和失败的结果 successful = [r for r in results if r['success']] failed = [r for r in results if not r['success']] return { 'success': len(successful) > 0, 'results': results, 'summary': { 'total_clients': len(results), 'successful_clients': len(successful), 'failed_clients': len(failed) } } def _execute_action_on_client(self, client: Dict[str, Any], action: str, torrent_hashes: List[str], **kwargs) -> Dict[str, Any]: """在指定客户端上执行操作""" try: qbt_client = self.client_service._create_qbt_client(client) if action == 'pause': qbt_client.torrents.pause(torrent_hashes=torrent_hashes) elif action == 'resume': qbt_client.torrents.resume(torrent_hashes=torrent_hashes) elif action == 'delete': delete_files = kwargs.get('delete_files', False) qbt_client.torrents.delete(torrent_hashes=torrent_hashes, delete_files=delete_files) else: return {'success': False, 'error': f'不支持的操作: {action}'} return {'success': True, 'message': f'操作 {action} 执行成功'} except Exception as e: return {'success': False, 'error': str(e)} def get_torrent_details(self, torrent_hash: str, client_id: str) -> Optional[Dict[str, Any]]: """获取种子详细信息""" client = self.client_service.get_client_by_id(client_id) if not client: return None try: qbt_client = self.client_service._create_qbt_client(client) # 获取种子基本信息 torrents = qbt_client.torrents.info(torrent_hashes=torrent_hash) if not torrents: return None torrent = torrents[0] torrent_dict = dict(torrent) # 获取种子文件列表 try: files = qbt_client.torrents.files(torrent_hash=torrent_hash) torrent_dict['files'] = [dict(file) for file in files] except: torrent_dict['files'] = [] # 获取 Tracker 信息 try: trackers = qbt_client.torrents.trackers(torrent_hash=torrent_hash) torrent_dict['trackers'] = [dict(tracker) for tracker in trackers] except: torrent_dict['trackers'] = [] # 获取 Peers 信息 try: peers = qbt_client.torrents.peers(torrent_hash=torrent_hash) torrent_dict['peers'] = [dict(peer) for peer in peers.values()] except: torrent_dict['peers'] = [] # 添加客户端信息 torrent_dict['client_id'] = client['id'] torrent_dict['client_name'] = client['name'] return torrent_dict except Exception as e: return None def add_torrent_file(self, client_id: str, torrent_file_path: str, options: dict) -> dict: """添加种子文件""" client = self.client_service.get_client_by_id(client_id) if not client: return {'success': False, 'error': '客户端不存在'} try: qbt_client = self.client_service._create_qbt_client(client) # 准备添加选项 add_options = {} if options.get('category'): add_options['category'] = options['category'] if options.get('tags'): add_options['tags'] = options['tags'] if options.get('save_path'): add_options['savepath'] = options['save_path'] if options.get('paused'): add_options['paused'] = 'true' # 添加种子文件 with open(torrent_file_path, 'rb') as torrent_file: result = qbt_client.torrents.add(torrent_files=torrent_file, **add_options) return { 'success': True, 'message': '种子文件添加成功', 'result': result } except Exception as e: return {'success': False, 'error': str(e)} def add_magnet_link(self, client_id: str, magnet_link: str, options: dict) -> dict: """添加磁力链接""" client = self.client_service.get_client_by_id(client_id) if not client: return {'success': False, 'error': '客户端不存在'} try: qbt_client = self.client_service._create_qbt_client(client) # 准备添加选项 add_options = {} if options.get('category'): add_options['category'] = options['category'] if options.get('tags'): add_options['tags'] = options['tags'] if options.get('save_path'): add_options['savepath'] = options['save_path'] if options.get('paused'): add_options['paused'] = 'true' # 添加磁力链接 result = qbt_client.torrents.add(urls=magnet_link, **add_options) return { 'success': True, 'message': '磁力链接添加成功', 'result': result } except Exception as e: return {'success': False, 'error': str(e)} def add_torrent_url(self, client_id: str, torrent_url: str, options: dict) -> dict: """添加种子URL""" client = self.client_service.get_client_by_id(client_id) if not client: return {'success': False, 'error': '客户端不存在'} try: qbt_client = self.client_service._create_qbt_client(client) # 准备添加选项 add_options = {} if options.get('category'): add_options['category'] = options['category'] if options.get('tags'): add_options['tags'] = options['tags'] if options.get('save_path'): add_options['savepath'] = options['save_path'] if options.get('paused'): add_options['paused'] = 'true' # 添加种子URL result = qbt_client.torrents.add(urls=torrent_url, **add_options) return { 'success': True, 'message': '种子URL添加成功', 'result': result } except Exception as e: return {'success': False, 'error': str(e)}