From c40c2b1039bd246e573e2c3e73d01abd3c08ead3 Mon Sep 17 00:00:00 2001 From: lk-eternal Date: Sun, 31 Aug 2025 19:08:19 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9B=B4=E6=96=B0=E7=8E=AF=E5=A2=83?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E5=92=8CGemini=E5=AE=A2=E6=88=B7=E7=AB=AF?= =?UTF-8?q?=EF=BC=8C=E6=94=AF=E6=8C=81=E5=A4=9A=E4=B8=AAGoogle=20API?= =?UTF-8?q?=E5=AF=86=E9=92=A5=E5=B9=B6=E5=AE=9E=E7=8E=B0=E9=99=90=E6=B5=81?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 47 +++++ env.example | 4 +- src/infrastructure/config.py | 2 +- src/infrastructure/external/gemini_client.py | 168 +++++++++++++----- .../external/key_pool_manager.py | 61 +++++++ 5 files changed, 236 insertions(+), 46 deletions(-) create mode 100644 Dockerfile create mode 100644 src/infrastructure/external/key_pool_manager.py diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e5a7ba2 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,47 @@ +# 使用Python 3.11官方镜像作为基础镜像 +FROM python:3.11-slim + +# 设置工作目录 +WORKDIR /app + +# 设置环境变量 +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONPATH=/app/src + +# 安装系统依赖,包括ffmpeg +RUN apt-get update && apt-get install -y \ + build-essential \ + libpq-dev \ + curl \ + ffmpeg \ + && rm -rf /var/lib/apt/lists/* + +# 验证ffmpeg安装 +RUN ffmpeg -version + +# 复制requirements文件并安装Python依赖 +COPY requirements.txt . +RUN pip install --no-cache-dir --upgrade pip && \ + pip install --no-cache-dir -r requirements.txt + +# 复制项目文件 +COPY . . + +# 创建日志目录 +RUN mkdir -p /app/logs + +# 创建非root用户 +RUN useradd --create-home --shell /bin/bash app && \ + chown -R app:app /app +USER app + +# 暴露端口 +EXPOSE 8000 + +# 健康检查 +HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8000/docs || exit 1 + +# 启动命令 +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--app-dir", "src"] \ No newline at end of file diff --git a/env.example b/env.example index 4b9a731..63ad01f 100644 --- a/env.example +++ b/env.example @@ -8,6 +8,8 @@ QINIU_BUCKET_NAME="your_qiniu_bucket_name" QINIU_DOMAIN="your_qiniu_cdn_domain" # AI Models -GOOGLE_API_KEY="your_google_ai_api_key" +# Google API Keys,支持单个或多个key(用逗号分隔),用于处理限流重试 +GOOGLE_API_KEYS="your_google_ai_api_key" +# 多个key示例: GOOGLE_API_KEYS="key1,key2,key3" OPENROUTER_API_KEY="your_openrouter_api_key" OPENROUTER_BASE_URL="https://openrouter.ai/api/v1" diff --git a/src/infrastructure/config.py b/src/infrastructure/config.py index 689147b..8d42599 100644 --- a/src/infrastructure/config.py +++ b/src/infrastructure/config.py @@ -18,7 +18,7 @@ class Settings(BaseSettings): qiniu_domain: str # AI Models - google_api_key: str + google_api_keys: str openrouter_api_key: str openrouter_base_url: str = "https://openrouter.ai/api/v1" diff --git a/src/infrastructure/external/gemini_client.py b/src/infrastructure/external/gemini_client.py index d0cc607..d979ec4 100644 --- a/src/infrastructure/external/gemini_client.py +++ b/src/infrastructure/external/gemini_client.py @@ -10,17 +10,77 @@ from google import genai from google.genai import types from ..config import settings from ..utils import safe_json_loads +from .key_pool_manager import key_pool_manager from loguru import logger import ssl import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - -client = genai.Client(api_key=settings.google_api_key) - class GeminiClient: """Gemini AI客户端""" + def __init__(self): + self._current_client = None + self._refresh_client() + + def _refresh_client(self): + """刷新客户端,使用当前key""" + current_key = key_pool_manager.get_current_key() + self._current_client = genai.Client(api_key=current_key) + + def _execute_with_retry(self, func, *args, **kwargs): + """ + 执行函数并处理429错误重试 + + Args: + func: 要执行的函数 + *args: 函数参数 + **kwargs: 函数关键字参数 + + Returns: + 函数执行结果 + + Raises: + Exception: 所有key都尝试过后仍然失败 + """ + key_pool_manager.reset_to_first_key() # 每次请求都从第一个key开始 + self._refresh_client() + + last_exception = None + tried_keys = 0 + max_keys = len(key_pool_manager.get_all_keys()) + + while tried_keys < max_keys: + try: + tried_keys += 1 + logger.info(f"使用第{tried_keys}个key尝试请求") + return func(*args, **kwargs) + + except Exception as e: + last_exception = e + error_str = str(e).lower() + + # 检查是否是429错误 + if '429' in error_str or 'rate limit' in error_str or 'quota' in error_str: + logger.warning(f"遇到限流错误: {e}") + + # 如果还有更多key可以尝试 + if key_pool_manager.switch_to_next_key(): + self._refresh_client() + logger.info("切换到下一个key继续尝试") + continue + else: + logger.error("所有key都已尝试,仍然遇到限流") + break + else: + # 非429错误,直接抛出,不重试 + logger.error(f"遇到非限流错误,不重试: {e}") + raise e + + # 所有key都尝试过了,抛出最后一个异常 + logger.error(f"所有{max_keys}个key都已尝试,请求失败") + raise last_exception + def generate_image_from_prompt( self, prompt: str, @@ -36,7 +96,7 @@ class GeminiClient: Returns: 生成的图片二进制数据,失败返回None """ - try: + def _generate_image(): # 构建图片生成提示词 image_prompt = f''' Generate an image strictly according to the following prompt without any confirmation, questioning, or omission: @@ -56,13 +116,17 @@ class GeminiClient: contents.append(image_prompt) # 调用Gemini API生成图片 - response = client.models.generate_content( + response = self._current_client.models.generate_content( model='gemini-2.5-flash-image-preview', contents=contents, config=types.GenerateContentConfig( http_options=types.HttpOptions(timeout=30000) ) ) + return response + + try: + response = self._execute_with_retry(_generate_image) text = None image_base64 = None if hasattr(response, 'candidates') and response.candidates: @@ -92,7 +156,7 @@ class GeminiClient: Returns: 分析结果字典,包含name、description、tags """ - try: + def _analyze_image(): prompt = """ 请分析这张图片,并返回以下JSON格式的结果: { @@ -104,7 +168,7 @@ class GeminiClient: 请确保返回的是有效的JSON格式,不要包含其他文字。 """ - response = client.models.generate_content( + response = self._current_client.models.generate_content( model='gemini-2.5-flash', contents=[ prompt, @@ -114,6 +178,10 @@ class GeminiClient: ) ] ) + return response + + try: + response = self._execute_with_retry(_analyze_image) # 解析返回的JSON result_text = response.text @@ -141,25 +209,28 @@ class GeminiClient: Returns: 生成的视频二进制数据,失败返回None """ - try: + def _generate_video(): # 构建视频生成提示词 video_prompt = f"Create a video with the following prompt: {shot_prompt}" - image_input = types.Image(image_bytes=frame_image_bytes, mime_type="image/jpeg") # 调用Veo-3.0 API生成视频 - operation = client.models.generate_videos( + operation = self._current_client.models.generate_videos( model="veo-3.0-fast-generate-preview", #veo-3.0-generate-preview prompt=video_prompt, image=image_input ) + return operation + + try: + operation = self._execute_with_retry(_generate_video) # 轮询操作状态直到视频生成完成 logger.info("等待视频生成完成...") while not operation.done: time.sleep(2) - operation = client.operations.get(operation) + operation = self._current_client.operations.get(operation) # 下载生成的视频 @@ -172,7 +243,7 @@ class GeminiClient: raise Exception(operation.response.rai_media_filtered_reasons[0]) raise Exception("未知错误") - video_bytes = client.files.download(file=video.video) + video_bytes = self._current_client.files.download(file=video.video) logger.info("Veo-3.0视频生成成功") return video_bytes @@ -207,32 +278,35 @@ class GeminiClient: temp_video_path = temp_file.name try: - # 上传视频文件到Gemini - logger.info("正在上传视频到Gemini...") - myfile = client.files.upload(file=temp_video_path) - logger.info(f"视频上传成功,文件ID: {myfile.name}") - - # 等待文件变为ACTIVE状态 - logger.info("等待文件处理完成...") - max_wait_time = 300 # 最多等待5分钟 - wait_interval = 5 # 每5秒检查一次 - waited_time = 0 - - while waited_time < max_wait_time: - file_info = client.files.get(name=myfile.name) - logger.info(f"文件状态: {file_info.state}") + def _upload_and_analyze(): + # 上传视频文件到Gemini + logger.info("正在上传视频到Gemini...") + myfile = self._current_client.files.upload(file=temp_video_path) + logger.info(f"视频上传成功,文件ID: {myfile.name}") - if file_info.state == "ACTIVE": - logger.info("文件已准备就绪,开始分析") - break - elif file_info.state == "FAILED": - raise Exception("文件处理失败") + # 等待文件变为ACTIVE状态 + logger.info("等待文件处理完成...") + max_wait_time = 300 # 最多等待5分钟 + wait_interval = 5 # 每5秒检查一次 + waited_time = 0 - time.sleep(wait_interval) - waited_time += wait_interval - - if waited_time >= max_wait_time: - raise Exception("文件处理超时") + while waited_time < max_wait_time: + file_info = self._current_client.files.get(name=myfile.name) + logger.info(f"文件状态: {file_info.state}") + + if file_info.state == "ACTIVE": + logger.info("文件已准备就绪,开始分析") + break + elif file_info.state == "FAILED": + raise Exception("文件处理失败") + + time.sleep(wait_interval) + waited_time += wait_interval + + if waited_time >= max_wait_time: + raise Exception("文件处理超时") + + return myfile # 构建分析提示词 analysis_prompt = """ @@ -266,15 +340,21 @@ class GeminiClient: 6. 确保返回的是有效的JSON格式,不要包含其他文字 """ - # 调用Gemini分析视频 - logger.info("正在分析视频内容...") - response = client.models.generate_content( - model="gemini-2.5-flash", - contents=[myfile, analysis_prompt], - config=types.GenerateContentConfig( - http_options=types.HttpOptions(timeout=120000) # 2分钟超时 + myfile = self._execute_with_retry(_upload_and_analyze) + + def _analyze_video_content(): + # 调用Gemini分析视频 + logger.info("正在分析视频内容...") + response = self._current_client.models.generate_content( + model="gemini-2.5-flash", + contents=[myfile, analysis_prompt], + config=types.GenerateContentConfig( + http_options=types.HttpOptions(timeout=120000) # 2分钟超时 + ) ) - ) + return response + + response = self._execute_with_retry(_analyze_video_content) # 解析返回的JSON result_text = response.text diff --git a/src/infrastructure/external/key_pool_manager.py b/src/infrastructure/external/key_pool_manager.py new file mode 100644 index 0000000..255b215 --- /dev/null +++ b/src/infrastructure/external/key_pool_manager.py @@ -0,0 +1,61 @@ +from typing import List, Optional, Iterator +from loguru import logger +import threading +from ..config import settings + + +class KeyPoolManager: + """Google API Key池管理器,用于处理限流重试""" + + def __init__(self): + self._keys: List[str] = [] + self._current_index = 0 + self._lock = threading.Lock() + self._initialize_keys() + + def _initialize_keys(self): + """初始化key池""" + # 解析key字符串,支持单个或多个key + self._keys = [key.strip() for key in settings.google_api_keys.split(',') if key.strip()] + if not self._keys: + raise ValueError("GOOGLE_API_KEYS不能为空") + logger.info(f"初始化Google API Key池,共{len(self._keys)}个key") + + def get_current_key(self) -> str: + """获取当前key""" + with self._lock: + if not self._keys: + raise ValueError("没有可用的Google API Key") + return self._keys[self._current_index] + + def switch_to_next_key(self) -> bool: + """切换到下一个key + + Returns: + bool: 如果还有下一个key返回True,否则返回False + """ + with self._lock: + if len(self._keys) <= 1: + return False + + self._current_index = (self._current_index + 1) % len(self._keys) + logger.info(f"切换到下一个Google API Key,当前索引: {self._current_index}") + return True + + def reset_to_first_key(self): + """重置到第一个key""" + with self._lock: + self._current_index = 0 + logger.info("重置到第一个Google API Key") + + def get_all_keys(self) -> List[str]: + """获取所有key(用于测试)""" + return self._keys.copy() + + def has_multiple_keys(self) -> bool: + """是否有多个key""" + return len(self._keys) > 1 + + +# 全局key池管理器实例 +key_pool_manager = KeyPoolManager()