首先要创建一个机器人,找到BotFather获取到机器人的Token
设置后台地址,实现消息转发
curl -X POST "https://api.telegram.org/bot{机器人token}/setWebhook?url=https://chat.xxxxxxxx.com/chat"
配置文件
.env
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TELEGRAM_BOT_TOKEN=xxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_API_URL=https://api.openai.com/v1/chat/completions
REDIS_HOST= 127.0.0.1
REDIS_PORT= 6379
REDIS_PASSWD=12346
主程序
main.py
import json
import logging
import osimport httpx
from aioredis import Redis
from dotenv import load_dotenv
from fastapi import FastAPI, Request# 配置日志
logging.basicConfig(level=logging.INFO)load_dotenv()OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
OPENAI_API_URL = os.getenv("OPENAI_API_URL")
REDIS_HOST = os.getenv("REDIS_HOST", '127.0.0.1')
REDIS_PORT = os.getenv("REDIS_PORT", 6379)
REDIS_PASSWD = os.getenv("REDIS_PASSWD", '123456')
REDIS_DB = os.getenv("REDIS_DB", 10)
TELEGRAM_API_URL = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/"
CHAT_HISTORY_EXPIRE = 60 * 60 * 24 # 聊天记录保存24小时
GPT_MODEL = "gpt-4o"app = FastAPI(docs_url=None, redoc_url=None)# 异步Redis连接
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWD, decode_responses=True)async def get_chat_history(chat_id):chat_key = f"chat:{chat_id}"history = await redis_client.get(chat_key)if history:return json.loads(history)default_history = [{"role": "system", "content": "你是一个有帮助的助手"}]await redis_client.set(chat_key, json.dumps(default_history), ex=CHAT_HISTORY_EXPIRE)return default_historyasync def update_chat_history(chat_id, messages):chat_key = f"chat:{chat_id}"await redis_client.set(chat_key, json.dumps(messages), ex=CHAT_HISTORY_EXPIRE)async def clear_chat_history(chat_id):"""清除指定用户的聊天历史"""chat_key = f"chat:{chat_id}"await redis_client.delete(chat_key)async def call_openai_api(user_message: str, chat_id) -> str:messages = await get_chat_history(chat_id)headers = {"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"}if len(messages) > 9:messages[1:3] = []messages.append({"role": "user", "content": user_message})data = {"model": GPT_MODEL, "messages": messages}try:async with httpx.AsyncClient(timeout=30) as client:response = await client.post(url=OPENAI_API_URL, headers=headers, json=data)result = response.json()assistant_reply = result["choices"][0]["message"]["content"]messages.append({"role": "assistant", "content": assistant_reply})await update_chat_history(chat_id, messages)return assistant_replyexcept httpx.HTTPStatusError as e:error_msg = f"API调用失败: HTTP {e.response.status_code}"logging.error(error_msg)return error_msgexcept Exception as e:error_msg = f"发生错误: {str(e)}"logging.error(error_msg)return error_msgasync def send_message(chat_id, text: str):async with httpx.AsyncClient() as client:await client.post(url=f"{TELEGRAM_API_URL}sendMessage",json={"chat_id": chat_id, "text": text})@app.get("/")
async def index():return {"msg": "Welcome ChatBot!!!"}@app.post("/chat")
async def webhook(request: Request):update = await request.json()try:chat_id = update["message"]["chat"]["id"]user_message = update["message"]["text"]if user_message.lower() == "/clear":await clear_chat_history(chat_id)await send_message(chat_id, "聊天历史已清除!")return {"status": "ok"}response_message = await call_openai_api(user_message, chat_id)await send_message(chat_id, response_message)return {"status": "ok"}except Exception as e:return {"status": "ok"}
启动程序
uvicorn main:app --host=0.0.0.0 --workers=4 --port=8000 --reload
效果