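"""Asynchronous bulk downloader for gzipped PDB entries (pdb*.ent.gz).

The script parses a locally saved HTML index for download links, streams the
files through an aiohttp producer/consumer pipeline with resumable downloads
(.tmp files plus HTTP Range requests), and spreads the output across
batch_NNNNN subdirectories so that no directory exceeds max_files_per_dir
files (a limit chosen here with exFAT volumes in mind).
"""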
import re
import os
import aiohttp
import asyncio
from bs4 import BeautifulSoup
from tqdm import tqdm
import time
import logging
from urllib.parse import urljoin
import functools  # used with run_in_executor

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("pdb_downloader.log"),
    ]
)

class PDBDownloader:
    def __init__(self, html_file_path, output_dir, max_connections=50, retries=3,
                 timeout=300, chunk_size=8192, max_files_per_dir=20000):
        """
        Initialize the PDB downloader.

        :param html_file_path: path to the local HTML index file
        :param output_dir: root directory where downloaded files are saved
        :param max_connections: maximum number of concurrent connections
        :param retries: number of retries after a single file fails to download
        :param timeout: timeout for a single HTTP request (seconds)
        :param chunk_size: number of bytes read per chunk while downloading
        :param max_files_per_dir: maximum number of files per subdirectory (exFAT limitation)
        """
        self.html_file_path = html_file_path
        self.output_dir = output_dir
        self.max_connections = max_connections
        self.retries = retries
        self.timeout = aiohttp.ClientTimeout(total=timeout, sock_read=timeout / 2)
        self.chunk_size = chunk_size
        self.max_files_per_dir = max_files_per_dir
        self.semaphore = asyncio.Semaphore(max_connections)
        # Stores paths relative to output_dir, e.g. "batch_00001/pdbxxxx.ent.gz"
        self.downloaded_files = set()
        self.failed_files = {}
        self.total_files_to_download = 0
        self.successfully_downloaded_count = 0
        self.download_queue = None
        self._lock = None

        os.makedirs(output_dir, exist_ok=True)
        self._load_existing_files()

    def _get_target_dir_and_path(self, filename):
        """Determine the target directory and full path based on the filename and the current per-directory file counts."""
        counter = 0
        while True:
            batch_dir_name = f"batch_{counter:05d}"
            batch_dir_path = os.path.join(self.output_dir, batch_dir_name)

            if not os.path.exists(batch_dir_path):
                os.makedirs(batch_dir_path, exist_ok=True)
                logging.debug(f"Created new batch directory on demand: {batch_dir_path}")
                target_dir = batch_dir_path
                break
            else:
                try:
                    existing_files_in_batch = [
                        f for f in os.listdir(batch_dir_path)
                        if os.path.isfile(os.path.join(batch_dir_path, f)) and not f.endswith('.tmp')
                    ]
                    if len(existing_files_in_batch) < self.max_files_per_dir:
                        target_dir = batch_dir_path
                        break
                except OSError as e:
                    logging.error(f"Error checking batch directory {batch_dir_path}: {e}")
                counter += 1
                if counter > 100000:
                    logging.critical("Too many batch directories checked. Aborting.")
                    raise OSError("Could not find or create a batch directory with space.")

        filepath = os.path.join(target_dir, filename)
        # Compute the path relative to output_dir for bookkeeping
        relative_path = os.path.relpath(filepath, self.output_dir)
        return target_dir, filepath, relative_path
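
    # Illustrative layout produced by _get_target_dir_and_path (the PDB IDs are
    # examples only):
    #   output_dir/
    #       batch_00000/pdb1abc.ent.gz   ... up to max_files_per_dir files
    #       batch_00001/pdb2xyz.ent.gz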

    def _load_existing_files(self):
        """Recursively load the files that already exist in the output directory and its subdirectories."""
        self.downloaded_files.clear()
        if os.path.exists(self.output_dir):
            for root, dirs, files in os.walk(self.output_dir):
                for file in files:
                    if not file.endswith('.tmp'):
                        full_path = os.path.join(root, file)
                        relative_path = os.path.relpath(full_path, self.output_dir)
                        self.downloaded_files.add(relative_path)
        logging.info(f"Found {len(self.downloaded_files)} existing files in output directory (including subdirs)")

    async def parse_html_producer(self):
        """Parse the local HTML file and extract download links (producer) - async-optimized version."""
        logging.info(f"Starting to parse local HTML file: {self.html_file_path}")
        pattern = re.compile(r'pdb[a-z0-9]{3,4}\.ent\.gz')
        base_url = "https://files.pdbj.org/pub/pdb/data/structures/all/pdb/"

        if not os.path.isfile(self.html_file_path):
            logging.error(f"HTML file not found: {self.html_file_path}")
            for _ in range(self.max_connections):
                await self.download_queue.put((None, None))
            return

        loop = asyncio.get_running_loop()

        try:
            logging.info("Reading HTML file content asynchronously...")

            def _read_file():
                with open(self.html_file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    return f.read()

            html_content = await loop.run_in_executor(None, _read_file)
            logging.info("HTML file content read successfully.")

            logging.info("Parsing HTML content asynchronously with BeautifulSoup...")
            soup = await loop.run_in_executor(
                None,
                functools.partial(BeautifulSoup, html_content, 'lxml')
            )
            logging.info("HTML content parsed successfully.")
        except Exception as e:
            logging.error(f"Error reading or parsing local HTML file: {e}")
            for _ in range(self.max_connections):
                await self.download_queue.put((None, None))
            return

        logging.info("Extracting links from parsed HTML...")
        links_found = 0
        links_queued = 0
        # Skip check: compare against the basenames of the already-downloaded
        # relative paths (built once up front rather than scanned per link).
        existing_basenames = {os.path.basename(rel_path) for rel_path in self.downloaded_files}
        for link in soup.find_all('a', href=True):
            href = link['href']
            if pattern.search(href):
                filename = href.split('/')[-1]
                links_found += 1

                if filename in existing_basenames:
                    logging.debug(f"Skipping {filename} as it exists in downloaded_files set.")
                    continue

                full_url = urljoin(base_url, href)
                await self.download_queue.put((full_url, filename))
                links_queued += 1

        self.total_files_to_download = links_queued
        logging.info(f"Producer finished. Parsed {links_found} matching links, "
                     f"{links_queued} need downloading (not skipped).")

        # Send one shutdown sentinel per consumer
        for _ in range(self.max_connections):
            await self.download_queue.put((None, None))
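
    # Download/resume contract implemented by the consumer below: response data
    # is streamed into "<filename>.tmp"; if a .tmp file already exists, the next
    # attempt sends an HTTP Range request to resume from its current size, and
    # only a download judged complete is renamed to its final name, so
    # interrupted runs can be restarted safely.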
    async def download_file_consumer(self, session, progress_bar):
        """Take links from the queue and download the files (consumer)."""
        while True:
            url, filename = await self.download_queue.get()

            # --- Key point: wrap the handling of each queue item in try...finally ---
            # so that task_done() is called exactly once no matter how the inner loops exit.
            try:
                if url is None:
                    # None is the shutdown sentinel - treat it as handled
                    logging.debug("Consumer received shutdown signal (None).")
                    return  # consumer coroutine exits normally

                # Initialise state for this queue item
                success = False
                target_dir, filepath, relative_path = self._get_target_dir_and_path(filename)
                temp_filepath = filepath + '.tmp'

                # --- Exact check whether the file already exists ---
                if relative_path in self.downloaded_files:
                    logging.debug(f"File {relative_path} already exists (precise check), skipping.")
                    async with self._lock:
                        self.successfully_downloaded_count += 1
                    success = True
                else:
                    # --- Retry loop ---
                    for attempt in range(self.retries):
                        # Recompute the target path before every attempt (defensive programming)
                        target_dir, filepath, relative_path = self._get_target_dir_and_path(filename)
                        temp_filepath = filepath + '.tmp'

                        try:
                            # Check again in case the file was created by another
                            # process/coroutine while we were waiting to retry.
                            if relative_path in self.downloaded_files:
                                logging.debug(f"[Attempt {attempt+1}] File {relative_path} already exists, skipping.")
                                async with self._lock:
                                    self.successfully_downloaded_count += 1
                                success = True
                                break  # handled successfully, leave the retry loop

                            resume_header = {}
                            start_byte = 0
                            if os.path.exists(temp_filepath):
                                try:
                                    start_byte = os.path.getsize(temp_filepath)
                                    resume_header = {'Range': f'bytes={start_byte}-'}
                                    logging.debug(f"[Attempt {attempt+1}] Resuming {filename} from byte {start_byte}")
                                except OSError as e:
                                    logging.warning(f"[Attempt {attempt+1}] Could not get size of temp file {temp_filepath}: {e}. Starting fresh.")
                                    start_byte = 0
                                    resume_header = {}
logging.debug(f"[Attempt {attempt+1}] Acquiring semaphore for {filename}...")
|
||
async with self.semaphore:
|
||
logging.debug(f"[Attempt {attempt+1}] Semaphore acquired for {filename}. Making HTTP request...")
|
||
try:
|
||
async with session.get(url, headers=resume_header, timeout=self.timeout) as response:
|
||
logging.debug(f"[Attempt {attempt+1}] Received response headers for {filename} (Status: {response.status}). Starting to read body...")
|
||
|
||
if response.status == 416:
|
||
if os.path.exists(temp_filepath):
|
||
logging.info(f"[Attempt {attempt+1}] HTTP 416 for {filename}, temp file exists, assuming complete.")
|
||
os.rename(temp_filepath, filepath)
|
||
async with self._lock:
|
||
self.downloaded_files.add(relative_path)
|
||
self.successfully_downloaded_count += 1
|
||
success = True
|
||
break # 成功处理,跳出重试循环
|
||
else:
|
||
raise aiohttp.ClientResponseError(
|
||
request_info=response.request_info,
|
||
history=response.history,
|
||
status=response.status,
|
||
message=f"HTTP 416 Range Not Satisfiable and no temp file for {filename}"
|
||
)
|
||
elif response.status not in (200, 206):
|
||
raise aiohttp.ClientResponseError(
|
||
request_info=response.request_info,
|
||
history=response.history,
|
||
status=response.status,
|
||
message=f"HTTP error {response.status} for {filename}"
|
||
)
|
||
|
||

                                        # If the server ignored the Range header and replied 200 (full
                                        # content), overwrite the partial .tmp file instead of appending
                                        # to it; appending would corrupt the result.
                                        if response.status == 200 and start_byte > 0:
                                            logging.debug(f"[Attempt {attempt+1}] Server ignored Range request for {filename}; restarting from byte 0.")
                                            start_byte = 0
                                        mode = 'ab' if start_byte > 0 else 'wb'
                                        logging.debug(f"[Attempt {attempt+1}] Opening file {temp_filepath} in mode '{mode}' for {filename}")

                                        with open(temp_filepath, mode) as f:
                                            async for chunk in response.content.iter_chunked(self.chunk_size):
                                                if chunk:
                                                    f.write(chunk)
                                                    f.flush()
                                                    os.fsync(f.fileno())
                                                    progress_bar.update(len(chunk))
                                        logging.debug(f"[Attempt {attempt+1}] Finished reading response body for {filename}.")

                                        final_temp_size = os.path.getsize(temp_filepath)
                                        total_size_str = response.headers.get('content-length', 'unknown')
                                        try:
                                            total_size = int(total_size_str) if total_size_str != 'unknown' else None
                                        except ValueError:
                                            total_size = None

                                        # For a 206 response, Content-Length only covers the requested
                                        # range, so the expected final size is start_byte + Content-Length.
                                        expected_size = (start_byte + total_size) if (total_size and response.status == 206) else total_size
                                        final_temp_size_kb = final_temp_size / 1024

                                        if expected_size and final_temp_size >= expected_size:
                                            os.rename(temp_filepath, filepath)
                                            async with self._lock:
                                                self.downloaded_files.add(relative_path)
                                                self.successfully_downloaded_count += 1
                                            success = True
                                            logging.info(f"[Attempt {attempt+1}] Successfully downloaded/resumed {filename} ({final_temp_size_kb:.2f} KB) to {relative_path}")
                                            break  # handled successfully, leave the retry loop
                                        elif not expected_size:
                                            os.rename(temp_filepath, filepath)
                                            async with self._lock:
                                                self.downloaded_files.add(relative_path)
                                                self.successfully_downloaded_count += 1
                                            success = True
                                            logging.info(f"[Attempt {attempt+1}] Downloaded {filename} (size unknown, assumed complete) to {relative_path}")
                                            break  # handled successfully, leave the retry loop
                                        else:
                                            expected_kb = expected_size / 1024
                                            got_kb = final_temp_size / 1024
                                            error_msg = f"Incomplete download for {filename}. Expected >= {expected_kb:.2f} KB, got {got_kb:.2f} KB"
                                            logging.warning(f"[Attempt {attempt+1}] {error_msg}")

                                except asyncio.TimeoutError:
                                    logging.warning(f"[Attempt {attempt+1}] TIMEOUT occurred for {filename} during HTTP request or response reading.")
                                except aiohttp.ClientError as e:
                                    logging.warning(f"[Attempt {attempt+1}] Client error for {filename}: {e}")
                                except Exception as e:
                                    logging.error(f"[Attempt {attempt+1}] Unexpected error for {filename}: {e}", exc_info=True)

                        # --- Per-attempt finally block ---
                        finally:
                            pass  # per-attempt cleanup (if any) would go here

                        # --- Retry back-off ---
                        if not success and attempt < self.retries - 1:
                            # Exponential back-off: 2**attempt seconds (1 s, 2 s, 4 s, ...)
                            wait_time = 2 ** attempt
                            logging.debug(f"[Attempt {attempt+1}] Waiting {wait_time}s before retrying {filename}...")
                            await asyncio.sleep(wait_time)

                    # --- After all retries for this file have been exhausted ---
                    if not success:
                        error_msg = f"Failed after {self.retries} attempts"
                        self.failed_files[relative_path] = error_msg
                        logging.error(f"Final failure for {filename} (intended path: {relative_path}): {error_msg}")
                        if os.path.exists(temp_filepath):
                            try:
                                os.remove(temp_filepath)
                                logging.debug(f"Cleaned up temporary file for failed download: {temp_filepath}")
                            except OSError as e:
                                logging.warning(f"Could not remove temporary file {temp_filepath}: {e}")
                    # else: the success case was already handled above

            # --- finally block for this queue item ---
            # *** Key point: task_done() is called here exactly once, however the item handling exits ***
            finally:
                # *** Ensure task_done() is called only once for the current get() ***
                try:
                    self.download_queue.task_done()
                    logging.debug(f"Task done for item related to {filename if 'filename' in locals() else 'unknown/shutdown'}.")
                except ValueError as e:
                    # Defensive: catch the error raised by a surplus task_done() call
                    logging.critical(f"CRITICAL: task_done() error for {filename if 'filename' in locals() else 'unknown/shutdown'}: {e}")
                except Exception as e:
                    # Catch any other unexpected exception
                    logging.error(f"Unexpected error in task_done() for {filename if 'filename' in locals() else 'unknown/shutdown'}: {e}")

    async def run(self):
        logging.info("Starting PDB Downloader...")
        logging.info(f"Output directory: {self.output_dir}")
        logging.info(f"Max files per directory: {self.max_files_per_dir}")
        start_time = time.time()

        self.download_queue = asyncio.Queue(maxsize=self.max_connections * 2)
        self._lock = asyncio.Lock()

        progress_bar = tqdm(
            total=0,
            unit='B',
            unit_scale=True,
            desc="Downloading PDB files"
        )

        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            producer_task = asyncio.create_task(self.parse_html_producer())

            consumer_tasks = [
                asyncio.create_task(self.download_file_consumer(session, progress_bar))
                for _ in range(self.max_connections)
            ]

            await producer_task
            logging.info("Producer finished.")

            logging.info("Waiting for all download tasks in queue to complete...")
            await self.download_queue.join()
            logging.info("All download tasks completed (queue.join finished).")

            logging.info("Cancelling consumer tasks...")
            for task in consumer_tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*consumer_tasks, return_exceptions=True)
            logging.info("Consumer tasks cancelled and cleaned up.")

        progress_bar.close()

        elapsed_time = time.time() - start_time
        failed_count = len(self.failed_files)
        success_count_report = self.successfully_downloaded_count

        logging.info("=" * 50)
        logging.info("Download Session Summary:")
        logging.info(f"  - Files identified for download (not skipped): {self.total_files_to_download}")
        logging.info(f"  - Successfully downloaded this session: {success_count_report}")
        logging.info(f"  - Failed downloads: {failed_count}")
        logging.info(f"  - Total time taken: {elapsed_time:.2f} seconds")
        logging.info("=" * 50)

        if self.failed_files:
            logging.error("List of failed downloads (relative paths):")
            for relative_path, error in self.failed_files.items():
                logging.error(f"  - {relative_path}: {error}")
        else:
            logging.info("No files failed during this session.")

        return success_count_report, failed_count

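# Example configuration; adjust the paths and limits for your environment.
# HTML_FILE_PATH is assumed to be a locally saved copy of the directory listing
# at https://files.pdbj.org/pub/pdb/data/structures/all/pdb/ (the page the
# producer scans for pdb*.ent.gz links).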
if __name__ == "__main__":
    HTML_FILE_PATH = "/pdb_index.html"
    OUTPUT_DIR = "./pdb_files"
    MAX_CONNECTIONS = 50
    RETRIES = 3
    TIMEOUT = 300
    MAX_FILES_PER_DIR = 20000  # exFAT limitation

    downloader = PDBDownloader(
        html_file_path=HTML_FILE_PATH,
        output_dir=OUTPUT_DIR,
        max_connections=MAX_CONNECTIONS,
        retries=RETRIES,
        timeout=TIMEOUT,
        max_files_per_dir=MAX_FILES_PER_DIR
    )

    try:
        success_count, failed_count = asyncio.run(downloader.run())
        print(f"\nDownload finished. Summary: {success_count} succeeded, {failed_count} failed.")
    except KeyboardInterrupt:
        logging.info("\nDownload process interrupted by user.")
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)