Skip to content

Commit 840e9ec

Browse files
committed
feat: Correctly stub and read logical type time unit.
Also adds a test file generated from DuckDB SQL, similar to #147
1 parent 74177c3 commit 840e9ec

4 files changed

Lines changed: 274 additions & 29 deletions

File tree

src/DataFrame/IO/Parquet.hs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
{-# LANGUAGE NumericUnderscores #-}
12
{-# LANGUAGE OverloadedStrings #-}
23
{-# LANGUAGE RecordWildCards #-}
4+
{-# LANGUAGE ScopedTypeVariables #-}
35
{-# LANGUAGE TypeApplications #-}
46

57
module DataFrame.IO.Parquet where
@@ -14,6 +16,8 @@ import qualified Data.List as L
1416
import qualified Data.Map as M
1517
import qualified Data.Text as T
1618
import Data.Text.Encoding
19+
import Data.Time
20+
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
1721
import Data.Word
1822
import qualified DataFrame.Internal.Column as DI
1923
import DataFrame.Internal.DataFrame (DataFrame)
@@ -28,6 +32,7 @@ import DataFrame.IO.Parquet.Thrift
2832
import DataFrame.IO.Parquet.Types
2933
import System.Directory (doesDirectoryExist)
3034

35+
import qualified Data.Vector.Unboxed as VU
3136
import System.FilePath ((</>))
3237

3338
{- | Read a parquet file from path and load it into a dataframe.
@@ -93,13 +98,15 @@ readParquet path = do
9398
let schemaTail = drop 1 (schema fileMetadata)
9499
let colPath = columnPathInSchema (columnMetaData colChunk)
95100
let (maxDef, maxRep) = levelsForPath schemaTail colPath
101+
let lType = logicalType (schemaTail !! colIdx)
96102
column <-
97103
processColumnPages
98104
(maxDef, maxRep)
99105
pages
100106
(columnType metadata)
101107
primaryEncoding
102108
maybeTypeLength
109+
lType
103110

104111
modifyIORef colMap (M.insertWith DI.concatColumnsEither colName column)
105112

@@ -172,8 +179,9 @@ processColumnPages ::
172179
ParquetType ->
173180
ParquetEncoding ->
174181
Maybe Int32 ->
182+
LogicalType ->
175183
IO DI.Column
176-
processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength = do
184+
processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength lType = do
177185
let dictPages = filter isDictionaryPage pages
178186
let dataPages = filter isDataPage pages
179187

@@ -206,10 +214,10 @@ processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength = do
206214
in pure (toMaybeBool maxDef defLvls vals)
207215
PINT32 ->
208216
let (vals, _) = readNInt32 nPresent afterLvls
209-
in pure (toMaybeInt32 maxDef defLvls vals)
217+
in pure (applyLogicalType lType $ toMaybeInt32 maxDef defLvls vals)
210218
PINT64 ->
211219
let (vals, _) = readNInt64 nPresent afterLvls
212-
in pure (toMaybeInt64 maxDef defLvls vals)
220+
in pure (applyLogicalType lType $ toMaybeInt64 maxDef defLvls vals)
213221
PINT96 ->
214222
let (vals, _) = readNInt96Times nPresent afterLvls
215223
in pure (toMaybeUTCTime maxDef defLvls vals)
@@ -258,10 +266,10 @@ processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength = do
258266
in pure (toMaybeBool maxDef defLvls vals)
259267
PINT32 ->
260268
let (vals, _) = readNInt32 nPresent afterLvls
261-
in pure (toMaybeInt32 maxDef defLvls vals)
269+
in pure (applyLogicalType lType $ toMaybeInt32 maxDef defLvls vals)
262270
PINT64 ->
263271
let (vals, _) = readNInt64 nPresent afterLvls
264-
in pure (toMaybeInt64 maxDef defLvls vals)
272+
in pure (applyLogicalType lType $ toMaybeInt64 maxDef defLvls vals)
265273
PINT96 ->
266274
let (vals, _) = readNInt96Times nPresent afterLvls
267275
in pure (toMaybeUTCTime maxDef defLvls vals)
@@ -296,3 +304,37 @@ processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength = do
296304
(c : cs) ->
297305
pure $
298306
L.foldl' (\l r -> fromRight (error "concat failed") (DI.concatColumns l r)) c cs
307+
308+
applyLogicalType :: LogicalType -> DI.Column -> DI.Column
309+
applyLogicalType (TimestampType isUTC unit) col =
310+
fromRight col $
311+
DI.mapColumn
312+
(microsecondsToUTCTime . (* (1_000_000 `div` unitDivisor unit)))
313+
col
314+
applyLogicalType (DecimalType precision scale) col
315+
| precision <= 9 = case DI.toVector @Int32 @VU.Vector col of
316+
Right xs ->
317+
DI.fromUnboxedVector $
318+
VU.map (\raw -> fromIntegral @Int32 @Double raw / 10 ^ scale) xs
319+
Left _ -> col
320+
| precision <= 18 = case DI.toVector @Int64 @VU.Vector col of
321+
Right xs ->
322+
DI.fromUnboxedVector $
323+
VU.map (\raw -> fromIntegral @Int64 @Double raw / 10 ^ scale) xs
324+
Left _ -> col
325+
| otherwise = col
326+
applyLogicalType _ col = col
327+
328+
microsecondsToUTCTime :: Int64 -> UTCTime
329+
microsecondsToUTCTime us =
330+
posixSecondsToUTCTime (fromIntegral us / 1_000_000)
331+
332+
unitDivisor :: TimeUnit -> Int64
333+
unitDivisor MILLISECONDS = 1_000
334+
unitDivisor MICROSECONDS = 1_000_000
335+
unitDivisor NANOSECONDS = 1_000_000_000
336+
unitDivisor TIME_UNIT_UNKNOWN = 1
337+
338+
applyScale :: Int32 -> Int32 -> Double
339+
applyScale scale rawValue =
340+
fromIntegral rawValue / (10 ^ scale)

src/DataFrame/IO/Parquet/Thrift.hs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,7 +1139,7 @@ readTimeType isAdjustedToUTC unit buf pos lastFieldId = do
11391139
Nothing -> return (TimeType{isAdjustedToUTC = isAdjustedToUTC, unit = unit})
11401140
Just (elemType, identifier) -> case identifier of
11411141
1 -> do
1142-
isAdjustedToUTC' <- (== compactBooleanTrue) <$> readAndAdvance pos buf
1142+
let isAdjustedToUTC' = elemType == toTType compactBooleanTrue
11431143
readTimeType isAdjustedToUTC' unit buf pos identifier
11441144
2 -> do
11451145
unit' <- readUnit TIME_UNIT_UNKNOWN buf pos 0
@@ -1156,16 +1156,15 @@ readTimestampType ::
11561156
readTimestampType isAdjustedToUTC unit buf pos lastFieldId = do
11571157
fieldContents <- readField buf pos lastFieldId
11581158
case fieldContents of
1159-
Nothing -> return (TimestampType isAdjustedToUTC unit)
1159+
Nothing -> return (TimestampType{isAdjustedToUTC = isAdjustedToUTC, unit = unit})
11601160
Just (elemType, identifier) -> case identifier of
11611161
1 -> do
1162-
isAdjustedToUTC' <- (== compactBooleanTrue) <$> readNoAdvance pos buf
1163-
readTimestampType False unit buf pos identifier
1162+
let isAdjustedToUTC' = elemType == toTType compactBooleanTrue
1163+
readTimestampType isAdjustedToUTC' unit buf pos identifier
11641164
2 -> do
1165-
_ <- readField buf pos 0
11661165
unit' <- readUnit TIME_UNIT_UNKNOWN buf pos 0
11671166
readTimestampType isAdjustedToUTC unit' buf pos identifier
1168-
_ -> error $ "UNKNOWN field ID for TimestampType" ++ show identifier
1167+
_ -> error $ "UNKNOWN field ID for TimestampType " ++ show identifier
11691168

11701169
readUnit :: TimeUnit -> BS.ByteString -> IORef Int -> Int16 -> IO TimeUnit
11711170
readUnit unit buf pos lastFieldId = do
@@ -1174,9 +1173,12 @@ readUnit unit buf pos lastFieldId = do
11741173
Nothing -> return unit
11751174
Just (elemType, identifier) -> case identifier of
11761175
1 -> do
1176+
_ <- readField buf pos 0
11771177
readUnit MILLISECONDS buf pos identifier
11781178
2 -> do
1179+
_ <- readField buf pos 0
11791180
readUnit MICROSECONDS buf pos identifier
11801181
3 -> do
1182+
_ <- readField buf pos 0
11811183
readUnit NANOSECONDS buf pos identifier
11821184
n -> error $ "Unknown time unit: " ++ show n

0 commit comments

Comments
 (0)