From 5cb2ec1689a735339af3d6851e7e998970deeb79 Mon Sep 17 00:00:00 2001 From: Piar <2741277534@qq.com> Date: Tue, 10 Feb 2026 19:21:56 +0800 Subject: [PATCH] docs: update DFA docs (paths, params, cases, GUI guide) - Fix file path references in docs - Improve script parameter explanations - Add practical case studies - Add graphical interface usage guide --- .../guide/agent/operator_assemble_line.md | 100 ++++++-- docs/en/notes/guide/agent/operator_qa.md | 180 +++++++++++--- docs/en/notes/guide/agent/operator_write.md | 222 ++++++++++++++++-- docs/en/notes/guide/agent/pipeline_prompt.md | 162 +++++++++++-- .../notes/guide/agent/pipeline_rec&refine.md | 156 +++++++++--- .../guide/agent/operator_assemble_line.md | 91 +++++-- docs/zh/notes/guide/agent/operator_qa.md | 181 +++++++++++--- docs/zh/notes/guide/agent/operator_write.md | 198 ++++++++++++++-- docs/zh/notes/guide/agent/pipeline_prompt.md | 157 +++++++++++-- .../notes/guide/agent/pipeline_rec&refine.md | 163 ++++++++++--- 10 files changed, 1342 insertions(+), 268 deletions(-) diff --git a/docs/en/notes/guide/agent/operator_assemble_line.md b/docs/en/notes/guide/agent/operator_assemble_line.md index efa5d9cd3..af9bf2f45 100644 --- a/docs/en/notes/guide/agent/operator_assemble_line.md +++ b/docs/en/notes/guide/agent/operator_assemble_line.md @@ -16,7 +16,7 @@ The core value of this feature lies in: ## 2. Features -This functional module primarily consists of frontend interaction logic (`op_assemble_line.py`) and a backend execution workflow (`wf_df_op_usage.py`). +This functional module primarily consists of frontend interaction logic (`gradio_app/pages/op_assemble_line.py`) and a backend execution workflow (`dataflow_agent/workflow/wf_df_op_usage.py`). ### 2.1 Dynamic Operator Loading and Introspection @@ -35,7 +35,11 @@ This feature provides two modes of use: the **Graphical Interface (Gradio UI)** ### 3.1 UI Operation -Ideal for interactive exploration and rapid verification. +It is ideal for interactive exploration and rapid validation. To launch the web interface: +```python +python gradio_app/app.py +``` +Visit `http://127.0.0.1:7860` and start using 1. **Environment Configuration**: Enter API-related information and the input JSONL file path at the top of the page. 2. **Orchestrate Pipeline**: @@ -46,25 +50,33 @@ Ideal for interactive exploration and rapid verification. ### 3.2 Script Invocation and Explicit Configuration -For automated tasks or batch processing, the `run_dfa_op_assemble.py` script can be used. This method bypasses the UI and defines the operator sequence directly through code. +For automated tasks or batch processing, you can use the `script/run_dfa_op_assemble.py` script. This method skips the UI and directly defines the operator sequence through code. -> **Note: Explicit Configuration Requirement**: Unlike the "Automatic Linking" in the UI, the script mode requires you to **explicitly configure** all parameters. You must ensure that the `output_key` of the previous operator strictly matches the `input_key` of the next; the script will not automatically correct parameter names for you. +#### 1. Modify the Configuration -#### 1. Modify Configuration +Open `script/run_dfa_op_assemble.py` and make modifications in the configuration section at the top of the file. -Open `run_dfa_op_assemble.py` and modify the configuration area at the top of the file. +##### API and File Configuration +- CHAT_API_URL: URL of the LLM service +- API_KEY: Authentication key for model invocation. 
If your Pipeline contains operators that need to call large models (such as reasoning generation, content rewriting, etc.), this item is **required**; otherwise, the operators will not run. +- MODEL: Model name, default is gpt-4o +- INPUT_FILE: Path of the input JSONL file (test data file) +##### Other Configurations +- CACHE_DIR: Storage directory for results and pipeline code files. The pipeline code files generated during script execution, intermediate execution data, and final data results will all be saved here. +- SESSION_ID: Unique identifier for the task. -**Key Configuration Item**: **`PIPELINE_STEPS`**—a list defining the pipeline execution steps. Each element contains an `op_name` and `params`. +#### 2. Define Pipeline Steps +This is the most critical part of the script. You need to define the execution order and parameters of operators in the `PIPELINE_STEPS` list. Each step consists of an **operator name (op_name)** and a **parameter set (params)**. -```python -# [Pipeline Definition] +```Python +# [Pipeline 定义] PIPELINE_STEPS = [ { "op_name": "ReasoningAnswerGenerator", "params": { - # __init__ parameters (Note: unified into 'params' in wf_df_op_usage) - "prompt_template": "dataflow.prompts.reasoning.math.MathAnswerGeneratorPrompt", - # run parameters + # __init__ 参数 (注意:在 wf_df_op_usage 中统一合并为 params) + "prompt_template": "dataflow.prompts.reasoning.general.GeneralAnswerGeneratorPrompt", + # run 参数 "input_key": "raw_content", "output_key": "generated_cot" } @@ -81,28 +93,64 @@ PIPELINE_STEPS = [ } } ] +``` +> **Note: Explicit Configuration Requirements** Unlike the "automatic linking" in the UI, you must **explicitly configure** all parameters in script mode. You need to ensure that the `output_key` of the previous operator strictly matches the `input_key` of the next operator; the script will not automatically correct parameter names for you. +#### 3. Run the Script + +```Bash +python script/run_dfa_op_assemble.py ``` -**Other Required Configurations**: +#### 4. Result Output -* `CACHE_DIR`: **Must use an absolute path** to avoid path errors when the generated Python script executes in a subprocess. -* `INPUT_FILE`: The absolute path to the initial data file. +After the script is executed, the console will print: -#### 2. Run Script +- **[Generation]**: Path of the generated Pipeline code. +- **[Code Preview]**: Preview of the first 20 lines of the generated code. +- **[Execution]**: Execution status. -```bash -python run_dfa_op_assemble.py +#### 5. Practical Case: General Text Reasoning and Pseudo-Answer Generation +We have a `tests/test.jsonl` file, where each line contains a `"raw_content"` field. Our goal is: based on the general English text content of this field, first invoke the large language model to generate reasoning-based answers for the text content, then generate pseudo-answers by generating candidate answers in multiple rounds and selecting the optimal one through statistics, and finally output key fields such as the list of candidate answers, optimal pseudo-answer, corresponding reasoning processes, and typical correct reasoning examples. Therefore, we select the `ReasoningAnswerGenerator` and `ReasoningPseudoAnswerGenerator` operators to orchestrate the Pipeline. -``` +The following is a complete configuration example: -#### 3. 
Output Results +```Python +# [Pipeline 定义] +PIPELINE_STEPS = [ + { + "op_name": "ReasoningAnswerGenerator", + "params": { + # __init__ 参数 (注意:在 wf_df_op_usage 中统一合并为 params) + "prompt_template": "dataflow.prompts.reasoning.general.GeneralAnswerGeneratorPrompt", + # run 参数 + "input_key": "raw_content", + "output_key": "generated_cot" + } + }, + { + "op_name": "ReasoningPseudoAnswerGenerator", + "params": { + "max_times": 3, + "input_key": "generated_cot", + "output_key_answer": "pseudo_answers", + "output_key_answer_value": "pseudo_answer_value", + "output_key_solutions": "pseudo_solutions", + "output_key_correct_solution_example": "pseudo_correct_solution_example" + } + } +] +``` +After completing the configuration, execute the following command in the terminal: -After execution, the console will print: +```Bash +python script/run_dfa_op_assemble.py +``` +The script will automatically perform the following actions: -* **[Generation]**: The path of the generated Python script (e.g., `pipeline_script_pipeline_001.py`). -* **[Code Preview]**: A preview of the first 20 lines of the generated code. -* **[Execution]**: - * `Status: success` indicates successful execution. - * `STDOUT`: Prints the standard output logs from the pipeline runtime. +1. Build the graph: Parse your PIPELINE_STEPS. +2. Generate code: Convert the configuration into standard Python code and store it under `dataflow_cache/generated_pipelines/`. +3. Execute the task: Start a child process to run the generated Pipeline. +4. Output the report: The terminal will display [Execution] Status: success as well as a partial preview of the code. +You can directly go to the `CACHE_DIR` directory to view the generated JSONL result file and verify whether the data meets expectations. diff --git a/docs/en/notes/guide/agent/operator_qa.md b/docs/en/notes/guide/agent/operator_qa.md index f0c126e83..7dd6fff9e 100644 --- a/docs/en/notes/guide/agent/operator_qa.md +++ b/docs/en/notes/guide/agent/operator_qa.md @@ -12,7 +12,7 @@ Unlike generic chatbots, Operator QA integrates **RAG (Retrieval-Augmented Gener ## 2. Core Features -This module is driven by a frontend UI (`operator_qa.py`), an entry script (`run_dfa_operator_qa.py`), and a backend agent (`operator_qa_agent.py`). It possesses the following core capabilities: +This module is driven by a frontend UI (`gradio_app/pages/operator_qa.py`), a backend execution workflow (`dataflow_agent/workflow/wf_operator_qa.py`), and a backend agent (`dataflow_agent/agentroles/data_agents/operator_qa_agent.py`). It possesses the following core capabilities: ### 2.1 Intelligent Retrieval and Recommendation @@ -49,8 +49,16 @@ Utilizing the `AdvancedMessageHistory` module, the system maintains a complete s ## 4. User Guide +This feature provides two modes of use: the **Graphical Interface (Gradio UI)** and **Command-line Scripts**. + ### 4.1 UI Operation +It is ideal for interactive exploration and rapid validation. To launch the web interface: +```python +python gradio_app/app.py +``` +Visit `http://127.0.0.1:7860` and start using + 1. **Configure Model**: In the "Configuration" panel on the right, verify the API URL and Key, and select a model (defaults to `gpt-4o`). 2. **Initiate Inquiry**: 1. **Dialogue Box**: Type your question. 
@@ -63,50 +71,154 @@ Utilizing the `AdvancedMessageHistory` module, the system maintains a complete s ### 4.2 Script Invocation and Explicit Configuration -Beyond the UI, the system provides the `run_dfa_operator_qa.py` script, which supports running the Q&A service through explicit code configuration—ideal for development and debugging. +In addition to the UI interface, the system provides the `script/run_dfa_operator_qa.py` script. This method is suitable for development and debugging, or for querying operator usage through code automation. -**Configuration Method:** Directly modify the constant configuration area at the top of the script without passing command-line arguments: +#### 1. Modify the Configuration -```python -# ===== Example config (edit here) ===== -INTERACTIVE = False # True for multi-turn mode, False for single query -QUERY = "Which operator should I use to filter missing values?" # Question for single query +Open `script/run_dfa_operator_qa.py` and make modifications in the configuration section at the top of the file. -LANGUAGE = "en" -SESSION_ID = "demo_operator_qa" -CACHE_DIR = "dataflow_cache" -TOP_K = 5 # Number of retrieval results +**API and File Configuration** -CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/") -API_KEY = os.getenv("DF_API_KEY", "") -MODEL = os.getenv("DF_MODEL", "gpt-4o") +* **CHAT_API_URL**: URL of the LLM service. +* **API_KEY**: Model invocation key. The Agent needs to call the large model to understand your questions and summarize the answers. +* **MODEL**: Model name, the default is `gpt-4o`. +* **CACHE_DIR**: Cache directory. +* **TOP_K**: Retrieval depth. Specify the maximum number of candidate results the Agent returns when retrieving relevant operators from the knowledge base (5 by default). + +**Query and Interaction Mode Configuration** + +* **INTERACTIVE**: **Interaction control switch** (`True` / `False`). + * `True` (Interactive Mode): Launch the continuous conversation mode in the terminal. You can ask follow-up questions like chatting, and support `clear` to clear history. + * `False` (One-time Mode): The script only executes one question specified by `QUERY` and exits immediately after outputting the result. +* **QUERY**: Question content for one-time query. Only effective when `INTERACTIVE = False`. +* **OUTPUT_JSON**: Result save path. + * Only effective in one-time mode. + * If a path is set, the Agent's answer, the retrieved list of operators and code snippets will be completely saved as a JSON file; if left blank, it will only be printed to the console. -OUTPUT_JSON = "" # e.g., "cache_local/operator_qa_result.json"; empty string means no file saving +#### 2. Run the Script + +After completing the configuration, execute the following command in the terminal: + +```bash +python script/run_dfa_operator_qa.py ``` -**Execution Modes:** +#### 3. Result Output + +After the script is executed, the console behaves differently depending on the mode: -1. **Single Query Mode** (`INTERACTIVE = False`): Executes a single `QUERY`; results can be printed or saved as a JSON file. -2. **Interactive Mode** (`INTERACTIVE = True`): Starts a terminal dialogue loop supporting `exit` to quit, `clear` to reset context, and `history` to view session history. +* **Interactive Mode**: The `🧑 You:` prompt appears in the terminal, waiting for input. + * Enter `exit` or `quit` to exit. + * Enter `clear` to clear conversation history. + * Enter `history` to view conversation history. 
+* **One-time Mode**: The console directly prints the Agent's thinking process, the retrieved list of operators and the final answer. If `OUTPUT_JSON` is configured, a prompt of successful file saving will also be displayed. -**Core Logic:** The script demonstrates how to explicitly construct `DFRequest` and `MainState`, and manually build the execution graph: +#### 4. Practical Case: Find Operators for "Data Cleaning" + +Suppose you need to clean data when developing a Pipeline and want to know if there are ready-made operators in the DataFlow library for processing. + +**Scenario Configuration**: We set it to one-time query mode and specify to save the results locally for viewing detailed parameters in the code later. + +Open the script and modify the configuration as follows: ```python -# 1. Explicitly construct the request -req = DFRequest( - language=LANGUAGE, - chat_api_url=CHAT_API_URL, - api_key=API_KEY, - model=MODEL, - target="" # Populated before each query -) - -# 2. Initialize state and graph -state = MainState(request=req, messages=[]) -graph = create_operator_qa_graph().build() - -# 3. Execute -result = await run_single_query(state, graph, QUERY) +# ===== Example config (edit here) ===== + +# 1. Disable interactive mode and execute a one-time query +INTERACTIVE = False + +# 2. Define your specific requirements +QUERY = "I want to clean data, which operator should I use?" + +# 3. Ensure the API configuration is correct +CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/") +API_KEY = os.getenv("DF_API_KEY", "") +MODEL = os.getenv("DF_MODEL", "gpt-4o") + +# 4. Specify the result save path +OUTPUT_JSON = "cache_local/operator_qa_result.json" + ``` + +**Run**: + +After running the script, the Agent will perform RAG retrieval and generate an answer. Open the generated `script/cache_local/operator_qa_result.json` and you can see the data with the following structure: + +```json +{ + "success": true, + "query": "I want to clean data, which operator should I use?", + "answer": "在 DataFlow 中,有多个算子可以用于数据清洗。以下是一些推荐的算子:\n\n1. **KBCTextCleaner**: 适用于对原始知识内容进行标准化处理,包括HTML标签清理、特殊字符规范化、链接处理和结构优化。适合需要提升RAG知识库质量的场景。\n\n2. **KBCTextCleanerBatch**: 类似于 KBCTextCleaner,但支持批量处理。\n\n3. **ContentNullFilter**: 用于过滤空值、空字符串或仅包含空白字符的文本,确保输入数据的有效性。\n\n4. **HtmlUrlRemoverRefiner**: 去除文本中的URL链接和HTML标签,净化文本内容。\n\n5. **PresidioFilter**: 基于PresidioScorer打分器的得分对数据进行过滤,识别并处理文本中的私人实体(PII)。", + "related_operators": [ + "KBCTextCleaner", + "KBCTextCleanerBatch", + "ContentNullFilter", + "HtmlUrlRemoverRefiner", + "PresidioFilter" + ], + "code_snippet": null, + "follow_up_suggestions": [ + "了解如何配置这些算子的参数", + "查看某个算子的详细实现", + "询问特定数据清洗场景的最佳实践" + ], + "messages": [ + { + "type": "SystemMessage", + "content": "\n[角色]\n你是 DataFlow 算子库的智能问答助手。你的职责是帮助用户了解和使用 DataFlow 中的各种数据处理算子。\n\n[能力]\n1. 根据用户描述的需求,推荐合适的算子\n2. 解释算子的功能、用途和使用场景\n3. 详细说明算子的参数含义和配置方法\n4. 在需要时展示算子的源码实现\n5. 基于多轮对话理解用户的上下文需求\n\n[DataFlow 算子简介]\nDataFlow 是一个数据处理框架,提供了丰富的算子用于数据清洗、过滤、生成、评估等任务。\n每个算子都是一个 Python 类,通常包含:\n- `__init__` 方法:初始化算子,配置必要的参数(如 LLM 服务、提示词等)\n- `run` 方法:执行数据处理逻辑,接收输入数据并产出处理结果\n\n[可用工具]\n你可以调用以下工具来获取算子信息:\n\n1. **search_operators(query, top_k)** - 根据功能描述搜索相关算子\n - 当用户询问某类功能的算子时使用\n - 如果对话历史中已有相关算子信息,可以不调用直接回答\n\n2. **get_operator_info(operator_name)** - 获取指定算子的详细描述\n - 当用户询问特定算子的功能时使用\n\n3. **get_operator_source_code(operator_name)** - 获取算子的完整源代码\n - 当用户需要了解算子实现细节时使用\n\n4. 
**get_operator_parameters(operator_name)** - 获取算子的参数详情\n - 当用户询问算子如何配置、参数含义时使用\n\n[工具调用策略]\n- 如果是新问题且对话历史中没有相关信息 → 调用 search_operators 检索\n- 如果对话历史中已有相关算子信息 → 可以直接回答,无需重复检索\n- 如果用户追问某个算子的细节 → 调用 get_operator_info/get_operator_source_code/get_operator_parameters\n\n[回答风格]\n1. 清晰简洁,重点突出\n2. 使用中文回答(除非用户要求英文)\n3. 对于技术细节,提供具体的代码示例\n4. 在解释参数时,说明参数类型、默认值和作用\n\n[输出格式]\n请以 JSON 格式返回,包含以下字段:\n{\n \"answer\": \"对用户问题的详细回答\",\n \"related_operators\": [\"相关算子名称列表\"],\n \"source_explanation\": \"说明答案的信息来源,例如:'通过search_operators检索到的XXX算子'、'基于对话历史中的算子信息'、'基于我的知识库'\",\n \"code_snippet\": \"如有必要,提供代码片段(可选)\",\n \"follow_up_suggestions\": [\"可能的后续问题建议(可选)\"]\n}\n\n\n请以JSON格式返回结果,不要包含其他文字说明!!!直接返回json内容,不要```json进行包裹!!", + "role": "", + "additional_kwargs": {}, + "metadata": {} + }, + { + "type": "HumanMessage", + "content": "\n[用户问题]\nI want to clean data, which operator should I use?\n\n[任务]\n请根据用户问题回答。对话历史会自动包含在消息中,你可以参考之前的对话。\n\n工具调用指南:\n1. 如果需要查找算子,调用 search_operators 工具\n2. 如果需要某个算子的详细信息,调用 get_operator_info 工具\n3. 如果需要源码,调用 get_operator_source_code 工具\n4. 如果需要参数详情,调用 get_operator_parameters 工具\n5. 如果之前的对话中已有相关信息,可以直接回答,无需重复调用工具\n\n回答要求:\n- 基于工具返回的信息或对话上下文中的信息回答\n- 在 source_explanation 中说明答案来源\n- 如果问题不明确,可以在 follow_up_suggestions 中给出澄清建议\n\n请以 JSON 格式返回你的回答。\n", + "role": "", + "additional_kwargs": {}, + "metadata": {} + }, + { + "type": "AIMessage", + "content": "", + "role": "", + "additional_kwargs": { + "tool_calls": [ + { + "id": "call_06xfRcedme8OAVBq33keXVdS", + "function": { + "arguments": "{\"query\":\"clean data\"}", + "name": "search_operators" + }, + "type": "function" + } + ], + "refusal": null + }, + "metadata": {} + }, + { + "content": "{\n \"query\": \"clean data\",\n \"matched_operators\": [\n \"KBCTextCleaner\",\n \"KBCTextCleanerBatch\",\n \"ContentNullFilter\",\n \"HtmlUrlRemoverRefiner\",\n \"PresidioFilter\"\n ],\n \"operator_details\": [\n {\n \"node\": 1,\n \"name\": \"KBCTextCleaner\",\n \"description\": \"知识清洗算子:对原始知识内容进行标准化处理,包括HTML标签清理、特殊字符规范化、链接处理和结构优化,提升RAG知识库的质量。主要功能:\\n1. 移除冗余HTML标签但保留语义化标签\\n2. 标准化引号/破折号等特殊字符\\n3. 处理超链接同时保留文本\\n4. 保持原始段落结构和代码缩进\\n5. 确保事实性内容零修改\\n\\n输入格式示例:\\n
\\n\\n输出格式示例:\\n标题文本\\n\\n正文段落,包括特殊符号,例如\\\"直引号\\\"、-破折号等\\n\\n[Image: 示例图 example.jpg]\\n\\n链接文本\\n\\n代码片段\\n\\n[结构保持,语义保留,敏感信息脱敏处理(如手机号、保密标记等)]\",\n \"category\": \"knowledge_cleaning\"\n },\n {\n \"node\": 2,\n \"name\": \"KBCTextCleanerBatch\",\n \"description\": \"知识清洗算子:对原始知识内容进行标准化处理,包括HTML标签清理、特殊字符规范化、链接处理和结构优化,提升RAG知识库的质量。主要功能:\\n1. 移除冗余HTML标签但保留语义化标签\\n2. 标准化引号/破折号等特殊字符\\n3. 处理超链接同时保留文本\\n4. 保持原始段落结构和代码缩进\\n5. 确保事实性内容零修改\",\n \"category\": \"knowledge_cleaning\"\n },\n {\n \"node\": 3,\n \"name\": \"ContentNullFilter\",\n \"description\": \"该算子用于过滤空值、空字符串或仅包含空白字符的文本,确保输入数据的有效性。\\n初始化参数:\\n- 无\\n运行参数:\\n- storage:DataFlowStorage对象\\n- input_key:输入文本字段名\\n- output_key:输出标签字段名,默认为'content_null_filter_label'\\n返回值:\\n- 包含output_key的列表\",\n \"category\": \"general_text\"\n },\n {\n \"node\": 4,\n \"name\": \"HtmlUrlRemoverRefiner\",\n \"description\": \"去除文本中的URL链接和HTML标签,净化文本内容。使用正则表达式匹配并移除各种形式的URL和HTML标签。输入参数:\\n- input_key:输入文本字段名\\n输出参数:\\n- 包含净化后文本的DataFrame\\n- 返回输入字段名,用于后续算子引用\",\n \"category\": \"general_text\"\n },\n {\n \"node\": 5,\n \"name\": \"PresidioFilter\",\n \"description\": \"基于PresidioScorer打分器的得分对数据进行过滤。使用Microsoft Presidio模型识别文本中的私人实体(PII),返回PII信息个数。\\n支持识别姓名、邮箱、电话号码、身份证号等多种敏感信息类型,可用于数据隐私保护和合规性检查。\\n输入参数:\\n- min_score:保留样本的最小PII数量阈值,默认为0\\n- max_score:保留样本的最大PII数量阈值,默认为5\\n- lang:文本语言,默认为'en'\\n- device:模型运行设备,默认为'cuda'\\n- model_cache_dir:模型缓存目录,默认为'./dataflow_cache'\\n输出参数:\\n- 过滤后的DataFrame,仅保留PII数量在[min_score, max_score]范围内的样本\\n- 返回包含输出字段名的列表,用于后续算子引用\",\n \"category\": \"general_text\"\n }\n ]\n}",
+ "additional_kwargs": {},
+ "response_metadata": {},
+ "type": "tool",
+ "name": "search_operators",
+ "id": "72462389-4f54-4b33-9c31-0f662cc537cb",
+ "tool_call_id": "call_06xfRcedme8OAVBq33keXVdS",
+ "artifact": null,
+ "status": "success"
+ },
+ {
+ "type": "AIMessage",
+ "content": "{\n \"answer\": \"在 DataFlow 中,有多个算子可以用于数据清洗。以下是一些推荐的算子:\\n\\n1. **KBCTextCleaner**: 适用于对原始知识内容进行标准化处理,包括HTML标签清理、特殊字符规范化、链接处理和结构优化。适合需要提升RAG知识库质量的场景。\\n\\n2. **KBCTextCleanerBatch**: 类似于 KBCTextCleaner,但支持批量处理。\\n\\n3. **ContentNullFilter**: 用于过滤空值、空字符串或仅包含空白字符的文本,确保输入数据的有效性。\\n\\n4. **HtmlUrlRemoverRefiner**: 去除文本中的URL链接和HTML标签,净化文本内容。\\n\\n5. **PresidioFilter**: 基于PresidioScorer打分器的得分对数据进行过滤,识别并处理文本中的私人实体(PII)。\",\n \"related_operators\": [\n \"KBCTextCleaner\",\n \"KBCTextCleanerBatch\",\n \"ContentNullFilter\",\n \"HtmlUrlRemoverRefiner\",\n \"PresidioFilter\"\n ],\n \"source_explanation\": \"通过search_operators检索到的相关算子\",\n \"code_snippet\": null,\n \"follow_up_suggestions\": [\n \"了解如何配置这些算子的参数\",\n \"查看某个算子的详细实现\",\n \"询问特定数据清洗场景的最佳实践\"\n ]\n}",
+ "role": "",
+ "additional_kwargs": {
+ "refusal": null
+ },
+ "metadata": {}
+ }
+ ]
+}
+
+```
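+If you saved the result via `OUTPUT_JSON`, you can also consume the file programmatically. The sketch below is a minimal example under the configuration above (adjust the path to wherever you actually saved the file); the keys it reads are the ones shown in the JSON structure above:
+
+```python
+import json
+
+# Load the saved Q&A result (path from the OUTPUT_JSON setting above)
+with open("cache_local/operator_qa_result.json", encoding="utf-8") as f:
+    result = json.load(f)
+
+# The fields follow the structure shown above
+print(result["answer"])
+print(result["related_operators"])
+print(result["follow_up_suggestions"])
+```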
\ No newline at end of file
diff --git a/docs/en/notes/guide/agent/operator_write.md b/docs/en/notes/guide/agent/operator_write.md
index d1ce90e0e..d772db39b 100644
--- a/docs/en/notes/guide/agent/operator_write.md
+++ b/docs/en/notes/guide/agent/operator_write.md
@@ -37,7 +37,7 @@ For complex operators requiring Large Model calls (e.g., "generate summary based
## 3. Workflow Architecture
-This feature is orchestrated by `wf_pipeline_write.py`, forming a directed graph containing conditional loops.
+This feature is orchestrated by `dataflow_agent/workflow/wf_pipeline_write.py`, forming a directed graph containing conditional loops.
1. **Match Node**: Retrieves reference operators.
2. **Write Node**: Writes the initial code.
@@ -52,7 +52,11 @@ This feature provides two modes of usage: **Graphical Interface (Gradio UI)** an
### 4.1 UI Operation
-The frontend page code is located in `operator_write.py`, offering a visualized interactive experience.
+The frontend page code is located in `gradio_app/pages/operator_write.py`, which provides a visual interactive experience. It is ideal for interactive exploration and rapid validation. To launch the web interface:
+```bash
+python gradio_app/app.py
+```
+Visit `http://127.0.0.1:7860` to start using it.
#### 1. Configure Inputs
@@ -80,41 +84,211 @@ After clicking the **"Generate Operator"** button, the right panel displays deta
### 4.2 Script Invocation and Explicit Configuration
-For developers or automated tasks, `run_dfa_operator_write.py` can be executed directly.
+For developers, it is recommended to modify and run `script/run_dfa_operator_write.py` directly. This approach integrates more flexibly into automated workflows and can save the generated operator file to disk.
+
+#### 1. Modify the Configuration
+
+Open `script/run_dfa_operator_write.py` and modify the parameters in the configuration section at the top of the file.
+
+**Task Configuration**
+
+ * **`TARGET`**: Describe the function of the operator in natural language. The more specific the description, the more accurate the generated code. It is recommended to include descriptions of input fields and expected outputs.
+
+ * Example: `"Create an operator for performing sentiment analysis on text"`
+
+ * Example: `"Implement a data deduplication operator that supports deduplication based on a combination of multiple fields"`
+
+ * **`CATEGORY`**: The category to which the operator belongs, used to match similar operators as references
+
+ * Default: `"Default"`
+
+ * Optional: `"reasoning"`, `"agentic_rag"`, `"knowledge_cleaning"`, etc.
+
+ * **`JSON_FILE`**: Data file (`.jsonl` format) used to test the operator.
+
+ * Default: If left blank, the project's built-in test data `tests/test.jsonl` will be used.
+
+ * **`OUTPUT_PATH`**: Save path for the generated Python code. If left blank, the code will only be printed to the console and no file will be saved.
+
+**API and Debug Configuration**
+
+ * **`CHAT_API_URL`**: URL of the LLM service
+
+ * **`api_key`**: Access key (using the environment variable DF_API_KEY)
+
+ * **`MODEL`**: Model name, default is gpt-4o
+
+ * **`NEED_DEBUG`**: Whether to enable the automatic debugging loop (`True` / `False`)
+
+ * `True`: If the generated code reports an error when running on `JSON_FILE`, the Agent will automatically analyze the error stack and attempt to rewrite the code
+
+ * `False`: Generate and execute the code, then end immediately regardless of whether it runs successfully
+
+ * **`MAX_DEBUG_ROUNDS`**: Maximum number of automatic repair attempts, default is 3 rounds
-#### 1. Modify Configuration
+#### 2. Run the Script
-Open `run_dfa_operator_write.py` and modify the parameters in the configuration area at the top of the file:
+After completing the configuration, execute the following command in the terminal:
+
+```bash
+python script/run_dfa_operator_write.py
+```
+
+#### 3. Result Output
+
+During script execution, the following key information will be output:
+
+* **[Match Operator Result]**: Displays the "reference operators" found by the Agent in the existing operator library
+
+* **[Writer Result]**: Length of the generated code and its save location
+
+* **[Execution Result]**: Code execution result
+
+ * `Success: True`: Indicates the code was generated successfully and ran without errors on the test data.
+
+ * `Success: False`: Indicates the run failed.
+
+* **[Debug Runtime Preview]**: `stdout`/`stderr` captured during runtime, as well as the selected input field key name (`input_key`)
+
+#### 4. Practical Case: Writing a Sentiment Analysis Operator
+
+We have a test data file `tests/test.jsonl` in which each line contains a `"raw_content"` field. We want to create an operator that performs sentiment analysis on the text in this field.
+
+**Configuration Example:**
```python
+# ===== Example config (edit here) =====
+# API KEY is passed in via the environment variable DF_API_KEY
CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/")
MODEL = os.getenv("DF_MODEL", "gpt-4o")
LANGUAGE = "en"
-TARGET = "Create an operator that filters out missing values and keeps rows with non-empty fields."
-CATEGORY = "Default" # Fallback category (if classifier misses)
-OUTPUT_PATH = "" # e.g., "cache_local/my_operator.py"; empty string means no file saving
-JSON_FILE = "" # Empty string uses project built-in tests/test.jsonl
+# 1. Define specific requirements
+TARGET = "Create an operator for performing sentiment analysis on text"
+CATEGORY = "Default"
+# 2. Specify the result save path
+OUTPUT_PATH = "cache_local/my_operator.py"
+# 3. Specify the test data path
+JSON_FILE = "tests/test.jsonl"
+# 4. Enable debugging
+NEED_DEBUG = True
+MAX_DEBUG_ROUNDS = 10
+```
-NEED_DEBUG = False
-MAX_DEBUG_ROUNDS = 3
+**Run:**
-```
+After running the script, the terminal will output the following:
-#### 2. Run Script
+``` bash
+==== Match Operator Result ====
+Matched ops: ['LangkitSampleEvaluator', 'LexicalDiversitySampleEvaluator', 'PresidioSampleEvaluator', 'PerspectiveSampleEvaluator']
-```bash
-python run_dfa_operator_write.py
+==== Writer Result ====
+Code length: 3619
+Saved to: cache_local/my_operator.py
-```
+==== Execution Result (instantiate) ====
+Success: True
-#### 3. Output Results
+==== Debug Runtime Preview ====
+input_key: raw_content
+available_keys: ['raw_content']
+[debug stdout]
+ [selected_input_key] raw_content
-The script will print key information to the console:
+[debug stderr]
+Generating......: 100%|######### | 18/20 [00:08<00:00, 3.34it/s]
+```
+
+The generated code is saved to `script/cache_local/my_operator.py`. Open it to view the generated code:
+
+``` python
+from dataflow.core import OperatorABC
+from dataflow.utils.registry import OPERATOR_REGISTRY
+from dataflow.utils.storage import DataFlowStorage, FileStorage
+from dataflow import get_logger
+from dataflow.serving import APILLMServing_request
+import pandas as pd
+
+@OPERATOR_REGISTRY.register()
+class SentimentAnalysisOperator(OperatorABC):
+ def __init__(self, llm_serving=None):
+ self.logger = get_logger()
+ self.logger.info(f'Initializing {self.__class__.__name__}...')
+ self.llm_serving = llm_serving
+ self.score_name = 'SentimentScore'
+ self.logger.info(f'{self.__class__.__name__} initialized.')
+
+ @staticmethod
+ def get_desc(lang: str = "zh"):
+ if lang == "zh":
+ return (
+ "使用LLM进行文本情感分析,返回情感得分,得分越高表示情感越积极。\n"
+ "输入参数:\n"
+ "- llm_serving:LLM服务对象\n"
+ "- input_key:输入文本字段名\n"
+ "- output_key:输出得分字段名,默认'SentimentScore'\n"
+ "输出参数:\n"
+ "- 包含情感分析得分的DataFrame"
+ )
+ else:
+ return (
+ "Perform sentiment analysis on text using LLM, returning sentiment scores where higher scores indicate more positive sentiment.\n"
+ "Input Parameters:\n"
+ "- llm_serving: LLM serving object\n"
+ "- input_key: Field name for input text\n"
+ "- output_key: Field name for output score, default 'SentimentScore'\n"
+ "Output Parameters:\n"
+ "- DataFrame containing sentiment analysis scores"
+ )
+
+ def get_score(self, samples: list[dict], input_key: str) -> list[float]:
+ texts = [sample.get(input_key, '') or '' for sample in samples]
+ return self.llm_serving.generate_from_input(texts)
+
+ def eval(self, dataframe: pd.DataFrame, input_key: str) -> list[float]:
+ self.logger.info(f"Evaluating {self.score_name}...")
+ samples = dataframe.to_dict(orient='records')
+ scores = self.get_score(samples, input_key)
+ self.logger.info("Evaluation complete!")
+ return scores
+
+ def run(self,
+ storage: DataFlowStorage,
+ input_key: str | None = None,
+ output_key: str = 'SentimentScore'):
+ dataframe = storage.read("dataframe")
+ if input_key is None:
+ input_key = self._auto_select_input_key(dataframe)
+ dataframe[output_key] = self.eval(dataframe, input_key)
+ storage.write(dataframe)
+
+ def _auto_select_input_key(self, dataframe: pd.DataFrame) -> str:
+ preferred_keys = ['raw_content', 'text', 'content', 'sentence', 'instruction', 'input', 'query', 'problem', 'prompt']
+ for key in preferred_keys:
+ if key in dataframe.columns and dataframe[key].notnull().any():
+ return key
+ return dataframe.columns[0]
+
+# Runnable entry code
+
+test_data_path = '/root/autodl-tmp/DataFlow-Agent/tests/test.jsonl'
+
+# Initialize FileStorage
+storage = FileStorage(first_entry_file_name=test_data_path, cache_path="./cache_local", file_name_prefix="dataflow_cache_step", cache_type="jsonl")
+storage = storage.step()
+
+# Initialize llm_serving
+llm_serving = APILLMServing_request(api_url="http://123.129.219.111:3000/v1/chat/completions", key_name_of_api_key="DF_API_KEY", model_name="gpt-4o")
+
+# Select input key
+available_keys = ['raw_content']
+preselected_input_key = 'raw_content'
+input_key = preselected_input_key if preselected_input_key in available_keys else available_keys[0]
+print(f"[selected_input_key] {input_key}")
+
+# Instantiate and run the operator
+operator = SentimentAnalysisOperator(llm_serving=llm_serving)
+operator.run(storage=storage, input_key=input_key)
+```
-* `Matched ops`: The matched reference operators.
-* `Code preview`: A preview fragment of the generated code.
-* `Execution Result`:
- * `Success: True` indicates code generation and execution passed.
- * `Success: False` will print `stderr preview` for troubleshooting.
-* `Debug Runtime Preview`: Displays the automatically selected `input_key` and runtime logs.
diff --git a/docs/en/notes/guide/agent/pipeline_prompt.md b/docs/en/notes/guide/agent/pipeline_prompt.md
index 0515a270b..fa033c4b0 100644
--- a/docs/en/notes/guide/agent/pipeline_prompt.md
+++ b/docs/en/notes/guide/agent/pipeline_prompt.md
@@ -32,7 +32,7 @@ The generated Prompt does not merely remain as text; the Agent immediately tests
## 3. System Architecture
-This function is defined by `wf_pipeline_prompt.py`, featuring a core **single-node** workflow. All generation and verification logic are highly cohesive within the `PromptWriter` Agent.
+This function is defined by `dataflow_agent/workflow/wf_pipeline_prompt.py`, featuring a core **single-node** workflow. All generation and verification logic are highly cohesive within the `PromptWriter` Agent.
### 3.1 Core Node Process
@@ -56,9 +56,15 @@ The optimization process depends on **frontend interaction**:
## 4. User Guide
+This feature provides two modes of usage: **Graphical Interface (Gradio UI)** and **Command Line Script**.
+
### 4.1 Graphical Interface
-The frontend code is located in `PA_frontend.py`, providing a complete interactive development environment.
+The frontend code is located in `gradio_app/pages/PA_frontend.py`, which provides a visual interactive experience. It is ideal for interactive exploration and rapid validation. To launch the web interface:
+```bash
+python gradio_app/app.py
+```
+Visit `http://127.0.0.1:7860` to start using it.
**Initial Generation:**
@@ -81,34 +87,146 @@ The frontend code is located in `PA_frontend.py`, providing a complete interacti
2. Import the Prompt class into your operator.
3. Specify `prompt_template` in the operator's `init()`.
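+
+As an illustration of steps 2 and 3, the sketch below wires a generated Prompt class into an operator. The generated module and class names here are hypothetical placeholders (use the file the Agent actually produced for you); the operator and serving classes follow the usage shown later in this guide:
+
+```python
+from dataflow.serving import APILLMServing_request
+from dataflow.operators.reasoning import ReasoningQuestionFilter
+# Hypothetical module/class generated by the Prompt Agent -- replace with your own
+from my_generated_prompt import MyGeneratedPrompt
+
+# LLM serving used by the operator
+llm_serving = APILLMServing_request(
+    api_url="http://123.129.219.111:3000/v1/chat/completions",
+    key_name_of_api_key="DF_API_KEY",
+    model_name="gpt-4o",
+)
+
+# Pass the generated Prompt instance via `prompt_template` in the operator's __init__
+op = ReasoningQuestionFilter(
+    system_prompt="You are a helpful assistant.",
+    llm_serving=llm_serving,
+    prompt_template=MyGeneratedPrompt(),
+)
+```
+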
-### 4.2 Script Invocation
+### 4.2 Script Invocation and Explicit Configuration
-Use the `run_dfa_pipeline_prompt.py` script for automated generation.
+For developers who need to integrate Prompt generation into automated pipelines or prefer code-based configuration, `script/run_dfa_pipeline_prompt.py` can be used.
-**Configuration Parameters:**
+#### 1. Modify the Configuration
-```python
-CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/")
-MODEL = os.getenv("DF_MODEL", "gpt-4o")
-LANGUAGE = "en"
+Open `script/run_dfa_pipeline_prompt.py` and make modifications in the configuration section at the top of the file.
-TASK_DESCRIPTION = "Write a prompt for an operator that filters missing values"
-OP_NAME = "PromptedFilter"
+**API Configuration**
+ * **`CHAT_API_URL`**: URL of the LLM service
+ * **`api_key`**: Access key (using the environment variable DF_API_KEY)
+ * **`MODEL`**: Model name, default is gpt-4o
-# These two items are only required if the operator does not possess any preset prompts;
-# otherwise, it will generate based on existing prompts.
-OUTPUT_FORMAT = "" # e.g., "Return JSON with keys: ..."
-ARGUMENTS = [] # e.g., ["min_len=10", "drop_na=true"]
+**Task Configuration**
+ * **`TASK_DESCRIPTION`**: Describe the task you want this Prompt to complete in natural language
+ * Example: `"I want to write a filter prompt suitable for financial questions."`
+ * **`OP_NAME`**: Specify which operator will load and use the generated Prompt
+ * **`OUTPUT_FORMAT`** (Optional): Specify the output format of the Prompt. If left blank, the Agent will generate it by imitating existing prompts
+ * **`ARGUMENTS`** (Optional): Parameters required by the Prompt template, separated by commas, spaces, or newlines
+ * Example: `["min_len=10", "drop_na=true"]`
-# Cache directory used to store test data and prompts
-CACHE_DIR = "./pa_cache"
-DELETE_TEST_FILES = True
+**Environment Configuration**
+ * **`CACHE_DIR`**: Result output directory. The generated Prompt files (`.py`), temporary test data, test code, etc., will all be saved here
+ * **`DELETE_TEST_FILES`**: Whether to automatically clean up temporary synthetic test data after running (`True`/`False`)
-```
+#### 2. Run the Script
-**Run Command:**
+After completing the configuration, execute the following command in the terminal:
```bash
-python run_dfa_pipeline_prompt.py
+python script/run_dfa_pipeline_prompt.py
+```
+
+#### 3. Result Output
+
+After the script is executed, the console will print the generation process. You can find the generated files in the `CACHE_DIR` directory.
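+
+If you prefer to check the output programmatically, a minimal sketch such as the following lists what was produced in the cache directory (the path matches the default `CACHE_DIR` of `./pa_cache`; adjust it if you changed the setting):
+
+```python
+from pathlib import Path
+
+# List the generated prompt file, test code and test data in CACHE_DIR
+for path in sorted(Path("./pa_cache").iterdir()):
+    print(path.name)
+```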
+
+#### 4. Practical Case: Reuse the ReasoningQuestionFilter to Write a Filter Prompt for Financial Questions
+
+Suppose we want to reuse the `ReasoningQuestionFilter` operator in the system and turn it into a filter for financial domain questions. Open the script and modify the configuration as follows:
-```
\ No newline at end of file
+```python
+# ===== Example config (edit here) =====
+
+# 1. Define the task
+TASK_DESCRIPTION = "I want to write a filter prompt suitable for financial questions."
+
+# 2. Specify the operator to reuse (tell the Agent this Prompt is for ReasoningQuestionFilter)
+OP_NAME = "ReasoningQuestionFilter"
+
+# These two items only need to be provided if the operator does not have any preset prompts; otherwise, it will be generated by imitating existing prompts
+OUTPUT_FORMAT = "" # e.g. "Return JSON with keys: ..."
+ARGUMENTS = [] # e.g. ["min_len=10", "drop_na=true"]
+
+# Cache directory for storing test data and prompts
+CACHE_DIR = "./pa_cache"
+DELETE_TEST_FILES = False
+```
+
+**Run:**
+
+After running the script, the terminal will output execution logs. You can find the generated Prompt file `finance_question_filter_prompt20260209143556.py`, test code `test_FinanceQuestionFilterPrompt.py`, and test data in the `CACHE_DIR` directory. The content of the generated Prompt is as follows:
+``` python
+__all__ = ['FinanceQuestionFilterPrompt']
+
+from dataflow.core.prompt import DIYPromptABC
+
+class FinanceQuestionFilterPrompt(DIYPromptABC):
+ def __init__(self):
+ pass
+
+ def build_prompt(self, question: str) -> str:
+ prompt = f"""
+ # 角色:
+ 你是一个金融问题的审核助手。
+ # 任务
+ 你的任务是检查给定的金融问题是否符合以下标准:
+ 0. 首先,确认输入仅包含一个明确的金融问题(没有额外的指令如“重写”、“翻译”或提供的答案);如果不符合,输出 judgement_test=false。
+ 1. 检查拼写、语法和格式(例如货币符号、百分比表示),不解释语义。
+ 2. 对于每个最小前提(无法进一步分解),验证其是否违反常识、金融领域事实或任务要求(例如,“负利率”在某些情况下可能无效);如果无效,则失败。
+ 3. 检查前提之间或推理过程中的任何矛盾,或者最终结果是否明显不合理或不可解;如果是,则失败。
+ 4. 如果以上都通过,检查是否有足够的信息来完成任务;缺少必要条件 ⇒ 失败,冗余细节是可以接受的。
+
+ # 输出格式
+ 完成这些步骤后,输出格式必须为:
+ {{
+ "judgement_test": true/false,
+ "error_type": "<错误描述或null>"
+ }}
+ 你可以包括你的思维过程,但最终输出必须是上面的JSON格式。
+
+ 这里是需要评估的内容:
+ -------------------------------
+ {question}
+ -------------------------------
+ """
+ return prompt
+```
+The test code `test_FinanceQuestionFilterPrompt.py` generated by the Agent is as follows:
+``` python
+"""
+Auto-generated by prompt_writer
+"""
+from dataflow.pipeline import PipelineABC
+from dataflow.utils.storage import FileStorage
+from dataflow.serving import APILLMServing_request, LocalModelLLMServing_vllm
+
+try:
+ from dataflow.operators.reasoning.filter.reasoning_question_filter import ReasoningQuestionFilter
+except Exception:
+ from dataflow.operators.reasoning import ReasoningQuestionFilter
+from finance_question_filter_prompt20260209143556 import FinanceQuestionFilterPrompt
+
+class RecommendPipeline(PipelineABC):
+ def __init__(self):
+ super().__init__()
+ # -------- FileStorage --------
+ self.storage = FileStorage(
+ first_entry_file_name="./pa_cache/prompt_test_data.jsonl",
+ cache_path="./pa_cache",
+ file_name_prefix="dataflow_cache_step",
+ cache_type="jsonl",
+ )
+ # -------- LLM Serving (Remote) --------
+ self.llm_serving = APILLMServing_request(
+ api_url="http://123.129.219.111:3000/v1/chat/completions",
+ key_name_of_api_key="DF_API_KEY",
+ model_name="gpt-4o",
+ max_workers=100,
+ )
+
+ self.reasoning_question_filter = ReasoningQuestionFilter(system_prompt='You are a helpful assistant.', llm_serving=self.llm_serving, prompt_template=FinanceQuestionFilterPrompt())
+
+ def forward(self):
+ self.reasoning_question_filter.run(
+ storage=self.storage.step(), input_key='math_problem'
+ )
+
+if __name__ == "__main__":
+ pipeline = RecommendPipeline()
+ pipeline.compile()
+ pipeline.forward()
+```
diff --git a/docs/en/notes/guide/agent/pipeline_rec&refine.md b/docs/en/notes/guide/agent/pipeline_rec&refine.md
index 8769b0d2d..2292c7428 100644
--- a/docs/en/notes/guide/agent/pipeline_rec&refine.md
+++ b/docs/en/notes/guide/agent/pipeline_rec&refine.md
@@ -20,7 +20,7 @@ The system possesses self-healing capabilities: when generated code fails to exe
### 2. System Architecture
-This function is orchestrated by `wf_pipeline_recommend_extract_json.py`, forming a directed graph containing multiple levels of intelligent agents. The detailed responsibilities of the nodes are as follows:
+This function is orchestrated by `dataflow_agent/workflow/wf_pipeline_recommend_extract_json.py`, forming a directed graph containing multiple levels of intelligent agents. The detailed responsibilities of the nodes are as follows:
#### 2.1 Analysis and Planning Phase
@@ -75,7 +75,7 @@ This function is orchestrated by `wf_pipeline_recommend_extract_json.py`, formin
#### 2.2 Construction and Execution Phase
1. **Builder Node**
- 1. **Responsibility**: Converts the recommendation plan (JSON) into an actual Python code file (`pipeline.py`) and launches a subprocess to execute that code.
+ 1. **Responsibility**: Converts the recommendation plan (JSON) into an actual Python code file and launches a subprocess to execute that code.
2. **Mechanism**: Supports creating subprocesses to execute code, capturing standard output (stdout) and standard error (stderr).
3. **Output**: `state.execution_result` (Success/Fail status and logs).
@@ -108,9 +108,15 @@ When the `builder` execution fails and `need_debug=True`, it enters this loop:
### 3. User Guide
+This feature provides two modes of usage: **Graphical Interface (Gradio UI)** and **Command Line Script**.
+
#### 3.1 Graphical Interface
-Code located in `pipeline_rec.py`.
+Code is located in `gradio_app/pages/pipeline_rec.py`. It is ideal for interactive exploration and rapid validation. To launch the web interface:
+```bash
+python gradio_app/app.py
+```
+Visit `http://127.0.0.1:7860` to start using it.
1. **Configure Inputs**:
1. Enter your requirements in the "Target Description" box.
@@ -136,30 +142,82 @@ Code located in `pipeline_rec.py`.
#### 3.2 Script Invocation
-Use the `run_dfa_pipeline_recommend.py` script.
+For automated tasks or batch generation, it is recommended to directly modify and run `script/run_dfa_pipeline_recommend.py`.
-**Configuration Parameters:**
+##### 1. Modify the Configuration
-```python
-LANGUAGE = "en"
-CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/")
-MODEL = os.getenv("DF_MODEL", "gpt-4o")
-TARGET = "Just give me simple filtering or deduplication operators, only 2 operators needed"
-NEED_DEBUG = True # Enable auto-repair
-MAX_DEBUG_ROUNDS = 5 # Maximum repair attempts
-CACHE_DIR = "dataflow_cache"
-TEST_JSON_REL_PATH = "tests/test.jsonl" # Test data path
+Open `script/run_dfa_pipeline_recommend.py` and make modifications in the configuration section at the top of the file.
-```
+**API Configuration**
+
+ * **`CHAT_API_URL`**: URL of the LLM service
+ * **`api_key`**: Access key (using the environment variable DF_API_KEY)
+ * **`MODEL`**: Model name, default is gpt-4o
+
+**Task Configuration**
+
+ * **`TARGET`**: Describe your data processing requirements in detail in natural language
+ * Example: `"Please help me orchestrate a pipeline specifically for large-scale pre-training data cleaning, covering the entire process from deduplication and rewriting to quality filtering"`
+ * **`TEST_JSON_REL_PATH`**: Relative path of the data file used to test the Pipeline
+ * Format: One JSON object per line
+ * Default: `{Project Root Directory}/tests/test.jsonl`
+
+**Debug Configuration**
+
+ * **`NEED_DEBUG`**: Whether to enable automatic debugging and repair
+ * **`True`**: The Agent will attempt to run the generated code immediately. If an error is reported (e.g., `ImportError`, `KeyError`), it will start the Debugger Agent to analyze the error stack, automatically modify the code and retry
+ * **`False`**: End immediately after generating and running the code, without automatic debugging and repair
+ * **`MAX_DEBUG_ROUNDS`**: Maximum number of automatic repair attempts, default is 5 rounds
+
+**File Configuration**
-**Run Command:**
+ * **`CACHE_DIR`**: Result output directory. The generated pipeline code, execution logs, intermediate results, etc., will all be saved here
+
+##### 2. Run the Script
```bash
python script/run_dfa_pipeline_recommend.py
+```
+
+##### 3. Result Output
+
+After the script finishes, the console prints the execution logs and the final execution status, and `my_pipeline.py`, `final_state.json`, and `graph.png` are generated under `CACHE_DIR`.
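+
+If you want to re-run the generated pipeline outside the Agent (for example after hand-editing it), the generated file is ordinary Python code and can be executed in a subprocess, mirroring what the Builder node does. A minimal sketch is shown below; the file path is illustrative and should point to the pipeline actually generated in your `CACHE_DIR`:
+
+```python
+import subprocess
+import sys
+
+# Illustrative path -- use the pipeline file generated under your CACHE_DIR
+result = subprocess.run(
+    [sys.executable, "dataflow_cache/my_pipeline.py"],
+    capture_output=True,
+    text=True,
+)
+print(result.stdout)
+print(result.stderr)
+```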
+
+##### 4. Practical Case: Pre-training Data Cleaning Pipeline
+
+Suppose we have pre-training data `tests/test.jsonl` containing dirty data, and we want to clean it to obtain high-quality data. Open the script and modify the configuration as follows:
+
+**Scenario Configuration:**
+
+```python
+# ===== Example config (edit here) =====
+
+# 1. Define the task flow
+TARGET = """
+- 1. Please help me orchestrate a dedicated pipeline for large-scale pre-training data cleaning, covering the entire process from deduplication and rewriting to quality filtering.
+- 2. In the pre-training phase, raw web data (such as Common Crawl) is often filled with a large amount of noise, advertisements, garbled characters, and duplicate content, resulting in uneven data quality. I need to first perform appropriate rewriting on the raw data, such as removing a large number of excessive spaces, HTML tags, etc. Then, rule-based heuristic filtering needs to be applied to eliminate obviously garbage text, incomplete text, and overly short invalid data. Meanwhile, considering the complexity of online content, I need to filter data in a specified language for training large models. Web data has a high duplication rate, so it is best to use a fuzzy deduplication algorithm to clean up similar documents, leaving only one copy. Finally, to ensure that the model learns high-quality knowledge, I hope to have a quality classification model to score the cleaned data and retain only the content with high educational value, thereby building a high-quality pre-training corpus.
+- 3. I need an end-to-end pipeline specifically for processing massive pre-training corpora. First, you can perform basic normalization processing on the raw text, removing excessive spaces, HTML tags, and emojis. Then, use heuristic rules for initial filtering to screen out obviously low-quality text. These heuristic rules should cover a wide range, including filtering out text segments with an excessively high symbol/word ratio, text segments containing sensitive words, text segments with an abnormal number of words, incomplete text segments ending with colons/ellipses, text segments with an abnormal number of sentences, empty text, text segments with an abnormal average word length, text segments containing HTML tags, text segments without punctuation marks, text segments with special symbols or watermarks, text segments with an excessively high proportion of parentheses, text segments with an excessively high proportion of uppercase letters, text segments containing lorem ipsum (random dummy text), text segments with an excessively low proportion of independent words, text segments with a small number of characters, text segments starting with bullet points, and text segments containing an excessive amount of Javascript. On this basis, use MinHash or similar algorithms for document-level fuzzy deduplication to significantly reduce data redundancy. Subsequently, use a trained quality assessment model to score and filter the remaining data. Finally, a language identification step can be added to ensure that only high-quality and clean text in the target language is retained in the end.
+"""
+
+# 2. Specify the test data path
+TEST_JSON_REL_PATH = "tests/test.jsonl"
+# 3. Enable Debug
+NEED_DEBUG = True
+MAX_DEBUG_ROUNDS = 5
```
-**Output:** After running, the script generates `pipeline.py`, `final_state.json`, and `graph.png` under `dataflow_cache/session_{id}/`.
+**Run:**
+After running the script, the workflow will execute in the following steps:
+
+1. **Analyze user data and intent**: Analyze the characteristics of the user's data.
+2. **Decompose user tasks and recommend operators**: Decompose the user's intent into multiple tasks, retrieve and match operators related to the user's intent.
+3. **Generate code**: Analyze the order of requirements, connect these operators in series, and write the pipeline code.
+4. **Automatic testing**: Start a child process for trial operation. If an error occurs and debug mode is enabled, the Debugger Node will attempt to fix it.
+5. **Final delivery**: End the workflow when execution is successful or the maximum number of debug rounds is reached.
+
+Users can find the generated Pipeline code files and execution log files in the `CACHE_DIR` directory.
+
## Part 2: Pipeline Refinement
@@ -170,7 +228,7 @@ Pipeline Refinement allows users to fine-tune generated DataFlow Pipelines using
### 2. System Architecture
-This function is orchestrated by `wf_pipeline_refine.py`, adopting a three-stage architecture of **Analyzer -> Planner -> Refiner**:
+This function is orchestrated by `dataflow_agent/workflow/wf_pipeline_refine.py`, adopting a three-stage architecture of **Analyzer -> Planner -> Refiner**:
#### 2.1 Refine Target Analyzer
@@ -199,9 +257,15 @@ This function is orchestrated by `wf_pipeline_refine.py`, adopting a three-stage
### 3. User Guide
+This feature provides two modes of usage: **Graphical Interface (Gradio UI)** and **Command Line Script**.
+
#### 3.1 Graphical Interface
-Integrated at the bottom of the `pipeline_rec.py` page.
+Integrated at the bottom of the `gradio_app/pages/pipeline_rec.py` page. It is ideal for interactive exploration and rapid validation. To launch the web interface:
+```bash
+python gradio_app/app.py
+```
+Visit `http://127.0.0.1:7860` to start using it.
1. **Prerequisite**: Must first click "Generate Pipeline" at the top of the page to generate initial pipeline code, at which point `pipeline_json_state` will be initialized.
2. **Input Optimization Instruction**: Enter instructions in the "Optimization Requirement" text box.
@@ -209,28 +273,50 @@ Integrated at the bottom of the `pipeline_rec.py` page.
4. **History Backtracking**: Use "Previous Round" and "Next Round" buttons to switch between different optimization versions and view the code evolution process.
5. **Warning Prompts**: If RAG match quality is low, an `Optimization Warning` comment will be automatically added to the top of the code, alerting the user that the currently generated operator may not fully match the requirement.
-#### 3.2 Script Invocation
+#### 3.2 Script Invocation
-Use the `run_dfa_pipeline_refine.py` script.
+Use `script/run_dfa_pipeline_refine.py` to fine-tune the structure of an existing Pipeline.
-**Configuration Parameters:**
+##### 1. Modify the Configuration
-```python
-# Input file: pipeline structure file generated in the previous step (.json)
-INPUT_JSON = "dataflow_cache/session_xxx/final_state.json"
-OUTPUT_JSON = "cache_local/pipeline_refine_result.json" # Output file; if empty string, only prints result
-# Modification Target
-TARGET = "Please adjust the Pipeline to contain only 3 nodes, simplifying data flow"
+**API Configuration**
-LANGUAGE = "en"
-CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/")
-MODEL = os.getenv("DF_MODEL", "gpt-4o")
+ * **`CHAT_API_URL`**: URL of the LLM service
+ * **`api_key`**: Access key (using the environment variable DF_API_KEY)
+ * **`MODEL`**: Model name, default is gpt-4o
-```
+**Task Configuration**
-**Run Command:**
+ * **`INPUT_JSON`**: Path of the Pipeline structure file to be optimized
+ * **`OUTPUT_JSON`**: Save path for the optimized Pipeline JSON structure file
+ * **`TARGET`**: Describe how you want to modify the Pipeline in natural language
+ * Example: `"Please adjust the Pipeline to contain only 3 nodes and simplify the data flow"`
+
+##### 2. Run the Script
```bash
-python run_dfa_pipeline_refine.py
+python script/run_dfa_pipeline_refine.py
+```
+
+##### 3. Practical Case: Simplify the Pipeline
+
+Suppose the Pipeline generated in the previous step is too complex and contains redundant "cleaning" operators, and we want to remove them to simplify the Pipeline.
+
+**Scenario Configuration:**
+
+```python
+# ===== Example config (edit here) =====
+
+# 1. Specify the Pipeline structure file generated in the previous step
+INPUT_JSON = "dataflow_agent/tmps/pipeline.json"
+
+# 2. Issue modification instructions
+TARGET = "Please simplify the intermediate cleaning operators and streamline the data flow."
+
+# 3. Specify the result save location
+OUTPUT_JSON = "cache_local/pipeline_refine_result.json"
+```
+
+**Run:**
+The Agent will analyze the JSON topology of the current Pipeline, locate the redundant cleaning nodes, and remove them.
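+
+To inspect the refined structure, you can simply load the output JSON and print it. This is a minimal sketch that makes no assumption about the internal schema beyond it being valid JSON; the path matches the `OUTPUT_JSON` set above:
+
+```python
+import json
+
+# Load the refined pipeline structure written to OUTPUT_JSON
+with open("cache_local/pipeline_refine_result.json", encoding="utf-8") as f:
+    refined = json.load(f)
+
+# Pretty-print (truncated) to eyeball the resulting node layout
+print(json.dumps(refined, ensure_ascii=False, indent=2)[:2000])
+```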
-```
\ No newline at end of file
diff --git a/docs/zh/notes/guide/agent/operator_assemble_line.md b/docs/zh/notes/guide/agent/operator_assemble_line.md
index 629a80e92..018b4dbd3 100644
--- a/docs/zh/notes/guide/agent/operator_assemble_line.md
+++ b/docs/zh/notes/guide/agent/operator_assemble_line.md
@@ -16,7 +16,7 @@ permalink: /zh/guide/agent/operator_assemble_line/
## 2. 功能特性
-该功能模块主要由前端交互逻辑 (op_assemble_line.py) 和后端执行工作流 (wf_df_op_usage.py) 组成。
+该功能模块主要由前端交互逻辑 (`gradio_app/pages/op_assemble_line.py`) 和后端执行工作流 (`dataflow_agent/workflow/wf_df_op_usage.py`) 组成。
### 2.1 动态算子加载与自省
@@ -35,7 +35,11 @@ permalink: /zh/guide/agent/operator_assemble_line/
### 3.1 界面操作
-适合交互式探索和快速验证。
+适合交互式探索和快速验证。启动 Web 界面:
+```bash
+python gradio_app/app.py
+```
+访问 `http://127.0.0.1:7860` 开始使用
1. **环境配置**:在页面顶部填写 API 相关信息和输入 JSONL 文件路径。
2. **编排流水线**:
@@ -46,15 +50,23 @@ permalink: /zh/guide/agent/operator_assemble_line/
### 3.2 脚本调用与显式配置
-对于自动化任务或批量处理,可以使用 `run_dfa_op_assemble.py` 脚本。此方式跳过 UI,直接通过代码定义算子序列。
-
-> **注意:显式配置要求** 与 UI 的“自动链接”不同,脚本模式下您必须**显式配置**所有参数。您需要确保上一个算子的 `output_key` 与下一个算子的 `input_key` 严格匹配,脚本不会自动为您纠正参数名。
+对于自动化任务或批量处理,可以使用 `script/run_dfa_op_assemble.py` 脚本。此方式跳过 UI,直接通过代码定义算子序列。
#### 1. 修改配置
-打开 `run_dfa_op_assemble.py`,在文件顶部的配置区域进行修改。
+打开 `script/run_dfa_op_assemble.py`,在文件顶部的配置区域进行修改。
-**关键配置项**:**`PIPELINE_STEPS`** 这是一个列表,定义了 Pipeline 的执行步骤。每个元素包含 `op_name` 和 `params`。
+##### API 和文件配置
+- CHAT_API_URL: LLM 服务地址
+- API_KEY: 模型调用的鉴权密钥,如果您的 Pipeline 中包含需要调用大模型的算子(如推理生成、内容改写等),此项为必填,否则算子将无法运行。
+- MODEL: 模型名称,默认 gpt-4o
+- INPUT_FILE: 输入 JSONL 文件路径,测试数据文件
+##### 其他配置
+- CACHE_DIR: 结果和 pipeline 代码的存储目录。脚本运行过程中生成的 pipeline 代码文件、执行的中间数据和最终数据结果都会保存在这里。
+- SESSION_ID: 任务的唯一标识符。
+
+#### 2. 定义 Pipeline 步骤
+这是脚本中最关键的部分。您需要在`PIPELINE_STEPS`列表中定义算子的执行顺序和参数。 每一个步骤由 **算子名称 (op_name)** 和 **参数集合 (params)** 组成。
```Python
# [Pipeline 定义]
@@ -63,7 +75,7 @@ PIPELINE_STEPS = [
"op_name": "ReasoningAnswerGenerator",
"params": {
# __init__ 参数 (注意:在 wf_df_op_usage 中统一合并为 params)
- "prompt_template": "dataflow.prompts.reasoning.math.MathAnswerGeneratorPrompt",
+ "prompt_template": "dataflow.prompts.reasoning.general.GeneralAnswerGeneratorPrompt",
# run 参数
"input_key": "raw_content",
"output_key": "generated_cot"
@@ -82,24 +94,63 @@ PIPELINE_STEPS = [
}
]
```
+> **注意:显式配置要求** 与 UI 的“自动链接”不同,脚本模式下您必须**显式配置**所有参数。您需要确保上一个算子的 `output_key` 与下一个算子的 `input_key` 严格匹配,脚本不会自动为您纠正参数名。
-**其他必要配置:**
-
-- `CACHE_DIR`: **必须使用绝对路径**,以避免生成的 Python 脚本在子进程执行时出现路径错误。
-- `INPUT_FILE`: 初始数据文件的绝对路径。
-
-#### 2. 运行脚本
+#### 3. 运行脚本
```Bash
-python run_dfa_op_assemble.py
+python script/run_dfa_op_assemble.py
```
-#### 3. 结果输出
+#### 4. 结果输出
脚本执行后,控制台将打印:
-- **[Generation]**: 生成的 Python 脚本路径 (`pipeline_script_pipeline_001.py`)。
+- **[Generation]**: 生成的 Pipeline 代码路径。
- **[Code Preview]**: 生成代码的前 20 行预览。
-- **[Execution]**:
- - `Status: success` 表示执行成功。
- - `STDOUT`: 打印 pipeline 运行时的标准输出日志。
\ No newline at end of file
+- **[Execution]**: 执行情况。
+
+#### 5. 实战 Case:通用文本推理与伪答案生成
+我们有一个 `tests/test.jsonl` 文件,里面每行都有一个 `"raw_content"` 字段。我们希望:基于该字段的通用英文文本内容,先调用大语言模型针对文本内容生成推理式答案,再通过多轮生成候选答案并统计选优的方式生成伪答案,最终输出候选答案列表、最优伪答案、对应推理过程及典型正确推理示例等关键字段。所以我们选择 `ReasoningAnswerGenerator` 和 `ReasoningPseudoAnswerGenerator` 两个算子来编排 Pipeline。
+
+以下是完整的配置示例:
+
+```Python
+# [Pipeline 定义]
+PIPELINE_STEPS = [
+ {
+ "op_name": "ReasoningAnswerGenerator",
+ "params": {
+ # __init__ 参数 (注意:在 wf_df_op_usage 中统一合并为 params)
+ "prompt_template": "dataflow.prompts.reasoning.general.GeneralAnswerGeneratorPrompt",
+ # run 参数
+ "input_key": "raw_content",
+ "output_key": "generated_cot"
+ }
+ },
+ {
+ "op_name": "ReasoningPseudoAnswerGenerator",
+ "params": {
+ "max_times": 3,
+ "input_key": "generated_cot",
+ "output_key_answer": "pseudo_answers",
+ "output_key_answer_value": "pseudo_answer_value",
+ "output_key_solutions": "pseudo_solutions",
+ "output_key_correct_solution_example": "pseudo_correct_solution_example"
+ }
+ }
+]
+```
+配置完成后,在终端执行:
+
+```Bash
+python script/run_dfa_op_assemble.py
+```
+脚本会自动完成以下动作:
+
+1. 构建图:解析您的 PIPELINE_STEPS。
+2. 生成代码:将配置转换为标准的 Python 代码,存储在 `dataflow_cache/generated_pipelines/` 下。
+3. 执行任务:启动子进程运行生成的 Pipeline。
+4. 输出报告:终端会显示 [Execution] Status: success 以及部分代码预览。
+
+您可以直接去 `CACHE_DIR` 目录下查看生成的 JSONL 结果文件,验证数据是否符合预期。
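+
+如果希望在代码中快速核对结果,可以参考下面的示意脚本读取 `CACHE_DIR` 下最新生成的 JSONL 文件(目录与文件命名均为假设,以实际输出为准):
+```python
+# 示意:读取 CACHE_DIR 下最新的 JSONL 结果文件,检查输出字段
+import json
+from pathlib import Path
+
+cache_dir = Path("dataflow_cache")
+jsonl_files = sorted(cache_dir.rglob("*.jsonl"), key=lambda p: p.stat().st_mtime)
+if jsonl_files:
+    with jsonl_files[-1].open(encoding="utf-8") as f:
+        first_row = json.loads(f.readline())
+    # 正常情况下应能看到 PIPELINE_STEPS 中配置的输出字段,如 generated_cot、pseudo_answers 等
+    print("文件:", jsonl_files[-1])
+    print("字段:", list(first_row.keys()))
+```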
\ No newline at end of file
diff --git a/docs/zh/notes/guide/agent/operator_qa.md b/docs/zh/notes/guide/agent/operator_qa.md
index e8c2493fc..572677afe 100644
--- a/docs/zh/notes/guide/agent/operator_qa.md
+++ b/docs/zh/notes/guide/agent/operator_qa.md
@@ -12,7 +12,7 @@ permalink: /zh/guide/agent/operator_qa/
## 2. 核心特性
-该功能模块由前端 UI (operator_qa.py)、脚本入口 (run_dfa_operator_qa.py) 和后端智能体 (operator_qa_agent.py) 共同驱动,具备以下核心能力:
+该功能模块由前端 UI (`gradio_app/pages/operator_qa.py`)、执行工作流 (`dataflow_agent/workflow/wf_operator_qa.py`) 和后端智能体 (`dataflow_agent/agentroles/data_agents/operator_qa_agent.py`) 共同驱动,具备以下核心能力:
### 2.1 智能检索与推荐
@@ -49,8 +49,16 @@ Agent 并非简单地进行关键词匹配,而是基于语义理解用户的
## 4. 使用指南
+本功能提供 **图形界面 (Gradio UI)** 和 **命令行脚本** 两种使用方式。
+
### 4.1 界面操作
+适合交互式探索和快速验证。启动 Web 界面:
+```bash
+python gradio_app/app.py
+```
+访问 `http://127.0.0.1:7860` 开始使用。
+
1. **配置模型**:在右侧“配置”面板确认 API URL 和 Key,并选择模型(默认使用 `gpt-4o`)。
2. **发起提问**:
1. **对话框**:输入你的问题。
@@ -63,48 +71,157 @@ Agent 并非简单地进行关键词匹配,而是基于语义理解用户的
### 4.2 脚本调用与显式配置
-除了 UI 界面,系统提供了 `run_dfa_operator_qa.py` 脚本,支持通过代码显式配置参数来运行问答服务。这种方式适合开发调试。
+除了 UI 界面,系统提供了 `script/run_dfa_operator_qa.py` 脚本。这种方式适合开发调试,或者通过代码自动化地查询算子用法。
+
+#### 1. 修改配置
+
+打开 `script/run_dfa_operator_qa.py`,在文件顶部的配置区域进行修改。
+
+**API 和文件配置**
+
+* **CHAT_API_URL**: LLM 服务地址。
+* **API_KEY**: 模型调用密钥。Agent 需要调用大模型来理解您的问题并总结答案。
+* **MODEL**: 模型名称,默认为 `gpt-4o`。
+* **CACHE_DIR**: 缓存目录。
+* **TOP_K**: 检索深度。指定 Agent 在知识库中检索相关算子时,最多返回多少个候选结果(默认 5 个)。
-**配置方式:** 直接修改脚本顶部的常量配置区域,无需通过命令行参数传递:
+**查询与交互模式配置**
-```Python
+* **INTERACTIVE**: **交互控制开关**(`True` / `False`)。
+ * `True` (交互模式):启动终端内的连续对话模式,您可以像聊天一样不断追问,支持 `clear` 清除历史。
+ * `False` (单次模式):脚本只执行 `QUERY` 指定的一个问题,输出结果后立即结束。
+* **QUERY**: 单次查询的问题内容。仅在 `INTERACTIVE = False` 时生效。
+* **OUTPUT_JSON**: 结果保存路径。
+ * 仅在单次模式下生效。
+ * 如果设置了路径,Agent 的回答、检索到的算子列表及代码片段会被完整保存为 JSON 文件;留空则只打印到控制台。
+
+#### 2. 运行脚本
+
+配置完成后,在终端执行:
+
+```bash
+python script/run_dfa_operator_qa.py
+```
+
+#### 3. 结果输出
+
+脚本执行后,根据模式不同,控制台会有不同表现:
+
+* **交互模式**:终端会出现 `🧑 你:` 提示符,等待输入。
+ * 输入 `exit` 或 `quit` 退出。
+ * 输入 `clear` 清除对话历史
+ * 输入 `history` 查看历史对话。
+* **单次模式**:控制台将直接打印 Agent 的思考过程、检索到的算子列表以及最终回答。如果配置了 `OUTPUT_JSON`,还会提示文件保存成功。
+
+#### 4. 实战 Case:查找“清洗数据”的算子
+
+假设您在开发 Pipeline 时遇到数据需要清洗,想知道 DataFlow 库里有没有现成的算子可以处理。
+
+**场景配置:** 我们将其设置为单次查询模式,并指定将结果保存到本地,以便后续在代码中查看详细参数。
+
+打开脚本修改如下配置:
+
+```python
# ===== Example config (edit here) =====
-INTERACTIVE = False # True开启多轮交互模式,False为单次查询
-QUERY = "我想过滤掉缺失值用哪个算子?" # 单次查询的问题
-LANGUAGE = "zh"
-SESSION_ID = "demo_operator_qa"
-CACHE_DIR = "dataflow_cache"
-TOP_K = 5 # 检索结果数量
+# 1. 关闭交互模式,执行单次查询
+INTERACTIVE = False
+
+# 2. 定义您的具体需求
+QUERY = "我想清洗数据,应该用哪个算子?"
+# 3. 确保 API 配置正确
CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/")
API_KEY = os.getenv("DF_API_KEY", "")
MODEL = os.getenv("DF_MODEL", "gpt-4o")
-OUTPUT_JSON = "" # e.g. "cache_local/operator_qa_result.json";空字符串表示不落盘
-```
-
-**运行模式:**
+# 4. 指定结果保存位置
+OUTPUT_JSON = "cache_local/operator_qa_result.json"
-1. **单次查询模式** (`INTERACTIVE = False`): 执行单次 `QUERY`,结果可直接打印或保存为 JSON 文件。
-2. **交互模式** (`INTERACTIVE = True`): 启动终端对话循环,支持 `exit` 退出、`clear` 清除上下文、`history` 查看历史。
-**核心代码逻辑:** 脚本演示了如何显式构造 `DFRequest` 和 `MainState`,并手动构建执行图:
+```
-```Python
-# 1. 显式构造请求
-req = DFRequest(
- language=LANGUAGE,
- chat_api_url=CHAT_API_URL,
- api_key=API_KEY,
- model=MODEL,
- target="" # 每次查询前再写入
-)
+**运行:**
+
+运行脚本后,Agent 会执行 RAG 检索并生成回答。打开生成的 `script/cache_local/operator_qa_result.json`,您可以看到如下结构的数据:
+
+```json
+{
+ "success": true,
+ "query": "我想清洗数据,应该用哪个算子?",
+ "answer": "对于数据清洗任务,您可以考虑使用以下算子:\n\n1. **KBCTextCleanerBatch**:用于对原始知识内容进行标准化处理,包括HTML标签清理、特殊字符规范化、链接处理和结构优化,提升RAG知识库的质量。\n\n2. **KBCTextCleaner**:类似于KBCTextCleanerBatch,专注于知识内容的标准化处理。\n\n3. **HtmlUrlRemoverRefiner**:去除文本中的URL链接和HTML标签,净化文本内容。\n\n4. **RemoveNumberRefiner**:移除文本中的数字字符,保留纯文本内容。\n\n5. **ReferenceRemoverRefiner**:删除文本中未闭合的引用标签和引用链接,净化文本中的引用标记。",
+ "related_operators": [
+ "KBCTextCleanerBatch",
+ "KBCTextCleaner",
+ "HtmlUrlRemoverRefiner",
+ "RemoveNumberRefiner",
+ "ReferenceRemoverRefiner"
+ ],
+ "code_snippet": null,
+ "follow_up_suggestions": [
+ "您需要清洗哪种类型的数据?",
+ "是否需要了解某个算子的详细参数配置?",
+ "需要查看算子的源码实现吗?"
+ ],
+ "messages": [
+ {
+ "type": "SystemMessage",
+ "content": "\n[角色]\n你是 DataFlow 算子库的智能问答助手。你的职责是帮助用户了解和使用 DataFlow 中的各种数据处理算子。\n\n[能力]\n1. 根据用户描述的需求,推荐合适的算子\n2. 解释算子的功能、用途和使用场景\n3. 详细说明算子的参数含义和配置方法\n4. 在需要时展示算子的源码实现\n5. 基于多轮对话理解用户的上下文需求\n\n[DataFlow 算子简介]\nDataFlow 是一个数据处理框架,提供了丰富的算子用于数据清洗、过滤、生成、评估等任务。\n每个算子都是一个 Python 类,通常包含:\n- `__init__` 方法:初始化算子,配置必要的参数(如 LLM 服务、提示词等)\n- `run` 方法:执行数据处理逻辑,接收输入数据并产出处理结果\n\n[可用工具]\n你可以调用以下工具来获取算子信息:\n\n1. **search_operators(query, top_k)** - 根据功能描述搜索相关算子\n - 当用户询问某类功能的算子时使用\n - 如果对话历史中已有相关算子信息,可以不调用直接回答\n\n2. **get_operator_info(operator_name)** - 获取指定算子的详细描述\n - 当用户询问特定算子的功能时使用\n\n3. **get_operator_source_code(operator_name)** - 获取算子的完整源代码\n - 当用户需要了解算子实现细节时使用\n\n4. **get_operator_parameters(operator_name)** - 获取算子的参数详情\n - 当用户询问算子如何配置、参数含义时使用\n\n[工具调用策略]\n- 如果是新问题且对话历史中没有相关信息 → 调用 search_operators 检索\n- 如果对话历史中已有相关算子信息 → 可以直接回答,无需重复检索\n- 如果用户追问某个算子的细节 → 调用 get_operator_info/get_operator_source_code/get_operator_parameters\n\n[回答风格]\n1. 清晰简洁,重点突出\n2. 使用中文回答(除非用户要求英文)\n3. 对于技术细节,提供具体的代码示例\n4. 在解释参数时,说明参数类型、默认值和作用\n\n[输出格式]\n请以 JSON 格式返回,包含以下字段:\n{\n \"answer\": \"对用户问题的详细回答\",\n \"related_operators\": [\"相关算子名称列表\"],\n \"source_explanation\": \"说明答案的信息来源,例如:'通过search_operators检索到的XXX算子'、'基于对话历史中的算子信息'、'基于我的知识库'\",\n \"code_snippet\": \"如有必要,提供代码片段(可选)\",\n \"follow_up_suggestions\": [\"可能的后续问题建议(可选)\"]\n}\n\n\n请以JSON格式返回结果,不要包含其他文字说明!!!直接返回json内容,不要```json进行包裹!!",
+ "role": "",
+ "additional_kwargs": {},
+ "metadata": {}
+ },
+ {
+ "type": "HumanMessage",
+ "content": "\n[用户问题]\n我想清洗数据,应该用哪个算子?\n\n[任务]\n请根据用户问题回答。对话历史会自动包含在消息中,你可以参考之前的对话。\n\n工具调用指南:\n1. 如果需要查找算子,调用 search_operators 工具\n2. 如果需要某个算子的详细信息,调用 get_operator_info 工具\n3. 如果需要源码,调用 get_operator_source_code 工具\n4. 如果需要参数详情,调用 get_operator_parameters 工具\n5. 如果之前的对话中已有相关信息,可以直接回答,无需重复调用工具\n\n回答要求:\n- 基于工具返回的信息或对话上下文中的信息回答\n- 在 source_explanation 中说明答案来源\n- 如果问题不明确,可以在 follow_up_suggestions 中给出澄清建议\n\n请以 JSON 格式返回你的回答。\n",
+ "role": "",
+ "additional_kwargs": {},
+ "metadata": {}
+ },
+ {
+ "type": "AIMessage",
+ "content": "",
+ "role": "",
+ "additional_kwargs": {
+ "tool_calls": [
+ {
+ "id": "call_DZer3f6W1WLsvDeHBktupSXw",
+ "function": {
+ "arguments": "{\"query\":\"数据清洗\"}",
+ "name": "search_operators"
+ },
+ "type": "function"
+ }
+ ],
+ "refusal": null
+ },
+ "metadata": {}
+ },
+ {
+ "content": "{\n \"query\": \"数据清洗\",\n \"matched_operators\": [\n \"KBCTextCleanerBatch\",\n \"KBCTextCleaner\",\n \"HtmlUrlRemoverRefiner\",\n \"RemoveNumberRefiner\",\n \"ReferenceRemoverRefiner\"\n ],\n \"operator_details\": [\n {\n \"node\": 1,\n \"name\": \"KBCTextCleanerBatch\",\n \"description\": \"知识清洗算子:对原始知识内容进行标准化处理,包括HTML标签清理、特殊字符规范化、链接处理和结构优化,提升RAG知识库的质量。主要功能:\\n1. 移除冗余HTML标签但保留语义化标签\\n2. 标准化引号/破折号等特殊字符\\n3. 处理超链接同时保留文本\\n4. 保持原始段落结构和代码缩进\\n5. 确保事实性内容零修改\",\n \"category\": \"knowledge_cleaning\"\n },\n {\n \"node\": 2,\n \"name\": \"KBCTextCleaner\",\n \"description\": \"知识清洗算子:对原始知识内容进行标准化处理,包括HTML标签清理、特殊字符规范化、链接处理和结构优化,提升RAG知识库的质量。主要功能:\\n1. 移除冗余HTML标签但保留语义化标签\\n2. 标准化引号/破折号等特殊字符\\n3. 处理超链接同时保留文本\\n4. 保持原始段落结构和代码缩进\\n5. 确保事实性内容零修改\\n\\n输入格式示例:\\n\\n\\n输出格式示例:\\n标题文本\\n\\n正文段落,包括特殊符号,例如\\\"直引号\\\"、-破折号等\\n\\n[Image: 示例图 example.jpg]\\n\\n链接文本\\n\\n代码片段\\n\\n[结构保持,语义保留,敏感信息脱敏处理(如手机号、保密标记等)]\",\n \"category\": \"knowledge_cleaning\"\n },\n {\n \"node\": 3,\n \"name\": \"HtmlUrlRemoverRefiner\",\n \"description\": \"去除文本中的URL链接和HTML标签,净化文本内容。使用正则表达式匹配并移除各种形式的URL和HTML标签。输入参数:\\n- input_key:输入文本字段名\\n输出参数:\\n- 包含净化后文本的DataFrame\\n- 返回输入字段名,用于后续算子引用\",\n \"category\": \"general_text\"\n },\n {\n \"node\": 4,\n \"name\": \"RemoveNumberRefiner\",\n \"description\": \"该算子用于移除文本中的数字字符,包括0-9的阿拉伯数字。\\n通过字符过滤实现数字移除,保留纯文本内容。\\n输入参数:\\n- 无初始化参数\\n运行参数:\\n- input_key:输入文本字段名\\n输出参数:\\n- 处理后的DataFrame,包含去除数字的文本\\n- 返回包含输入字段名的列表,用于后续算子引用\",\n \"category\": \"general_text\"\n },\n {\n \"node\": 5,\n \"name\": \"ReferenceRemoverRefiner\",\n \"description\": \"删除文本中未闭合的引用标签和引用链接,包括标签和{{cite}}模板的各种完整和不完整形式。净化文本中的引用标记。输入参数:\\n- input_key:输入文本字段名\\n输出参数:\\n- 包含移除引用标记后文本的DataFrame\\n- 返回输入字段名,用于后续算子引用\",\n \"category\": \"general_text\"\n }\n ]\n}",
+ "additional_kwargs": {},
+ "response_metadata": {},
+ "type": "tool",
+ "name": "search_operators",
+ "id": "63bb6bcd-5184-456f-91d8-72fb06b9cdc0",
+ "tool_call_id": "call_DZer3f6W1WLsvDeHBktupSXw",
+ "artifact": null,
+ "status": "success"
+ },
+ {
+ "type": "AIMessage",
+ "content": "{\n \"answer\": \"对于数据清洗任务,您可以考虑使用以下算子:\\n\\n1. **KBCTextCleanerBatch**:用于对原始知识内容进行标准化处理,包括HTML标签清理、特殊字符规范化、链接处理和结构优化,提升RAG知识库的质量。\\n\\n2. **KBCTextCleaner**:类似于KBCTextCleanerBatch,专注于知识内容的标准化处理。\\n\\n3. **HtmlUrlRemoverRefiner**:去除文本中的URL链接和HTML标签,净化文本内容。\\n\\n4. **RemoveNumberRefiner**:移除文本中的数字字符,保留纯文本内容。\\n\\n5. **ReferenceRemoverRefiner**:删除文本中未闭合的引用标签和引用链接,净化文本中的引用标记。\",\n \"related_operators\": [\n \"KBCTextCleanerBatch\",\n \"KBCTextCleaner\",\n \"HtmlUrlRemoverRefiner\",\n \"RemoveNumberRefiner\",\n \"ReferenceRemoverRefiner\"\n ],\n \"source_explanation\": \"通过search_operators检索到的相关算子\",\n \"code_snippet\": null,\n \"follow_up_suggestions\": [\n \"您需要清洗哪种类型的数据?\",\n \"是否需要了解某个算子的详细参数配置?\",\n \"需要查看算子的源码实现吗?\"\n ]\n}",
+ "role": "",
+ "additional_kwargs": {
+ "refusal": null
+ },
+ "metadata": {}
+ }
+ ]
+}
-# 2. 初始化状态与图
-state = MainState(request=req, messages=[])
-graph = create_operator_qa_graph().build()
+```
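+
+如果希望在其他脚本中复用这一结果,可以按上述字段直接读取该 JSON 文件(以下仅为示意,路径与 `OUTPUT_JSON` 配置保持一致):
+```python
+# 示意:读取单次模式保存的问答结果
+import json
+
+with open("cache_local/operator_qa_result.json", encoding="utf-8") as f:
+    result = json.load(f)
+
+print("回答:", result["answer"])
+print("相关算子:", result["related_operators"])
+for tip in result.get("follow_up_suggestions", []):
+    print("追问建议:", tip)
+```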
-# 3. 执行
-result = await run_single_query(state, graph, QUERY)
-```
\ No newline at end of file
diff --git a/docs/zh/notes/guide/agent/operator_write.md b/docs/zh/notes/guide/agent/operator_write.md
index 594514a3f..62caab2be 100644
--- a/docs/zh/notes/guide/agent/operator_write.md
+++ b/docs/zh/notes/guide/agent/operator_write.md
@@ -37,7 +37,7 @@ permalink: /zh/guide/agent/operator_write/
## 3. 工作流架构
-该功能由 `wf_pipeline_write.py` 编排,形成一个包含条件循环的有向图。
+该功能由 `dataflow_agent/workflow/wf_pipeline_write.py` 编排,形成一个包含条件循环的有向图。
1. **Match Node**: 检索参考算子。
2. **Write Node**: 编写初始代码。
@@ -52,7 +52,11 @@ permalink: /zh/guide/agent/operator_write/
### 4.1 界面操作
-前端页面代码位于 `operator_write.py`,提供了可视化的交互体验。
+前端页面代码位于 `gradio_app/pages/operator_write.py`,提供了可视化的交互体验,适合交互式探索和快速验证。启动 Web 界面:
+```bash
+python gradio_app/app.py
+```
+访问 `http://127.0.0.1:7860` 开始使用。
#### 1. 配置输入
@@ -80,39 +84,187 @@ permalink: /zh/guide/agent/operator_write/
### 4.2 脚本调用与显式配置
-对于开发者或自动化任务,可以直接运行 `run_dfa_operator_write.py`。
+对于开发者,推荐直接修改并运行 `script/run_dfa_operator_write.py`。这种方式可以更灵活地集成到自动化流程中,并保存生成的算子文件。
#### 1. 修改配置
-打开 `run_dfa_operator_write.py`,在文件顶部的配置区域修改参数:
+打开 `script/run_dfa_operator_write.py`,在文件顶部的配置区域修改参数。
+
+**任务配置**
+ * **`TARGET`**: 用自然语言描述算子的功能。描述越具体,生成的代码越准确。建议包含对输入字段和预期输出的描述。
+ * 示例:`"创建一个算子,用于对文本进行情感分析"`
+ * 示例:`"实现一个数据去重算子,支持多字段组合去重"`
+ * **`CATEGORY`**: 算子所属类别,用于匹配相似算子作为参考
+ * 默认:`"Default"`
+ * 可选:`"reasoning"`, `"agentic_rag"`, `"knowledge_cleaning"` 等
+ * **`JSON_FILE`**: 用于测试算子的数据文件(`.jsonl` 格式)。
+ * 默认:留空则使用项目自带的测试数据`tests/test.jsonl`。
+ * **`OUTPUT_PATH`**: 生成的 Python 代码保存路径。如果留空,代码只会打印在控制台,不会保存文件。
+
+**API 与 调试配置**
+ * **`CHAT_API_URL`**: LLM 服务地址
+ * **`api_key`**: 访问密钥(使用环境变量 DF_API_KEY)
+ * **`MODEL`**: 模型名称,默认 gpt-4o
+ * **`NEED_DEBUG`**: 是否开启自动调试循环 (`True` / `False`)
+ * `True`:如果生成的代码在 `JSON_FILE` 上运行报错,Agent 会自动分析错误堆栈并尝试重写代码
+ * `False`:生成代码并执行后立即结束,不管是否能运行成功
+ * **`MAX_DEBUG_ROUNDS`**: 最大自动修复次数,默认 3 次
+
+#### 2. 运行脚本
+
+配置完成后,在终端执行:
+
+```bash
+python script/run_dfa_operator_write.py
+```
+
+#### 3. 结果输出
+
+脚本执行过程中会输出以下关键信息:
+
+* **[Match Operator Result]**: 显示 Agent 在现有算子库中找到的“参考算子”
+* **[Writer Result]**: 生成的代码长度和保存位置
+* **[Execution Result]**:代码执行结果
+ * `Success: True`:表示代码生成成功,且在测试数据上运行无误。
+ * `Success: False`:表示运行失败。
+* **[Debug Runtime Preview]**:运行时捕获的 `stdout`/`stderr` 以及选定的输入字段键名 (`input_key`)
+
+#### 4. 实战 Case:编写一个情感分析算子
+
+我们有一个日志文件 `tests/test.jsonl`,其中包含字段 `"raw_content"`。我们希望创建一个算子,对该字段的文本内容进行情感分析。
+
+**配置示例:**
-```Python
+```python
+# ===== Example config (edit here) =====
+# API KEY 通过设置环境变量 DF_API_KEY 传入
CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/")
MODEL = os.getenv("DF_MODEL", "gpt-4o")
LANGUAGE = "en"
-TARGET = "Create an operator that filters out missing values and keeps rows with non-empty fields."
-CATEGORY = "Default" # 兜底类别(若 classifier 未命中)
-OUTPUT_PATH = "" # e.g. "cache_local/my_operator.py";空则不落盘
-JSON_FILE = "" # 空则使用项目自带 tests/test.jsonl
+# 1. 定义具体需求
+TARGET = "创建一个算子,用于对文本进行情感分析"
+CATEGORY = "Default"
+# 2. 指定结果保存路径
+OUTPUT_PATH = "cache_local/my_operator.py"
+# 3. 指定测试数据路径
+JSON_FILE = "tests/test.jsonl"
+# 4. 开启调试
+NEED_DEBUG = True
+MAX_DEBUG_ROUNDS = 10
-NEED_DEBUG = False
-MAX_DEBUG_ROUNDS = 3
```
-#### 2. 运行脚本
+**运行:**
+运行脚本后,终端会给出以下输出:
+``` bash
+==== Match Operator Result ====
+Matched ops: ['LangkitSampleEvaluator', 'LexicalDiversitySampleEvaluator', 'PresidioSampleEvaluator', 'PerspectiveSampleEvaluator']
-```Bash
-python run_dfa_operator_write.py
-```
+==== Writer Result ====
+Code length: 3619
+Saved to: cache_local/my_operator.py
-#### 3. 结果输出
+==== Execution Result (instantiate) ====
+Success: True
-脚本会在控制台打印关键信息:
+==== Debug Runtime Preview ====
+input_key: raw_content
+available_keys: ['raw_content']
+[debug stdout]
+ [selected_input_key] raw_content
-- `Matched ops`: 匹配到的参考算子。
-- `Code preview`: 生成代码的预览片段。
-- `Execution Result`:
- - `Success: True` 表示代码生成并运行通过。
- - `Success: False` 会打印 `stderr preview` 供排查。
-- `Debug Runtime Preview`: 显示自动选择的 `input_key` 和运行时日志。
\ No newline at end of file
+[debug stderr]
+Generating......: 100%|######### | 18/20 [00:08<00:00, 3.34it/s]
+```
+生成的代码保存到 `script/cache_local/my_operator.py` 中,打开即可查看生成的代码:
+``` python
+from dataflow.core import OperatorABC
+from dataflow.utils.registry import OPERATOR_REGISTRY
+from dataflow.utils.storage import DataFlowStorage, FileStorage
+from dataflow import get_logger
+from dataflow.serving import APILLMServing_request
+import pandas as pd
+
+@OPERATOR_REGISTRY.register()
+class SentimentAnalysisOperator(OperatorABC):
+ def __init__(self, llm_serving=None):
+ self.logger = get_logger()
+ self.logger.info(f'Initializing {self.__class__.__name__}...')
+ self.llm_serving = llm_serving
+ self.score_name = 'SentimentScore'
+ self.logger.info(f'{self.__class__.__name__} initialized.')
+
+ @staticmethod
+ def get_desc(lang: str = "zh"):
+ if lang == "zh":
+ return (
+ "使用LLM进行文本情感分析,返回情感得分,得分越高表示情感越积极。\n"
+ "输入参数:\n"
+ "- llm_serving:LLM服务对象\n"
+ "- input_key:输入文本字段名\n"
+ "- output_key:输出得分字段名,默认'SentimentScore'\n"
+ "输出参数:\n"
+ "- 包含情感分析得分的DataFrame"
+ )
+ else:
+ return (
+ "Perform sentiment analysis on text using LLM, returning sentiment scores where higher scores indicate more positive sentiment.\n"
+ "Input Parameters:\n"
+ "- llm_serving: LLM serving object\n"
+ "- input_key: Field name for input text\n"
+ "- output_key: Field name for output score, default 'SentimentScore'\n"
+ "Output Parameters:\n"
+ "- DataFrame containing sentiment analysis scores"
+ )
+
+ def get_score(self, samples: list[dict], input_key: str) -> list[float]:
+ texts = [sample.get(input_key, '') or '' for sample in samples]
+ return self.llm_serving.generate_from_input(texts)
+
+ def eval(self, dataframe: pd.DataFrame, input_key: str) -> list[float]:
+ self.logger.info(f"Evaluating {self.score_name}...")
+ samples = dataframe.to_dict(orient='records')
+ scores = self.get_score(samples, input_key)
+ self.logger.info("Evaluation complete!")
+ return scores
+
+ def run(self,
+ storage: DataFlowStorage,
+ input_key: str | None = None,
+ output_key: str = 'SentimentScore'):
+ dataframe = storage.read("dataframe")
+ if input_key is None:
+ input_key = self._auto_select_input_key(dataframe)
+ dataframe[output_key] = self.eval(dataframe, input_key)
+ storage.write(dataframe)
+
+ def _auto_select_input_key(self, dataframe: pd.DataFrame) -> str:
+ preferred_keys = ['raw_content', 'text', 'content', 'sentence', 'instruction', 'input', 'query', 'problem', 'prompt']
+ for key in preferred_keys:
+ if key in dataframe.columns and dataframe[key].notnull().any():
+ return key
+ return dataframe.columns[0]
+
+# Runnable entry code
+
+test_data_path = '/root/autodl-tmp/DataFlow-Agent/tests/test.jsonl'
+
+# Initialize FileStorage
+storage = FileStorage(first_entry_file_name=test_data_path, cache_path="./cache_local", file_name_prefix="dataflow_cache_step", cache_type="jsonl")
+storage = storage.step()
+
+# Initialize llm_serving
+llm_serving = APILLMServing_request(api_url="http://123.129.219.111:3000/v1/chat/completions", key_name_of_api_key="DF_API_KEY", model_name="gpt-4o")
+
+# Select input key
+available_keys = ['raw_content']
+preselected_input_key = 'raw_content'
+input_key = preselected_input_key if preselected_input_key in available_keys else available_keys[0]
+print(f"[selected_input_key] {input_key}")
+
+# Instantiate and run the operator
+operator = SentimentAnalysisOperator(llm_serving=llm_serving)
+operator.run(storage=storage, input_key=input_key)
+```
\ No newline at end of file
diff --git a/docs/zh/notes/guide/agent/pipeline_prompt.md b/docs/zh/notes/guide/agent/pipeline_prompt.md
index bcf4f6142..e3ff1bff1 100644
--- a/docs/zh/notes/guide/agent/pipeline_prompt.md
+++ b/docs/zh/notes/guide/agent/pipeline_prompt.md
@@ -31,7 +31,7 @@ permalink: /zh/guide/agent/pipeline_prompt/
## 3. 系统架构
-该功能由 `wf_pipeline_prompt.py` 定义,核心为一个**单节点**工作流。所有的生成与验证逻辑均高度内聚在 `PromptWriter` Agent 中。
+该功能由 `dataflow_agent/workflow/wf_pipeline_prompt.py` 定义,核心为一个**单节点**工作流。所有的生成与验证逻辑均高度内聚在 `PromptWriter` Agent 中。
### 3.1 核心节点流程
@@ -55,9 +55,15 @@ permalink: /zh/guide/agent/pipeline_prompt/
## 4. 使用指南
+本功能提供 **图形界面 (Gradio UI)** 和 **命令行脚本** 两种使用方式。
+
### 4.1 图形界面
-前端代码位于 `PA_frontend.py`,提供了完整的交互式开发环境。
+前端代码位于 `gradio_app/pages/PA_frontend.py`,提供了可视化的交互体验,适合交互式探索和快速验证。启动 Web 界面:
+```bash
+python gradio_app/app.py
+```
+访问 `http://127.0.0.1:7860` 开始使用。
**初次生成:**
@@ -74,25 +80,62 @@ permalink: /zh/guide/agent/pipeline_prompt/
3. 查看更新后的代码和测试结果
4. 重复步骤 1-3 直到获得满意结果
-使用生成的 Prompt**:**
+**使用生成的 Prompt:**
1. 从"Prompt 文件路径"获取生成的 Prompt 文件位置
2. 将 Prompt 类导入到您的算子中
3. 在算子的 `init()` 中指定 `prompt_template`
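+
+下面给出一个最小的使用示意,以后文实战 Case 中生成的 `FinanceQuestionFilterPrompt` 为例(模块名、数据路径与字段名均为示例,请按您的实际生成结果调整):
+```python
+# 示意:在算子初始化时传入生成的 Prompt 类
+from dataflow.utils.storage import FileStorage
+from dataflow.serving import APILLMServing_request
+from dataflow.operators.reasoning import ReasoningQuestionFilter
+from finance_question_filter_prompt20260209143556 import FinanceQuestionFilterPrompt
+
+storage = FileStorage(
+    first_entry_file_name="./pa_cache/prompt_test_data.jsonl",
+    cache_path="./pa_cache",
+    file_name_prefix="dataflow_cache_step",
+    cache_type="jsonl",
+)
+llm_serving = APILLMServing_request(
+    api_url="http://123.129.219.111:3000/v1/chat/completions",
+    key_name_of_api_key="DF_API_KEY",
+    model_name="gpt-4o",
+)
+question_filter = ReasoningQuestionFilter(
+    system_prompt="You are a helpful assistant.",
+    llm_serving=llm_serving,
+    prompt_template=FinanceQuestionFilterPrompt(),  # 指定为生成的自定义 Prompt
+)
+question_filter.run(storage=storage.step(), input_key="math_problem")
+```
+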
-### 4.2 脚本调用
+### 4.2 脚本调用与显式配置
+
+对于需要将 Prompt 生成集成到自动化流水线,或者习惯代码配置的开发者,可以使用 `script/run_dfa_pipeline_prompt.py`。
+
+#### 1. 修改配置
+
+打开 `script/run_dfa_pipeline_prompt.py`,在文件顶部的配置区域进行修改。
-使用 `run_dfa_pipeline_prompt.py` 脚本进行自动化生成。
+**API 配置**
+ * **`CHAT_API_URL`**: LLM 服务地址
+ * **`api_key`**: 访问密钥(使用环境变量 DF_API_KEY)
+ * **`MODEL`**: 模型名称,默认 gpt-4o
-**配置参数**:
+**任务配置**
+ * **`TASK_DESCRIPTION`**: 用自然语言描述您希望这个 Prompt 完成什么任务
+ * 示例:`"我想写一个适用于金融问题的过滤器提示词."`
+ * **`OP_NAME`**: 指定生成的 Prompt 将被哪个算子加载使用
+ * **`OUTPUT_FORMAT`** (可选): 指定 Prompt 输出的格式。如果不填,Agent 会仿照已有提示词生成
+ * **`ARGUMENTS`** (可选): Prompt 模板需要的参数,用逗号、空格或换行分隔
+ * 示例:`["min_len=10", "drop_na=true"]`
-```Python
-CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/")
-MODEL = os.getenv("DF_MODEL", "gpt-4o")
-LANGUAGE = "en"
+**环境配置**
+ * **`CACHE_DIR`**: 结果输出目录。生成的 Prompt 文件(`.py`)、临时的测试数据、测试代码等都会保存在这里
+ * **`DELETE_TEST_FILES`**: 运行结束后是否自动清理临时的合成测试数据(`True`/`False`)
+
+#### 2. 运行脚本
+
+配置完成后,在终端执行:
+
+```bash
+python script/run_dfa_pipeline_prompt.py
+```
-TASK_DESCRIPTION = "Write a prompt for an operator that filters missing values"
-OP_NAME = "PromptedFilter"
+#### 3. 结果输出
+
+脚本执行完毕后,控制台会打印生成过程。您可以在 `CACHE_DIR` 目录下找到生成的文件。
+
+#### 4. 实战 Case:复用 ReasoningQuestionFilter 算子,编写适用于金融问题的过滤器提示词
+
+假设我们想复用系统中的 `ReasoningQuestionFilter` 算子,让它成为一个金融领域问题的过滤器。打开脚本修改如下配置:
+
+```python
+# ===== Example config (edit here) =====
+
+# 1. 定义任务
+TASK_DESCRIPTION = "我想写一个适用于金融问题的过滤器提示词"
+
+# 2. 指定复用的算子 (告诉 Agent 这个 Prompt 是给 ReasoningQuestionFilter 用的)
+OP_NAME = "ReasoningQuestionFilter"
# 这两项在算子不拥有任何一个预置提示词时才需要提供,否则会仿照已有提示词生成
OUTPUT_FORMAT = "" # e.g. "Return JSON with keys: ..."
@@ -100,11 +143,91 @@ ARGUMENTS = [] # e.g. ["min_len=10", "drop_na=true"]
# 缓存目录,用于存储测试数据和提示词
CACHE_DIR = "./pa_cache"
-DELETE_TEST_FILES = True
-```
+DELETE_TEST_FILES = False
-**运行命令**:
+```
-```Bash
-python run_dfa_pipeline_prompt.py
+**运行:**
+
+运行脚本后,终端会输出执行日志。您可以在 `CACHE_DIR` 目录下找到生成的 Prompt 文件 `finance_question_filter_prompt20260209143556.py`、测试代码 `test_FinanceQuestionFilterPrompt.py` 以及测试数据。生成的 Prompt 内容如下:
+``` python
+__all__ = ['FinanceQuestionFilterPrompt']
+
+from dataflow.core.prompt import DIYPromptABC
+
+class FinanceQuestionFilterPrompt(DIYPromptABC):
+ def __init__(self):
+ pass
+
+ def build_prompt(self, question: str) -> str:
+ prompt = f"""
+ # 角色:
+ 你是一个金融问题的审核助手。
+ # 任务
+ 你的任务是检查给定的金融问题是否符合以下标准:
+ 0. 首先,确认输入仅包含一个明确的金融问题(没有额外的指令如“重写”、“翻译”或提供的答案);如果不符合,输出 judgement_test=false。
+ 1. 检查拼写、语法和格式(例如货币符号、百分比表示),不解释语义。
+ 2. 对于每个最小前提(无法进一步分解),验证其是否违反常识、金融领域事实或任务要求(例如,“负利率”在某些情况下可能无效);如果无效,则失败。
+ 3. 检查前提之间或推理过程中的任何矛盾,或者最终结果是否明显不合理或不可解;如果是,则失败。
+ 4. 如果以上都通过,检查是否有足够的信息来完成任务;缺少必要条件 ⇒ 失败,冗余细节是可以接受的。
+
+ # 输出格式
+ 完成这些步骤后,输出格式必须为:
+ {{
+ "judgement_test": true/false,
+ "error_type": "<错误描述或null>"
+ }}
+ 你可以包括你的思维过程,但最终输出必须是上面的JSON格式。
+
+ 这里是需要评估的内容:
+ -------------------------------
+ {question}
+ -------------------------------
+ """
+ return prompt
+```
+Agent 生成的测试代码`test_FinanceQuestionFilterPrompt.py`如下:
+``` python
+"""
+Auto-generated by prompt_writer
+"""
+from dataflow.pipeline import PipelineABC
+from dataflow.utils.storage import FileStorage
+from dataflow.serving import APILLMServing_request, LocalModelLLMServing_vllm
+
+try:
+ from dataflow.operators.reasoning.filter.reasoning_question_filter import ReasoningQuestionFilter
+except Exception:
+ from dataflow.operators.reasoning import ReasoningQuestionFilter
+from finance_question_filter_prompt20260209143556 import FinanceQuestionFilterPrompt
+
+class RecommendPipeline(PipelineABC):
+ def __init__(self):
+ super().__init__()
+ # -------- FileStorage --------
+ self.storage = FileStorage(
+ first_entry_file_name="./pa_cache/prompt_test_data.jsonl",
+ cache_path="./pa_cache",
+ file_name_prefix="dataflow_cache_step",
+ cache_type="jsonl",
+ )
+ # -------- LLM Serving (Remote) --------
+ self.llm_serving = APILLMServing_request(
+ api_url="http://123.129.219.111:3000/v1/chat/completions",
+ key_name_of_api_key="DF_API_KEY",
+ model_name="gpt-4o",
+ max_workers=100,
+ )
+
+ self.reasoning_question_filter = ReasoningQuestionFilter(system_prompt='You are a helpful assistant.', llm_serving=self.llm_serving, prompt_template=FinanceQuestionFilterPrompt())
+
+ def forward(self):
+ self.reasoning_question_filter.run(
+ storage=self.storage.step(), input_key='math_problem'
+ )
+
+if __name__ == "__main__":
+ pipeline = RecommendPipeline()
+ pipeline.compile()
+ pipeline.forward()
```
\ No newline at end of file
diff --git a/docs/zh/notes/guide/agent/pipeline_rec&refine.md b/docs/zh/notes/guide/agent/pipeline_rec&refine.md
index b0d6dd65d..f70f0d773 100644
--- a/docs/zh/notes/guide/agent/pipeline_rec&refine.md
+++ b/docs/zh/notes/guide/agent/pipeline_rec&refine.md
@@ -20,7 +20,7 @@ permalink: /zh/guide/agent/pipeline_rec&refine/
### 2. 系统架构
-该功能由 `wf_pipeline_recommend_extract_json.py` 编排,形成一个包含多级智能体的有向图。以下是详细的节点职责说明:
+该功能由 `dataflow_agent/workflow/wf_pipeline_recommend_extract_json.py` 编排,形成一个包含多级智能体的有向图。以下是详细的节点职责说明:
#### 2.1 分析与规划阶段
@@ -73,7 +73,7 @@ permalink: /zh/guide/agent/pipeline_rec&refine/
#### 2.2 构建与执行阶段
1. **Builder Node**
- 1. **职责**: 将推荐方案(JSON)转化为实际的 Python 代码文件 (`pipeline.py`),并启动子进程执行该代码。
+ 1. **职责**: 将推荐方案(JSON)转化为实际的 Python 代码文件,并启动子进程执行该代码。
2. **机制**: 支持创建子进程执行代码,捕获标准输出 (stdout) 和标准错误 (stderr)。
3. **输出**: `state.execution_result` (Success/Fail 状态及日志)。
@@ -99,9 +99,15 @@ permalink: /zh/guide/agent/pipeline_rec&refine/
### 3. 使用指南
+本功能提供 **图形界面 (Gradio UI)** 和 **命令行脚本** 两种使用方式。
+
#### 3.1 图形界面
-代码位于 `pipeline_rec.py`。
+代码位于 `gradio_app/pages/pipeline_rec.py`,适合交互式探索和快速验证。启动 Web 界面:
+```bash
+python gradio_app/app.py
+```
+访问 `http://127.0.0.1:7860` 开始使用。
1. **配置输入**:
1. 在"目标描述"框中输入您的需求
@@ -121,28 +127,84 @@ permalink: /zh/guide/agent/pipeline_rec&refine/
#### 3.2 脚本调用
-使用 `run_dfa_pipeline_recommend.py` 脚本。
+对于自动化任务或批量生成,推荐直接修改并运行 `script/run_dfa_pipeline_recommend.py`。
+##### 1. 修改配置
+
+打开 `script/run_dfa_pipeline_recommend.py`,在文件顶部的配置区域进行修改。
+
+**API 配置**
+
+ * **`CHAT_API_URL`**: LLM 服务地址
+ * **`api_key`**: 访问密钥(使用环境变量 DF_API_KEY)
+ * **`MODEL`**: 模型名称,默认 gpt-4o
+
+**任务配置**
+
+ * **`TARGET`**: 用自然语言详细描述您的数据处理需求
+ * 示例:`"请帮我编排一个专门用于大规模预训练数据清洗的流水线,涵盖从去重、改写到质量过滤的全过程"`
+ * **`TEST_JSON_REL_PATH`**: 用于测试 Pipeline 的数据文件的相对路径
+ * 格式:每行一个 JSON 对象
+ * 默认:`{项目根目录}/tests/test.jsonl`
+
+**调试配置**
+
+ * **`NEED_DEBUG`**: 是否启用自动调试和修复
+ * **`True`**: Agent 生成代码后会立即尝试运行。如果报错(如 `ImportError`, `KeyError`),它会启动 Debugger Agent 分析错误堆栈,自动修改代码并重试
+ * **`False`**:生成代码运行后立即结束,不进行自动调试和修复
+ * **`MAX_DEBUG_ROUNDS`**: 最大自动修复次数,默认 5 次
+
+**文件配置**
-**配置参数:**
+ * **`CACHE_DIR`**: 结果输出目录。生成的 pipeline 代码、执行的日志、中间结果等都会保存在这里
+
+##### 2. 运行脚本
+
+```bash
+python script/run_dfa_pipeline_recommend.py
-```Python
-LANGUAGE = "en"
-CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/")
-MODEL = os.getenv("DF_MODEL", "gpt-4o")
-TARGET = "给我简单的过滤或者去重算子就好了,只需要2个算子"
-NEED_DEBUG = True # 开启自动修复
-MAX_DEBUG_ROUNDS = 5 # 最大尝试修复次数
-CACHE_DIR = "dataflow_cache"
-TEST_JSON_REL_PATH = "tests/test.jsonl" # 测试数据路径
```
-**运行命令:**
+##### 3. 结果输出
+
+脚本执行完毕后,控制台会打印执行日志和最终执行状态,并在 `CACHE_DIR` 下生成 `my_pipeline.py`、`final_state.json` 和 `graph.png`。
+
+##### 4. 实战 Case:预训练数据清洗流水线
+
+假设我们有一个包含脏数据的预训练数据 `tests/test.jsonl`,我们希望清洗出一份高质量数据。打开脚本修改如下配置:
+
+**场景配置:**
+
+```python
+# ===== Example config (edit here) =====
+
+# 1. 定义任务流程
+TARGET = """
+- 1. 请帮我编排一个专门用于大规模预训练数据清洗的流水线,涵盖从去重、改写到质量过滤的全过程。
+- 2. 在预训练阶段,原始的网页数据(如Common Crawl)往往充斥着大量的噪声、广告、乱码以及重复内容,数据质量参差不齐。我需要先对原始数据做适当的改写,比如删除大量多余空格、html标签等。接着,需要通过基于规则的启发式过滤,把那些显而易见的垃圾文本、不完整文本和过短的无效数据剔除掉。同时,考虑到网络上内容复杂,我需要筛选指定语言的数据来训练大模型。网络数据的重复率很高,最好能通过模糊去重算法把相似的文档都清理掉,只保留一份。最后,为了保证模型学到的是高质量知识,我希望还能有一个质量分类模型,对清洗后的数据打分,只留下那些高教育价值的内容,从而构建一个高质量的预训练语料库。
+- 3. 我需要一个专门处理海量预训练语料的端到端流水线。首先,你可以对原始文本进行基础的规范化处理,删除多余空格、html标签和表情符号。接着,利用启发式规则进行初步过滤,筛掉显著的低质量文本。这些启发式规则覆盖广泛,需要过滤掉符号/单词比例过高的文段、含敏感词的文段、单词数量异常的文段、以冒号/省略号结尾的不完整文段、语句数量异常的文段、空文本、平均单词长度异常的文段、含html标签的文段、无标点符号的文段、含特殊符号或水印的文段、括号比例过高的文段、大写字母比例过高的文段、含lorem ipsum(随机假文)的文段、独立单词比例过小的文段、字符数量较少的文段、以项目符号开头的文段和含有Javascript数量过多的文段。在此基础上,使用MinHash或类似算法进行文档级的模糊去重,大幅降低数据冗余。随后,利用训练好的质量评估模型对剩余数据进行打分和筛选。最后,还可以加入一个语言识别步骤,确保最终留下的都是目标语言的高质量纯净文本。
+"""
+
+# 2. 指定测试数据路径
+TEST_JSON_REL_PATH = "tests/test.jsonl"
+
+# 3. 开启 Debug
+NEED_DEBUG = True
+MAX_DEBUG_ROUNDS = 5
-```Bash
-python run_dfa_pipeline_recommend.py
```
-**输出:** 脚本运行后会在 `dataflow_cache/session_{id}/` 下生成 `pipeline.py`, `final_state.json` 和 `graph.png`。
+**运行:**
+运行脚本后,工作流会按以下步骤执行:
+
+1. **分析用户的数据和意图**:解析输入数据的特征,理解用户的处理目标。
+2. **拆解用户任务,推荐算子**:将用户意图拆解为多个子任务,并检索匹配相关的算子。
+3. **生成代码**:按需求顺序串联这些算子,生成 pipeline 代码。
+4. **自动测试**:启动子进程试运行;若开启了调试模式且运行出错,Debugger Node 会尝试自动修复。
+5. **最终交付**:在成功执行或者达到最大调试轮数时结束工作流。
+
+用户可以在 `CACHE_DIR` 目录下找到生成的 Pipeline 代码文件和执行日志文件。
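+
+如需快速查看本次运行的全部产物,可以参考下面的示意脚本列出 `CACHE_DIR` 下的相关文件(目录名为假设值,以实际配置为准):
+```python
+# 示意:列出 CACHE_DIR 下本次运行产出的代码、结构文件与日志
+from pathlib import Path
+
+cache_dir = Path("dataflow_cache")
+for path in sorted(cache_dir.rglob("*")):
+    if path.suffix in {".py", ".json", ".jsonl", ".png", ".log"}:
+        print(path)
+```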
+
+
## 第二部分:Pipeline 迭代优化 (Pipeline Refinement)
@@ -152,7 +214,7 @@ Pipeline 迭代优化 (Refinement) 允许用户通过自然语言对已生成的
### 2. 系统架构
-该功能由 `wf_pipeline_refine.py` 编排,采用 **Analyzer -> Planner -> Refiner** 的三段式架构:
+该功能由 `dataflow_agent/workflow/wf_pipeline_refine.py` 编排,采用 **Analyzer -> Planner -> Refiner** 的三段式架构:
#### 2.1 Refine Target Analyzer
@@ -179,9 +241,15 @@ Pipeline 迭代优化 (Refinement) 允许用户通过自然语言对已生成的
### 3. 使用指南
+本功能提供 **图形界面 (Gradio UI)** 和 **命令行脚本** 两种使用方式。
+
#### 3.1 图形界面
-集成在 `pipeline_rec.py` 页面底部。
+集成在 `gradio_app/pages/pipeline_rec.py`,适合交互式探索和快速验证。启动 Web 界面:
+```bash
+python gradio_app/app.py
+```
+访问 `http://127.0.0.1:7860` 开始使用。
1. **前提**:必须先在页面上方点击 "Generate Pipeline" 生成初始 pipeline 代码,此时 `pipeline_json_state` 会被初始化。
2. **输入优化指令**:在 "优化需求" 文本框中输入指令。
@@ -191,24 +259,49 @@ Pipeline 迭代优化 (Refinement) 允许用户通过自然语言对已生成的
#### 3.2 脚本调用
-使用 `run_dfa_pipeline_refine.py` 脚本。
+使用 `script/run_dfa_pipeline_refine.py` 对已有的 Pipeline 结构进行微调。
+
+##### 1. 修改配置
+
+**API 配置**
+
+ * **`CHAT_API_URL`**: LLM 服务地址
+ * **`api_key`**: 访问密钥(使用环境变量 DF_API_KEY)
+ * **`MODEL`**: 模型名称,默认 gpt-4o
+
+**任务配置**
-**配置参数:**
+ * **`INPUT_JSON`**: 待优化的 Pipeline 结构文件路径
+ * **`OUTPUT_JSON`**: 优化后的 Pipeline JSON 结构文件保存路径
+ * **`TARGET`**: 用自然语言描述您希望如何修改 Pipeline
+ * 示例:`"请将Pipeline调整为只包含3个节点,简化数据流"`
-```Python
-# 输入文件:上一步生成的 pipeline 结构文件 (.json)
-INPUT_JSON = "dataflow_cache/session_xxx/final_state.json"
-OUTPUT_JSON = "cache_local/pipeline_refine_result.json" # 输出文件,如果是空字符串仅打印结果
-# 修改目标
-TARGET = "请将 Pipeline 调整为只包含3个节点,简化数据流"
+##### 2. 运行脚本
+
+```bash
+python script/run_dfa_pipeline_refine.py
-LANGUAGE = "en"
-CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/")
-MODEL = os.getenv("DF_MODEL", "gpt-4o")
```
-**运行命令:**
+##### 3. 实战 Case:简化流水线
+
+假设上一步生成的流水线太复杂,包含了多余的“清洗”算子,我们希望将其移除来简化 Pipeline。
+
+**场景配置:**
+
+```python
+# ===== Example config (edit here) =====
+
+# 1. 指定上一步生成的 Pipeline 结构文件
+INPUT_JSON = "dataflow_agent/tmps/pipeline.json"
+
+# 2. 下达修改指令
+TARGET = "请移除中间多余的清洗算子,简化数据流。"
+
+# 3. 指定结果保存位置
+OUTPUT_JSON = "cache_local/pipeline_refine_result.json"
+```
-```Bash
-python run_dfa_pipeline_refine.py
-```
\ No newline at end of file
+**运行:**
+Agent 会分析当前 Pipeline 的 JSON 拓扑结构,找到对应的清洗节点,将其移除。
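+
+运行完成后,可以用下面的示意脚本对优化前后的结构文件做一个通用的粗略对比(不假设具体字段名,以文件实际内容为准):
+```python
+# 示意:粗略对比优化前后的 Pipeline 结构文件(路径与上文配置保持一致)
+import json
+
+def summarize(path):
+    with open(path, encoding="utf-8") as f:
+        data = json.load(f)
+    if isinstance(data, dict):
+        print(path, "顶层字段:", list(data.keys()))
+        for key, value in data.items():
+            if isinstance(value, list):
+                print(f"  {key}: {len(value)} 项")
+    else:
+        print(path, "顶层为列表,长度:", len(data))
+
+summarize("dataflow_agent/tmps/pipeline.json")        # 优化前
+summarize("cache_local/pipeline_refine_result.json")  # 优化后
+```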
\ No newline at end of file