Skip to content

Commit 7a67f04

Browse files
authored
feat: parquet read options #150
* feat(parquet): add read options for projection, timestamp coercion, and row range Problem: Parquet reads were all-or-nothing. Users could not subset columns at read-time, control timestamp-to-day conversion, or subset rows while loading. This issue also required preserving current behavior for existing callers. Solution: introduce ParquetReadOptions (selectedColumns, timestampPolicy, rowRange) plus defaultParquetReadOptions. Add readParquetWithOpts/readParquetFilesWithOpts and keep readParquet/readParquetFiles as default-option wrappers. Wire selectedColumns into decode-time filtering with fail-fast ColumnNotFoundException for missing requested columns. Wire timestampPolicy with PreserveTimestampPrecision and CoerceTimestampToDay behaviors, including fallback coercion for already-decoded UTCTime columns. Wire rowRange through the reader and apply global rowRange semantics for readParquetFilesWithOpts after concatenation. Tradeoffs and rationale: chose an options record instead of multiple specialized APIs to keep extension points coherent and avoid API sprawl. Kept legacy conversion wrappers/helpers (applyLogicalType and UTC helpers) to reduce compatibility risk for existing/internal call paths. read-time projection improves performance by skipping unselected chunk decode; rowRange currently uses post-read slicing semantics (start inclusive, end exclusive) for correctness and consistency with existing range behavior. Verification: add focused Parquet tests for selectedColumns, rowRange, timestampPolicy coercion, and missing selected column errors; run full suite successfully via cabal test (all passing). * chore(parquet): format reader and Parquet tests Apply formatter-driven layout updates in Parquet read-options code and related tests. No behavior change; this commit is formatting-only after lint/format checks. 
* add `predicate` to `ParquetReadOptions` with default `Nothing` - apply parquet read options in order: predicate filtering, column projection, then row range - auto-include predicate-referenced columns during decode when `selectedColumns` is set, then project back to requested columns - restrict selected-column matching to leaf names only (drop full-path nested matching) - remove `timestampPolicy` and revert timestamp conversion to default behavior - update row-range helper implementation style in `applyRowRange` - revise parquet option tests: make row-range assertion non-circular and add predicate-focused cases
1 parent 77a140e commit 7a67f04

3 files changed

Lines changed: 241 additions & 42 deletions

File tree

src/DataFrame.hs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ __I/O__
105105
* @D.readTsv :: FilePath -> IO DataFrame@
106106
* @D.writeCsv :: FilePath -> DataFrame -> IO ()@
107107
* @D.readParquet :: FilePath -> IO DataFrame@
108+
* @D.readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame@
108109
109110
__Exploration__
110111
@@ -252,7 +253,14 @@ import DataFrame.IO.CSV as CSV (
252253
writeCsv,
253254
writeSeparated,
254255
)
255-
import DataFrame.IO.Parquet as Parquet (readParquet, readParquetFiles)
256+
import DataFrame.IO.Parquet as Parquet (
257+
ParquetReadOptions (..),
258+
defaultParquetReadOptions,
259+
readParquet,
260+
readParquetFiles,
261+
readParquetFilesWithOpts,
262+
readParquetWithOpts,
263+
)
256264
import DataFrame.IO.Unstable.CSV as UnstableCSV (
257265
fastReadCsvUnstable,
258266
fastReadTsvUnstable,

src/DataFrame/IO/Parquet.hs

Lines changed: 114 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
module DataFrame.IO.Parquet where
88

9+
import Control.Exception (throw)
910
import Control.Monad
1011
import Data.Bits
1112
import qualified Data.ByteString as BSO
@@ -14,15 +15,19 @@ import Data.IORef
1415
import Data.Int
1516
import qualified Data.List as L
1617
import qualified Data.Map as M
18+
import qualified Data.Set as S
1719
import qualified Data.Text as T
1820
import Data.Text.Encoding
1921
import Data.Time
2022
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
2123
import Data.Word
24+
import DataFrame.Errors (DataFrameException (ColumnNotFoundException))
2225
import qualified DataFrame.Internal.Column as DI
2326
import DataFrame.Internal.DataFrame (DataFrame)
27+
import DataFrame.Internal.Expression (Expr, getColumns)
2428
import qualified DataFrame.Operations.Core as DI
2529
import DataFrame.Operations.Merge ()
30+
import qualified DataFrame.Operations.Subset as DS
2631
import System.FilePath.Glob (glob)
2732

2833
import DataFrame.IO.Parquet.Dictionary
@@ -35,6 +40,21 @@ import System.Directory (doesDirectoryExist)
3540
import qualified Data.Vector.Unboxed as VU
3641
import System.FilePath ((</>))
3742

43+
data ParquetReadOptions = ParquetReadOptions
44+
{ selectedColumns :: Maybe [T.Text]
45+
, predicate :: Maybe (Expr Bool)
46+
, rowRange :: Maybe (Int, Int)
47+
}
48+
deriving (Eq, Show)
49+
50+
defaultParquetReadOptions :: ParquetReadOptions
51+
defaultParquetReadOptions =
52+
ParquetReadOptions
53+
{ selectedColumns = Nothing
54+
, predicate = Nothing
55+
, rowRange = Nothing
56+
}
57+
3858
{- | Read a parquet file from path and load it into a dataframe.
3959
4060
==== __Example__
@@ -43,10 +63,38 @@ ghci> D.readParquet ".\/data\/mtcars.parquet"
4363
@
4464
-}
4565
readParquet :: FilePath -> IO DataFrame
46-
readParquet path = do
66+
readParquet = readParquetWithOpts defaultParquetReadOptions
67+
68+
readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
69+
readParquetWithOpts opts path = do
4770
fileMetadata <- readMetadataFromPath path
4871
let columnPaths = getColumnPaths (drop 1 $ schema fileMetadata)
4972
let columnNames = map fst columnPaths
73+
let leafNames = map (last . T.splitOn ".") columnNames
74+
let availableSelectedColumns = L.nub leafNames
75+
let predicateColumns = maybe [] (L.nub . getColumns) (predicate opts)
76+
let selectedColumnsForRead = case selectedColumns opts of
77+
Nothing -> Nothing
78+
Just selected -> Just (L.nub (selected ++ predicateColumns))
79+
let selectedColumnSet = S.fromList <$> selectedColumnsForRead
80+
let shouldReadColumn colName _ =
81+
case selectedColumnSet of
82+
Nothing -> True
83+
Just selected -> colName `S.member` selected
84+
85+
case selectedColumnsForRead of
86+
Nothing -> pure ()
87+
Just requested ->
88+
let missing = requested L.\\ availableSelectedColumns
89+
in unless
90+
(L.null missing)
91+
( throw
92+
( ColumnNotFoundException
93+
(T.pack $ show missing)
94+
"readParquetWithOpts"
95+
availableSelectedColumns
96+
)
97+
)
5098

5199
colMap <- newIORef (M.empty :: M.Map T.Text DI.Column)
52100
lTypeMap <- newIORef (M.empty :: M.Map T.Text LogicalType)
@@ -77,53 +125,59 @@ readParquet path = do
77125
then T.pack $ "col_" ++ show colIdx
78126
else T.pack $ last colPath
79127

80-
let colDataPageOffset = columnDataPageOffset metadata
81-
let colDictionaryPageOffset = columnDictionaryPageOffset metadata
82-
let colStart =
83-
if colDictionaryPageOffset > 0 && colDataPageOffset > colDictionaryPageOffset
84-
then colDictionaryPageOffset
85-
else colDataPageOffset
86-
let colLength = columnTotalCompressedSize metadata
87-
88-
let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents)
89-
90-
pages <- readAllPages (columnCodec metadata) columnBytes
91-
92-
let maybeTypeLength =
93-
if columnType metadata == PFIXED_LEN_BYTE_ARRAY
94-
then getTypeLength colPath
95-
else Nothing
96-
97-
let primaryEncoding = maybe EPLAIN fst (L.uncons (columnEncodings metadata))
98-
99-
let schemaTail = drop 1 (schema fileMetadata)
100-
let colPath = columnPathInSchema (columnMetaData colChunk)
101-
let (maxDef, maxRep) = levelsForPath schemaTail colPath
102-
let lType = logicalType (schemaTail !! colIdx)
103-
column <-
104-
processColumnPages
105-
(maxDef, maxRep)
106-
pages
107-
(columnType metadata)
108-
primaryEncoding
109-
maybeTypeLength
110-
lType
111-
112-
modifyIORef colMap (M.insertWith DI.concatColumnsEither colName column)
113-
modifyIORef lTypeMap (M.insert colName lType)
128+
when (shouldReadColumn colName colPath) $ do
129+
let colDataPageOffset = columnDataPageOffset metadata
130+
let colDictionaryPageOffset = columnDictionaryPageOffset metadata
131+
let colStart =
132+
if colDictionaryPageOffset > 0 && colDataPageOffset > colDictionaryPageOffset
133+
then colDictionaryPageOffset
134+
else colDataPageOffset
135+
let colLength = columnTotalCompressedSize metadata
136+
137+
let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents)
138+
139+
pages <- readAllPages (columnCodec metadata) columnBytes
140+
141+
let maybeTypeLength =
142+
if columnType metadata == PFIXED_LEN_BYTE_ARRAY
143+
then getTypeLength colPath
144+
else Nothing
145+
146+
let primaryEncoding = maybe EPLAIN fst (L.uncons (columnEncodings metadata))
147+
148+
let schemaTail = drop 1 (schema fileMetadata)
149+
let (maxDef, maxRep) = levelsForPath schemaTail colPath
150+
let lType = logicalType (schemaTail !! colIdx)
151+
column <-
152+
processColumnPages
153+
(maxDef, maxRep)
154+
pages
155+
(columnType metadata)
156+
primaryEncoding
157+
maybeTypeLength
158+
lType
159+
160+
modifyIORef colMap (M.insertWith DI.concatColumnsEither colName column)
161+
modifyIORef lTypeMap (M.insert colName lType)
114162

115163
finalColMap <- readIORef colMap
116164
finalLTypeMap <- readIORef lTypeMap
117165
let orderedColumns =
118166
map
119-
( \name -> (name, applyLogicalType (finalLTypeMap M.! name) $ finalColMap M.! name)
167+
( \name ->
168+
( name
169+
, applyLogicalType (finalLTypeMap M.! name) $ finalColMap M.! name
170+
)
120171
)
121172
(filter (`M.member` finalColMap) columnNames)
122173

123-
pure $ DI.fromNamedColumns orderedColumns
174+
pure $ applyReadOptions opts (DI.fromNamedColumns orderedColumns)
124175

125176
readParquetFiles :: FilePath -> IO DataFrame
126-
readParquetFiles path = do
177+
readParquetFiles = readParquetFilesWithOpts defaultParquetReadOptions
178+
179+
readParquetFilesWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
180+
readParquetFilesWithOpts opts path = do
127181
isDir <- doesDirectoryExist path
128182

129183
let pat = if isDir then path </> "*" else path
@@ -137,8 +191,27 @@ readParquetFiles path = do
137191
error $
138192
"readParquetFiles: no parquet files found for " ++ path
139193
_ -> do
140-
dfs <- mapM readParquet files
141-
pure (mconcat dfs)
194+
let optsWithoutRowRange = opts{rowRange = Nothing}
195+
dfs <- mapM (readParquetWithOpts optsWithoutRowRange) files
196+
pure (applyRowRange opts (mconcat dfs))
197+
198+
applyRowRange :: ParquetReadOptions -> DataFrame -> DataFrame
199+
applyRowRange opts df =
200+
maybe df (`DS.range` df) (rowRange opts)
201+
202+
applySelectedColumns :: ParquetReadOptions -> DataFrame -> DataFrame
203+
applySelectedColumns opts df =
204+
maybe df (`DS.select` df) (selectedColumns opts)
205+
206+
applyPredicate :: ParquetReadOptions -> DataFrame -> DataFrame
207+
applyPredicate opts df =
208+
maybe df (`DS.filterWhere` df) (predicate opts)
209+
210+
applyReadOptions :: ParquetReadOptions -> DataFrame -> DataFrame
211+
applyReadOptions opts =
212+
applyRowRange opts
213+
. applySelectedColumns opts
214+
. applyPredicate opts
142215

143216
readMetadataFromPath :: FilePath -> IO FileMetadata
144217
readMetadataFromPath path = do
@@ -310,7 +383,7 @@ processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength lType = do
310383
L.foldl' (\l r -> fromRight (error "concat failed") (DI.concatColumns l r)) c cs
311384

312385
applyLogicalType :: LogicalType -> DI.Column -> DI.Column
313-
applyLogicalType (TimestampType isUTC unit) col =
386+
applyLogicalType (TimestampType _ unit) col =
314387
fromRight col $
315388
DI.mapColumn
316389
(microsecondsToUTCTime . (* (1_000_000 `div` unitDivisor unit)))

tests/Parquet.hs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
module Parquet where
55

6+
import Assertions (assertExpectException)
67
import qualified DataFrame as D
78
import qualified DataFrame.Functions as F
89

@@ -186,6 +187,117 @@ allTypesDictionary =
186187
(unsafePerformIO (D.readParquet "./tests/data/alltypes_dictionary.parquet"))
187188
)
188189

190+
selectedColumnsWithOpts :: Test
191+
selectedColumnsWithOpts =
192+
TestCase
193+
( assertEqual
194+
"selectedColumnsWithOpts"
195+
(D.select ["id", "bool_col"] allTypes)
196+
( unsafePerformIO
197+
( D.readParquetWithOpts
198+
(D.defaultParquetReadOptions{D.selectedColumns = Just ["id", "bool_col"]})
199+
"./tests/data/alltypes_plain.parquet"
200+
)
201+
)
202+
)
203+
204+
rowRangeWithOpts :: Test
205+
rowRangeWithOpts =
206+
TestCase
207+
( assertEqual
208+
"rowRangeWithOpts"
209+
(3, 11)
210+
( unsafePerformIO
211+
( D.dimensions
212+
<$> D.readParquetWithOpts
213+
(D.defaultParquetReadOptions{D.rowRange = Just (2, 5)})
214+
"./tests/data/alltypes_plain.parquet"
215+
)
216+
)
217+
)
218+
219+
predicateWithOpts :: Test
220+
predicateWithOpts =
221+
TestCase
222+
( assertEqual
223+
"predicateWithOpts"
224+
(D.fromNamedColumns [("id", D.fromList [6 :: Int32, 7])])
225+
( unsafePerformIO
226+
( D.readParquetWithOpts
227+
( D.defaultParquetReadOptions
228+
{ D.selectedColumns = Just ["id"]
229+
, D.predicate =
230+
Just
231+
( F.geq
232+
(F.col @Int32 "id")
233+
(F.lit (6 :: Int32))
234+
)
235+
}
236+
)
237+
"./tests/data/alltypes_plain.parquet"
238+
)
239+
)
240+
)
241+
242+
predicateUsesNonSelectedColumnWithOpts :: Test
243+
predicateUsesNonSelectedColumnWithOpts =
244+
TestCase
245+
( assertEqual
246+
"predicateUsesNonSelectedColumnWithOpts"
247+
(D.fromNamedColumns [("bool_col", D.fromList [True, False])])
248+
( unsafePerformIO
249+
( D.readParquetWithOpts
250+
( D.defaultParquetReadOptions
251+
{ D.selectedColumns = Just ["bool_col"]
252+
, D.predicate =
253+
Just
254+
( F.geq
255+
(F.col @Int32 "id")
256+
(F.lit (6 :: Int32))
257+
)
258+
}
259+
)
260+
"./tests/data/alltypes_plain.parquet"
261+
)
262+
)
263+
)
264+
265+
predicateWithOptsAcrossFiles :: Test
266+
predicateWithOptsAcrossFiles =
267+
TestCase
268+
( assertEqual
269+
"predicateWithOptsAcrossFiles"
270+
(4, 1)
271+
( unsafePerformIO
272+
( D.dimensions
273+
<$> D.readParquetFilesWithOpts
274+
( D.defaultParquetReadOptions
275+
{ D.selectedColumns = Just ["id"]
276+
, D.predicate =
277+
Just
278+
( F.geq
279+
(F.col @Int32 "id")
280+
(F.lit (6 :: Int32))
281+
)
282+
}
283+
)
284+
"./tests/data/alltypes_plain*.parquet"
285+
)
286+
)
287+
)
288+
289+
missingSelectedColumnWithOpts :: Test
290+
missingSelectedColumnWithOpts =
291+
TestCase
292+
( assertExpectException
293+
"missingSelectedColumnWithOpts"
294+
"Column not found"
295+
( D.readParquetWithOpts
296+
(D.defaultParquetReadOptions{D.selectedColumns = Just ["does_not_exist"]})
297+
"./tests/data/alltypes_plain.parquet"
298+
)
299+
)
300+
189301
transactions :: D.DataFrame
190302
transactions =
191303
D.fromNamedColumns
@@ -819,6 +931,12 @@ tests =
819931
[ allTypesPlain
820932
, allTypesPlainSnappy
821933
, allTypesDictionary
934+
, selectedColumnsWithOpts
935+
, rowRangeWithOpts
936+
, predicateWithOpts
937+
, predicateUsesNonSelectedColumnWithOpts
938+
, predicateWithOptsAcrossFiles
939+
, missingSelectedColumnWithOpts
822940
, mtCars
823941
, allTypesTinyPagesLastFew
824942
, allTypesTinyPagesDimensions

0 commit comments

Comments
 (0)