diff --git a/docs/.vuepress/notes/en/guide.ts b/docs/.vuepress/notes/en/guide.ts index 8dc26d906..28fd604cc 100644 --- a/docs/.vuepress/notes/en/guide.ts +++ b/docs/.vuepress/notes/en/guide.ts @@ -116,7 +116,12 @@ export const Guide: ThemeNote = defineNoteConfig({ prefix: 'agent', items: [ "agent_for_data", - "DataFlow-AgentPipelineOrchestration" + "DataFlow-AgentPipelineOrchestration", + "operator_assemble_line", + "operator_qa", + "operator_write", + "pipeline_prompt", + "pipeline_rec&refine" ] }, ], diff --git a/docs/.vuepress/notes/zh/guide.ts b/docs/.vuepress/notes/zh/guide.ts index 942a615d0..ec8307e9a 100644 --- a/docs/.vuepress/notes/zh/guide.ts +++ b/docs/.vuepress/notes/zh/guide.ts @@ -115,7 +115,12 @@ export const Guide: ThemeNote = defineNoteConfig({ prefix: 'agent', items: [ "agent_for_data", - "DataFlow-AgentPipelineOrchestration" + "DataFlow-AgentPipelineOrchestration", + "operator_assemble_line", + "operator_qa", + "operator_write", + "pipeline_prompt", + "pipeline_rec&refine" ] }, // { diff --git a/docs/en/notes/guide/agent/operator_assemble_line.md b/docs/en/notes/guide/agent/operator_assemble_line.md new file mode 100644 index 000000000..efa5d9cd3 --- /dev/null +++ b/docs/en/notes/guide/agent/operator_assemble_line.md @@ -0,0 +1,108 @@ +--- +title: Visualized Operator Assemble Line +createTime: 2026/02/05 22:11:00 +permalink: /en/guide/agent/operator_assemble_line/ +--- + +## 1. Overview + +**Visualized Operator Assemble Line** is a "low-code/no-code" development tool provided by the DataFlow-Agent platform. It allows users to bypass complex Python coding or AI planning processes by directly browsing available operators in the system via a Graphical User Interface (GUI), manually configuring parameters, and assembling them into ordered data processing pipelines. + +The core value of this feature lies in: + +* **What You See Is What You Get**: Real-time viewing of operator parameter definitions and pipeline structures. +* **Automatic Linking**: The system automatically attempts to match the output of a previous operator with the input of the next, simplifying data flow configuration. +* **Code Generation and Execution**: Assembled logic is automatically converted into standard Python code and executed in the background. + +## 2. Features + +This functional module primarily consists of frontend interaction logic (`op_assemble_line.py`) and a backend execution workflow (`wf_df_op_usage.py`). + +### 2.1 Dynamic Operator Loading and Introspection + +The system automatically scans the `OPERATOR_REGISTRY` upon startup, loading all registered operators and categorizing them based on their module paths. + +* **Automatic Parameter Parsing**: Using Python's `inspect` module, the system automatically extracts method signatures from the `init` and `run` methods of operator classes to generate corresponding configuration boxes in the UI. +* **Prompt Template Support**: For operators that support Prompts, the UI automatically reads `ALLOWED_PROMPTS` and provides a dropdown selection box. + +### 2.2 Intelligent Parameter Linking + +During the UI orchestration process, the system features "automatic wiring" capabilities. It analyzes the input-output relationships between adjacent operators, automatically matches keys with similar names, and displays the data flow through visualized connections. + +## 3. User Guide + +This feature provides two modes of use: the **Graphical Interface (Gradio UI)** and **Command-line Scripts**. 
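+
+Before walking through either mode, the signature introspection described in section 2.1 can be pictured with a minimal sketch. The operator class and its parameters below are hypothetical, and the real loader iterates over `OPERATOR_REGISTRY` rather than a hand-written class; the sketch only shows how `inspect` can turn `init`/`run` signatures into default values for the UI's configuration boxes:
+
+```python
+import inspect
+
+class ExampleFilter:
+    """Hypothetical operator used only to illustrate signature introspection."""
+    def __init__(self, min_length: int = 10, keep_empty: bool = False):
+        self.min_length = min_length
+        self.keep_empty = keep_empty
+
+    def run(self, storage=None, input_key: str = "raw_content", output_key: str = "filtered"):
+        pass
+
+# Build a {parameter: default} mapping for each method, the way a UI
+# could pre-fill its JSON edit boxes.
+for method in ("__init__", "run"):
+    sig = inspect.signature(getattr(ExampleFilter, method))
+    defaults = {
+        name: (p.default if p.default is not inspect.Parameter.empty else None)
+        for name, p in sig.parameters.items()
+        if name not in ("self", "storage")
+    }
+    print(method, defaults)
+# __init__ {'min_length': 10, 'keep_empty': False}
+# run {'input_key': 'raw_content', 'output_key': 'filtered'}
+```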
+ +### 3.1 UI Operation + +Ideal for interactive exploration and rapid verification. + +1. **Environment Configuration**: Enter API-related information and the input JSONL file path at the top of the page. +2. **Orchestrate Pipeline**: + 1. **Select Operator**: Choose an operator category and a specific operator from the left dropdown menu. + 2. **Configure Parameters**: Enter parameters into the JSON edit box. + 3. **Add Operator**: Click the "Add Operator to Pipeline" button and drag items in the list below to adjust the execution order. +3. **Run and Results**: Click "Run Pipeline" to view the generated code and a preview of the processed results in the execution result section. + +### 3.2 Script Invocation and Explicit Configuration + +For automated tasks or batch processing, the `run_dfa_op_assemble.py` script can be used. This method bypasses the UI and defines the operator sequence directly through code. + +> **Note: Explicit Configuration Requirement**: Unlike the "Automatic Linking" in the UI, the script mode requires you to **explicitly configure** all parameters. You must ensure that the `output_key` of the previous operator strictly matches the `input_key` of the next; the script will not automatically correct parameter names for you. + +#### 1. Modify Configuration + +Open `run_dfa_op_assemble.py` and modify the configuration area at the top of the file. + +**Key Configuration Item**: **`PIPELINE_STEPS`**—a list defining the pipeline execution steps. Each element contains an `op_name` and `params`. + +```python +# [Pipeline Definition] +PIPELINE_STEPS = [ + { + "op_name": "ReasoningAnswerGenerator", + "params": { + # __init__ parameters (Note: unified into 'params' in wf_df_op_usage) + "prompt_template": "dataflow.prompts.reasoning.math.MathAnswerGeneratorPrompt", + # run parameters + "input_key": "raw_content", + "output_key": "generated_cot" + } + }, + { + "op_name": "ReasoningPseudoAnswerGenerator", + "params": { + "max_times": 3, + "input_key": "generated_cot", + "output_key_answer": "pseudo_answers", + "output_key_answer_value": "pseudo_answer_value", + "output_key_solutions": "pseudo_solutions", + "output_key_correct_solution_example": "pseudo_correct_solution_example" + } + } +] + +``` + +**Other Required Configurations**: + +* `CACHE_DIR`: **Must use an absolute path** to avoid path errors when the generated Python script executes in a subprocess. +* `INPUT_FILE`: The absolute path to the initial data file. + +#### 2. Run Script + +```bash +python run_dfa_op_assemble.py + +``` + +#### 3. Output Results + +After execution, the console will print: + +* **[Generation]**: The path of the generated Python script (e.g., `pipeline_script_pipeline_001.py`). +* **[Code Preview]**: A preview of the first 20 lines of the generated code. +* **[Execution]**: + * `Status: success` indicates successful execution. + * `STDOUT`: Prints the standard output logs from the pipeline runtime. + diff --git a/docs/en/notes/guide/agent/operator_qa.md b/docs/en/notes/guide/agent/operator_qa.md new file mode 100644 index 000000000..f0c126e83 --- /dev/null +++ b/docs/en/notes/guide/agent/operator_qa.md @@ -0,0 +1,112 @@ +--- +title: Operator QA +createTime: 2026/02/05 22:11:00 +permalink: /en/guide/agent/operator_qa/ +--- + +## 1. Overview + +**Operator QA** is a built-in vertical domain expert assistant within the DataFlow-Agent platform. 
Its core mission is to help users quickly navigate the extensive DataFlow operator library to find required tools, understand their usage, and inspect underlying source code. + +Unlike generic chatbots, Operator QA integrates **RAG (Retrieval-Augmented Generation)** technology. It is equipped with a complete operator index (FAISS) and a metadata knowledge base of the DataFlow project. When a user asks a question, the Agent autonomously decides whether to retrieve information from the knowledge base, which operators to inspect, and provides accurate technical details—including code snippets and parameter descriptions—back to the user. + +## 2. Core Features + +This module is driven by a frontend UI (`operator_qa.py`), an entry script (`run_dfa_operator_qa.py`), and a backend agent (`operator_qa_agent.py`). It possesses the following core capabilities: + +### 2.1 Intelligent Retrieval and Recommendation + +The Agent does more than simple keyword matching; it identifies user needs based on semantic understanding. + +* **Semantic Search**: If a user describes a need like "I want to filter out missing values," the Agent uses vector retrieval to find relevant operators such as `ContentNullFilter`. +* **On-Demand Invocation**: Based on the `BaseAgent` graph mode (`use_agent=True`), the Agent automatically determines whether to call the `search_operators` tool or respond directly based on the conversation context. + +### 2.2 Multi-turn Conversation + +Utilizing the `AdvancedMessageHistory` module, the system maintains a complete session context. + +* **Contextual Memory**: A user can ask, "Which operators can load data?" followed by "How do I fill in **its** parameters?" The Agent can recognize that "its" refers to the operator recommended in the previous turn. +* **State Persistence**: In both script interaction and UI modes, by reusing the same `state` and `graph` instances, the `messages` list accumulates across multiple turns, ensuring the LLM maintains a full memory. + +### 2.3 Visualization and Interaction + +* **Gradio UI**: Provides code previews, operator highlighting, and quick-question buttons. +* **Interaction**: Supports multi-turn Q&A, clearing history, and viewing history. + +## 3. Architectural Components + +### 3.1 OperatorQAAgent + +* Inherits from `BaseAgent` and is configured in ReAct/Graph mode. +* Possesses Post-Tools permissions to call RAG services for data retrieval. +* Responsible for parsing natural language, planning database queries, and generating final natural language responses. + +### 3.2 OperatorRAGService + +* A service layer decoupled from the Agent. +* Manages the FAISS vector index and `ops.json` metadata. +* Provides underlying capabilities such as `search` (vector search), `get_operator_info` (fetch details), and `get_operator_source` (fetch source code). + +## 4. User Guide + +### 4.1 UI Operation + +1. **Configure Model**: In the "Configuration" panel on the right, verify the API URL and Key, and select a model (defaults to `gpt-4o`). +2. **Initiate Inquiry**: + 1. **Dialogue Box**: Type your question. + 2. **Quick Buttons**: Click "Quick Question" buttons, such as "Which operator filters missing values?" to start instantly. +3. **View Results**: + 1. **Chat Area**: Displays the Agent's response and citations. + 2. **Right Panel**: + * `Related Operators`: Lists operator names retrieved by the Agent. + * `Code Snippets`: Displays Python source code if specific implementations are involved. 
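+
+The panels above are populated by a retrieve-then-answer loop. The sketch below is a schematic reading of the `OperatorRAGService` capabilities listed in section 3.2, not the service's exact API: the method names follow that list, but the signatures, return shapes, and the `llm.complete` call are illustrative assumptions.
+
+```python
+# Schematic sketch of the RAG loop behind the UI panels (assumed signatures).
+def answer_operator_question(rag_service, llm, question: str, top_k: int = 5) -> str:
+    hits = rag_service.search(question, top_k=top_k)        # vector search over the FAISS index
+    context_lines = []
+    for hit in hits:
+        info = rag_service.get_operator_info(hit["name"])   # metadata loaded from ops.json
+        context_lines.append(f"- {hit['name']}: {info.get('description', '')}")
+    prompt = (
+        "You are a DataFlow operator expert. Answer using the retrieved operators.\n"
+        f"Question: {question}\n"
+        "Retrieved operators:\n" + "\n".join(context_lines)
+    )
+    return llm.complete(prompt)                              # final natural-language reply (assumed interface)
+```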
+ +### 4.2 Script Invocation and Explicit Configuration + +Beyond the UI, the system provides the `run_dfa_operator_qa.py` script, which supports running the Q&A service through explicit code configuration—ideal for development and debugging. + +**Configuration Method:** Directly modify the constant configuration area at the top of the script without passing command-line arguments: + +```python +# ===== Example config (edit here) ===== +INTERACTIVE = False # True for multi-turn mode, False for single query +QUERY = "Which operator should I use to filter missing values?" # Question for single query + +LANGUAGE = "en" +SESSION_ID = "demo_operator_qa" +CACHE_DIR = "dataflow_cache" +TOP_K = 5 # Number of retrieval results + +CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/") +API_KEY = os.getenv("DF_API_KEY", "") +MODEL = os.getenv("DF_MODEL", "gpt-4o") + +OUTPUT_JSON = "" # e.g., "cache_local/operator_qa_result.json"; empty string means no file saving + +``` + +**Execution Modes:** + +1. **Single Query Mode** (`INTERACTIVE = False`): Executes a single `QUERY`; results can be printed or saved as a JSON file. +2. **Interactive Mode** (`INTERACTIVE = True`): Starts a terminal dialogue loop supporting `exit` to quit, `clear` to reset context, and `history` to view session history. + +**Core Logic:** The script demonstrates how to explicitly construct `DFRequest` and `MainState`, and manually build the execution graph: + +```python +# 1. Explicitly construct the request +req = DFRequest( + language=LANGUAGE, + chat_api_url=CHAT_API_URL, + api_key=API_KEY, + model=MODEL, + target="" # Populated before each query +) + +# 2. Initialize state and graph +state = MainState(request=req, messages=[]) +graph = create_operator_qa_graph().build() + +# 3. Execute +result = await run_single_query(state, graph, QUERY) + +``` diff --git a/docs/en/notes/guide/agent/operator_write.md b/docs/en/notes/guide/agent/operator_write.md new file mode 100644 index 000000000..d1ce90e0e --- /dev/null +++ b/docs/en/notes/guide/agent/operator_write.md @@ -0,0 +1,120 @@ +--- +title: Operator Write +createTime: 2026/02/05 22:11:00 +permalink: /en/guide/agent/operator_write/ +--- + +## 1. Overview + +**Operator Write** is the core productivity module of the DataFlow-Agent. It is not merely a tool for generating Python code based on user requirements but rather builds a closed-loop system for **generation, execution, and debugging**. + +This workflow enables: + +1. **Semantic Matching**: Understanding user intent (e.g., "filter missing values") and finding the best-matching base class within the existing operator library. +2. **Code Generation**: Writing executable operator code based on the base class and user data samples. +3. **Automatic Injection**: Automatically injecting LLM service capabilities into the operator if needed. +4. **Subprocess Execution**: Instantiating and running the generated operator in a controlled environment. +5. **Self-Healing**: Launching a Debugger to analyze stack traces if execution fails, automatically modifying the code, and retrying until success or the maximum retry limit is reached. + +## 2. Core Features + +### 2.1 Intelligent Code Generation + +* **Sample-Based Programming**: The Agent reads actual data samples (calling the pre-tool `local_tool_for_sample`) and the data Schema to ensure the generated code correctly handles real field names and data types. 
+* **Operator Reuse**: The system prioritizes retrieving existing operator libraries (calling the pre-tool `match_operator`) to generate code inherited from existing base classes rather than starting from scratch, ensuring code standardization and maintainability. + +### 2.2 Automatic Debugging Loop + +This is a system equipped with self-reflection capabilities. + +* **Execution Monitoring**: At the `llm_instantiate` node, the system attempts to execute the generated code (`exec(code_str)`) and captures standard output and standard errors. +* **Error Diagnosis**: If an exception occurs, the `code_debugger` Agent analyzes the error stack (`error_trace`) and the current code to generate repair suggestions (`debug_reason`). +* **Auto-Rewrite**: The `rewriter` Agent regenerates the code based on the repair suggestions, automatically updates the file, and enters the next round of testing. + +### 2.3 LLM Service Injection + +For complex operators requiring Large Model calls (e.g., "generate summary based on content"), the `llm_append_serving` node automatically injects standard LLM call interfaces (`self.llm_serving`) into the operator code, empowering it with AI capabilities. + +## 3. Workflow Architecture + +This feature is orchestrated by `wf_pipeline_write.py`, forming a directed graph containing conditional loops. + +1. **Match Node**: Retrieves reference operators. +2. **Write Node**: Writes the initial code. +3. **Append Serving Node**: Injects LLM capabilities. +4. **Instantiate Node**: Attempts to run the code. +5. **Debugger Node** (Conditional Trigger): Analyzes errors. +6. **Rewriter Node**: Fixes the code. + +## 4. User Guide + +This feature provides two modes of usage: **Graphical Interface (Gradio UI)** and **Command Line Script**. + +### 4.1 UI Operation + +The frontend page code is located in `operator_write.py`, offering a visualized interactive experience. + +#### 1. Configure Inputs + +Configure the following in the left panel of the page: + +* **Target Description**: Describe in detail the function and purpose of the operator you want to create. + * Example: "Create an operator that performs sentiment analysis on text." +* **Operator Category**: The category the operator belongs to, used for matching similar operators as references. Defaults to `"Default"`. Options include `"filter"`, `"mapper"`, `"aggregator"`, etc.. +* **Test Data File**: Specify the `.jsonl` file path used for testing the generated operator. Defaults to the project's built-in `tests/test.jsonl`. +* **Debug Settings**: + * `Enable Debug Mode`: If checked, the system automatically attempts to fix the code if an error occurs. + * `Max Debug Rounds`: Set the maximum number of automatic repair attempts (default is 3). +* **Output Path**: Specify the save path for the generated code (optional). + +#### 2. View Results + +After clicking the **"Generate Operator"** button, the right panel displays detailed results: + +* **Generated Code**: Final usable Python code, supporting syntax highlighting. +* **Matched Operators**: Displays the list of reference operators found by the system in the library (e.g., `"LangkitSampleEvaluator"`, `"LexicalDiversitySampleEvaluator"`, `"PresidioSampleEvaluator"`, `"PerspectiveSampleEvaluator"`, etc.). +* **Execution Result**: Shows `success: true/false` and specific log information `stdout`/`stderr`. +* **Debug Info**: If debugging was triggered, this displays the runtime captured `stdout`/`stderr` and the selected input field key (`input_key`). 
+* **Agent Results**: Detailed execution results for each Agent node. +* **Execution Log**: Complete execution log information, facilitating the troubleshooting of the Agent's thought process. + +### 4.2 Script Invocation and Explicit Configuration + +For developers or automated tasks, `run_dfa_operator_write.py` can be executed directly. + +#### 1. Modify Configuration + +Open `run_dfa_operator_write.py` and modify the parameters in the configuration area at the top of the file: + +```python +CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/") +MODEL = os.getenv("DF_MODEL", "gpt-4o") +LANGUAGE = "en" + +TARGET = "Create an operator that filters out missing values and keeps rows with non-empty fields." +CATEGORY = "Default" # Fallback category (if classifier misses) +OUTPUT_PATH = "" # e.g., "cache_local/my_operator.py"; empty string means no file saving +JSON_FILE = "" # Empty string uses project built-in tests/test.jsonl + +NEED_DEBUG = False +MAX_DEBUG_ROUNDS = 3 + +``` + +#### 2. Run Script + +```bash +python run_dfa_operator_write.py + +``` + +#### 3. Output Results + +The script will print key information to the console: + +* `Matched ops`: The matched reference operators. +* `Code preview`: A preview fragment of the generated code. +* `Execution Result`: + * `Success: True` indicates code generation and execution passed. + * `Success: False` will print `stderr preview` for troubleshooting. +* `Debug Runtime Preview`: Displays the automatically selected `input_key` and runtime logs. diff --git a/docs/en/notes/guide/agent/pipeline_prompt.md b/docs/en/notes/guide/agent/pipeline_prompt.md new file mode 100644 index 000000000..0515a270b --- /dev/null +++ b/docs/en/notes/guide/agent/pipeline_prompt.md @@ -0,0 +1,114 @@ +--- +title: Operator Reuse / Prompt Optimization +createTime: 2026/02/05 22:11:00 +permalink: /en/guide/agent/pipeline_prompt/ +--- + +## 1. Overview + +**Prompt Optimization** is the core module of DataFlow-Agent designed for **Prompt-Engineering**. Its design goal is to solve the problem of "generic operator logic reuse". + +This module adopts a **single-node architecture**. When a user proposes a new data processing requirement, the Agent not only writes a Prompt that complies with operator specifications but also **automatically generates synthetic test data** and internally builds and runs test scripts. + + +## 2. Core Features + +### 2.1 Example-Based Generation + +* **Operator Code Analysis**: The Agent automatically reads the source code of the target operator (`OP_NAME`) and extracts its parameter definitions. +* **Prompt Migration**: The system retrieves existing Prompt cases from the operator library and uses them as context to guide the LLM in generating a new Prompt class that conforms to the operator's interface specifications (e.g., `init` parameter structure). + +### 2.2 Self-Verification Based on Synthetic Data + +The generated Prompt does not merely remain as text; the Agent immediately tests it. + +* **Data Synthesis**: The Agent **does not require** large-scale business data from the user for testing. Instead, it utilizes the LLM to analyze the operator logic and automatically generates a set of **synthetic test data** covering various edge cases, saving it as a temporary JSONL file. +* **Subprocess Execution**: The Agent internally and automatically builds a temporary Python test script and launches a subprocess to execute it, verifying whether the generated Prompt runs correctly and produces the expected results. 
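+
+At its core, the verification step is a plain `subprocess` call over the generated test script; a minimal sketch is shown below. The script path, timeout, and output handling are illustrative assumptions, not the agent's actual internals:
+
+```python
+import subprocess
+import sys
+
+# Run the generated test script in a subprocess and capture its output
+# (illustrative path and timeout).
+result = subprocess.run(
+    [sys.executable, "pa_cache/tmp_prompt_test.py"],
+    capture_output=True,
+    text=True,
+    timeout=600,
+)
+print("returncode:", result.returncode)
+print("stdout:", result.stdout[-500:])   # tail of standard output
+print("stderr:", result.stderr[-500:])   # tail of standard error
+```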
+ +### 2.3 Iterative Optimization + +* **Interactive Feedback**: The system does not perform blind automatic retries. Users input modification suggestions after viewing the test results, the generated Prompt, and data previews on the frontend. +* **Targeted Hot Update**: Upon receiving feedback, the backend `PromptWriter` calls the `revise_with_feedback` method to make targeted modifications to the Prompt code while maintaining the existing context, automatically triggering a new round of the testing loop. + +## 3. System Architecture + +This function is defined by `wf_pipeline_prompt.py`, featuring a core **single-node** workflow. All generation and verification logic are highly cohesive within the `PromptWriter` Agent. + +### 3.1 Core Node Process + +The **Prompt Writer Node** is the only node in the graph, executing the following internal logic in sequence: + +1. **Context Retrieval**: Calls Pre-tools to obtain the target operator's source code, the user's target, and Prompt examples. +2. **Prompt Generation**: Calls the LLM to generate the Prompt class code in Python form. It then saves the generated code to the `state` object via the `update_state_result` method and writes it to a local file to provide dependencies for subsequent test steps. +3. **Test Data Synthesis**: Calls the internal method `_build_test_data_by_llm` to generate synthetic test data based on the task description. +4. **Test Script Construction**: Calls the internal method `_build_test_code` to generate a temporary test script using string templates. +5. **Subprocess Execution**: Uses `subprocess` to run the test script and captures standard output (stdout) and standard error (stderr). +6. **Test Result Output**: Scans and reads the test result file generated by the subprocess execution, updates the test results into `state.temp_data`, and completes the process. + +### 3.2 Iterative Optimization Mechanism + +The optimization process depends on **frontend interaction**: + +1. The user views the execution results in the UI. +2. The user submits feedback. +3. The frontend calls `_on_chat_submit`, triggering the Agent's `revise_with_feedback` interface. +4. The Agent modifies the code based on the feedback and re-executes the validation phase described above (Test Data Synthesis -> Test Script Construction -> Subprocess Execution). + +## 4. User Guide + +### 4.1 Graphical Interface + +The frontend code is located in `PA_frontend.py`, providing a complete interactive development environment. + +**Initial Generation:** + +1. Configure API information (URL, Key, Model). +2. Fill in the task description and operator name. +3. (Optional) Specify the output format, argument list, and file output root path. +4. Click the "Generate Prompt Template" button. +5. View the test data, test results, Prompt code, and test code generated by the Agent. + +**Multi-turn Optimization:** + +1. If the results do not meet expectations, enter improvement suggestions in the dialogue box on the right. +2. Click "Send Rewrite Instruction". +3. View the updated code and test results. +4. Repeat steps 1-3 until a satisfactory result is obtained. + +**Using the Generated Prompt:** + +1. Get the generated Prompt file location from "Prompt File Path". +2. Import the Prompt class into your operator. +3. Specify `prompt_template` in the operator's `init()`. + +### 4.2 Script Invocation + +Use the `run_dfa_pipeline_prompt.py` script for automated generation. 
+ +**Configuration Parameters:** + +```python +CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/") +MODEL = os.getenv("DF_MODEL", "gpt-4o") +LANGUAGE = "en" + +TASK_DESCRIPTION = "Write a prompt for an operator that filters missing values" +OP_NAME = "PromptedFilter" + +# These two items are only required if the operator does not possess any preset prompts; +# otherwise, it will generate based on existing prompts. +OUTPUT_FORMAT = "" # e.g., "Return JSON with keys: ..." +ARGUMENTS = [] # e.g., ["min_len=10", "drop_na=true"] + +# Cache directory used to store test data and prompts +CACHE_DIR = "./pa_cache" +DELETE_TEST_FILES = True + +``` + +**Run Command:** + +```bash +python run_dfa_pipeline_prompt.py + +``` \ No newline at end of file diff --git a/docs/en/notes/guide/agent/pipeline_rec&refine.md b/docs/en/notes/guide/agent/pipeline_rec&refine.md new file mode 100644 index 000000000..8769b0d2d --- /dev/null +++ b/docs/en/notes/guide/agent/pipeline_rec&refine.md @@ -0,0 +1,236 @@ +--- +title: Pipeline Recommendation & Refinement +createTime: 2026/02/05 22:11:00 +permalink: /en/guide/agent/pipeline_rec&refine/ +--- + +This module contains two closely collaborative core subsystems: + +1. **Pipeline Recommendation**: Responsible for the "0 to 1" process, transforming natural language requirements into complete executable Pipelines. +2. **Pipeline Refinement**: Responsible for the "1 to N" process, fine-tuning existing Pipeline structures based on user feedback. + + +## Part 1: Pipeline Recommendation + +### 1. Overview + +**Pipeline Recommendation** is the core orchestration engine of the DataFlow-Agent. It understands complex business requirements, automatically decomposes task steps, retrieves optimal components from the operator library, plans data flow, and generates executable Python code. + +The system possesses self-healing capabilities: when generated code fails to execute, the Agent proactively consults operator source code documentation, analyzes the cause of the error, and corrects the code until execution succeeds. + +### 2. System Architecture + +This function is orchestrated by `wf_pipeline_recommend_extract_json.py`, forming a directed graph containing multiple levels of intelligent agents. The detailed responsibilities of the nodes are as follows: + +#### 2.1 Analysis and Planning Phase + +1. **Classifier Node** + 1. **Responsibility**: Reads a small number of data samples to identify data types and business domains. This determines the tendency of subsequent operator recommendations. + 2. **Input**: `state.request.json_file` (data file path). + 3. **Output**: `state.category`. +2. **Target Parser Node** + 1. **Core Task (What it does)**: Acts as a business analyst. It does not directly generate code but translates vague user requirements into logically rigorous steps. + 2. **Input**: The user's natural language requirement (e.g., "Filter out texts shorter than 10 in the pdf, then deduplicate, and finally extract keywords"). + 3. **LLM Thinking**: Decomposes the requirement into a standard list of steps conforming to data processing logic (e.g., `["Read and parse pdf into plain text", "Filter out text data shorter than 10 characters", "Deduplicate text data to remove repetitive content", "Extract keywords from text data"]`). + 4. **Subsequent Action**: Uses the descriptions of the decomposed steps to retrieve the most similar physical operators from the operator vector database, forming a **candidate operator pool** for use in the next stage. +3. 
**Recommender Node** + 1. **Core Task**: Responsible for turning scattered candidate operators into an organized execution plan. + 2. **Input**: + * `target`: The user's original requirement. + * `sample`: Data samples (to understand data characteristics, such as field names and formats). + * `split_ops`: The list of candidate operators and their functional descriptions retrieved via RAG by the `target_parser` in the previous step. + 3. **LLM Thinking**: + * **Logical Sequencing**: Each stage is not limited to a single operator but follows the "requirement". + * **Data Compatibility**: If an operator requires field "X" but it does not exist in the sample data, it must ensure an operator creating that field precedes it. + * **Gap Filling**: Can existing operators meet the requirement? If not, a versatile `PromptedGenerator` needs to be inserted. + 4. **Output**: An ordered list of operator names and recommendation reasons, such as: + ```json + { + "ops": [ + "Text2SQLQuestionGenerator", + "SQLExecutionFilter", + "SQLConsistencyFilter", + "SQLVariationGenerator", + "Text2SQLQuestionGenerator", + "Text2SQLPromptGenerator", + "Text2SQLCoTGenerator", + "ReasoningQuestionSolvableSampleEvaluator", + "SQLComponentClassifier", + "PromptedGenerator" + ], + "reason": "This pipeline design aims to meet all user requirements. + 1. First, use Text2SQLQuestionGenerator to parse the SQL data file and extract SQL statements and corresponding natural language questions. + 2. Next, use SQLExecutionFilter to execute SQL statements in the database to verify their validity. + 3. Then, use SQLConsistencyFilter for consistency filtering to ensure SQL statements match their corresponding natural language questions. + 4. Next, use SQLVariationGenerator to augment valid SQL statements, including value replacement, increasing syntax difficulty, and changing writing styles. + 5. Subsequently, use Text2SQLQuestionGenerator to generate corresponding natural language questions based on the augmented SQL statements. + 6. Next, use Text2SQLPromptGenerator to generate prompt content, and generate the Chain of Thought reasoning process via Text2SQLCoTGenerator. + 7. Then, use ReasoningQuestionSolvableSampleEvaluator to classify the generated data, assessing the difficulty for large models to solve the problem, and use SQLComponentClassifier to assess the difficulty of SQL components. + 8. Finally, use PromptedGenerator to output synthetic SQL data and its corresponding natural language questions and reasoning processes to ensure all requirements are met." + } + + ``` + + +#### 2.2 Construction and Execution Phase + +1. **Builder Node** + 1. **Responsibility**: Converts the recommendation plan (JSON) into an actual Python code file (`pipeline.py`) and launches a subprocess to execute that code. + 2. **Mechanism**: Supports creating subprocesses to execute code, capturing standard output (stdout) and standard error (stderr). + 3. **Output**: `state.execution_result` (Success/Fail status and logs). + + + +#### 2.3 Automatic Repair Loop + +When the `builder` execution fails and `need_debug=True`, it enters this loop: + +1. **Debugger Node** + * **Responsibility**: Analyzes the error stack (`error_trace`) and current code to determine the error type (parameter error, logic error, etc.). + + +2. **Info Requester Node** + * **Responsibility**: This is an active learning node. If the Debugger deems information insufficient, it calls tools to read the **source code** or **documentation** of relevant operators to obtain context. 
+ + +3. **Rewriter Node** + 1. **Responsibility**: Synthesizes error logs and source code knowledge found by the InfoRequester to generate the complete repaired code. + 2. **Flow**: The repaired code is sent back to the `builder` for testing until success or the maximum number of retries (`max_debug_rounds`) is reached. + + + +#### 2.4 Output Phase + +* **Exporter Node** + * **Responsibility**: After successful execution, organizes the final Pipeline information, code paths, and data samples, formatting the output for the user. + + + +### 3. User Guide + +#### 3.1 Graphical Interface + +Code located in `pipeline_rec.py`. + +1. **Configure Inputs**: + 1. Enter your requirements in the "Target Description" box. + 2. Input the jsonl file to be processed. + 3. Configure API information (URL, Key, Model). + 4. (Optional) Configure embedding model and debug options. + 5. Select whether to automatically update the vector index (needs to be checked if operators are not in the registry). + 6. Select whether to use debug mode (debug mode will automatically run the generated Pipeline code until the maximum iteration rounds). + + +2. **Generate Pipeline**: + + Click **"Generate Pipeline"**. + + +3. **View Results**: + 1. **Pipeline Code**: View the final generated pipeline code. + 2. **Execution Log**: View execution log information. + 3. **Agent Results**: Detailed execution results of each Agent node, including the recommended operator list, construction process, etc.. + 4. **Pipeline JSON**: Generated Pipeline topology JSON, containing operator node lists and inter-node connection relationships. + + + +#### 3.2 Script Invocation + +Use the `run_dfa_pipeline_recommend.py` script. + +**Configuration Parameters:** + +```python +LANGUAGE = "en" +CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/") +MODEL = os.getenv("DF_MODEL", "gpt-4o") +TARGET = "Just give me simple filtering or deduplication operators, only 2 operators needed" +NEED_DEBUG = True # Enable auto-repair +MAX_DEBUG_ROUNDS = 5 # Maximum repair attempts +CACHE_DIR = "dataflow_cache" +TEST_JSON_REL_PATH = "tests/test.jsonl" # Test data path + +``` + +**Run Command:** + +```bash +python run_dfa_pipeline_recommend.py + +``` + +**Output:** After running, the script generates `pipeline.py`, `final_state.json`, and `graph.png` under `dataflow_cache/session_{id}/`. + + +## Part 2: Pipeline Refinement + +### 1. Overview + +Pipeline Refinement allows users to fine-tune generated DataFlow Pipelines using natural language. Users do not need to manually modify complex JSON configurations or Python code; simply inputting instructions like "delete the intermediate filter node" allows the system to intelligently parse the intent and automatically adjust the Pipeline's topology. + +### 2. System Architecture + +This function is orchestrated by `wf_pipeline_refine.py`, adopting a three-stage architecture of **Analyzer -> Planner -> Refiner**: + +#### 2.1 Refine Target Analyzer + +* **Core Responsibilities**: + * **Intent Recognition**: Compares the current Pipeline structure (`state.pipeline_structure_code`) and the user's natural language requirement (`target`) to analyze the type of modification the user wishes to make (add, delete, modify). + * **Pre-emptive RAG**: This is a key feature. The Analyzer parses descriptions of sub-operations implied in user requirements and directly calls RAG search `_get_operators_by_rag_with_scores`. 
It calculates similarity scores, evaluates match quality, and packages the best-matching operator code `code_snippet` and warning messages into `op_contexts`. +* **Input**: `state.pipeline_structure_code` (current pipeline code), `state.request.target` (user modification instruction). +* **Output**: Intent analysis results containing `needed_operators_desc`, and `op_contexts` containing rich context (operator code, match scores). + +#### 2.2 Refine Planner + +* **Responsibility**: Based on the intent provided by the Analyzer and the pre-retrieved operator context, formulates a specific **modification plan**. It does not directly modify code but generates structured operational steps. +* **Input**: Analyzer's analysis results (`intent`), operator context (`op_context`), current node summary. +* **Output**: A list of structured operational steps, for example: + * `REMOVE_NODE: node_filter_1` + * `ADD_NODE: node_deduplicate (after node_loader)` + * `UPDATE_EDGE: node_loader -> node_deduplicate`. + + + +#### 2.3 JSON Pipeline Refiner + +* **Responsibility**: Executes the Planner's plan, directly manipulating the Nodes and Edges of the Pipeline's JSON data structure. +* **Tool Enhancement**: This Agent has `search_operator_by_description` and `get_operator_code_by_name` mounted as Post-Tools. Although the Analyzer has already provided `op_context`, if the Refiner finds information insufficient during execution, it can still proactively initiate a search to supplement operator information. +* **Output**: Updated `state.pipeline_structure_code`. + +### 3. User Guide + +#### 3.1 Graphical Interface + +Integrated at the bottom of the `pipeline_rec.py` page. + +1. **Prerequisite**: Must first click "Generate Pipeline" at the top of the page to generate initial pipeline code, at which point `pipeline_json_state` will be initialized. +2. **Input Optimization Instruction**: Enter instructions in the "Optimization Requirement" text box. +3. **Execute Optimization**: Click **"Refine Pipeline"**. The system will display the updated Python code, JSON structure, and Agent execution logs. +4. **History Backtracking**: Use "Previous Round" and "Next Round" buttons to switch between different optimization versions and view the code evolution process. +5. **Warning Prompts**: If RAG match quality is low, an `Optimization Warning` comment will be automatically added to the top of the code, alerting the user that the currently generated operator may not fully match the requirement. + +#### 3.2 Script Invocation + +Use the `run_dfa_pipeline_refine.py` script. 
+ +**Configuration Parameters:** + +```python +# Input file: pipeline structure file generated in the previous step (.json) +INPUT_JSON = "dataflow_cache/session_xxx/final_state.json" +OUTPUT_JSON = "cache_local/pipeline_refine_result.json" # Output file; if empty string, only prints result +# Modification Target +TARGET = "Please adjust the Pipeline to contain only 3 nodes, simplifying data flow" + +LANGUAGE = "en" +CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/") +MODEL = os.getenv("DF_MODEL", "gpt-4o") + +``` + +**Run Command:** + +```bash +python run_dfa_pipeline_refine.py + +``` \ No newline at end of file diff --git a/docs/zh/notes/guide/agent/operator_assemble_line.md b/docs/zh/notes/guide/agent/operator_assemble_line.md new file mode 100644 index 000000000..629a80e92 --- /dev/null +++ b/docs/zh/notes/guide/agent/operator_assemble_line.md @@ -0,0 +1,105 @@ +--- +title: 可视化算子编排 +createTime: 2026/02/05 22:11:00 +permalink: /zh/guide/agent/operator_assemble_line/ +--- + +## 1. 概述 + +**可视化算子编排** 是 DataFlow-Agent 平台提供的一个“低代码/无代码”开发工具。它允许用户跳过复杂的 Python 编码或 AI 规划过程,直接通过图形用户界面(UI)浏览系统中的可用算子(Operators),手动配置参数,并将它们组装成有序的数据处理流水线。 + +该功能的核心价值在于: + +- **所见即所得**:实时查看算子参数定义和流水线结构。 +- **自动链接**:系统会自动尝试将上一个算子的输出(Output)与下一个算子的输入(Input)进行匹配,简化数据流配置。 +- **代码生成与执行**:编排好的逻辑会被自动转换为标准的 Python 代码并在后台执行。 + +## 2. 功能特性 + +该功能模块主要由前端交互逻辑 (op_assemble_line.py) 和后端执行工作流 (wf_df_op_usage.py) 组成。 + +### 2.1 动态算子加载与自省 + +系统启动时会自动扫描 `OPERATOR_REGISTRY`,加载所有注册的算子,并根据其模块路径自动分类。 + +- **参数自动解析**:系统通过 Python 的 `inspect` 模块自动提取算子类的 `init` 和 `run` 方法签名,在 UI 上生成对应的配置框。 +- **Prompt 模板支持**:对于支持 Prompt 的算子,UI 会自动读取 `ALLOWED_PROMPTS` 并提供下拉选择框。 + +### 2.2 智能参数链接 + +在 UI 编排过程中,系统具备“自动接线”能力。它会分析相邻两个算子的输入输出关系,自动匹配名称相似的 Key,并通过可视化连线展示数据流向。 + +## 3. 使用指南 + +本功能提供 **图形界面 (Gradio UI)** 和 **命令行脚本** 两种使用方式。 + +### 3.1 界面操作 + +适合交互式探索和快速验证。 + +1. **环境配置**:在页面顶部填写 API 相关信息和输入 JSONL 文件路径。 +2. **编排流水线**: + 1. **选择算子**:从左侧下拉框选择算子分类和具体算子。 + 2. **配置参数**:在 JSON 编辑框中填入参数。 + 3. **添加算子**:点击“添加算子到 Pipeline”按钮,并在下方列表中拖拽调整执行顺序。 +3. **运行与结果**:点击“运行 Pipeline”,在下方执行结果处查看生成的代码和处理结果数据预览。 + +### 3.2 脚本调用与显式配置 + +对于自动化任务或批量处理,可以使用 `run_dfa_op_assemble.py` 脚本。此方式跳过 UI,直接通过代码定义算子序列。 + +> **注意:显式配置要求** 与 UI 的“自动链接”不同,脚本模式下您必须**显式配置**所有参数。您需要确保上一个算子的 `output_key` 与下一个算子的 `input_key` 严格匹配,脚本不会自动为您纠正参数名。 + +#### 1. 修改配置 + +打开 `run_dfa_op_assemble.py`,在文件顶部的配置区域进行修改。 + +**关键配置项**:**`PIPELINE_STEPS`** 这是一个列表,定义了 Pipeline 的执行步骤。每个元素包含 `op_name` 和 `params`。 + +```Python +# [Pipeline 定义] +PIPELINE_STEPS = [ + { + "op_name": "ReasoningAnswerGenerator", + "params": { + # __init__ 参数 (注意:在 wf_df_op_usage 中统一合并为 params) + "prompt_template": "dataflow.prompts.reasoning.math.MathAnswerGeneratorPrompt", + # run 参数 + "input_key": "raw_content", + "output_key": "generated_cot" + } + }, + { + "op_name": "ReasoningPseudoAnswerGenerator", + "params": { + "max_times": 3, + "input_key": "generated_cot", + "output_key_answer": "pseudo_answers", + "output_key_answer_value": "pseudo_answer_value", + "output_key_solutions": "pseudo_solutions", + "output_key_correct_solution_example": "pseudo_correct_solution_example" + } + } +] +``` + +**其他必要配置:** + +- `CACHE_DIR`: **必须使用绝对路径**,以避免生成的 Python 脚本在子进程执行时出现路径错误。 +- `INPUT_FILE`: 初始数据文件的绝对路径。 + +#### 2. 运行脚本 + +```Bash +python run_dfa_op_assemble.py +``` + +#### 3. 
结果输出 + +脚本执行后,控制台将打印: + +- **[Generation]**: 生成的 Python 脚本路径 (`pipeline_script_pipeline_001.py`)。 +- **[Code Preview]**: 生成代码的前 20 行预览。 +- **[Execution]**: + - `Status: success` 表示执行成功。 + - `STDOUT`: 打印 pipeline 运行时的标准输出日志。 \ No newline at end of file diff --git a/docs/zh/notes/guide/agent/operator_qa.md b/docs/zh/notes/guide/agent/operator_qa.md new file mode 100644 index 000000000..e8c2493fc --- /dev/null +++ b/docs/zh/notes/guide/agent/operator_qa.md @@ -0,0 +1,110 @@ +--- +title: 算子问答 +createTime: 2026/02/05 22:11:00 +permalink: /zh/guide/agent/operator_qa/ +--- + +## 1. 概述 + +**算子智能问答 (Operator QA)** 是 DataFlow-Agent 平台内置的垂直领域专家助手。它的核心使命是帮助用户快速在海量的 DataFlow 算子库中找到所需的工具,理解其用法,并查看底层源码。 + +不同于通用的聊天机器人,Operator QA 集成了 **RAG(检索增强生成)** 技术。它挂载了 DataFlow 项目的完整算子索引(FAISS)和元数据知识库。当用户提问时,Agent 会自主决定是否需要检索知识库,检索哪些算子,并将准确的技术细节(包括代码片段、参数说明)反馈给用户。 + +## 2. 核心特性 + +该功能模块由前端 UI (operator_qa.py)、脚本入口 (run_dfa_operator_qa.py) 和后端智能体 (operator_qa_agent.py) 共同驱动,具备以下核心能力: + +### 2.1 智能检索与推荐 + +Agent 并非简单地进行关键词匹配,而是基于语义理解用户的需求。 + +- **语义搜索**:用户只需描述“我想过滤掉缺失值”,Agent 会通过向量检索找到 `ContentNullFilter`等相关算子。 +- **按需调用**:基于 `BaseAgent` 的图模式 (`use_agent=True`),Agent 会根据对话上下文自动判断是否需要调用 `search_operators` 工具,或者直接基于上下文回答。 + +### 2.2 多轮对话 + +利用 `AdvancedMessageHistory` 模块,系统维护了完整的会话上下文。 + +- **上下文记忆**:用户可以先问“有哪些加载数据的算子?”,接着问“**它**的参数怎么填?”。Agent 能识别“它”指的是上一轮推荐的算子。 +- **状态保持**:在脚本交互模式或 UI 中,通过复用同一个 `state` 和 `graph` 实例,`messages` 列表会在多轮对话中累积,确保 LLM 拥有完整记忆。 + +### 2.3 可视化与交互 + +- **Gradio UI**:提供代码预览、算子高亮和快捷提问按钮。 +- **交互**:支持多轮问答,支持清除历史、查看历史等。 + +## 3. 架构组件 + +### 3.1 OperatorQAAgent + +- 继承自 `BaseAgent`,配置为 ReAct/Graph 模式。 +- 它拥有后置工具(Post-Tools)权限,可以调用 RAG 服务获取数据。 +- 它负责解析用户的自然语言,规划是否查库,并生成最终的自然语言回复。 + +### 3.2 OperatorRAGService + +- 这是一个与 Agent 解耦的服务层。 +- 管理 FAISS 向量索引和 `ops.json` 元数据。 +- 提供 `search`(向量搜索)、`get_operator_info`(获取详情)、`get_operator_source`(获取源码)等底层能力。 + +## 4. 使用指南 + +### 4.1 界面操作 + +1. **配置模型**:在右侧“配置”面板确认 API URL 和 Key,并选择模型(默认使用 `gpt-4o`)。 +2. **发起提问**: + 1. **对话框**:输入你的问题。 + 2. **快捷按钮**:也可以点击下方的“快捷问题”按钮,如“过滤缺失值用什么算子?”快速开始。 +3. **查看结果**: + 1. **对话区**:显示 Agent 的回答和引用来源。 + 2. **右侧面板**: + - `相关算子`:列出 Agent 检索到的算子名称。 + - `代码片段`:如果涉及具体实现,这里会显示 Python 源码。 + +### 4.2 脚本调用与显式配置 + +除了 UI 界面,系统提供了 `run_dfa_operator_qa.py` 脚本,支持通过代码显式配置参数来运行问答服务。这种方式适合开发调试。 + +**配置方式:** 直接修改脚本顶部的常量配置区域,无需通过命令行参数传递: + +```Python +# ===== Example config (edit here) ===== +INTERACTIVE = False # True开启多轮交互模式,False为单次查询 +QUERY = "我想过滤掉缺失值用哪个算子?" # 单次查询的问题 + +LANGUAGE = "zh" +SESSION_ID = "demo_operator_qa" +CACHE_DIR = "dataflow_cache" +TOP_K = 5 # 检索结果数量 + +CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/") +API_KEY = os.getenv("DF_API_KEY", "") +MODEL = os.getenv("DF_MODEL", "gpt-4o") + +OUTPUT_JSON = "" # e.g. "cache_local/operator_qa_result.json";空字符串表示不落盘 +``` + +**运行模式:** + +1. **单次查询模式** (`INTERACTIVE = False`): 执行单次 `QUERY`,结果可直接打印或保存为 JSON 文件。 +2. **交互模式** (`INTERACTIVE = True`): 启动终端对话循环,支持 `exit` 退出、`clear` 清除上下文、`history` 查看历史。 + +**核心代码逻辑:** 脚本演示了如何显式构造 `DFRequest` 和 `MainState`,并手动构建执行图: + +```Python +# 1. 显式构造请求 +req = DFRequest( + language=LANGUAGE, + chat_api_url=CHAT_API_URL, + api_key=API_KEY, + model=MODEL, + target="" # 每次查询前再写入 +) + +# 2. 初始化状态与图 +state = MainState(request=req, messages=[]) +graph = create_operator_qa_graph().build() + +# 3. 
执行 +result = await run_single_query(state, graph, QUERY) +``` \ No newline at end of file diff --git a/docs/zh/notes/guide/agent/operator_write.md b/docs/zh/notes/guide/agent/operator_write.md new file mode 100644 index 000000000..594514a3f --- /dev/null +++ b/docs/zh/notes/guide/agent/operator_write.md @@ -0,0 +1,118 @@ +--- +title: 算子编写 +createTime: 2026/02/05 22:11:00 +permalink: /zh/guide/agent/operator_write/ +--- + +## 1. 概述 + +**算子编写 (Operator Write)** 是 DataFlow-Agent 的核心生产力模块。它不仅仅是根据用户需求生成一段 Python 代码,而是构建了一个闭环的**生成-执行-调试**系统。 + +该工作流能够: + +1. **语义匹配**:理解用户意图(如“过滤缺失值”),在现有算子库中寻找最匹配的基类。 +2. **代码生成**:基于基类和用户数据样例,编写可执行的算子代码。 +3. **自动注入**:如果需要,为算子注入 LLM 服务能力。 +4. **子进程执行**:在一个受控环境中实例化并运行生成的算子。 +5. **自我修复**:如果执行报错,启动 Debugger 分析堆栈信息,自动修改代码并重试,直到成功或达到最大重试次数。 + +## 2. 核心特性 + +### 2.1 智能代码生成 + +- **基于样例编程**:Agent 会读取实际的数据样例 (调用前置工具`local_tool_for_sample`) 和数据 Schema,确保生成的代码能够正确处理真实的字段名和数据类型。 +- **算子复用**:系统优先检索现有的算子库(调用前置工具`match_operator`),生成继承自现有基类的代码,而不是从零开始,保证了代码的规范性和可维护性。 + +### 2.2 自动调试闭环 + +这是一个具备自我反思能力的系统。 + +- **执行监控**:在 `llm_instantiate` 节点,系统尝试执行生成的代码 (`exec(code_str)`) 并捕获标准输出和标准错误。 +- **错误诊断**:如果发生异常,`code_debugger` Agent 会分析错误堆栈 (`error_trace`) 和当前代码,生成修复建议 (`debug_reason`)。 +- **自动重写**:`rewriter` Agent 根据修复建议重新生成代码,并自动更新文件,进入下一轮测试。 + +### 2.3 LLM 服务注入 + +对于需要调用大模型的复杂算子(如“根据内容生成摘要”),`llm_append_serving` 节点会自动在算子代码中注入标准的 LLM 调用接口 (`self.llm_serving`),使其具备 AI 能力。 + +## 3. 工作流架构 + +该功能由 `wf_pipeline_write.py` 编排,形成一个包含条件循环的有向图。 + +1. **Match Node**: 检索参考算子。 +2. **Write Node**: 编写初始代码。 +3. **Append Serving Node**: 注入 LLM 能力。 +4. **Instantiate Node**: 尝试运行代码。 +5. **Debugger Node** (条件触发): 分析错误。 +6. **Rewriter Node**: 修复代码。 + +## 4. 使用指南 + +本功能提供 **图形界面 (Gradio UI)** 和 **命令行脚本** 两种使用方式。 + +### 4.1 界面操作 + +前端页面代码位于 `operator_write.py`,提供了可视化的交互体验。 + +#### 1. 配置输入 + +在页面的左侧面板进行配置: + +- **目标描述**: 详细描述您想要创建的算子功能和用途。 + - 示例: "创建一个算子,用于对文本进行情感分析。" +- **算子类别**: 算子所属类别,用于匹配相似算子作为参考,默认为 `"Default"`,可选:`"filter"`, `"mapper"`, `"aggregator"` 等。 +- **测试数据文件**: 指定用于测试生成的算子的 `.jsonl` 文件路径。默认为项目自带的 `tests/test.jsonl`。 +- **调试设置**: + - `启用调试模式 (Enable Debug Mode)`: 勾选后,如果代码报错,系统会自动尝试修复。 + - `最大调试轮次`: 设置自动修复的最大尝试次数(默认 3 次)。 +- **输出路径**: 指定生成代码的保存路径(可选)。 + +#### 2. 查看结果 + +点击 **"生成算子"** 按钮后,右侧面板会展示详细结果: + +- **生成的代码**: 最终可用的 Python 代码,支持语法高亮 +- **匹配的算子**: 显示系统在算子库中找到的参考算子列表(如 `"LangkitSampleEvaluator"`,`"LexicalDiversitySampleEvaluator"`,`"PresidioSampleEvaluator"`,`"PerspectiveSampleEvaluator"`等) +- **执行结果**: 显示 `success: true/false` 以及具体的日志信息`stdout`/`stderr`。 +- **调试信息**: 如果触发了调试,这里会显示运行时捕获的 `stdout`/`stderr` 以及选定的输入字段键名 (`input_key`) +- **Agent结果:** 各个 Agent 节点的详细执行结果 +- **执行日志**: 完整的执行日志信息,方便排查 Agent 的思考过程 + +### 4.2 脚本调用与显式配置 + +对于开发者或自动化任务,可以直接运行 `run_dfa_operator_write.py`。 + +#### 1. 修改配置 + +打开 `run_dfa_operator_write.py`,在文件顶部的配置区域修改参数: + +```Python +CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/") +MODEL = os.getenv("DF_MODEL", "gpt-4o") +LANGUAGE = "en" + +TARGET = "Create an operator that filters out missing values and keeps rows with non-empty fields." +CATEGORY = "Default" # 兜底类别(若 classifier 未命中) +OUTPUT_PATH = "" # e.g. "cache_local/my_operator.py";空则不落盘 +JSON_FILE = "" # 空则使用项目自带 tests/test.jsonl + +NEED_DEBUG = False +MAX_DEBUG_ROUNDS = 3 +``` + +#### 2. 运行脚本 + +```Bash +python run_dfa_operator_write.py +``` + +#### 3. 
结果输出 + +脚本会在控制台打印关键信息: + +- `Matched ops`: 匹配到的参考算子。 +- `Code preview`: 生成代码的预览片段。 +- `Execution Result`: + - `Success: True` 表示代码生成并运行通过。 + - `Success: False` 会打印 `stderr preview` 供排查。 +- `Debug Runtime Preview`: 显示自动选择的 `input_key` 和运行时日志。 \ No newline at end of file diff --git a/docs/zh/notes/guide/agent/pipeline_prompt.md b/docs/zh/notes/guide/agent/pipeline_prompt.md new file mode 100644 index 000000000..bcf4f6142 --- /dev/null +++ b/docs/zh/notes/guide/agent/pipeline_prompt.md @@ -0,0 +1,110 @@ +--- +title: 算子复用/提示词优化 +createTime: 2026/02/05 22:11:00 +permalink: /zh/guide/agent/pipeline_prompt/ +--- + +## 1. 概述 + +**提示词优化**是 DataFlow-Agent 面向 **Prompt-Engineering** 的核心模块。它的设计目标是解决“通用算子逻辑复用”的问题。 + +该模块采用**单节点架构**。当用户提出一个新的数据处理需求时,Agent 不仅会编写符合算子规范的 Prompt,还会**自动生成合成测试数据**,并在内部构建和运行测试脚本。 + +## 2. 核心特性 + +### 2.1 基于范例的生成 + +- **算子代码分析**:Agent 会自动读取目标算子(`OP_NAME`)的源码,提取其参数定义。 +- **Prompt 迁移**:系统检索算子库中已有的 Prompt 案例,将其作为上下文,指导 LLM 生成符合该算子接口规范(如 `init` 参数结构)的新 Prompt 类。 + +### 2.2 基于合成数据的自我验证 + +生成的 Prompt 不会仅停留在文本层面,Agent 会立即对其进行测试。 + +- **数据合成**:Agent **不需要**用户的大规模业务数据进行测试,而是利用 LLM 分析算子逻辑,自动生成一组覆盖多种边界情况的**合成测试数据**,保存为临时的 JSONL 文件。 +- **子进程执行**:Agent 内部会自动构建一个临时 Python 测试脚本,并启动子进程执行该脚本,验证生成的 Prompt 是否能正确跑通并产生预期结果。 + +### 2.3 迭代优化 + +- **交互式反馈**:系统不进行盲目的自动重试。用户在前端查看测试结果、生成的 Prompt 和数据预览后,输入修改意见。 +- **定向热更新**:后端 `PromptWriter` 接收反馈后,调用 `revise_with_feedback` 方法,在保持现有上下文的基础上定向修改 Prompt 代码,并自动触发新一轮的测试循环。 + +## 3. 系统架构 + +该功能由 `wf_pipeline_prompt.py` 定义,核心为一个**单节点**工作流。所有的生成与验证逻辑均高度内聚在 `PromptWriter` Agent 中。 + +### 3.1 核心节点流程 + +**Prompt Writer Node** 是图中唯一的节点,它按顺序执行以下内部逻辑: + +1. **上下文检索**: 调用 Pre-tools 获取目标算子的源码、用户的target 和 Prompt 范例。 +2. **提示词生成**: 调用 LLM 生成 Python 形式的 Prompt 类代码。随后通过 `update_state_result` 方法将生成的代码保存到 `state` 对象并写入本地文件,为后续测试步骤提供依赖。 +3. **测试数据合成**: 调用内部方法 `_build_test_data_by_llm`,根据任务描述生成合成测试数据。 +4. **测试脚本构建**: 调用内部方法 `_build_test_code`,利用字符串模板生成临时的测试脚本。 +5. **子进程执行**: 使用 `subprocess` 运行测试脚本,捕获标准输出 (stdout) 和标准错误 (stderr)。 +6. **测试结果输出**: 扫描读取子进程运行生成的测试结果文件,将测试结果更新到 `state.temp_data` 中,完成流程。 + +### 3.2 迭代优化机制 + +优化过程依赖于**前端交互**: + +1. 用户在 UI 查看执行结果。 +2. 用户提交反馈。 +3. 前端调用 `_on_chat_submit`,触发 Agent 的 `revise_with_feedback` 接口。 +4. Agent 根据反馈修改代码并重新复用执行上述的验证阶段,即测试数据合成 -> 测试脚本构建 -> 子进程执行。 + +## 4. 使用指南 + +### 4.1 图形界面 + +前端代码位于 `PA_frontend.py`,提供了完整的交互式开发环境。 + +**初次生成:** + +1. 配置 API 信息(URL、Key、模型) +2. 填写任务描述、算子名称 +3. (可选)指定输出格式、参数列表和文件输出根路径 +4. 点击"生成 Prompt 模板"按钮 +5. 查看 Agent 生成的测试数据、测试结果、Prompt 代码和测试代码 + +**多轮优化:** + +1. 如果结果不符合预期,在右侧对话框中输入改进建议 +2. 点击"发送改写指令" +3. 查看更新后的代码和测试结果 +4. 重复步骤 1-3 直到获得满意结果 + +使用生成的 Prompt**:** + +1. 从"Prompt 文件路径"获取生成的 Prompt 文件位置 +2. 将 Prompt 类导入到您的算子中 +3. 在算子的 `init()` 中指定 `prompt_template` + +### 4.2 脚本调用 + +使用 `run_dfa_pipeline_prompt.py` 脚本进行自动化生成。 + +**配置参数**: + +```Python +CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/") +MODEL = os.getenv("DF_MODEL", "gpt-4o") +LANGUAGE = "en" + +TASK_DESCRIPTION = "Write a prompt for an operator that filters missing values" +OP_NAME = "PromptedFilter" + +# 这两项在算子不拥有任何一个预置提示词时才需要提供,否则会仿照已有提示词生成 +OUTPUT_FORMAT = "" # e.g. "Return JSON with keys: ..." +ARGUMENTS = [] # e.g. 
["min_len=10", "drop_na=true"] + +# 缓存目录,用于存储测试数据和提示词 +CACHE_DIR = "./pa_cache" +DELETE_TEST_FILES = True +``` + +**运行命令**: + +```Bash +python run_dfa_pipeline_prompt.py +``` \ No newline at end of file diff --git a/docs/zh/notes/guide/agent/pipeline_rec&refine.md b/docs/zh/notes/guide/agent/pipeline_rec&refine.md new file mode 100644 index 000000000..b0d6dd65d --- /dev/null +++ b/docs/zh/notes/guide/agent/pipeline_rec&refine.md @@ -0,0 +1,214 @@ +--- +title: 智能 Pipeline 推荐与优化 +createTime: 2026/02/05 22:11:00 +permalink: /zh/guide/agent/pipeline_rec&refine/ +--- + + +本模块包含两个紧密协作的核心子系统: + +1. **智能 Pipeline 推荐 (Recommendation)**:负责“从 0 到 1”,将自然语言需求转化为完整的可执行 Pipeline。 +2. **Pipieline 迭代优化 (Refinement)**:负责“从 1 到 N”,基于用户反馈对现有 Pipeline 结构进行微调。 + +## 第一部分:Pipeline 推荐 (Pipeline Recommendation) + +### 1. 概述 + +**Pipeline 推荐** 是 DataFlow-Agent 的核心编排引擎。它能够理解复杂的业务需求,自动拆解任务步骤,从算子库中检索最佳组件,规划数据流向,并生成可执行的 Python 代码。 + +该系统具备自我修复能力:在生成代码执行失败时,Agent 会主动查阅算子源码文档,分析错误原因并修正代码,直至执行成功。 + +### 2. 系统架构 + +该功能由 `wf_pipeline_recommend_extract_json.py` 编排,形成一个包含多级智能体的有向图。以下是详细的节点职责说明: + +#### 2.1 分析与规划阶段 + +1. **Classifier Node** + 1. **职责**: 读取少量数据样例,识别数据类型和业务领域。这决定了后续推荐算子的倾向性。 + 2. **输入**: `state.request.json_file` (数据文件路径)。 + 3. **输出**: `state.category`。 +2. **Target Parser Node** + 1. **核心任务 (What it does)**: 充当业务分析的角色。它不直接生成代码,而是将用户模糊的需求转化为逻辑严密的步骤。 + 2. **输入**: 用户的自然语言需求(例如:“过滤掉pdf中长度小于10的文本,然后去重,最后提取关键词”)。 + 3. **LLM 思考**: 将需求拆解为标准的、符合数据处理逻辑的步骤列表(如 `["读取解析pdf成纯文本", "过滤掉长度小于10个字符的文本数据", "对文本数据进行去重处理,移除重复内容","从文本数据中提取关键词"]`)。 + 4. **后续动作**: 利用拆解出的步骤描述,去算子向量数据库中检索最相似的物理算子,形成**候选算子池**,供下一阶段使用。 +3. **Recommender Node** + 1. **核心任务**: 负责将散乱的候选算子变成有序的执行方案。 + 2. **输入**: + - `target`: 用户的原始需求。 + - `sample`: 数据样本(了解数据特征,如字段名、格式)。 + - `split_ops`: 上一步 `target_parser` 通过 RAG 检索出来的候选算子列表及其功能描述。 + 3. **LLM** **思考**: + - **逻辑排序**: 每个阶段不是只能有一个算子,而是遵循 “需求” + - **数据兼容性**: 若某算子需要字段“X”但样例数据中不存在,必须确保在它之前有算子创建该字段 + - **查漏补缺**: 现有算子能满足需求吗?如果不行,需要插入一个万能的 `PromptedGenerator` + 4. **输出**: 一个有序的算子名称列表以及推荐理由,如 + ```JSON + { + "ops": [ + "Text2SQLQuestionGenerator", + "SQLExecutionFilter", + "SQLConsistencyFilter", + "SQLVariationGenerator", + "Text2SQLQuestionGenerator", + "Text2SQLPromptGenerator", + "Text2SQLCoTGenerator", + "ReasoningQuestionSolvableSampleEvaluator", + "SQLComponentClassifier", + "PromptedGenerator" + ], + "reason": "该流水线设计旨在满足用户的所有需求。 + 1. 首先,通过 Text2SQLQuestionGenerator 解析 SQL 数据文件并提取 SQL 语句和对应的自然语言问题。 + 2. 接着,使用 SQLExecutionFilter 在数据库中执行 SQL 语句以验证其有效性。 + 3. 然后,使用 SQLConsistencyFilter 进行一致性过滤,确保 SQL 语句与其对应的自然语言问题一致。 + 4. 接下来,使用 SQLVariationGenerator 对有效的 SQL 语句进行扩增,包括替换数值、提高语法难度和更改书写方式。 + 5. 随后,使用 Text2SQLQuestionGenerator 基于扩增后的 SQL 语句生成对应的自然语言问题。 + 6. 接着,使用 Text2SQLPromptGenerator 生成 Prompt 提示词内容,并通过 Text2SQLCoTGenerator 生成思维链推理过程。 + 7. 然后,使用 ReasoningQuestionSolvableSampleEvaluator 对生成的数据进行分类,评估大模型解决问题的难度,并使用 SQLComponentClassifier 评估 SQL 组成部分的难度。 + 8. 最后,使用 PromptedGenerator 输出合成的 SQL 数据及其对应的自然语言问题和推理过程,以确保所有需求得到满足。" + } + ``` + +#### 2.2 构建与执行阶段 + +1. **Builder Node** + 1. **职责**: 将推荐方案(JSON)转化为实际的 Python 代码文件 (`pipeline.py`),并启动子进程执行该代码。 + 2. **机制**: 支持创建子进程执行代码,捕获标准输出 (stdout) 和标准错误 (stderr)。 + 3. **输出**: `state.execution_result` (Success/Fail 状态及日志)。 + +#### 2.3 自动修复闭环 + +当 `builder` 执行失败且 `need_debug=True` 时,进入此循环: + +1. **Debugger Node** + + - **职责**: 分析错误堆栈 (`error_trace`) 和当前代码,判断错误类型(参数错误、逻辑错误等)。 +2. **Info Requester Node** + + - **职责**: 这是一个主动学习节点。如果 Debugger 认为信息不足,它会调用工具读取相关算子的**源代码**或**文档**,获取上下文信息。 +3. **Rewriter Node** + 1. **职责**: 综合错误日志和 InfoRequester 查到的源码知识,生成修复后的完整代码。 + 2. 
**流转**: 修复后的代码会再次送入 `builder` 进行测试,直到成功或达到最大重试次数 (`max_debug_rounds`)。 + +#### 2.4 输出阶段 + +- **Exporter Node** + + - **职责**: 执行成功后,整理最终的 Pipeline 信息、代码路径及数据样例,格式化输出给用户。 + +### 3. 使用指南 + +#### 3.1 图形界面 + +代码位于 `pipeline_rec.py`。 + +1. **配置输入**: + 1. 在"目标描述"框中输入您的需求 + 2. 输入需要处理jsonl文件 + 3. 配置 API 信息(URL、Key、模型) + 4. (可选)配置嵌入模型和调试选项 + 5. 选择是否需要自动更新向量索引(如果出现算子不在注册机里,则需要勾选) + 6. 选择是否使用debug模式(debug模式会自动运行生成的 Pipeline 代码,直到最大迭代轮次) +2. **生成pipeline**: + + 点击 **" Generate Pipeline"**。 +3. **结果查看**: + 1. **Pipeline Code**: 查看最终生成的pipeline 代码 + 2. **Execution Log**: 查看执行的日志信息 + 3. **Agent Results:** 各个 Agent 节点的详细执行结果,包含推荐的算子列表、构建过程等 + 4. **Pipeline JSON:** 生成的Pipeline拓扑结构JSON,包含算子节点列表和节点间连接关系 + +#### 3.2 脚本调用 + +使用 `run_dfa_pipeline_recommend.py` 脚本。 + +**配置参数:** + +```Python +LANGUAGE = "en" +CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/") +MODEL = os.getenv("DF_MODEL", "gpt-4o") +TARGET = "给我简单的过滤或者去重算子就好了,只需要2个算子" +NEED_DEBUG = True # 开启自动修复 +MAX_DEBUG_ROUNDS = 5 # 最大尝试修复次数 +CACHE_DIR = "dataflow_cache" +TEST_JSON_REL_PATH = "tests/test.jsonl" # 测试数据路径 +``` + +**运行命令:** + +```Bash +python run_dfa_pipeline_recommend.py +``` + +**输出:** 脚本运行后会在 `dataflow_cache/session_{id}/` 下生成 `pipeline.py`, `final_state.json` 和 `graph.png`。 + +## 第二部分:Pipeline 迭代优化 (Pipeline Refinement) + +### 1. 概述 + +Pipeline 迭代优化 (Refinement) 允许用户通过自然语言对已生成的 DataFlow Pipeline 进行微调。用户无需手动修改复杂的 JSON 配置或 Python 代码,只需输入如“删除中间的过滤节点”等指令,系统便会智能解析意图并自动调整 Pipeline 的拓扑结构。 + +### 2. 系统架构 + +该功能由 `wf_pipeline_refine.py` 编排,采用 **Analyzer -> Planner -> Refiner** 的三段式架构: + +#### 2.1 Refine Target Analyzer + +- **核心职责**: + - **意图识别**: 比较当前的 Pipeline 结构(`state.pipeline_structure_code`)和用户的自然语言需求(`target`),分析用户希望进行的修改类型(增、删、改)。 + - **RAG 预检索 (Pre-emptive RAG)**: 这是关键特性。Analyzer 会解析出用户需求中隐含的子操作描述,并直接调用 RAG 搜索 `_get_operators_by_rag_with_scores`。它会计算相似度分数、评估匹配质量,并将最佳匹配的算子代码`code_snippet`和警告信息打包进 `op_contexts`。 +- **输入**: `state.pipeline_structure_code` (当前 pipeline 代码), `state.request.target` (用户修改指令)。 +- **输出**: 包含 `needed_operators_desc` 的意图分析结果,以及包含丰富上下文的 `op_contexts`(算子代码、匹配度评分)。 + +#### 2.2 Refine Planner + +- **职责**: 基于 Analyzer 提供的意图和预检索到的算子上下文,制定具体的**修改计划**。它不直接修改代码,而是生成结构化的操作步骤。 +- **输入**: Analyzer 的分析结果 (`intent`)、算子上下文 (`op_context`)、当前节点摘要。 +- **输出**: 结构化的操作步骤列表,例如: + - `REMOVE_NODE: node_filter_1` + - `ADD_NODE: node_deduplicate (after node_loader)` + - `UPDATE_EDGE: node_loader -> node_deduplicate`。 + +#### 2.3 JSON Pipeline Refiner + +- **职责**: 执行 Planner 的计划,直接操作 Pipeline 的 JSON 数据结构 Nodes 和 Edges。 +- **工具增强**: 该 Agent 挂载了 `search_operator_by_description` 和 `get_operator_code_by_name` 作为后置工具。虽然 Analyzer 已经提供了 `op_context`,但如果 Refiner 在执行过程中发现信息不足,它仍可以主动发起搜索来补充算子信息。 +- **输出**: 更新后的 `state.pipeline_structure_code`。 + +### 3. 使用指南 + +#### 3.1 图形界面 + +集成在 `pipeline_rec.py` 页面底部。 + +1. **前提**:必须先在页面上方点击 "Generate Pipeline" 生成初始 pipeline 代码,此时 `pipeline_json_state` 会被初始化。 +2. **输入优化指令**:在 "优化需求" 文本框中输入指令。 +3. **执行优化**:点击 **"Refine Pipeline"**。系统将显示更新后的 Python 代码、JSON 结构以及 Agent 的执行日志。 +4. **历史回溯**:使用 "上一轮" 和 "下一轮" 按钮在不同的优化版本间切换,查看代码演进过程。 +5. 
**警告提示**: 如果 RAG 匹配度较低,代码顶部会自动添加 `优化警告` 注释,提示用户当前生成的算子可能未完全匹配需求。 + +#### 3.2 脚本调用 + +使用 `run_dfa_pipeline_refine.py` 脚本。 + +**配置参数:** + +```Python +# 输入文件:上一步生成的 pipeline 结构文件 (.json) +INPUT_JSON = "dataflow_cache/session_xxx/final_state.json" +OUTPUT_JSON = "cache_local/pipeline_refine_result.json" # 输出文件,如果是空字符串仅打印结果 +# 修改目标 +TARGET = "请将 Pipeline 调整为只包含3个节点,简化数据流" + +LANGUAGE = "en" +CHAT_API_URL = os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1/") +MODEL = os.getenv("DF_MODEL", "gpt-4o") +``` + +**运行命令:** + +```Bash +python run_dfa_pipeline_refine.py +``` \ No newline at end of file
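+
+作为补充,Refiner 对 Pipeline JSON 结构(Nodes 与 Edges)的调整方式可以用下面的简化示意来理解。其中的字段名、函数与数据均为演示用的假设,并非实际实现:
+
+```Python
+# 简化示意:按照 Planner 给出的步骤(如 REMOVE_NODE)调整 nodes / edges,字段名为假设
+pipeline = {
+    "nodes": [{"id": "node_loader"}, {"id": "node_filter_1"}, {"id": "node_writer"}],
+    "edges": [
+        {"source": "node_loader", "target": "node_filter_1"},
+        {"source": "node_filter_1", "target": "node_writer"},
+    ],
+}
+
+def remove_node(p: dict, node_id: str) -> None:
+    """删除指定节点,并将其上游与下游直接相连。"""
+    ups = [e["source"] for e in p["edges"] if e["target"] == node_id]
+    downs = [e["target"] for e in p["edges"] if e["source"] == node_id]
+    p["nodes"] = [n for n in p["nodes"] if n["id"] != node_id]
+    p["edges"] = [e for e in p["edges"] if node_id not in (e["source"], e["target"])]
+    p["edges"] += [{"source": u, "target": d} for u in ups for d in downs]
+
+remove_node(pipeline, "node_filter_1")   # 对应计划步骤 REMOVE_NODE: node_filter_1
+print(pipeline["edges"])                 # [{'source': 'node_loader', 'target': 'node_writer'}]
+```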