Skip to content

Commit 594e640

Browse files
committed
fix: Timestamp logical type parsing (147).
We were passing in the last field ID instead of the identifier which meant that fields were improperly read.
1 parent 1add5cd commit 594e640

3 files changed

Lines changed: 111 additions & 26 deletions

File tree

src/DataFrame/IO/Parquet/Encoding.hs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,24 @@ unpackBitPacked bw count bs
3030
BS.concatMap
3131
(\b -> BS.map (\i -> (b `shiftR` fromIntegral i) .&. 1) (BS.pack [0 .. 7]))
3232
chunk
33-
toN = fst . BS.foldl' (\(a, i) b -> (a .|. (b `shiftL` i), i + 1)) (0, 0)
33+
toN :: BS.ByteString -> Word32
34+
toN =
35+
fst
36+
. BS.foldl'
37+
(\(a, i) b -> (a .|. (fromIntegral b `shiftL` i), i + 1))
38+
(0 :: Word32, 0 :: Int)
3439

35-
extractValues :: Int -> BS.ByteString -> BS.ByteString
40+
extractValues :: Int -> BS.ByteString -> [Word32]
3641
extractValues n bitsLeft
37-
| BS.null bitsLeft = BS.empty
38-
| n <= 0 = BS.empty
39-
| BS.length bitsLeft < bw = BS.empty
42+
| BS.null bitsLeft = []
43+
| n <= 0 = []
44+
| BS.length bitsLeft < bw = []
4045
| otherwise =
4146
let (this, bitsLeft') = BS.splitAt bw bitsLeft
42-
in BS.cons (toN this) (extractValues (n - 1) bitsLeft')
47+
in toN this : extractValues (n - 1) bitsLeft'
4348

4449
vals = extractValues count bits
45-
in (map fromIntegral (BS.unpack vals), rest)
50+
in (vals, rest)
4651

4752
decodeRLEBitPackedHybrid ::
4853
Int -> Int -> BS.ByteString -> ([Word32], BS.ByteString)

src/DataFrame/IO/Parquet/Thrift.hs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -811,21 +811,21 @@ readAesGcmCtrV1 v@(AesGcmCtrV1 aadPrefix aadFileUnique supplyAadPrefix) buf pos
811811
Just (elemType, identifier) -> case identifier of
812812
1 -> do
813813
aadPrefix <- readByteString buf pos
814-
readAesGcmCtrV1 (v{aadPrefix = aadPrefix}) buf pos lastFieldId
814+
readAesGcmCtrV1 (v{aadPrefix = aadPrefix}) buf pos identifier
815815
2 -> do
816816
aadFileUnique <- readByteString buf pos
817817
readAesGcmCtrV1
818818
(v{aadFileUnique = aadFileUnique})
819819
buf
820820
pos
821-
lastFieldId
821+
identifier
822822
3 -> do
823823
supplyAadPrefix <- readAndAdvance pos buf
824824
readAesGcmCtrV1
825825
(v{supplyAadPrefix = supplyAadPrefix == compactBooleanTrue})
826826
buf
827827
pos
828-
lastFieldId
828+
identifier
829829
_ -> return ENCRYPTION_ALGORITHM_UNKNOWN
830830
readAesGcmCtrV1 _ _ _ _ =
831831
error "readAesGcmCtrV1 called with non AesGcmCtrV1"
@@ -843,17 +843,17 @@ readAesGcmV1 v@(AesGcmV1 aadPrefix aadFileUnique supplyAadPrefix) buf pos lastFi
843843
Just (elemType, identifier) -> case identifier of
844844
1 -> do
845845
aadPrefix <- readByteString buf pos
846-
readAesGcmV1 (v{aadPrefix = aadPrefix}) buf pos lastFieldId
846+
readAesGcmV1 (v{aadPrefix = aadPrefix}) buf pos identifier
847847
2 -> do
848848
aadFileUnique <- readByteString buf pos
849-
readAesGcmV1 (v{aadFileUnique = aadFileUnique}) buf pos lastFieldId
849+
readAesGcmV1 (v{aadFileUnique = aadFileUnique}) buf pos identifier
850850
3 -> do
851851
supplyAadPrefix <- readAndAdvance pos buf
852852
readAesGcmV1
853853
(v{supplyAadPrefix = supplyAadPrefix == compactBooleanTrue})
854854
buf
855855
pos
856-
lastFieldId
856+
identifier
857857
_ -> return ENCRYPTION_ALGORITHM_UNKNOWN
858858
readAesGcmV1 _ _ _ _ =
859859
error "readAesGcmV1 called with non AesGcmV1"
@@ -1120,10 +1120,10 @@ readDecimalType precision scale buf pos lastFieldId = do
11201120
Just (elemType, identifier) -> case identifier of
11211121
1 -> do
11221122
scale' <- readInt32FromBuffer buf pos
1123-
readDecimalType precision scale' buf pos lastFieldId
1123+
readDecimalType precision scale' buf pos identifier
11241124
2 -> do
11251125
precision' <- readInt32FromBuffer buf pos
1126-
readDecimalType precision' scale buf pos lastFieldId
1126+
readDecimalType precision' scale buf pos identifier
11271127
_ -> error $ "UNKNOWN field ID for DecimalType" ++ show identifier
11281128

11291129
readTimeType ::
@@ -1136,15 +1136,14 @@ readTimeType ::
11361136
readTimeType isAdjustedToUTC unit buf pos lastFieldId = do
11371137
fieldContents <- readField buf pos lastFieldId
11381138
case fieldContents of
1139-
Nothing -> return (TimeType isAdjustedToUTC unit)
1139+
Nothing -> return (TimeType{isAdjustedToUTC = isAdjustedToUTC, unit = unit})
11401140
Just (elemType, identifier) -> case identifier of
11411141
1 -> do
1142-
-- TODO: Check for empty
11431142
isAdjustedToUTC' <- (== compactBooleanTrue) <$> readAndAdvance pos buf
1144-
readTimeType isAdjustedToUTC' unit buf pos lastFieldId
1143+
readTimeType isAdjustedToUTC' unit buf pos identifier
11451144
2 -> do
11461145
unit' <- readUnit TIME_UNIT_UNKNOWN buf pos 0
1147-
readTimeType isAdjustedToUTC unit' buf pos lastFieldId
1146+
readTimeType isAdjustedToUTC unit' buf pos identifier
11481147
_ -> error $ "UNKNOWN field ID for TimeType" ++ show identifier
11491148

11501149
readTimestampType ::
@@ -1160,13 +1159,12 @@ readTimestampType isAdjustedToUTC unit buf pos lastFieldId = do
11601159
Nothing -> return (TimestampType isAdjustedToUTC unit)
11611160
Just (elemType, identifier) -> case identifier of
11621161
1 -> do
1163-
-- TODO: Check for empty
11641162
isAdjustedToUTC' <- (== compactBooleanTrue) <$> readNoAdvance pos buf
1165-
readTimestampType False unit buf pos lastFieldId
1163+
readTimestampType False unit buf pos identifier
11661164
2 -> do
1167-
_ <- readField buf pos identifier
1165+
_ <- readField buf pos 0
11681166
unit' <- readUnit TIME_UNIT_UNKNOWN buf pos 0
1169-
readTimestampType isAdjustedToUTC unit' buf pos lastFieldId
1167+
readTimestampType isAdjustedToUTC unit' buf pos identifier
11701168
_ -> error $ "UNKNOWN field ID for TimestampType" ++ show identifier
11711169

11721170
readUnit :: TimeUnit -> BS.ByteString -> IORef Int -> Int16 -> IO TimeUnit

tests/Parquet.hs

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,83 @@ allTypesPlain =
6161
(unsafePerformIO (D.readParquet "./tests/data/alltypes_plain.parquet"))
6262
)
6363

64+
allTypesTinyPagesDimensions :: Test
65+
allTypesTinyPagesDimensions =
66+
TestCase
67+
( assertEqual
68+
"allTypesTinyPages last few"
69+
(7300, 13)
70+
( unsafePerformIO
71+
(fmap D.dimensions (D.readParquet "./tests/data/alltypes_tiny_pages.parquet"))
72+
)
73+
)
74+
75+
tinyPagesLast10 :: D.DataFrame
76+
tinyPagesLast10 =
77+
D.fromNamedColumns
78+
[ ("id", D.fromList @Int32 (reverse [6174 .. 6183]))
79+
, ("bool_col", D.fromList @Bool (Prelude.take 10 (cycle [False, True])))
80+
, ("tinyint_col", D.fromList @Int32 [3, 2, 1, 0, 9, 8, 7, 6, 5, 4])
81+
, ("smallint_col", D.fromList @Int32 [3, 2, 1, 0, 9, 8, 7, 6, 5, 4])
82+
, ("int_col", D.fromList @Int32 [3, 2, 1, 0, 9, 8, 7, 6, 5, 4])
83+
, ("bigint_col", D.fromList @Int64 [30, 20, 10, 0, 90, 80, 70, 60, 50, 40])
84+
,
85+
( "float_col"
86+
, D.fromList @Float [3.3, 2.2, 1.1, 0, 9.9, 8.8, 7.7, 6.6, 5.5, 4.4]
87+
)
88+
,
89+
( "date_string_col"
90+
, D.fromList @Text
91+
[ "09/11/10"
92+
, "09/11/10"
93+
, "09/11/10"
94+
, "09/11/10"
95+
, "09/10/10"
96+
, "09/10/10"
97+
, "09/10/10"
98+
, "09/10/10"
99+
, "09/10/10"
100+
, "09/10/10"
101+
]
102+
)
103+
,
104+
( "string_col"
105+
, D.fromList @Text ["3", "2", "1", "0", "9", "8", "7", "6", "5", "4"]
106+
)
107+
,
108+
( "timestamp_col"
109+
, D.fromList @UTCTime
110+
[ UTCTime (fromGregorian 2010 9 10) (secondsToDiffTime 85384)
111+
, UTCTime (fromGregorian 2010 9 10) (secondsToDiffTime 85324)
112+
, UTCTime (fromGregorian 2010 9 10) (secondsToDiffTime 85264)
113+
, UTCTime (fromGregorian 2010 9 10) (secondsToDiffTime 85204)
114+
, UTCTime (fromGregorian 2010 9 9) (secondsToDiffTime 85144)
115+
, UTCTime (fromGregorian 2010 9 9) (secondsToDiffTime 85084)
116+
, UTCTime (fromGregorian 2010 9 9) (secondsToDiffTime 85024)
117+
, UTCTime (fromGregorian 2010 9 9) (secondsToDiffTime 84964)
118+
, UTCTime (fromGregorian 2010 9 9) (secondsToDiffTime 84904)
119+
, UTCTime (fromGregorian 2010 9 9) (secondsToDiffTime 84844)
120+
]
121+
)
122+
, ("year", D.fromList @Int32 (replicate 10 2010))
123+
, ("month", D.fromList @Int32 (replicate 10 9))
124+
]
125+
126+
allTypesTinyPagesLastFew :: Test
127+
allTypesTinyPagesLastFew =
128+
TestCase
129+
( assertEqual
130+
"allTypesTinyPages dimensions"
131+
tinyPagesLast10
132+
( unsafePerformIO
133+
-- Excluding doubles because they are weird to compare.
134+
( fmap
135+
(D.takeLast 10 . D.exclude ["double_col"])
136+
(D.readParquet "./tests/data/alltypes_tiny_pages.parquet")
137+
)
138+
)
139+
)
140+
64141
allTypesPlainSnappy :: Test
65142
allTypesPlainSnappy =
66143
TestCase
@@ -537,7 +614,12 @@ mtCars =
537614
(unsafePerformIO (D.readParquet "./tests/data/mtcars.parquet"))
538615
)
539616

540-
-- Uncomment to run parquet tests.
541-
-- Currently commented because they don't run with github CI
542617
tests :: [Test]
543-
tests = [allTypesPlain, allTypesPlainSnappy, allTypesDictionary, mtCars]
618+
tests =
619+
[ allTypesPlain
620+
, allTypesPlainSnappy
621+
, allTypesDictionary
622+
, mtCars
623+
, allTypesTinyPagesLastFew
624+
, allTypesTinyPagesDimensions
625+
]

0 commit comments

Comments
 (0)