Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions src/DataFrame/IO/Parquet/Binary.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
module DataFrame.IO.Parquet.Binary where

import Control.Monad
import Control.Exception (bracketOnError)
import Data.Bits
import qualified Data.ByteString as BS
import Data.Char
import Data.IORef
import Data.Int
import Data.Word
import qualified Foreign.Ptr as Foreign
import qualified Foreign.Storable as Foreign
import qualified Data.ByteString.Unsafe as BSU
import qualified Foreign.Marshal.Alloc as Foreign

littleEndianWord32 :: BS.ByteString -> Word32
littleEndianWord32 bytes
Expand Down Expand Up @@ -124,7 +129,7 @@ readInt32FromBuffer buf bufferPos = do
readString :: BS.ByteString -> IORef Int -> IO String
readString buf pos = do
nameSize <- readVarIntFromBuffer @Int buf pos
map (chr . fromIntegral) <$> replicateM nameSize (readAndAdvance pos buf)
replicateM nameSize (chr . fromIntegral <$> readAndAdvance pos buf)

readByteStringFromBytes :: BS.ByteString -> (BS.ByteString, BS.ByteString)
readByteStringFromBytes xs =
Expand All @@ -136,10 +141,28 @@ readByteStringFromBytes xs =
readByteString :: BS.ByteString -> IORef Int -> IO BS.ByteString
readByteString buf pos = do
size <- readVarIntFromBuffer @Int buf pos
BS.pack <$> replicateM size (readAndAdvance pos buf)
fillByteStringByWord8 size (\_ -> readAndAdvance pos buf)

readByteString' :: BS.ByteString -> Int64 -> IO BS.ByteString
readByteString' buf size = BS.pack <$> mapM (`readSingleByte` buf) [0 .. (size - 1)]
readByteString' buf size =
fillByteStringByWord8 (fromIntegral size) ((`readSingleByte` buf) . fromIntegral)

-- | Allocate a fixed-size buffer, repeat the action on each index.
-- Fill it into the buffer to get a ByteString.
fillByteStringByWord8 :: Int -> (Int -> IO Word8) -> IO BS.ByteString
fillByteStringByWord8 size getByte = do
bracketOnError
(Foreign.mallocBytes size :: IO (Foreign.Ptr Word8))
Foreign.free
-- ^ ensures p is freed if (IO Word8) throws.
(\p -> do
fill 0 p
BSU.unsafePackCStringFinalizer p size (Foreign.free p)
)
where fill i p
| i >= size = pure ()
| otherwise = getByte i >>= Foreign.pokeByteOff p i >> fill (i+1) p
{-# INLINE fillByteStringByWord8 #-}

readSingleByte :: Int64 -> BS.ByteString -> IO Word8
readSingleByte pos buffer = return $ BS.index buffer (fromIntegral pos)
Expand Down
Loading