X-Git-Url: http://git.cielonegro.org/gitweb.cgi?a=blobdiff_plain;f=Network%2FHTTP%2FLucu%2FResponseWriter.hs;h=02e3938644b2122269d9c98e708c58352b68535d;hb=8225cc52ffe4c3d900ae1f79573089be230b80bd;hp=a3a6af106071d6d8a5f7b357fab3ee4823e057ef;hpb=d05d8c883eaca12ee621975a2b95c5ebdc2357d2;p=Lucu.git diff --git a/Network/HTTP/Lucu/ResponseWriter.hs b/Network/HTTP/Lucu/ResponseWriter.hs index a3a6af1..02e3938 100644 --- a/Network/HTTP/Lucu/ResponseWriter.hs +++ b/Network/HTTP/Lucu/ResponseWriter.hs @@ -1,166 +1,231 @@ +{-# LANGUAGE + DoAndIfThenElse + , RecordWildCards + , ScopedTypeVariables + , UnicodeSyntax + #-} module Network.HTTP.Lucu.ResponseWriter ( responseWriter ) where - -import qualified Data.ByteString.Lazy.Char8 as B -import Control.Concurrent -import Control.Concurrent.STM -import Control.Exception -import Control.Monad +import qualified Blaze.ByteString.Builder.HTTP as BB +import Control.Concurrent +import Control.Concurrent.STM +import Control.Exception +import Control.Monad +import qualified Data.Ascii as A +import Data.Monoid.Unicode import qualified Data.Sequence as S -import Data.Sequence (ViewR(..)) -import Network.HTTP.Lucu.Config -import Network.HTTP.Lucu.Format -import Network.HTTP.Lucu.HttpVersion -import Network.HTTP.Lucu.Interaction -import Network.HTTP.Lucu.Postprocess -import Network.HTTP.Lucu.Response -import Prelude hiding (catch) -import System.IO +import Data.Sequence (ViewR(..)) +import Network.HTTP.Lucu.Config +import Network.HTTP.Lucu.HandleLike +import Network.HTTP.Lucu.HttpVersion +import Network.HTTP.Lucu.Interaction +import Network.HTTP.Lucu.Postprocess +import Network.HTTP.Lucu.Response +import Prelude.Unicode +import System.IO (hPutStrLn, stderr) +import System.IO.Error +data Context h + = Context { + cConfig ∷ !Config + , cHandle ∷ !h + , cQueue ∷ !InteractionQueue + , cReader ∷ !ThreadId + } -responseWriter :: Config -> Handle -> InteractionQueue -> ThreadId -> IO () +responseWriter ∷ HandleLike h ⇒ Config → h → InteractionQueue → ThreadId → IO () responseWriter cnf h tQueue readerTID - = cnf `seq` h `seq` tQueue `seq` readerTID `seq` - catch awaitSomethingToWrite $ \ exc -> - case exc of - IOException _ -> return () - AsyncException ThreadKilled -> return () - BlockedIndefinitely -> putStrLn "requestWriter: blocked indefinitely" - _ -> print exc + = awaitSomethingToWrite (Context cnf h tQueue readerTID) + `catches` + [ Handler handleIOE + , Handler handleAsyncE + , Handler handleBIOS + , Handler handleOthers + ] where - awaitSomethingToWrite :: IO () - awaitSomethingToWrite - = do action - <- atomically $! - do -- キューが空でなくなるまで待つ - queue <- readTVar tQueue - when (S.null queue) - retry + handleIOE ∷ IOException → IO () + handleIOE e + | isIllegalOperation e + = return () -- EPIPE: should be ignored at all. + | otherwise + = terminate e - -- GettingBody 状態にあり、Continue が期待され - -- てゐて、それがまだ送信前なのであれば、 - -- Continue を送信する。 - case S.viewr queue of - _ :> itr -> do state <- readItr itr itrState id + handleAsyncE ∷ AsyncException → IO () + handleAsyncE ThreadKilled = terminate' + handleAsyncE e = terminate e - if state == GettingBody then - writeContinueIfNecessary itr - else - if state >= DecidingBody then - writeHeaderOrBodyIfNecessary itr - else - retry - action + handleBIOS ∷ BlockedIndefinitelyOnSTM → IO () + handleBIOS = terminate - writeContinueIfNecessary :: Interaction -> STM (IO ()) - writeContinueIfNecessary itr - = itr `seq` - do expectedContinue <- readItr itr itrExpectedContinue id - if expectedContinue then - do wroteContinue <- readItr itr itrWroteContinue id - if wroteContinue then - -- 既に Continue を書込み濟 - retry - else - do reqBodyWanted <- readItr itr itrReqBodyWanted id - if reqBodyWanted /= Nothing then - return $ writeContinue itr - else - retry - else - retry + handleOthers ∷ SomeException → IO () + handleOthers = terminate - writeHeaderOrBodyIfNecessary :: Interaction -> STM (IO ()) - writeHeaderOrBodyIfNecessary itr - -- DecidingBody 以降の状態にあり、まだヘッダを出力する前であ - -- れば、ヘッダを出力する。ヘッダ出力後であり、bodyToSend が - -- 空でなければ、それを出力する。空である時は、もし状態が - -- Done であれば後処理をする。 - = itr `seq` - do wroteHeader <- readItr itr itrWroteHeader id - - if not wroteHeader then - return $ writeHeader itr - else - do bodyToSend <- readItr itr itrBodyToSend id + terminate ∷ Exception e ⇒ e → IO () + terminate e + = do hPutStrLn stderr "requestWriter caught an exception:" + hPutStrLn stderr (show $ toException e) + terminate' - if B.null bodyToSend then - do state <- readItr itr itrState id + terminate' ∷ IO () + terminate' = hClose h - if state == Done then - return $! finalize itr - else - retry - else - return $! writeBodyChunk itr +awaitSomethingToWrite ∷ HandleLike h ⇒ Context h → IO () +awaitSomethingToWrite ctx@(Context {..}) + = join $ + atomically $ + do queue ← readTVar cQueue + case S.viewr queue of + EmptyR → retry + queue' :> itr → do writeTVar cQueue queue' + return $ writeContinueIfNeeded ctx itr + +writeContinueIfNeeded ∷ HandleLike h + ⇒ Context h + → Interaction + → IO () +writeContinueIfNeeded ctx@(Context {..}) itr@(Interaction {..}) + = do isNeeded ← atomically $ readTMVar itrSendContinue + when isNeeded + $ do let cont = Response { + resVersion = HttpVersion 1 1 + , resStatus = Continue + , resHeaders = (∅) + } + cont' ← completeUnconditionalHeaders cConfig cont + hPutBuilder cHandle $ A.toBuilder $ printResponse cont' + hFlush cHandle + writeHeader ctx itr - writeContinue :: Interaction -> IO () - writeContinue itr - = itr `seq` - do let cont = Response { - resVersion = HttpVersion 1 1 - , resStatus = Continue - , resHeaders = [] - } - cont' <- completeUnconditionalHeaders cnf cont - hPutResponse h cont' - hFlush h - atomically $! writeItr itr itrWroteContinue True - awaitSomethingToWrite +writeHeader ∷ HandleLike h + ⇒ Context h + → Interaction + → IO () +writeHeader ctx@(Context {..}) itr@(Interaction {..}) + = do res ← atomically $ + do state ← readTVar itrState + if state ≥ SendingBody then + readTVar itrResponse + else + retry -- Too early to write header fields. + hPutBuilder cHandle $ A.toBuilder $ printResponse res + hFlush cHandle + writeBodyIfNeeded ctx itr - writeHeader :: Interaction -> IO () - writeHeader itr - = itr `seq` - do res <- atomically $! do writeItr itr itrWroteHeader True - readItr itr itrResponse id - hPutResponse h res - hFlush h - awaitSomethingToWrite - - writeBodyChunk :: Interaction -> IO () - writeBodyChunk itr - = itr `seq` - do willDiscardBody <- atomically $! readItr itr itrWillDiscardBody id - willChunkBody <- atomically $! readItr itr itrWillChunkBody id - chunk <- atomically $! do chunk <- readItr itr itrBodyToSend id - writeItr itr itrBodyToSend B.empty - return chunk - unless willDiscardBody - $ do if willChunkBody then - do hPutStr h (fmtHex False 0 $! fromIntegral $! B.length chunk) - hPutStr h "\r\n" - B.hPut h chunk - hPutStr h "\r\n" - else - B.hPut h chunk - hFlush h - awaitSomethingToWrite +writeBodyIfNeeded ∷ HandleLike h + ⇒ Context h + → Interaction + → IO () +writeBodyIfNeeded ctx itr@(Interaction {..}) + = join $ + atomically $ + do willDiscardBody ← readTVar itrWillDiscardBody + if willDiscardBody then + return $ discardBody ctx itr + else + do willChunkBody ← readTVar itrWillChunkBody + if willChunkBody then + return $ writeChunkedBody ctx itr + else + return $ writeNonChunkedBody ctx itr - finishBodyChunk :: Interaction -> IO () - finishBodyChunk itr - = itr `seq` - do willDiscardBody <- atomically $! readItr itr itrWillDiscardBody id - willChunkBody <- atomically $! readItr itr itrWillChunkBody id - when (not willDiscardBody && willChunkBody) - $ hPutStr h "0\r\n\r\n" >> hFlush h +discardBody ∷ HandleLike h + ⇒ Context h + → Interaction + → IO () +discardBody ctx itr@(Interaction {..}) + = join $ + atomically $ + do chunk ← tryTakeTMVar itrBodyToSend + case chunk of + Just _ → return $ discardBody ctx itr + Nothing → do state ← readTVar itrState + if state ≡ Done then + return $ finalize ctx itr + else + retry - finalize :: Interaction -> IO () - finalize itr - = itr `seq` - do finishBodyChunk itr - willClose <- atomically $! - do queue <- readTVar tQueue +writeChunkedBody ∷ HandleLike h + ⇒ Context h + → Interaction + → IO () +writeChunkedBody ctx@(Context {..}) itr@(Interaction {..}) + = join $ + atomically $ + do chunk ← tryTakeTMVar itrBodyToSend + case chunk of + Just b → return $ + do hPutBuilder cHandle $ BB.chunkedTransferEncoding b + hFlush cHandle + writeChunkedBody ctx itr + Nothing → do state ← readTVar itrState + if state ≡ Done then + return $ finalize ctx itr + else + retry - case S.viewr queue of - remaining :> _ -> writeTVar tQueue remaining +writeNonChunkedBody ∷ HandleLike h + ⇒ Context h + → Interaction + → IO () +writeNonChunkedBody ctx@(Context {..}) itr@(Interaction {..}) + = join $ + atomically $ + do chunk ← tryTakeTMVar itrBodyToSend + case chunk of + Just b → return $ + do hPutBuilder cHandle b + hFlush cHandle + writeNonChunkedBody ctx itr + Nothing → do state ← readTVar itrState + if state ≡ Done then + return $ finalize ctx itr + else + retry - readItr itr itrWillClose id - if willClose then - -- reader は恐らく hWaitForInput してゐる最中なので、 - -- スレッドを豫め殺して置かないとをかしくなる。 - do killThread readerTID - hClose h - else - awaitSomethingToWrite +finalize ∷ HandleLike h ⇒ Context h → Interaction → IO () +finalize ctx@(Context {..}) (Interaction {..}) + = join $ + atomically $ + do sentContinue ← takeTMVar itrSendContinue + willDiscardBody ← readTVar itrWillDiscardBody + willChunkBody ← readTVar itrWillChunkBody + willClose ← readTVar itrWillClose + queue ← readTVar cQueue + case S.viewr queue of + queue' :> _ + → writeTVar cQueue queue' + EmptyR + → fail "finalize: cQueue is empty, which should never happen." + return $ + do when (((¬) willDiscardBody) ∧ willChunkBody) + $ do hPutBuilder cHandle BB.chunkedTransferTerminator + hFlush cHandle + if willClose ∨ needToClose sentContinue then + -- The RequestReader is probably blocking on + -- hWaitForInput so we have to kill it before + -- closing the socket. + -- THINKME: Couldn't that somehow be avoided? + do killThread cReader + hClose cHandle + else + awaitSomethingToWrite ctx + where + needToClose ∷ Bool → Bool + needToClose sentContinue + -- We've sent both "HTTP/1.1 100 Continue" and a final + -- response, so nothing prevents our connection from keeping + -- alive. + | sentContinue = False + -- We've got "Expect: 100-continue" but have sent a final + -- response without sending "HTTP/1.1 100 + -- Continue". According to the RFC 2616 (HTTP/1.1), it is + -- undecidable whether the client will send us its + -- (rejected) request body OR start a completely new request + -- in this situation. So the only possible thing to do is to + -- brutally shutdown the connection. + | itrExpectedContinue ≡ Just True = True + -- The client didn't expect 100-continue so we haven't sent + -- one. No need to do anything special. + | otherwise = False