X-Git-Url: http://git.cielonegro.org/gitweb.cgi?p=Lucu.git;a=blobdiff_plain;f=Network%2FHTTP%2FLucu%2FRequestReader.hs;h=4c59b3e9f8b1ac5a1524d634d2595a339c80c853;hp=ab8e5c7528f594242b9f0aeea51d4da5d3f770a0;hb=1ead053df6a792edafa9d714c4c038a8a9c3ad16;hpb=e34910f85f459f049b9e6e6b79db9ef95dfccc13 diff --git a/Network/HTTP/Lucu/RequestReader.hs b/Network/HTTP/Lucu/RequestReader.hs index ab8e5c7..4c59b3e 100644 --- a/Network/HTTP/Lucu/RequestReader.hs +++ b/Network/HTTP/Lucu/RequestReader.hs @@ -1,298 +1,384 @@ {-# LANGUAGE - BangPatterns - , UnboxedTuples + CPP + , DoAndIfThenElse + , OverloadedStrings + , RecordWildCards + , ScopedTypeVariables , UnicodeSyntax #-} module Network.HTTP.Lucu.RequestReader ( requestReader ) where -import Control.Concurrent.STM -import Control.Exception -import Control.Monad -import qualified Data.ByteString.Lazy.Char8 as B -import Data.ByteString.Lazy.Char8 (ByteString) -import Data.Maybe +import Control.Concurrent +import Control.Concurrent.STM +import Control.Exception hiding (block) +import Control.Monad +import qualified Data.Attoparsec.Lazy as LP +import qualified Data.ByteString as Strict +import qualified Data.ByteString.Lazy as Lazy +import Data.List +import Data.Maybe +import Data.Monoid.Unicode import qualified Data.Sequence as S -import Data.Sequence ((<|)) -import GHC.Conc (unsafeIOToSTM) -import Network.Socket -import Network.HTTP.Lucu.Config -import Network.HTTP.Lucu.Chunk -import Network.HTTP.Lucu.DefaultPage -import Network.HTTP.Lucu.HandleLike -import Network.HTTP.Lucu.Interaction -import Network.HTTP.Lucu.Postprocess -import Network.HTTP.Lucu.Preprocess -import Network.HTTP.Lucu.Request -import Network.HTTP.Lucu.Response -import Network.HTTP.Lucu.Resource.Tree -import Prelude hiding (catch) -import System.IO (stderr) +import qualified Data.Text as T +import Network.HTTP.Lucu.Abortion +import Network.HTTP.Lucu.Config +import Network.HTTP.Lucu.Chunk +import Network.HTTP.Lucu.HandleLike +import Network.HTTP.Lucu.Interaction +import Network.HTTP.Lucu.Preprocess +import Network.HTTP.Lucu.Request +import Network.HTTP.Lucu.Response +import Network.HTTP.Lucu.Resource.Internal +import Network.HTTP.Lucu.Resource.Tree +import Network.HTTP.Lucu.Utils +import Network.Socket +import Prelude.Unicode +import System.IO (hPutStrLn, stderr) -requestReader :: HandleLike h => Config -> ResTree -> [FallbackHandler] -> h -> PortNumber -> SockAddr -> InteractionQueue -> IO () -requestReader !cnf !tree !fbs !h !port !addr !tQueue - = do input <- hGetLBS h - acceptRequest input +data Context h + = Context { + cConfig ∷ !Config + , cResTree ∷ !ResTree + , cFallbacks ∷ ![FallbackHandler] + , cHandle ∷ !h + , cPort ∷ !PortNumber + , cAddr ∷ !SockAddr + , cQueue ∷ !InteractionQueue + } + +data ChunkReceivingState + = Initial + | InChunk !Int -- ^Number of remaining octets in the current + -- chunk. It's always positive. + +requestReader ∷ HandleLike h + ⇒ Config + → ResTree + → [FallbackHandler] + → h + → PortNumber + → SockAddr + → InteractionQueue + → IO () +requestReader cnf tree fbs h port addr tQueue + = do input ← hGetLBS h + acceptRequest (Context cnf tree fbs h port addr tQueue) input `catches` - [ Handler (( \ _ -> return () ) :: IOException -> IO ()) - , Handler ( \ ThreadKilled -> return () ) - , Handler ( \ BlockedIndefinitelyOnSTM -> hPutStrLn stderr "requestReader: blocked indefinitely" ) - , Handler (( \ e -> hPutStrLn stderr (show e) ) :: SomeException -> IO ()) + [ Handler handleAsyncE + , Handler handleOthers ] + `finally` + enqueue' tQueue EndOfInteraction where - acceptRequest :: ByteString -> IO () - acceptRequest input - -- キューに最大パイプライン深度以上のリクエストが溜まってゐる - -- 時は、それが限度以下になるまで待つ。 - = {-# SCC "acceptRequest" #-} - do atomically $ do queue <- readTVar tQueue - when (S.length queue >= cnfMaxPipelineDepth cnf) - retry + handleAsyncE ∷ AsyncException → IO () + handleAsyncE ThreadKilled = return () + handleAsyncE e = dump e - -- リクエストを讀む。パースできない場合は直ちに 400 Bad - -- Request 應答を設定し、それを出力してから切斷するやう - -- に ResponseWriter に通知する。 - case parse requestP input of - (# Success req , input' #) -> acceptParsableRequest req input' - (# IllegalInput, _ #) -> acceptNonparsableRequest BadRequest - (# ReachedEOF , _ #) -> acceptNonparsableRequest BadRequest + handleOthers ∷ SomeException → IO () + handleOthers = dump - acceptNonparsableRequest :: StatusCode -> IO () - acceptNonparsableRequest status - = {-# SCC "acceptNonparsableRequest" #-} - do itr <- newInteraction cnf port addr Nothing Nothing - atomically $ do updateItr itr itrResponse - $ \ res -> res { - resStatus = status - } - writeItr itr itrWillClose True - writeItr itr itrState Done - writeDefaultPage itr - postprocess itr - enqueue itr + dump ∷ Exception e ⇒ e → IO () + dump e + = do hPutStrLn stderr "Lucu: requestReader caught an exception:" + hPutStrLn stderr $ show e - acceptParsableRequest :: Request -> ByteString -> IO () - acceptParsableRequest req input - = {-# SCC "acceptParsableRequest" #-} - do cert <- hGetPeerCert h - itr <- newInteraction cnf port addr cert (Just req) - action - <- atomically $ - do preprocess itr - isErr <- readItr itr itrResponse (isError . resStatus) - if isErr then - acceptSemanticallyInvalidRequest itr input - else - do rsrcM <- unsafeIOToSTM $ findResource tree fbs $ reqURI req - case rsrcM of - Nothing -- Resource が無かった - -> acceptRequestForNonexistentResource itr input +acceptRequest ∷ HandleLike h ⇒ Context h → Lazy.ByteString → IO () +acceptRequest ctx@(Context {..}) input + = do atomically $ + do queue ← readTVar cQueue + when (S.length queue ≥ cnfMaxPipelineDepth cConfig) + -- Too many requests in the pipeline... + retry + if Lazy.null input then + return () + else + case LP.parse request input of + LP.Done input' req → acceptParsableRequest ctx req input' + LP.Fail _ _ _ → acceptNonparsableRequest ctx - Just (rsrcPath, rsrcDef) -- あった - -> acceptRequestForExistentResource itr input rsrcPath rsrcDef - action +acceptNonparsableRequest ∷ HandleLike h ⇒ Context h → IO () +acceptNonparsableRequest ctx@(Context {..}) + = do syi ← mkSyntacticallyInvalidInteraction cConfig + enqueue ctx syi - acceptSemanticallyInvalidRequest :: Interaction -> ByteString -> STM (IO ()) - acceptSemanticallyInvalidRequest itr input - = {-# SCC "acceptSemanticallyInvalidRequest" #-} - do writeItr itr itrState Done - writeDefaultPage itr - postprocess itr - enqueue itr - return $ acceptRequest input +acceptParsableRequest ∷ HandleLike h + ⇒ Context h + → Request + → Lazy.ByteString + → IO () +acceptParsableRequest ctx@(Context {..}) req input + = do let ar = preprocess (cnfServerHost cConfig) cPort req + if isError $ arInitialStatus ar then + acceptSemanticallyInvalidRequest ctx ar input + else + do rsrc ← findResource cResTree cFallbacks $ reqURI $ arRequest ar + case rsrc of + Nothing + → do let ar' = ar { + arInitialStatus = fromStatusCode NotFound + } + acceptSemanticallyInvalidRequest ctx ar' input + Just (path, def) + → acceptRequestForResource ctx ar input path def - acceptRequestForNonexistentResource :: Interaction -> ByteString -> STM (IO ()) - acceptRequestForNonexistentResource itr input - = {-# SCC "acceptRequestForNonexistentResource" #-} - do updateItr itr itrResponse - $ \res -> res { - resStatus = NotFound - } - writeItr itr itrState Done - writeDefaultPage itr - postprocess itr - enqueue itr - return $ acceptRequest input +acceptSemanticallyInvalidRequest ∷ HandleLike h + ⇒ Context h + → AugmentedRequest + → Lazy.ByteString + → IO () +acceptSemanticallyInvalidRequest ctx@(Context {..}) ar input + = do sei ← mkSemanticallyInvalidInteraction cConfig ar + enqueue ctx sei + acceptRequest ctx input - acceptRequestForExistentResource :: Interaction -> ByteString -> [String] -> ResourceDef -> STM (IO ()) - acceptRequestForExistentResource oldItr input rsrcPath rsrcDef - = {-# SCC "acceptRequestForExistentResource" #-} - do let itr = oldItr { itrResourcePath = Just rsrcPath } - requestHasBody <- readItr itr itrRequestHasBody id - enqueue itr - return $ do _ <- runResource rsrcDef itr - if requestHasBody then - observeRequest itr input - else - acceptRequest input +acceptRequestForResource ∷ HandleLike h + ⇒ Context h + → AugmentedRequest + → Lazy.ByteString + → [Strict.ByteString] + → Resource + → IO () +acceptRequestForResource ctx@(Context {..}) ar@(AugmentedRequest {..}) input rsrcPath rsrcDef + = do +#if defined(HAVE_SSL) + cert ← hGetPeerCert cHandle + ni ← mkNormalInteraction cConfig cAddr cert ar rsrcPath +#else + ni ← mkNormalInteraction cConfig cAddr ar rsrcPath +#endif + tid ← spawnRsrc rsrcDef ni + enqueue ctx ni + if reqMustHaveBody arRequest then + waitForReceiveBodyReq ctx ni tid input + else + acceptRequest ctx input - observeRequest :: Interaction -> ByteString -> IO () - observeRequest itr input - = {-# SCC "observeRequest" #-} - do isChunked <- atomically $ readItr itr itrRequestIsChunked id - if isChunked then - observeChunkedRequest itr input - else - observeNonChunkedRequest itr input +waitForReceiveBodyReq ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → ThreadId + → Lazy.ByteString + → IO () +waitForReceiveBodyReq ctx ni@(NI {..}) rsrcTid input + = case fromJust niReqBodyLength of + Chunked + → waitForReceiveChunkedBodyReqForTheFirstTime ctx ni rsrcTid input + Fixed len + → waitForReceiveNonChunkedBodyReqForTheFirstTime ctx ni input len - observeChunkedRequest :: Interaction -> ByteString -> IO () - observeChunkedRequest itr input - = {-# SCC "observeChunkedRequest" #-} - do action - <- atomically $ - do isOver <- readItr itr itrReqChunkIsOver id - if isOver then - return $ acceptRequest input - else - do wantedM <- readItr itr itrReqBodyWanted id - if wantedM == Nothing then - do wasteAll <- readItr itr itrReqBodyWasteAll id - if wasteAll then - -- 破棄要求が來た - do remainingM <- readItr itr itrReqChunkRemaining id - if fmap (> 0) remainingM == Just True then - -- 現在のチャンクをまだ - -- 讀み終へてゐない - do let (_, input') = B.splitAt (fromIntegral - $ fromJust remainingM) input - (# footerR, input'' #) = parse chunkFooterP input' +-- Toooooo long name for a function... +waitForReceiveChunkedBodyReqForTheFirstTime ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → ThreadId + → Lazy.ByteString + → IO () +waitForReceiveChunkedBodyReqForTheFirstTime ctx ni@(NI {..}) rsrcTid input + = join $ + atomically $ + do req ← takeTMVar niReceiveBodyReq + case req of + ReceiveBody wanted + → do putTMVar niSendContinue niExpectedContinue + return $ readCurrentChunk ctx ni rsrcTid wanted input Initial + WasteAll + → do putTMVar niSendContinue False + return $ wasteAllChunks ctx rsrcTid input Initial - if footerR == Success () then - -- チャンクフッタを正常に讀めた - do writeItr itr itrReqChunkRemaining $ Just 0 - - return $ observeChunkedRequest itr input'' - else - return $ chunkWasMalformed itr - else - -- 次のチャンクを讀み始める - seekNextChunk itr input - else - -- 要求がまだ來ない - retry - else - -- 受信要求が來た - do remainingM <- readItr itr itrReqChunkRemaining id - if fmap (> 0) remainingM == Just True then - -- 現在のチャンクをまだ讀み - -- 終へてゐない - do let wanted = fromJust wantedM - remaining = fromJust remainingM - bytesToRead = fromIntegral $ min wanted remaining - (chunk, input') = B.splitAt bytesToRead input - actualReadBytes = fromIntegral $ B.length chunk - newWanted = case wanted - actualReadBytes of - 0 -> Nothing - n -> Just n - newRemaining = Just $ remaining - actualReadBytes - updateStates - = do writeItr itr itrReqChunkRemaining newRemaining - writeItr itr itrReqBodyWanted newWanted - updateItr itr itrReceivedBody $ flip B.append chunk +waitForReceiveChunkedBodyReq ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → ThreadId + → Lazy.ByteString + → ChunkReceivingState + → IO () +waitForReceiveChunkedBodyReq ctx ni@(NI {..}) rsrcTid input st + = do req ← atomically $ takeTMVar niReceiveBodyReq + case req of + ReceiveBody wanted + → readCurrentChunk ctx ni rsrcTid wanted input st + WasteAll + → wasteAllChunks ctx rsrcTid input st - if newRemaining == Just 0 then - -- チャンクフッタを讀む - case parse chunkFooterP input' of - (# Success _, input'' #) - -> do updateStates - return $ observeChunkedRequest itr input'' - (# _, _ #) - -> return $ chunkWasMalformed itr - else - -- まだチャンクの終はりに達してゐない - do updateStates - return $ observeChunkedRequest itr input' - else - -- 次のチャンクを讀み始める - seekNextChunk itr input - action +wasteAllChunks ∷ HandleLike h + ⇒ Context h + → ThreadId + → Lazy.ByteString + → ChunkReceivingState + → IO () +wasteAllChunks ctx rsrcTid = go + where + go ∷ Lazy.ByteString → ChunkReceivingState → IO () + go input Initial + = case LP.parse chunkHeader input of + LP.Done input' chunkLen + | chunkLen ≡ 0 → gotFinalChunk input' + | otherwise → gotChunk input' chunkLen + LP.Fail _ eCtx e + → chunkWasMalformed rsrcTid eCtx e + "wasteAllChunks: chunkHeader" + go input (InChunk chunkLen) + = gotChunk input chunkLen - seekNextChunk :: Interaction -> ByteString -> STM (IO ()) - seekNextChunk itr input - = {-# SCC "seekNextChunk" #-} - case parse chunkHeaderP input of - -- 最終チャンク (中身が空) - (# Success 0, input' #) - -> case parse chunkTrailerP input' of - (# Success _, input'' #) - -> do writeItr itr itrReqChunkLength $ Nothing - writeItr itr itrReqChunkRemaining $ Nothing - writeItr itr itrReqChunkIsOver True - - return $ acceptRequest input'' - (# _, _ #) - -> return $ chunkWasMalformed itr - -- 最終でないチャンク - (# Success len, input' #) - -> do writeItr itr itrReqChunkLength $ Just len - writeItr itr itrReqChunkRemaining $ Just len - - return $ observeChunkedRequest itr input' - -- チャンクヘッダがをかしい - (# _, _ #) - -> return $ chunkWasMalformed itr + gotChunk ∷ Lazy.ByteString → Int → IO () + gotChunk input chunkLen + = let input' = Lazy.drop (fromIntegral chunkLen) input + in + case LP.parse chunkFooter input' of + LP.Done input'' _ + → go input'' Initial + LP.Fail _ eCtx e + → chunkWasMalformed rsrcTid eCtx e + "wasteAllChunks: chunkFooter" - chunkWasMalformed :: Interaction -> IO () - chunkWasMalformed itr - = {-# SCC "chunkWasMalformed" #-} - atomically $ do updateItr itr itrResponse - $ \ res -> res { - resStatus = BadRequest - } - writeItr itr itrWillClose True - writeItr itr itrState Done - writeDefaultPage itr - postprocess itr + gotFinalChunk ∷ Lazy.ByteString → IO () + gotFinalChunk input + = case LP.parse chunkTrailer input of + LP.Done input' _ + → acceptRequest ctx input' + LP.Fail _ eCtx e + → chunkWasMalformed rsrcTid eCtx e + "wasteAllChunks: chunkTrailer" + +readCurrentChunk ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → ThreadId + → Int + → Lazy.ByteString + → ChunkReceivingState + → IO () +readCurrentChunk ctx ni@(NI {..}) rsrcTid wanted = go + where + go ∷ Lazy.ByteString → ChunkReceivingState → IO () + go input Initial + = case LP.parse chunkHeader input of + LP.Done input' chunkLen + | chunkLen ≡ 0 + → gotFinalChunk input' + | otherwise + → gotChunk input' chunkLen + LP.Fail _ eCtx e + → chunkWasMalformed rsrcTid eCtx e + "readCurrentChunk: chunkHeader" + go input (InChunk chunkLen) + = gotChunk input chunkLen - observeNonChunkedRequest :: Interaction -> ByteString -> IO () - observeNonChunkedRequest itr input - = {-# SCC "observeNonChunkedRequest" #-} - do action - <- atomically $ - do wantedM <- readItr itr itrReqBodyWanted id - if wantedM == Nothing then - do wasteAll <- readItr itr itrReqBodyWasteAll id - if wasteAll then - -- 破棄要求が來た - do remainingM <- readItr itr itrReqChunkRemaining id - - let (_, input') = if remainingM == Nothing then - (B.takeWhile (\ _ -> True) input, B.empty) - else - B.splitAt (fromIntegral $ fromJust remainingM) input + gotChunk ∷ Lazy.ByteString → Int → IO () + gotChunk input chunkLen + = do let bytesToRead = min wanted chunkLen + (block, input') = Lazy.splitAt (fromIntegral bytesToRead) input + block' = Strict.concat $ Lazy.toChunks block + actualReadBytes = Strict.length block' + chunkLen' = chunkLen - actualReadBytes + atomically $ putTMVar niReceivedBody block' + if chunkLen' ≡ 0 then + case LP.parse chunkFooter input' of + LP.Done input'' _ + → waitForReceiveChunkedBodyReq ctx ni rsrcTid input'' Initial + LP.Fail _ eCtx e + → chunkWasMalformed rsrcTid eCtx e + "readCurrentChunk: chunkFooter" + else + waitForReceiveChunkedBodyReq ctx ni rsrcTid input' $ InChunk chunkLen' - writeItr itr itrReqChunkRemaining $ Just 0 - writeItr itr itrReqChunkIsOver True + gotFinalChunk ∷ Lazy.ByteString → IO () + gotFinalChunk input + = do atomically $ putTMVar niReceivedBody (∅) + case LP.parse chunkTrailer input of + LP.Done input' _ + → acceptRequest ctx input' + LP.Fail _ eCtx e + → chunkWasMalformed rsrcTid eCtx e + "readCurrentChunk: chunkTrailer" - return $ acceptRequest input' - else - -- 要求がまだ来ない - retry - else - -- 受信要求が來た - do remainingM <- readItr itr itrReqChunkRemaining id +chunkWasMalformed ∷ ThreadId → [String] → String → String → IO () +chunkWasMalformed tid eCtx e msg + = let abo = mkAbortion BadRequest [("Connection", "close")] + $ Just + $ "chunkWasMalformed: " + ⊕ T.pack msg + ⊕ ": " + ⊕ T.pack (intercalate ", " eCtx) + ⊕ ": " + ⊕ T.pack e + in + throwTo tid abo - let wanted = fromJust wantedM - bytesToRead = fromIntegral $ maybe wanted (min wanted) remainingM - (chunk, input') = B.splitAt bytesToRead input - newRemaining = fmap - (\ x -> x - (fromIntegral $ B.length chunk)) - remainingM - isOver = B.length chunk < bytesToRead || newRemaining == Just 0 +waitForReceiveNonChunkedBodyReqForTheFirstTime ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → Lazy.ByteString + → Int + → IO () +waitForReceiveNonChunkedBodyReqForTheFirstTime ctx ni@(NI {..}) input bodyLen + = join $ + atomically $ + do req ← takeTMVar niReceiveBodyReq + case req of + ReceiveBody wanted + → do putTMVar niSendContinue niExpectedContinue + return $ readNonChunkedRequestBody ctx ni input bodyLen wanted + WasteAll + → do putTMVar niSendContinue False + return $ wasteNonChunkedRequestBody ctx input bodyLen + +waitForReceiveNonChunkedBodyReq ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → Lazy.ByteString + → Int + → IO () +waitForReceiveNonChunkedBodyReq ctx ni@(NI {..}) input bodyLen + = do req ← atomically $ takeTMVar niReceiveBodyReq + case req of + ReceiveBody wanted + → readNonChunkedRequestBody ctx ni input bodyLen wanted + WasteAll + → wasteNonChunkedRequestBody ctx input bodyLen + +wasteNonChunkedRequestBody ∷ HandleLike h + ⇒ Context h + → Lazy.ByteString + → Int + → IO () +wasteNonChunkedRequestBody ctx input bodyLen + = do let input' = Lazy.drop (fromIntegral bodyLen) input + acceptRequest ctx input' + +readNonChunkedRequestBody ∷ HandleLike h + ⇒ Context h + → NormalInteraction + → Lazy.ByteString + → Int + → Int + → IO () +readNonChunkedRequestBody ctx ni@(NI {..}) input bodyLen wanted + | bodyLen ≡ 0 = gotEndOfRequest + | otherwise = gotBody + where + gotBody ∷ IO () + gotBody + = do let bytesToRead = min wanted bodyLen + (block, input') = Lazy.splitAt (fromIntegral bytesToRead) input + block' = Strict.concat $ Lazy.toChunks block + actualReadBytes = Strict.length block' + bodyLen' = bodyLen - actualReadBytes + atomically $ putTMVar niReceivedBody block' + waitForReceiveNonChunkedBodyReq ctx ni input' bodyLen' - writeItr itr itrReqChunkRemaining newRemaining - writeItr itr itrReqChunkIsOver isOver - writeItr itr itrReqBodyWanted Nothing - writeItr itr itrReceivedBody chunk + gotEndOfRequest ∷ IO () + gotEndOfRequest + = do atomically $ putTMVar niReceivedBody (∅) + acceptRequest ctx input - if isOver then - return $ acceptRequest input' - else - return $ observeNonChunkedRequest itr input' - action +enqueue ∷ (HandleLike h, Interaction i) ⇒ Context h → i → IO () +enqueue (Context {..}) = enqueue' cQueue - enqueue :: Interaction -> STM () - enqueue itr = {-# SCC "enqueue" #-} - do queue <- readTVar tQueue - writeTVar tQueue (itr <| queue) \ No newline at end of file +enqueue' ∷ Interaction i ⇒ InteractionQueue → i → IO () +enqueue' tQueue itr + = atomically $ + do queue ← readTVar tQueue + writeTVar tQueue (toInteraction itr ⊲ queue)