Skip to content

Commit

Permalink
introducing forkManaged
Browse files Browse the repository at this point in the history
  • Loading branch information
kazu-yamamoto committed Dec 3, 2024
1 parent 984c5bb commit 3d40709
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 10 deletions.
4 changes: 3 additions & 1 deletion Network/QUIC/Client/Reader.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import Data.List (intersect)
import Network.Socket (Socket, close, getSocketName)
import qualified Network.Socket.ByteString as NSB

import Network.QUIC.Common
import Network.QUIC.Connection
import Network.QUIC.Connector
import Network.QUIC.Crypto
Expand All @@ -30,6 +31,7 @@ import Network.QUIC.Types
-- | readerClient dies when the socket is closed.
readerClient :: Socket -> Connection -> IO ()
readerClient s0 conn = handleLogUnit logAction $ do
labelMe "readerClient"
wait
loop
where
Expand Down Expand Up @@ -146,5 +148,5 @@ rebind conn microseconds = do
newSock <- natRebinding peersa
oldSock <- setSocket conn newSock
let reader = readerClient newSock conn
forkIO reader >>= addReader conn
forkManaged conn reader
fire conn microseconds $ close oldSock
3 changes: 1 addition & 2 deletions Network/QUIC/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ module Network.QUIC.Client.Run (
migrate,
) where

import Control.Concurrent
import Control.Concurrent.Async
import qualified Control.Exception as E
import qualified Network.Socket as NS
Expand Down Expand Up @@ -58,7 +57,7 @@ run conf client = NS.withSocketsDo $ do
runClient :: ClientConfig -> (Connection -> IO a) -> Bool -> VersionInfo -> IO a
runClient conf client0 isICVN verInfo = do
E.bracket open clse $ \(ConnRes conn myAuthCIDs reader) -> do
forkIO reader >>= addReader conn
forkManaged conn reader
let conf' =
conf
{ ccParameters =
Expand Down
33 changes: 28 additions & 5 deletions Network/QUIC/Connection/Misc.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module Network.QUIC.Connection.Misc (
delayedAck,
resetDealyedAck,
setMaxPacketSize,
addReader,
forkManaged,
killReaders,
addResource,
freeResources,
Expand All @@ -35,6 +35,8 @@ module Network.QUIC.Connection.Misc (

import Control.Concurrent
import qualified Control.Exception as E
import qualified Data.IntMap as IntMap
import qualified Data.Map.Strict as Map
import Network.Socket (Socket)
import System.Mem.Weak

Expand Down Expand Up @@ -161,12 +163,33 @@ freeResources Connection{..} =
addReader :: Connection -> ThreadId -> IO ()
addReader Connection{..} tid = do
wtid <- mkWeakThreadId tid
atomicModifyIORef'' readers $ \m -> do
m
deRefWeak wtid >>= mapM_ killThread
let n = fromThreadId tid
atomicModifyIORef'' readers $ Map.insert n wtid

delReader :: Connection -> ThreadId -> IO ()
delReader Connection{..} tid = do
wtid <- mkWeakThreadId tid
let n = fromThreadId tid
atomicModifyIORef'' readers $ Map.delete n

forkManaged :: Connection -> IO () -> IO ()
forkManaged conn action = void $ forkIO $ do
E.bracket setup clean $ \_ -> action
where
setup = do
tid <- myThreadId
addReader conn tid
return tid
clean = delReader conn

killReaders :: Connection -> IO ()
killReaders Connection{..} = join $ readIORef readers
killReaders Connection{..} = do
wtids <- readIORef readers
forM_ wtids $ \wtid -> do
mtid <- deRefWeak wtid
case mtid of
Nothing -> return ()
Just tid -> killThread tid

----------------------------------------------------------------

Expand Down
5 changes: 3 additions & 2 deletions Network/QUIC/Connection/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import Foreign.Ptr (nullPtr)
import Network.Control (Rate, RxFlow, TxFlow, newRate, newRxFlow, newTxFlow)
import Network.Socket (Cmsg, SockAddr, Socket)
import Network.TLS.QUIC
import System.Mem.Weak (Weak)

import Network.QUIC.Config
import Network.QUIC.Connector
Expand Down Expand Up @@ -202,7 +203,7 @@ data Connection = Connection
-- Manage
, connRecvQ :: RecvQ
, connSocket :: IORef Socket
, readers :: IORef (IO ())
, readers :: IORef (Map Word64 (Weak ThreadId))
, mainThreadId :: ThreadId
, controlRate :: Rate
, -- Info
Expand Down Expand Up @@ -306,7 +307,7 @@ newConnection rl myparams verInfo myAuthCIDs peerAuthCIDs debugLog qLog hooks sr
ecrptBuf <- mallocBytes bufsiz
dcrptBuf <- mallocBytes bufsiz
Connection connstate debugLog qLog hooks send recv recvQ sref
<$> newIORef (return ())
<$> newIORef Map.empty
<*> myThreadId
<*> newRate
-- Info
Expand Down
10 changes: 10 additions & 0 deletions Network/QUIC/Imports.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module Network.QUIC.Imports (
#endif
atomicModifyIORef'',
copyBS,
fromThreadId,
) where

import Control.Applicative
Expand All @@ -49,6 +50,10 @@ import Network.ByteOrder
import Network.QUIC.Utils
import Numeric

#if __GLASGOW_HASKELL__ >= 908
import GHC.Conc.Sync (fromThreadId)
#endif

-- | All internal byte sequences.
-- `ByteString` should be used for FFI related stuff.
type Bytes = ShortByteString
Expand All @@ -71,3 +76,8 @@ copyBS dst (PS fptr off len) = withForeignPtr fptr $ \src0 -> do
let src = src0 `plusPtr` off
copyBytes dst src len
return len

#if __GLASGOW_HASKELL__ < 908
fromThreadId :: ThreadId -> Word64

Check failure on line 81 in Network/QUIC/Imports.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 8.10)

Not in scope: type constructor or class ‘ThreadId’

Check failure on line 81 in Network/QUIC/Imports.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 9.0)

Not in scope: type constructor or class ‘ThreadId’

Check failure on line 81 in Network/QUIC/Imports.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 9.2)

Not in scope: type constructor or class ‘ThreadId’

Check failure on line 81 in Network/QUIC/Imports.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 9.4)

Not in scope: type constructor or class ‘ThreadId’

Check failure on line 81 in Network/QUIC/Imports.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 9.6)

Not in scope: type constructor or class ‘ThreadId’

Check failure on line 81 in Network/QUIC/Imports.hs

View workflow job for this annotation

GitHub Actions / build (macOS-latest, 8.10)

Not in scope: type constructor or class ‘ThreadId’

Check failure on line 81 in Network/QUIC/Imports.hs

View workflow job for this annotation

GitHub Actions / build (macOS-latest, 9.0)

Not in scope: type constructor or class ‘ThreadId’

Check failure on line 81 in Network/QUIC/Imports.hs

View workflow job for this annotation

GitHub Actions / build (macOS-latest, 9.2)

Not in scope: type constructor or class ‘ThreadId’

Check failure on line 81 in Network/QUIC/Imports.hs

View workflow job for this annotation

GitHub Actions / build (macOS-latest, 9.4)

Not in scope: type constructor or class ‘ThreadId’

Check failure on line 81 in Network/QUIC/Imports.hs

View workflow job for this annotation

GitHub Actions / build (macOS-latest, 9.6)

Not in scope: type constructor or class ‘ThreadId’
fromThreadId tid = read (drop 9 $ show tid)
#endif

0 comments on commit 3d40709

Please sign in to comment.