import requests import os from tqdm import tqdmdef get_token(alist_url, username, password):"""登录并获取token"""login_url = f"{alist_url}/api/auth/login"data = {"username": username,"password": password}response = requests.post(login_url, json=data)if response.status_code == 200:return response.json()["data"]["token"]raise Exception("登录失败")def download_file(alist_url, token, file_path, save_path):"""下载文件"""headers = {"Authorization": token}# 获取文件下载链接fs_url = f"{alist_url}/api/fs/get"params = {"path": file_path}response = requests.get(fs_url, headers=headers, params=params)if response.status_code == 200:download_url = response.json()["data"]["raw_url"]# 获取文件大小response = requests.get(download_url, stream=True)total_size = int(response.headers.get('content-length', 0))# 在进度条之前只显示一次文件名print(f"\n正在下载: {os.path.basename(file_path)}")# 使用tqdm创建进度条,不再显示文件名progress_bar = tqdm(total=total_size,unit='iB',unit_scale=True,leave=True,ncols=100,bar_format='{percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]' # 移除desc )# 下载文件并更新进度条with open(save_path, 'wb') as f:for chunk in response.iter_content(chunk_size=8192):size = f.write(chunk)progress_bar.update(size)progress_bar.close()# 下载完成后再打印一个换行print("")if total_size != 0 and progress_bar.n != total_size:return Falsereturn Truereturn Falsedef list_directory(alist_url, token, dir_path):"""列出目录内容"""headers = {"Authorization": token}fs_list_url = f"{alist_url}/api/fs/list"params = {"path": dir_path}response = requests.post(fs_list_url, headers=headers, json=params)if response.status_code == 200:return response.json()["data"]["content"]return []def recursive_download(alist_url, token, current_path, base_save_path, target_keyword, level=0):"""递归遍历目录并下载符合条件的文件"""items = list_directory(alist_url, token, current_path)indent = " " * levelprint(f"{indent}📁 当前目录: {current_path}")# 如果关键字为空,则不检查目录名is_target_dir = True if not target_keyword else target_keyword in os.path.basename(current_path)for item in items:# 统一使用正斜杠item_path = os.path.join(current_path, item["name"]).replace("\\", "/")if item["is_dir"]:recursive_download(alist_url, token, item_path, base_save_path, target_keyword, level + 1)else:file_size = f"({item.get('size', 0)} bytes)"if is_target_dir and item["name"].lower().endswith('.mp4'):# 处理保存路径,统一使用正斜杠relative_path = item_path.lstrip("/")save_path = os.path.join(base_save_path, *relative_path.split("/"))# 确保目录存在,使用规范化的路径save_dir = os.path.dirname(save_path)if not os.path.exists(save_dir):try:os.makedirs(save_dir, exist_ok=True)except Exception as e:print(f"{indent} ❌ 创建目录失败: {save_dir}")print(f"{indent} 错误信息: {str(e)}")continueif os.path.exists(save_path):print(f"{indent} 📄 文件: {item['name']} {file_size} - 已存在,跳过下载")continuematching_info = f"- 匹配目录关键字「{target_keyword}」" if target_keyword else "- 所有MP4文件"print(f"{indent} 📄 文件: {item['name']} {file_size} {matching_info}")if download_file(alist_url, token, item_path, save_path):print(f"{indent} ✅ 成功下载: {item_path}")else:print(f"{indent} ❌ 下载失败: {item_path}")else:print(f"{indent} 📄 文件: {item['name']} {file_size}")def main():# Alist服务器配置ALIST_URL = "http://localhost:5244"USERNAME = "admin"PASSWORD = "123456"# 基础保存路径,确保使用正斜杠BASE_SAVE_PATH = "d:/videos" # 修改这里,使用正斜杠TARGET_KEYWORD = "" # 如果需要下载所有mp4文件,将此值设为空字符串 ""try:token = get_token(ALIST_URL, USERNAME, PASSWORD)keyword_info = f"关键字「{TARGET_KEYWORD}」" if TARGET_KEYWORD else "所有MP4文件"print(f"开始扫描目录... 查找{keyword_info}")recursive_download(ALIST_URL, token, "/", BASE_SAVE_PATH, TARGET_KEYWORD)print("\n所有文件处理完成")except Exception as e:print(f"发生错误: {str(e)}")if __name__ == "__main__":main()