feat: 更新环境配置和Gemini客户端,支持多个Google API密钥并实现限流重试机制

This commit is contained in:
lk-eternal 2025-08-31 19:08:19 +08:00
parent 70d247fa4c
commit c40c2b1039
5 changed files with 236 additions and 46 deletions

47
Dockerfile Normal file
View File

@ -0,0 +1,47 @@
# 使用Python 3.11官方镜像作为基础镜像
FROM python:3.11-slim
# 设置工作目录
WORKDIR /app
# 设置环境变量
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONPATH=/app/src
# 安装系统依赖包括ffmpeg
RUN apt-get update && apt-get install -y \
build-essential \
libpq-dev \
curl \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
# 验证ffmpeg安装
RUN ffmpeg -version
# 复制requirements文件并安装Python依赖
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# 复制项目文件
COPY . .
# 创建日志目录
RUN mkdir -p /app/logs
# 创建非root用户
RUN useradd --create-home --shell /bin/bash app && \
chown -R app:app /app
USER app
# 暴露端口
EXPOSE 8000
# 健康检查
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/docs || exit 1
# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--app-dir", "src"]

View File

@ -8,6 +8,8 @@ QINIU_BUCKET_NAME="your_qiniu_bucket_name"
QINIU_DOMAIN="your_qiniu_cdn_domain" QINIU_DOMAIN="your_qiniu_cdn_domain"
# AI Models # AI Models
GOOGLE_API_KEY="your_google_ai_api_key" # Google API Keys支持单个或多个key用逗号分隔用于处理限流重试
GOOGLE_API_KEYS="your_google_ai_api_key"
# 多个key示例: GOOGLE_API_KEYS="key1,key2,key3"
OPENROUTER_API_KEY="your_openrouter_api_key" OPENROUTER_API_KEY="your_openrouter_api_key"
OPENROUTER_BASE_URL="https://openrouter.ai/api/v1" OPENROUTER_BASE_URL="https://openrouter.ai/api/v1"

View File

@ -18,7 +18,7 @@ class Settings(BaseSettings):
qiniu_domain: str qiniu_domain: str
# AI Models # AI Models
google_api_key: str google_api_keys: str
openrouter_api_key: str openrouter_api_key: str
openrouter_base_url: str = "https://openrouter.ai/api/v1" openrouter_base_url: str = "https://openrouter.ai/api/v1"

View File

@ -10,17 +10,77 @@ from google import genai
from google.genai import types from google.genai import types
from ..config import settings from ..config import settings
from ..utils import safe_json_loads from ..utils import safe_json_loads
from .key_pool_manager import key_pool_manager
from loguru import logger from loguru import logger
import ssl import ssl
import urllib3 import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
client = genai.Client(api_key=settings.google_api_key)
class GeminiClient: class GeminiClient:
"""Gemini AI客户端""" """Gemini AI客户端"""
def __init__(self):
self._current_client = None
self._refresh_client()
def _refresh_client(self):
"""刷新客户端使用当前key"""
current_key = key_pool_manager.get_current_key()
self._current_client = genai.Client(api_key=current_key)
def _execute_with_retry(self, func, *args, **kwargs):
"""
执行函数并处理429错误重试
Args:
func: 要执行的函数
*args: 函数参数
**kwargs: 函数关键字参数
Returns:
函数执行结果
Raises:
Exception: 所有key都尝试过后仍然失败
"""
key_pool_manager.reset_to_first_key() # 每次请求都从第一个key开始
self._refresh_client()
last_exception = None
tried_keys = 0
max_keys = len(key_pool_manager.get_all_keys())
while tried_keys < max_keys:
try:
tried_keys += 1
logger.info(f"使用第{tried_keys}个key尝试请求")
return func(*args, **kwargs)
except Exception as e:
last_exception = e
error_str = str(e).lower()
# 检查是否是429错误
if '429' in error_str or 'rate limit' in error_str or 'quota' in error_str:
logger.warning(f"遇到限流错误: {e}")
# 如果还有更多key可以尝试
if key_pool_manager.switch_to_next_key():
self._refresh_client()
logger.info("切换到下一个key继续尝试")
continue
else:
logger.error("所有key都已尝试仍然遇到限流")
break
else:
# 非429错误直接抛出不重试
logger.error(f"遇到非限流错误,不重试: {e}")
raise e
# 所有key都尝试过了抛出最后一个异常
logger.error(f"所有{max_keys}个key都已尝试请求失败")
raise last_exception
def generate_image_from_prompt( def generate_image_from_prompt(
self, self,
prompt: str, prompt: str,
@ -36,7 +96,7 @@ class GeminiClient:
Returns: Returns:
生成的图片二进制数据失败返回None 生成的图片二进制数据失败返回None
""" """
try: def _generate_image():
# 构建图片生成提示词 # 构建图片生成提示词
image_prompt = f''' image_prompt = f'''
Generate an image strictly according to the following prompt without any confirmation, questioning, or omission: Generate an image strictly according to the following prompt without any confirmation, questioning, or omission:
@ -56,13 +116,17 @@ class GeminiClient:
contents.append(image_prompt) contents.append(image_prompt)
# 调用Gemini API生成图片 # 调用Gemini API生成图片
response = client.models.generate_content( response = self._current_client.models.generate_content(
model='gemini-2.5-flash-image-preview', model='gemini-2.5-flash-image-preview',
contents=contents, contents=contents,
config=types.GenerateContentConfig( config=types.GenerateContentConfig(
http_options=types.HttpOptions(timeout=30000) http_options=types.HttpOptions(timeout=30000)
) )
) )
return response
try:
response = self._execute_with_retry(_generate_image)
text = None text = None
image_base64 = None image_base64 = None
if hasattr(response, 'candidates') and response.candidates: if hasattr(response, 'candidates') and response.candidates:
@ -92,7 +156,7 @@ class GeminiClient:
Returns: Returns:
分析结果字典包含namedescriptiontags 分析结果字典包含namedescriptiontags
""" """
try: def _analyze_image():
prompt = """ prompt = """
请分析这张图片并返回以下JSON格式的结果 请分析这张图片并返回以下JSON格式的结果
{ {
@ -104,7 +168,7 @@ class GeminiClient:
请确保返回的是有效的JSON格式不要包含其他文字 请确保返回的是有效的JSON格式不要包含其他文字
""" """
response = client.models.generate_content( response = self._current_client.models.generate_content(
model='gemini-2.5-flash', model='gemini-2.5-flash',
contents=[ contents=[
prompt, prompt,
@ -114,6 +178,10 @@ class GeminiClient:
) )
] ]
) )
return response
try:
response = self._execute_with_retry(_analyze_image)
# 解析返回的JSON # 解析返回的JSON
result_text = response.text result_text = response.text
@ -141,25 +209,28 @@ class GeminiClient:
Returns: Returns:
生成的视频二进制数据失败返回None 生成的视频二进制数据失败返回None
""" """
try: def _generate_video():
# 构建视频生成提示词 # 构建视频生成提示词
video_prompt = f"Create a video with the following prompt: {shot_prompt}" video_prompt = f"Create a video with the following prompt: {shot_prompt}"
image_input = types.Image(image_bytes=frame_image_bytes, mime_type="image/jpeg") image_input = types.Image(image_bytes=frame_image_bytes, mime_type="image/jpeg")
# 调用Veo-3.0 API生成视频 # 调用Veo-3.0 API生成视频
operation = client.models.generate_videos( operation = self._current_client.models.generate_videos(
model="veo-3.0-fast-generate-preview", #veo-3.0-generate-preview model="veo-3.0-fast-generate-preview", #veo-3.0-generate-preview
prompt=video_prompt, prompt=video_prompt,
image=image_input image=image_input
) )
return operation
try:
operation = self._execute_with_retry(_generate_video)
# 轮询操作状态直到视频生成完成 # 轮询操作状态直到视频生成完成
logger.info("等待视频生成完成...") logger.info("等待视频生成完成...")
while not operation.done: while not operation.done:
time.sleep(2) time.sleep(2)
operation = client.operations.get(operation) operation = self._current_client.operations.get(operation)
# 下载生成的视频 # 下载生成的视频
@ -172,7 +243,7 @@ class GeminiClient:
raise Exception(operation.response.rai_media_filtered_reasons[0]) raise Exception(operation.response.rai_media_filtered_reasons[0])
raise Exception("未知错误") raise Exception("未知错误")
video_bytes = client.files.download(file=video.video) video_bytes = self._current_client.files.download(file=video.video)
logger.info("Veo-3.0视频生成成功") logger.info("Veo-3.0视频生成成功")
return video_bytes return video_bytes
@ -207,9 +278,10 @@ class GeminiClient:
temp_video_path = temp_file.name temp_video_path = temp_file.name
try: try:
def _upload_and_analyze():
# 上传视频文件到Gemini # 上传视频文件到Gemini
logger.info("正在上传视频到Gemini...") logger.info("正在上传视频到Gemini...")
myfile = client.files.upload(file=temp_video_path) myfile = self._current_client.files.upload(file=temp_video_path)
logger.info(f"视频上传成功文件ID: {myfile.name}") logger.info(f"视频上传成功文件ID: {myfile.name}")
# 等待文件变为ACTIVE状态 # 等待文件变为ACTIVE状态
@ -219,7 +291,7 @@ class GeminiClient:
waited_time = 0 waited_time = 0
while waited_time < max_wait_time: while waited_time < max_wait_time:
file_info = client.files.get(name=myfile.name) file_info = self._current_client.files.get(name=myfile.name)
logger.info(f"文件状态: {file_info.state}") logger.info(f"文件状态: {file_info.state}")
if file_info.state == "ACTIVE": if file_info.state == "ACTIVE":
@ -234,6 +306,8 @@ class GeminiClient:
if waited_time >= max_wait_time: if waited_time >= max_wait_time:
raise Exception("文件处理超时") raise Exception("文件处理超时")
return myfile
# 构建分析提示词 # 构建分析提示词
analysis_prompt = """ analysis_prompt = """
请仔细分析这个视频并返回以下JSON格式的结果 请仔细分析这个视频并返回以下JSON格式的结果
@ -266,15 +340,21 @@ class GeminiClient:
6. 确保返回的是有效的JSON格式不要包含其他文字 6. 确保返回的是有效的JSON格式不要包含其他文字
""" """
myfile = self._execute_with_retry(_upload_and_analyze)
def _analyze_video_content():
# 调用Gemini分析视频 # 调用Gemini分析视频
logger.info("正在分析视频内容...") logger.info("正在分析视频内容...")
response = client.models.generate_content( response = self._current_client.models.generate_content(
model="gemini-2.5-flash", model="gemini-2.5-flash",
contents=[myfile, analysis_prompt], contents=[myfile, analysis_prompt],
config=types.GenerateContentConfig( config=types.GenerateContentConfig(
http_options=types.HttpOptions(timeout=120000) # 2分钟超时 http_options=types.HttpOptions(timeout=120000) # 2分钟超时
) )
) )
return response
response = self._execute_with_retry(_analyze_video_content)
# 解析返回的JSON # 解析返回的JSON
result_text = response.text result_text = response.text

View File

@ -0,0 +1,61 @@
from typing import List, Optional, Iterator
from loguru import logger
import threading
from ..config import settings
class KeyPoolManager:
"""Google API Key池管理器用于处理限流重试"""
def __init__(self):
self._keys: List[str] = []
self._current_index = 0
self._lock = threading.Lock()
self._initialize_keys()
def _initialize_keys(self):
"""初始化key池"""
# 解析key字符串支持单个或多个key
self._keys = [key.strip() for key in settings.google_api_keys.split(',') if key.strip()]
if not self._keys:
raise ValueError("GOOGLE_API_KEYS不能为空")
logger.info(f"初始化Google API Key池{len(self._keys)}个key")
def get_current_key(self) -> str:
"""获取当前key"""
with self._lock:
if not self._keys:
raise ValueError("没有可用的Google API Key")
return self._keys[self._current_index]
def switch_to_next_key(self) -> bool:
"""切换到下一个key
Returns:
bool: 如果还有下一个key返回True否则返回False
"""
with self._lock:
if len(self._keys) <= 1:
return False
self._current_index = (self._current_index + 1) % len(self._keys)
logger.info(f"切换到下一个Google API Key当前索引: {self._current_index}")
return True
def reset_to_first_key(self):
"""重置到第一个key"""
with self._lock:
self._current_index = 0
logger.info("重置到第一个Google API Key")
def get_all_keys(self) -> List[str]:
"""获取所有key用于测试"""
return self._keys.copy()
def has_multiple_keys(self) -> bool:
"""是否有多个key"""
return len(self._keys) > 1
# 全局key池管理器实例
key_pool_manager = KeyPoolManager()