diff --git a/src/DataFrame/IO/Parquet/Binary.hs b/src/DataFrame/IO/Parquet/Binary.hs index 5697d44..4f366df 100644 --- a/src/DataFrame/IO/Parquet/Binary.hs +++ b/src/DataFrame/IO/Parquet/Binary.hs @@ -3,12 +3,17 @@ module DataFrame.IO.Parquet.Binary where import Control.Monad +import Control.Exception (bracketOnError) import Data.Bits import qualified Data.ByteString as BS import Data.Char import Data.IORef import Data.Int import Data.Word +import qualified Foreign.Ptr as Foreign +import qualified Foreign.Storable as Foreign +import qualified Data.ByteString.Unsafe as BSU +import qualified Foreign.Marshal.Alloc as Foreign littleEndianWord32 :: BS.ByteString -> Word32 littleEndianWord32 bytes @@ -124,7 +129,7 @@ readInt32FromBuffer buf bufferPos = do readString :: BS.ByteString -> IORef Int -> IO String readString buf pos = do nameSize <- readVarIntFromBuffer @Int buf pos - map (chr . fromIntegral) <$> replicateM nameSize (readAndAdvance pos buf) + replicateM nameSize (chr . fromIntegral <$> readAndAdvance pos buf) readByteStringFromBytes :: BS.ByteString -> (BS.ByteString, BS.ByteString) readByteStringFromBytes xs = @@ -136,10 +141,28 @@ readByteStringFromBytes xs = readByteString :: BS.ByteString -> IORef Int -> IO BS.ByteString readByteString buf pos = do size <- readVarIntFromBuffer @Int buf pos - BS.pack <$> replicateM size (readAndAdvance pos buf) + fillByteStringByWord8 size (\_ -> readAndAdvance pos buf) readByteString' :: BS.ByteString -> Int64 -> IO BS.ByteString -readByteString' buf size = BS.pack <$> mapM (`readSingleByte` buf) [0 .. (size - 1)] +readByteString' buf size = + fillByteStringByWord8 (fromIntegral size) ((`readSingleByte` buf) . fromIntegral) + +-- | Allocate a fixed-size buffer, repeat the action on each index. +-- Fill it into the buffer to get a ByteString. +fillByteStringByWord8 :: Int -> (Int -> IO Word8) -> IO BS.ByteString +fillByteStringByWord8 size getByte = do + bracketOnError + (Foreign.mallocBytes size :: IO (Foreign.Ptr Word8)) + Foreign.free + -- ^ ensures p is freed if (IO Word8) throws. + (\p -> do + fill 0 p + BSU.unsafePackCStringFinalizer p size (Foreign.free p) + ) + where fill i p + | i >= size = pure () + | otherwise = getByte i >>= Foreign.pokeByteOff p i >> fill (i+1) p +{-# INLINE fillByteStringByWord8 #-} readSingleByte :: Int64 -> BS.ByteString -> IO Word8 readSingleByte pos buffer = return $ BS.index buffer (fromIntegral pos)