Commit 6d5baff6 by unknown

init

parents
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os,re,json,sys
__path__ = os.path.dirname(os.path.abspath(__file__))
sys.path.append(__path__)
sys.path.append(os.path.join(__path__, 'web_search_source'))
import uvicorn
import asyncio
import logging
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from pydantic import BaseModel
from save_es_database import ESsearch
from web_search_source.web_search_resource import webSearchResource
def setup_logger():
    """Return the ``AI_planner`` logger, writing to ``log/AI_planner.log``.

    The ``log`` directory is created next to this source file when it is
    missing. The function is idempotent: ``logging.getLogger`` hands back the
    same logger object for the same name, so a second call must not attach a
    second file handler (that would write every record twice).

    Returns:
        logging.Logger: the configured logger, level ``INFO``.
    """
    logger = logging.getLogger('AI_planner')
    logger.setLevel(logging.INFO)

    # Make sure the log directory exists; exist_ok avoids the race between an
    # existence check and the creation.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    log_dir = os.path.join(base_dir, "log")
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.abspath(os.path.join(log_dir, "AI_planner.log"))

    # Only attach the file handler once per process.
    already_attached = any(
        isinstance(handler, logging.FileHandler)
        and handler.baseFilename == log_file
        for handler in logger.handlers
    )
    if not already_attached:
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    return logger
# Create the module-level logger instance
logger_es = setup_logger()
class Item(BaseModel):
    """Request body model with the search filters; every field defaults to an empty string."""
    tableName: str = "" # name of the SOP table
    bookCategory: str = "" # book category: K12, children's books, other books
    schoolStage: str = "" # school stage: "junior high", "primary school", "senior high"
    studentGrade: str = "" # grade
    subjectCategory: str = "" # subject, e.g. Chinese, mathematics
    bookVersion: str = "" # edition, e.g. PEP edition, general edition
    purpose: str = "" # purpose of the target reader
    keyword: str = "" # keyword to search for
class ItemWebSearch(BaseModel):
    """Model describing a book; every field defaults to an empty string."""
    bookName: str = "" # book title
    bookClassify: str = "" # book category
    author: str = "" # book author
    introduction: str = "" # book introduction / synopsis
# FastAPI application object and the ESsearch helper pointed at http://localhost:9200
app = FastAPI()
es_search = ESsearch(hosts=['http://localhost:9200'])
# API endpoints
# NOTE(review): this comment previously described a "Chinese-to-Korean translation API", which looks copied from another service — confirm.
# 1. Heartbeat / health check
@app.get("/health/")
async def health():
    """Report that the service is up with a fixed HTTP 200 JSON message."""
    return JSONResponse(content={"message": "no AI source up."}, status_code=200)
# 2. Search the non-AI resources stored in ES
@app.post("/no_ai_source/")
async def no_ai_source(input: Item):
    """
    Take the request fields as a dict and return the ES search result
    (the list of non-AI resources).

    Responds with HTTP 200 and the search result, or HTTP 500 and
    ``{"message": <error text>}`` when anything in the handler raises.
    """
    try:
        # Convert the pydantic model to a plain dict via its JSON form.
        input = json.loads(input.json())
        hits = es_search.search(input)
        pretty_hits = json.dumps(hits, ensure_ascii=False, indent=4)
        logger_es.info(f"ES search no_ai_source input : {input} ; \nes_search : {pretty_hits}")
        return JSONResponse(content=hits, status_code=200)
    except Exception as exc:
        logger_es.error(f" no_ai_source input : {input}; error message : {exc}")
        return JSONResponse(content={"message": str(exc)}, status_code=500)
if __name__ == "__main__":
    uvicorn.run(app="api_service:app", host="0.0.0.0", port=9860, workers=1) # the deployed service listens on port 9860
# How to start the API service on server 116.63.110.220:
# netstat -ntlp | grep 9860
# cd /home/liuxin/work/AI_planner
# conda activate translate
# nohup python -u api_service.py > log/api_service.log 2>&1 &
# tail -f log/api_service.log
# tail -f log/AI_planner.log
# uvicorn api_service:app --host 0.0.0.0 --port 9860 --workers 1
This source diff could not be displayed because it is too large. You can view the blob instead.
import re, json, sys , os
import pandas as pd
# Non-AI project resource list: convert every spreadsheet row to one JSON string
def knowledge_json(file, sheet_names:list, save_file):
    """Export rows of the given Excel sheets as JSON Lines text.

    Each row becomes one JSON object per line (non-ASCII kept as-is); the
    "序号" (serial number) column is dropped when present. Rows whose
    serialized form exceeds 3000 characters are printed for inspection but
    still written. Rows that cannot be serialized are printed and skipped.

    Args:
        file: path of the Excel workbook, passed to ``pandas.read_excel``.
        sheet_names: names of the sheets to export, processed in order.
        save_file: path of the UTF-8 text file to write (overwritten).

    Returns:
        None. Prints the length of the longest serialized row (0 when no
        row was exported).
    """
    length_record = []
    save_jsonl_data = []
    for sheet_name in sheet_names:
        # keep_default_na=False keeps empty cells as "" instead of NaN.
        frame = pd.read_excel(file, sheet_name=sheet_name, keep_default_na=False)
        for record in frame.to_dict(orient='records'):
            record.pop("序号", None)
            try:
                serialized = json.dumps(record, ensure_ascii=False)
            except (TypeError, ValueError):
                # Value json cannot encode: show the row and move on.
                print(record)
                continue
            # json.dumps already escapes embedded newlines/tabs; stripping raw
            # ones is kept as a guard so every record stays on one line.
            serialized = serialized.replace("\n", "").replace("\t", "")
            length_record.append(len(serialized))
            if len(serialized) > 3000:
                print(serialized)
            save_jsonl_data.append(serialized)
    with open(save_file, 'w', encoding='utf-8') as f:
        f.write("\n".join(save_jsonl_data))
    # default=0 avoids a ValueError when nothing was exported.
    print(f"max length: {max(length_record, default=0)}")
if __name__ == "__main__":
    # Hard-coded local input workbook and output text file.
    file = r"D:\0_shu_chuan_work\work\AI_planner\data\非AI工具.xlsx"
    save_file = r"D:\0_shu_chuan_work\work\AI_planner\data\伴学工具0.txt" # max length: 157
    # sheet_names is reassigned several times; only the LAST assignment is
    # used by the call below. Each trailing comment records a max length
    # (presumably the value printed for that sheet on an earlier run).
    sheet_names = ['伴学工具20250214'] # max length: 157
    sheet_names = ['第三方自有资源评级详表'] # max length: 429
    sheet_names = ['品牌资源(爱奇艺+慕课)'] # max length: 333
    sheet_names = ['小睿资讯服务'] # max length: 2916
    sheet_names = ['测评库资源'] # max length: 259
    knowledge_json(file, sheet_names, save_file)
    print("finished.")
#!/usr/bin/env python3
import asyncio
import base64
import argparse
from playwright.async_api import async_playwright
from fastapi.responses import JSONResponse
import time
import re, json
import uuid
import uvicorn
from fastapi import FastAPI, Query, HTTPException
from pydantic import BaseModel, HttpUrl
from typing import Optional
import requests
import os
import mimetypes
from typing import Dict, Optional, Union, Tuple
from urllib.parse import quote
# from screenshot import capture_screenshot
async def capture_screenshot(url, width=1280, height=800, save_path=None):
    """
    Capture a full-page screenshot of a webpage and return it base64 encoded.

    Args:
        url (str): The URL to capture
        width (int): Viewport width
        height (int): Viewport height
        save_path (str | None): Optional file path. When given, it is passed
            to ``page.screenshot`` so the image is also written to disk.
    Returns:
        str: Base64 encoded screenshot data
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        try:
            page = await browser.new_page(viewport={'width': width, 'height': height})
            try:
                await page.goto(url, wait_until='networkidle')
            except Exception:
                # Waiting for network idle failed; retry with the weaker
                # 'load' condition.
                await page.goto(url, wait_until='load')
            # path=save_path makes Playwright write the file itself, so no
            # second manual write is needed.
            screenshot_bytes = await page.screenshot(full_page=True, path=save_path)
        finally:
            # Close the browser even when navigation or capture raised.
            await browser.close()
    # Convert to base64
    return base64.b64encode(screenshot_bytes).decode('utf-8')
# FastAPI application object; the route decorators in this file register their handlers on it.
app = FastAPI(title="Screenshot Service")
class Input(BaseModel):
    """Request body model for the screenshot endpoint."""
    url: str = ""  # URL of the page to capture; defaults to an empty string
    width: int = 1280  # width forwarded to capture_screenshot
    height: int = 800  # height forwarded to capture_screenshot
class ScreenshotResponse(BaseModel):
    """Model holding a screenshot result: the URL, the base64 image and the size.

    NOTE(review): no handler visible in this file references this model — confirm it is still needed.
    """
    url: str
    base64_image: str
    width: int
    height: int
class OBSUploader:
    """Upload files through the OBS signed-URL API exposed under ``base_url``.

    The flow is: ask the API for a temporary signed URL
    (``get_upload_url``), then ``PUT`` the file content to that URL
    (``upload_file``).
    """

    def __init__(self, base_url: str = "https://open.raysgo.com", auth_token: Optional[str] = None,
                 timeout: float = 30.0):
        """
        Initialize the OBS uploader.
        Args:
            base_url: The base URL for the API (a trailing slash is stripped)
            auth_token: The authorization token for API access
            timeout: Timeout in seconds applied to every HTTP request, so a
                stalled server cannot block the caller forever
        """
        self.base_url = base_url.rstrip('/')
        self.auth_token = auth_token
        self.timeout = timeout
        # Only send an Authorization header when a token was supplied,
        # instead of carrying a None-valued header around.
        self.headers = {'Authorization': f'Bearer {auth_token}'} if auth_token else {}
        # Initialize mimetypes
        mimetypes.init()

    def _get_content_type(self, file_path: Union[str, bytes]) -> Tuple[str, bytes]:
        """
        Get content type and file content from file path or bytes.
        Args:
            file_path: Path to the file or file content as bytes
        Returns:
            Tuple of (content_type, file_content)
        Raises:
            FileNotFoundError: if a path is given and the file does not exist
        """
        if isinstance(file_path, str):
            if not os.path.exists(file_path):
                raise FileNotFoundError(f"File not found: {file_path}")
            # Guess from the file extension; may be None for unknown types.
            content_type, _ = mimetypes.guess_type(file_path)
            with open(file_path, 'rb') as f:
                file_content = f.read()
        else:
            file_content = file_path
            # Raw bytes carry no file name, so no type can be guessed here.
            content_type = None
        return content_type or 'application/octet-stream', file_content

    def get_upload_url(self, biz_code: str, object_name: str, content_type: str) -> Dict:
        """
        Get a temporary upload URL for the specified object.
        Args:
            biz_code: Business code for the upload
            object_name: Name/path of the object to upload
            content_type: MIME type of the file
        Returns:
            Dict containing the upload URL and related information
        Raises:
            requests.HTTPError: if the API answers with an error status
        """
        endpoint = f"{self.base_url}/aimodel/v1.0/obs/getCreatePostSignature"
        params = {
            'bizCode': biz_code,
            'objectName': object_name,
            'mimeType': content_type
        }
        response = requests.get(endpoint, params=params, headers=self.headers, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def upload_file(self, file_path: Union[str, bytes], biz_code: str, object_name: str) -> Dict:
        """
        Upload a file using temporary credentials.
        Args:
            file_path: Path to the file to upload or file content as bytes
            biz_code: Business code for the upload
            object_name: Name/path of the object to upload
        Returns:
            Dict containing the upload result and file URL
        Raises:
            FileNotFoundError: if ``file_path`` is a path that does not exist
            Exception: if the API reports a non-zero ``errCode``
            requests.HTTPError: if either HTTP request fails
        """
        # Get content type and file content
        content_type, file_content = self._get_content_type(file_path)
        # Get temporary upload URL with content type
        upload_info = self.get_upload_url(biz_code, object_name, content_type)
        if upload_info.get('errCode') != 0:
            # .get keeps the real failure visible even if 'message' is absent.
            raise Exception(f"Failed to get upload URL: {upload_info.get('message')}")
        signed = upload_info['data']
        upload_url = signed['temporarySignatureUrl']
        # Upload the file with the correct content type
        headers = {
            'Content-Type': content_type,
            'Content-Length': str(len(file_content))
        }
        response = requests.put(upload_url, data=file_content, headers=headers, timeout=self.timeout)
        response.raise_for_status()
        return {
            'success': True,
            'file_url': signed['domain'] + '/' + object_name,
            'object_url_map': signed['objectUrlMap']
        }
@app.post("/screenshot/")
async def get_screenshot(input: Input):
    """Capture a page, save the image locally, upload it and return its URL.

    Args:
        input: request body with ``url``, ``width`` and ``height``.

    Returns:
        JSONResponse: HTTP 200 with ``{"obs_url": <url>}``; ``obs_url`` is an
        empty string when only the upload step failed. HTTP 500 with
        ``{"message": <error text>}`` when capturing or saving failed.
    """
    # NOTE(review): input.url is caller-supplied and is opened by a browser
    # on this host; consider restricting which targets are allowed.
    save_images_path = "images"
    # Create the directory (including parents) if it does not exist yet.
    os.makedirs(save_images_path, exist_ok=True)
    try:
        base64_image = await capture_screenshot(input.url, width=input.width, height=input.height)
        # Save the decoded image locally.
        # NOTE(review): capture_screenshot is called without a path/type, so
        # the bytes are presumably PNG although the names end in .jpg — confirm.
        image_bytes = base64.b64decode(base64_image)
        save_file = os.path.join(save_images_path, f"screenshot_{uuid.uuid4().hex}.jpg")
        with open(save_file, "wb") as f:
            f.write(image_bytes)
        # NOTE(review): credential kept as fallback for backward
        # compatibility; prefer supplying it via OBS_AUTH_TOKEN and rotating
        # the value committed here.
        auth_token = os.environ.get("OBS_AUTH_TOKEN", "dcg-4c1e3a7f4fcd415e8c93151ff539d20a")
        uploader = OBSUploader(auth_token=auth_token)
        res = {}
        # Upload the file so it can be viewed from a browser; an upload
        # failure is best-effort and still answers 200 with an empty URL.
        try:
            result = uploader.upload_file(
                file_path=save_file,
                biz_code="test",
                object_name=f"screenshot/{uuid.uuid4().hex}.jpg"
            )
            print(f"File uploaded successfully! URL: {result['file_url']}")
            res["obs_url"] = result["file_url"]
        except Exception as e:
            print(f"Upload failed: {str(e)}")
            res["obs_url"] = ''
        return JSONResponse(status_code=200, content=res)
    except Exception as e:
        # Do not touch `res` here: it is unbound when the capture itself
        # failed, which previously raised UnboundLocalError and hid this
        # error response.
        return JSONResponse(status_code=500, content={"message": str(e)})
@app.get("/")
async def root():
    """Return the service name and a usage hint for the screenshot endpoint."""
    # The screenshot route is registered with POST and reads a JSON body, so
    # the hint describes that call rather than a GET with query parameters.
    return {
        "service": "Screenshot Service",
        "usage": 'POST /screenshot/ with JSON body {"url": "https://example.com", "width": 1280, "height": 800}'
    }
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=19801)
# Takes a screenshot of a given web page
# Deployment server: 116.63.110.220
# sudo docker run -itd --name playwright -p 19801:19801 -v /home/liuxin/work:/home/work playwright:v1.2 /bin/bash
# sudo docker exec -it playwright bash
# cd /home/work/AI_planner/screenshot
# nohup python -u screenshot_service.py > screenshot_service.log 2>&1 & # start the service
\ No newline at end of file
import requests
import json
from urllib.parse import quote
import base64
from PIL import Image
import io
# API endpoint URL (replace with the actual service address)
# api_url = "http://116.63.110.220:19801/screenshot"
api_url = "http://localhost:19801/screenshot"
# Request parameters
# NOTE: url is assigned twice; only the second value is used.
url = "https://www.icourse163.org/course/WUST-1206144803?from=searchPage&outVendor=zw_mooc_pcssjg_"
url = "https://www.baidu.com/"
params = {
    "url": url, # target URL to take a screenshot of
    "width": 1290, # requested width
    "height": 700 # requested height
}
# Serialize the parameters to a JSON string; it is sent as the raw request body.
params = json.dumps(params, ensure_ascii=False)
try:
    # Send a POST request
    response = requests.post(api_url, data=params)
    # Check the response status code
    if response.status_code == 200:
        data = response.json()
        obs_url = data['obs_url']
        print("目标网站截屏保存地址:", obs_url)
    else:
        print("error")
except Exception as e:
    print(f"发生异常: {str(e)}")
\ No newline at end of file
#!/usr/bin/env python3
import asyncio
import base64
import argparse
from playwright.async_api import async_playwright
from fastapi.responses import JSONResponse
import time
import re, json
import uvicorn
from fastapi import FastAPI, Query, HTTPException
from pydantic import BaseModel, HttpUrl
from typing import Optional
# from screenshot import capture_screenshot
async def capture_screenshot(url, width=1280, height=800, save_path=None):
    """
    Capture a full-page screenshot of a webpage and return it base64 encoded.

    Args:
        url (str): The URL to capture
        width (int): Viewport width
        height (int): Viewport height
        save_path (str | None): Optional file path. When given, it is passed
            to ``page.screenshot`` so the image is also written to disk.
    Returns:
        str: Base64 encoded screenshot data
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        try:
            page = await browser.new_page(viewport={'width': width, 'height': height})
            try:
                await page.goto(url, wait_until='networkidle')
            except Exception:
                # Waiting for network idle failed; retry with the weaker
                # 'load' condition.
                await page.goto(url, wait_until='load')
            # path=save_path makes Playwright write the file itself, so no
            # second manual write is needed.
            screenshot_bytes = await page.screenshot(full_page=True, path=save_path)
        finally:
            # Close the browser even when navigation or capture raised.
            await browser.close()
    # Convert to base64
    return base64.b64encode(screenshot_bytes).decode('utf-8')
# FastAPI application object; the route decorators in this file register their handlers on it.
app = FastAPI(title="Screenshot Service")
class Input(BaseModel):
    """Request body model for the screenshot endpoint."""
    url: str = ""  # URL of the page to capture; defaults to an empty string
    width: int = 1280  # width forwarded to capture_screenshot
    height: int = 800  # height forwarded to capture_screenshot
class ScreenshotResponse(BaseModel):
    """Model holding a screenshot result: the URL, the base64 image and the size.

    NOTE(review): no handler visible in this file references this model — confirm it is still needed.
    """
    url: str
    base64_image: str
    width: int
    height: int
@app.post("/screenshot/")
async def get_screenshot(input: Input):
    """Capture the requested page and return the image as base64 in JSON.

    Responds with HTTP 200 and ``base64_image``, ``url``, ``width`` and
    ``height``, or HTTP 500 and ``{"message": <error text>}`` when anything
    in the handler raises.
    """
    try:
        # Convert the pydantic model to a plain dict via its JSON form.
        payload = json.loads(input.json())
        url, width, height = payload['url'], payload['width'], payload['height']
        encoded = await capture_screenshot(url, width=width, height=height)
        body = {"base64_image": encoded, "url": url, "width": width, "height": height}
        return JSONResponse(content=body, status_code=200)
    except Exception as exc:
        return JSONResponse(content={"message": str(exc)}, status_code=500)
@app.get("/")
async def root():
    """Return the service name and a usage hint for the screenshot endpoint."""
    # The screenshot route is registered with POST and reads a JSON body, so
    # the hint describes that call rather than a GET with query parameters.
    return {
        "service": "Screenshot Service",
        "usage": 'POST /screenshot/ with JSON body {"url": "https://example.com", "width": 1280, "height": 800}'
    }
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=19801)
# Takes a screenshot of a given web page
# sudo docker run -itd --name playwright -p 19801:19801 -v /home/liuxin/work:/home/work playwright:v1.1 /bin/bash
# sudo docker exec -it playwright bash
# cd /home/work/AI_planner/screenshot
# python web_server.py # start the service
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment