The code in this post was mostly provided by DeepSeek.
The goal is to get newly published papers delivered every day and filed into my knowledge base automatically, instead of copying them over by hand the way an email or RSS subscription requires.
It covers the following:
- searching arXiv for papers by keyword and extracting their information
- translating the abstracts into Chinese
- saving each paper as an md file, in a given format, to a given location
## Crawler
Searching arXiv is done directly with the [[Python]] library `arxiv` (installable via `pip install arxiv`).
```python
import arxiv
```
Some basic configuration is needed. The important part is the search queries: each `query` string combines arXiv API field prefixes (`ti:` for title, `abs:` for abstract, `cat:` for category) with boolean operators.
```python
import os
import re
import json
import time
import logging

# Configuration
ARXIV_CONFIG = {
    "save_dir": os.path.expanduser("./"),  # where the generated files are saved
    "categories": [
        {
            "name": "Robot manipulation",
            "query": "(ti:\"robot manipulation\" OR abs:\"grasping\" OR abs:\"motion planning\") AND cat:cs.RO",
            "max_results": 30
        },
        {
            "name": "Reinforcement learning",
            "query": "(abs:\"reinforcement learning\" OR abs:\"RL\" OR abs:\"Q-learning\") AND (cat:cs.LG OR cat:cs.AI)",
            "max_results": 30
        },
        {
            "name": "VLA models",
            "query": "(abs:\"vision-language-action\" OR abs:\"VLA\" OR abs:\"embodied AI\") AND (cat:cs.CV OR cat:cs.CL)",
            "max_results": 30
        }
    ],
    "timezone_offset": 8,   # timezone offset (UTC+8)
    "retries": 5,           # number of retries for API requests
    "delay_seconds": 5      # delay between requests
}

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(os.path.join(ARXIV_CONFIG["save_dir"], "arxiv.log")),
        logging.StreamHandler()
    ]
)
```
Next, create the request client:
```python
def get_arxiv_client() -> arxiv.Client:
    """Create an arXiv client with retry support."""
    return arxiv.Client(
        page_size=100,
        delay_seconds=ARXIV_CONFIG["delay_seconds"],
        num_retries=ARXIV_CONFIG["retries"]
    )
```
The client can then run a search for each category:
```python
from typing import Dict, List

def fetch_category_papers(category: Dict) -> List[arxiv.Result]:
    """Fetch the latest papers for a single category."""
    client = get_arxiv_client()
    search = arxiv.Search(
        query=category["query"],
        max_results=category["max_results"],
        sort_by=arxiv.SortCriterion.SubmittedDate
    )
    try:
        papers = list(client.results(search))
        logging.info(f"[{category['name']}] fetched {len(papers)} papers")
        return papers
    except Exception as e:
        logging.error(f"[{category['name']}] API request failed: {str(e)}")
        return []
```
The retrieved papers are collected into a list named `papers`.
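The post doesn't show the driver loop itself; a minimal sketch, assuming the results are simply merged across categories, might look like this:

```python
# Hypothetical driver loop (not shown in the post): merge the results
# from every configured category into one list
papers = []
for category in ARXIV_CONFIG["categories"]:
    papers.extend(fetch_category_papers(category))
```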
## Translation
The script uses Tencent Cloud Translation, Baidu Translate, and NiuTrans; their free quotas should be enough.
```python
from hashlib import md5
import urllib.error
import urllib.parse
import urllib.request
import requests
import random

from tencentcloud.common import credential
from tencentcloud.tmt.v20180321 import tmt_client, models
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
```
Tencent provides a ready-made SDK; the other two services are called over plain HTTP (`requests` and `urllib`).
How each of the three services is called:
- Tencent Cloud Translation
```python
def tengxun_translate_text(text: str) -> str:
    try:
        cred = credential.Credential("your id", "your key")
        client = tmt_client.TmtClient(cred, "ap-beijing")
        req = models.TextTranslateRequest()
        req.SourceText = text
        req.Source = "en"
        req.Target = "zh"
        req.ProjectId = 0
        resp = client.TextTranslate(req)
        return resp.TargetText
    except Exception as e:
        logging.error(f"Tencent translation failed: {str(e)}")
        return ""
```
- Baidu Translate
```python
def make_md5(s, encoding='utf-8'):
    return md5(s.encode(encoding)).hexdigest()

def baidu_translate_text(text: str) -> str:
    try:
        # Set your own appid/appkey.
        appid = 'your appid'
        appkey = 'your appkey'

        # For the list of language codes, see https://api.fanyi.baidu.com/doc/21
        from_lang = 'en'
        to_lang = 'zh'

        endpoint = 'http://api.fanyi.baidu.com'
        path = '/api/trans/vip/translate'
        url = endpoint + path
        query = text

        # Generate salt and sign
        salt = random.randint(32768, 65536)
        sign = make_md5(appid + query + str(salt) + appkey)

        # Build the request
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        payload = {'appid': appid, 'q': query, 'from': from_lang,
                   'to': to_lang, 'salt': salt, 'sign': sign}

        # Send the request
        r = requests.post(url, params=payload, headers=headers)
        result = r.json()
        return result["trans_result"][0]["dst"]
    except Exception as e:
        logging.error(f"Baidu translation failed: {str(e)}")
        return ""
```
- NiuTrans
```python
def xiaoniu_translate_text(text: str) -> str:
    url = 'http://api.niutrans.com/NiuTransServer/translation?'
    apikey = "your apikey"
    data = {"from": "en", "to": "zh", "apikey": apikey, "src_text": text}
    data_en = urllib.parse.urlencode(data)
    req = url + "&" + data_en
    try:
        res = urllib.request.urlopen(req)
        res = res.read()
        res_dict = json.loads(res)
        return res_dict["tgt_text"]
    except Exception as e:
        logging.error(f"NiuTrans translation failed: {str(e)}")
        return ""
```
The combined translation interface:
```python
def translate_text(text: str) -> str:
    # Try each service in turn, falling back whenever one returns ""
    zh_abstract = tengxun_translate_text(text)
    if zh_abstract == "":
        zh_abstract = xiaoniu_translate_text(text)
    if zh_abstract == "":
        zh_abstract = baidu_translate_text(text)
    return zh_abstract
```
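A quick usage example (the sample sentence is just an illustration):

```python
# Falls through Tencent -> NiuTrans -> Baidu; returns "" only if all three fail
zh = translate_text("We propose a method for robotic grasping.")
print(zh)
```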
In each provider's console, configure the account so the API is disabled once the free quota runs out. The fallback chain above will then settle on a service that still has quota, and you avoid being charged automatically.
## Saving as md files
### Creating the folder
First build the directory path from the current date, then create the folder at that path:
```python
from datetime import datetime, timezone, timedelta

# Build the path of today's directory
def get_daily_dir() -> str:
    local_date = datetime.now(
        timezone(timedelta(hours=ARXIV_CONFIG["timezone_offset"]))
    ).date()
    return os.path.join(ARXIV_CONFIG["save_dir"], local_date.strftime("%Y%m%d"))
```
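The function above only builds the path; presumably the directory itself is created before any file is written, along these lines:

```python
# Assumed setup step (the post doesn't show it explicitly)
daily_dir = get_daily_dir()
os.makedirs(daily_dir, exist_ok=True)
```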
### Recording retrieved papers
Create a JSON file to store the ids of papers that have already been retrieved:
```python
exist_papers = []
if os.path.exists(ARXIV_CONFIG["save_dir"] + "arxiv_papers.json"):
    with open(ARXIV_CONFIG["save_dir"] + "arxiv_papers.json", 'r') as f:
        papers_dict = json.load(f)
        exist_papers = papers_dict["papers"]
else:
    with open(ARXIV_CONFIG["save_dir"] + "arxiv_papers.json", 'w') as f:
        json.dump({"papers": exist_papers}, f)
```
Earlier the arXiv client produced the `papers` list; now we process each paper in it.
If a paper is already recorded in the JSON file, it is skipped; otherwise its information is collected and its id appended to the list of known papers:
```python
for paper in papers:
    paper_id = paper.get_short_id()
    # paper_id = paper.entry_id.split('/')[-1]  # alternative way to get the id
    if paper_id in exist_papers:
        logging.info(f"Paper already recorded → {paper_id}")
        continue
    else:
        exist_papers.append(paper_id)
```
Next (still inside the loop), extract metadata from the fetched paper information and format it as md front matter:
```python
    # Generate a safe filename
    safe_title = re.sub(r'[^\w_()()\-]', ' ', paper.title).strip(' ')[:200]
    # filename = f"{paper_id}_{safe_title}.md"
    filename = f"{safe_title}.md"
    filepath = os.path.join(daily_dir, filename)

    # Get the DOI (adjust to your own needs)
    doi = getattr(paper, 'doi', []) or extract_doi_from_links(paper.links)

    # Build the front matter
    metadata = f"""---
title: "{paper.title}"
id: "{paper_id}"
authors: {[author.name for author in paper.authors]}
tags: {[tag.replace(".", "/") for tag in paper.categories]}
category: {paper.primary_category}
doi: "{doi}"
url: "{paper.entry_id}"
published: "{paper.published.strftime('%Y-%m-%d')}"
update: "{paper.updated.strftime('%Y-%m-%d')}"
---"""
```
Then translate the abstract:
```python
    # Translate the abstract
    en_abstract = paper.summary.replace('\n', ' ')
    zh_abstract = translate_text(en_abstract)
    if zh_abstract == "":
        logging.error(f"Translation failed: {paper.title}")
    time.sleep(5)

    # "摘要" is the heading for the Chinese translation in the output file
    content = f"""{metadata}
## Abstract
{en_abstract}
{"## 摘要" if zh_abstract else ""}
{zh_abstract if zh_abstract else ""}
## PDF Links
- [arXiv PDF]({paper.pdf_url})
"""
```
Finally, the collected information is written to the md file named after the paper, and the list of known ids is flushed back to the JSON file:
```python
    # Write the file
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(content.strip())
    logging.info(f"Paper saved → {filepath}")

    with open(ARXIV_CONFIG["save_dir"] + "arxiv_papers.json", 'w') as f:
        json.dump({"papers": exist_papers}, f)
```
The final script is archived at:
arxiv-daily-subscriber