Add pdb_downloader.py

2025-08-07 20:28:54 +08:00
commit 7160a5e4b5

pdb_downloader.py (new file, 411 lines)

@@ -0,0 +1,411 @@
import re
import os
import aiohttp
import asyncio
from bs4 import BeautifulSoup
from tqdm import tqdm
import time
import logging
from urllib.parse import urljoin
import functools  # for run_in_executor

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("pdb_downloader.log"),
    ]
)
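
# Note: only a FileHandler is configured, so log records go to
# pdb_downloader.log rather than the terminal, which also keeps the tqdm
# progress bar output from being interleaved with log lines.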


class PDBDownloader:
    def __init__(self, html_file_path, output_dir, max_connections=50, retries=3, timeout=300, chunk_size=8192, max_files_per_dir=20000):
        """
        Initialize the PDB downloader.

        :param html_file_path: path to the local HTML index file
        :param output_dir: root directory where downloaded files are saved
        :param max_connections: maximum number of concurrent connections
        :param retries: number of retries after a single file download fails
        :param timeout: timeout for a single HTTP request, in seconds
        :param chunk_size: number of bytes read per chunk while downloading
        :param max_files_per_dir: maximum number of files per subdirectory (exFAT limit)
        """
        self.html_file_path = html_file_path
        self.output_dir = output_dir
        self.max_connections = max_connections
        self.retries = retries
        self.timeout = aiohttp.ClientTimeout(total=timeout, sock_read=timeout / 2)
        self.chunk_size = chunk_size
        self.max_files_per_dir = max_files_per_dir
        self.semaphore = asyncio.Semaphore(max_connections)
        # Stores paths relative to output_dir, e.g. "batch_00001/pdbxxxx.ent.gz"
        self.downloaded_files = set()
        self.failed_files = {}
        self.total_files_to_download = 0
        self.successfully_downloaded_count = 0
        # Created in run(), once an event loop is running
        self.download_queue = None
        self._lock = None
        os.makedirs(output_dir, exist_ok=True)
        self._load_existing_files()

    def _get_target_dir_and_path(self, filename):
        """Determine the target directory and full path based on the filename and current file counts."""
        counter = 0
        while True:
            batch_dir_name = f"batch_{counter:05d}"
            batch_dir_path = os.path.join(self.output_dir, batch_dir_name)
            if not os.path.exists(batch_dir_path):
                os.makedirs(batch_dir_path, exist_ok=True)
                logging.debug(f"Created new batch directory on demand: {batch_dir_path}")
                target_dir = batch_dir_path
                break
            else:
                try:
                    existing_files_in_batch = [
                        f for f in os.listdir(batch_dir_path)
                        if os.path.isfile(os.path.join(batch_dir_path, f)) and not f.endswith('.tmp')
                    ]
                    if len(existing_files_in_batch) < self.max_files_per_dir:
                        target_dir = batch_dir_path
                        break
                except OSError as e:
                    logging.error(f"Error checking batch directory {batch_dir_path}: {e}")
            counter += 1
            if counter > 100000:
                logging.critical("Too many batch directories checked. Aborting.")
                raise OSError("Could not find or create a batch directory with space.")
        filepath = os.path.join(target_dir, filename)
        # Compute the path relative to output_dir for bookkeeping
        relative_path = os.path.relpath(filepath, self.output_dir)
        return target_dir, filepath, relative_path
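
    # Design note: each call scans batch directories from batch_00000 upward,
    # doing one os.listdir() per directory checked, so the cost grows with the
    # number of existing batches. A cached per-directory file counter would
    # avoid the repeated listings if this ever became a bottleneck.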

    def _load_existing_files(self):
        """Recursively load the set of files already present in the output directory and its subdirectories."""
        self.downloaded_files.clear()
        if os.path.exists(self.output_dir):
            for root, dirs, files in os.walk(self.output_dir):
                for file in files:
                    if not file.endswith('.tmp'):
                        full_path = os.path.join(root, file)
                        relative_path = os.path.relpath(full_path, self.output_dir)
                        self.downloaded_files.add(relative_path)
        logging.info(f"Found {len(self.downloaded_files)} existing files in output directory (including subdirs)")

    async def parse_html_producer(self):
        """Parse the local HTML file and extract download links (producer) - async-optimized version."""
        logging.info(f"Starting to parse local HTML file: {self.html_file_path}")
        pattern = re.compile(r'pdb[a-z0-9]{3,4}\.ent\.gz')
        base_url = "https://files.pdbj.org/pub/pdb/data/structures/all/pdb/"
        if not os.path.isfile(self.html_file_path):
            logging.error(f"HTML file not found: {self.html_file_path}")
            for _ in range(self.max_connections):
                await self.download_queue.put((None, None))
            return
        loop = asyncio.get_event_loop()
        try:
            logging.info("Reading HTML file content asynchronously...")

            def _read_file():
                with open(self.html_file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    return f.read()

            html_content = await loop.run_in_executor(None, _read_file)
            logging.info("HTML file content read successfully.")
            logging.info("Parsing HTML content asynchronously with BeautifulSoup...")
            soup = await loop.run_in_executor(
                None,
                functools.partial(BeautifulSoup, html_content, 'lxml')
            )
            logging.info("HTML content parsed successfully.")
        except Exception as e:
            logging.error(f"Error reading or parsing local HTML file: {e}")
            for _ in range(self.max_connections):
                await self.download_queue.put((None, None))
            return
        logging.info("Extracting links from parsed HTML...")
        links_found = 0
        links_queued = 0
        for link in soup.find_all('a', href=True):
            href = link['href']
            if pattern.search(href):
                filename = href.split('/')[-1]
                links_found += 1
                # Skip check: match by bare filename against the loaded relative paths
                if any(os.path.basename(rel_path) == filename for rel_path in self.downloaded_files):
                    logging.debug(f"Skipping {filename} as it exists in downloaded_files set.")
                    continue
                full_url = urljoin(base_url, href)
                await self.download_queue.put((full_url, filename))
                links_queued += 1
        self.total_files_to_download = links_queued
        logging.info(f"Producer finished. Parsed {links_found} matching links, {links_queued} need downloading (not skipped).")
        # Send a shutdown signal to every consumer
        for _ in range(self.max_connections):
            await self.download_queue.put((None, None))
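
    # The (None, None) tuples are shutdown sentinels: exactly one is queued
    # per consumer, so every consumer coroutine eventually dequeues one and
    # returns on its own.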

    async def download_file_consumer(self, session, progress_bar):
        """Pull links from the queue and download the files (consumer)."""
        while True:
            url_filename_tuple = await self.download_queue.get()
            url, filename = url_filename_tuple
            # --- Key point: wrap the handling of each queue item in
            # try...finally so that task_done() is called exactly once no
            # matter how the inner logic exits. ---
            try:
                if url is None:
                    # Shutdown sentinel - treat it as successfully processed
                    logging.debug("Consumer received shutdown signal (None).")
                    return  # consumer coroutine exits normally
                # Initialize per-item state
                success = False
                target_dir, filepath, relative_path = self._get_target_dir_and_path(filename)
                temp_filepath = filepath + '.tmp'
                # --- Precise existence check ---
                if relative_path in self.downloaded_files:
                    logging.debug(f"File {relative_path} already exists (precise check), skipping.")
                    async with self._lock:
                        self.successfully_downloaded_count += 1
                    success = True
                else:
                    # --- Retry loop ---
                    for attempt in range(self.retries):
                        # Recompute the target path before each attempt (defensive programming)
                        target_dir, filepath, relative_path = self._get_target_dir_and_path(filename)
                        temp_filepath = filepath + '.tmp'
                        try:
                            # Check again in case the file was created by another
                            # process/coroutine while we waited to retry
                            if relative_path in self.downloaded_files:
                                logging.debug(f"[Attempt {attempt+1}] File {relative_path} already exists, skipping.")
                                async with self._lock:
                                    self.successfully_downloaded_count += 1
                                success = True
                                break  # handled successfully, leave the retry loop
                            resume_header = {}
                            start_byte = 0
                            if os.path.exists(temp_filepath):
                                try:
                                    start_byte = os.path.getsize(temp_filepath)
                                    resume_header = {'Range': f'bytes={start_byte}-'}
                                    logging.debug(f"[Attempt {attempt+1}] Resuming {filename} from byte {start_byte}")
                                except OSError as e:
                                    logging.warning(f"[Attempt {attempt+1}] Could not get size of temp file {temp_filepath}: {e}. Starting fresh.")
                                    start_byte = 0
                                    resume_header = {}
                            logging.debug(f"[Attempt {attempt+1}] Acquiring semaphore for {filename}...")
                            async with self.semaphore:
                                logging.debug(f"[Attempt {attempt+1}] Semaphore acquired for {filename}. Making HTTP request...")
                                try:
                                    async with session.get(url, headers=resume_header, timeout=self.timeout) as response:
                                        logging.debug(f"[Attempt {attempt+1}] Received response headers for {filename} (Status: {response.status}). Starting to read body...")
                                        if response.status == 416:
                                            if os.path.exists(temp_filepath):
                                                logging.info(f"[Attempt {attempt+1}] HTTP 416 for {filename}, temp file exists, assuming complete.")
                                                os.rename(temp_filepath, filepath)
                                                async with self._lock:
                                                    self.downloaded_files.add(relative_path)
                                                    self.successfully_downloaded_count += 1
                                                success = True
                                                break  # handled successfully, leave the retry loop
                                            else:
                                                raise aiohttp.ClientResponseError(
                                                    request_info=response.request_info,
                                                    history=response.history,
                                                    status=response.status,
                                                    message=f"HTTP 416 Range Not Satisfiable and no temp file for {filename}"
                                                )
                                        elif response.status not in (200, 206):
                                            raise aiohttp.ClientResponseError(
                                                request_info=response.request_info,
                                                history=response.history,
                                                status=response.status,
                                                message=f"HTTP error {response.status} for {filename}"
                                            )
                                        mode = 'ab' if start_byte > 0 else 'wb'
                                        logging.debug(f"[Attempt {attempt+1}] Opening file {temp_filepath} in mode '{mode}' for {filename}")
                                        with open(temp_filepath, mode) as f:
                                            async for chunk in response.content.iter_chunked(self.chunk_size):
                                                if chunk:
                                                    f.write(chunk)
                                                    f.flush()
                                                    os.fsync(f.fileno())
                                                    progress_bar.update(len(chunk))
                                        logging.debug(f"[Attempt {attempt+1}] Finished reading response body for {filename}.")
                                        final_temp_size = os.path.getsize(temp_filepath)
                                        total_size_str = response.headers.get('content-length', 'unknown')
                                        try:
                                            total_size = int(total_size_str) if total_size_str != 'unknown' else None
                                        except ValueError:
                                            total_size = None
                                        final_temp_size_kb = final_temp_size / 1024
                                        if total_size and final_temp_size >= total_size:
                                            os.rename(temp_filepath, filepath)
                                            async with self._lock:
                                                self.downloaded_files.add(relative_path)
                                                self.successfully_downloaded_count += 1
                                            success = True
                                            logging.info(f"[Attempt {attempt+1}] Successfully downloaded/resumed {filename} ({final_temp_size_kb:.2f} KB) to {relative_path}")
                                            break  # handled successfully, leave the retry loop
                                        elif not total_size:
                                            os.rename(temp_filepath, filepath)
                                            async with self._lock:
                                                self.downloaded_files.add(relative_path)
                                                self.successfully_downloaded_count += 1
                                            success = True
                                            logging.info(f"[Attempt {attempt+1}] Downloaded {filename} (size unknown, assumed complete) to {relative_path}")
                                            break  # handled successfully, leave the retry loop
                                        else:
                                            expected_kb = total_size / 1024 if total_size else 0
                                            got_kb = final_temp_size / 1024
                                            error_msg = f"Incomplete download for {filename}. Expected >= {expected_kb:.2f} KB, got {got_kb:.2f} KB"
                                            logging.warning(f"[Attempt {attempt+1}] {error_msg}")
                                except asyncio.TimeoutError:
                                    logging.warning(f"[Attempt {attempt+1}] TIMEOUT occurred for {filename} during HTTP request or response reading.")
                        except aiohttp.ClientError as e:
                            logging.warning(f"[Attempt {attempt+1}] Client error for {filename}: {e}")
                        except Exception as e:
                            logging.error(f"[Attempt {attempt+1}] Unexpected error for {filename}: {e}", exc_info=True)
                        # --- finally block for a single attempt ---
                        finally:
                            pass  # per-attempt cleanup (if any) would go here
                        # --- Retry back-off ---
                        if not success and attempt < self.retries - 1:
                            wait_time = 2 ** attempt
                            logging.debug(f"[Attempt {attempt+1}] Waiting {wait_time}s before retrying {filename}...")
                            await asyncio.sleep(wait_time)
                # --- After all retries for this file are exhausted ---
                if not success:
                    error_msg = f"Failed after {self.retries} attempts"
                    self.failed_files[relative_path] = error_msg
                    logging.error(f"Final failure for {filename} (intended path: {relative_path}): {error_msg}")
                    if os.path.exists(temp_filepath):
                        try:
                            os.remove(temp_filepath)
                            logging.debug(f"Cleaned up temporary file for failed download: {temp_filepath}")
                        except OSError as e:
                            logging.warning(f"Could not remove temporary file {temp_filepath}: {e}")
                # else: the success case was already handled above
            # --- finally block for the whole queue item ---
            # *** Key point: however the body above exits, task_done() runs here exactly once ***
            finally:
                # Ensure task_done() is called exactly once per get()
                try:
                    self.download_queue.task_done()
                    logging.debug(f"Task done for item related to {filename if 'filename' in locals() else 'unknown/shutdown'}.")
                except ValueError as e:
                    # Defensive: catch the error raised by a duplicate call
                    logging.critical(f"CRITICAL: task_done() error for {filename if 'filename' in locals() else 'unknown/shutdown'}: {e}")
                except Exception as e:
                    # Catch any other unexpected failure
                    logging.error(f"Unexpected error in task_done() for {filename if 'filename' in locals() else 'unknown/shutdown'}: {e}")

    async def run(self):
        logging.info("Starting PDB Downloader...")
        logging.info(f"Output directory: {self.output_dir}")
        logging.info(f"Max files per directory: {self.max_files_per_dir}")
        start_time = time.time()
        self.download_queue = asyncio.Queue(maxsize=self.max_connections * 2)
        self._lock = asyncio.Lock()
        progress_bar = tqdm(
            total=0,
            unit='B',
            unit_scale=True,
            desc="Downloading PDB files"
        )
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            producer_task = asyncio.create_task(self.parse_html_producer())
            consumer_tasks = [
                asyncio.create_task(self.download_file_consumer(session, progress_bar))
                for _ in range(self.max_connections)
            ]
            await producer_task
            logging.info("Producer finished.")
            logging.info("Waiting for all download tasks in queue to complete...")
            await self.download_queue.join()
            logging.info("All download tasks completed (queue.join finished).")
            logging.info("Cancelling consumer tasks...")
            for task in consumer_tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*consumer_tasks, return_exceptions=True)
            logging.info("Consumer tasks cancelled and cleaned up.")
        progress_bar.close()
        elapsed_time = time.time() - start_time
        failed_count = len(self.failed_files)
        success_count_report = self.successfully_downloaded_count
        logging.info("=" * 50)
        logging.info("Download Session Summary:")
        logging.info(f"  - Files identified for download (not skipped): {self.total_files_to_download}")
        logging.info(f"  - Successfully downloaded this session: {success_count_report}")
        logging.info(f"  - Failed downloads: {failed_count}")
        logging.info(f"  - Total time taken: {elapsed_time:.2f} seconds")
        logging.info("=" * 50)
        if self.failed_files:
            logging.error("List of failed downloads (relative paths):")
            for relative_path, error in self.failed_files.items():
                logging.error(f"  - {relative_path}: {error}")
        else:
            logging.info("No files failed during this session.")
        return success_count_report, failed_count


if __name__ == "__main__":
    HTML_FILE_PATH = "/pdb_index.html"
    OUTPUT_DIR = "./pdb_files"
    MAX_CONNECTIONS = 50
    RETRIES = 3
    TIMEOUT = 300
    MAX_FILES_PER_DIR = 20000  # exFAT limit
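    # The index file is assumed to be a saved copy of the directory listing at
    # https://files.pdbj.org/pub/pdb/data/structures/all/pdb/ (the same base
    # URL the producer joins hrefs against), fetched beforehand with e.g.:
    #   wget -O /pdb_index.html "https://files.pdbj.org/pub/pdb/data/structures/all/pdb/"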

    downloader = PDBDownloader(
        html_file_path=HTML_FILE_PATH,
        output_dir=OUTPUT_DIR,
        max_connections=MAX_CONNECTIONS,
        retries=RETRIES,
        timeout=TIMEOUT,
        max_files_per_dir=MAX_FILES_PER_DIR
    )
    try:
        success_count, failed_count = asyncio.run(downloader.run())
        print(f"\nDownload finished. Summary: {success_count} succeeded, {failed_count} failed.")
    except KeyboardInterrupt:
        logging.info("\nDownload process interrupted by user.")
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)