17 changes: 17 additions & 0 deletions configs/vietnam_job.yaml
@@ -0,0 +1,17 @@
base_dir: './datasets/test_vietnamese_jsonl_data'
targets:
- 0
- 1
output_dir: './datasets/test_output_data'

n_dist: 10
n_output: 1
is_cluster: False
min_doc_len: 50
max_doc_len: 100000
min_mean_word_len: 3
max_mean_word_len: 10
symbol_to_word_ratio: 0.1
bullet_point_ratio: 0.9
ellipsis_ratio: 0.3
vietnamese_word_ratio: 0.25
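
For reference, a minimal sketch (assuming PyYAML, as used by the job below) of loading this config and flagging keys the job never reads; as written, `vietnamese_job.py` does not consume `min_mean_word_len`, `max_mean_word_len`, `symbol_to_word_ratio`, or `vietnamese_word_ratio`:

```python
import yaml

# Minimal sketch: load the job config and report declared-but-unused keys.
# The "used" set mirrors the conf[...] lookups in vietnamese_job.py below.
with open("configs/vietnam_job.yaml") as f:
    conf = yaml.safe_load(f)

used = {
    "base_dir", "targets", "output_dir", "n_dist", "n_output",
    "is_cluster", "min_doc_len", "max_doc_len",
    "bullet_point_ratio", "ellipsis_ratio",
}
print("unused keys:", sorted(set(conf) - used))
```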
64 changes: 64 additions & 0 deletions dps/spark/jobs/vietnamese_job.py
@@ -0,0 +1,64 @@
import yaml

from pyspark import SparkContext
from pyspark.rdd import RDD
from dps.spark.spark_session import spark_session, spark_session_for_cluster

from dps.spark.prep.lang_agnostic_prep import (
doc_len_filter,
bullet_ellipsis_filter,
remove_whitespace,
process_html_and_uri_text,
replace_email_and_url,
remove_repeated_text,
)
from dps.spark.prep.vietnamese_prep import (
    normalize_vietnam,
    reduce_emoticon,
    replace_vietnam_pii,
    vietnamese_bad_words_filter,
)
from dps.spark.utils.io_utils import read_line, to_json


def preprocess_text(input_text: str):
    processing_function_list = [
        process_html_and_uri_text,
        reduce_emoticon,
        remove_whitespace,
        replace_email_and_url,
        replace_vietnam_pii,
        remove_repeated_text,
        normalize_vietnam,
    ]

    for func in processing_function_list:
        input_text = func(input_text)

    if isinstance(input_text, str):
        processed_text = input_text
    else:
        processed_text = " ".join(input_text)

    return processed_text


def vietnamese_job(config_path: str):
    with open(config_path) as f:
        conf = yaml.load(f, Loader=yaml.FullLoader)

    input_paths = ",".join([f'{conf["base_dir"]}/{t}' for t in conf["targets"]])
    if conf.get("is_local", False):
        from dps.spark.spark_session import spark_session_local

        session_fn = spark_session_local
    else:
        session_fn = spark_session_for_cluster if conf["is_cluster"] else spark_session

    with session_fn("Vietnamese text processing job") as spark:
        sc: SparkContext = spark.sparkContext
        proc_rdd: RDD = (
            sc.textFile(input_paths)
            .repartition(conf["n_dist"])
            .flatMap(read_line)
            .filter(lambda x: vietnamese_bad_words_filter(x["text"]))
            .filter(lambda x: doc_len_filter(x["text"], conf["min_doc_len"], conf["max_doc_len"]))
            .filter(lambda x: bullet_ellipsis_filter(x["text"], conf["bullet_point_ratio"], conf["ellipsis_ratio"]))
            .map(lambda x: dict(text=preprocess_text(x["text"])))
            .filter(lambda x: doc_len_filter(x["text"], conf["min_doc_len"], conf["max_doc_len"]))
        )
        proc_rdd.repartition(conf["n_output"]).flatMap(to_json).saveAsTextFile(conf["output_dir"])
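
For a quick local smoke test of the job, something like the following should work (a hypothetical invocation, not part of this PR; it assumes the `dps` package is importable, a local Spark is available, and `is_local: True` has been added to the config):

```python
# Hypothetical smoke test for the new job; the config path and the
# is_local flag are assumptions, not part of this PR.
from dps.spark.jobs.vietnamese_job import vietnamese_job

if __name__ == "__main__":
    vietnamese_job("configs/vietnam_job.yaml")
```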
73 changes: 73 additions & 0 deletions dps/spark/prep/vietnamese_prep.py
@@ -0,0 +1,73 @@
from dps.spark.utils.korean_utils import CARD_PATTERN
from dps.spark.utils.lang_agnostic_utils import replace_with_token
from dps.spark.utils.token_utils import (
    CARD_END_TOKEN,
    CARD_START_TOKEN,
    PHONE_NUMBER_END_TOKEN,
    PHONE_NUMBER_START_TOKEN,
    RRN_END_TOKEN,
    RRN_START_TOKEN,
)
from dps.spark.utils.vietnamese_utils import (
    BAD_WORDS_VIETNAMESE,
    NATIONAL_ID_PATTERN,
    PHONE_NUMBER_PATTERN,
)

from underthesea import text_normalize, word_tokenize


def vietnamese_bad_words_filter(text):
    """Drop text that contains bad words"""
    for bad_word in BAD_WORDS_VIETNAMESE:
        if bad_word in text:
            return False
    return True


def reduce_emoticon(text: str, num_repeats=2):
    """
    Reduce the number of repeated emoticons in a text.
    If an emoticon is repeated more than num_repeats times, reduce it to num_repeats.
    example: :):):):):) => :):)
    """
    emoticons = [":)", ":D", ":P", ":(", ":O", ";)", "xD"]  # emoticons to check for repetition
    for emoticon in emoticons:
        while emoticon * (num_repeats + 1) in text:
            text = text.replace(emoticon * (num_repeats + 1), emoticon * num_repeats)
    return text


def normalize_vietnam(text: str):
    """
    Normalize text in Vietnamese
    example: Ðảm baỏ chất lựơng phòng thí nghịêm hoá học => Đảm bảo chất lượng phòng thí nghiệm hóa học
    """
    text = text_normalize(text)
    return text


def replace_vietnam_pii(text: str):
    replaces = []

    text = replace_with_token(
        text, CARD_PATTERN, CARD_START_TOKEN, CARD_END_TOKEN, replaces
    )

    text = replace_with_token(
        text, NATIONAL_ID_PATTERN, RRN_START_TOKEN, RRN_END_TOKEN, replaces
    )

    text = replace_with_token(
        text,
        PHONE_NUMBER_PATTERN,
        PHONE_NUMBER_START_TOKEN,
        PHONE_NUMBER_END_TOKEN,
        replaces,
    )

    for before, after in replaces:
        text = text.replace(before, after)

    return text


def vietnamese_mean_word_len_filter(
    text: str, min_mean_word_len: int, max_mean_word_len: int
) -> bool:
    # TODO: might be better to add another argument `is_vietnamese` to lang_agnostic_prep.mean_word_len_filter
    words = word_tokenize(text)
    word_lens = [len(word) for word in words]
    if not word_lens:
        return False
    mean_word_len = sum(word_lens) / len(word_lens)
    return min_mean_word_len <= mean_word_len <= max_mean_word_len
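
A short sanity check of the prep helpers (assumes `underthesea` is installed; the exact token boundaries are up to `underthesea.word_tokenize`, and the normalization example is taken from the docstring above):

```python
from dps.spark.prep.vietnamese_prep import (
    normalize_vietnam,
    reduce_emoticon,
    vietnamese_mean_word_len_filter,
)

# Runs of a repeated emoticon collapse to at most num_repeats occurrences.
assert reduce_emoticon(":):):):):)") == ":):)"

# Mis-composed diacritics are normalized (delegates to underthesea).
print(normalize_vietnam("Ðảm baỏ chất lựơng"))

# True when the mean token length falls inside [min, max].
print(vietnamese_mean_word_len_filter("hôm nay trời đẹp", 1, 10))
```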