X-Git-Url: http://git.cielonegro.org/gitweb.cgi?p=Lucu.git;a=blobdiff_plain;f=Network%2FHTTP%2FLucu%2FRequestReader.hs;h=4c59b3e9f8b1ac5a1524d634d2595a339c80c853;hp=e3032ce583ea8afdae6c0802462ac283dcea2dbb;hb=1ead053df6a792edafa9d714c4c038a8a9c3ad16;hpb=2d25d34513dc4f6bf62e53e2af2f4a4ef39cc6dc diff --git a/Network/HTTP/Lucu/RequestReader.hs b/Network/HTTP/Lucu/RequestReader.hs index e3032ce..4c59b3e 100644 --- a/Network/HTTP/Lucu/RequestReader.hs +++ b/Network/HTTP/Lucu/RequestReader.hs @@ -1,189 +1,384 @@ +{-# LANGUAGE + CPP + , DoAndIfThenElse + , OverloadedStrings + , RecordWildCards + , ScopedTypeVariables + , UnicodeSyntax + #-} module Network.HTTP.Lucu.RequestReader - ( requestReader -- Config -> ResTree -> Handle -> HostName -> InteractionQueue -> IO () + ( requestReader ) where - -import Control.Concurrent.STM -import Control.Exception -import Control.Monad -import qualified Data.ByteString.Lazy.Char8 as B -import Data.ByteString.Lazy.Char8 (ByteString) -import Data.Map as M -import Data.Map (Map) -import Data.Maybe +import Control.Concurrent +import Control.Concurrent.STM +import Control.Exception hiding (block) +import Control.Monad +import qualified Data.Attoparsec.Lazy as LP +import qualified Data.ByteString as Strict +import qualified Data.ByteString.Lazy as Lazy +import Data.List +import Data.Maybe +import Data.Monoid.Unicode import qualified Data.Sequence as S -import Data.Sequence (Seq, (<|), ViewR(..)) -import Network -import Network.HTTP.Lucu.Config -import Network.HTTP.Lucu.DefaultPage -import Network.HTTP.Lucu.HttpVersion -import Network.HTTP.Lucu.Interaction -import Network.HTTP.Lucu.Parser -import Network.HTTP.Lucu.Postprocess -import Network.HTTP.Lucu.Preprocess -import Network.HTTP.Lucu.Request -import Network.HTTP.Lucu.Response -import Network.HTTP.Lucu.Resource -import Prelude hiding (catch) -import System.IO - -import GHC.Conc (unsafeIOToSTM) - -requestReader :: Config -> ResTree -> Handle -> HostName -> InteractionQueue -> IO () -requestReader cnf tree h host tQueue - = do catch (do input <- B.hGetContents h - acceptRequest input) $ \ exc -> - case exc of - IOException _ -> return () - AsyncException ThreadKilled -> return () - BlockedIndefinitely -> putStrLn "requestReader: blocked indefinitely" - _ -> print exc +import qualified Data.Text as T +import Network.HTTP.Lucu.Abortion +import Network.HTTP.Lucu.Config +import Network.HTTP.Lucu.Chunk +import Network.HTTP.Lucu.HandleLike +import Network.HTTP.Lucu.Interaction +import Network.HTTP.Lucu.Preprocess +import Network.HTTP.Lucu.Request +import Network.HTTP.Lucu.Response +import Network.HTTP.Lucu.Resource.Internal +import Network.HTTP.Lucu.Resource.Tree +import Network.HTTP.Lucu.Utils +import Network.Socket +import Prelude.Unicode +import System.IO (hPutStrLn, stderr) + +data Context h + = Context { + cConfig ∷ !Config + , cResTree ∷ !ResTree + , cFallbacks ∷ ![FallbackHandler] + , cHandle ∷ !h + , cPort ∷ !PortNumber + , cAddr ∷ !SockAddr + , cQueue ∷ !InteractionQueue + } + +data ChunkReceivingState + = Initial + | InChunk !Int -- ^Number of remaining octets in the current + -- chunk. It's always positive. + +requestReader ∷ HandleLike h + ⇒ Config + → ResTree + → [FallbackHandler] + → h + → PortNumber + → SockAddr + → InteractionQueue + → IO () +requestReader cnf tree fbs h port addr tQueue + = do input ← hGetLBS h + acceptRequest (Context cnf tree fbs h port addr tQueue) input + `catches` + [ Handler handleAsyncE + , Handler handleOthers + ] + `finally` + enqueue' tQueue EndOfInteraction + where + handleAsyncE ∷ AsyncException → IO () + handleAsyncE ThreadKilled = return () + handleAsyncE e = dump e + + handleOthers ∷ SomeException → IO () + handleOthers = dump + + dump ∷ Exception e ⇒ e → IO () + dump e + = do hPutStrLn stderr "Lucu: requestReader caught an exception:" + hPutStrLn stderr $ show e + +acceptRequest ∷ HandleLike h ⇒ Context h → Lazy.ByteString → IO () +acceptRequest ctx@(Context {..}) input + = do atomically $ + do queue ← readTVar cQueue + when (S.length queue ≥ cnfMaxPipelineDepth cConfig) + -- Too many requests in the pipeline... + retry + if Lazy.null input then + return () + else + case LP.parse request input of + LP.Done input' req → acceptParsableRequest ctx req input' + LP.Fail _ _ _ → acceptNonparsableRequest ctx + +acceptNonparsableRequest ∷ HandleLike h ⇒ Context h → IO () +acceptNonparsableRequest ctx@(Context {..}) + = do syi ← mkSyntacticallyInvalidInteraction cConfig + enqueue ctx syi + +acceptParsableRequest ∷ HandleLike h + ⇒ Context h + → Request + → Lazy.ByteString + → IO () +acceptParsableRequest ctx@(Context {..}) req input + = do let ar = preprocess (cnfServerHost cConfig) cPort req + if isError $ arInitialStatus ar then + acceptSemanticallyInvalidRequest ctx ar input + else + do rsrc ← findResource cResTree cFallbacks $ reqURI $ arRequest ar + case rsrc of + Nothing + → do let ar' = ar { + arInitialStatus = fromStatusCode NotFound + } + acceptSemanticallyInvalidRequest ctx ar' input + Just (path, def) + → acceptRequestForResource ctx ar input path def + +acceptSemanticallyInvalidRequest ∷ HandleLike h + ⇒ Context h + → AugmentedRequest + → Lazy.ByteString + → IO () +acceptSemanticallyInvalidRequest ctx@(Context {..}) ar input + = do sei ← mkSemanticallyInvalidInteraction cConfig ar + enqueue ctx sei + acceptRequest ctx input + +acceptRequestForResource ∷ HandleLike h + ⇒ Context h + → AugmentedRequest + → Lazy.ByteString + → [Strict.ByteString] + → Resource + → IO () +acceptRequestForResource ctx@(Context {..}) ar@(AugmentedRequest {..}) input rsrcPath rsrcDef + = do +#if defined(HAVE_SSL) + cert ← hGetPeerCert cHandle + ni ← mkNormalInteraction cConfig cAddr cert ar rsrcPath +#else + ni ← mkNormalInteraction cConfig cAddr ar rsrcPath +#endif + tid ← spawnRsrc rsrcDef ni + enqueue ctx ni + if reqMustHaveBody arRequest then + waitForReceiveBodyReq ctx ni tid input + else + acceptRequest ctx input + +waitForReceiveBodyReq ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → ThreadId + → Lazy.ByteString + → IO () +waitForReceiveBodyReq ctx ni@(NI {..}) rsrcTid input + = case fromJust niReqBodyLength of + Chunked + → waitForReceiveChunkedBodyReqForTheFirstTime ctx ni rsrcTid input + Fixed len + → waitForReceiveNonChunkedBodyReqForTheFirstTime ctx ni input len + +-- Toooooo long name for a function... +waitForReceiveChunkedBodyReqForTheFirstTime ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → ThreadId + → Lazy.ByteString + → IO () +waitForReceiveChunkedBodyReqForTheFirstTime ctx ni@(NI {..}) rsrcTid input + = join $ + atomically $ + do req ← takeTMVar niReceiveBodyReq + case req of + ReceiveBody wanted + → do putTMVar niSendContinue niExpectedContinue + return $ readCurrentChunk ctx ni rsrcTid wanted input Initial + WasteAll + → do putTMVar niSendContinue False + return $ wasteAllChunks ctx rsrcTid input Initial + +waitForReceiveChunkedBodyReq ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → ThreadId + → Lazy.ByteString + → ChunkReceivingState + → IO () +waitForReceiveChunkedBodyReq ctx ni@(NI {..}) rsrcTid input st + = do req ← atomically $ takeTMVar niReceiveBodyReq + case req of + ReceiveBody wanted + → readCurrentChunk ctx ni rsrcTid wanted input st + WasteAll + → wasteAllChunks ctx rsrcTid input st + +wasteAllChunks ∷ HandleLike h + ⇒ Context h + → ThreadId + → Lazy.ByteString + → ChunkReceivingState + → IO () +wasteAllChunks ctx rsrcTid = go where - acceptRequest :: ByteString -> IO () - acceptRequest input - -- キューに最大パイプライン深度以上のリクエストが溜まってゐる - -- 時は、それが限度以下になるまで待つ。 - = do atomically $ do queue <- readTVar tQueue - when (S.length queue >= cnfMaxPipelineDepth cnf) - retry - - -- リクエストを讀む。パースできない場合は直ちに 400 Bad - -- Request 應答を設定し、それを出力してから切斷するやう - -- に ResponseWriter に通知する。 - case parse requestP input of - (Success req , input') -> acceptParsableRequest req input' - (IllegalInput, _ ) -> acceptNonparsableRequest BadRequest - (ReachedEOF , _ ) -> acceptNonparsableRequest BadRequest - - acceptNonparsableRequest :: StatusCode -> IO () - acceptNonparsableRequest status - = do itr <- newInteraction cnf host Nothing - let res = Response { - resVersion = HttpVersion 1 1 - , resStatus = status - , resHeaders = [] - - } - atomically $ do writeItr itr itrResponse $ Just res - writeItr itr itrWillClose True - writeItr itr itrState Done - writeDefaultPage itr - postprocess itr - enqueue itr - - acceptParsableRequest :: Request -> ByteString -> IO () - acceptParsableRequest req input - = do itr <- newInteraction cnf host (Just req) - action - <- atomically $ - do preprocess itr - isErr <- readItrF itr itrResponse (isError . resStatus) - if isErr == Just True then - acceptSemanticallyInvalidRequest itr input - else - case findResource tree $ (reqURI . fromJust . itrRequest) itr of - Nothing -- Resource が無かった - -> acceptRequestForNonexistentResource itr input - - Just rsrcDef -- あった - -> acceptRequestForExistentResource itr input rsrcDef - action - - acceptSemanticallyInvalidRequest :: Interaction -> ByteString -> STM (IO ()) - acceptSemanticallyInvalidRequest itr input - = do writeItr itr itrState Done - writeDefaultPage itr - postprocess itr - enqueue itr - return $ acceptRequest input - - acceptRequestForNonexistentResource :: Interaction -> ByteString -> STM (IO ()) - acceptRequestForNonexistentResource itr input - = do let res = Response { - resVersion = HttpVersion 1 1 - , resStatus = NotFound - , resHeaders = [] - } - writeItr itr itrResponse $ Just res - writeItr itr itrState Done - writeDefaultPage itr - postprocess itr - enqueue itr - return $ acceptRequest input - - acceptRequestForExistentResource :: Interaction -> ByteString -> ResourceDef -> STM (IO ()) - acceptRequestForExistentResource itr input rsrcDef - = do requestHasBody <- readItr itr itrRequestHasBody id - writeItr itr itrState (if requestHasBody - then ExaminingHeader - else DecidingHeader) - enqueue itr - return $ do runResource rsrcDef itr - if requestHasBody then - observeRequest itr input - else - acceptRequest input - - observeRequest :: Interaction -> ByteString -> IO () - observeRequest itr input - = do isChunked <- atomically $ readItr itr itrRequestIsChunked id - if isChunked then - observeChunkedRequest itr input - else - observeNonChunkedRequest itr input - - observeChunkedRequest :: Interaction -> ByteString -> IO () - observeChunkedRequest itr input - = fail "FIXME: not implemented" - - observeNonChunkedRequest :: Interaction -> ByteString -> IO () - observeNonChunkedRequest itr input - = do action - <- atomically $ - do wantedM <- readItr itr itrReqBodyWanted id - if wantedM == Nothing then - do wasteAll <- readItr itr itrReqBodyWasteAll id - if wasteAll then - -- 破棄要求が來た - do remainingM <- readItr itr itrReqChunkRemaining id - - let (_, input') = if remainingM == Nothing then - (B.takeWhile (\ _ -> True) input, B.empty) - else - B.splitAt (fromIntegral $ fromJust remainingM) input - - writeItr itr itrReqChunkRemaining $ Just 0 - writeItr itr itrReqChunkIsOver True - writeItr itr itrReqBodyWanted Nothing - writeItr itr itrReceivedBody B.empty - - return $ acceptRequest input' - else - -- 要求がまだ来ない - retry - else - -- 受信要求が來た - do remainingM <- readItr itr itrReqChunkRemaining id - - let wanted = fromJust wantedM - expectedChunkLen = fromIntegral $ maybe wanted (min wanted) remainingM - (chunk, input') = B.splitAt expectedChunkLen input - newRemaining = fmap - (\ x -> x - (fromIntegral $ B.length chunk)) - remainingM - isOver = B.length chunk < expectedChunkLen - - writeItr itr itrReqChunkRemaining newRemaining - writeItr itr itrReqChunkIsOver isOver - writeItr itr itrReqBodyWanted Nothing - writeItr itr itrReceivedBody chunk - - if isOver then - return $ acceptRequest input' - else - return $ observeNonChunkedRequest itr input' - action - - enqueue :: Interaction -> STM () - enqueue itr = do queue <- readTVar tQueue - writeTVar tQueue (itr <| queue) \ No newline at end of file + go ∷ Lazy.ByteString → ChunkReceivingState → IO () + go input Initial + = case LP.parse chunkHeader input of + LP.Done input' chunkLen + | chunkLen ≡ 0 → gotFinalChunk input' + | otherwise → gotChunk input' chunkLen + LP.Fail _ eCtx e + → chunkWasMalformed rsrcTid eCtx e + "wasteAllChunks: chunkHeader" + go input (InChunk chunkLen) + = gotChunk input chunkLen + + gotChunk ∷ Lazy.ByteString → Int → IO () + gotChunk input chunkLen + = let input' = Lazy.drop (fromIntegral chunkLen) input + in + case LP.parse chunkFooter input' of + LP.Done input'' _ + → go input'' Initial + LP.Fail _ eCtx e + → chunkWasMalformed rsrcTid eCtx e + "wasteAllChunks: chunkFooter" + + gotFinalChunk ∷ Lazy.ByteString → IO () + gotFinalChunk input + = case LP.parse chunkTrailer input of + LP.Done input' _ + → acceptRequest ctx input' + LP.Fail _ eCtx e + → chunkWasMalformed rsrcTid eCtx e + "wasteAllChunks: chunkTrailer" + +readCurrentChunk ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → ThreadId + → Int + → Lazy.ByteString + → ChunkReceivingState + → IO () +readCurrentChunk ctx ni@(NI {..}) rsrcTid wanted = go + where + go ∷ Lazy.ByteString → ChunkReceivingState → IO () + go input Initial + = case LP.parse chunkHeader input of + LP.Done input' chunkLen + | chunkLen ≡ 0 + → gotFinalChunk input' + | otherwise + → gotChunk input' chunkLen + LP.Fail _ eCtx e + → chunkWasMalformed rsrcTid eCtx e + "readCurrentChunk: chunkHeader" + go input (InChunk chunkLen) + = gotChunk input chunkLen + + gotChunk ∷ Lazy.ByteString → Int → IO () + gotChunk input chunkLen + = do let bytesToRead = min wanted chunkLen + (block, input') = Lazy.splitAt (fromIntegral bytesToRead) input + block' = Strict.concat $ Lazy.toChunks block + actualReadBytes = Strict.length block' + chunkLen' = chunkLen - actualReadBytes + atomically $ putTMVar niReceivedBody block' + if chunkLen' ≡ 0 then + case LP.parse chunkFooter input' of + LP.Done input'' _ + → waitForReceiveChunkedBodyReq ctx ni rsrcTid input'' Initial + LP.Fail _ eCtx e + → chunkWasMalformed rsrcTid eCtx e + "readCurrentChunk: chunkFooter" + else + waitForReceiveChunkedBodyReq ctx ni rsrcTid input' $ InChunk chunkLen' + + gotFinalChunk ∷ Lazy.ByteString → IO () + gotFinalChunk input + = do atomically $ putTMVar niReceivedBody (∅) + case LP.parse chunkTrailer input of + LP.Done input' _ + → acceptRequest ctx input' + LP.Fail _ eCtx e + → chunkWasMalformed rsrcTid eCtx e + "readCurrentChunk: chunkTrailer" + +chunkWasMalformed ∷ ThreadId → [String] → String → String → IO () +chunkWasMalformed tid eCtx e msg + = let abo = mkAbortion BadRequest [("Connection", "close")] + $ Just + $ "chunkWasMalformed: " + ⊕ T.pack msg + ⊕ ": " + ⊕ T.pack (intercalate ", " eCtx) + ⊕ ": " + ⊕ T.pack e + in + throwTo tid abo + +waitForReceiveNonChunkedBodyReqForTheFirstTime ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → Lazy.ByteString + → Int + → IO () +waitForReceiveNonChunkedBodyReqForTheFirstTime ctx ni@(NI {..}) input bodyLen + = join $ + atomically $ + do req ← takeTMVar niReceiveBodyReq + case req of + ReceiveBody wanted + → do putTMVar niSendContinue niExpectedContinue + return $ readNonChunkedRequestBody ctx ni input bodyLen wanted + WasteAll + → do putTMVar niSendContinue False + return $ wasteNonChunkedRequestBody ctx input bodyLen + +waitForReceiveNonChunkedBodyReq ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → Lazy.ByteString + → Int + → IO () +waitForReceiveNonChunkedBodyReq ctx ni@(NI {..}) input bodyLen + = do req ← atomically $ takeTMVar niReceiveBodyReq + case req of + ReceiveBody wanted + → readNonChunkedRequestBody ctx ni input bodyLen wanted + WasteAll + → wasteNonChunkedRequestBody ctx input bodyLen + +wasteNonChunkedRequestBody ∷ HandleLike h + ⇒ Context h + → Lazy.ByteString + → Int + → IO () +wasteNonChunkedRequestBody ctx input bodyLen + = do let input' = Lazy.drop (fromIntegral bodyLen) input + acceptRequest ctx input' + +readNonChunkedRequestBody ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → Lazy.ByteString + → Int + → Int + → IO () +readNonChunkedRequestBody ctx ni@(NI {..}) input bodyLen wanted + | bodyLen ≡ 0 = gotEndOfRequest + | otherwise = gotBody + where + gotBody ∷ IO () + gotBody + = do let bytesToRead = min wanted bodyLen + (block, input') = Lazy.splitAt (fromIntegral bytesToRead) input + block' = Strict.concat $ Lazy.toChunks block + actualReadBytes = Strict.length block' + bodyLen' = bodyLen - actualReadBytes + atomically $ putTMVar niReceivedBody block' + waitForReceiveNonChunkedBodyReq ctx ni input' bodyLen' + + gotEndOfRequest ∷ IO () + gotEndOfRequest + = do atomically $ putTMVar niReceivedBody (∅) + acceptRequest ctx input + +enqueue ∷ (HandleLike h, Interaction i) ⇒ Context h → i → IO () +enqueue (Context {..}) = enqueue' cQueue + +enqueue' ∷ Interaction i ⇒ InteractionQueue → i → IO () +enqueue' tQueue itr + = atomically $ + do queue ← readTVar tQueue + writeTVar tQueue (toInteraction itr ⊲ queue)