Ответ 1
Итак, я нашел ответ, который работает для меня, и это может сработать для кого-то другого.
Оказывается, вы можете на самом деле обманывать себя с внутренностями Warp, чтобы сделать это, но тогда у вас осталась базовая версия Warp, и если вам нужны такие вещи, как ведение журнала и т.д., вам понадобится для добавления на него других пакетов.
Также обратите внимание, что так называемые "полузакрытые" соединения (когда клиент закрывает свой конец отправки, но все еще ждет данных) будут обнаружены как закрытые, прерывая ваш расчет. Я не знаю ни одного HTTP-клиента, который занимается полузакрытыми соединениями, а просто что-то, о чем нужно знать.
Во всяком случае, то, что я сделал, сначала скопировал функции runSettings
и runSettingsSocket
, выставленные Network.Wai.Handler.Warp
и Network.Wai.Handler.Warp.Internal
, и произвел версии, которые вызвали функцию, которую я поставил вместо WarpI.socketConnection
, так что у меня есть подпись
runSettings' :: Warp.Settings -> (Socket -> IO (IO WarpI.Connection))
-> Wai.Application -> IO ()
Это потребовало копирования нескольких вспомогательных методов, таких как setSocketCloseOnExec
и windowsThreadBlockHack
. Подпись double- IO
может выглядеть странно, но это то, что вы хотите - внешний IO
запускается в основном потоке (который вызывает accept
), а внутренний IO
запускается в потоке для каждого соединения который разворачивается после возврата accept
. Оригинальная функция Warp
runSettings
эквивалентна:
\set -> runSettings' set (WarpI.socketConnection >=> return . return)
Тогда я сделал:
data ClientDisappeared = ClientDisappeared deriving (Show, Eq, Enum, Ord)
instance Exception ClientDisappeared
runSettingsSignalDisconnect :: Warp.Settings -> Wai.Application -> IO ()
runSettingsSignalDisconnect set =
runSettings' set (WarpI.socketConnection >=> return . wrapConn)
where
-- Fork a 'monitor' thread that does nothing but attempt to
-- perform a read from conn in a loop 1/sec, and wrap the receive
-- methods on conn so that they first consume from the stuff read
-- by the monitoring thread. If the monitoring thread sees
-- end-of-file (signaled by an empty string read), raise
-- ClientDisappered on the per-connection thread.
wrapConn conn = do
tid <- myThreadId
nxtBstr <- newEmptyMVar :: IO (MVar ByteString)
semaphore <- newMVar ()
readerCount <- newIORef (0 :: Int)
monitorThread <- forkIO (monitor tid nxtBstr semaphore readerCount)
return $ conn {
WarpI.connClose = throwTo monitorThread ClientDisappeared
>> WarpI.connClose conn
, WarpI.connRecv = newRecv nxtBstr semaphore readerCount
, WarpI.connRecvBuf = newRecvBuf nxtBstr semaphore readerCount
}
where
newRecv :: MVar ByteString -> MVar () -> IORef Int
-> IO ByteString
newRecv nxtBstr sem readerCount =
bracket_
(atomicModifyIORef' readerCount $ \x -> (succ x, ()))
(atomicModifyIORef' readerCount $ \x -> (pred x, ()))
(withMVar sem $ \_ -> do w <- tryTakeMVar nxtBstr
case w of
Just w' -> return w'
Nothing -> WarpI.connRecv conn
)
newRecvBuf :: MVar ByteString -> MVar () -> IORef Int
-> WarpI.Buffer -> WarpI.BufSize -> IO Bool
newRecvBuf nxtBstr sem readerCount buf bufSize =
bracket_
(atomicModifyIORef' readerCount $ \x -> (succ x, ()))
(atomicModifyIORef' readerCount $ \x -> (pred x, ()))
(withMVar sem $ \_ -> do
(fulfilled, buf', bufSize') <-
if bufSize == 0 then return (False, buf, bufSize)
else
do w <- tryTakeMVar nxtBstr
case w of
Nothing -> return (False, buf, bufSize)
Just w' -> do
let wlen = B.length w'
if wlen > bufSize
then do BU.unsafeUseAsCString w' $ \cw' ->
copyBytes buf (castPtr cw') bufSize
putMVar nxtBstr (B.drop bufSize w')
return (True, buf, 0)
else do BU.unsafeUseAsCString w' $ \cw' ->
copyBytes buf (castPtr cw') wlen
return (wlen == bufSize, plusPtr buf wlen,
bufSize - wlen)
if fulfilled then return True
else WarpI.connRecvBuf conn buf' bufSize'
)
dropClientDisappeared :: ClientDisappeared -> IO ()
dropClientDisappeared _ = return ()
monitor tid nxtBstr sem st =
catch (monitor' tid nxtBstr sem st) dropClientDisappeared
monitor' tid nxtBstr sem st = do
(hitEOF, readerCount) <- withMVar sem $ \_ -> do
w <- tryTakeMVar nxtBstr
case w of
-- No one picked up our bytestring from last time
Just w' -> putMVar nxtBstr w' >> return (False, 0)
Nothing -> do
w <- WarpI.connRecv conn
putMVar nxtBstr w
readerCount <- readIORef st
return (B.null w, readerCount)
if hitEOF && (readerCount == 0)
-- Don't signal if main thread is also trying to read -
-- in that case, main thread will see EOF directly
then throwTo tid ClientDisappeared
else do threadDelay oneSecondInMicros
monitor' tid nxtBstr sem st
oneSecondInMicros = 1000000