Commit 47c4d836 by 刘鑫

Add new file

parent b6f81242
#!/usr/bin/env python3
import asyncio
import base64
import argparse
from playwright.async_api import async_playwright
from fastapi.responses import JSONResponse
import time
import re, json
import uuid
import uvicorn
from fastapi import FastAPI, Query, HTTPException
from pydantic import BaseModel, HttpUrl
from typing import Optional
import requests
import os
import mimetypes
from typing import Dict, Optional, Union, Tuple
from urllib.parse import quote
# from screenshot import capture_screenshot
async def capture_screenshot(url, width=1280, height=800, save_path=None):
"""
Capture a screenshot of a webpage and return as base64 encoded string.
Args:
url (str): The URL to capture
width (int): Viewport width
height (int): Viewport height
Returns:
str: Base64 encoded screenshot data
"""
timestamp = time.time()
timestamp = str(timestamp)
timestamp = re.sub(r"\.", "_", timestamp)
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page(viewport={'width': width, 'height': height})
try:
await page.goto(url, wait_until='networkidle')
except Exception as e:
await page.goto(url, wait_until='load')
screenshot_bytes = await page.screenshot(full_page=True, path=save_path)
await browser.close()
# Convert to base64
base64_screenshot = base64.b64encode(screenshot_bytes).decode('utf-8')
# base64_screenshot = screenshot_bytes
# 保存图片
if save_path:
with open(save_path, 'wb') as f:
f.write(screenshot_bytes)
# print(f"Screenshot saved to {args.output}")
return base64_screenshot
app = FastAPI(title="Screenshot Service")
class Input(BaseModel):
url: str = ""
width: int = 1280
height: int = 800
class ScreenshotResponse(BaseModel):
url: str
base64_image: str
width: int
height: int
class OBSUploader:
def __init__(self, base_url: str = "https://open.raysgo.com", auth_token: Optional[str] = None):
"""
Initialize the OBS uploader.
Args:
base_url: The base URL for the API
auth_token: The authorization token for API access
"""
self.base_url = base_url.rstrip('/')
self.auth_token = auth_token
self.headers = {
'Authorization': f'Bearer {auth_token}' if auth_token else None
}
# Initialize mimetypes
mimetypes.init()
def _get_content_type(self, file_path: Union[str, bytes]) -> Tuple[str, bytes]:
"""
Get content type and file content from file path or bytes.
Args:
file_path: Path to the file or file content as bytes
Returns:
Tuple of (content_type, file_content)
"""
if isinstance(file_path, str):
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
content_type, _ = mimetypes.guess_type(file_path)
with open(file_path, 'rb') as f:
file_content = f.read()
else:
file_content = file_path
# For bytes input, try to detect type from first few bytes
content_type = 'application/octet-stream' # Default content type
return content_type or 'application/octet-stream', file_content
def get_upload_url(self, biz_code: str, object_name: str, content_type: str) -> Dict:
"""
Get a temporary upload URL for the specified object.
Args:
biz_code: Business code for the upload
object_name: Name/path of the object to upload
content_type: MIME type of the file
Returns:
Dict containing the upload URL and related information
"""
endpoint = f"{self.base_url}/aimodel/v1.0/obs/getCreatePostSignature"
params = {
'bizCode': biz_code,
'objectName': object_name,
'mimeType': content_type
}
response = requests.get(endpoint, params=params, headers=self.headers)
response.raise_for_status()
return response.json()
def upload_file(self, file_path: Union[str, bytes], biz_code: str, object_name: str) -> Dict:
"""
Upload a file using temporary credentials.
Args:
file_path: Path to the file to upload or file content as bytes
biz_code: Business code for the upload
object_name: Name/path of the object to upload
Returns:
Dict containing the upload result and file URL
"""
# Get content type and file content
content_type, file_content = self._get_content_type(file_path)
# Get temporary upload URL with content type
upload_info = self.get_upload_url(biz_code, object_name, content_type)
if upload_info['errCode'] != 0:
raise Exception(f"Failed to get upload URL: {upload_info['message']}")
upload_url = upload_info['data']['temporarySignatureUrl']
# Upload the file with the correct content type
headers = {
'Content-Type': content_type,
'Content-Length': str(len(file_content))
}
response = requests.put(upload_url, data=file_content, headers=headers)
response.raise_for_status()
return {
'success': True,
'file_url': upload_info['data']['domain'] + '/' + object_name,
'object_url_map': upload_info['data']['objectUrlMap']
}
@app.post("/screenshot/")
async def get_screenshot(input: Input):
save_images_path = "images"
if not os.path.exists(save_images_path):
# 创建单级或多级目录(自动处理父目录)
os.makedirs(save_images_path)
file_md5 = uuid.uuid4().hex
try:
base64_image = await capture_screenshot(input.url, width=input.width, height=input.height)
res = {}
# 保存base64图片
base64_image = base64.b64decode(base64_image)
save_file = f"screenshot_{file_md5}.jpg"
save_file = os.path.join(save_images_path, save_file)
with open(save_file, "wb") as f:
f.write(base64_image)
uploader = OBSUploader(auth_token="dcg-4c1e3a7f4fcd415e8c93151ff539d20a")
# Upload a file 上传图片方便浏览器查看
try:
result = uploader.upload_file(
file_path=save_file,
biz_code="test",
object_name=f"screenshot/{uuid.uuid4().hex}.jpg"
)
print(f"File uploaded successfully! URL: {result['file_url']}")
res["obs_url"] = result["file_url"]
except Exception as e:
print(f"Upload failed: {str(e)}")
res["obs_url"] = ''
res = JSONResponse(status_code=200, content=res)
except Exception as e:
res["obs_url"] = ''
res = JSONResponse(status_code=500, content={"message": str(e)})
return res
@app.get("/")
async def root():
return {
"service": "Screenshot Service",
"usage": "GET /screenshot?url=https://example.com&width=1280&height=800"
}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=19801)
# 对指定网页进行截屏
# 部署的服务网 116.63.110.220
# sudo docker run -itd --name playwright -p 19801:19801 -v /home/liuxin/work:/home/work playwright:v1.2 /bin/bash
# sudo docker exec -it playwright bash
# cd /home/work/AI_planner/screenshot
# nohup python -u screenshot_service.py > screenshot_service.log 2>&1 & # 启动服务
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment