Merged

33 commits
8e11a2a
text filter pipeline
zzy1127 Jun 26, 2025
1292f4b
Merge branch 'main' of github.com:Open-DataFlow/DataFlow-Preview into…
zzy1127 Jun 26, 2025
5694dae
add quratingfilter
zzy1127 Jun 26, 2025
b73b990
fix bugs for qurating
zzy1127 Jun 26, 2025
cab5bef
merge
zzy1127 Jun 26, 2025
21f4f8f
Merge branch 'main' of github.com:zzy1127/DataFlow-Preview into zzy11…
MOLYHECI Jun 27, 2025
fe5a9e8
update qurating filter
zzy1127 Jun 27, 2025
3c61105
add finewebedu scorer
zzy1127 Jun 27, 2025
058f387
pairqual online and modify parameters
zzy1127 Jun 27, 2025
05bad5b
del files
zzy1127 Jun 27, 2025
a747f67
add textbook scorer
zzy1127 Jun 27, 2025
168e216
Merge branch 'main' of github.com:zzy1127/DataFlow-Preview into zzy11…
MOLYHECI Jun 27, 2025
69c9756
text pipeline sft operators
MOLYHECI Jun 27, 2025
3e0e8cf
Merge branch 'zzy1127-main'
MOLYHECI Jun 27, 2025
630b874
rm test_general_text.py
MOLYHECI Jun 27, 2025
6be3fe9
del req
zzy1127 Jun 28, 2025
8158f54
sft filter pipeline
zzy1127 Jun 28, 2025
b8be54d
all four pipeline complete
zzy1127 Jun 28, 2025
3c8e0a2
fix bugs
zzy1127 Jun 28, 2025
e81ea1f
merge
zzy1127 Jun 28, 2025
4345fce
merge
zzy1127 Jun 28, 2025
5b3dd6e
add scorers
zzy1127 Jun 29, 2025
ea38bdc
Merge branch 'OpenDCAI:main' into main
zzy1127 Jun 29, 2025
9c813a7
[update] add dependency of text to requirements.txt
SunnyHaze Jun 29, 2025
c81aee4
[update] remove dependency
SunnyHaze Jun 29, 2025
12cef22
add try except for flash_attention
zzy1127 Jun 29, 2025
37c0dc2
Merge branch 'main' of github.com:zzy1127/DataFlow-Preview into main
zzy1127 Jun 29, 2025
b8aef43
Merge branch 'main' into main
SunnyHaze Jun 29, 2025
98b8832
add dependency for kbc
SunnyHaze Jun 29, 2025
abed05d
[debug] debug for flash_atten import
SunnyHaze Jun 29, 2025
f69cae4
[debug] fixed test case issue
SunnyHaze Jun 29, 2025
fdef180
add text cpu pipeline
zzy1127 Jun 29, 2025
d170d04
merge
zzy1127 Jun 29, 2025
3 changes: 1 addition & 2 deletions .gitignore
@@ -16,7 +16,6 @@ test/example
cache
cache_local
.cache
dataflow_cache/

test_mineru

requirements-kbc.txt
70 changes: 50 additions & 20 deletions dataflow/example/GeneralTextPipeline/pt_input.jsonl

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions dataflow/example/GeneralTextPipeline/sft_input.jsonl
@@ -0,0 +1,14 @@
{"instruction": "Can you provide a list of healthy habits to maintain a healthy lifestyle? Please format your response as an HTML page with bullet points.", "output": "Here's an HTML page with bullet points for healthy habits:\n<html>\n <body>\n <h3>Healthy Habits:</h3>\n <ul>\n <li>Eating a balanced diet with plenty of fruits and vegetables.</li>\n <li>Engaging in regular physical activity, such as walking, running, or cycling.</li>\n <li>Getting enough sleep each night, ideally 7-8 hours.</li>\n <li>Staying hydrated by drinking plenty of water throughout the day.</li>\n <li>Limiting alcohol consumption and avoiding smoking.</li>\n <li>Managing stress through relaxation techniques like meditation or yoga.</li>\n <li>Regularly visiting a healthcare provider for check-ups and preventative care.</li>\n </ul>\n </body>\n</html>"}
{"instruction": "How can we use Python to calculate the GCD (greatest common divisor) of five numbers and express each number in terms of the GCD?", "output": "Yes, that's correct! The function you've provided takes in five numbers as arguments and returns the GCD of those numbers along with each number expressed in terms of the GCD. This is a useful tool for simplifying fractions or finding the common factor between multiple numbers. Great job!"}
{"instruction": "Generate a list of ten essential items a person might need for a camping trip in a specific region, taking into consideration the weather, terrain, and local wildlife.", "output": "Sure, here's a list of ten essential items a person might need for a camping trip in a specific region, taking into consideration the weather, terrain, and local wildlife: [...] use the emergency shelter to stay warm and dry until help arrives."}
{"instruction": "How can we implement a privacy-enhancing technique using homomorphic encryption in Go language to protect personal data in the age of quantum computing, while also ensuring that the data remains accessible to authorized parties only?", "output": "Homomorphic encryption is a powerful technique that allows computations to be performed on encrypted data without first decrypting it. [...] HElib provides a range of other functions for performing more complex operations on encrypted data, such as addition, subtraction, and comparison. It is important to note that homomorphic encryption can be computationally expensive, so it is important to optimize the implementation to ensure that it remains efficient and secure even against advanced quantum computing attacks."}
{"instruction": "Can you provide a list of the world's most famous painters and their notable works in JSON format?", "output": "[{\"name\": \"Leonardo da Vinci\", \"notable_works\": [\"Mona Lisa\", \"The Last Supper\", \"Vitruvian Man\"]}, {\"name\": \"Vincent van Gogh\", \"notable_works\": [\"Starry Night\", \"Sunflowers\", \"The Potato Eaters\"]}, {\"name\": \"Pablo Picasso\", \"notable_works\": [\"Guernica\", \"Les Demoiselles d'Avignon\", \"The Old Guitarist\"]}, {\"name\": \"Michelangelo\", \"notable_works\": [\"David\", \"Sistine Chapel ceiling\", \"The Creation of Adam\"]}, {\"name\": \"Rembrandt\", \"notable_works\": [\"The Night Watch\", \"Self-Portrait with Two Circles\", \"The Anatomy Lesson of Dr. Nicolaes Tulp\"]}]"}
{"instruction": "Can you provide a detailed explanation of the intricate mechanism of how plants use cellular respiration to convert organic compounds into energy?", "output": "Certainly! Cellular respiration is the process by which plants convert organic compounds into energy that can be used for growth and other metabolic processes. [...] C6H12O6 + 6O2 → 6CO2 + 6H2O + ATP\nThis equation shows that glucose and oxygen are the reactants, and carbon dioxide, water, and ATP are the products. It is important to note that this equation must be balanced to ensure that the number of atoms of each element is the same on both sides of the equation."}
{"instruction": "How can we analyze the sentiment of a given text using natural language processing with SentimentAnalysis.Core in Scala?", "output": "To analyze the sentiment of a given text using natural language processing with SentimentAnalysis.Core in Scala, you can follow the steps below: [...] This can be done using various natural language processing techniques such as tokenization, stemming, and stop word removal."}
{"instruction": "Using the latest scientific data and taking into account the unique environmental factors of each region, brainstorm a comprehensive set of solutions to reduce water pollution in both freshwater and saltwater ecosystems.", "output": "Reducing water pollution in both freshwater and saltwater ecosystems is a complex issue that requires a comprehensive approach. Here are some possible solutions to address both point and non-point sources of pollution: [...] However, the long-term benefits of improving water quality are significant and can have a positive impact on local economies and communities."}
{"instruction": "How can we use XML data to summarize the passage on global climate change?", "output": "Using XML data, we can summarize the passage on global climate change as follows:\n<summary>\n <text>Global climate change is a critical challenge for the world due to rising sea levels, biodiversity loss, and extreme weather events caused by human activities like burning fossil fuels. The long-term impacts on the planet could be devastating if left unchecked.</text>\n <source>Study on Global Climate Change</source>\n</summary>"}
{"instruction": "Calculate the total surface area of a cube with a side length of 5 cm, while considering the effect of a material with a coefficient of thermal expansion of 0.000023 per degree Celsius on the dimensions of the cube at a temperature of 20 degrees Celsius.", "output": "The surface area of a cube with a side length of 5 cm is given by the formula:\nSA = 6s^2\nwhere s is the length of a side of the cube. [...] So, the total surface area of the cube with a side length of 5 cm, accounting for thermal expansion at a temperature of 20 degrees Celsius, is approximately 150.092 cm^2."}
{"instruction": "Using the Linnaean classification system, provide a detailed description of the taxonomy of the skunk cabbage plant.", "output": "The skunk cabbage plant belongs to the kingdom Plantae, phylum Anthophyta, class Magnoliopsida, order Alismatales, family Araceae, genus Symplocarpus, and species foetidus. [...] Overall, while these plants share some similarities in their morphology and ecological role, they differ in their physical characteristics, growth patterns, and environmental adaptations."}
{"instruction": "Identify the most suitable adverb for the following sentence\nShe works", "output": "hard."}
{"instruction": "Assume that there is an excel table named \"people\" which contains the following data:\nName Age Occupation\n---------------------\nJohn 25 Engineer\nSara 30 Doctor\nMark 40 Teacher\nBased on the above data, can you describe the person named \"John\"?", "output": "Yes, John is a 25-year-old engineer according to the data in the \"people\" excel table."}
{"instruction": "How can I modify this C# code to query a database of restaurant ratings and return only those with excellent cuisine and a minimum rating of 4 stars, while also incorporating the requirement that the menu is somewhat expensive?", "output": "To modify this C# code to query a database of restaurant ratings and return only those with excellent cuisine and a minimum rating of 4 stars, while also incorporating the requirement that the menu is somewhat expensive, you can use LINQ to filter the data directly from the database. [...] This can further improve performance by reducing the amount of data that needs to be transferred between the database and the application."}
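Each line of `sft_input.jsonl` above is a standalone JSON object with `instruction` and `output` keys. A minimal loading sketch (the `parse_sft_records` helper is illustrative only, not part of the PR):

```python
import json

# One record in the sft_input.jsonl format shown above.
record_line = json.dumps({
    "instruction": "Identify the most suitable adverb for the following sentence\nShe works",
    "output": "hard.",
})

def parse_sft_records(lines):
    # Each line is a standalone JSON object; parse them independently.
    return [json.loads(line) for line in lines]

records = parse_sft_records([record_line])
print(records[0]["output"])  # -> hard.
```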
59 changes: 57 additions & 2 deletions dataflow/llmserving/LocalModelLLMServing.py
@@ -1,7 +1,9 @@
import logging
import torch
from dataflow import get_logger
from vllm import LLM, SamplingParams
from huggingface_hub import snapshot_download
from dataflow.core import LLMServingABC
from transformers import AutoTokenizer

class LocalModelLLMServing(LLMServingABC):
'''
@@ -36,7 +38,8 @@ def __init__(self,
)
except Exception:
self.real_model_path = model_name_or_path
logging.info(f"Model will be loaded from {self.real_model_path}")
self.logger = get_logger()
self.logger.info(f"Model will be loaded from {self.real_model_path}")
self.sampling_params = SamplingParams(
temperature=temperature,
top_p=top_p,
@@ -52,6 +55,8 @@ def __init__(self,
max_model_len=max_model_len,
gpu_memory_utilization=gpu_memory_utilization,
)
self.tokenizer = AutoTokenizer.from_pretrained(self.real_model_path, cache_dir=cache_dir)

def generate(self):
# # read input file : accept jsonl file only
# dataframe = self.datastorage.read(self.input_file, "dataframe")
@@ -80,4 +85,54 @@ def generate_from_input(self,
full_prompts = [system_prompt + '\n' + question for question in user_inputs]
responses = self.llm.generate(full_prompts, self.sampling_params)
return [output.outputs[0].text for output in responses]

def load(self,
tensor_parallel_size: int = 1,
model_name_or_path: str = None,
cache_dir: str = None,
temperature: float = 0.7,
top_p: float = 0.9,
max_tokens: int = 1024,
top_k: int = 40,
repetition_penalty: float = 1.0,
seed: int = 42,
download_dir: str = "./ckpt/models/",
max_model_len: int = 4096,
model_source: str= "remote",
):
if model_name_or_path is None:
raise ValueError("model_name_or_path is required")
if model_source == "local":
self.real_model_path = model_name_or_path
elif model_source == "remote":
try:
self.real_model_path = snapshot_download(
repo_id=model_name_or_path,
cache_dir=cache_dir,
local_dir=f"{download_dir}{model_name_or_path}",
)
except Exception:
self.real_model_path = model_name_or_path
self.logger.info(f"Model will be loaded from {self.real_model_path}")
self.sampling_params = SamplingParams(
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
top_k=top_k,
repetition_penalty=repetition_penalty,
seed=seed
# chat_template_kwargs={"enable_thinking": False},
)
self.llm = LLM(
model=self.real_model_path,
tensor_parallel_size=tensor_parallel_size,
max_model_len=max_model_len,
)


def cleanup(self):
del self.llm
import gc
gc.collect()
torch.cuda.empty_cache()
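As context for reviewers, `generate_from_input` builds one flat prompt per question by prepending the shared system prompt on its own line. A minimal pure-Python sketch of that assembly (`build_prompts` is a hypothetical helper name, not part of the PR; the vLLM call itself is elided):

```python
def build_prompts(system_prompt: str, user_inputs: list) -> list:
    # Mirrors generate_from_input: one flat prompt per question,
    # with the shared system prompt prepended on its own line.
    return [system_prompt + "\n" + question for question in user_inputs]

prompts = build_prompts("You are a helpful assistant.",
                        ["What is 2 + 2?", "Name a prime number."])
print(prompts[0])
```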

33 changes: 31 additions & 2 deletions dataflow/operators/eval/GeneralText/__init__.py
@@ -1,5 +1,34 @@
from .NgramScorer import NgramScorer
from .statistics.ngram_scorer import NgramScorer
from .statistics.lexical_diversity_scorer import LexicalDiversityScorer
from .statistics.langkit_scorer import LangkitScorer

from .models.deita_quality_scorer import DeitaQualityScorer
from .models.instag_scorer import InstagScorer
from .models.debertav3_scorer import DebertaV3Scorer
from .models.deita_complexity_scorer import DeitaComplexityScorer
from .models.fineweb_edu_scorer import FineWebEduScorer
from .models.pair_qual_scorer import PairQualScorer
# from .models.presidio_scorer import PresidioScorer
# from .models.rm_scorer import RMScorer
from .models.textbook_scorer import TextbookScorer
from .models.superfiltering_scorer import SuperfilteringScorer
from .models.qurating_scorer import QuratingScorer
# from .models.perplexity_scorer import PerplexityScorer

__all__ = [
'NgramScorer'
'NgramScorer',
'LexicalDiversityScorer',
'LangkitScorer',
'DeitaQualityScorer',
'InstagScorer',
'DebertaV3Scorer',
'DeitaComplexityScorer',
'FineWebEduScorer',
'PairQualScorer',
# 'PresidioScorer',
# 'RMScorer',
'TextbookScorer',
'SuperfilteringScorer',
'QuratingScorer',
# 'PerplexityScorer'
]
161 changes: 161 additions & 0 deletions dataflow/operators/eval/GeneralText/models/Kenlm/model.py
@@ -0,0 +1,161 @@
import os
import re
import unicodedata
from typing import Dict

import kenlm
import sentencepiece

class SentencePiece:
def __init__(
self,
model: str,
):
super().__init__()
self.sp = sentencepiece.SentencePieceProcessor()
self.sp.load(str(model))

def do(self, text: str) -> str:
tokenized = self.sp.encode_as_pieces(text)
return " ".join(tokenized)


class KenlmModel:
digit_re: re.Pattern = re.compile(r"\d")
unicode_punct: Dict[str, str] = {
"，": ",",
"。": ".",
"、": ",",
"„": '"',
"”": '"',
"“": '"',
"«": '"',
"»": '"',
"１": '"',
"」": '"',
"「": '"',
"《": '"',
"》": '"',
"´": "'",
"∶": ":",
"：": ":",
"？": "?",
"！": "!",
"（": "(",
"）": ")",
"；": ";",
"–": "-",
"—": " - ",
"．": ". ",
"～": "~",
"’": "'",
"…": "...",
"━": "-",
"〈": "<",
"〉": ">",
"【": "[",
"】": "]",
"％": "%",
"►": "-",
}
unicode_punct_re = re.compile(f"[{''.join(unicode_punct.keys())}]")
non_printing_chars_re = re.compile(
f"[{''.join(map(chr, list(range(0,32)) + list(range(127,160))))}]"
)
kenlm_model_dir = None
sentence_piece_model_dir = None

def __init__(
self,
model_dataset: str,
language: str,
lower_case: bool = False,
remove_accents: bool = False,
normalize_numbers: bool = True,
punctuation: int = 1,
):
self.model = kenlm.Model(os.path.join(model_dataset, f"{language}.arpa.bin"))
self.tokenizer = SentencePiece(os.path.join(model_dataset, f"{language}.sp.model"))
self.accent = remove_accents
self.case = lower_case
self.numbers = normalize_numbers
self.punct = punctuation

@classmethod
def from_pretrained(
cls,
model_dataset: str,
language: str,
):
return cls(
model_dataset,
language,
False,
False,
True,
1,
)

def pp(self, log_score, length):
return 10.0 ** (-log_score / length)

def get_perplexity(self, doc: str, normalize_cc_net: bool = True):
if normalize_cc_net:
doc = self.normalize(
doc,
accent=self.accent,
case=self.case,
numbers=self.numbers,
punct=self.punct,
)
# Tokenize (after normalizing): See https://github.com/facebookresearch/cc_net/blob/bda555bd1cf1ee2e0b925363e62a61cd46c8b60d/cc_net/mine.py#L352 for full pipeline
doc = self.tokenizer.do(doc)
doc_log_score, doc_length = 0, 0
for line in doc.split("\n"):
log_score = self.model.score(line)
length = len(line.split()) + 1
doc_log_score += log_score
doc_length += length
return round(self.pp(doc_log_score, doc_length), 1)

def normalize(
self,
line: str,
accent: bool = True,
case: bool = True,
numbers: bool = True,
punct: int = 1,
) -> str:
line = line.strip()
if not line:
return line
if case:
line = line.lower()
if accent:
line = self.strip_accents(line)
if numbers:
line = self.digit_re.sub("0", line)
if punct == 1:
line = self.replace_unicode_punct(line)
elif punct == 2:
line = self.remove_unicode_punct(line)
line = self.remove_non_printing_char(line)
return line

def strip_accents(self, line: str) -> str:
"""Strips accents from a piece of text."""
nfd = unicodedata.normalize("NFD", line)
output = [c for c in nfd if unicodedata.category(c) != "Mn"]
if len(output) == len(line):
return line
return "".join(output)

def replace_unicode_punct(self, text: str) -> str:
return "".join(self.unicode_punct.get(c, c) for c in text)

def remove_unicode_punct(self, text: str) -> str:
"""More aggressive version of replace_unicode_punct but also faster."""
return self.unicode_punct_re.sub("", text)

def remove_non_printing_char(self, text: str) -> str:
return self.non_printing_chars_re.sub("", text)
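The perplexity computation in `pp`/`get_perplexity` follows the usual KenLM convention: log10 scores are summed per line, each line's length counts one extra end-of-sentence token, and perplexity is 10 raised to the negative average log score. A minimal sketch with a stubbed scorer (`FakeModel` is a stand-in for `kenlm.Model`, used here only for illustration):

```python
class FakeModel:
    # Stand-in for kenlm.Model: returns a fixed log10 score per line.
    def score(self, line: str) -> float:
        return -10.0

def doc_perplexity(model, doc: str) -> float:
    # Same accumulation as KenlmModel.get_perplexity.
    doc_log_score, doc_length = 0.0, 0
    for line in doc.split("\n"):
        doc_log_score += model.score(line)
        doc_length += len(line.split()) + 1  # +1 for the </s> token
    return round(10.0 ** (-doc_log_score / doc_length), 1)

# Two 3-word lines: total log score -20, total length 8 -> 10**(20/8) ≈ 316.2
print(doc_perplexity(FakeModel(), "a b c\nd e f"))
```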