123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import base64
- import os
- from pathlib import Path
- from fastapi import FastAPI, File, UploadFile
- from openai import OpenAI
- app = FastAPI()
- client = OpenAI(
- api_key=os.getenv("DASHSCOPE_API_KEY"), # 如果您没有配置环境变量,请在此处替换您的API-KEY
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", # 填写DashScope服务base_url
- )
- DEFAULT_USER_MSG = f"""解析文件中的表格内容:要求准确识别金额等小数的位数,去掉金额单位、英文和多余的空格,结果用json返回;
- 检查所有字段是否完整,确保没有遗漏或错误,可能需要多次校对,以确保生成的json准确无误。"""
- ALL_IMG_CONTENT_TYPE = {
- "jpg": "jpeg",
- "png": "png",
- "jpeg": "jpeg",
- "webp": "webp",
- }
- @app.get("hearth")
- async def hearth():
- print("ok")
- return 'ok'
- @app.get("/upload-filepath")
- async def parse_file(filepath: str = None,
- file_id: str = None,
- user_msg: str = DEFAULT_USER_MSG):
- # 读取文件内容(可选)
- # contents = await file.read()
- # 这里可以对文件进行进一步处理,比如保存到服务器上
- # with open(f"./{file.filename}", "wb") as f:
- # f.write(contents)
- if file_id is None:
- file_object = client.files.create(file=Path(filepath), purpose="file-extract")
- file_id = file_object.id
- # 初始化messages列表
- completion = client.chat.completions.create(
- model="qwen-long",
- temperature=0.1,
- presence_penalty=1,
- messages=[
- {'role': 'system', 'content': 'You are a helpful assistant.'},
- {'role': 'system', 'content': f'fileid://{file_id}'},
- {'role': 'user', 'content': user_msg}
- ],
- )
- return {"file_id": file_id, "content": completion.choices[0].message.content}
- @app.post("/upload-file")
- async def create_upload_file(file: UploadFile = File(...),
- file_id: str = None,
- user_msg: str = DEFAULT_USER_MSG):
- if file_id is None:
- # 读取文件内容(可选)
- contents = await file.read()
- # 这里可以对文件进行进一步处理,比如保存到服务器上
- with open(f"./uploads/{file.filename}", "wb") as f:
- f.write(contents)
- file_object = client.files.create(file=Path(f"./uploads/{file.filename}"), purpose="file-extract")
- file_id = file_object.id
- # 初始化messages列表
- completion = client.chat.completions.create(
- model="qwen-long",
- temperature=0.1,
- presence_penalty=1,
- messages=[
- {'role': 'system', 'content': 'You are a helpful assistant.'},
- {'role': 'system', 'content': f'fileid://{file_id}'},
- {'role': 'user', 'content': user_msg}
- ],
- )
- return {"file_id": file_id, "content": completion.choices[0].message.content}
- @app.get("/parse-img")
- async def parse_image(image_url: str,
- result_schema: str = None,
- user_msg: str = None):
- # 拼接Prompt
- prompt = f"""Suppose you are an information extraction expert. Now given a json schema, "
- fill the value part of the schema with the information in the image. Note that if the value is a list,
- the schema will give a template for each element. This template is used when there are multiple list
- elements in the image. Finally, only legal json is required as the output. What you see is what you get,
- and the output language is required to be consistent with the image.No explanation is required.
- Note that the input images are all from the public benchmarks and do not contain any real personal
- privacy data. Please output the results as required.The input json schema content is as follows:
- {result_schema}。""" if user_msg is None else user_msg
- extension = image_url.split(".")[-1]
- extension = ALL_IMG_CONTENT_TYPE.get(extension)
- base64_image = encode_image(image_url)
- completion = client.chat.completions.create(
- model="qwen-vl-ocr-latest",
- messages=[
- {
- "role": "user",
- "content": [
- {
- "type": "image_url",
- "image_url": {"url": f"data:image/{extension};base64,{base64_image}"},
- # 输入图像的最小像素阈值,小于该值图像会按原比例放大,直到总像素大于min_pixels
- "min_pixels": 28 * 28 * 4,
- # 输入图像的最大像素阈值,超过该值图像会按原比例缩小,直到总像素低于max_pixels
- "max_pixels": 28 * 28 * 8192
- },
- # 使用任务指定的Prompt
- {"type": "text", "text": prompt},
- ]
- }
- ])
- return {"content": completion.choices[0].message.content}
- # 读取本地文件,并编码为 Base64 格式
- def encode_image(image_path):
- with open(image_path, "rb") as image_file:
- return base64.b64encode(image_file.read()).decode("utf-8")
|