Commit 905b999b by unknown

init

parent 245877af
图标题匹配 图文匹配-标题
图正文匹配
\ No newline at end of file 图文匹配-正文
config.py
CHECK_IMAGE_CAPTION=True # 1. Switch for image-vs-caption matching
CHECK_IMAGE_CAPTION_VLM=True # Prompt switch for image-vs-caption matching
CHECK_IMAGE_CONTEXT=False # 2. Switch for image-vs-body-text matching
CHECK_IMAGE_CONTEXT_VLM=False # Prompt switch for image-vs-body-text matching
服务启动
```
# cd /nfs/liuxin/work/Image_TextTitle_Matching
# conda activate text_check
# nohup python -u main.py > main_image_title.log 2>&1 &
# tail -f main_image_title.log
# ss -ntlp | grep 29500
```
import requests
def test_service():
    """Send one sample request to the image/text matching service and print the reply.

    The request carries an illustration URL, a caption and an (empty) context
    string. Any ``requests`` failure, including a timeout, is caught and
    printed instead of being raised.
    """
    #url_path='https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg'
    url_path='https://oss.5rs.me/oss/upload/image/png/d038e42a43154bb499810096446888f8.png'
    #url_path='/home/wangtengbo/A800-13-nfs/Image_Text_Matching_Server_Develop/logs/2025-06-23/images/dog_and_girl_20250623024916296.jpeg'
    data_info = {
        "illustration_url": url_path,
        'caption_text':'庆祝澳门回归',
        "context_info":''
    }
    # Alternative endpoint (Wang Tengbo's deployment); it used to be assigned
    # and then immediately overwritten, so it is kept only as a reference.
    # url = 'http://localhost:29505/v1/image_text_matching'
    url = 'http://localhost:29500/v1/image_text_matching'  # API endpoint of the test/production deployment
    try:
        # (connect, read) timeout: without one a stalled service blocks forever.
        response = requests.post(url, json=data_info, timeout=(5, 300))
        print('Comment response status code:', response.status_code)
        if response.status_code == 200:
            print('Response content:', response.json())
        else:
            print('Error response content:', response.text)
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")


if __name__ == "__main__":
    test_service()
import requests
import os
import mimetypes
from typing import Dict, Optional, Union, Tuple
from urllib.parse import quote
class OBSUploader:
    """Upload files to OBS through a temporary signed URL issued by the API."""

    def __init__(self, base_url: str = "https://open.raysgo.com", auth_token: Optional[str] = None,
                 timeout: float = 30.0):
        """
        Initialize the OBS uploader.

        Args:
            base_url: The base URL for the API (a trailing slash is removed)
            auth_token: The authorization token for API access
            timeout: Seconds to wait for each HTTP request before giving up
        """
        self.base_url = base_url.rstrip('/')
        self.auth_token = auth_token
        self.timeout = timeout
        # Only send an Authorization header when a token is actually configured.
        self.headers = {'Authorization': f'Bearer {auth_token}'} if auth_token else {}
        # Initialize mimetypes
        mimetypes.init()

    def _get_content_type(self, file_path: Union[str, bytes]) -> Tuple[str, bytes]:
        """
        Get content type and file content from file path or bytes.

        Args:
            file_path: Path to the file or file content as bytes

        Returns:
            Tuple of (content_type, file_content)

        Raises:
            FileNotFoundError: If a path is given and the file does not exist
        """
        if isinstance(file_path, (str, os.PathLike)):
            if not os.path.exists(file_path):
                raise FileNotFoundError(f"File not found: {file_path}")
            content_type, _ = mimetypes.guess_type(file_path)
            with open(file_path, 'rb') as f:
                file_content = f.read()
        else:
            file_content = file_path
            # Raw bytes carry no file name to guess from, so no type is detected.
            content_type = None
        # Fall back to the generic binary type when nothing could be guessed.
        return content_type or 'application/octet-stream', file_content

    def get_upload_url(self, biz_code: str, object_name: str, content_type: str) -> Dict:
        """
        Get a temporary upload URL for the specified object.

        Args:
            biz_code: Business code for the upload
            object_name: Name/path of the object to upload
            content_type: MIME type of the file

        Returns:
            Dict containing the upload URL and related information

        Raises:
            requests.HTTPError: If the API answers with an error status
        """
        endpoint = f"{self.base_url}/aimodel/v1.0/obs/getCreatePostSignature"
        params = {
            'bizCode': biz_code,
            'objectName': object_name,
            'mimeType': content_type
        }
        response = requests.get(endpoint, params=params, headers=self.headers, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def upload_file(self, file_path: Union[str, bytes], biz_code: str, object_name: str) -> Dict:
        """
        Upload a file using temporary credentials.

        Args:
            file_path: Path to the file to upload or file content as bytes
            biz_code: Business code for the upload
            object_name: Name/path of the object to upload

        Returns:
            Dict containing the upload result and file URL

        Raises:
            RuntimeError: If the API reports a non-zero ``errCode``
            requests.HTTPError: If either HTTP request fails
        """
        # Get content type and file content
        content_type, file_content = self._get_content_type(file_path)
        # Get temporary upload URL with content type
        upload_info = self.get_upload_url(biz_code, object_name, content_type)
        # .get() keeps a malformed error payload from masking the real failure
        # with a KeyError.
        if upload_info.get('errCode') != 0:
            raise RuntimeError(f"Failed to get upload URL: {upload_info.get('message')}")
        upload_data = upload_info['data']
        upload_url = upload_data['temporarySignatureUrl']
        # Upload the file with the correct content type
        headers = {
            'Content-Type': content_type,
            'Content-Length': str(len(file_content))
        }
        response = requests.put(upload_url, data=file_content, headers=headers, timeout=self.timeout)
        response.raise_for_status()
        return {
            'success': True,
            'file_url': upload_data['domain'] + '/' + object_name,
            'object_url_map': upload_data['objectUrlMap']
        }
# Example usage:
if __name__ == "__main__":
    # Initialize uploader. The token is read from the OBS_AUTH_TOKEN environment
    # variable so that no credential is committed to the repository.
    uploader = OBSUploader(auth_token=os.environ.get("OBS_AUTH_TOKEN"))
    # Upload a file
    try:
        result = uploader.upload_file(
            file_path="/data/wangtengbo/formula_node4_测试/logs/logs/2025-03-02/images/0d307e97071846a1b144e7dfb4d44241_20250302073213192/formula_1.png",
            biz_code="formula",
            object_name="image/test.jpg"
        )
        print(f"File uploaded successfully! URL: {result['file_url']}")
    except Exception as e:
        print(f"Upload failed: {str(e)}")
\ No newline at end of file
This diff is collapsed. Click to expand it.
import os
# 通过 pip install volcengine-python-sdk[ark] 安装方舟SDK
from volcenginesdkarkruntime import Ark
# Replace <MODEL> with the Model ID of the model to call
model="doubao-1.5-vision-pro"
# Initialize the Ark client; the API key is read from the ARK_API_KEY
# environment variable instead of being hard-coded in the source.
client = Ark(
    api_key=os.environ.get("ARK_API_KEY"),
)
# Create a chat request
response = client.chat.completions.create(
    # Inference endpoint / model ID of the deployed vision model
    model = model,
    messages = [
        {
            # The message is sent with the "user" role
            "role": "user",
            "content": [
                # Image the model should look at
                {"type": "image_url", "image_url": {"url": "https://ark-project.tos-cn-beijing.volces.com/doc_image/ark_demo_img_1.png"},},
                # Question the model should answer about the image
                {"type": "text", "text": "支持输入是图片的模型系列是哪个?"},
            ],
        }
    ],
)
print(response)
print(response.choices[0].message.content)
\ No newline at end of file
import re
def remove_blank_lines(text):
    """Collapse each newline / optional whitespace / newline run into one newline."""
    blank_run = re.compile(r'\n\s*\n')
    return blank_run.sub('\n', text)
if __name__ == "__main__":
    # Demo: run remove_blank_lines on a small Markdown sample (a heading, a
    # table caption and a table) and print the result.
    sub_text_infos="""# 第一章 机械运动
表 1-1 地球上不同纬度的重力加速度
| 地点 | 赤道 | 广州 | 上海 | 北京 | 北极 |
| --- | --- | --- | --- | --- | --- |
| 纬度 | 0° | 23°06′ | 30°12′ | 39°56′ | 90° |
| $g$ (m/s²) | 9.780 | 9.788 | 9.794 | 9.801 | 9.832 |
"""
    result=remove_blank_lines(sub_text_infos)
    print(result)
import re
from utils.common import Singleton
# class FormulaProcessor(metaclass=Singleton):
class FormulaProcessor():
    """Heuristic classifier that decides whether a text fragment looks like a formula."""

    # Rule 1: the mere presence of any of these characters marks a formula.
    _OPERATOR_CHARS = ('=', '>', '<', '\\', '+', '-', '*', '^')

    def __init__(self):
        # Pre-compile the regular expressions once for performance.
        self.chinese_characters_pattern = re.compile(r'[\u4e00-\u9fff]')
        self.english_characters_pattern = re.compile(r'[a-zA-Z]')
        self.digits_pattern = re.compile(r'\d')
        self.expression_pattern = re.compile(r'(\b[a-zA-Z]+\b\s*=\s*\d+)')
        self.latex_formula_pattern = re.compile(r'\\[a-zA-Z]+\{.*?\}|\$.*?\$')
        self.exclusion_keywords = ['例题', '题目','答案','练习','新课教授','化简','例','判断','选择','填空','计算题']
        self.allowed_characters_pattern = re.compile(r'^[\u4e00-\u9fffA-Za-z]+$')

    def contains_latex_formula(self, string):
        """Return True when *string* contains ``\\cmd{...}`` or a ``$...$`` span."""
        return bool(self.latex_formula_pattern.search(string))

    def is_formula(self, string):
        """Return True when *string* is considered a formula.

        Rules, in order:
          1. any operator character (``= > < \\ + - * ^``) -> formula;
          2. otherwise the text must contain a LaTeX span (``$...$``);
          3. texts containing an exclusion keyword are rejected.

        The former "letters/Chinese only" and "Chinese mixed with letters or
        digits" checks were unreachable: they ran only after rule 2 had
        established that the text contains ``$``, which makes the first one
        unmatchable and the second one's "no LaTeX" condition always false.
        They were removed without changing any result.
        """
        # Rule 1: operator characters win immediately.
        if any(ch in string for ch in self._OPERATOR_CHARS):
            return True
        # Rule 2: without operators only LaTeX-delimited content can qualify.
        if not self.contains_latex_formula(string):
            return False
        # Rule 3: exercise / answer style text is never treated as a formula.
        if any(keyword in string for keyword in self.exclusion_keywords):
            return False
        return True
# 测试示例
if __name__ == "__main__":
    checker = FormulaProcessor()
    # Strings containing LaTeX are raw strings: in a normal literal "\r", "\t",
    # "\f" and "\n" inside \right, \text, \frac and \neq become control
    # characters. Every entry is followed by a comma; the missing commas used
    # to merge three of them into one via implicit string concatenation.
    test_strings = [
        "1.了解二次根式、最简二次根式的概念,理解二次根式的性质 2.了解二次根式(根号下仅限于数)的加、减、乘、除运算法则,会用它们进 行有关的简单四则运算. 一 课时分配 本章教学约需8 课时,具体安排如下: 16.1二次根式2 课时 16.2二次根式的乘除3 课时 16.3二次根式的加减2 课时 小结1课时。",
        "最简二次根式的概念,理解二次根式的性质",
        'abdfsfdsfewfdsfsdfew,',
        '2为了民族复兴的梦想,我们从1840年的海面出发',
        "x^2 + y^2 = z^2",
        "E = mc^2",
        "F = ma",
        'a*b=10',
        r"理解并掌握(\sqrt{a})^{2}=a (a \geq 0)",
        r"本章内容主要有两个部分,它们分别是二次根式的有关概念、性质和二次根 式的四则运算. 本章的第一部分是二次根式的有关概念和性质.教材从两个电视塔的传播 半径之比出发,引入人二次根式的概念.接着根据有理数的算术平方根的意义,顺 理成章地推导出二次根式的两个性质: $\left( \sqrt{a} \right)^{3}=a \left( a \geq 0 \right) \rightarrow \sqrt{a^{3}}=a \left( a \geq 0 \right)$ 本章的第二部分是二次根式的四则运算.教材遵循由特殊到一般的规律,由 学生通过分析、概括、交流、归纳等过程,探究得到二次根式的乘除运算法则: $\sqrt{a}+ \sqrt{b}= \sqrt{ab} \left( a \geq 0,b \geq 0 \right) \text{和} \frac{ \sqrt{a}}{ \sqrt{b}}= \sqrt{ \frac{a}{b}} \left( a \geq 0,b>0 \right)$ .在此基础上,又通过进一步 类比,引出二次根式的加减运算.教材注重知识之间的联系,将乘法公式运用到 二次根式的四则运算中,以简化二次根式的运算"
        ,
        r'1.直线与抛物线的交点问题 要解决直线与抛物线的位置关系问题,可把直线方程与抛物线方程联 立,消去y(或消去x)得出关于x(或关于y)的一个方程 $ax^{2}+bx+c=0$ $y$ ,其中二 次项系 $a$ 有可能为0,此时直线与抛物线有一个交点. 当二次项系数 $a \neq 0$ 时, $\Delta=b^{2}-4ac$ 若△=0,则直线与抛物线没有公共点; 若 $\Delta>0$ ,则直线与抛物线有且只有一个公共点; 若 $\Delta<0$ 则直线与抛物线有两个不同的公共点. 2.弦长问题 设弦的端点为 $A \left( x_{1},y_{1} \right),B \left( x_{2},y_{2} \right)$ (1)一般弦长: $\left| AB \right|= \sqrt{1+k^{2}} \left| x_{1}-x_{2} \right.$ 域| $AB \left| \right.= \sqrt{1+ \frac{1}{k^{2}}} \left| y_{1}-y_{2} \right|$ (其中 k为弦所在直线的斜率) (2)焦点弦长: $\left| AB \right|=x_{1}+x_{2}-p.$ 3.中点弦问题 若 $M \left( x_{0},y_{0} \right)$ 是抛物线 $y^{2}=2px \left( p>0 \right)$ 的弦 $AB$ 的中点,则直线 $AB$ 的斜率 为 $k_{AB}= \frac{p}{y_{0}}$'
        ,
        '库伦定律表达式:F=k1y2 r2',  # expected: True
    ]
    results = [checker.is_formula(s) for s in test_strings]
    # Expected with the rules above (12 inputs):
    # [False, False, False, False, True, True, True, True, True, True, True, True]
    print(results)
# encoding=utf8
import sys
import os
import time
from loguru import logger
import datetime
from sqlalchemy import create_engine, UniqueConstraint
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, String, Integer, DateTime, Text, SmallInteger
from config.config import MYSQL_DB_URL
# Module-level SQLAlchemy engine, session factory and declarative base shared
# by the models and helpers in this file.
engine_knowledge = create_engine(
    url=MYSQL_DB_URL,
    max_overflow = 10, # Max connections created beyond the pool size; 0 means requests block once the pool is exhausted (default 10)
    pool_size = 50, # Connection pool size (default 5)
    pool_timeout = 30, # Max seconds to wait for a connection when the pool has none free (default 30)
    pool_recycle = 3600 # Seconds after which a pooled connection is recycled (reset) (default -1)
)
Session_Knowledge = sessionmaker(bind=engine_knowledge)
BaseKnowledge = declarative_base()
class KnowledgeBasePrompts(BaseKnowledge):
    """ORM model mapped to the 'knowledge-base-prompts' table."""
    __tablename__ = 'knowledge-base-prompts'
    id = Column(Integer, primary_key=True)
    task_id = Column(Integer, unique=True)  # unique key; DBUtils.get_prompt filters on it
    task_name = Column(String(50))
    prompt = Column(Text)
    llm = Column(String(30))
    author = Column(String(50))
    status = Column(Integer)
    attr = Column(Text)
    create_time = Column(DateTime, default=datetime.datetime.now)
    # NOTE(review): only an insert-time default is set (no onupdate), so the ORM
    # will not refresh this value on UPDATE — confirm that is intended.
    update_time = Column(DateTime, default=datetime.datetime.now)
class CorrectLLMStatisticsLog(BaseKnowledge):
    """ORM model mapped to the 'correct_llm_statistics_log' table.

    Rows are written by DBUtils.insert_llm_log, one per call.
    """
    __tablename__ = 'correct_llm_statistics_log'
    id = Column(Integer, primary_key=True)
    env = Column(String(32))
    source_type = Column(String(32))
    server_name = Column(String(64))
    user_id = Column(String(64))
    publish_id = Column(String(64))
    # NOTE(review): the String(512) columns below may reject or truncate longer
    # values depending on the database's SQL mode — confirm against callers.
    request = Column(String(512))
    model = Column(String(512))
    url = Column(String(512))
    api_key = Column(String(512))
    api_version = Column(String(512))
    input_token = Column(Integer)
    output_token = Column(Integer)
    start_time = Column(DateTime)
    end_time = Column(DateTime)
    message = Column(String(512))
    doc_id = Column(String(64))
    fragment_id = Column(String(512))
    book_name = Column(String(512))
    text = Column(Text)
    backup = Column(Text)
    update_time = Column(DateTime, default=datetime.datetime.now)  # set at insert time only
"""
类DBUtils的定义
"""
class DBUtils(object):
    """Static helpers for reading prompts and recording LLM call statistics."""

    def __init__(self):
        pass

    @staticmethod
    def get_prompt(task_id, task_name=None):
        """Return the first KnowledgeBasePrompts row with *task_id*, or None.

        ``task_name`` is accepted but not used by the query. Any exception is
        logged and turned into ``None``; the session is always closed.
        """
        db = Session_Knowledge()
        try:
            query = db.query(KnowledgeBasePrompts).filter_by(task_id=task_id)
            return query.first()
        except Exception as exc:
            logger.error("get_prompt error: {0}",exc)
            return None
        finally:
            db.close()

    @staticmethod
    def insert_llm_log(env=None, source_type=None, server_name=None, user_id=None, publish_id=None,
                       request=None, model=None, url=None, api_key=None,api_version=None,
                       input_token=0, output_token=0, start_time=None, end_time=None,
                       message=None, doc_id=None, fragment_id=None,book_name=None, text=None,
                       backup=None):
        """Insert one CorrectLLMStatisticsLog row built from the keyword arguments.

        Failures are logged and rolled back rather than raised; the elapsed
        time of the whole operation is logged afterwards.
        """
        started_at = datetime.datetime.now()
        db = Session_Knowledge()
        column_values = dict(
            env=env, source_type=source_type, server_name=server_name,
            user_id=user_id, publish_id=publish_id, request=request,
            model=model, url=url, api_key=api_key, api_version=api_version,
            input_token=input_token, output_token=output_token,
            start_time=start_time, end_time=end_time, message=message,
            doc_id=doc_id, fragment_id=fragment_id, book_name=book_name,
            text=text, backup=backup,
        )
        try:
            db.add(CorrectLLMStatisticsLog(**column_values))
            db.commit()
        except Exception as exc:
            logger.error("insert_llm_log error: {}".format(exc))
            db.rollback()
        finally:
            db.close()
        finished_at = datetime.datetime.now()
        logger.info("insert_llm_log elapsed time: {}".format(finished_at - started_at))
def test_dbutil():
    """Placeholder for a DBUtils smoke test; currently does nothing."""
    pass
if __name__ == '__main__':
    # Run the placeholder when the module is executed as a script.
    test_dbutil()
\ No newline at end of file
import os
from openai import OpenAI
from loguru import logger
from config.config import QWEN_API_KEY,QWEN_URL,QWEN_MODEL
def qwen_vl_infer(
    image_url: str,
    system_prompt:str,
    user_prompt:str
) -> str:
    """
    Ask the configured multimodal model about the image at *image_url*.

    The client is built from QWEN_API_KEY / QWEN_URL and the request uses
    QWEN_MODEL, all imported from ``config.config``.

    Args:
        image_url (str): Address of the image to analyse.
        system_prompt (str): Text sent as the system message.
        user_prompt (str): Text sent together with the image as the user message.
    Returns:
        str: The content of the model's first choice; an empty string on any error.
    """
    # Bound before the try block so the error handler can always report it,
    # even when the failure happens before the API call returns.
    completion = None
    try:
        client = OpenAI(
            api_key=QWEN_API_KEY,
            base_url=QWEN_URL,
        )
        messages = [
            {
                "role": "system",
                "content": [{"type": "text", "text": system_prompt}],
            },
            {
                "role": "user",
                "content": [
                    {"type": "image_url", "image_url": {"url": image_url}},
                    {"type": "text", "text": user_prompt},
                ],
            },
        ]
        completion = client.chat.completions.create(
            model=QWEN_MODEL,
            messages=messages,
        )
        description = completion.choices[0].message.content
        logger.info("Received description from model")
        return description
    except Exception as e:
        # loguru attaches the traceback via opt(exception=True). The values are
        # passed as format arguments so braces inside them are never
        # interpreted as placeholders.
        logger.opt(exception=True).error(
            "Unexpected Qwen Infer error: {},completion={}", e, completion
        )
        return ""
\ No newline at end of file
import cv2
import os
class ImageProcessor:
    """Crop regions of interest out of an image, upscale them and save them as PNG."""

    def __init__(self):
        pass

    def enlarge_image(self, image, scale_factor=2):
        """Return *image* resized by *scale_factor* using Lanczos interpolation."""
        # Target size derived from the source dimensions (shape[1] x shape[0]).
        width = int(image.shape[1] * scale_factor)
        height = int(image.shape[0] * scale_factor)
        return cv2.resize(image, (width, height), interpolation=cv2.INTER_LANCZOS4)

    def _save_enlarged(self, region, output_path):
        """Upscale *region*, write it to *output_path* and return that path."""
        cv2.imwrite(output_path, self.enlarge_image(region))
        return output_path

    def process_image(self, image, rois, output_folder):
        """Extract every ROI from *image* and save the enlarged crops.

        A ROI that covers at least 75% of the image area and is more than
        600 px in both directions is split into two halves along its longer
        side, saved as ``formula_{i}_{1|2}.png``. Every other ROI is saved
        whole as ``formula_{i}.png``.

        Args:
            image: Image array indexed as ``image[y, x]``.
            rois: Iterable of ``(x1, y1, x2, y2)`` boxes.
            output_folder: Directory the PNG files are written to.
        Returns:
            list: Paths of all written files, in processing order.
        """
        extracted_image_paths = []
        # Loop-invariant area limit above which a ROI counts as oversized.
        area_limit = 0.75 * image.shape[1] * image.shape[0]
        for i, (x1, y1, x2, y2) in enumerate(rois):
            roi_w = x2 - x1
            roi_h = y2 - y1
            oversized = (roi_w * roi_h) >= area_limit and (roi_w > 600 and roi_h > 600)
            if not oversized:
                # Only here is the full ROI enlarged; oversized ROIs are split
                # first, so enlarging them as a whole would be wasted work.
                output_path = os.path.join(output_folder, f"formula_{i+1}.png")
                extracted_image_paths.append(
                    self._save_enlarged(image[y1:y2, x1:x2], output_path)
                )
                continue
            if roi_w > roi_h:  # wider than tall: split left / right
                mid_x = x1 + roi_w // 2
                halves = [image[y1:y2, x1:mid_x], image[y1:y2, mid_x:x2]]
            else:  # split top / bottom
                mid_y = y1 + roi_h // 2
                halves = [image[y1:mid_y, x1:x2], image[mid_y:y2, x1:x2]]
            for j, half in enumerate(halves):
                sub_output_path = os.path.join(output_folder, f"formula_{i+1}_{j+1}.png")
                extracted_image_paths.append(self._save_enlarged(half, sub_output_path))
        return extracted_image_paths
# 假设这里有图像和ROI的初始化代码
if __name__ == "__main__":
    processor = ImageProcessor()
    image_path = "/data/wangtengbo/formula_correct/test_data/SQW@MFFBGE28F2%S%TV]M[0.png"
    image = cv2.imread(image_path)  # read the image
    # cv2.imread signals failure by returning None instead of raising.
    if image is None:
        raise FileNotFoundError(f"Cannot read image: {image_path}")
    rois = [(50, 50, 700, 1300), (800, 100, 1500, 1600)]  # sample ROI definitions
    output_folder = "output"
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(output_folder, exist_ok=True)
    extracted_image_paths = processor.process_image(image, rois, output_folder)
    print(extracted_image_paths)
import re
class FormulaProcessor:
    """Heuristic classifier that decides whether a text fragment is a formula."""

    def __init__(self):
        # Pre-compile the regular expressions once for performance.
        self.chinese_characters_pattern = re.compile(r'[\u4e00-\u9fff]')
        self.english_characters_pattern = re.compile(r'[a-zA-Z]')
        self.digits_pattern = re.compile(r'\d')
        self.expression_pattern = re.compile(r'(\b[a-zA-Z]+\b\s*=\s*\d+)')
        self.latex_formula_pattern = re.compile(r'\\[a-zA-Z]+\{.*?\}|\$.*?\$')
        self.exclusion_keywords = ['例题', '题目','答案','讲解','练习','新课教授','化简','例','概念辨析']

    def contains_latex_formula(self, string):
        """Return True when *string* contains ``\\cmd{...}`` or a ``$...$`` span."""
        return bool(self.latex_formula_pattern.search(string))

    def is_formula(self, string):
        """Return True when *string* is considered a formula.

        Rules, in order:
          1. the text must contain ``=`` or a LaTeX span;
          2. texts containing an exclusion keyword are rejected;
          3. Chinese mixed with letters or digits is rejected unless the text
             contains a LaTeX span.

        The former "rule 4" returned True for simple ``x = ...`` expressions
        immediately before an unconditional ``return True``, so it had no
        effect and was removed.
        """
        has_latex = self.contains_latex_formula(string)
        # Rule 1: needs an equals sign or LaTeX markup.
        if '=' not in string and not has_latex:
            return False
        # Rule 2: exercise / answer style text is never a formula.
        if any(keyword in string for keyword in self.exclusion_keywords):
            return False
        # Rule 3: prose mixing Chinese with letters/digits only counts with LaTeX.
        mixes_chinese = self.chinese_characters_pattern.search(string) and (
            self.english_characters_pattern.search(string) or self.digits_pattern.search(string))
        if mixes_chinese and not has_latex:
            return False
        return True
# 测试示例
if __name__ == "__main__":
    checker = FormulaProcessor()
    # Strings containing LaTeX are raw strings: in a normal literal "\r", "\t",
    # "\f" and "\n" inside \right, \text, \frac and \neq become control
    # characters, and "\s" / "\g" are invalid escape sequences.
    test_strings = [
        "1.了解二次根式、最简二次根式的概念,理解二次根式的性质 2.了解二次根式(根号下仅限于数)的加、减、乘、除运算法则,会用它们进 行有关的简单四则运算. 一 课时分配 本章教学约需8 课时,具体安排如下: 16.1二次根式2 课时 16.2二次根式的乘除3 课时 16.3二次根式的加减2 课时 小结1课时。",
        "最简二次根式的概念,理解二次根式的性质",
        "x^2 + y^2 = z^2",
        "E = mc^2",
        "F = ma",
        'a*b=10',
        r"理解并掌握(\sqrt{a})^{2}=a (a \geq 0)",
        r"本章内容主要有两个部分,它们分别是二次根式的有关概念、性质和二次根 式的四则运算. 本章的第一部分是二次根式的有关概念和性质.教材从两个电视塔的传播 半径之比出发,引入人二次根式的概念.接着根据有理数的算术平方根的意义,顺 理成章地推导出二次根式的两个性质: $\left( \sqrt{a} \right)^{3}=a \left( a \geq 0 \right) \rightarrow \sqrt{a^{3}}=a \left( a \geq 0 \right)$ 本章的第二部分是二次根式的四则运算.教材遵循由特殊到一般的规律,由 学生通过分析、概括、交流、归纳等过程,探究得到二次根式的乘除运算法则: $\sqrt{a}+ \sqrt{b}= \sqrt{ab} \left( a \geq 0,b \geq 0 \right) \text{和} \frac{ \sqrt{a}}{ \sqrt{b}}= \sqrt{ \frac{a}{b}} \left( a \geq 0,b>0 \right)$ .在此基础上,又通过进一步 类比,引出二次根式的加减运算.教材注重知识之间的联系,将乘法公式运用到 二次根式的四则运算中,以简化二次根式的运算"
        ,
        r'1.直线与抛物线的交点问题 要解决直线与抛物线的位置关系问题,可把直线方程与抛物线方程联 立,消去y(或消去x)得出关于x(或关于y)的一个方程 $ax^{2}+bx+c=0$ $y$ ,其中二 次项系 $a$ 有可能为0,此时直线与抛物线有一个交点. 当二次项系数 $a \neq 0$ 时, $\Delta=b^{2}-4ac$ 若△=0,则直线与抛物线没有公共点; 若 $\Delta>0$ ,则直线与抛物线有且只有一个公共点; 若 $\Delta<0$ 则直线与抛物线有两个不同的公共点. 2.弦长问题 设弦的端点为 $A \left( x_{1},y_{1} \right),B \left( x_{2},y_{2} \right)$ (1)一般弦长: $\left| AB \right|= \sqrt{1+k^{2}} \left| x_{1}-x_{2} \right.$ 域| $AB \left| \right.= \sqrt{1+ \frac{1}{k^{2}}} \left| y_{1}-y_{2} \right|$ (其中 k为弦所在直线的斜率) (2)焦点弦长: $\left| AB \right|=x_{1}+x_{2}-p.$ 3.中点弦问题 若 $M \left( x_{0},y_{0} \right)$ 是抛物线 $y^{2}=2px \left( p>0 \right)$ 的弦 $AB$ 的中点,则直线 $AB$ 的斜率 为 $k_{AB}= \frac{p}{y_{0}}$'
        ,
        # The author marked this one as expected True; with the current rules it
        # is rejected (Chinese mixed with letters/digits and no LaTeX).
        '库伦定律表达式:F=k1y2 r2',
    ]
    results = [checker.is_formula(s) for s in test_strings]
    # Output with the current rules (10 inputs):
    # [False, False, True, True, True, True, True, True, True, False]
    print(results)
import requests
import json
from config.config import APP_ID
from config.config import SECRET_CODE
from loguru import logger
def get_file_content(filePath):
    """Return the complete contents of the file at *filePath* as bytes."""
    with open(filePath, mode='rb') as handle:
        payload = handle.read()
    return payload
class TextinOcr(object):
    """Small client for the Textin ``pdf_to_markdown`` OCR endpoint."""

    def __init__(self):
        self.host = 'https://api.textin.com'

    def recognize_pdf2md(self, image_path, options=None):
        """
        Send a file to Textin and return its Markdown conversion.

        :param image_path: path of the file whose bytes are uploaded
        :param options: request params; when None the defaults below are used.
            Example of a full options dict:
            options = {
                'pdf_pwd': None,
                'dpi': 144,  # dpi set to 144
                'page_start': 0,
                'page_count': 1000,  # parse up to 1000 pages
                'apply_document_tree': 0,
                'markdown_details': 1,
                'page_details': 0,  # no per-page detail information
                'table_flavor': 'md',
                'get_image': 'none',
                'parse_mode': 'scan',  # parse mode "scan"
            }
        :return: ``(markdown, time_cost, detail)`` on HTTP 200, where time_cost
            is the request's elapsed seconds; ``([], '', [])`` on any other
            status code.
        """
        image=get_file_content(image_path)
        if options is None:
            options={
                'table_flavor': 'md',
                'parse_mode': 'scan',  # parse mode "scan"
                # NOTE(review): the old comment said "no page details" while the
                # value is 1 — presumably 1 enables them; confirm with the API docs.
                'page_details': 1,
                'markdown_details': 1,
                'apply_document_tree': 1,
                'dpi': 144  # resolution set to 144 dpi
            }
        url = self.host + '/ai/service/v1/pdf_to_markdown'
        headers = {
            'x-ti-app-id': APP_ID,
            'x-ti-secret-code': SECRET_CODE
        }
        response=requests.post(url, data=image, headers=headers, params=options)
        if response.status_code == 200:
            time_cost=response.elapsed.total_seconds()
            result = json.loads(response.text)
            logger.info(f'textln response_time_cost={time_cost}\n\ntextln response=\n{result}')
            return result['result']['markdown'],time_cost,result['result']['detail']
        # A failed request is an error condition, so it is logged as one.
        logger.error('TextinOcr 请求失败 ,错误信息={}'.format(response))
        return [],'',[]
if __name__ == "__main__":
    # Log in and open "Workbench - Account settings - Developer info" to find the app-id/app-secret
    textin = TextinOcr()
    # recognize_pdf2md returns three values (markdown, time_cost, detail);
    # unpacking only two raised ValueError.
    resp, time_cost, _detail = textin.recognize_pdf2md(image_path='/data/wangtengbo/got_ocr2/infer/QQ图片20240926223216.png')
    print("request time: ", time_cost)
    print(resp)
# result = json.loads(resp.text)
# print(result)
# with open('./result.json', 'w', encoding='utf-8') as fw:
# json.dump(result, fw, indent=4, ensure_ascii=False)
# import re
# def filter_sentences_by_caption(context_info: str, caption_title: str, threshold: float = 0.5):
# # 1. 按照句号、问号、感叹号切分句子,并去除空白
# sentences = [s.strip() for s in re.split(r'[。!?]', context_info) if s.strip()]
# # 2. 统计 caption_title 中的所有唯一字符
# title_chars = set(caption_title)
# total_chars = len(title_chars)
# # 3. 计算每句中出现的 title 字符比率
# filtered = []
# for sent in sentences:
# # 计算交集字符数
# match_count = sum(1 for ch in title_chars if ch in sent)
# ratio = match_count / total_chars
# # 如果比例超过阈值,则保留
# if ratio > threshold:
# filtered.append((sent, ratio))
# return filtered
# # 示例调用
# txt = "由于他的功力浑厚,画路广阔,所以成就的方面甚多。如在意境开拓之上,他有金蝉脱壳“夺人”之法,同时,他亦服膺“画你最熟悉的”的大原则,画了不少他最亲切的农业社会的亲切图画。如《闻铃心喜图》(28-7)上画一小牧童,身系一铃铛,牵一老牛回家,画上题字云:"
# caption = "图  28-7 近现代  齐白石《闻铃心喜图》"
# results = filter_sentences_by_caption(txt, caption)
# for sent, ratio in results:
# print(f"句子: {sent}\n匹配比例: {ratio:.2f}\n")
# import re
# import logging
# # 设置日志
# logger = logging.getLogger()
# # 示例数据
# item = {'caption_text': "图 5 显示了…"} # 示例文本
# #(1) 提取图序
# image_id = ''
# if len(item['caption_text']) > 3:
# # 匹配图序:图 K、图K-1、图K-1-2 等,去掉空格
# image_id_match = re.findall(r"图\s*\d+(?:-\d+)*", item['caption_text'])
# # 去掉空格
# if image_id_match:
# image_id = image_id_match[0].replace(" ", "") # 输出: 图5 或 图5-7
# else:
# image_id = ''
# # 打印或记录日志
# logger.info(f'{item["caption_text"]} image_order is {image_id}')
# print(f"{image_id}")
# from config.config import VLM_Match_Context_User_Prompt
# from tasks.qwen_vl_infer import qwen_vl_infer
# print(VLM_Match_Context_User_Prompt.replace("{{user_text}}","wwadwadwa ").replace("{{caption}}","wwadwadwa "))
# qwen_match_response=qwen_vl_infer("https://oss.raysgo.com/oss/upload/image/png/777a14d621b3402daa2dbe7fac09d8cf.png",'你是一个图文匹配判断专家。',VLM_Match_Context_User_Prompt.replace("{{user_text}}","wwadwadwa ").replace("{{caption}}","wwadwadwa "))
# print(qwen_match_response)
import re
# 示例文本
text = """
匹配结果:不匹配
原因:文本描述为“一颗大树”,而图像中展示的是一个金色的鸟笼,内有一只小鸟,笼子上覆盖着粉色布料。文本与图像内容完全不符。
分析过程:
1. **观察图像**:图像中有一个金色的鸟笼,笼内有一只小鸟,笼子上覆盖着粉色布料。背景为浅蓝色,没有树木或其他自然元素。
2. **解析文本**:文本仅提到“一颗大树”,没有关于鸟笼、小鸟或粉色布料的描述。
3. **对比匹配**:
- **实体名称**:图像中的实体是“鸟笼”和“小鸟”,而文本中的实体是“大树”。两者名称完全不同。
- **场景环境**:图像场景为室内或简单背景,而文本描述的“大树”通常关联户外自然环境。
4. **判断标准**:根据上述对比,实体名称和场景环境均不匹配,因此判定为不匹配。
综上所述,文本与图像内容存在根本性差异,故判定为不匹配。
"""
# The three headings are matched with an optional "### " prefix and either a
# full-width or an ASCII colon. The reason may follow its heading on the same
# line or on the next one (\s* also consumes a line break). The previous
# pattern required "###" and a line break after the reason heading, so it
# could never match the sample above.
pattern = r'(?:###\s*)?匹配结果[::]\s*([^\n]+)\n+(?:###\s*)?原因[::]\s*([^\n]+)\n+(?:###\s*)?分析过程[::]'
# Extract with re.search
match = re.search(pattern, text)
if match:
    result = match.group(1)  # match verdict
    reason = match.group(2)  # reason
    print(f"匹配结果: {result}")
    print(f"原因: {reason}")
else:
    print("未能匹配到内容")
print(123021)
import re
# 标准化策略1:基础标准化
def normalize_strategy_1(s: str) -> str:
    r"""Strategy 1 (basic): flatten common LaTeX markup into a plain expression.

    Spaces, ``$`` and spacing/style commands are dropped, ``\cdot``/``\times``
    become ``*``, ``\sqrt{x}`` becomes ``sqrt(x)``, ``\frac{a}{b}`` becomes
    ``(a/b)``, braced super/subscripts lose their braces, and any remaining
    braces and backslashes are removed.

    The ``\sqrt``/``\frac`` rewrites run before braces are stripped and are
    repeated until nothing changes, so nested constructs are converted from
    the inside out. Previously the braces were removed first, which meant
    those two rewrites could never match.
    """
    s = s.replace(" ", "").replace("$", "")
    s = re.sub(r'\\[ ,;!]', '', s)  # drop spacing commands such as \, \; \!
    s = s.replace(r'\cdot', '*').replace(r'\times', '*')
    s = s.replace(r'\left', '').replace(r'\right', '')
    s = re.sub(r'\\mathrm|\\mathbf|\\text|\\displaystyle|\\normalfont|\\rm', '', s)
    # Structural rewrites need the braces, so they come before brace removal.
    previous = None
    while previous != s:
        previous = s
        s = re.sub(r'\\sqrt\s*\{([^{}]+)\}', r'sqrt(\1)', s)
        s = re.sub(r'\\frac\s*\{([^{}]+)\}\s*\{([^{}]+)\}', r'(\1/\2)', s)
    s = re.sub(r'\^\{\s*(.*?)\s*\}', r'^\1', s)
    s = re.sub(r'\_\{\s*(.*?)\s*\}', r'_\1', s)
    s = s.replace("{", "").replace("}", "")
    s = s.replace("\\", "")
    return s.strip()
# 标准化策略2:更宽松的标准化
def normalize_strategy_2(s: str) -> str:
    """Strategy 2 (looser): drop LaTeX commands, then keep only basic symbols."""
    without_commands = re.sub(r'\\[a-zA-Z]+', '', s)  # remove every LaTeX command
    basic_only = re.sub(r'[^a-zA-Z0-9^_*/().]', '', without_commands)  # keep basic symbols
    return basic_only.strip()
# 标准化策略3:保留部分数学符号
def normalize_strategy_3(s: str) -> str:
    """Strategy 3: rewrite un-nested fractions and roots, keep other symbols."""
    compact = s.replace(" ", "")
    compact = compact.replace("$", "")
    rewrites = (
        (r'\\frac\s*\{([^{}]+)\}\s*\{([^{}]+)\}', r'(\1/\2)'),
        (r'\\sqrt\s*\{([^{}]+)\}', r'sqrt(\1)'),
    )
    for regex, replacement in rewrites:
        compact = re.sub(regex, replacement, compact)
    # Whatever backslashes are left are simply dropped.
    return compact.replace("\\", "").strip()
# 标准化策略4:完全删除所有 LaTeX 特殊符号,仅保留文字和数字
def normalize_strategy_4(s: str) -> str:
    """Strategy 4: strip LaTeX commands and special symbols, keep the rest."""
    stripped = re.sub(r'\\[a-zA-Z]+', '', s)  # remove every LaTeX command
    stripped = re.sub(r'[{}^_\\]', '', stripped)  # remove special symbols
    for filler in (" ", "$"):
        stripped = stripped.replace(filler, "")
    return stripped.strip()
# 主函数
def normalize_latex_string(s: str, strategy: int) -> str:
    """Normalize *s* with strategy 1-4; raise ValueError for any other value."""
    ordered_strategies = (
        normalize_strategy_1,
        normalize_strategy_2,
        normalize_strategy_3,
        normalize_strategy_4,
    )
    for number, normalizer in enumerate(ordered_strategies, start=1):
        if strategy == number:
            return normalizer(s)
    raise ValueError("Invalid normalization strategy")
# 匹配函数
def latex_match(A: str, B: str) -> bool:
    """Return True when A, normalized by any strategy, occurs inside normalized B.

    Strategies 1-4 are tried in order; both strings are normalized with the
    same strategy before the substring test. A strategy that reduces A to an
    empty string is skipped: ``'' in anything`` is always True, so text
    without usable characters (for example Chinese-only text under
    strategy 2) used to "match" every B.
    """
    for strategy in range(1, 5):  # try every normalization strategy
        A_norm = normalize_latex_string(A, strategy)
        if not A_norm:
            continue
        if A_norm in normalize_latex_string(B, strategy):
            return True
    return False
# 测试代码
if __name__ == "__main__":
    # text_B is a bare formula; text_A embeds the same formula (written with
    # doubled backslashes) inside "$...$" after a "'(3)" prefix.
    text_B = r"\frac { \sqrt { 8 } } { \sqrt { 2 a } } = \frac { \sqrt { 8 } \cdot \sqrt { 2 a } } { \sqrt { 2 a } \cdot \sqrt { 2 a } } = \frac { 4 \sqrt { a } } { 2 a } = \frac { 2 \sqrt { a } } { a }"
    text_A = r"'(3)$\\frac { \\sqrt { 8 } } { \\sqrt { 2 a } } = \\frac { \\sqrt { 8 } \\cdot \\sqrt { 2 a } } { \\sqrt { 2 a } \\cdot \\sqrt { 2 a } } = \\frac { 4 \\sqrt { a } } { 2 a } = \\frac { 2 \\sqrt { a } } { a } .$"
    print(latex_match(text_B, text_A)) # should return True
import re
def normalize_latex_string(s: str) -> str:
    r"""Normalize a LaTeX string into a flat, comparable expression.

    Spaces, ``$`` and spacing/style commands are dropped, ``\cdot``/``\times``
    become ``*``, ``\sqrt{x}`` becomes ``sqrt(x)``, ``\frac{a}{b}`` becomes
    ``(a/b)``, braced super/subscripts lose their braces, remaining braces and
    backslashes are removed, and ``≥``/``≤`` become ``>=``/``<=``.

    The ``\sqrt``/``\frac`` rewrites run before braces are stripped and are
    repeated until nothing changes, so nested constructs are converted from
    the inside out. Previously the braces were removed first, which meant
    those two rewrites could never match.
    """
    s = s.replace(" ", "").replace("$", "")
    s = re.sub(r'\\[ ,;!]', '', s)  # remove spacing control sequences
    s = s.replace(r'\cdot', '*').replace(r'\times', '*')  # multiplication signs
    s = s.replace(r'\left', '').replace(r'\right', '')  # remove \left and \right
    s = re.sub(r'\\mathrm|\\mathbf|\\text|\\displaystyle|\\normalfont|\\rm', '', s)  # style commands
    # Structural rewrites need the braces, so they come before brace removal.
    previous = None
    while previous != s:
        previous = s
        s = re.sub(r'\\sqrt\s*\{([^{}]+)\}', r'sqrt(\1)', s)  # square roots
        s = re.sub(r'\\frac\s*\{([^{}]+)\}\s*\{([^{}]+)\}', r'(\1/\2)', s)  # fractions
    s = re.sub(r'\^\{\s*(.*?)\s*\}', r'^\1', s)  # superscripts
    s = re.sub(r'\_\{\s*(.*?)\s*\}', r'_\1', s)  # subscripts
    s = s.replace("{", "").replace("}", "")  # remaining braces
    s = s.replace("\\", "")  # leftover backslashes
    s = s.replace("≥", ">=").replace("≤", "<=")  # normalize comparison symbols
    return s.strip()
def extract_latex(text: str) -> list:
    """Return the contents of every ``$...$`` span found in *text*."""
    return [hit.group(1) for hit in re.finditer(r'\$(.*?)\$', text)]
def latex_match(A: str, B: str) -> bool:
    """Return True when normalized A equals any normalized ``$...$`` formula in B."""
    # Normalize every formula found in B first, then A itself.
    candidates = [normalize_latex_string(formula) for formula in extract_latex(B)]
    target = normalize_latex_string(A)
    return target in candidates
if __name__ == "__main__":
    # text_B is a bare formula; text_A embeds the same formula (written with
    # doubled backslashes) inside "$...$" after a "'(3)" prefix.
    text_B = r"\frac { \sqrt { 8 } } { \sqrt { 2 a } } = \frac { \sqrt { 8 } \cdot \sqrt { 2 a } } { \sqrt { 2 a } \cdot \sqrt { 2 a } } = \frac { 4 \sqrt { a } } { 2 a } = \frac { 2 \sqrt { a } } { a }"
    text_A = r"'(3)$\\frac { \\sqrt { 8 } } { \\sqrt { 2 a } } = \\frac { \\sqrt { 8 } \\cdot \\sqrt { 2 a } } { \\sqrt { 2 a } \\cdot \\sqrt { 2 a } } = \\frac { 4 \\sqrt { a } } { 2 a } = \\frac { 2 \\sqrt { a } } { a } .$"
    # NOTE(review): the formula inside text_A ends with " ." while text_B does
    # not, so the exact-equality comparison looks like it yields False here —
    # confirm the expectation below.
    print(latex_match(text_B, text_A)) # should return True
This diff is collapsed. Click to expand it.
import re
import json
from loguru import logger
def calculate_iou(inner_box, outer_box):
    """Return the IoU of a 4-point box and an axis-aligned box.

    ``inner_box`` is reduced to the bounding rectangle of its first four
    ``[x, y]`` points; ``outer_box`` is ``[x_min, y_min, x_max, y_max]``.
    Returns 0 when the union area is zero.
    """
    corners = [inner_box[idx] for idx in range(4)]
    xs, ys = [], []
    for px, py in corners:
        xs.append(px)
        ys.append(py)
    in_left, in_right = min(xs), max(xs)
    in_top, in_bottom = min(ys), max(ys)

    out_left, out_top, out_right, out_bottom = outer_box

    # Overlap rectangle, clamped to zero when the boxes are disjoint.
    overlap_w = max(0, min(in_right, out_right) - max(in_left, out_left))
    overlap_h = max(0, min(in_bottom, out_bottom) - max(in_top, out_top))
    overlap_area = overlap_w * overlap_h

    inner_area = (in_right - in_left) * (in_bottom - in_top)
    outer_area = (out_right - out_left) * (out_bottom - out_top)
    union_area = inner_area + outer_area - overlap_area

    if union_area != 0:
        return overlap_area / union_area
    return 0
def has_intersection(inner_box, outer_box, threshold=0.0):
    """
    Decide whether inner_box overlaps outer_box (IoU > threshold).

    The threshold used to be ignored: the function always tested ``IoU > 0``
    although it advertised a default of 0.1. The parameter is now honoured,
    and its default is 0.0 so that calls relying on the default keep the
    behaviour they always had (any overlap counts).

    Args:
        inner_box (list): inner box as [[x1, y1], [x2, y2], [x3, y3], [x4, y4]]
        outer_box (list): outer box as [x_min, y_min, x_max, y_max]
        threshold (float): IoU value that must be exceeded, default 0.0
    Returns:
        bool: True when the IoU of the two boxes is greater than threshold.
    """
    return calculate_iou(inner_box, outer_box) > threshold
# def perform_re_check(text):
# """
# 检查文本中是否包含公式。
# 参数:
# text (str): 要检查的文本
# 返回:
# bool: 如果包含公式则返回True,否则返回False。
# """
# formula_pattern = re.compile(
# r"([A-Za-z]+\s*=\s*[A-Za-z0-9+\-*/^()]+)|" # 一般公式,包含多个运算符或字母
# r"(\b√[A-Za-z0-9]+\b)|" # 根号
# r"(\bΔ\b)|" # Δ
# r"(\([A-Za-z0-9+\-*/^()]+\)\s*[+\-*/^]\s*\([A-Za-z0-9+\-*/^()]+\))|" # 复杂括号表达式
# r"([A-Za-z]*\d*[(x)(y)(z)(a)(b)(c)]*\s*[+\-*/^]+\s*\(?[A-Za-z0-9+\-*/^()]+\)?)" # 公式中的符号运算
# )
# return bool(formula_pattern.search(text))
def perform_re_check(text):
    """Return True when *text* passes the formula filter rules.

    Rejected are: text made only of letters, whitespace or Chinese characters;
    text without an ``=`` sign; and plain assignments such as ``a=5``.
    """
    # Rules 1 and 2: word-only text, or no equals sign at all.
    words_only = re.fullmatch(r'[\u4e00-\u9fa5a-zA-Z\s]+', text) is not None
    if words_only or '=' not in text:
        return False
    # Rule 3: a bare "name = number" assignment is not a formula.
    plain_assignment = re.fullmatch(r'[a-zA-Z]+\s*=\s*\d+', text)
    return not plain_assignment
def filter_boxes(ocr_result, layout_result):
    """
    Keep only the layout boxes whose overlapping OCR text looks like a formula.

    Args:
        ocr_result (dict): OCR output; ``ocr_result['data']`` holds entries of
            the form ``[points, [text, ...]]``
        layout_result (list): layout boxes; element ``[2]`` of each box holds
            its coordinates
    Returns:
        tuple: (kept layout boxes, the concatenated OCR text of each kept box)
    """
    logger.info('in filter_boxes!')
    kept_boxes, kept_texts = [], []
    for candidate in layout_result:
        region = candidate[2]
        fragments = []
        for ocr_entry in ocr_result['data']:
            entry_points, entry_text = ocr_entry[0], ocr_entry[1][0]
            if has_intersection(entry_points, region):
                fragments.append(entry_text + " ")
        merged_text = "".join(fragments)
        if not perform_re_check(merged_text.strip()):
            continue
        kept_boxes.append(candidate)
        kept_texts.append(merged_text)
    return kept_boxes, kept_texts
# # 示例使用
# ocr_result = {'errorCode': 0, 'msg': '识别成功', 'data': [[[[96.0, 44.0], [569.0, 44.0], [569.0, 64.0], [96.0, 64.0]], ['1.4.1用空间向量研究直线、平面的位置关系(第3课时)', 0.9963806867599487]], [[[238.0, 97.0], [426.0, 97.0], [426.0, 116.0], [238.0, 116.0]], ['空间中直线、平面的垂直', 0.9977184534072876]], [[[73.0, 145.0], [180.0, 145.0], [180.0, 170.0], [73.0, 170.0]], ['知识清单', 0.998611330986023]], [[[105.0, 187.0], [231.0, 187.0], [231.0, 204.0], [105.0, 204.0]], ['1.直线和直线垂直', 0.9975678324699402]], [[[103.0, 213.0], [544.0, 214.0], [544.0, 234.0], [103.0, 233.0]], ['设直线,l的方向向量分别为u,μ,则uuuu=3.', 0.8799490332603455]], [[[104.0, 243.0], [231.0, 243.0], [231.0, 260.0], [104.0, 260.0]], ['2.直线和平面垂直', 0.9996089935302734]], [[[105.0, 269.0], [591.0, 269.0], [591.0, 286.0], [105.0, 286.0]], ['设u是直线l的方向向量,n是平面α的法向量,lα,则lLαu/nA', 0.8951645493507385]], [[[71.0, 296.0], [177.0, 298.0], [177.0, 316.0], [71.0, 314.0]], ['eR,使得ux入n.', 0.871402382850647]], [[[105.0, 325.0], [267.0, 325.0], [267.0, 341.0], [105.0, 341.0]], ['要点3平面和平面垂直', 0.9992749094963074]], [[[103.0, 349.0], [530.0, 351.0], [530.0, 371.0], [103.0, 369.0]], ['设n,n分别是平面α,β的法向量,则α⊥βn,⊥nn*n=1.', 0.8930713534355164]], [[[73.0, 397.0], [179.0, 397.0], [179.0, 421.0], [73.0, 421.0]], ['例题讲评', 0.9989429712295532]], [[[113.0, 441.0], [592.0, 441.0], [592.0, 461.0], [113.0, 461.0]], ['例1如图,已知正三棱柱ABC-A,B,C,的各棱长都为1,M是底面上', 0.9656786918640137]], [[[72.0, 474.0], [309.0, 474.0], [309.0, 490.0], [72.0, 490.0]], ['BC边的中点,V是侧棱CC上的点,', 0.991436243057251]], [[[102.0, 508.0], [162.0, 504.0], [163.0, 525.0], [104.0, 528.0]], ['且CV=', 0.9438493251800537]], [[[175.0, 507.0], [203.0, 510.0], [201.0, 526.0], [174.0, 524.0]], ['CC,', 0.7943589091300964]], [[[161.0, 522.0], [174.0, 522.0], [174.0, 531.0], [161.0, 531.0]], ['4', 0.9967143535614014]], [[[104.0, 544.0], [234.0, 544.0], [234.0, 560.0], [104.0, 560.0]], ['(1)求证:ABLMN;', 0.8688576817512512]], [[[103.0, 568.0], [343.0, 569.0], [343.0, 589.0], [103.0, 588.0]], ['(2)设CC,中点为D,求证:AB,⊥A,D.', 0.8998482823371887]], 
[[[106.0, 736.0], [589.0, 738.0], [589.0, 758.0], [106.0, 756.0]], ['练习1如图,△ABC和△BCD所在平面互相垂直,且AB=BC=BD=', 0.9863734245300293]], [[[72.0, 773.0], [519.0, 773.0], [519.0, 789.0], [72.0, 789.0]], ['2,ZABC=ZDBC=120°,E,F分别为AC,DC的中点.求证:EF⊥BC.', 0.9168810844421387]]]}
# layout_result = {'errorCode': 0, 'msg': '识别成功', 'data': '{"Ids_Scores_boxes": "[[[0], 0.5164221525192261, [72.51856621809566, 187.46995484912938, 593.7710048745755, 369.08707769273684]], [[10], 0.4314861297607422, [74.0, 741.0, 589.0, 788.0]], [[1], 0.8855363726615906, [74.0858920888629, 143.8997421042558, 179.57114012112666, 169.70436416413207]], [[1], 0.8849098086357117, [73.6531480480851, 396.0868356459496, 178.90487691988264, 420.96475408517415]], [[1], 0.8246437907218933, [94.7679979762152, 44.78538083893527, 567.6477659821933, 64.29004998079591]], [[1], 0.0, [241.0, 100.0, 425.0, 114.0]], [[0], 0.0, [74.0, 443.0, 592.0, 587.0]]]", "boxes_num": "7"}'}
# filtered_layout_boxes = filter_boxes(ocr_result, layout_result)
# print(filtered_layout_boxes)
import re
import json
def calculate_iou(inner_box, outer_box):
    """Return the intersection-over-union of a quadrilateral and a rectangle.

    Args:
        inner_box: Four corner points ``[[x1, y1], [x2, y2], [x3, y3], [x4, y4]]``.
            Only its axis-aligned bounding rectangle is used.
        outer_box: Rectangle given as ``[x_min, y_min, x_max, y_max]``.

    Returns:
        The IoU of the two rectangles, or ``0`` when the union area is zero.
    """
    # Collapse the four corners into an axis-aligned rectangle.
    corner_a, corner_b, corner_c, corner_d = (
        inner_box[0], inner_box[1], inner_box[2], inner_box[3]
    )
    xs, ys = zip(corner_a, corner_b, corner_c, corner_d)
    in_left, in_right = min(xs), max(xs)
    in_top, in_bottom = min(ys), max(ys)

    out_left, out_top, out_right, out_bottom = outer_box

    # Overlap extents are clamped at zero for disjoint rectangles.
    overlap_w = max(0, min(in_right, out_right) - max(in_left, out_left))
    overlap_h = max(0, min(in_bottom, out_bottom) - max(in_top, out_top))
    overlap = overlap_w * overlap_h

    in_area = (in_right - in_left) * (in_bottom - in_top)
    out_area = (out_right - out_left) * (out_bottom - out_top)
    combined = in_area + out_area - overlap

    if combined != 0:
        return overlap / combined
    return 0
def has_intersection(inner_box, outer_box, threshold=0.1):
    """Report whether ``inner_box`` overlaps ``outer_box`` at all.

    Args:
        inner_box (list): Quadrilateral ``[[x1, y1], [x2, y2], [x3, y3], [x4, y4]]``.
        outer_box (list): Rectangle ``[x_min, y_min, x_max, y_max]``.
        threshold (float): Accepted for interface compatibility; see the note
            below.

    Returns:
        bool: True when the IoU of the two boxes is strictly positive.
    """
    # NOTE(review): `threshold` is not consulted -- any positive IoU counts as
    # an intersection. Confirm whether this is deliberate before wiring the
    # parameter in; doing so would change which OCR lines filter_boxes keeps.
    overlap_ratio = calculate_iou(inner_box, outer_box)
    return 0 < overlap_ratio
# def perform_re_check(text):
# """
# 检查文本中是否包含公式。
# 参数:
# text (str): 要检查的文本
# 返回:
# bool: 如果包含公式则返回True,否则返回False。
# """
# formula_pattern = re.compile(
# r"([A-Za-z]+\s*=\s*[A-Za-z0-9+\-*/^()]+)|" # 一般公式,包含多个运算符或字母
# r"(\b√[A-Za-z0-9]+\b)|" # 根号
# r"(\bΔ\b)|" # Δ
# r"(\([A-Za-z0-9+\-*/^()]+\)\s*[+\-*/^]\s*\([A-Za-z0-9+\-*/^()]+\))|" # 复杂括号表达式
# r"([A-Za-z]*\d*[(x)(y)(z)(a)(b)(c)]*\s*[+\-*/^]+\s*\(?[A-Za-z0-9+\-*/^()]+\)?)" # 公式中的符号运算
# )
# return bool(formula_pattern.search(text))
def perform_re_check(text):
    """Heuristically decide whether ``text`` looks like it contains a formula.

    The text is rejected when any of the following holds:
      1. it consists solely of CJK characters, ASCII letters and whitespace;
      2. it contains no ``=`` sign;
      3. it is a plain assignment of digits to a name, e.g. ``a=5``.

    Args:
        text (str): Text to inspect.

    Returns:
        bool: True when none of the rejection rules match.
    """
    only_words = re.fullmatch(r'[\u4e00-\u9fa5a-zA-Z\s]+', text)
    if only_words or '=' not in text:
        return False
    trivial_assignment = re.fullmatch(r'[a-zA-Z]+\s*=\s*\d+', text)
    return not trivial_assignment
def filter_boxes(ocr_result, layout_result):
    """Keep only the layout boxes whose overlapping OCR text looks like a formula.

    Args:
        ocr_result (dict): OCR response; ``ocr_result['data']`` is iterated as
            items of the form ``[quad, [text, ...]]``.
        layout_result (dict): Layout response; ``layout_result['data']`` is a
            JSON string whose ``Ids_Scores_boxes`` field is itself a JSON string
            holding a list of entries with the rectangle at index 2.

    Returns:
        tuple: ``(kept_layout_boxes, kept_texts)`` -- the retained layout entries
        and, in the same order, the concatenated OCR text for each (every OCR
        fragment is followed by a single space).
    """
    # The payload is double-encoded: decode the outer document, then the field.
    layout_entries = json.loads(json.loads(layout_result['data'])['Ids_Scores_boxes'])

    kept_boxes = []
    kept_texts = []
    for entry in layout_entries:
        region = entry[2]
        quads_and_texts = ((item[0], item[1][0]) for item in ocr_result['data'])
        joined = "".join(
            fragment + " "
            for quad, fragment in quads_and_texts
            if has_intersection(quad, region)
        )
        if perform_re_check(joined.strip()):
            kept_boxes.append(entry)
            kept_texts.append(joined)
    return kept_boxes, kept_texts
# # # # 示例使用
# ocr_result = {'errorCode': 0, 'msg': '识别成功', 'data': [[[[132.0, 6.0], [487.0, 6.0], [487.0, 23.0], [132.0, 23.0]], ['1.4.2用空间向量研究距离、夹角问题(一)', 0.9909250140190125]], [[[274.0, 57.0], [348.0, 57.0], [348.0, 77.0], [274.0, 77.0]], ['空间距离', 0.9974817037582397]], [[[50.0, 105.0], [158.0, 105.0], [158.0, 130.0], [50.0, 130.0]], ['知识清单', 0.9985876083374023]], [[[83.0, 148.0], [209.0, 148.0], [209.0, 165.0], [83.0, 165.0]], ['1.点到直线的距离', 0.999904215335846]], [[[84.0, 176.0], [395.0, 176.0], [395.0, 192.0], [84.0, 192.0]], ["已知直线l的方向向量是a,点P#l,P'el,则点", 0.8676444292068481]], [[[83.0, 223.0], [236.0, 223.0], [236.0, 240.0], [83.0, 240.0]], ['P到直线l的距离为d:', 0.932934045791626]], [[[270.0, 227.0], [289.0, 227.0], [289.0, 234.0], [270.0, 234.0]], ['DP', 0.7328389883041382]], [[[83.0, 269.0], [438.0, 269.0], [438.0, 286.0], [83.0, 286.0]], ['两条平行直线间的距离可以转化为点到直线的距离,', 0.9892963767051697]], [[[82.0, 297.0], [209.0, 297.0], [209.0, 313.0], [82.0, 313.0]], ['2.点到平面的距离', 0.9998923540115356]], [[[84.0, 323.0], [567.0, 323.0], [567.0, 340.0], [84.0, 340.0]], ['已知AB为平面α的一条斜线段(点A在平面α内),n为平面α的法向量,', 0.9887251853942871]], [[[47.0, 358.0], [392.0, 360.0], [392.0, 387.0], [47.0, 385.0]], ['则点B到平面α的距离为d=AB|·cos<AB,n)', 0.9637517929077148]], [[[378.0, 351.0], [438.0, 351.0], [438.0, 372.0], [378.0, 372.0]], ['無·n', 0.6339988112449646]], [[[447.0, 365.0], [570.0, 365.0], [570.0, 382.0], [447.0, 382.0]], ['空间中其他距离', 0.9999088644981384]], [[[49.0, 408.0], [85.0, 408.0], [85.0, 427.0], [49.0, 427.0]], ['问题', 0.9955520629882812]], [[[86.0, 435.0], [296.0, 435.0], [296.0, 452.0], [86.0, 452.0]], ['一般都可以转化为点面距问题.', 0.9974074363708496]], [[[49.0, 479.0], [159.0, 479.0], [159.0, 507.0], [49.0, 507.0]], ['例题讲评', 0.9977942109107971]], [[[91.0, 525.0], [571.0, 525.0], [571.0, 545.0], [91.0, 545.0]], ['例1如图,在棱长为2的正方体ABCD-A,B,C,D,中,E是BC的中点,P', 0.9382723569869995]], [[[52.0, 558.0], [286.0, 558.0], [286.0, 575.0], [52.0, 575.0]], ['是AE上的动点,求DP的最小值', 0.9763669371604919]], [[[84.0, 725.0], [570.0, 727.0], 
[570.0, 748.0], [84.0, 746.0]], ['练习1在长方体0ABC-0,A,B,C,中,0A=2,AB=3,AA,=2,求0,到', 0.887532114982605]], [[[50.0, 759.0], [159.0, 759.0], [159.0, 779.0], [50.0, 779.0]], ['直线AC的距离.', 0.9977055788040161]]]}
# layout_result = {'errorCode': 0, 'msg': '识别成功', 'data': '{"Ids_Scores_boxes": "[[[1], 0.6768088340759277, [132.82876458235634, 4.867646808112585, 484.25709724895194, 24.110981528087148]], [[1], 0.5721949338912964, [50.41460480158925, 478.7880785462551, 157.3041640894006, 504.33530555645126]], [[1], 0.699893593788147, [53.0, 526.0, 570.0, 576.0]], [[10, 0], 0.6270818710327148, [52.00979196153577, 761.0801100416344, 155.13669618897046, 776.1116098266145]], [[10], 0.0, [86.0, 727.0, 569.0, 747.0]], [[1], 0.8837159276008606, [51.657900767122186, 103.97743590987875, 157.41005498988994, 129.50588956440203]], [[0], 0.0, [50.0, 150.0, 569.0, 451.0]], [[1], 0.0, [276.0, 58.0, 347.0, 76.0]]]", "boxes_num": "8"}'}
# #print(ocr_result['data'][0])
# filtered_layout_boxes,filtered_layout_ocrs = filter_boxes(ocr_result, layout_result)
# print(filtered_layout_ocrs)
# print(filtered_layout_boxes)
# # 测试用例
import cv2
import base64
import numpy as np
def get_base64_from_image(image):
    """Encode an image as JPEG and return the result as a Base64 string.

    :param image: image array accepted by ``cv2.imencode``
    :return: Base64 text (UTF-8 decoded) of the JPEG-encoded image
    """
    _status, jpeg_buffer = cv2.imencode('.jpg', image)
    encoded = base64.b64encode(jpeg_buffer)
    return encoded.decode('utf-8')
def draw_bounding_boxes(image_path, coordinates_list):
    """Draw a red rectangle for every quadrilateral and return the image as Base64.

    :param image_path: path of the image to load
    :param coordinates_list: iterable of 8-number sequences giving four corners
        in the order top-left, top-right, bottom-right, bottom-left
    :return: Base64 string of the annotated image
    :raises ValueError: if the image cannot be read
    """
    canvas = cv2.imread(image_path)
    if canvas is None:
        raise ValueError(f"Failed to read the image file '{image_path}'.")

    img_height, img_width = canvas.shape[0], canvas.shape[1]
    red_bgr = (0, 0, 255)  # OpenCV colours are (B, G, R)

    for quad in coordinates_list:
        x1, y1, x2, y2, x3, y3, x4, y4 = map(int, quad)
        xs = (x1, x2, x3, x4)
        ys = (y1, y2, y3, y4)
        # Bounding rectangle of the four corners, clamped to the image extent.
        top_left = (max(0, min(xs)), max(0, min(ys)))
        bottom_right = (min(img_width, max(xs)), min(img_height, max(ys)))
        cv2.rectangle(canvas, top_left, bottom_right, red_bgr, 2)

    return get_base64_from_image(canvas)
def get_formula_boundingbox_base64_list(return_sub_img_path_results):
    """Render the boxes of every image and collect the Base64 results.

    :param return_sub_img_path_results: dict mapping an image path to the list
        of corner-coordinate sequences to draw on that image
    :return: list of Base64 strings, one per dict entry, in iteration order
    """
    return [
        draw_bounding_boxes(path, quads)
        for path, quads in return_sub_img_path_results.items()
    ]
# 示例调用
if __name__ == "__main__":
    # Demo: draw two boxes on one image and print the Base64 result.
    demo_boxes_by_image = {
        "example_image.jpg": [
            [100, 50, 200, 50, 200, 150, 100, 150],
            [300, 200, 400, 200, 400, 300, 300, 300],
        ],
    }
    for encoded_image in get_formula_boundingbox_base64_list(demo_boxes_by_image):
        print(encoded_image)
\ No newline at end of file
import requests
import time
def ocr_service_request(
    image_url,
    service_url='http://localhost:8880/v1/got_ocr_markdown_local',
    timeout=None,
):
    """Send an image to the OCR service and return the outcome.

    Args:
        image_url: URL or path of the image; sent as the ``url`` field of the
            JSON request body.
        service_url: Endpoint to POST to. Defaults to the local OCR service.
        timeout: Optional timeout in seconds forwarded to ``requests.post``.
            ``None`` (the default) waits indefinitely, as before.

    Returns:
        A 4-tuple ``(response_json, elapsed_time, status_code, error_message)``:
          * HTTP 200: ``(parsed JSON, seconds, 200, None)``
          * other HTTP status: ``(None, seconds, status, response text)``
          * request failure: ``(None, None, None, str(exception))``
    """
    payload = {
        "url": image_url,
    }
    try:
        # perf_counter is monotonic, so the measurement cannot be skewed by
        # wall-clock adjustments while the request is in flight.
        started = time.perf_counter()
        response = requests.post(service_url, json=payload, timeout=timeout)
        elapsed_time = time.perf_counter() - started

        if response.status_code == 200:
            return response.json(), elapsed_time, response.status_code, None
        return None, elapsed_time, response.status_code, response.text
    except requests.exceptions.RequestException as e:
        return None, None, None, str(e)
# 调用示例
if __name__ == "__main__":
    # Usage example; replace with the path of the image to process.
    demo_image = '/data/wangtengbo/got_ocr2/infer/demo.png'
    payload, seconds, http_status, failure = ocr_service_request(demo_image)
    if http_status != 200:
        print(f"Request failed with status code {http_status}. Error: {failure}")
    else:
        print(f"Request successful! Time taken: {seconds:.2f} seconds")
        print("Response content:", payload)
import os
from datetime import datetime
import langid
import numpy as np
from sklearn.cluster import KMeans
from collections import Counter
from loguru import logger
def mkdir_if_not_exist(path):
    """Create ``path`` (including missing parents) unless it already exists.

    Args:
        path (str): Directory path to create.
    """
    if not os.path.exists(path):
        # exist_ok closes the race where another process or thread creates the
        # directory between the existence check and this call.
        os.makedirs(path, exist_ok=True)
# 获取毫秒级时间
def get_millisecond_time():
    """Return the current local time as ``YYYYMMDDHHMMSSmmm`` (millisecond precision)."""
    # %f yields microseconds (6 digits); dropping the last 3 leaves milliseconds.
    return datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3]
def get_lang(text):
    """Classify ``text`` as English (``'en'``) or, for anything else, ``'zh'``."""
    # Replace sentence punctuation with spaces before language detection.
    normalized = text.replace('。', ' ').replace(',', ' ')
    detected, _score = langid.classify(normalized)
    if detected == 'en':
        return 'en'
    return 'zh'
def get_day_time():
    """Return the current local date and time formatted as ``YYYYMMDD_HHMMSS``."""
    return datetime.now().strftime("%Y%m%d_%H%M%S")
def merge_ver_boxes(formula_positions):
    """Merge boxes whose vertical ranges overlap or touch.

    The input list is sorted in place by ``y_min``. A box is folded into the
    running group when its ``y_min`` is not below the group's ``y_max``; the
    group keeps the category and confidence of its first box.

    Args:
        formula_positions (list): Items of ``(category, confidence,
            [x_min, y_min, x_max, y_max])``.

    Returns:
        list: Merged items as ``[category, confidence, bbox]`` lists.
    """
    formula_positions.sort(key=lambda item: item[2][1])

    merged = []
    for category, confidence, bbox in formula_positions:
        x_min, y_min, x_max, y_max = bbox
        if merged and y_min <= merged[-1][2][3]:
            # Grow the running group's envelope to include this box.
            g_x_min, g_y_min, g_x_max, g_y_max = merged[-1][2]
            merged[-1][2] = [
                min(g_x_min, x_min),
                min(g_y_min, y_min),
                max(g_x_max, x_max),
                max(g_y_max, y_max),
            ]
        else:
            merged.append([category, confidence, bbox])
    return merged
def merge_hor_boxes(formula_positions):
    """Merge boxes whose horizontal ranges overlap or touch.

    The input list is sorted in place by ``x_min``. A box is folded into the
    running group when its ``x_min`` is not beyond the group's ``x_max``; the
    group keeps the category and confidence of its first box.

    Args:
        formula_positions (list): Items of ``(category, confidence,
            [x_min, y_min, x_max, y_max])``.

    Returns:
        list: Merged items as ``[category, confidence, bbox]`` lists.
    """
    formula_positions.sort(key=lambda item: item[2][0])

    merged = []
    for category, confidence, bbox in formula_positions:
        x_min, y_min, x_max, y_max = bbox
        if merged and x_min <= merged[-1][2][2]:
            # Grow the running group's envelope to include this box.
            g_x_min, g_y_min, g_x_max, g_y_max = merged[-1][2]
            merged[-1][2] = [
                min(g_x_min, x_min),
                min(g_y_min, y_min),
                max(g_x_max, x_max),
                max(g_y_max, y_max),
            ]
        else:
            merged.append([category, confidence, bbox])
    return merged
def merge_boxes_by_clustering(formula_positions, target_num_boxes=4):
    """Cluster boxes by centre point with KMeans and merge each cluster.

    Args:
        formula_positions (list): Items of ``(category, confidence,
            [x_min, y_min, x_max, y_max])``.
        target_num_boxes (int): Number of clusters requested from KMeans.

    Returns:
        list: One ``(None, None, [x_min, y_min, x_max, y_max])`` tuple per
        non-empty cluster, ordered by cluster label.
    """
    # Box centres are the clustering features.
    box_centers = np.array(
        [((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2) for _, _, bbox in formula_positions]
    )
    kmeans = KMeans(n_clusters=target_num_boxes, random_state=0).fit(box_centers)
    labels = kmeans.labels_

    merged_boxes = []
    for cluster_id in range(target_num_boxes):
        cluster_bboxes = [
            bbox
            for (_, _, bbox), label in zip(formula_positions, labels)
            if label == cluster_id
        ]
        # A label may end up with no members (e.g. duplicate centres); without
        # this guard min()/max() below would raise ValueError on an empty list.
        if not cluster_bboxes:
            continue
        merged_bbox = [
            min(bbox[0] for bbox in cluster_bboxes),
            min(bbox[1] for bbox in cluster_bboxes),
            max(bbox[2] for bbox in cluster_bboxes),
            max(bbox[3] for bbox in cluster_bboxes),
        ]
        # Category and confidence are not carried over for clustered boxes.
        merged_boxes.append((None, None, merged_bbox))
    return merged_boxes
def merge_horizontal_boxes(formula_positions, num_big_boxes):
    """Merge boxes, top to bottom, into at most ``num_big_boxes`` large boxes.

    The input list is sorted in place by ``y_min`` and split into consecutive
    groups of ``ceil(len / num_big_boxes)`` boxes. Each group becomes one box
    that takes the first member's category, the largest confidence in the group
    and the envelope of all member rectangles. Afterwards any large box that
    starts at or above the bottom of its predecessor is shifted down so that it
    starts one unit below it.

    Args:
        formula_positions: List of ``(category_list, confidence,
            [x_min, y_min, x_max, y_max])`` items.
        num_big_boxes (int): Desired number of large boxes.

    Returns:
        list: ``[category, confidence, [x_min, y_min, x_max, y_max]]`` lists.
    """
    if not formula_positions:
        return []

    formula_positions.sort(key=lambda item: item[2][1])

    # Ceiling division so every box lands in one of the groups.
    chunk = (len(formula_positions) + num_big_boxes - 1) // num_big_boxes

    merged_boxes = []
    for index in range(num_big_boxes):
        group = formula_positions[index * chunk:(index + 1) * chunk]
        if not group:
            continue
        rects = [rect for _category, _confidence, rect in group]
        envelope = [
            min(rect[0] for rect in rects),
            min(rect[1] for rect in rects),
            max(rect[2] for rect in rects),
            max(rect[3] for rect in rects),
        ]
        best_confidence = max(member[1] for member in group)
        merged_boxes.append([group[0][0], best_confidence, envelope])

    # Groups were built in y order; push down any box overlapping the previous.
    for above, below in zip(merged_boxes, merged_boxes[1:]):
        above_bottom = above[2][3]
        left, top, right, bottom = below[2]
        if top <= above_bottom:
            shift = (above_bottom + 1) - top
            below[2] = [left, top + shift, right, bottom + shift]

    return merged_boxes
def process_formula_positions(formula_positions, target_num_boxes=2):
    """Merge formula boxes vertically, then group them into a few large boxes.

    Args:
        formula_positions: List of ``(category_list, confidence,
            [x_min, y_min, x_max, y_max])`` items.
        target_num_boxes (int): Number of large boxes requested from
            ``merge_horizontal_boxes``.

    Returns:
        list: The rectangle ``[x_min, y_min, x_max, y_max]`` of each final box.
    """
    # Step 1: fuse boxes that overlap in the vertical direction.
    vertically_merged = merge_ver_boxes(formula_positions)
    # Step 2: group the result, top to bottom, into the requested box count.
    grouped = merge_horizontal_boxes(vertically_merged, target_num_boxes)
    return [entry[2] for entry in grouped]
import numpy as np
def merge_bounding_boxes(formula_positions):
    """Return the single rectangle that encloses every given rectangle.

    Args:
        formula_positions (list): Rectangles as ``[x1, y1, x2, y2]`` rows.

    Returns:
        list: ``[min x1, min y1, max x2, max y2]`` (NumPy scalars).
    """
    rects = np.array(formula_positions)
    left_col, top_col = rects[:, 0], rects[:, 1]
    right_col, bottom_col = rects[:, 2], rects[:, 3]
    return [left_col.min(), top_col.min(), right_col.max(), bottom_col.max()]
if __name__ == "__main__":
    # Example: three rectangles collapse into their common envelope.
    sample_rects = [
        [1, 2, 3, 4],
        [2, 3, 5, 6],
        [0, 1, 4, 5],
    ]
    envelope = merge_bounding_boxes(sample_rects)
    print(envelope)  # envelope values: 0, 1, 5, 6
import os
from datetime import datetime
import langid
import numpy as np
from sklearn.cluster import KMeans
def mkdir_if_not_exist(path):
    """Create ``path`` (including missing parents) unless it already exists.

    Args:
        path (str): Directory path to create.
    """
    if not os.path.exists(path):
        # exist_ok closes the race where another process or thread creates the
        # directory between the existence check and this call.
        os.makedirs(path, exist_ok=True)
def get_millisecond_time():
    """Return the current local time with millisecond precision.

    Returns:
        str: Timestamp formatted as ``YYYYMMDDHHMMSSmmm``.
    """
    # %f yields microseconds (6 digits); dropping the last 3 leaves milliseconds.
    return datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3]
def get_lang(text):
    """Detect whether ``text`` is English.

    Args:
        text (str): Text to classify.

    Returns:
        str: ``'en'`` when langid reports English, otherwise ``'zh'``.
    """
    # Replace sentence punctuation with spaces before language detection.
    normalized = text.replace('。', ' ').replace(',', ' ')
    detected, _score = langid.classify(normalized)
    if detected == 'en':
        return 'en'
    return 'zh'
def get_day_time():
    """Return the current local date and time as a compact string.

    Returns:
        str: Timestamp formatted as ``YYYYMMDD_HHMMSS``.
    """
    return datetime.now().strftime("%Y%m%d_%H%M%S")
def merge_ver_boxes(formula_positions):
    """Merge boxes whose vertical ranges overlap or touch.

    The input list is sorted in place by ``y_min``. A box is folded into the
    running group when its ``y_min`` is not below the group's ``y_max``; the
    group keeps the category and confidence of its first box.

    Args:
        formula_positions (list): Items of ``(category, confidence,
            [x_min, y_min, x_max, y_max])``.

    Returns:
        list: Merged items as ``[category, confidence, bbox]`` lists.
    """
    formula_positions.sort(key=lambda item: item[2][1])

    merged = []
    for category, confidence, bbox in formula_positions:
        x_min, y_min, x_max, y_max = bbox
        if merged and y_min <= merged[-1][2][3]:
            # Grow the running group's envelope to include this box.
            g_x_min, g_y_min, g_x_max, g_y_max = merged[-1][2]
            merged[-1][2] = [
                min(g_x_min, x_min),
                min(g_y_min, y_min),
                max(g_x_max, x_max),
                max(g_y_max, y_max),
            ]
        else:
            merged.append([category, confidence, bbox])
    return merged
def merge_hor_boxes(formula_positions):
    """Merge boxes whose horizontal ranges overlap or touch.

    The input list is sorted in place by ``x_min``. A box is folded into the
    running group when its ``x_min`` is not beyond the group's ``x_max``; the
    group keeps the category and confidence of its first box.

    Args:
        formula_positions (list): Items of ``(category, confidence,
            [x_min, y_min, x_max, y_max])``.

    Returns:
        list: Merged items as ``[category, confidence, bbox]`` lists.
    """
    formula_positions.sort(key=lambda item: item[2][0])

    merged = []
    for category, confidence, bbox in formula_positions:
        x_min, y_min, x_max, y_max = bbox
        if merged and x_min <= merged[-1][2][2]:
            # Grow the running group's envelope to include this box.
            g_x_min, g_y_min, g_x_max, g_y_max = merged[-1][2]
            merged[-1][2] = [
                min(g_x_min, x_min),
                min(g_y_min, y_min),
                max(g_x_max, x_max),
                max(g_y_max, y_max),
            ]
        else:
            merged.append([category, confidence, bbox])
    return merged
def merge_boxes_by_clustering(formula_positions, target_num_boxes=4):
    """Cluster boxes by centre point with KMeans and merge each cluster.

    Args:
        formula_positions (list): Items of ``(category, confidence,
            [x_min, y_min, x_max, y_max])``.
        target_num_boxes (int): Number of clusters requested from KMeans.

    Returns:
        list: One ``(None, None, [x_min, y_min, x_max, y_max])`` tuple per
        non-empty cluster, ordered by cluster label.
    """
    centres = np.array(
        [((rect[0] + rect[2]) / 2, (rect[1] + rect[3]) / 2) for _, _, rect in formula_positions]
    )
    model = KMeans(n_clusters=target_num_boxes, random_state=0).fit(centres)
    assignments = model.labels_

    merged = []
    for cluster_id in range(target_num_boxes):
        members = [
            rect
            for (_, _, rect), assigned in zip(formula_positions, assignments)
            if assigned == cluster_id
        ]
        if members:
            envelope = [
                min(rect[0] for rect in members),
                min(rect[1] for rect in members),
                max(rect[2] for rect in members),
                max(rect[3] for rect in members),
            ]
            merged.append((None, None, envelope))
    return merged
def process_formula_positionsv2(formula_positions, target_num_boxes=4):
    """Merge formula boxes vertically, then reduce them by clustering.

    Args:
        formula_positions (list): Items of ``(category, confidence,
            [x_min, y_min, x_max, y_max])``.
        target_num_boxes (int): Upper bound on the number of clusters; it is
            lowered to the number of vertically merged boxes when there are
            fewer of them.

    Returns:
        list: The rectangle of each clustered box.
    """
    vertically_merged = merge_ver_boxes(formula_positions)
    # Never ask for more clusters than there are boxes to cluster.
    cluster_count = target_num_boxes
    if len(vertically_merged) < cluster_count:
        cluster_count = len(vertically_merged)
    clustered = merge_boxes_by_clustering(vertically_merged, target_num_boxes=cluster_count)
    return [entry[2] for entry in clustered]
def merge_bounding_boxes(formula_positions):
    """Return the single rectangle that encloses every given rectangle.

    Args:
        formula_positions (list): Rectangles as ``[x1, y1, x2, y2]`` rows.

    Returns:
        list: ``[min x1, min y1, max x2, max y2]`` (NumPy scalars).
    """
    rects = np.array(formula_positions)
    left_col, top_col = rects[:, 0], rects[:, 1]
    right_col, bottom_col = rects[:, 2], rects[:, 3]
    return [left_col.min(), top_col.min(), right_col.max(), bottom_col.max()]
if __name__ == "__main__":
    # Demo input: (category, confidence, [x_min, y_min, x_max, y_max]) entries.
    formula_positions = [
        [[6], 0.0, [1609.0, 199.0, 2127.0, 329.0]],
        [[0], 0.9736799597740173, [225.93232349219818, 1611.367297878681, 2017.0720675821328, 2075.073957172845]],
        [[0], 0.9499262571334839, [1579.0, 2211.0, 1992.0, 2841.0]],
        [[1], 0.4914904832839966, [320.0, 2088.0, 739.0, 2161.0]],
        [[1], 0.4311352074146271, [228.9372284589486, 1553.0413806681088, 726.2493805957434, 1587.8428093167895]],
        [[1], 0.0, [765.0, 972.0, 842.0, 1011.0]],
        [[1], 0.0, [778.0, 1329.0, 848.0, 1368.0]],
        [[1], 0.0, [431.0, 717.0, 634.0, 766.0]],
        [[1], 0.0, [327.0, 1152.0, 554.0, 1195.0]],
        [[1], 0.0, [351.0, 1460.0, 735.0, 1506.0]],
        [[1], 0.0, [1012.0, 517.0, 1352.0, 566.0]],
        [[0], 0.0, [1059.0, 713.0, 1867.0, 874.0]],
        [[1], 0.0, [1062.0, 930.0, 1696.0, 962.0]],
        [[1], 0.0, [1055.0, 1024.0, 1249.0, 1093.0]],
        [[1], 0.0, [1059.0, 1149.0, 1840.0, 1182.0]],
        [[1], 0.0, [1095.0, 1247.0, 1980.0, 1345.0]],
        [[1], 0.0, [1092.0, 1413.0, 1346.0, 1486.0]],
        [[7], 0.4297367036342621, [1976.0, 2958.0, 2067.0, 2994.0]],
        [[0], 0.0, [227.0, 2235.0, 1453.0, 2854.0]],
    ]
    # Call the pipeline this module defines; the demo previously referenced
    # `process_formula_positions`, which is not defined alongside this code.
    merged_box = process_formula_positionsv2(formula_positions)
    print(merged_box)  # prints the list of merged rectangles
import os
from datetime import datetime
import langid
import numpy as np
from sklearn.cluster import KMeans
def mkdir_if_not_exist(path):
    """Create ``path`` (including missing parents) unless it already exists.

    Args:
        path (str): Directory path to create.
    """
    if not os.path.exists(path):
        # exist_ok closes the race where another process or thread creates the
        # directory between the existence check and this call.
        os.makedirs(path, exist_ok=True)
def get_millisecond_time():
    """Return the current local time with millisecond precision.

    Returns:
        str: Timestamp formatted as ``YYYYMMDDHHMMSSmmm``.
    """
    # %f yields microseconds (6 digits); dropping the last 3 leaves milliseconds.
    return datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3]
def get_lang(text):
    """Detect whether ``text`` is English.

    Args:
        text (str): Text to classify.

    Returns:
        str: ``'en'`` when langid reports English, otherwise ``'zh'``.
    """
    # Replace sentence punctuation with spaces before language detection.
    normalized = text.replace('。', ' ').replace(',', ' ')
    detected, _score = langid.classify(normalized)
    if detected == 'en':
        return 'en'
    return 'zh'
def get_day_time():
    """Return the current local date and time as a compact string.

    Returns:
        str: Timestamp formatted as ``YYYYMMDD_HHMMSS``.
    """
    return datetime.now().strftime("%Y%m%d_%H%M%S")
def merge_ver_boxes(formula_positions):
    """Merge boxes whose vertical ranges overlap or touch.

    The input list is sorted in place by ``y_min``. A box is folded into the
    running group when its ``y_min`` is not below the group's ``y_max``; the
    group keeps the category and confidence of its first box.

    Args:
        formula_positions (list): Items of ``(category, confidence,
            [x_min, y_min, x_max, y_max])``.

    Returns:
        list: Merged items as ``[category, confidence, bbox]`` lists.
    """
    formula_positions.sort(key=lambda item: item[2][1])

    merged = []
    for category, confidence, bbox in formula_positions:
        x_min, y_min, x_max, y_max = bbox
        if merged and y_min <= merged[-1][2][3]:
            # Grow the running group's envelope to include this box.
            g_x_min, g_y_min, g_x_max, g_y_max = merged[-1][2]
            merged[-1][2] = [
                min(g_x_min, x_min),
                min(g_y_min, y_min),
                max(g_x_max, x_max),
                max(g_y_max, y_max),
            ]
        else:
            merged.append([category, confidence, bbox])
    return merged
def merge_hor_boxes(formula_positions):
    """Merge boxes whose horizontal ranges overlap or touch.

    The input list is sorted in place by ``x_min``. A box is folded into the
    running group when its ``x_min`` is not beyond the group's ``x_max``; the
    group keeps the category and confidence of its first box.

    Args:
        formula_positions (list): Items of ``(category, confidence,
            [x_min, y_min, x_max, y_max])``.

    Returns:
        list: Merged items as ``[category, confidence, bbox]`` lists.
    """
    formula_positions.sort(key=lambda item: item[2][0])

    merged = []
    for category, confidence, bbox in formula_positions:
        x_min, y_min, x_max, y_max = bbox
        if merged and x_min <= merged[-1][2][2]:
            # Grow the running group's envelope to include this box.
            g_x_min, g_y_min, g_x_max, g_y_max = merged[-1][2]
            merged[-1][2] = [
                min(g_x_min, x_min),
                min(g_y_min, y_min),
                max(g_x_max, x_max),
                max(g_y_max, y_max),
            ]
        else:
            merged.append([category, confidence, bbox])
    return merged
def merge_boxes_by_clustering(formula_positions, target_num_boxes=4):
    """Cluster boxes by centre point with KMeans and merge each cluster.

    Args:
        formula_positions (list): Items of ``(category, confidence,
            [x_min, y_min, x_max, y_max])``.
        target_num_boxes (int): Number of clusters requested from KMeans.

    Returns:
        list: One ``(None, None, [x_min, y_min, x_max, y_max])`` tuple per
        non-empty cluster, ordered by cluster label.
    """
    centres = np.array(
        [((rect[0] + rect[2]) / 2, (rect[1] + rect[3]) / 2) for _, _, rect in formula_positions]
    )
    model = KMeans(n_clusters=target_num_boxes, random_state=0).fit(centres)
    assignments = model.labels_

    merged = []
    for cluster_id in range(target_num_boxes):
        members = [
            rect
            for (_, _, rect), assigned in zip(formula_positions, assignments)
            if assigned == cluster_id
        ]
        if members:
            envelope = [
                min(rect[0] for rect in members),
                min(rect[1] for rect in members),
                max(rect[2] for rect in members),
                max(rect[3] for rect in members),
            ]
            merged.append((None, None, envelope))
    return merged
def process_formula_positions_v2(formula_positions, target_num_boxes=5):
    """Merge formula boxes horizontally, then reduce them by clustering.

    Args:
        formula_positions (list): Items of ``(category, confidence,
            [x_min, y_min, x_max, y_max])``.
        target_num_boxes (int): Upper bound on the number of clusters; it is
            lowered to the number of horizontally merged boxes when there are
            fewer of them.

    Returns:
        list: The rectangle of each clustered box.
    """
    horizontally_merged = merge_hor_boxes(formula_positions)
    # Never ask for more clusters than there are boxes to cluster.
    cluster_count = target_num_boxes
    if len(horizontally_merged) < cluster_count:
        cluster_count = len(horizontally_merged)
    clustered = merge_boxes_by_clustering(horizontally_merged, target_num_boxes=cluster_count)
    return [entry[2] for entry in clustered]
def merge_bounding_boxes(formula_positions):
    """Return the single rectangle that encloses every given rectangle.

    Args:
        formula_positions (list): Rectangles as ``[x1, y1, x2, y2]`` rows.

    Returns:
        list: ``[min x1, min y1, max x2, max y2]`` (NumPy scalars).
    """
    rects = np.array(formula_positions)
    left_col, top_col = rects[:, 0], rects[:, 1]
    right_col, bottom_col = rects[:, 2], rects[:, 3]
    return [left_col.min(), top_col.min(), right_col.max(), bottom_col.max()]
if __name__ == "__main__":
    # Demo input: (category, confidence, [x_min, y_min, x_max, y_max]) entries.
    formula_positions = [
        [[6], 0.0, [1609.0, 199.0, 2127.0, 329.0]],
        [[0], 0.9736799597740173, [225.93232349219818, 1611.367297878681, 2017.0720675821328, 2075.073957172845]],
        [[0], 0.9499262571334839, [1579.0, 2211.0, 1992.0, 2841.0]],
        [[1], 0.4914904832839966, [320.0, 2088.0, 739.0, 2161.0]],
        [[1], 0.4311352074146271, [228.9372284589486, 1553.0413806681088, 726.2493805957434, 1587.8428093167895]],
        [[1], 0.0, [765.0, 972.0, 842.0, 1011.0]],
        [[1], 0.0, [778.0, 1329.0, 848.0, 1368.0]],
        [[1], 0.0, [431.0, 717.0, 634.0, 766.0]],
        [[1], 0.0, [327.0, 1152.0, 554.0, 1195.0]],
        [[1], 0.0, [351.0, 1460.0, 735.0, 1506.0]],
        [[1], 0.0, [1012.0, 517.0, 1352.0, 566.0]],
        [[0], 0.0, [1059.0, 713.0, 1867.0, 874.0]],
        [[1], 0.0, [1062.0, 930.0, 1696.0, 962.0]],
        [[1], 0.0, [1055.0, 1024.0, 1249.0, 1093.0]],
        [[1], 0.0, [1059.0, 1149.0, 1840.0, 1182.0]],
        [[1], 0.0, [1095.0, 1247.0, 1980.0, 1345.0]],
        [[1], 0.0, [1092.0, 1413.0, 1346.0, 1486.0]],
        [[7], 0.4297367036342621, [1976.0, 2958.0, 2067.0, 2994.0]],
        [[0], 0.0, [227.0, 2235.0, 1453.0, 2854.0]],
    ]
    merged_rects = process_formula_positions_v2(formula_positions)
    print(merged_rects)  # prints the list of merged rectangles
import re
def find_and_highlight_substring(A, B):
    """Wrap occurrences of ``B`` inside the math spans of ``A`` in a red ``<span>``.

    Two passes are made: first over ``$...$`` spans of ``A``, then over
    ``$$...$$`` spans of the result of the first pass.

    Args:
        A (str): Text containing ``$``-delimited math.
        B (str): Literal substring to highlight (regex-escaped before use).

    Returns:
        tuple: ``(highlighted_text, matches)`` where ``matches`` lists the
        ``(start, end)`` offsets of every delimited span's inner content, in
        the order visited. Offsets from the second pass refer to the
        first-pass output, not to ``A``.
    """
    needle = re.escape(B)
    matches = []

    def make_replacer(delimiter):
        # Build a re.sub callback that highlights inside one delimiter style.
        def replacer(found):
            marked = re.sub(needle, r'<span style="color:red">\g<0></span>', found.group(1))
            matches.append((found.start(1), found.end(1)))
            return f"{delimiter}{marked}{delimiter}"
        return replacer

    after_single = re.sub(r'\$(.*?)\$', make_replacer("$"), A)
    after_double = re.sub(r'\$\$(.*?)\$\$', make_replacer("$$"), after_single)
    return after_double, matches
# # 示例
# A = '# 1.4.1 用空间向量研究直线、平面的位置关系(第3课时)\n\n## 空间中直线、平面的垂直\n\n### 知识清单\n\n1. 直线和直线垂直\n设直线 $l_1, l_2$ 的方向向量分别为 $$u_1, u_2$$,则 $$l_1 \perp l_2 \Leftrightarrow u_1 \cdot u_2 = 0$$。'
# B = 'l_1 \perp l_2'
# highlighted_A, matches = find_and_highlight_substring(A, B)
# print("Highlighted A:\n", highlighted_A)
# print("Matches:", matches)
import xml.etree.ElementTree as ET
import cv2
import random
def parse_xml(file_path):
    """Read formula and text rectangles from an annotation XML file.

    Every ``object`` element directly under the root is inspected; its ``name``
    decides the bucket and ``bndbox/{xmin,ymin,xmax,ymax}`` supply the integer
    coordinates. Objects with any other name are ignored.

    Args:
        file_path (str): Path of the XML file.

    Returns:
        tuple: ``(formula_boxes, text_boxes)``, each a list of
        ``(xmin, ymin, xmax, ymax)`` tuples.
    """
    root = ET.parse(file_path).getroot()
    boxes_by_label = {'formula': [], 'text': []}
    coordinate_paths = ('bndbox/xmin', 'bndbox/ymin', 'bndbox/xmax', 'bndbox/ymax')

    for node in root.findall('object'):
        label = node.find('name').text
        rect = tuple(int(node.find(path).text) for path in coordinate_paths)
        bucket = boxes_by_label.get(label)
        if bucket is not None:
            bucket.append(rect)

    return boxes_by_label['formula'], boxes_by_label['text']
def find_closest_text_boxes(formula_box, text_boxes):
    """Find the nearest text box on each side of a formula box.

    Text boxes that overlap the formula box (per ``check_overlap``) are
    skipped. A text box counts as being on a side only when it lies strictly
    beyond the corresponding edge of the formula box.

    Args:
        formula_box (tuple): ``(xmin, ymin, xmax, ymax)`` of the formula.
        text_boxes (list): Text rectangles in the same format.

    Returns:
        tuple: ``(left, right, top, bottom)`` nearest text boxes; an entry is
        ``None`` when no text box lies on that side.
    """
    f_left, f_top, f_right, f_bottom = formula_box
    sides = ('left', 'right', 'top', 'bottom')
    nearest = dict.fromkeys(sides)
    smallest_gap = dict.fromkeys(sides, float('inf'))

    for candidate in text_boxes:
        t_left, t_top, t_right, t_bottom = candidate
        if check_overlap(formula_box, candidate):
            continue
        # (side, lies strictly on that side?, gap to the formula edge)
        probes = (
            ('left', t_right < f_left, f_left - t_right),
            ('right', t_left > f_right, t_left - f_right),
            ('top', t_bottom < f_top, f_top - t_bottom),
            ('bottom', t_top > f_bottom, t_top - f_bottom),
        )
        for side, on_side, gap in probes:
            if on_side and gap < smallest_gap[side]:
                smallest_gap[side] = gap
                nearest[side] = candidate

    return nearest['left'], nearest['right'], nearest['top'], nearest['bottom']
def calculate_iou(box1, box2):
    """Compute the intersection-over-union (IoU) of two rectangles.

    Coordinates are treated as inclusive pixel indices, hence the ``+ 1`` in
    every width and height.

    Args:
        box1 (tuple): First rectangle ``(xmin, ymin, xmax, ymax)``.
        box2 (tuple): Second rectangle ``(xmin, ymin, xmax, ymax)``.

    Returns:
        float: IoU of the two rectangles, or ``0`` when the union area is zero.
    """
    xmin1, ymin1, xmax1, ymax1 = box1
    xmin2, ymin2, xmax2, ymax2 = box2

    # Intersection rectangle; extents are clamped at zero for disjoint boxes.
    inter_width = max(0, min(xmax1, xmax2) - max(xmin1, xmin2) + 1)
    inter_height = max(0, min(ymax1, ymax2) - max(ymin1, ymin2) + 1)
    inter_area = inter_width * inter_height

    box1_area = (xmax1 - xmin1 + 1) * (ymax1 - ymin1 + 1)
    # Height must span ymin2..ymax2; the previous `ymax2 - ymax2` collapsed it
    # to 1 and inflated the IoU (it could even exceed 1).
    box2_area = (xmax2 - xmin2 + 1) * (ymax2 - ymin2 + 1)

    union_area = box1_area + box2_area - inter_area
    if union_area == 0:
        # Only reachable with degenerate/inverted boxes; avoid dividing by zero.
        return 0
    return inter_area / union_area
def check_overlap(box1, box2, iou_threshold=0.1):
    """Report whether two rectangles overlap by at least ``iou_threshold``.

    Args:
        box1 (tuple): First rectangle ``(xmin, ymin, xmax, ymax)``.
        box2 (tuple): Second rectangle ``(xmin, ymin, xmax, ymax)``.
        iou_threshold (float): Minimum IoU for the boxes to count as overlapping.

    Returns:
        bool: True when the IoU of the boxes is greater than or equal to
        ``iou_threshold``.
    """
    overlap_ratio = calculate_iou(box1, box2)
    return overlap_ratio >= iou_threshold
def check_no_overlap_with_others(expanded_box, formula_boxes, current_box):
    """Check that an expanded box does not overlap any other formula box.

    Args:
        expanded_box (tuple): Candidate rectangle ``(xmin, ymin, xmax, ymax)``.
        formula_boxes (list): All formula rectangles.
        current_box (tuple): The formula rectangle being expanded; entries
            equal to it are excluded from the comparison.

    Returns:
        bool: True when no other formula box overlaps ``expanded_box`` (per
        ``check_overlap``), False otherwise.
    """
    collides = any(
        other != current_box and check_overlap(expanded_box, other)
        for other in formula_boxes
    )
    return not collides
def expand_formula_boxes(formula_boxes, text_boxes):
    """
    Grow each formula box so that it absorbs its nearest text boxes.

    For every formula box the closest text box on the left, right, top and
    bottom is looked up with ``find_closest_text_boxes``. The box is then
    extended towards each of them, in that order, but an extension is only kept
    when the enlarged box does not overlap any other formula box.

    Args:
        formula_boxes (list): Formula boxes as [(xmin, ymin, xmax, ymax), ...].
        text_boxes (list): Text boxes as [(xmin, ymin, xmax, ymax), ...].

    Returns:
        list: One (xmin, ymin, xmax, ymax) tuple per input formula box.
    """
    results = []
    for formula_box in formula_boxes:
        x0, y0, x1, y1 = formula_box
        left, right, top, bottom = find_closest_text_boxes(formula_box, text_boxes)

        # The order matters: each accepted extension feeds into the next one.
        for side, neighbour in (("left", left), ("right", right),
                                ("top", top), ("bottom", bottom)):
            if not neighbour:
                continue
            tx0, ty0, tx1, ty1 = neighbour

            if side == "left":
                candidate = (tx0, min(y0, ty0), x1, max(y1, ty1))
            elif side == "right":
                candidate = (x0, min(y0, ty0), tx1, max(y1, ty1))
            elif side == "top":
                candidate = (min(x0, tx0), ty0, max(x1, tx1), y1)
            else:
                candidate = (min(x0, tx0), y0, max(x1, tx1), ty1)

            # Keep the enlarged box only if it stays clear of the other formulas.
            if check_no_overlap_with_others(candidate, formula_boxes, formula_box):
                x0, y0, x1, y1 = candidate

        results.append((x0, y0, x1, y1))
    return results
def draw_boxes_on_image(image_path, boxes, output_path):
    """
    Draw boxes on an image and save the result.

    Each box is outlined with a randomly chosen colour and a line thickness
    of 2.

    Args:
        image_path (str): Path of the input image.
        boxes (list): Boxes to draw, as [(xmin, ymin, xmax, ymax), ...].
        output_path (str): Path the annotated image is written to.

    Raises:
        ValueError: If the input image cannot be read.
        IOError: If the annotated image cannot be written.
    """
    image = cv2.imread(image_path)
    # cv2.imread signals failure by returning None instead of raising.
    if image is None:
        raise ValueError(f"Failed to read the image file '{image_path}'.")

    for xmin, ymin, xmax, ymax in boxes:
        color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
        cv2.rectangle(image, (xmin, ymin), (xmax, ymax), color, 2)

    # cv2.imwrite reports failure through its return value; do not claim
    # success when nothing was written.
    if not cv2.imwrite(output_path, image):
        raise IOError(f"Failed to write the image file '{output_path}'.")
    print(f"扩展后的图像已保存到 {output_path}")
def main(xml_file_path, image_file_path, output_image_path):
    """
    Run the whole pipeline: parse the XML, expand the formula boxes, draw them.

    Args:
        xml_file_path (str): Path of the XML annotation file.
        image_file_path (str): Path of the input image.
        output_image_path (str): Path the annotated image is written to.
    """
    formula_boxes, text_boxes = parse_xml(xml_file_path)
    merged_boxes = expand_formula_boxes(formula_boxes, text_boxes)

    # Echo every resulting box before drawing them.
    for merged_box in merged_boxes:
        print(merged_box)

    draw_boxes_on_image(image_file_path, merged_boxes, output_image_path)


if __name__ == "__main__":
    main(
        xml_file_path='1.xml',
        image_file_path='1.png',
        output_image_path='./expanded_image.png',
    )
import requests
import os
import mimetypes
from typing import Dict, Optional, Union, Tuple
from urllib.parse import quote
class OBSUploader:
    """Upload files through temporary signed URLs obtained from an HTTP API.

    An upload is a two-step process: ask the API at ``base_url`` for a
    temporary signed URL, then PUT the file content to that URL.
    """

    def __init__(self, base_url: str = "https://open.5rs.me", auth_token: Optional[str] = None,
                 timeout: Optional[float] = 60.0):
        """
        Initialize the OBS uploader.

        Args:
            base_url: The base URL for the API (a trailing slash is removed).
            auth_token: The authorization token for API access.
            timeout: Timeout in seconds handed to every ``requests`` call so a
                stalled server cannot block the caller forever. Pass ``None``
                to wait indefinitely.
        """
        self.base_url = base_url.rstrip('/')
        self.auth_token = auth_token
        self.timeout = timeout
        # Without a token the value is None; requests omits headers whose
        # value is None, so no Authorization header is sent in that case.
        self.headers = {
            'Authorization': f'Bearer {auth_token}' if auth_token else None
        }
        # Initialize mimetypes
        mimetypes.init()

    def _get_content_type(self, file_path: Union[str, bytes]) -> Tuple[str, bytes]:
        """
        Get content type and file content from file path or bytes.

        Args:
            file_path: Path to the file or file content as bytes.

        Returns:
            Tuple of (content_type, file_content). The content type falls back
            to ``application/octet-stream`` for raw bytes and for paths whose
            type cannot be guessed from the file name.

        Raises:
            FileNotFoundError: If ``file_path`` is a string that does not
                point to an existing file.
        """
        if not isinstance(file_path, str):
            # Raw bytes carry no file name to guess a type from.
            return 'application/octet-stream', file_path

        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        content_type, _ = mimetypes.guess_type(file_path)
        with open(file_path, 'rb') as f:
            file_content = f.read()
        return content_type or 'application/octet-stream', file_content

    def get_upload_url(self, biz_code: str, object_name: str, content_type: str) -> Dict:
        """
        Get a temporary upload URL for the specified object.

        Args:
            biz_code: Business code for the upload.
            object_name: Name/path of the object to upload.
            content_type: MIME type of the file.

        Returns:
            Dict containing the upload URL and related information (the decoded
            JSON response).

        Raises:
            requests.HTTPError: If the API answers with an HTTP error status.
        """
        endpoint = f"{self.base_url}/aimodel/v1.0/obs/getCreatePostSignature"
        params = {
            'bizCode': biz_code,
            'objectName': object_name,
            'mimeType': content_type
        }
        response = requests.get(endpoint, params=params, headers=self.headers,
                                timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def upload_file(self, file_path: Union[str, bytes], biz_code: str, object_name: str) -> Dict:
        """
        Upload a file using temporary credentials.

        Args:
            file_path: Path to the file to upload or file content as bytes.
            biz_code: Business code for the upload.
            object_name: Name/path of the object to upload.

        Returns:
            Dict with ``success``, ``file_url`` and ``object_url_map``.

        Raises:
            FileNotFoundError: If ``file_path`` is a path that does not exist.
            Exception: If the API does not report ``errCode == 0``.
            requests.HTTPError: If either HTTP call returns an error status.
        """
        # Get content type and file content
        content_type, file_content = self._get_content_type(file_path)

        # Get temporary upload URL with content type
        upload_info = self.get_upload_url(biz_code, object_name, content_type)
        # Use .get so a malformed error payload still produces the intended
        # exception instead of an unrelated KeyError.
        if upload_info.get('errCode') != 0:
            raise Exception(f"Failed to get upload URL: {upload_info.get('message')}")
        upload_data = upload_info['data']

        # Upload the file with the correct content type
        headers = {
            'Content-Type': content_type,
            'Content-Length': str(len(file_content))
        }
        response = requests.put(upload_data['temporarySignatureUrl'], data=file_content,
                                headers=headers, timeout=self.timeout)
        response.raise_for_status()

        return {
            'success': True,
            'file_url': upload_data['domain'] + '/' + object_name,
            'object_url_map': upload_data['objectUrlMap']
        }
# Example usage:
if __name__ == "__main__":
    # Read the token from the environment so that no credential is committed
    # to the repository.
    auth_token = os.environ.get("OBS_AUTH_TOKEN")
    if not auth_token:
        raise SystemExit("Set the OBS_AUTH_TOKEN environment variable before running this example.")

    # Initialize uploader
    uploader = OBSUploader(auth_token=auth_token)

    # Upload a file
    try:
        result = uploader.upload_file(
            file_path="/data/wangtengbo/formula_node4_生产/logs/2025-06-06/draw_box_sub_images/0a9fb8f899c74d979c7dce58f61ff00e/formula_1.png",
            biz_code="formula",
            object_name="image/test.jpg"
        )
        print(result)
        print(f"File uploaded successfully! URL: {result['file_url']}")
    except Exception as e:
        print(f"Upload failed: {str(e)}")
\ No newline at end of file
import matplotlib.pyplot as plt
import matplotlib as mpl
# Script: render one LaTeX formula with Matplotlib, save it as a PNG and show it.
# Enable LaTeX text rendering and set the default font size.
# NOTE(review): usetex presumably needs a working LaTeX installation on the
# machine running this script — confirm.
mpl.rcParams['text.usetex'] = True
mpl.rcParams['font.size'] = 12
# The LaTeX formula to render (raw string keeps the backslashes intact).
latex_formula = r'$a \perp b \Leftrightarrow a \cdot b = 0$'
# Create a figure and a single axes.
fig, ax = plt.subplots()
# Hide the axes so only the formula is visible.
ax.axis('off')
# Place the formula at (0.5, 0.5), centred horizontally and vertically.
ax.text(0.5, 0.5, latex_formula, fontsize=20, ha='center', va='center')
# Save the figure as an image, using a tight bounding box.
output_image_path = "latex_formula.png"
plt.savefig(output_image_path, bbox_inches='tight')
# Display the figure.
plt.show()
import re
from sympy.parsing.latex import parse_latex
def parse_and_normalize_latex(latex_str):
    """
    Parse a LaTeX expression with sympy and return its normalized text form.

    :param latex_str: LaTeX expression string
    :return: ``str()`` of the parsed expression, or None when parsing raises
             (the error is printed, not propagated)
    """
    try:
        return str(parse_latex(latex_str))
    except Exception as e:
        print(f"Error parsing LaTeX: {e}")
    return None
def extract_and_normalize_formulas(text):
    """
    Pull formula tokens out of a text and normalize each of them.

    The pattern matches backslash commands and ``letters=`` tokens; every match
    is passed through ``parse_and_normalize_latex`` and empty/failed results
    are dropped.

    :param text: text that contains formulas
    :return: list of normalized formula strings, in match order
    """
    token_pattern = r'\\[a-zA-Z]+|\\frac|\\sqrt|[A-Za-z]+=|\\[A-Za-z0-9]+'
    candidates = re.findall(token_pattern, text)
    return [
        normalized
        for normalized in map(parse_and_normalize_latex, candidates)
        if normalized
    ]
def check_formula_in_text(target_formula, text):
    """
    Tell whether the target formula occurs in the text.

    :param target_formula: target formula as a LaTeX expression
    :param text: text that contains formulas
    :return: True when the normalized target equals one of the normalized
             formulas extracted from the text; False otherwise, including
             when the target itself cannot be normalized
    """
    wanted = parse_and_normalize_latex(target_formula)
    if wanted:
        return wanted in extract_and_normalize_formulas(text)
    return False
def process_text(text):
    """
    Split a text into lines and drop the blank ones.

    :param text: multi-line input string
    :return: list of the lines that contain at least one non-whitespace
             character, in their original order and with their original
             surrounding whitespace
    """
    kept_lines = []
    for raw_line in text.splitlines():
        if not raw_line.strip():
            # Whitespace-only line: skip it.
            continue
        kept_lines.append(r'{}'.format(raw_line))
    return kept_lines
def Scan_Content_Aggregation(all_page_details):
    """
    Split the page text into non-blank lines and print them.

    :param all_page_details: multi-line text to process
    :return: always an empty string
    """
    non_blank_lines = process_text(all_page_details)
    print(non_blank_lines)
    # NOTE: a filtering step based on is_break() / check_numbers_in_string()
    # used to be sketched here but is disabled, so nothing is aggregated yet.
    return ''
# Test case
if __name__ == "__main__":
    # Raw string: the sample is full of LaTeX backslash commands. In a normal
    # string "\t" (\times), "\f" (\frac), "\b" (\begin) and "\v" (\vert) would
    # be turned into control characters, and "\f"/"\v" would even be treated as
    # line breaks by str.splitlines().
    original_text = r"""
**(多媒体展示)填空:**
(1)$\sqrt { 4 } \times \sqrt { 9 } =$ ,$\sqrt { 4 \times 9 } =$ ;
(2)$\sqrt { 2 5 } \times \sqrt { 1 6 } =$ ,$\sqrt { 2 5 \times 1 6 } =$ ;
(3)$\sqrt { \frac { 1 } { 9 } } \times \sqrt { 3 6 } =$ ,
$\sqrt { \frac { 1 } { 9 } \times 3 6 } =$ ;
(4)$\sqrt { 1 0 0 } \times \sqrt { 0 } =$ ,$\sqrt { 1 0 0 \times 0 } =$
生:(1)$\sqrt { 4 } \times \sqrt { 9 } = 6$,$\sqrt { 4 \times 9 } = 6$;(2)$\sqrt { 2 5 } \times \sqrt { 1 6 } = 2 0$,$\sqrt { 2 5 \times 1 6 } = 2 0 ; ( 3 ) \sqrt { \frac { 1 } { 9 } }$
$\times \sqrt { 3 6 } = 2$ $\sqrt { \frac { 1 } { 9 } } \times 3 6 = 2 ;$ $; ( 4 ) \sqrt { 1 0 0 } \times \sqrt { 0 } = 0$,$\sqrt { 1 0 0 \times 0 } = 0 .$·
试一试,参考上面的结果,比较各组等式的大小关系.
生:上面各组中两个算式的结果相等.
## 二、新课教授
$\because x > 0, \therefore x = 30 \sqrt{2}.$
【例4】若$\frac { \sqrt { x + 1 } } { \sqrt { x - 1 } } = \sqrt { \frac { x + 1 } { x - 1 } }$成立,求x的取值范围.
分析:等式$\frac { \sqrt { a } } { \sqrt { b } } = \sqrt { \frac { a } { b } }$只有a≥0,b&gt;0时才能成立.
解:由题意,得$\{ \begin{matrix} x + 1 \geq 0 \\ x - 1 > 0 \end{matrix} ,$即$\{ \begin{matrix} x \geq - 1 , \\ x > 1 . \end{matrix}$
∴x&gt;1.
## 四、巩固练习
(2)首先利用$\sqrt { a ^ { 3 } } = \vert a \vert$化简去掉二次根号,再根据x的范围来判断绝对值中的代数式的正负,去掉绝对值符号.
$\vert x - 2 \vert + \sqrt { ( x + 3 ) ^ { 2 } } + \sqrt { x ^ { 2 } - 10 x + 25 }$
=|x-2|+|x+3|+|x-5|·
∵-3≤x≤2,
∴x-2≤0,$\textcircled { 3 }$x+3≥0,,x-5<0.
∴原式=-(x-2)+(x+3)-(x-5)
=-x+2+x+3-x+5
"""
    cleaned_text = Scan_Content_Aggregation(original_text)
    print(cleaned_text)
import base64
from PIL import Image
from io import BytesIO
import cv2
import numpy as np
import matplotlib.pyplot as plt
def get_base64_from_image(image):
    """
    Encode an image as a Base64 string of its PNG representation.

    :param image: input image (OpenCV format; converted from BGR to RGB)
    :return: Base64-encoded string
    """
    # OpenCV stores channels as BGR while PIL expects RGB.
    rgb_pixels = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Serialize to PNG in memory rather than on disk.
    png_stream = BytesIO()
    Image.fromarray(rgb_pixels).save(png_stream, format="PNG")
    encoded = base64.b64encode(png_stream.getvalue())
    return encoded.decode('utf-8')
def get_subimage_base64(image_path, coordinates, scale_factor=1):
    """
    Crop a region out of an image and return it as a Base64 string.

    The crop is the axis-aligned bounding rectangle of the four corners,
    clipped to the image borders.

    :param image_path: path of the input image
    :param coordinates: eight numbers describing four corners, in the order
                        top-left, top-right, bottom-right, bottom-left
    :param scale_factor: accepted for API compatibility; NOTE(review): it is
                         currently not applied to the coordinates
    :return: Base64-encoded crop
    :raises ValueError: if the image cannot be read or the crop is empty
    """
    image = cv2.imread(image_path)
    if image is None:
        raise ValueError(f"Failed to read the image file '{image_path}'.")

    img_height, img_width = image.shape[:2]

    # Exactly eight values are expected; unpacking enforces that.
    x1, y1, x2, y2, x3, y3, x4, y4 = map(int, coordinates)

    # Bounding rectangle of the four corners, clipped to the image.
    left = max(0, min(x1, x2, x3, x4))
    top = max(0, min(y1, y2, y3, y4))
    right = min(img_width, max(x1, x2, x3, x4))
    bottom = min(img_height, max(y1, y2, y3, y4))

    roi = image[top:bottom, left:right]
    if roi.size == 0:
        raise ValueError(f"The extracted region is empty for the given coordinates: {coordinates}")

    return get_base64_from_image(roi)
def visualize_base64_image(base64_str, output_path=None):
    """
    Decode a Base64 image and optionally store it on disk with cv2.

    :param base64_str: Base64-encoded image string
    :param output_path: optional output path; when given, the image is written
                        there with cv2 and the path is printed
    """
    raw_bytes = base64.b64decode(base64_str)
    pil_image = Image.open(BytesIO(raw_bytes))
    # Reverse the channel axis: RGB (PIL) -> BGR (OpenCV).
    bgr_pixels = np.array(pil_image)[:, :, ::-1]

    if not output_path:
        return
    cv2.imwrite(output_path, bgr_pixels)
    print(f"Image saved to: {output_path}")
if __name__ == '__main__':
    # Demo: crop the quadrilateral given by its four corners (eight numbers:
    # top-left, top-right, bottom-right, bottom-left) and save the crop.
    image_path = "/data/wangtengbo/Deployments_Formula_Checker_v5_复现并微调_v7.0-合和OCR-没有版面/app/logs_data/sub_images/20241210142852866/formula_1.png"
    coordinates = [74, 143, 1240, 143, 1240, 241, 74, 241]

    # Base64 encoding of the cropped region.
    base64_subimage = get_subimage_base64(image_path, coordinates)

    # Decode it again and write it to disk for visual inspection.
    visualize_base64_image(
        base64_subimage,
        output_path='/data/wangtengbo/Deployments_Formula_Checker_v5_复现并微调_v7.0-合和OCR-没有版面/app/utils/base64.jpg',
    )
import base64
from PIL import Image
from io import BytesIO
import cv2
import numpy as np
import cv2
import numpy as np
import os
from loguru import logger
def draw_box_and_save(image_path, coordinates, draw_box_sub_img_save_dir, save_img_name, scale_factor=1):
    """
    Draw a red quadrilateral on an image and save the result as a new PNG.

    The input image is not modified on disk; the annotated copy is written to
    ``<draw_box_sub_img_save_dir>/<save_img_name>.png``.

    :param image_path: path of the input image
    :param coordinates: eight numbers describing four corners, in the order
                        top-left, top-right, bottom-right, bottom-left
    :param draw_box_sub_img_save_dir: directory the annotated image is saved in
    :param save_img_name: file name (without extension) of the saved image
    :param scale_factor: factor every coordinate is multiplied by before
                         drawing (default 1, i.e. no scaling)
    :return: path of the saved annotated image
    :raises ValueError: if the input image cannot be read
    :raises IOError: if the annotated image cannot be written
    """
    # Read the image; cv2.imread returns None on failure.
    image = cv2.imread(image_path)
    if image is None:
        raise ValueError(f"无法读取图像文件: {image_path}")

    # Parse the coordinates and apply the scale (truncating to int pixels).
    x1, y1, x2, y2, x3, y3, x4, y4 = map(float, coordinates)
    x1, y1, x2, y2, x3, y3, x4, y4 = [int(c * scale_factor) for c in (x1, y1, x2, y2, x3, y3, x4, y4)]

    # Vertex array in the layout expected by cv2.polylines.
    pts = np.array([[x1, y1],
                    [x2, y2],
                    [x3, y3],
                    [x4, y4]], dtype=np.int32).reshape((-1, 1, 2))

    # Draw the closed red outline (BGR colour).
    cv2.polylines(image, [pts], isClosed=True, color=(0, 0, 255), thickness=8)

    save_image_path = os.path.join(draw_box_sub_img_save_dir, save_img_name + '.png')
    success = cv2.imwrite(save_image_path, image)
    if not success:
        # A failed write is an error, so log it at error level.
        logger.error(f'保存图像错误!\n\nsave_image_path={save_image_path}')
        raise IOError(f"无法将图像保存到: {save_image_path}")
    return save_image_path
def get_base64_from_image(image):
    """
    Convert an image into the Base64 string of its PNG encoding.

    :param image: input image (OpenCV format; converted from BGR to RGB)
    :return: Base64-encoded string
    """
    # PIL works with RGB, OpenCV delivers BGR.
    pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    # Write the PNG into an in-memory buffer and encode its bytes.
    with BytesIO() as buffer:
        pil_image.save(buffer, format="PNG")
        png_bytes = buffer.getvalue()
    return base64.b64encode(png_bytes).decode('utf-8')
def get_subimage_base64_boxes(image_path, coordinates, scale_factor=1):
    """
    Draw a red quadrilateral on a copy of the image and return it as Base64.

    The whole image (not a crop) is returned, with the outline drawn on it.

    :param image_path: path of the input image
    :param coordinates: eight numbers describing four corners, in the order
                        top-left, top-right, bottom-right, bottom-left
    :param scale_factor: accepted for API compatibility; NOTE(review): it is
                         currently not applied to the coordinates
    :return: Base64-encoded annotated image
    :raises ValueError: if the image cannot be read
    """
    image = cv2.imread(image_path)
    if image is None:
        raise ValueError(f"Failed to read the image file '{image_path}'.")

    # Exactly eight values are expected; unpacking enforces that.
    x1, y1, x2, y2, x3, y3, x4, y4 = map(int, coordinates)
    corners = [[x1, y1], [x2, y2], [x3, y3], [x4, y4]]
    outline = np.array(corners, np.int32).reshape((-1, 1, 2))

    # Draw on a copy so the array returned by imread stays untouched.
    annotated = image.copy()
    cv2.polylines(annotated, [outline], isClosed=True, color=(0, 0, 255), thickness=8)

    return get_base64_from_image(annotated)
def visualize_base64_image(base64_str, output_path=None):
    """
    Turn a Base64 image string back into an image and optionally save it.

    :param base64_str: Base64-encoded image string
    :param output_path: optional output path; when given, the image is written
                        there with cv2 and the path is printed
    """
    decoded_stream = BytesIO(base64.b64decode(base64_str))
    rgb_pixels = np.array(Image.open(decoded_stream))
    # OpenCV expects BGR, so flip the channel order of the RGB array.
    bgr_pixels = rgb_pixels[:, :, ::-1]

    if output_path:
        cv2.imwrite(output_path, bgr_pixels)
        print(f"Image saved to: {output_path}")
if __name__ == '__main__':
    # Demo: outline the quadrilateral given by its four corners (eight numbers:
    # top-left, top-right, bottom-right, bottom-left) on the image.
    image_path = "/data/wangtengbo/Deployments_Formula_Checker_v5_复现并微调_v7.0-合和OCR-没有版面/app/logs_data/sub_images/20241214164802726/formula_1.png"
    coordinates = [55, 1476, 3659, 1510, 3645, 3070, 41, 3042]

    # Base64 encoding of the image with the red outline drawn on it.
    base64_image_with_box = get_subimage_base64_boxes(image_path, coordinates)

    # Decode it again and write it to disk for visual inspection.
    visualize_base64_image(
        base64_image_with_box,
        output_path='/data/wangtengbo/Deployments_Formula_Checker_v5_复现并微调_v7.0-合和OCR-没有版面/app/utils/image_with_box.jpg',
    )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment