Skip to content

Commit 4b66511

Browse files
committed
feat: Refactor parquet to avoid large functions.
1 parent 1e281c5 commit 4b66511

2 files changed

Lines changed: 138 additions & 166 deletions

File tree

src/DataFrame/IO/Parquet.hs

Lines changed: 136 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ import System.Directory (doesDirectoryExist)
4040
import qualified Data.Vector.Unboxed as VU
4141
import System.FilePath ((</>))
4242

43+
-- Options -----------------------------------------------------------------
44+
4345
{- | Options for reading Parquet data.
4446
4547
These options are applied in this order:
@@ -82,6 +84,8 @@ defaultParquetReadOptions =
8284
, rowRange = Nothing
8385
}
8486

87+
-- Public API --------------------------------------------------------------
88+
8589
{- | Read a parquet file from path and load it into a dataframe.
8690
8791
==== __Example__
@@ -296,6 +300,8 @@ readParquetFilesWithOpts opts path = do
296300
dfs <- mapM (readParquetWithOpts optsWithoutRowRange) files
297301
pure (applyRowRange opts (mconcat dfs))
298302

303+
-- Options application -----------------------------------------------------
304+
299305
{- | Apply the optional row range from the read options to a dataframe.

When 'rowRange' is 'Nothing' the dataframe is returned untouched; otherwise
the stored range is handed to @DS.range@ together with the dataframe.
-}
applyRowRange :: ParquetReadOptions -> DataFrame -> DataFrame
applyRowRange opts df =
    case rowRange opts of
        Nothing -> df
        Just bounds -> DS.range bounds df
@@ -314,6 +320,8 @@ applyReadOptions opts =
314320
. applySelectedColumns opts
315321
. applyPredicate opts
316322

323+
-- File and metadata parsing -----------------------------------------------
324+
317325
readMetadataFromPath :: FilePath -> IO (FileMetadata, BSO.ByteString)
318326
readMetadataFromPath path = do
319327
contents <- BSO.readFile path
@@ -336,6 +344,8 @@ readMetadataSizeFromFooter contents =
336344
in
337345
(size, magicString)
338346

347+
-- Schema navigation -------------------------------------------------------
348+
339349
getColumnPaths :: [SchemaElement] -> [(T.Text, Int)]
340350
getColumnPaths schemaElements =
341351
let nodes = parseAll schemaElements
@@ -370,6 +380,8 @@ findLeafSchema elems path =
370380
go nodes [p] = L.find (\n -> sName n == p) nodes
371381
go nodes (p : ps) = L.find (\n -> sName n == p) nodes >>= \n -> go (sChildren n) ps
372382

383+
-- Page decoding -----------------------------------------------------------
384+
373385
processColumnPages ::
374386
(Int, Int) ->
375387
[Page] ->
@@ -390,193 +402,153 @@ processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength lType = do
390402
DictionaryPageHeader{..} ->
391403
let countForBools =
392404
if pType == PBOOLEAN
393-
then error "is bool" Just dictionaryPageHeaderNumValues
405+
then Just dictionaryPageHeaderNumValues
394406
else maybeTypeLength
395407
in Just (readDictVals pType (pageBytes dictPage) countForBools)
396408
_ -> Nothing
397409

398410
cols <- forM dataPages $ \page -> do
411+
let bs0 = pageBytes page
399412
case pageTypeHeader (pageHeader page) of
400413
DataPageHeader{..} -> do
401414
let n = fromIntegral dataPageHeaderNumValues
402-
let bs0 = pageBytes page
403-
let (defLvls, repLvls, afterLvls) = readLevelsV1 n maxDef maxRep bs0
404-
let nPresent = length (filter (== maxDef) defLvls)
405-
406-
case dataPageHeaderEncoding of
407-
EPLAIN ->
408-
case pType of
409-
PBOOLEAN ->
410-
let (vals, _) = readNBool nPresent afterLvls
411-
in pure $
412-
if maxRep > 0
413-
then stitchForRepBool maxRep maxDef repLvls defLvls vals
414-
else toMaybeBool maxDef defLvls vals
415-
PINT32
416-
| maxDef == 0
417-
, maxRep == 0 ->
418-
pure $ DI.fromUnboxedVector (readNInt32Vec nPresent afterLvls)
419-
PINT32 ->
420-
let (vals, _) = readNInt32 nPresent afterLvls
421-
in pure $
422-
if maxRep > 0
423-
then stitchForRepInt32 maxRep maxDef repLvls defLvls vals
424-
else toMaybeInt32 maxDef defLvls vals
425-
PINT64
426-
| maxDef == 0
427-
, maxRep == 0 ->
428-
pure $ DI.fromUnboxedVector (readNInt64Vec nPresent afterLvls)
429-
PINT64 ->
430-
let (vals, _) = readNInt64 nPresent afterLvls
431-
in pure $
432-
if maxRep > 0
433-
then stitchForRepInt64 maxRep maxDef repLvls defLvls vals
434-
else toMaybeInt64 maxDef defLvls vals
435-
PINT96 ->
436-
let (vals, _) = readNInt96Times nPresent afterLvls
437-
in pure $
438-
if maxRep > 0
439-
then stitchForRepUTCTime maxRep maxDef repLvls defLvls vals
440-
else toMaybeUTCTime maxDef defLvls vals
441-
PFLOAT
442-
| maxDef == 0
443-
, maxRep == 0 ->
444-
pure $ DI.fromUnboxedVector (readNFloatVec nPresent afterLvls)
445-
PFLOAT ->
446-
let (vals, _) = readNFloat nPresent afterLvls
447-
in pure $
448-
if maxRep > 0
449-
then stitchForRepFloat maxRep maxDef repLvls defLvls vals
450-
else toMaybeFloat maxDef defLvls vals
451-
PDOUBLE
452-
| maxDef == 0
453-
, maxRep == 0 ->
454-
pure $ DI.fromUnboxedVector (readNDoubleVec nPresent afterLvls)
455-
PDOUBLE ->
456-
let (vals, _) = readNDouble nPresent afterLvls
457-
in pure $
458-
if maxRep > 0
459-
then stitchForRepDouble maxRep maxDef repLvls defLvls vals
460-
else toMaybeDouble maxDef defLvls vals
461-
PBYTE_ARRAY ->
462-
let (raws, _) = readNByteArrays nPresent afterLvls
463-
texts = map decodeUtf8 raws
464-
in pure $
465-
if maxRep > 0
466-
then stitchForRepText maxRep maxDef repLvls defLvls texts
467-
else toMaybeText maxDef defLvls texts
468-
PFIXED_LEN_BYTE_ARRAY ->
469-
case maybeTypeLength of
470-
Just len ->
471-
let (raws, _) = splitFixed nPresent (fromIntegral len) afterLvls
472-
texts = map decodeUtf8 raws
473-
in pure $
474-
if maxRep > 0
475-
then stitchForRepText maxRep maxDef repLvls defLvls texts
476-
else toMaybeText maxDef defLvls texts
477-
Nothing -> error "FIXED_LEN_BYTE_ARRAY requires type length"
478-
PARQUET_TYPE_UNKNOWN -> error "Cannot read unknown Parquet type"
479-
ERLE_DICTIONARY -> decodeDictV1 dictValsM maxDef maxRep repLvls defLvls nPresent afterLvls
480-
EPLAIN_DICTIONARY -> decodeDictV1 dictValsM maxDef maxRep repLvls defLvls nPresent afterLvls
481-
other -> error ("Unsupported v1 encoding: " ++ show other)
415+
(defLvls, repLvls, afterLvls) = readLevelsV1 n maxDef maxRep bs0
416+
nPresent = length (filter (== maxDef) defLvls)
417+
decodePageData
418+
dictValsM
419+
(maxDef, maxRep)
420+
pType
421+
maybeTypeLength
422+
dataPageHeaderEncoding
423+
defLvls
424+
repLvls
425+
nPresent
426+
afterLvls
427+
"v1"
482428
DataPageHeaderV2{..} -> do
483429
let n = fromIntegral dataPageHeaderV2NumValues
484-
let bs0 = pageBytes page
485-
let (defLvls, repLvls, afterLvls) =
430+
(defLvls, repLvls, afterLvls) =
486431
readLevelsV2
487432
n
488433
maxDef
489434
maxRep
490435
definitionLevelByteLength
491436
repetitionLevelByteLength
492437
bs0
493-
let nPresent =
494-
if dataPageHeaderV2NumNulls > 0
495-
then fromIntegral (dataPageHeaderV2NumValues - dataPageHeaderV2NumNulls)
496-
else length (filter (== maxDef) defLvls)
497-
498-
case dataPageHeaderV2Encoding of
499-
EPLAIN ->
500-
case pType of
501-
PBOOLEAN ->
502-
let (vals, _) = readNBool nPresent afterLvls
503-
in pure $
504-
if maxRep > 0
505-
then stitchForRepBool maxRep maxDef repLvls defLvls vals
506-
else toMaybeBool maxDef defLvls vals
507-
PINT32
508-
| maxDef == 0
509-
, maxRep == 0 ->
510-
pure $ DI.fromUnboxedVector (readNInt32Vec nPresent afterLvls)
511-
PINT32 ->
512-
let (vals, _) = readNInt32 nPresent afterLvls
513-
in pure $
514-
if maxRep > 0
515-
then stitchForRepInt32 maxRep maxDef repLvls defLvls vals
516-
else toMaybeInt32 maxDef defLvls vals
517-
PINT64
518-
| maxDef == 0
519-
, maxRep == 0 ->
520-
pure $ DI.fromUnboxedVector (readNInt64Vec nPresent afterLvls)
521-
PINT64 ->
522-
let (vals, _) = readNInt64 nPresent afterLvls
523-
in pure $
524-
if maxRep > 0
525-
then stitchForRepInt64 maxRep maxDef repLvls defLvls vals
526-
else toMaybeInt64 maxDef defLvls vals
527-
PINT96 ->
528-
let (vals, _) = readNInt96Times nPresent afterLvls
529-
in pure $
530-
if maxRep > 0
531-
then stitchForRepUTCTime maxRep maxDef repLvls defLvls vals
532-
else toMaybeUTCTime maxDef defLvls vals
533-
PFLOAT
534-
| maxDef == 0
535-
, maxRep == 0 ->
536-
pure $ DI.fromUnboxedVector (readNFloatVec nPresent afterLvls)
537-
PFLOAT ->
538-
let (vals, _) = readNFloat nPresent afterLvls
539-
in pure $
540-
if maxRep > 0
541-
then stitchForRepFloat maxRep maxDef repLvls defLvls vals
542-
else toMaybeFloat maxDef defLvls vals
543-
PDOUBLE
544-
| maxDef == 0
545-
, maxRep == 0 ->
546-
pure $ DI.fromUnboxedVector (readNDoubleVec nPresent afterLvls)
547-
PDOUBLE ->
548-
let (vals, _) = readNDouble nPresent afterLvls
549-
in pure $
550-
if maxRep > 0
551-
then stitchForRepDouble maxRep maxDef repLvls defLvls vals
552-
else toMaybeDouble maxDef defLvls vals
553-
PBYTE_ARRAY ->
554-
let (raws, _) = readNByteArrays nPresent afterLvls
555-
texts = map decodeUtf8 raws
556-
in pure $
557-
if maxRep > 0
558-
then stitchForRepText maxRep maxDef repLvls defLvls texts
559-
else toMaybeText maxDef defLvls texts
560-
PFIXED_LEN_BYTE_ARRAY ->
561-
case maybeTypeLength of
562-
Just len ->
563-
let (raws, _) = splitFixed nPresent (fromIntegral len) afterLvls
564-
texts = map decodeUtf8 raws
565-
in pure $
566-
if maxRep > 0
567-
then stitchForRepText maxRep maxDef repLvls defLvls texts
568-
else toMaybeText maxDef defLvls texts
569-
Nothing -> error "FIXED_LEN_BYTE_ARRAY requires type length"
570-
PARQUET_TYPE_UNKNOWN -> error "Cannot read unknown Parquet type"
571-
ERLE_DICTIONARY -> decodeDictV1 dictValsM maxDef maxRep repLvls defLvls nPresent afterLvls
572-
EPLAIN_DICTIONARY -> decodeDictV1 dictValsM maxDef maxRep repLvls defLvls nPresent afterLvls
573-
other -> error ("Unsupported v2 encoding: " ++ show other)
438+
nPresent
439+
| dataPageHeaderV2NumNulls > 0 =
440+
fromIntegral (dataPageHeaderV2NumValues - dataPageHeaderV2NumNulls)
441+
| otherwise = length (filter (== maxDef) defLvls)
442+
decodePageData
443+
dictValsM
444+
(maxDef, maxRep)
445+
pType
446+
maybeTypeLength
447+
dataPageHeaderV2Encoding
448+
defLvls
449+
repLvls
450+
nPresent
451+
afterLvls
452+
"v2"
453+
574454
-- Cannot happen as these are filtered out by isDataPage above
575455
DictionaryPageHeader{} -> error "processColumnPages: impossible DictionaryPageHeader"
576456
INDEX_PAGE_HEADER -> error "processColumnPages: impossible INDEX_PAGE_HEADER"
577457
PAGE_TYPE_HEADER_UNKNOWN -> error "processColumnPages: impossible PAGE_TYPE_HEADER_UNKNOWN"
578458
pure $ DI.concatManyColumns cols
579459

460+
{- | Decode the value bytes of one data page into a column.

The caller has already separated the page into definition levels,
repetition levels and the remaining value bytes; this function only
dispatches on the page encoding and, for @EPLAIN@ pages, on the physical
type.

Arguments, in order:

* the dictionary values of the column chunk, if any (forwarded to
  @decodeDictV1@ for the dictionary encodings);
* @(maxDef, maxRep)@, the maximum definition and repetition levels;
* the physical type of the column;
* the declared type length, consulted only for @PFIXED_LEN_BYTE_ARRAY@;
* the encoding taken from the page header;
* the definition levels and the repetition levels of the page;
* @nPresent@, the count passed to the value readers;
* the bytes that follow the levels;
* a version label that is used only in the unsupported-encoding message.

Calls 'error' for an unsupported encoding, for @PARQUET_TYPE_UNKNOWN@, and
for a @PFIXED_LEN_BYTE_ARRAY@ column without a type length.
-}
decodePageData ::
    Maybe DictVals ->
    (Int, Int) ->
    ParquetType ->
    Maybe Int32 ->
    ParquetEncoding ->
    [Int] ->
    [Int] ->
    Int ->
    BSO.ByteString ->
    String ->
    IO DI.Column
decodePageData dictValsM (maxDef, maxRep) pType maybeTypeLength encoding defLvls repLvls nPresent afterLvls versionLabel =
    case encoding of
        EPLAIN -> decodePlain
        ERLE_DICTIONARY -> viaDictionary
        EPLAIN_DICTIONARY -> viaDictionary
        other -> error ("Unsupported " ++ versionLabel ++ " encoding: " ++ show other)
  where
    -- Both dictionary encodings are decoded by the same routine.
    viaDictionary =
        decodeDictV1 dictValsM maxDef maxRep repLvls defLvls nPresent afterLvls

    -- True when the column has neither definition nor repetition levels,
    -- in which case the numeric types take the unboxed-vector path.
    flat = maxDef == 0 && maxRep == 0

    -- Shared tail of every plain decoder: use the repetition-aware stitcher
    -- when maxRep > 0, otherwise the definition-level wrapper. The signature
    -- is explicit so the helper can be used at several value types.
    assemble ::
        (Int -> Int -> [Int] -> [Int] -> a -> DI.Column) ->
        (Int -> [Int] -> a -> DI.Column) ->
        a ->
        IO DI.Column
    assemble stitch wrap vals = pure column
      where
        column
            | maxRep > 0 = stitch maxRep maxDef repLvls defLvls vals
            | otherwise = wrap maxDef defLvls vals

    -- Byte-array style values are leniently decoded as UTF-8 text.
    asText = assemble stitchForRepText toMaybeText . map decodeUtf8Lenient

    decodePlain =
        case pType of
            PBOOLEAN ->
                assemble stitchForRepBool toMaybeBool (fst (readNBool nPresent afterLvls))
            PINT32
                | flat -> pure $ DI.fromUnboxedVector (readNInt32Vec nPresent afterLvls)
                | otherwise ->
                    assemble stitchForRepInt32 toMaybeInt32 (fst (readNInt32 nPresent afterLvls))
            PINT64
                | flat -> pure $ DI.fromUnboxedVector (readNInt64Vec nPresent afterLvls)
                | otherwise ->
                    assemble stitchForRepInt64 toMaybeInt64 (fst (readNInt64 nPresent afterLvls))
            PINT96 ->
                assemble stitchForRepUTCTime toMaybeUTCTime (fst (readNInt96Times nPresent afterLvls))
            PFLOAT
                | flat -> pure $ DI.fromUnboxedVector (readNFloatVec nPresent afterLvls)
                | otherwise ->
                    assemble stitchForRepFloat toMaybeFloat (fst (readNFloat nPresent afterLvls))
            PDOUBLE
                | flat -> pure $ DI.fromUnboxedVector (readNDoubleVec nPresent afterLvls)
                | otherwise ->
                    assemble stitchForRepDouble toMaybeDouble (fst (readNDouble nPresent afterLvls))
            PBYTE_ARRAY ->
                asText (fst (readNByteArrays nPresent afterLvls))
            PFIXED_LEN_BYTE_ARRAY ->
                case maybeTypeLength of
                    Nothing -> error "FIXED_LEN_BYTE_ARRAY requires type length"
                    Just len ->
                        asText (fst (splitFixed nPresent (fromIntegral len) afterLvls))
            PARQUET_TYPE_UNKNOWN -> error "Cannot read unknown Parquet type"
549+
550+
-- Logical type conversion -------------------------------------------------
551+
580552
applyLogicalType :: LogicalType -> DI.Column -> DI.Column
581553
applyLogicalType (TimestampType _ unit) col =
582554
fromRight col $

0 commit comments

Comments
 (0)