Skip to content

Commit

Permalink
Merge pull request #15 from reflex-frp/cn-#7-encode-eof-in-the-API
Browse files Browse the repository at this point in the history
Encode EOF in the api
  • Loading branch information
3noch authored May 20, 2020
2 parents e2b6477 + 892bedb commit b38205e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
29 changes: 22 additions & 7 deletions src/Reflex/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Reflex.Process
, createRedirectedProcess
, Process(..)
, ProcessConfig(..)
, SendPipe (..)
) where

import Control.Concurrent (forkIO, killThread, ThreadId)
Expand All @@ -32,9 +33,19 @@ import System.Process hiding (createProcess)

import Reflex

data SendPipe i
= SendPipe_Message i
-- ^ A message that's sent to the underlying process
| SendPipe_EOF
-- ^ Send an EOF to the underlying process
| SendPipe_LastMessage i
-- ^ Send the last message (an EOF will be added). This option is offered for
-- convenience, because it has the same effect of sending a Message and then
-- the EOF signal

-- | The inputs to a process
data ProcessConfig t i = ProcessConfig
{ _processConfig_stdin :: Event t i
{ _processConfig_stdin :: Event t (SendPipe i)
-- ^ "stdin" input to be fed to the process
, _processConfig_signal :: Event t P.Signal
-- ^ Signals to send to the process
Expand Down Expand Up @@ -69,7 +80,7 @@ data Process t o e = Process
-- to those pipes.
createRedirectedProcess
:: forall t m i o e. (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> (Handle -> IO (i -> IO ()))
=> (Handle -> IO (SendPipe i -> IO ()))
-- ^ Builder for the standard input handler
-> (Handle -> (o -> IO ()) -> IO (IO ()))
-- ^ Builder for the standard output handler
Expand All @@ -87,7 +98,7 @@ createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (Proces
po@(mi, mout, merr, ph) <- liftIO $ createProcessFunction redirectedProc
case (mi, mout, merr) of
(Just hIn, Just hOut, Just hErr) -> do
writeInput :: i -> IO () <- liftIO $ mkWriteStdInput hIn
writeInput :: SendPipe i -> IO () <- liftIO $ mkWriteStdInput hIn
performEvent_ $ liftIO . writeInput <$> input
sigOut :: Event t (Maybe P.Signal) <- performEvent $ ffor signal $ \sig -> liftIO $ do
mpid <- P.getPid ph
Expand Down Expand Up @@ -140,22 +151,26 @@ createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (Proces
-- to those pipes.
createProcessBufferingInput
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> IO ByteString -- ^ Read a value from the input stream buffer
-> (ByteString -> IO ()) -- ^ Write a value to the input stream buffer
=> IO (SendPipe ByteString) -- ^ Read a value from the input stream buffer
-> (SendPipe ByteString -> IO ()) -- ^ Write a value to the input stream buffer
-> CreateProcess -- ^ The process specification
-> ProcessConfig t ByteString -- ^ The process configuration in terms of Reflex
-> m (Process t ByteString ByteString)
createProcessBufferingInput readBuffer writeBuffer p procConfig = do
let
input :: Handle -> IO (SendPipe ByteString -> IO ())
input h = do
H.hSetBuffering h H.NoBuffering
void $ liftIO $ forkIO $ fix $ \loop -> do
newMessage <- readBuffer
newMessage :: SendPipe ByteString <- readBuffer
open <- H.hIsOpen h
when open $ do
writable <- H.hIsWritable h
when writable $ do
BS.hPutStr h newMessage
case newMessage of
SendPipe_Message m -> BS.hPutStr h m
SendPipe_LastMessage m -> BS.hPutStr h m >> H.hClose h
SendPipe_EOF -> H.hClose h
loop
return writeBuffer

Expand Down
8 changes: 4 additions & 4 deletions test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ checkFRPBlocking downstreamProcess exitMVar = do
timer <- tickLossyFromPostBuildTime 1
void $ performEvent $ (liftIO $ tryPutMVar exitMVar Exit) <$ timer

(ev, evTrigger :: ByteString -> IO ()) <- newTriggerEvent
(ev, evTrigger :: SendPipe ByteString -> IO ()) <- newTriggerEvent
processOutput <- createProcess downstreamProcess (ProcessConfig ev never createProcessWithTermination)
liftIO $ evTrigger $ veryLongByteString 'a'
liftIO $ evTrigger $ veryLongByteString 'b'
liftIO $ evTrigger $ veryLongByteString 'c'
liftIO $ evTrigger $ SendPipe_Message $ veryLongByteString 'a'
liftIO $ evTrigger $ SendPipe_Message $ veryLongByteString 'b'
liftIO $ evTrigger $ SendPipe_LastMessage $ veryLongByteString 'c'

void $ performEvent $ liftIO . BS.putStrLn <$> (_process_stdout processOutput)
pure never)
Expand Down

0 comments on commit b38205e

Please sign in to comment.