Skip to content

Commit 7e7dba6

Browse files
authored
Merge pull request #10 from Intugle/features/adapter
changed dataframe to adapter
2 parents c38e50f + 2780cde commit 7e7dba6

21 files changed

Lines changed: 3408 additions & 3181 deletions

File tree

notebooks/upstream.ipynb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -750,8 +750,14 @@
750750
}
751751
],
752752
"metadata": {
753+
"kernelspec": {
754+
"display_name": "data-tools",
755+
"language": "python",
756+
"name": "python3"
757+
},
753758
"language_info": {
754-
"name": "python"
759+
"name": "python",
760+
"version": "3.12.3"
755761
}
756762
},
757763
"nbformat": 4,

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ dependencies = [
4141
"langchain-nvidia-ai-endpoints>=0.3.16",
4242
"langchain-xai>=0.2.5",
4343
"langchain-perplexity>=0.1.2",
44+
"duckdb>=1.3.2",
4445
]
4546

4647
[dependency-groups]

src/data_tools/adapters/adapter.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from abc import ABC, abstractmethod
2+
from typing import Any
3+
4+
from data_tools.adapters.models import (
5+
ColumnProfile,
6+
ProfilingOutput,
7+
)
8+
9+
10+
class Adapter(ABC):
    """Abstract interface for data-source adapters.

    A concrete adapter (e.g. pandas, duckdb) knows how to load a source,
    profile it as a whole, and profile individual columns. Instances are
    created through the adapter factory's registered checker/creator pairs.
    """

    @abstractmethod
    def profile(self, data: Any) -> ProfilingOutput:
        """Return a table-level profile of `data`.

        Args:
            data: Adapter-specific source descriptor (e.g. a DataFrame or a
                file-source config).

        Returns:
            ProfilingOutput with the row count, column names and generalized
            dtypes of the source.
        """
        ...

    @abstractmethod
    def column_profile(
        self,
        data: Any,
        table_name: str,
        column_name: str,
        total_count: int,
        sample_limit: int = 10,
        dtype_sample_limit: int = 10000,
    ) -> ColumnProfile:
        """Return a detailed profile for one column of the source.

        Args:
            data: Adapter-specific source descriptor.
            table_name: Logical table the column belongs to.
            column_name: Column to profile.
            total_count: Total row count of the table (from `profile`).
            sample_limit: Max number of values in the display sample.
            dtype_sample_limit: Max number of values in the dtype-inference
                sample.
        """
        ...

    # Fixed: was declared as `def load():` with no `self` and no parameters,
    # which did not match implementers (the duckdb adapter takes a source
    # config and a table name). Kept permissive since per-adapter signatures
    # may differ.
    @abstractmethod
    def load(self, *args: Any, **kwargs: Any) -> None:
        """Load/materialise the source so it can be queried by the adapter."""
        ...

    # Fixed: was `def execute():` without `self`, so calling it on an
    # instance raised TypeError before the NotImplementedError could fire.
    @abstractmethod
    def execute(self, *args: Any, **kwargs: Any) -> Any:
        """Execute an adapter-specific operation/query."""
        raise NotImplementedError()
Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from typing import Any, Callable
44

5-
from .dataframe import DataFrame
5+
from .adapter import Adapter
66

77

88
class ModuleInterface:
@@ -18,13 +18,14 @@ def import_module(name: str) -> ModuleInterface:
1818
return importlib.import_module(name) # type: ignore
1919

2020

21-
DEFAULT_PLUGINS = ["data_tools.dataframes.types.pandas.pandas"]
21+
DEFAULT_PLUGINS = [
22+
"data_tools.adapters.types.pandas.pandas",
23+
"data_tools.adapters.types.duckdb.duckdb",
24+
]
2225

2326

24-
class DataFrameFactory:
25-
dataframe_funcs: dict[
26-
str, tuple[Callable[[Any], bool], Callable[..., DataFrame]]
27-
] = {}
27+
class AdapterFactory:
28+
dataframe_funcs: dict[str, tuple[Callable[[Any], bool], Callable[..., Adapter]]] = {}
2829

2930
# LOADER
3031
def __init__(self, plugins: list[dict] = None):
@@ -42,7 +43,7 @@ def register(
4243
cls,
4344
env_type: str,
4445
checker_fn: Callable[[Any], bool],
45-
creator_fn: Callable[..., DataFrame],
46+
creator_fn: Callable[..., Adapter],
4647
) -> None:
4748
"""Register a new execution engine type"""
4849
cls.dataframe_funcs[env_type] = (checker_fn, creator_fn)
@@ -53,7 +54,7 @@ def unregister(cls, env_type: str) -> None:
5354
cls.dataframe_funcs.pop(env_type, None)
5455

5556
@classmethod
56-
def create(cls, df: Any) -> DataFrame:
57+
def create(cls, df: Any) -> Adapter:
5758
"""Create a execution engine type"""
5859
for checker_fn, creator_fn in cls.dataframe_funcs.values():
5960
if checker_fn(df):
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class ColumnProfile(BaseModel):
4242
)
4343
datatype_l2: Optional[str] = Field(
4444
default=None,
45-
description="The inferred data type category (dimension/measure) for the column, based on the datatype l1 results",
45+
description="The inferred data type category (dimension/measure) for the column, based on the datatype l1 results", # noqa: E501
4646
)
4747
business_glossary: Optional[str] = Field(
4848
default=None, description="The business glossary entry for the column, if available."
@@ -114,9 +114,7 @@ class KeyIdentificationOutput(BaseModel):
114114
115115
"""
116116

117-
column_name: Optional[str] = Field(
118-
default=None, description="The name of the column identified as a primary key."
119-
)
117+
column_name: Optional[str] = Field(default=None, description="The name of the column identified as a primary key.")
120118

121119

122120
class ColumnGlossary(BaseModel):
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
import time
2+
3+
from typing import Any, Optional
4+
5+
import duckdb
6+
import numpy as np
7+
import pandas as pd
8+
import pandas.api.types as ptypes
9+
10+
from data_tools.adapters.adapter import Adapter
11+
from data_tools.adapters.factory import AdapterFactory
12+
from data_tools.adapters.models import (
13+
ColumnProfile,
14+
ProfilingOutput,
15+
)
16+
from data_tools.adapters.types.duckdb.models import DuckdbConfig
17+
from data_tools.adapters.types.pandas.utils import convert_to_native
18+
from data_tools.common.exception import errors
19+
from data_tools.core.utilities.processing import string_standardization
20+
21+
22+
class DuckdbAdapter(Adapter):
    """Adapter that profiles file sources (csv/parquet/xlsx) through duckdb.

    Files are materialised into duckdb tables (default in-memory connection)
    and profiled with SQL.
    """

    def profile(self, data: DuckdbConfig) -> ProfilingOutput:
        """
        Generates a table-level profile of the file described by `data`.

        Args:
            data: Duckdb source configuration (file path and type).

        Returns:
            A pydantic model containing the profile information:
            - "count": Total number of rows.
            - "columns": List of column names.
            - "dtypes": A dictionary mapping column names to generalized data types.

        Raises:
            TypeError: If `data` is not a DuckdbConfig.
        """
        if not isinstance(data, DuckdbConfig):
            raise TypeError("Input must be a duckdb config.")

        def __format_dtype__(dtype: Any) -> str:
            """Maps a duckdb type name to a generalized type string."""
            type_map = {
                "VARCHAR": "string",
                "DATE": "date & time",
                "BIGINT": "integer",
                "DOUBLE": "float",
                "FLOAT": "float",
            }
            # Any unmapped duckdb type falls back to "string".
            return type_map.get(dtype, "string")

        table_name = "__profile_table__"
        self.load(data, table_name)

        # Total row count. NOTE(review): the table name is an internal
        # constant here; identifiers cannot be bound as query parameters in
        # duckdb, hence the f-string.
        count_rows = duckdb.execute(
            f"SELECT count(*) as count FROM {table_name}"
        ).fetchall()
        total_count = count_rows[0][0]

        # Column names and native duckdb types of the loaded table.
        # (Previously these results were assigned to `data`, shadowing the
        # DuckdbConfig parameter.)
        schema_rows = duckdb.execute(
            """
            SELECT column_name, data_type
            FROM duckdb_columns()
            WHERE table_name = ?
            """,
            [table_name],
        ).fetchall()

        dtypes = {col: __format_dtype__(dtype) for col, dtype in schema_rows}
        columns = [col for col, _ in schema_rows]

        return ProfilingOutput(
            count=total_count,
            columns=columns,
            dtypes=dtypes,
        )

    def column_profile(
        self,
        data: DuckdbConfig,
        table_name: str,
        column_name: str,
        total_count: int,
        sample_limit: int = 10,
        dtype_sample_limit: int = 10000,
    ) -> Optional[ColumnProfile]:
        """
        Generates a detailed profile for a single column of a duckdb table.

        It calculates null and distinct counts, and generates two types of samples:
        1. `sample_data`: A sample of unique values.
        2. `dtype_sample`: A potentially larger sample combining unique values with
           random non-unique values, intended for data type analysis.

        Args:
            data: Duckdb source configuration (file path and type).
            table_name: Table to load the source into and profile.
            column_name: The name of the column to profile.
            total_count: Total row count of the table (from `profile`).
            sample_limit: The desired number of items for `sample_data`.
            dtype_sample_limit: The desired number of items for `dtype_sample`.

        Returns:
            A ColumnProfile for the column.

        Raises:
            TypeError: If `data` is not a DuckdbConfig.
        """
        if not isinstance(data, DuckdbConfig):
            raise TypeError("Input must be a duckdb config.")

        self.load(data, table_name)

        start_ts = time.time()

        # --- Calculations --- #
        # NOTE(review): table/column names are interpolated into the SQL
        # string because duckdb cannot bind identifiers as parameters; the
        # names come from profile()'s schema inspection and must be trusted.
        stats_query = f"""
            SELECT
                COUNT(DISTINCT {column_name}) AS distinct_count,
                SUM(CASE WHEN {column_name} IS NULL THEN 1 ELSE 0 END) AS null_count
            FROM
                {table_name}
        """
        distinct_count, null_count = duckdb.execute(stats_query).fetchall()[0]

        # SUM(...) yields NULL (-> None) on an empty table; previously this
        # crashed the subtraction below.
        null_count = 0 if null_count is None else null_count
        not_null_count = total_count - null_count

        # --- Sampling Logic --- #
        # 1. Get a sample of distinct values (cast to VARCHAR for uniformity).
        sample_query = f"""
            SELECT
                DISTINCT CAST( {column_name} AS VARCHAR) AS sample_values
            FROM
                {table_name}
            WHERE
                {column_name} IS NOT NULL LIMIT {dtype_sample_limit}
        """
        distinct_values = [row[0] for row in duckdb.execute(sample_query).fetchall()]

        not_null_series = pd.Series(distinct_values)

        if distinct_count > 0:
            # Never larger than len(distinct_values) because the query above
            # is capped at dtype_sample_limit.
            distinct_sample_size = min(distinct_count, dtype_sample_limit)
            sample_data = list(np.random.choice(distinct_values, distinct_sample_size, replace=False))
        else:
            sample_data = []

        # 2. Create a combined sample for data type analysis.
        if distinct_count >= dtype_sample_limit:
            # If we have enough distinct values, that's the best sample.
            dtype_sample = sample_data
        elif distinct_count > 0 and not_null_count > 0:
            # If distinct values are few, supplement them with random samples
            # drawn (with replacement) from the distinct values, so the dtype
            # sample approaches dtype_sample_limit items.
            remaining_sample_size = dtype_sample_limit - distinct_count

            # Use replace=True in case the number of non-null values is less than the remaining sample size needed.
            additional_samples = list(not_null_series.sample(n=remaining_sample_size, replace=True))

            # Combine the full set of unique values with the additional random samples.
            dtype_sample = list(distinct_values) + additional_samples
        else:
            dtype_sample = []

        # --- Convert numpy types to native Python types for JSON compatibility --- #
        native_sample_data = convert_to_native(sample_data)
        native_dtype_sample = convert_to_native(dtype_sample)

        business_name = string_standardization(column_name)

        # --- Final Profile --- #
        return ColumnProfile(
            column_name=column_name,
            business_name=business_name,
            table_name=table_name,
            null_count=null_count,
            count=total_count,
            distinct_count=distinct_count,
            uniqueness=distinct_count / total_count if total_count > 0 else 0.0,
            completeness=not_null_count / total_count if total_count > 0 else 0.0,
            sample_data=native_sample_data[:sample_limit],
            dtype_sample=native_dtype_sample,
            ts=time.time() - start_ts,
        )

    @staticmethod
    def _get_load_func(data: DuckdbConfig) -> str:
        """Return the duckdb table-function call reading the configured file.

        Raises:
            errors.NotFoundError: If the configured file type is unsupported.
        """
        func = {
            "csv": "read_csv",
            "parquet": "read_parquet",
            "xlsx": "read_xlsx",
        }
        ld_func = func.get(data.type)
        if ld_func is None:
            raise errors.NotFoundError(f"Type: {data.type} not supported")

        return f"{ld_func}('{data.path}')"

    def load(self, data: DuckdbConfig, table_name: str) -> None:
        """Materialise the configured file as `table_name`.

        Idempotent: CREATE TABLE IF NOT EXISTS lets repeated profiling calls
        reuse the already-loaded table.
        """
        ld_func = self._get_load_func(data)

        query = f"""CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM {ld_func};"""

        duckdb.execute(query)

    # Fixed: was `def execute():` — missing `self`, so any call on an
    # instance raised TypeError. Intentionally a no-op for now.
    def execute(self, *args: Any, **kwargs: Any) -> None: ...
204+
205+
206+
def can_handle_pandas(df: Any) -> bool:
    """Factory checker: True when the source descriptor is a DuckdbConfig.

    NOTE(review): despite the name (copied from the pandas plugin), this
    checks for the duckdb config type; the name is kept because `register`
    in this module references it.
    """
    is_duckdb_source = isinstance(df, DuckdbConfig)
    return is_duckdb_source
208+
209+
210+
def register(factory: AdapterFactory):
    """Plugin entry point: register the duckdb adapter with *factory*.

    The DuckdbAdapter class itself serves as the creator callable.
    """
    factory.register(
        "duckdb",
        can_handle_pandas,
        DuckdbAdapter,
    )
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from data_tools.common.schema import SchemaBase
2+
3+
4+
class DuckdbConfig(SchemaBase):
    """Configuration describing a file source to be read through duckdb."""

    # Filesystem path to the source file.
    path: str
    # File format; the duckdb adapter supports "csv", "parquet" and "xlsx".
    type: str

0 commit comments

Comments
 (0)