这是OpenResty作者的微博:
一个好用的 AI Agent 程序框架需要满足:1. 小巧轻便和高效,2. 方便自动最大化请求并发度, 3. 透明切换不同的 LLM(厂商 API 或本地部署的),4. 透明混用大小不同的模型,5. 透明切换实时请求方式和批处理离线方式,6. 自如重跑单个处理环节,7. 方便对数据流进行回溯排查问题,8. 自如切换流式输出或批量输出模式,9. 透明的分布式请求和错误重试机制。10. 透明的纯内存或基于硬盘的海量数据的存储与计算。11. 透明的本地和远方 LLM prompt 缓存,等等。兼顾高吞吐量、低延迟、低成本、高开发效率和高可调试性。
所以我前面自己从零写了一个 Chunbot AI 框架,不依赖外部服务组件,经过大半年的特性和优化积累之后,现在也只有几千行代码,极为小巧和高效。其自带自动化测试台,轻松驱动上亿 token 量级的庞大的测试集。
输入给Deepseek,要求根据描述实现一个Python Agent框架,最小化依赖,结果:
以下是一个基于描述的轻量级Python Agent框架设计,尽量保持简洁并减少依赖:
import asyncio
import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union# 基础数据模型
@dataclass
class LLMRequest:prompt: strmodel: strparams: dict = Nonecontext: dict = None@dataclass
class LLMResponse:content: strmodel: strmetadata: dict = None@dataclass
class ProcessingStep:name: strinput_data: Anyoutput_data: Anycontext: dict# 核心框架组件
class AgentCore:def __init__(self):self.model_providers = {}self.pipeline = []self.storage_backend = MemoryStorage()self.cache = PromptCache()self.executor = AsyncExecutor()self.error_handler = RetryHandler()self.current_context = {}def register_model(self, name: str, provider: 'ModelProvider'):self.model_providers[name] = providerdef add_pipeline_step(self, step: callable):self.pipeline.append(step)async def process(self, request: LLMRequest) -> LLMResponse:self.current_context = request.context or {}result = Nonefor step in self.pipeline:result = await self._execute_step(step, request, result)self._record_step(step.__name__, request, result)return LLMResponse(content=result, model=request.model)async def _execute_step(self, step, request, prev_result):return await self.error_handler.execute_with_retry(step, request, prev_result, self.current_context)def _record_step(self, step_name, request, result):self.storage_backend.store(ProcessingStep(name=step_name,input_data=request,output_data=result,context=self.current_context.copy()))# 抽象模型接口
class ModelProvider(ABC):@abstractmethodasync def generate(self, prompt: str, **kwargs) -> str:passclass OpenAIProvider(ModelProvider):def __init__(self, api_key: str):self.api_key = api_keyasync def generate(self, prompt: str, **kwargs) -> str:# 实现实际的API调用逻辑passclass LocalModelProvider(ModelProvider):async def generate(self, prompt: str, **kwargs) -> str:# 实现本地模型推理逻辑pass# 执行与并发控制
class AsyncExecutor:async def execute(self, tasks: List[callable], max_concurrency: int = 10):semaphore = asyncio.Semaphore(max_concurrency)async def wrapper(task):async with semaphore:return await taskreturn await asyncio.gather(*[wrapper(t) for t in tasks])# 存储抽象层
class StorageBackend(ABC):@abstractmethoddef store(self, data: Any):passclass MemoryStorage(StorageBackend):def __init__(self):self.data = []def store(self, data: Any):self.data.append(data)class DiskStorage(StorageBackend):def store(self, data: Any):# 实现磁盘存储逻辑pass# 错误处理与重试
class RetryHandler:async def execute_with_retry(self, func: callable, *args, max_retries=3, **kwargs):for attempt in range(max_retries):try:return await func(*args, **kwargs)except Exception as e:if attempt == max_retries - 1:raiseawait asyncio.sleep(2 ** attempt)# 缓存系统
class PromptCache:def __init__(self):self.cache = {}def get(self, prompt: str, model: str) -> Optional[str]:return self.cache.get((model, prompt))def set(self, prompt: str, model: str, response: str):self.cache[(model, prompt)] = response# 测试工具
class TestBench:def __init__(self, agent: AgentCore):self.agent = agentasync def run_test(self, test_cases: List[LLMRequest]):return await self.agent.executor.execute([self.agent.process(case) for case in test_cases])# 使用示例
async def main():agent = AgentCore()agent.register_model("gpt-4", OpenAIProvider("sk-..."))agent.register_model("local", LocalModelProvider())@agent.add_pipeline_stepasync def preprocess(request, prev):return request.prompt.upper()@agent.add_pipeline_stepasync def inference(request, prev):return await agent.model_providers[request.model].generate(prev)response = await agent.process(LLMRequest(prompt="Hello world",model="gpt-4"))print(response.content)if __name__ == "__main__":asyncio.run(main())
这个框架设计具有以下特点:
- 模块化设计:通过清晰的抽象层分离不同功能模块
- 异步优先:基于asyncio实现高效并发控制
- 透明的模型切换:统一的ModelProvider接口支持任意LLM
- 全链路追踪:自动记录每个处理步骤的输入输出和上下文
- 弹性存储:可扩展的存储后端支持内存/磁盘存储
- 灵活扩展:通过装饰器轻松添加处理步骤
- 内置重试:自动化的错误重试机制
- 轻量级:零外部依赖,仅使用标准库
- 透明缓存:简单的Prompt缓存系统
- 测试支持:集成测试基准工具
可以通过组合不同的组件和管道步骤来实现复杂的处理流程,同时保持代码简洁和可维护性。执行模式(实时/批量)、输出方式(流式/批量)等可以通过配置执行器参数来实现。