Streaming
Lazy IO
The problem with using the usual monadic approach to processing data accumulated through IO is that the Prelude tools require us to manifest large amounts of data in memory all at once before we can even begin computation.
mapM :: Monad m => (a -> m b) -> [a] -> m [b]
sequence :: Monad m => [m a] -> m [a]
Reading from the file creates a thunk for the string which, when forced, will then read the file. The problem is that this method ties the ordering of IO effects to evaluation order, which is difficult to reason about in the large.
Consider that normally the monad laws (in the absence of seq) guarantee that these computations should be identical. But using lazy IO we can construct a degenerate case.
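import System.IO
main :: IO ()
main = do
  -- Read and print while the handle is still open.
  withFile "foo.txt" ReadMode $ \fd -> do
    contents <- hGetContents fd
    print contents
  -- "foo\n"
  -- Return the lazy string; the handle is closed before the string is forced.
  contents <- withFile "foo.txt" ReadMode hGetContents
  print contents
  -- "" (newer versions of base raise a "delayed read on closed handle" exception here instead)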
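One way to make the second form behave is to force the whole string while the handle is still open. The following is a minimal sketch of that workaround; readStrict is a hypothetical helper name, not something from the original text or from base, and the file is written first only so the example is self-contained.

```haskell
import Control.Exception (evaluate)
import System.IO

-- Hypothetical helper: read a file and force its full contents
-- before withFile closes the handle.
readStrict :: FilePath -> IO String
readStrict path = withFile path ReadMode $ \h -> do
  contents <- hGetContents h
  -- Computing the length walks the entire string, so every
  -- character is read from the handle at this point.
  _ <- evaluate (length contents)
  return contents

main :: IO ()
main = do
  writeFile "foo.txt" "foo\n"
  contents <- readStrict "foo.txt"
  print contents
```

The effect order is now predictable, but only because the entire file is held in memory at once, which is exactly the cost we were trying to avoid.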
So what we need is a system that guarantees deterministic resource handling with constant memory usage. To that end both the Conduit and Pipes libraries solve this problem using different (though largely equivalent) approaches.
pipes
await :: Monad m => Pipe a y m a
yield :: Monad m => a -> Pipe x a m ()
(>->) :: Monad m
      => Pipe a b m r
      -> Pipe b c m r
      -> Pipe a c m r
runEffect :: Monad m => Effect m r -> m r
toListM :: Monad m => Producer a m () -> m [a]
Pipes is a stream processing library with a strong emphasis on the static semantics of composition. The simplest usage is to connect "pipe" functions with the (>->) composition operator, where each component can await and yield to pull values from and push values along the stream.
import Pipes
import qualified Pipes.Prelude as P
import Control.Monad
import Control.Monad.Identity
-- Emit the numbers 1 to 10.
a :: Producer Int Identity ()
a = forM_ [1..10] yield
-- For every input emit three multiples of it.
b :: Pipe Int Int Identity ()
b = forever $ do
  x <- await
  yield (x*2)
  yield (x*3)
  yield (x*4)
-- Pass along only the even inputs.
c :: Pipe Int Int Identity ()
c = forever $ do
  x <- await
  if (x `mod` 2) == 0
    then yield x
    else return ()
result :: [Int]
result = P.toList $ a >-> b >-> c
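The hand-written stages a, b and c above each correspond to a combinator that already ships with pipes. As a rough sketch (the name result' is only illustrative), the same pipeline could be assembled from each, P.mapFoldable and P.filter:

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Same three stages as a, b and c, built from library combinators.
result' :: [Int]
result' = P.toList $
      each [1..10]                              -- like a: emit 1 to 10
  >-> P.mapFoldable (\x -> [x*2, x*3, x*4])     -- like b: three outputs per input
  >-> P.filter even                             -- like c: keep even values only

main :: IO ()
main = print result'
```

Writing the stages by hand with await and yield is still useful when a stage needs state or effects that the stock combinators do not cover.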
For example we could construct a "FizzBuzz" pipe.
{-# LANGUAGE MultiWayIf #-}
import Pipes
import qualified Pipes.Prelude as P
count :: Producer Integer IO ()
count = each [1..100]
fizzbuzz :: Pipe Integer String IO ()
fizzbuzz = do
  n <- await
  if | n `mod` 15 == 0 -> yield "FizzBuzz"
     | n `mod` 3 == 0 -> yield "Fizz"
     | n `mod` 5 == 0 -> yield "Buzz"
     | otherwise -> return ()
  fizzbuzz
main :: IO ()
main = runEffect $ count >-> fizzbuzz >-> P.stdoutLn
To continue with the degenerate case we constructed with lazy IO, consider that we can now compose and sequence deterministic actions over files without having to worry about effect order.
import Pipes
import qualified Pipes.Prelude as P
import System.IO
readF :: FilePath -> Producer String IO ()
readF file = do
  lift $ putStrLn $ "Opened " ++ file
  h <- lift $ openFile file ReadMode
  P.fromHandle h
  -- The two steps below only run if the producer is drained to end of file;
  -- if downstream stops early (as P.take 3 can) they are skipped.
  lift $ putStrLn $ "Closed " ++ file
  lift $ hClose h
main :: IO ()
main = runEffect $ readF "foo.txt" >-> P.take 3 >-> P.stdoutLn
This is simply a sampling of the functionality of pipes. The documentation for pipes is extensive, and a great deal of care has been taken to make the library extremely thorough. pipes is a shining example of an accessible yet category-theory-driven design.
See: Pipes Tutorial
Safe Pipes
bracket :: MonadSafe m => Base m a -> (a -> Base m b) -> (a -> m c) -> m c
As a motivating example, ZeroMQ is a network messaging library that abstracts over traditional Unix sockets to support a variety of network topologies. Most notably, it isn't designed to provide any sort of transactional guarantees for delivery or recovery in case of errors, so it's necessary to build a layer on top of it that provides the desired behavior at the application layer.
In Haskell we'd like to guarantee that if we're polling on a socket we either get messages delivered in a timely fashion or consider the resource to be in an error state and recover from it. Using pipes-safe we can manage the life cycle of lazy IO resources and handle failures, resource termination and finalization gracefully. In other languages this kind of logic would be smeared across several places, or put in some global context where it is prone to introduce errors and subtle race conditions. Using pipes we instead get a nice tight abstraction designed exactly to fit this kind of use case.
For instance, we can now bracket the ZeroMQ socket creation and finalization within the SafeT monad transformer, which guarantees that after successful message delivery we execute the pipes function as expected, or on failure we halt the execution and finalize the socket.
import Pipes
import Pipes.Safe
import qualified Pipes.Prelude as P
import System.Timeout (timeout)
import Data.ByteString.Char8 (ByteString)
import qualified System.ZMQ as ZMQ
data Opts = Opts
  { _addr    :: String -- ^ ZMQ socket address
  , _timeout :: Int    -- ^ Time in microseconds for socket timeout
  }
-- Echo each received message back and yield it, until a receive times out.
recvTimeout :: Opts -> ZMQ.Socket a -> Producer ByteString (SafeT IO) ()
recvTimeout opts sock = do
  body <- liftIO $ timeout (_timeout opts) (ZMQ.receive sock [])
  case body of
    Just msg -> do
      liftIO $ ZMQ.send sock msg []
      yield msg
      recvTimeout opts sock
    Nothing -> liftIO $ print "socket timed out"
collect :: ZMQ.Context
        -> Opts
        -> Producer ByteString (SafeT IO) ()
collect ctx opts = bracket zinit zclose (recvTimeout opts)
  where
    -- Initialize the socket
    zinit = do
      liftIO $ print "waiting for messages"
      sock <- ZMQ.socket ctx ZMQ.Rep
      ZMQ.bind sock (_addr opts)
      return sock
    -- On timeout or completion guarantee the socket gets closed.
    zclose sock = do
      liftIO $ print "finalizing"
      ZMQ.close sock
runZmq :: ZMQ.Context -> Opts -> IO ()
runZmq ctx opts = runSafeT $ runEffect $
  collect ctx opts >-> P.take 10 >-> P.print
main :: IO ()
main = do
  ctx <- ZMQ.init 1
  let opts = Opts {_addr = "tcp://127.0.0.1:8000", _timeout = 1000000 }
  runZmq ctx opts
  ZMQ.term ctx
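The same bracket pattern can be tried without a ZeroMQ installation by swapping the socket for a file handle. The following is a small sketch under that assumption; readLines is a hypothetical name and the file is created by the example itself.

```haskell
import Pipes
import Pipes.Safe (SafeT, bracket, runSafeT)
import qualified Pipes.Prelude as P
import qualified System.IO as IO

-- Hypothetical producer: open the file, stream its lines, and register
-- a finalizer that closes the handle.
readLines :: FilePath -> Producer String (SafeT IO) ()
readLines path = bracket
  (IO.openFile path IO.ReadMode)                 -- acquire
  (\h -> putStrLn "finalizing" >> IO.hClose h)   -- release
  P.fromHandle                                   -- use

main :: IO ()
main = do
  writeFile "foo.txt" (unlines ["a", "b", "c", "d", "e"])
  -- Downstream stops after three lines, before end of file is reached.
  runSafeT $ runEffect $ readLines "foo.txt" >-> P.take 3 >-> P.stdoutLn
```

Even though P.take 3 stops the pipeline before the producer finishes, the release action is still run when runSafeT exits, which is the guarantee the ZeroMQ example relies on.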
conduit
await :: Monad m => ConduitM i o m (Maybe i)
yield :: Monad m => o -> ConduitM i o m ()
($$) :: Monad m => Source m a -> Sink a m b -> m b
(=$) :: Monad m => Conduit a m b -> Sink b m c -> Sink a m c
type Sink i = ConduitM i Void
type Source m o = ConduitM () o m ()
type Conduit i m o = ConduitM i o m ()
Conduits are a conceptually similar, though philosophically different, approach to the same problem of constant-space, deterministic resource handling for IO resources.
The first difference is that the await function now returns a Maybe, which allows different handling of termination. The composition operators are also split into a connecting operator ($$) and a fusing operator (=$), for combining a Source with a Sink and a Conduit with a Sink respectively.
{-# LANGUAGE MultiWayIf #-}
import Data.Conduit
import Control.Monad.Trans
import qualified Data.Conduit.List as CL
source :: Source IO Int
source = CL.sourceList [1..100]
conduit :: Conduit Int IO String
conduit = do
  val <- await
  liftIO $ print val
  case val of
    Nothing -> return ()
    Just n -> do
      if | n `mod` 15 == 0 -> yield "FizzBuzz"
         | n `mod` 3 == 0 -> yield "Fizz"
         | n `mod` 5 == 0 -> yield "Buzz"
         | otherwise -> return ()
      conduit
sink :: Sink String IO ()
sink = CL.mapM_ putStrLn
main :: IO ()
main = source $$ conduit =$ sink
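In more recent releases of conduit (1.3 and later, to the best of our knowledge) the Source, Sink and Conduit synonyms and the $$ and =$ operators are deprecated in favor of ConduitT, the single fusing operator .| and runConduit. As a sketch under that assumption, the same FizzBuzz stream might look as follows; awaitForever takes care of the Maybe returned by await.

```haskell
import Conduit

-- FizzBuzz as a ConduitT stage; awaitForever loops until upstream ends.
fizzbuzz :: Monad m => ConduitT Int String m ()
fizzbuzz = awaitForever $ \n ->
  case (n `mod` 3, n `mod` 5) of
    (0, 0) -> yield "FizzBuzz"
    (0, _) -> yield "Fizz"
    (_, 0) -> yield "Buzz"
    _      -> return ()

main :: IO ()
main = runConduit $ yieldMany [1..100] .| fizzbuzz .| mapM_C putStrLn
```

Because fizzbuzz is polymorphic in its monad, it can also be run purely with runConduitPure and sinkList when no IO is needed.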
See: Conduit Overview