PDB-Downloader/pdb_downloader.py
import re
import os
import aiohttp
import asyncio
from bs4 import BeautifulSoup
from tqdm import tqdm
import time
import logging
from urllib.parse import urljoin
import functools  # used with run_in_executor

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("pdb_downloader.log"),
    ]
)
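# Note: only a FileHandler is attached, so all log records go to pdb_downloader.log and the
# console stays free for the tqdm progress bar.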
class PDBDownloader:
    def __init__(self, html_file_path, output_dir, max_connections=50, retries=3, timeout=300, chunk_size=8192, max_files_per_dir=20000):
        """
        Initialize the PDB downloader.
        :param html_file_path: path to the local HTML index file
        :param output_dir: root directory where downloaded files are stored
        :param max_connections: maximum number of concurrent connections
        :param retries: number of retries after a single file download fails
        :param timeout: timeout for a single HTTP request, in seconds
        :param chunk_size: number of bytes read per chunk while downloading
        :param max_files_per_dir: maximum number of files per subdirectory (exFAT limit)
        """
        self.html_file_path = html_file_path
        self.output_dir = output_dir
        self.max_connections = max_connections
        self.retries = retries
        self.timeout = aiohttp.ClientTimeout(total=timeout, sock_read=timeout / 2)
        self.chunk_size = chunk_size
        self.max_files_per_dir = max_files_per_dir
        self.semaphore = asyncio.Semaphore(max_connections)
        # Stores paths relative to output_dir, e.g. "batch_00001/pdbxxxx.ent.gz"
        self.downloaded_files = set()
        self.failed_files = {}
        self.total_files_to_download = 0
        self.successfully_downloaded_count = 0
        self.download_queue = None
        self._lock = None
        os.makedirs(output_dir, exist_ok=True)
        self._load_existing_files()
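        # Note: sock_read is set to half of the total timeout so a stalled body read is aborted
        # well before the overall request deadline; the semaphore has the same size as the number
        # of consumer tasks started in run(), so it acts as a safeguard rather than an extra throttle.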
    def _get_target_dir_and_path(self, filename):
        """Determine the target directory and full path based on the filename and current file counts."""
        counter = 0
        while True:
            batch_dir_name = f"batch_{counter:05d}"
            batch_dir_path = os.path.join(self.output_dir, batch_dir_name)
            if not os.path.exists(batch_dir_path):
                os.makedirs(batch_dir_path, exist_ok=True)
                logging.debug(f"Created new batch directory on demand: {batch_dir_path}")
                target_dir = batch_dir_path
                break
            else:
                try:
                    existing_files_in_batch = [f for f in os.listdir(batch_dir_path) if os.path.isfile(os.path.join(batch_dir_path, f)) and not f.endswith('.tmp')]
                    if len(existing_files_in_batch) < self.max_files_per_dir:
                        target_dir = batch_dir_path
                        break
                except OSError as e:
                    logging.error(f"Error checking batch directory {batch_dir_path}: {e}")
            counter += 1
            if counter > 100000:
                logging.critical("Too many batch directories checked. Aborting.")
                raise OSError("Could not find or create a batch directory with space.")
        filepath = os.path.join(target_dir, filename)
        # Compute the path relative to output_dir, used for bookkeeping
        relative_path = os.path.relpath(filepath, self.output_dir)
        return target_dir, filepath, relative_path
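    # Note: in-flight .tmp files are excluded from the count above, so when many downloads
    # targeting the same batch directory finish at once it can end up a few files over
    # max_files_per_dir; the limit is a soft target, not a hard guarantee.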
    def _load_existing_files(self):
        """Recursively load the list of files that already exist in the output directory and its subdirectories."""
        self.downloaded_files.clear()
        if os.path.exists(self.output_dir):
            for root, dirs, files in os.walk(self.output_dir):
                for file in files:
                    if not file.endswith('.tmp'):
                        full_path = os.path.join(root, file)
                        relative_path = os.path.relpath(full_path, self.output_dir)
                        self.downloaded_files.add(relative_path)
        logging.info(f"Found {len(self.downloaded_files)} existing files in output directory (including subdirs)")
    async def parse_html_producer(self):
        """Parse the local HTML file and extract download links (producer) - async-optimised version."""
        logging.info(f"Starting to parse local HTML file: {self.html_file_path}")
        pattern = re.compile(r'pdb[a-z0-9]{3,4}\.ent\.gz')
        base_url = "https://files.pdbj.org/pub/pdb/data/structures/all/pdb/"
        if not os.path.isfile(self.html_file_path):
            logging.error(f"HTML file not found: {self.html_file_path}")
            for _ in range(self.max_connections):
                await self.download_queue.put((None, None))
            return
        loop = asyncio.get_running_loop()
        try:
            logging.info("Reading HTML file content asynchronously...")
            def _read_file():
                with open(self.html_file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    return f.read()
            html_content = await loop.run_in_executor(None, _read_file)
            logging.info("HTML file content read successfully.")
            logging.info("Parsing HTML content asynchronously with BeautifulSoup...")
            soup = await loop.run_in_executor(
                None,
                functools.partial(BeautifulSoup, html_content, 'lxml')
            )
            logging.info("HTML content parsed successfully.")
        except Exception as e:
            logging.error(f"Error reading or parsing local HTML file: {e}")
            for _ in range(self.max_connections):
                await self.download_queue.put((None, None))
            return
        logging.info("Extracting links from parsed HTML...")
        links_found = 0
        links_queued = 0
        # Build the set of existing basenames once, so the per-link skip check below is a
        # set lookup instead of a scan over every known file.
        existing_basenames = {os.path.basename(rel_path) for rel_path in self.downloaded_files}
        for link in soup.find_all('a', href=True):
            href = link['href']
            if pattern.search(href):
                filename = href.split('/')[-1]
                links_found += 1
                # Skip check: the filename already exists among the files loaded at startup
                if filename in existing_basenames:
                    logging.debug(f"Skipping {filename} as it exists in downloaded_files set.")
                    continue
                full_url = urljoin(base_url, href)
                await self.download_queue.put((full_url, filename))
                links_queued += 1
        self.total_files_to_download = links_queued
        logging.info(f"Producer finished. Parsed {links_found} matching links, {links_queued} need downloading (not skipped).")
        # Send a shutdown sentinel to every consumer
        for _ in range(self.max_connections):
            await self.download_queue.put((None, None))
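    # One (None, None) sentinel is queued per consumer above; each consumer returns when it sees
    # one, so queue.join() in run() can complete even when nothing needed downloading.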
    async def download_file_consumer(self, session, progress_bar):
        """Fetch links from the queue and download the files (consumer)."""
        while True:
            url_filename_tuple = await self.download_queue.get()
            url, filename = url_filename_tuple
            # --- Key point: wrap the handling of this whole queue item in try...finally ---
            # so that task_done() is called exactly once, no matter how the inner logic exits.
            try:
                if url is None:
                    # Handle the None sentinel - treat it as successfully processed
                    logging.debug("Consumer received shutdown signal (None).")
                    return  # consumer coroutine exits normally
                # Initialise state for this queue item
                success = False
                target_dir, filepath, relative_path = self._get_target_dir_and_path(filename)
                temp_filepath = filepath + '.tmp'
                # --- Precise existence check ---
                if relative_path in self.downloaded_files:
                    logging.debug(f"File {relative_path} already exists (precise check), skipping.")
                    async with self._lock:
                        self.successfully_downloaded_count += 1
                    success = True
                else:
                    # --- Retry loop ---
                    for attempt in range(self.retries):
                        # Recompute the target path before every attempt (defensive programming)
                        target_dir, filepath, relative_path = self._get_target_dir_and_path(filename)
                        temp_filepath = filepath + '.tmp'
                        try:
                            # Check again, in case the file was created by another process/coroutine
                            # while this one was waiting to retry
                            if relative_path in self.downloaded_files:
                                logging.debug(f"[Attempt {attempt+1}] File {relative_path} already exists, skipping.")
                                async with self._lock:
                                    self.successfully_downloaded_count += 1
                                success = True
                                break  # handled successfully, leave the retry loop
                            resume_header = {}
                            start_byte = 0
                            if os.path.exists(temp_filepath):
                                try:
                                    start_byte = os.path.getsize(temp_filepath)
                                    resume_header = {'Range': f'bytes={start_byte}-'}
                                    logging.debug(f"[Attempt {attempt+1}] Resuming {filename} from byte {start_byte}")
                                except OSError as e:
                                    logging.warning(f"[Attempt {attempt+1}] Could not get size of temp file {temp_filepath}: {e}. Starting fresh.")
                                    start_byte = 0
                                    resume_header = {}
                            logging.debug(f"[Attempt {attempt+1}] Acquiring semaphore for {filename}...")
                            async with self.semaphore:
                                logging.debug(f"[Attempt {attempt+1}] Semaphore acquired for {filename}. Making HTTP request...")
                                try:
                                    async with session.get(url, headers=resume_header, timeout=self.timeout) as response:
                                        logging.debug(f"[Attempt {attempt+1}] Received response headers for {filename} (Status: {response.status}). Starting to read body...")
                                        if response.status == 416:
                                            if os.path.exists(temp_filepath):
                                                logging.info(f"[Attempt {attempt+1}] HTTP 416 for {filename}, temp file exists, assuming complete.")
                                                os.rename(temp_filepath, filepath)
                                                async with self._lock:
                                                    self.downloaded_files.add(relative_path)
                                                    self.successfully_downloaded_count += 1
                                                success = True
                                                break  # handled successfully, leave the retry loop
                                            else:
                                                raise aiohttp.ClientResponseError(
                                                    request_info=response.request_info,
                                                    history=response.history,
                                                    status=response.status,
                                                    message=f"HTTP 416 Range Not Satisfiable and no temp file for {filename}"
                                                )
                                        elif response.status not in (200, 206):
                                            raise aiohttp.ClientResponseError(
                                                request_info=response.request_info,
                                                history=response.history,
                                                status=response.status,
                                                message=f"HTTP error {response.status} for {filename}"
                                            )
                                        # Append only when the server honoured the Range request (206); a plain
                                        # 200 means the full body is being resent, so start over from scratch.
                                        mode = 'ab' if start_byte > 0 and response.status == 206 else 'wb'
                                        logging.debug(f"[Attempt {attempt+1}] Opening file {temp_filepath} in mode '{mode}' for {filename}")
                                        with open(temp_filepath, mode) as f:
                                            async for chunk in response.content.iter_chunked(self.chunk_size):
                                                if chunk:
                                                    f.write(chunk)
                                                    f.flush()
                                                    os.fsync(f.fileno())
                                                    progress_bar.update(len(chunk))
                                        logging.debug(f"[Attempt {attempt+1}] Finished reading response body for {filename}.")
                                        final_temp_size = os.path.getsize(temp_filepath)
                                        total_size_str = response.headers.get('content-length', 'unknown')
                                        try:
                                            total_size = int(total_size_str) if total_size_str != 'unknown' else None
                                        except ValueError:
                                            total_size = None
                                        # For a 206 (partial content) response, Content-Length covers only the
                                        # returned range, so add the resume offset to get the expected final size.
                                        if total_size is not None and response.status == 206:
                                            total_size += start_byte
                                        final_temp_size_kb = final_temp_size / 1024
                                        if total_size and final_temp_size >= total_size:
                                            os.rename(temp_filepath, filepath)
                                            async with self._lock:
                                                self.downloaded_files.add(relative_path)
                                                self.successfully_downloaded_count += 1
                                            success = True
                                            logging.info(f"[Attempt {attempt+1}] Successfully downloaded/resumed {filename} ({final_temp_size_kb:.2f} KB) to {relative_path}")
                                            break  # handled successfully, leave the retry loop
                                        elif not total_size:
                                            os.rename(temp_filepath, filepath)
                                            async with self._lock:
                                                self.downloaded_files.add(relative_path)
                                                self.successfully_downloaded_count += 1
                                            success = True
                                            logging.info(f"[Attempt {attempt+1}] Downloaded {filename} (size unknown, assumed complete) to {relative_path}")
                                            break  # handled successfully, leave the retry loop
                                        else:
                                            expected_kb = total_size / 1024 if total_size else 0
                                            got_kb = final_temp_size / 1024
                                            error_msg = f"Incomplete download for {filename}. Expected >= {expected_kb:.2f} KB, got {got_kb:.2f} KB"
                                            logging.warning(f"[Attempt {attempt+1}] {error_msg}")
                                except asyncio.TimeoutError:
                                    logging.warning(f"[Attempt {attempt+1}] TIMEOUT occurred for {filename} during HTTP request or response reading.")
                                except aiohttp.ClientError as e:
                                    logging.warning(f"[Attempt {attempt+1}] Client error for {filename}: {e}")
                                except Exception as e:
                                    logging.error(f"[Attempt {attempt+1}] Unexpected error for {filename}: {e}", exc_info=True)
                        # --- finally block for a single attempt ---
                        finally:
                            pass  # per-attempt cleanup (if needed) goes here
                        # --- Retry logic ---
                        if not success and attempt < self.retries - 1:
                            wait_time = 2 ** attempt
                            logging.debug(f"[Attempt {attempt+1}] Waiting {wait_time}s before retrying {filename}...")
                            await asyncio.sleep(wait_time)
                    # --- After all retries for this file have finished ---
                    if not success:
                        error_msg = f"Failed after {self.retries} attempts"
                        self.failed_files[relative_path] = error_msg
                        logging.error(f"Final failure for {filename} (intended path: {relative_path}): {error_msg}")
                        if os.path.exists(temp_filepath):
                            try:
                                os.remove(temp_filepath)
                                logging.debug(f"Cleaned up temporary file for failed download: {temp_filepath}")
                            except OSError as e:
                                logging.warning(f"Could not remove temporary file {temp_filepath}: {e}")
                    # else: the success case was already handled above
            # --- finally block for the whole queue item ---
            # *** Key point: task_done is called here exactly once, no matter how the code above exited ***
            finally:
                # *** Make sure task_done() is called exactly once for the current get() ***
                try:
                    self.download_queue.task_done()
                    logging.debug(f"Task done for item related to {filename if 'filename' in locals() else 'unknown/shutdown'}.")
                except ValueError as e:
                    # Defensive: catch the error raised by a duplicate call
                    logging.critical(f"CRITICAL: task_done() error for {filename if 'filename' in locals() else 'unknown/shutdown'}: {e}")
                except Exception as e:
                    # Catch any other possible exception
                    logging.error(f"Unexpected error in task_done() for {filename if 'filename' in locals() else 'unknown/shutdown'}: {e}")
    async def run(self):
        logging.info("Starting PDB Downloader...")
        logging.info(f"Output directory: {self.output_dir}")
        logging.info(f"Max files per directory: {self.max_files_per_dir}")
        start_time = time.time()
        self.download_queue = asyncio.Queue(maxsize=self.max_connections * 2)
        self._lock = asyncio.Lock()
        progress_bar = tqdm(
            total=0,
            unit='B',
            unit_scale=True,
            desc="Downloading PDB files"
        )
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            producer_task = asyncio.create_task(self.parse_html_producer())
            consumer_tasks = [
                asyncio.create_task(self.download_file_consumer(session, progress_bar))
                for _ in range(self.max_connections)
            ]
            await producer_task
            logging.info("Producer finished.")
            logging.info("Waiting for all download tasks in queue to complete...")
            await self.download_queue.join()
            logging.info("All download tasks completed (queue.join finished).")
            logging.info("Cancelling consumer tasks...")
            for task in consumer_tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*consumer_tasks, return_exceptions=True)
            logging.info("Consumer tasks cancelled and cleaned up.")
        progress_bar.close()
        elapsed_time = time.time() - start_time
        failed_count = len(self.failed_files)
        success_count_report = self.successfully_downloaded_count
        logging.info("="*50)
        logging.info("Download Session Summary:")
        logging.info(f" - Files identified for download (not skipped): {self.total_files_to_download}")
        logging.info(f" - Successfully downloaded this session: {success_count_report}")
        logging.info(f" - Failed downloads: {failed_count}")
        logging.info(f" - Total time taken: {elapsed_time:.2f} seconds")
        logging.info("="*50)
        if self.failed_files:
            logging.error("List of failed downloads (relative paths):")
            for relative_path, error in self.failed_files.items():
                logging.error(f" - {relative_path}: {error}")
        else:
            logging.info("No files failed during this session.")
        return success_count_report, failed_count
if __name__ == "__main__":
HTML_FILE_PATH = "/pdb_index.html"
OUTPUT_DIR = "./pdb_files"
MAX_CONNECTIONS = 50
RETRIES = 3
TIMEOUT = 300
MAX_FILES_PER_DIR = 20000 # EXFAT 限制
downloader = PDBDownloader(
html_file_path=HTML_FILE_PATH,
output_dir=OUTPUT_DIR,
max_connections=MAX_CONNECTIONS,
retries=RETRIES,
timeout=TIMEOUT,
max_files_per_dir=MAX_FILES_PER_DIR
)
try:
success_count, failed_count = asyncio.run(downloader.run())
print(f"\nDownload finished. Summary: {success_count} succeeded, {failed_count} failed.")
except KeyboardInterrupt:
logging.info("\nDownload process interrupted by user.")
except Exception as e:
logging.critical(f"An unexpected error occurred: {e}", exc_info=True)