Compare commits

..

2 Commits

12 changed files with 260 additions and 87 deletions

1
.gitignore vendored
View File

@ -52,3 +52,4 @@ Thumbs.db
# Temporary files
*.tmp
*.temp
.qiniu_pythonsdk_hostscache.json

47
Dockerfile Normal file
View File

@ -0,0 +1,47 @@
# Use the official Python 3.11 slim image as the base image
FROM python:3.11-slim
# Set the working directory; all later relative paths resolve against /app
WORKDIR /app
# Set environment variables: do not write .pyc files, keep stdout/stderr unbuffered
# so logs appear immediately, and make /app/src importable
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONPATH=/app/src
# Install system dependencies, including ffmpeg; the apt package lists are removed
# in the same layer to keep the image small
RUN apt-get update && apt-get install -y \
build-essential \
libpq-dev \
curl \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
# Verify the ffmpeg installation (the build fails here if the binary is not runnable)
RUN ffmpeg -version
# Copy the requirements file first and install Python dependencies, so this layer
# is reused from cache when only application code changes
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# Copy the project files
COPY . .
# Create the log directory
RUN mkdir -p /app/logs
# Create a non-root user, hand /app over to it, and run the container as that user
RUN useradd --create-home --shell /bin/bash app && \
chown -R app:app /app
USER app
# Expose the application port
EXPOSE 8000
# Health check: probe the /docs page over HTTP with curl
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/docs || exit 1
# Startup command - run several workers for better concurrency
# NOTE(review): "--loop uvloop" needs the uvloop package at runtime; confirm it is in requirements.txt
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--app-dir", "src", "--workers", "4", "--loop", "uvloop"]

View File

@ -8,6 +8,8 @@ QINIU_BUCKET_NAME="your_qiniu_bucket_name"
QINIU_DOMAIN="your_qiniu_cdn_domain"
# AI Models
GOOGLE_API_KEY="your_google_ai_api_key"
# Google API Keys：支持单个或多个 key（多个 key 用逗号分隔），用于处理限流重试
GOOGLE_API_KEYS="your_google_ai_api_key"
# 多个key示例: GOOGLE_API_KEYS="key1,key2,key3"
OPENROUTER_API_KEY="your_openrouter_api_key"
OPENROUTER_BASE_URL="https://openrouter.ai/api/v1"

View File

@ -90,8 +90,8 @@ class AIServiceImpl(AIService):
async def generate_video(self, frame_image_bytes: bytes, shot_prompt: str):
return await self.gemini_client.generate_video(frame_image_bytes, shot_prompt)
async def analyze_video(self, video_url: str):
return await self.gemini_client.analyze_video(video_url)
async def analyze_video(self, video_url: str, prompt_template: str):
return await self.gemini_client.analyze_video(video_url, prompt_template)
class StorageServiceImpl(StorageService):

View File

@ -434,8 +434,14 @@ async def replicate_from_video(
):
"""一键复刻从视频URL生成项目、素材和分镜"""
try:
# 视频分析提示词模板不需要特定的占位符变量因为视频内容直接传给AI模型
# 这里可以添加其他模板格式验证逻辑,如果需要的话
# 调用业务逻辑
result = await project_use_cases.replicate_from_video(request.video_url)
result = await project_use_cases.replicate_from_video(
request.video_url,
request.prompt_template
)
project = result["project"]
assets = result["assets"]

View File

@ -157,6 +157,7 @@ class ComposeVideoResponse(BaseModel):
class VideoReplicateRequest(BaseModel):
    """Request schema for one-click replication of a project from a video."""
    # URL of the video to replicate; required (no default is declared).
    video_url: str = Field(..., description="要复刻的视频URL")
    # Prompt template used for the video analysis step; required as well.
    # NOTE(review): clients that do not send this field will presumably fail
    # request validation — confirm that making it mandatory is intended.
    prompt_template: str = Field(..., description="视频分析提示词模板")
# 更新引用

View File

@ -590,12 +590,13 @@ class ProjectUseCases:
logger.error(f"生成视频失败: {e}")
raise ValueError(f"生成视频失败: {e}")
async def replicate_from_video(self, video_url: str) -> Dict[str, Any]:
async def replicate_from_video(self, video_url: str, prompt_template: str) -> Dict[str, Any]:
"""
一键复刻从视频URL生成项目素材和分镜
Args:
video_url: 要复刻的视频URL
prompt_template: 视频分析提示词模板
Returns:
包含projectassetsstoryboards的字典
@ -605,7 +606,7 @@ class ProjectUseCases:
# 1. 使用Gemini分析视频内容
logger.info("正在分析视频内容...")
analysis_result = await self.ai_service.analyze_video(video_url)
analysis_result = await self.ai_service.analyze_video(video_url, prompt_template)
if not analysis_result:
raise ValueError("视频分析失败")

View File

@ -110,12 +110,13 @@ class AIService(ABC):
pass
@abstractmethod
async def analyze_video(self, video_url: str) -> Optional[Dict[str, Any]]:
async def analyze_video(self, video_url: str, prompt_template: str) -> Optional[Dict[str, Any]]:
"""
分析视频内容提取关键素材帧和分镜关键帧
Args:
video_url: 视频URL
prompt_template: 视频分析提示词模板
Returns:
分析结果字典包含key_assets_frames和key_storyboard_frames

View File

@ -18,7 +18,7 @@ class Settings(BaseSettings):
qiniu_domain: str
# AI Models
google_api_key: str
google_api_keys: str
openrouter_api_key: str
openrouter_base_url: str = "https://openrouter.ai/api/v1"

View File

@ -10,17 +10,78 @@ from google import genai
from google.genai import types
from ..config import settings
from ..utils import safe_json_loads
from .key_pool_manager import key_pool_manager
from ..services.template_service import TemplateService
from loguru import logger
import ssl
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
client = genai.Client(api_key=settings.google_api_key)
class GeminiClient:
"""Gemini AI客户端"""
def __init__(self):
    """Create the wrapper and build the first SDK client from the key pool."""
    # Declared before _refresh_client() so the attribute always exists,
    # even if building the client raises.
    self._current_client = None
    self._refresh_client()
def _refresh_client(self):
    """Rebuild the SDK client so it uses the key the pool currently points at."""
    self._current_client = genai.Client(
        api_key=key_pool_manager.get_current_key(),
    )
def _execute_with_retry(self, func, *args, **kwargs):
    """
    Run ``func`` and, on a rate-limit error, retry it with the next API key.

    Every call starts again from the first key in the pool. ``func`` is
    invoked synchronously and is attempted at most once per configured key.
    It should read ``self._current_client`` at call time, because the client
    is rebuilt whenever the key changes.

    Args:
        func: Callable to execute.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns on the first successful attempt.

    Raises:
        Exception: The original exception, re-raised immediately, when it is
            not a rate-limit error; or the last rate-limit exception once
            every key has been tried.
        RuntimeError: If the key pool is empty, so nothing could be attempted.
    """
    # Each request starts from the first key.
    key_pool_manager.reset_to_first_key()
    self._refresh_client()

    last_exception = None
    max_keys = len(key_pool_manager.get_all_keys())

    for attempt in range(1, max_keys + 1):
        logger.info(f"使用第{attempt}个key尝试请求")
        try:
            return func(*args, **kwargs)
        except Exception as e:
            error_str = str(e).lower()
            # Rate limiting is recognised from the error text (HTTP 429 / quota).
            is_rate_limited = (
                '429' in error_str
                or 'rate limit' in error_str
                or 'quota' in error_str
            )
            if not is_rate_limited:
                # Any other failure is not retried; a bare raise keeps the
                # original traceback untouched.
                logger.error(f"遇到非限流错误,不重试: {e}")
                raise

            last_exception = e
            logger.warning(f"遇到限流错误: {e}")

            # Only rotate while untried keys remain. switch_to_next_key()
            # wraps around, so rotating after the last key would rebuild the
            # client for the first key and log a misleading "switching"
            # message just before giving up.
            if attempt >= max_keys or not key_pool_manager.switch_to_next_key():
                logger.error("所有key都已尝试仍然遇到限流")
                break

            self._refresh_client()
            logger.info("切换到下一个key继续尝试")

    # Every key has been tried: surface the last rate-limit error.
    logger.error(f"所有{max_keys}个key都已尝试请求失败")
    if last_exception is None:
        # Only reachable with an empty pool; avoids an opaque "raise None" TypeError.
        raise RuntimeError("没有可用的Google API Key")
    raise last_exception
def generate_image_from_prompt(
self,
prompt: str,
@ -36,7 +97,7 @@ class GeminiClient:
Returns:
生成的图片二进制数据失败返回None
"""
try:
def _generate_image():
# 构建图片生成提示词
image_prompt = f'''
Generate an image strictly according to the following prompt without any confirmation, questioning, or omission:
@ -56,13 +117,17 @@ class GeminiClient:
contents.append(image_prompt)
# 调用Gemini API生成图片
response = client.models.generate_content(
response = self._current_client.models.generate_content(
model='gemini-2.5-flash-image-preview',
contents=contents,
config=types.GenerateContentConfig(
http_options=types.HttpOptions(timeout=30000)
)
)
return response
try:
response = self._execute_with_retry(_generate_image)
text = None
image_base64 = None
if hasattr(response, 'candidates') and response.candidates:
@ -92,7 +157,7 @@ class GeminiClient:
Returns:
分析结果字典包含namedescriptiontags
"""
try:
def _analyze_image():
prompt = """
请分析这张图片并返回以下JSON格式的结果
{
@ -104,7 +169,7 @@ class GeminiClient:
请确保返回的是有效的JSON格式不要包含其他文字
"""
response = client.models.generate_content(
response = self._current_client.models.generate_content(
model='gemini-2.5-flash',
contents=[
prompt,
@ -114,6 +179,10 @@ class GeminiClient:
)
]
)
return response
try:
response = self._execute_with_retry(_analyze_image)
# 解析返回的JSON
result_text = response.text
@ -141,25 +210,28 @@ class GeminiClient:
Returns:
生成的视频二进制数据失败返回None
"""
try:
def _generate_video():
# 构建视频生成提示词
video_prompt = f"Create a video with the following prompt: {shot_prompt}"
image_input = types.Image(image_bytes=frame_image_bytes, mime_type="image/jpeg")
# 调用Veo-3.0 API生成视频
operation = client.models.generate_videos(
model="veo-3.0-fast-generate-preview", #veo-3.0-generate-preview
operation = self._current_client.models.generate_videos(
model="veo-3.0-generate-preview", #veo-3.0-fast-generate-preview
prompt=video_prompt,
image=image_input
)
return operation
try:
operation = self._execute_with_retry(_generate_video)
# 轮询操作状态直到视频生成完成
logger.info("等待视频生成完成...")
while not operation.done:
time.sleep(2)
operation = client.operations.get(operation)
operation = self._current_client.operations.get(operation)
# 下载生成的视频
@ -172,7 +244,7 @@ class GeminiClient:
raise Exception(operation.response.rai_media_filtered_reasons[0])
raise Exception("未知错误")
video_bytes = client.files.download(file=video.video)
video_bytes = self._current_client.files.download(file=video.video)
logger.info("Veo-3.0视频生成成功")
return video_bytes
@ -181,12 +253,13 @@ class GeminiClient:
logger.error(f"Veo-3.0视频生成失败: {e}")
raise e
async def analyze_video(self, video_url: str) -> Optional[Dict[str, Any]]:
async def analyze_video(self, video_url: str, prompt_template: str) -> Optional[Dict[str, Any]]:
"""
分析视频内容提取关键素材帧和分镜关键帧
Args:
video_url: 视频URL
prompt_template: 视频分析提示词模板
Returns:
分析结果字典包含key_assets_frames和key_storyboard_frames
@ -207,74 +280,54 @@ class GeminiClient:
temp_video_path = temp_file.name
try:
# 上传视频文件到Gemini
logger.info("正在上传视频到Gemini...")
myfile = client.files.upload(file=temp_video_path)
logger.info(f"视频上传成功文件ID: {myfile.name}")
# 等待文件变为ACTIVE状态
logger.info("等待文件处理完成...")
max_wait_time = 300 # 最多等待5分钟
wait_interval = 5 # 每5秒检查一次
waited_time = 0
while waited_time < max_wait_time:
file_info = client.files.get(name=myfile.name)
logger.info(f"文件状态: {file_info.state}")
def _upload_and_analyze():
# 上传视频文件到Gemini
logger.info("正在上传视频到Gemini...")
myfile = self._current_client.files.upload(file=temp_video_path)
logger.info(f"视频上传成功文件ID: {myfile.name}")
if file_info.state == "ACTIVE":
logger.info("文件已准备就绪,开始分析")
break
elif file_info.state == "FAILED":
raise Exception("文件处理失败")
# 等待文件变为ACTIVE状态
logger.info("等待文件处理完成...")
max_wait_time = 300 # 最多等待5分钟
wait_interval = 5 # 每5秒检查一次
waited_time = 0
time.sleep(wait_interval)
waited_time += wait_interval
while waited_time < max_wait_time:
file_info = self._current_client.files.get(name=myfile.name)
logger.info(f"文件状态: {file_info.state}")
if file_info.state == "ACTIVE":
logger.info("文件已准备就绪,开始分析")
break
elif file_info.state == "FAILED":
raise Exception("文件处理失败")
time.sleep(wait_interval)
waited_time += wait_interval
if waited_time >= max_wait_time:
raise Exception("文件处理超时")
return myfile
if waited_time >= max_wait_time:
raise Exception("文件处理超时")
# 直接使用提示词模板作为分析提示词
analysis_prompt = prompt_template
# 构建分析提示词
analysis_prompt = """
请仔细分析这个视频并返回以下JSON格式的结果
{
"title": "为这个视频生成一个简洁有吸引力的标题不超过20个字符",
"script": "根据视频内容生成的完整剧本,包含对话、动作、场景描述等",
"key_assets_frames": [
{
"timestamp": "HH:MM:SS",
"name": "素材名称",
"description": "素材描述",
"tags": ["标签1", "标签2"]
}
],
"key_storyboard_frames": [
{
"timestamp": "HH:MM:SS",
"frame_prompt": "该帧画面描述",
"shot_prompt": "该关键帧到下一关键帧之间的剧情描述"
}
]
}
myfile = self._execute_with_retry(_upload_and_analyze)
要求
1. title: 根据视频主题和内容生成简洁有吸引力的标题要能概括视频核心内容不超过20个字符
2. script: 根据视频内容生成完整的剧本包含场景描述角色对话动作指导等要生动详细
3. key_assets_frames: 提取3-5个关键素材帧包含视觉元素如角色场景道具动物等
4. key_storyboard_frames: 提取分镜关键帧约每8秒一帧
5. timestamp格式必须是HH:MM:SS
6. 确保返回的是有效的JSON格式不要包含其他文字
"""
# 调用Gemini分析视频
logger.info("正在分析视频内容...")
response = client.models.generate_content(
model="gemini-2.5-flash",
contents=[myfile, analysis_prompt],
config=types.GenerateContentConfig(
http_options=types.HttpOptions(timeout=120000) # 2分钟超时
def _analyze_video_content():
# 调用Gemini分析视频
logger.info("正在分析视频内容...")
response = self._current_client.models.generate_content(
model="gemini-2.5-flash",
contents=[myfile, analysis_prompt],
config=types.GenerateContentConfig(
http_options=types.HttpOptions(timeout=120000) # 2分钟超时
)
)
)
return response
response = self._execute_with_retry(_analyze_video_content)
# 解析返回的JSON
result_text = response.text

View File

@ -0,0 +1,61 @@
from typing import List, Optional, Iterator
from loguru import logger
import threading
from ..config import settings
class KeyPoolManager:
    """Pool of Google API keys used to rotate keys when requests are rate-limited.

    The keys are read once, at construction time, from the comma-separated
    ``settings.google_api_keys`` value. Reads and updates of the "current key"
    index are guarded by a lock.
    """

    def __init__(self):
        self._keys: List[str] = []
        self._current_index = 0
        self._lock = threading.Lock()
        self._initialize_keys()

    def _initialize_keys(self):
        """Parse the configured key string (one key or several) into the pool.

        Raises:
            ValueError: If no non-blank key is left after parsing.
        """
        parsed = []
        for raw in settings.google_api_keys.split(','):
            candidate = raw.strip()
            if candidate:
                parsed.append(candidate)
        self._keys = parsed

        if len(self._keys) == 0:
            raise ValueError("GOOGLE_API_KEYS不能为空")

        logger.info(f"初始化Google API Key池{len(self._keys)}个key")

    def get_current_key(self) -> str:
        """Return the key the pool currently points at.

        Raises:
            ValueError: If the pool holds no keys.
        """
        with self._lock:
            if self._keys:
                return self._keys[self._current_index]
            raise ValueError("没有可用的Google API Key")

    def switch_to_next_key(self) -> bool:
        """Advance to the next key, wrapping around after the last one.

        Returns:
            bool: True if the pool advanced; False when it holds at most one
            key, so there is nothing to switch to. Because the index wraps,
            a pool with several keys always returns True — callers must bound
            their own number of attempts.
        """
        with self._lock:
            pool_size = len(self._keys)
            if pool_size > 1:
                self._current_index = (self._current_index + 1) % pool_size
                logger.info(f"切换到下一个Google API Key当前索引: {self._current_index}")
                return True
            return False

    def reset_to_first_key(self):
        """Point the pool back at the first key."""
        with self._lock:
            self._current_index = 0
            logger.info("重置到第一个Google API Key")

    def get_all_keys(self) -> List[str]:
        """Return a copy of every key in the pool (intended for tests)."""
        return list(self._keys)

    def has_multiple_keys(self) -> bool:
        """Tell whether the pool holds more than one key."""
        return len(self._keys) >= 2
# Global key pool manager instance shared by importers of this module.
# It is built at import time, so an empty GOOGLE_API_KEYS setting raises
# ValueError as soon as this module is imported.
key_pool_manager = KeyPoolManager()

View File

@ -21,7 +21,7 @@ class OpenRouterClient:
async def generate_text(
self,
prompt: str,
model: str = "anthropic/claude-3.5-sonnet"
model: str = "google/gemini-2.5-pro"
) -> Optional[str]:
"""
生成文本内容
@ -66,7 +66,7 @@ class OpenRouterClient:
self,
prompt_template: str,
script_or_idea: str,
model: str = "anthropic/claude-3.5-sonnet"
model: str = "google/gemini-2.5-pro"
) -> Optional[Dict[str, Any]]:
"""
生成完整剧本和素材信息
@ -107,7 +107,7 @@ class OpenRouterClient:
text: str,
source_lang: str = "zh",
target_lang: str = "en",
model: str = "anthropic/claude-3.5-sonnet"
model: str = "google/gemini-2.5-pro"
) -> Optional[str]:
"""
翻译文本