Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/DataFrame.hs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ __I/O__
* @D.readTsv :: FilePath -> IO DataFrame@
* @D.writeCsv :: FilePath -> DataFrame -> IO ()@
* @D.readParquet :: FilePath -> IO DataFrame@
* @D.readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame@

__Exploration__

Expand Down Expand Up @@ -252,7 +253,15 @@ import DataFrame.IO.CSV as CSV (
writeCsv,
writeSeparated,
)
import DataFrame.IO.Parquet as Parquet (readParquet, readParquetFiles)
import DataFrame.IO.Parquet as Parquet (
ParquetReadOptions (..),
ParquetTimestampPolicy (..),
defaultParquetReadOptions,
readParquet,
readParquetFiles,
readParquetFilesWithOpts,
readParquetWithOpts,
)
import DataFrame.IO.Unstable.CSV as UnstableCSV (
fastReadCsvUnstable,
fastReadTsvUnstable,
Expand Down
199 changes: 151 additions & 48 deletions src/DataFrame/IO/Parquet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

module DataFrame.IO.Parquet where

import Control.Exception (throw)
import Control.Monad
import Data.Bits
import qualified Data.ByteString as BSO
Expand All @@ -14,15 +15,18 @@ import Data.IORef
import Data.Int
import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Text.Encoding
import Data.Time
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
import Data.Word
import DataFrame.Errors (DataFrameException (ColumnNotFoundException))
import qualified DataFrame.Internal.Column as DI
import DataFrame.Internal.DataFrame (DataFrame)
import qualified DataFrame.Operations.Core as DI
import DataFrame.Operations.Merge ()
import qualified DataFrame.Operations.Subset as DS
import System.FilePath.Glob (glob)

import DataFrame.IO.Parquet.Dictionary
Expand All @@ -35,6 +39,26 @@ import System.Directory (doesDirectoryExist)
import qualified Data.Vector.Unboxed as VU
import System.FilePath ((</>))

data ParquetTimestampPolicy
= PreserveTimestampPrecision
| CoerceTimestampToDay
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem as fundamental. Let's hold off on it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also reverted this back to the old implementation for the time conversion. I saw you discussed it in a different issue, so let me know if we want to circle back to this.

deriving (Eq, Show)

data ParquetReadOptions = ParquetReadOptions
{ selectedColumns :: Maybe [T.Text]
Comment thread
mchav marked this conversation as resolved.
, timestampPolicy :: ParquetTimestampPolicy
, rowRange :: Maybe (Int, Int)
}
deriving (Eq, Show)

-- | Default read options: read every column, keep the timestamp
-- precision stored in the file, and do not restrict the row range.
defaultParquetReadOptions :: ParquetReadOptions
defaultParquetReadOptions =
  ParquetReadOptions
    { selectedColumns = Nothing
    , timestampPolicy = PreserveTimestampPrecision
    , rowRange = Nothing
    }

{- | Read a parquet file from path and load it into a dataframe.

==== __Example__
Expand All @@ -43,10 +67,36 @@ ghci> D.readParquet ".\/data\/mtcars.parquet"
@
-}
readParquet :: FilePath -> IO DataFrame
readParquet path = do
readParquet = readParquetWithOpts defaultParquetReadOptions

readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
readParquetWithOpts opts path = do
fileMetadata <- readMetadataFromPath path
let columnPaths = getColumnPaths (drop 1 $ schema fileMetadata)
let columnNames = map fst columnPaths
let leafNames = map (last . T.splitOn ".") columnNames
let availableSelectedColumns = L.nub (columnNames ++ leafNames)
let selectedColumnSet = S.fromList <$> selectedColumns opts
let shouldReadColumn colName colPath =
case selectedColumnSet of
Nothing -> True
Just selected ->
let fullPath = T.intercalate "." (map T.pack colPath)
in colName `S.member` selected || fullPath `S.member` selected
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not worry about nested fields for now. The reader doesn't even have a good way to support them.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the nested-field support for now. Made it so it's leaf-name only.


case selectedColumns opts of
Nothing -> pure ()
Just requested ->
let missing = requested L.\\ availableSelectedColumns
in unless
(L.null missing)
( throw
( ColumnNotFoundException
(T.pack $ show missing)
"readParquetWithOpts"
availableSelectedColumns
)
)

colMap <- newIORef (M.empty :: M.Map T.Text DI.Column)
lTypeMap <- newIORef (M.empty :: M.Map T.Text LogicalType)
Expand Down Expand Up @@ -77,53 +127,59 @@ readParquet path = do
then T.pack $ "col_" ++ show colIdx
else T.pack $ last colPath

let colDataPageOffset = columnDataPageOffset metadata
let colDictionaryPageOffset = columnDictionaryPageOffset metadata
let colStart =
if colDictionaryPageOffset > 0 && colDataPageOffset > colDictionaryPageOffset
then colDictionaryPageOffset
else colDataPageOffset
let colLength = columnTotalCompressedSize metadata

let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents)

pages <- readAllPages (columnCodec metadata) columnBytes

let maybeTypeLength =
if columnType metadata == PFIXED_LEN_BYTE_ARRAY
then getTypeLength colPath
else Nothing

let primaryEncoding = maybe EPLAIN fst (L.uncons (columnEncodings metadata))

let schemaTail = drop 1 (schema fileMetadata)
let colPath = columnPathInSchema (columnMetaData colChunk)
let (maxDef, maxRep) = levelsForPath schemaTail colPath
let lType = logicalType (schemaTail !! colIdx)
column <-
processColumnPages
(maxDef, maxRep)
pages
(columnType metadata)
primaryEncoding
maybeTypeLength
lType

modifyIORef colMap (M.insertWith DI.concatColumnsEither colName column)
modifyIORef lTypeMap (M.insert colName lType)
when (shouldReadColumn colName colPath) $ do
let colDataPageOffset = columnDataPageOffset metadata
let colDictionaryPageOffset = columnDictionaryPageOffset metadata
let colStart =
if colDictionaryPageOffset > 0 && colDataPageOffset > colDictionaryPageOffset
then colDictionaryPageOffset
else colDataPageOffset
let colLength = columnTotalCompressedSize metadata

let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents)

pages <- readAllPages (columnCodec metadata) columnBytes

let maybeTypeLength =
if columnType metadata == PFIXED_LEN_BYTE_ARRAY
then getTypeLength colPath
else Nothing

let primaryEncoding = maybe EPLAIN fst (L.uncons (columnEncodings metadata))

let schemaTail = drop 1 (schema fileMetadata)
let (maxDef, maxRep) = levelsForPath schemaTail colPath
let lType = logicalType (schemaTail !! colIdx)
column <-
processColumnPages
(maxDef, maxRep)
pages
(columnType metadata)
primaryEncoding
maybeTypeLength
lType

modifyIORef colMap (M.insertWith DI.concatColumnsEither colName column)
modifyIORef lTypeMap (M.insert colName lType)

finalColMap <- readIORef colMap
finalLTypeMap <- readIORef lTypeMap
let orderedColumns =
map
( \name -> (name, applyLogicalType (finalLTypeMap M.! name) $ finalColMap M.! name)
( \name ->
( name
, applyLogicalTypeWithOptions opts (finalLTypeMap M.! name) $ finalColMap M.! name
)
)
(filter (`M.member` finalColMap) columnNames)

pure $ DI.fromNamedColumns orderedColumns
pure $ applyRowRange opts (DI.fromNamedColumns orderedColumns)

readParquetFiles :: FilePath -> IO DataFrame
readParquetFiles path = do
readParquetFiles = readParquetFilesWithOpts defaultParquetReadOptions

readParquetFilesWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
readParquetFilesWithOpts opts path = do
isDir <- doesDirectoryExist path

let pat = if isDir then path </> "*" else path
Expand All @@ -137,8 +193,14 @@ readParquetFiles path = do
error $
"readParquetFiles: no parquet files found for " ++ path
_ -> do
dfs <- mapM readParquet files
pure (mconcat dfs)
let optsWithoutRowRange = opts{rowRange = Nothing}
dfs <- mapM (readParquetWithOpts optsWithoutRowRange) files
pure (applyRowRange opts (mconcat dfs))

applyRowRange :: ParquetReadOptions -> DataFrame -> DataFrame
applyRowRange opts df = case rowRange opts of
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

fmap (DS.range df) (rowRange opts)

Or use <$>.

Nothing -> df
Just (start, end) -> DS.range (start, end) df

readMetadataFromPath :: FilePath -> IO FileMetadata
readMetadataFromPath path = do
Expand Down Expand Up @@ -309,13 +371,13 @@ processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength lType = do
pure $
L.foldl' (\l r -> fromRight (error "concat failed") (DI.concatColumns l r)) c cs

applyLogicalType :: LogicalType -> DI.Column -> DI.Column
applyLogicalType (TimestampType isUTC unit) col =
fromRight col $
DI.mapColumn
(microsecondsToUTCTime . (* (1_000_000 `div` unitDivisor unit)))
col
applyLogicalType (DecimalType precision scale) col
applyLogicalTypeWithOptions ::
ParquetReadOptions -> LogicalType -> DI.Column -> DI.Column
applyLogicalTypeWithOptions opts (TimestampType _ unit) col =
case timestampPolicy opts of
PreserveTimestampPrecision -> asUTCTime unit col
CoerceTimestampToDay -> asDay unit col
applyLogicalTypeWithOptions _ (DecimalType precision scale) col
| precision <= 9 = case DI.toVector @Int32 @VU.Vector col of
Right xs ->
DI.fromUnboxedVector $
Expand All @@ -327,7 +389,48 @@ applyLogicalType (DecimalType precision scale) col
VU.map (\raw -> fromIntegral @Int64 @Double raw / 10 ^ scale) xs
Left _ -> col
| otherwise = col
applyLogicalType _ col = col
applyLogicalTypeWithOptions opts _ col =
case timestampPolicy opts of
CoerceTimestampToDay -> coerceUTCTimeColumnToDay col
PreserveTimestampPrecision -> col

-- | Backward-compatible wrapper around 'applyLogicalTypeWithOptions'
-- that applies 'defaultParquetReadOptions'.
applyLogicalType :: LogicalType -> DI.Column -> DI.Column
applyLogicalType = applyLogicalTypeWithOptions defaultParquetReadOptions

-- | Decode a raw timestamp column to 'UTCTime', trying the non-nullable
-- 'Int64' representation first and then the nullable one. A column that
-- matches neither representation is returned unchanged.
asUTCTime :: TimeUnit -> DI.Column -> DI.Column
asUTCTime unit col =
  case DI.mapColumn convert col of
    Right converted -> converted
    Left _ ->
      fromRight col $
        DI.mapColumn (fmap convert :: Maybe Int64 -> Maybe UTCTime) col
  where
    convert = timestampValueToUTCTime unit

-- | Decode a raw timestamp column to 'Day': first the non-nullable
-- 'Int64' representation, then the nullable one, and finally fall back to
-- truncating a column that was already decoded as 'UTCTime'.
asDay :: TimeUnit -> DI.Column -> DI.Column
asDay unit col =
  case DI.mapColumn toDay col of
    Right converted -> converted
    Left _ ->
      case DI.mapColumn (fmap toDay :: Maybe Int64 -> Maybe Day) col of
        Right converted -> converted
        Left _ -> coerceUTCTimeColumnToDay col
  where
    toDay = utctDay . timestampValueToUTCTime unit

-- | Drop the time-of-day component of a 'UTCTime' column, handling both
-- the non-nullable and nullable representations. Any other column type is
-- returned unchanged.
coerceUTCTimeColumnToDay :: DI.Column -> DI.Column
coerceUTCTimeColumnToDay col =
  case DI.mapColumn utctDay col of
    Right converted -> converted
    Left _ ->
      fromRight col $
        DI.mapColumn (fmap utctDay :: Maybe UTCTime -> Maybe Day) col

-- | Interpret a raw Parquet timestamp value as 'UTCTime' according to its
-- declared time unit. An unknown unit is treated as whole seconds
-- (divisor 1), matching the previous per-constructor equations.
timestampValueToUTCTime :: TimeUnit -> Int64 -> UTCTime
timestampValueToUTCTime unit raw =
  posixSecondsToUTCTime (fromIntegral raw / perSecond)
  where
    -- Number of ticks per second for each declared unit.
    perSecond = case unit of
      MILLISECONDS -> 1_000
      MICROSECONDS -> 1_000_000
      NANOSECONDS -> 1_000_000_000
      TIME_UNIT_UNKNOWN -> 1

microsecondsToUTCTime :: Int64 -> UTCTime
microsecondsToUTCTime us =
Expand Down
73 changes: 73 additions & 0 deletions tests/Parquet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

module Parquet where

import Assertions (assertExpectException)
import qualified DataFrame as D
import qualified DataFrame.Functions as F

Expand Down Expand Up @@ -186,6 +187,74 @@ allTypesDictionary =
(unsafePerformIO (D.readParquet "./tests/data/alltypes_dictionary.parquet"))
)

-- | Reading with 'D.selectedColumns' must behave like a full read
-- followed by 'D.select' on the same columns.
selectedColumnsWithOpts :: Test
selectedColumnsWithOpts = TestCase $ do
  -- 'TestCase' already takes an 'Assertion' ('IO ()'), so run the read in
  -- IO directly instead of smuggling it through 'unsafePerformIO'.
  actual <-
    D.readParquetWithOpts
      (D.defaultParquetReadOptions{D.selectedColumns = Just ["id", "bool_col"]})
      "./tests/data/alltypes_plain.parquet"
  assertEqual
    "selectedColumnsWithOpts"
    (D.select ["id", "bool_col"] allTypes)
    actual

rowRangeWithOpts :: Test
rowRangeWithOpts =
TestCase
( assertEqual
"rowRangeWithOpts"
(D.range (2, 5) allTypes)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is circular: if the range function is broken and produces the wrong result, this will still pass. Should just test against the expected dimensions.

( unsafePerformIO
( D.readParquetWithOpts
(D.defaultParquetReadOptions{D.rowRange = Just (2, 5)})
"./tests/data/alltypes_plain.parquet"
)
)
)

-- | 'D.CoerceTimestampToDay' must truncate the timestamp column to 'Day'
-- values (first two rows of the all-types fixture are both 2009-03-01).
timestampPolicyCoerceToDayWithOpts :: Test
timestampPolicyCoerceToDayWithOpts = TestCase $ do
  -- 'TestCase' already takes an 'Assertion' ('IO ()'), so run the read in
  -- IO directly instead of smuggling it through 'unsafePerformIO'.
  actual <-
    D.readParquetWithOpts
      ( D.defaultParquetReadOptions
          { D.selectedColumns = Just ["timestamp_col"]
          , D.timestampPolicy = D.CoerceTimestampToDay
          , D.rowRange = Just (0, 2)
          }
      )
      "./tests/data/alltypes_plain.parquet"
  let expected =
        D.fromNamedColumns
          [
            ( "timestamp_col"
            , D.fromList
                [ fromGregorian 2009 3 1 :: Day
                , fromGregorian 2009 3 1
                ]
            )
          ]
  assertEqual "timestampPolicyCoerceToDayWithOpts" expected actual

missingSelectedColumnWithOpts :: Test
Comment thread
mchav marked this conversation as resolved.
missingSelectedColumnWithOpts =
TestCase
( assertExpectException
"missingSelectedColumnWithOpts"
"Column not found"
( D.readParquetWithOpts
(D.defaultParquetReadOptions{D.selectedColumns = Just ["does_not_exist"]})
"./tests/data/alltypes_plain.parquet"
)
)

transactions :: D.DataFrame
transactions =
D.fromNamedColumns
Expand Down Expand Up @@ -819,6 +888,10 @@ tests =
[ allTypesPlain
, allTypesPlainSnappy
, allTypesDictionary
, selectedColumnsWithOpts
, rowRangeWithOpts
, timestampPolicyCoerceToDayWithOpts
, missingSelectedColumnWithOpts
, mtCars
, allTypesTinyPagesLastFew
, allTypesTinyPagesDimensions
Expand Down