Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dataflow/example/PDF2VQAPipeline/vqa_extract_test.jsonl
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"question_pdf_path": "./example_data/PDF2VQAPipeline/questionextract_test.pdf", "answer_pdf_path": "./example_data/PDF2VQAPipeline/questionextract_test.pdf", "name": "math1"}
{"question_pdf_path": "./example_data/PDF2VQAPipeline/math_question.pdf", "answer_pdf_path": "./example_data/PDF2VQAPipeline/math_answer.pdf", "name": "math2"}
{"input_pdf_paths": "./example_data/PDF2VQAPipeline/questionextract_test.pdf", "name": "math1"}
{"input_pdf_paths": ["./example_data/PDF2VQAPipeline/math_question.pdf", "./example_data/PDF2VQAPipeline/math_answer.pdf"], "name": "math2"}
1 change: 1 addition & 0 deletions dataflow/operators/pdf2vqa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .generate.mineru_to_llm_input_operator import MinerU2LLMInputOperator
from .generate.llm_output_parser import LLMOutputParser
from .generate.qa_merger import QA_Merger
from .generate.pdf_merger import PDF_Merger


else:
Expand Down
16 changes: 9 additions & 7 deletions dataflow/operators/pdf2vqa/generate/llm_output_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
@OPERATOR_REGISTRY.register()
class LLMOutputParser(OperatorABC):
def __init__(self,
mode: Literal['question', 'answer'],
output_dir,
intermediate_dir: str = "intermediate",
):
self.logger = get_logger()
self.mode = mode
self.output_dir = output_dir
self.intermediate_dir = intermediate_dir

Expand Down Expand Up @@ -69,7 +67,7 @@ def _id_to_text(self, input_ids, input_json, image_prefix="images"):

def _convert_response(self, input_response, input_json_path, image_prefix="images"):
qa_list = []
with open(input_json_path, 'r') as infile:
with open(input_json_path, 'r', encoding='utf-8') as infile:
input_json = list(json.load(infile))
# 提取title
for chapter_block in re.findall(r'<chapter>(.*?)</chapter>', input_response, flags=re.DOTALL):
Expand Down Expand Up @@ -114,18 +112,22 @@ def run(self, storage: DataFlowStorage,
response = Path(row[input_response_path_key]).read_text(encoding='utf-8')
name = row[input_name_key]

image_prefix = os.path.join(name, f"{self.mode}_images")
image_prefix = os.path.join(name, f"vqa_images")
qa_list = self._convert_response(response, converted_json_path, image_prefix)
output_qalist_path = os.path.join(self.output_dir, name, f"extracted_{self.mode}s.jsonl")
output_qalist_path = os.path.join(self.output_dir, name, f"extracted_vqa.jsonl")
os.makedirs(os.path.dirname(output_qalist_path), exist_ok=True)
with open(output_qalist_path, 'w') as outfile:
with open(output_qalist_path, 'w', encoding='utf-8') as outfile:
for qa in qa_list:
json.dump(qa, outfile, ensure_ascii=False)
outfile.write('\n')

# 复制图片
src_dir = os.path.join(self.intermediate_dir, 'mineru', Path(converted_json_path).stem).replace('_content_list_converted','')
src_dir = os.path.dirname(converted_json_path)
src_images = os.path.join(src_dir, 'vlm', 'images')
if not os.path.exists(src_images):
src_images = os.path.join(src_dir, 'images')
if not os.path.exists(src_images):
raise ValueError(f"Images directory {src_images} not found! There might be a change in Mineru API!")
dst_images = os.path.join(self.output_dir, image_prefix)

try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from dataflow.utils.registry import OPERATOR_REGISTRY
from dataflow.utils.storage import DataFlowStorage

from pathlib import Path

@OPERATOR_REGISTRY.register()
class MinerU2LLMInputOperator(OperatorABC):
def __init__(self):
Expand All @@ -24,7 +26,7 @@ def get_desc(lang: str = "zh") -> str:
)

def _convert_json(self, input_file, output_file):
with open(input_file, 'r') as infile:
with open(input_file, 'r', encoding="utf-8") as infile:
data = list(json.load(infile))

new_data = []
Expand All @@ -47,7 +49,7 @@ def _convert_json(self, input_file, output_file):
new_data.append(item)
id += 1

with open(output_file, 'w') as outfile:
with open(output_file, 'w', encoding='utf-8') as outfile:
json.dump(new_data, outfile, ensure_ascii=False)

def run(self, storage: DataFlowStorage,
Expand All @@ -57,12 +59,17 @@ def run(self, storage: DataFlowStorage,
dataframe = storage.read("dataframe")

for index, row in dataframe.iterrows():
input_json_path = row[input_markdown_path_key].replace('.md', '_content_list.json')
converted_path = input_json_path.replace('.json', '_converted.json')
md_path = Path(row[input_markdown_path_key])
try:
input_json_path = list(md_path.parent.glob("*_content_list.json"))[0]
except:
raise ValueError("No _content_list.json file found in the api result. There might be an error with the Mineru api.")

converted_path = str(input_json_path).replace('.json', '_converted.json')
self._convert_json(input_json_path, converted_path)
dataframe.at[index, output_converted_layout_key] = converted_path

with open(converted_path, 'r') as infile:
with open(converted_path, 'r', encoding='utf-8') as infile:
data = json.load(infile)
assert isinstance(data, list), f"Expected list, got {type(data)} for {input_json_path}"

Expand Down
83 changes: 83 additions & 0 deletions dataflow/operators/pdf2vqa/generate/pdf_merger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import os
from pypdf import PdfWriter
from dataflow.core import OperatorABC
from dataflow.utils.registry import OPERATOR_REGISTRY
from dataflow.utils.storage import DataFlowStorage

@OPERATOR_REGISTRY.register()
class PDF_Merger(OperatorABC):
    """Merge a list of PDF files (in order) into a single PDF per dataframe row."""

    def __init__(self, output_dir: str):
        """
        Initialize the PDF merging operator.

        :param output_dir: root directory under which merged PDF files are saved
                           (one subdirectory per row ``name``)
        """
        self.output_dir = output_dir
        # exist_ok=True makes a prior existence check redundant.
        os.makedirs(self.output_dir, exist_ok=True)

    @staticmethod
    def get_desc(lang: str = "zh") -> str:
        if lang == 'zh':
            return (
                "PDF 文件合并算子。"
                "输入 PDF 路径列表,按顺序合并为一个 PDF 文件,"
                "并保存到指定目录。"
            )
        else:
            return (
                "PDF merging operator."
                "Takes a list of PDF paths, merges them in order into a single PDF,"
                "and saves it to the specified directory."
            )

    def run(self,
            storage: DataFlowStorage,
            input_pdf_list_key: str,
            input_name_key: str,
            output_pdf_path_key: str
        ):
        """
        Execute the merge for every row of the stored dataframe.

        :param input_pdf_list_key: dataframe column holding the PDF path(s)
                                   (a str or list[str])
        :param input_name_key: dataframe column used to name the output
                               (e.g. a file name or ID)
        :param output_pdf_path_key: dataframe column to receive the merged
                                    PDF path (None on failure / no valid input)
        """
        dataframe = storage.read("dataframe")

        for idx, row in dataframe.iterrows():
            pdf_paths = row[input_pdf_list_key]
            # A single path may be supplied as a bare string; normalize to a list.
            if isinstance(pdf_paths, str):
                pdf_paths = [pdf_paths]
            name = row[input_name_key]

            # Output layout: output_dir/name/{name}_merged.pdf
            save_dir = os.path.join(self.output_dir, str(name))
            os.makedirs(save_dir, exist_ok=True)
            output_path = os.path.join(save_dir, f"{name}_merged.pdf")

            merged_path = None
            try:
                merger = PdfWriter()
                try:
                    valid_count = 0
                    for path in pdf_paths:
                        if os.path.exists(path):
                            merger.append(path)
                            valid_count += 1
                        else:
                            # Surface skipped inputs instead of dropping them silently.
                            print(f"Warning: PDF not found, skipping: {path}")

                    if valid_count > 0:
                        with open(output_path, "wb") as f:
                            merger.write(f)
                        merged_path = output_path
                finally:
                    # Always release the writer — the original leaked it on
                    # exceptions and when no valid input PDF was found.
                    merger.close()
            except Exception as e:
                print(f"Error merging PDFs for {name}: {e}")
                merged_path = None

            # Record the result (or None) back into the dataframe.
            dataframe.loc[idx, output_pdf_path_key] = merged_path

        storage.write(dataframe)
8 changes: 3 additions & 5 deletions dataflow/operators/pdf2vqa/generate/qa_merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ def get_desc(lang: str = "zh") -> str:
)

def run(self, storage: DataFlowStorage,
input_question_qalist_path_key,
input_answer_qalist_path_key,
input_qalist_path_key,
input_name_key,
output_merged_qalist_path_key,
output_merged_md_path_key,
Expand All @@ -41,12 +40,11 @@ def run(self, storage: DataFlowStorage,
dataframe[output_qa_item_key] = dataframe[output_qa_item_key].astype(object)

for idx, row in dataframe.iterrows():
question_qalist_path = row[input_question_qalist_path_key]
answer_qalist_path = row[input_answer_qalist_path_key]
qa_list_path = row[input_qalist_path_key]
name = row[input_name_key]

output_merged_qalist_path = os.path.join(self.output_dir, name, "merged_qa_pairs.jsonl")
merge_qa_pair(question_qalist_path, answer_qalist_path, output_merged_qalist_path, strict_title_match=self.strict_title_match)
merge_qa_pair(qa_list_path, output_merged_qalist_path, strict_title_match=self.strict_title_match)

output_merged_md_path = os.path.join(self.output_dir, name, "merged_qa_pairs.md")
jsonl_to_md(output_merged_qalist_path, output_merged_md_path)
Expand Down
2 changes: 1 addition & 1 deletion dataflow/prompts/pdf2vqa.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def build_prompt(self) -> str:
- If there are multiple sub-questions (such as "(1)", "(a)") under one main question, always put them together in the same `<qa_pair>`…`</qa_pair>` block.
- If a question and its answer/solution are contiguous, wrap them together as a single `<qa_pair>`…`</qa_pair>` block, e.g.:
`<qa_pair><label>1</label><question>…</question><answer>…</answer><solution>…</solution></qa_pair>`
- If only questions or only answers/solutions appear, wrap each question or answer/solution in a `<qa_pair>`…`</qa_pair>` block with the missing part left empty. For example, if only questions appear:
- If a question and its answer/solution are NOT contiguous (e.g. only question; only answer and/or solution; all questions at the front and all answers/solutions at the back), wrap each question or answer/solution in a `<qa_pair>`…`</qa_pair>` block with the missing part left empty. For example, if only questions appear:
`<qa_pair><label>1</label><question>…</question><answer></answer><solution></solution></qa_pair>`
- In total, there are 7 possibilities: only question, only answer, only solution, question with answer, question with solution, answer with solution, full question and answer and solution.
- If multiple qa pairs appear, wrap each qa pair in its own `<qa_pair>`…`</qa_pair>` block.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from dataflow.operators.knowledge_cleaning import FileOrURLToMarkdownConverterFlash
from dataflow.operators.knowledge_cleaning import FileOrURLToMarkdownConverterAPI

from dataflow.serving import APILLMServing_request
from dataflow.utils.storage import FileStorage
from dataflow.operators.pdf2vqa import MinerU2LLMInputOperator, LLMOutputParser, QA_Merger
from dataflow.operators.pdf2vqa import MinerU2LLMInputOperator, LLMOutputParser, QA_Merger, PDF_Merger
from dataflow.operators.core_text import ChunkedPromptedGenerator

from dataflow.pipeline import PipelineABC
from dataflow.prompts.pdf2vqa import QAExtractPrompt

from pypdf import PdfWriter

class PDF_VQA_extract_optimized_pipeline(PipelineABC):
def __init__(self):
super().__init__()
Expand All @@ -27,89 +29,58 @@ def __init__(self):

self.vqa_extract_prompt = QAExtractPrompt()

self.mineru_executor = FileOrURLToMarkdownConverterFlash(
intermediate_dir="../example_data/KBCleaningPipeline/flash/",
mineru_model_path="<your Model Path>/MinerU2.5-2509-1.2B", # !!! place your local model path here !!!
# https://huggingface.co/opendatalab/MinerU2.5-2509-1.2B.
batch_size=4, # batchsize per vllm worker
replicas=1, # num of vllm workers
num_gpus_per_replica=0.5, # for ray to schedule vllm workers to GPU, can be float, e.g. 0.5 means each worker uses half GPU, 1 means each worker uses whole GPU
engine_gpu_util_rate_to_ray_cap=0.9 # actuall GPU utilization for each worker; acturall memory per worker= num_gpus_per_replica * engine_gpu_util_rate_to_ray_cap; this is to avoid OOM, you can set it to 0.9 or 0.8 to leave some buffer for other processes on
)
self.pdf_merger = PDF_Merger(output_dir="./cache")
self.mineru_executor = FileOrURLToMarkdownConverterAPI(intermediate_dir = "intermediate")
self.input_formatter = MinerU2LLMInputOperator()
self.vqa_extractor = ChunkedPromptedGenerator(
llm_serving=self.llm_serving,
system_prompt = self.vqa_extract_prompt.build_prompt(),
max_chunk_len=128000,
)
self.llm_output_question_parser = LLMOutputParser(mode="question", output_dir="./cache", intermediate_dir="intermediate")
self.llm_output_answer_parser = LLMOutputParser(mode="answer", output_dir="./cache", intermediate_dir="intermediate")
self.llm_output_parser = LLMOutputParser(output_dir="./cache", intermediate_dir="intermediate")
self.qa_merger = QA_Merger(output_dir="./cache", strict_title_match=False)
def forward(self):
# 目前的处理逻辑是:MinerU处理问题-MinerU处理答案-格式化问题文本-格式化答案文本-问题文本输入LLM-答案文本输入LLM-解析问题输出-解析答案输出-合并问答对
# 由于问答对可能来自同一份pdf,也有可能来自不同pdf,而dataflow目前不支持分支,因此这里只能将question和answer的pdf都进行一次处理,
# 即使是同一份pdf也会被处理两次,最后再合并问答对。
# 未来会再思考如何优化这个流程,避免重复处理同一份pdf,提升性能。

self.mineru_executor.run(
self.pdf_merger.run(
storage=self.storage.step(),
input_key="question_pdf_path",
output_key="question_markdown_path",
input_pdf_list_key="input_pdf_paths",
input_name_key="name",
output_pdf_path_key="merged_pdf_path",
)
self.mineru_executor.run(
storage=self.storage.step(),
input_key="answer_pdf_path",
output_key="answer_markdown_path",
)
self.input_formatter.run(
storage=self.storage.step(),
input_markdown_path_key="question_markdown_path",
output_converted_layout_key="converted_question_layout_path",
input_key="merged_pdf_path",
output_key="vqa_markdown_path",
)
self.input_formatter.run(
storage=self.storage.step(),
input_markdown_path_key="answer_markdown_path",
output_converted_layout_key="converted_answer_layout_path",
input_markdown_path_key="vqa_markdown_path",
output_converted_layout_key="converted_vqa_layout_path",
)
self.vqa_extractor.run(
storage=self.storage.step(),
input_path_key="converted_question_layout_path",
output_path_key="vqa_extracted_questions_path",
)
self.vqa_extractor.run(
storage=self.storage.step(),
input_path_key="converted_answer_layout_path",
output_path_key="vqa_extracted_answers_path",
)
self.llm_output_question_parser.run(
storage=self.storage.step(),
input_response_path_key="vqa_extracted_questions_path",
input_converted_layout_path_key="converted_question_layout_path",
input_name_key="name",
output_qalist_path_key="extracted_questions_path",
input_path_key="converted_vqa_layout_path",
output_path_key="extracted_llm_vqa_path",
)
self.llm_output_answer_parser.run(
self.llm_output_parser.run(
storage=self.storage.step(),
input_response_path_key="vqa_extracted_answers_path",
input_converted_layout_path_key="converted_answer_layout_path",
input_response_path_key="extracted_llm_vqa_path",
input_converted_layout_path_key="converted_vqa_layout_path",
input_name_key="name",
output_qalist_path_key="extracted_answers_path",
output_qalist_path_key="extracted_vqa_path",
)
self.qa_merger.run(
storage=self.storage.step(),
input_question_qalist_path_key="extracted_questions_path",
input_answer_qalist_path_key="extracted_answers_path",
input_qalist_path_key="extracted_vqa_path",
input_name_key="name",
output_merged_qalist_path_key="output_merged_qalist_path",
output_merged_qalist_path_key="output_merged_vqalist_path",
output_merged_md_path_key="output_merged_md_path",
output_qa_item_key="qa_pair",
output_qa_item_key="vqa_pair",
)



if __name__ == "__main__":
# jsonl中每一行包含question_pdf_path, answer_pdf_path, name (math1, math2, physics1, chemistry1, ...)
# 如果question和answer在同一份pdf中,请将question_pdf_path和answer_pdf_path设置为相同的路径,会自动切换为interleaved模式
# jsonl中每一行包含input_pdf_path, name (math1, math2, physics1, chemistry1, ...)
pipeline = PDF_VQA_extract_optimized_pipeline()
pipeline.compile()
pipeline.forward()
21 changes: 15 additions & 6 deletions dataflow/utils/pdf2vqa/format_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,26 @@ def refine_title(title: str, strict_title_match=False):
title = new_title
return title

def merge_qa_pair(question_jsonl, answer_jsonl, output_jsonl, strict_title_match=False):
def merge_qa_pair(vqa_jsonl, output_jsonl, strict_title_match=False):
already_complete_count = 0
with open(question_jsonl, 'r', encoding='utf-8') as q_file, open(answer_jsonl, 'r', encoding='utf-8') as a_file, open(output_jsonl, 'w', encoding='utf-8') as out_file:
question_list = []
answer_list = []
with open(vqa_jsonl, 'r', encoding='utf-8') as vqa_file:
for line in vqa_file:
data = json.loads(line)
if data["question"] != "":
question_list.append(data)
else:
# 用于支持题目在前面,答案在后面的pdf
answer_list.append(data)

with open(output_jsonl, 'w', encoding='utf-8') as out_file:
chapter_id = 0
chapter_title = ""
label = float('inf')
questions = {}
answers = {}
for line in q_file:
data = json.loads(line)
for data in question_list:
label_match = re.search(r'\d+', data["label"])
if label_match:
data["label"] = label_match.group()
Expand Down Expand Up @@ -68,8 +78,7 @@ def merge_qa_pair(question_jsonl, answer_jsonl, output_jsonl, strict_title_match
chapter_id = 0
chapter_title = ""
label = float('inf')
for line in a_file:
data = json.loads(line)
for data in answer_list:
label_match = re.search(r'\d+', data["label"])
if label_match:
data["label"] = label_match.group()
Expand Down
Loading