X-Git-Url: http://git.cielonegro.org/gitweb.cgi?a=blobdiff_plain;f=Network%2FHTTP%2FLucu%2FRequestReader.hs;h=49317a99ea8343270f222b7061c8bdd8c00cb322;hb=ea2b7838f1b3d9d4923a220a601be2e04cc559d7;hp=b0c22be45d93ab9e36612f7d635b4b10df955492;hpb=e624f0db8c4610b36da9e4463a656e0cb8a104dd;p=Lucu.git diff --git a/Network/HTTP/Lucu/RequestReader.hs b/Network/HTTP/Lucu/RequestReader.hs index b0c22be..49317a9 100644 --- a/Network/HTTP/Lucu/RequestReader.hs +++ b/Network/HTTP/Lucu/RequestReader.hs @@ -1,217 +1,336 @@ +{-# LANGUAGE + DoAndIfThenElse + , RecordWildCards + , ScopedTypeVariables + , UnicodeSyntax + #-} module Network.HTTP.Lucu.RequestReader - ( requestReader -- Config -> ResTree -> Handle -> HostName -> InteractionQueue -> IO () + ( requestReader ) where - -import Control.Concurrent.STM -import Control.Exception -import Control.Monad -import qualified Data.ByteString.Lazy.Char8 as B -import Data.ByteString.Lazy.Char8 (ByteString) -import Data.Map as M -import Data.Map (Map) -import Data.Maybe +import Control.Applicative +import Control.Concurrent.STM +import Control.Exception +import Control.Monad +import qualified Data.Attoparsec.Lazy as LP +import qualified Data.ByteString.Lazy as Lazy +import Data.Maybe import qualified Data.Sequence as S -import Data.Sequence (Seq, (<|), ViewR(..)) -import Network -import Network.HTTP.Lucu.Config -import Network.HTTP.Lucu.DefaultPage -import Network.HTTP.Lucu.HttpVersion -import Network.HTTP.Lucu.Interaction -import Network.HTTP.Lucu.Parser -import Network.HTTP.Lucu.Postprocess -import Network.HTTP.Lucu.Preprocess -import Network.HTTP.Lucu.Request -import Network.HTTP.Lucu.Response -import Network.HTTP.Lucu.Resource -import Prelude hiding (catch) -import System.IO - -import GHC.Conc (unsafeIOToSTM) - -requestReader :: Config -> ResTree -> Handle -> HostName -> InteractionQueue -> IO () -requestReader cnf tree h host tQueue - = do catch (acceptRequest B.empty) $ \ exc -> - case exc of - IOException _ -> return () - AsyncException ThreadKilled -> return () - BlockedIndefinitely -> putStrLn "requestReader: blocked indefinitely" - _ -> print exc - where - acceptRequest :: ByteString -> IO () - acceptRequest soFar - -- キューに最大パイプライン深度以上のリクエストが溜まってゐる - -- 時は、それが限度以下になるまで待つ。 - = do atomically $ do queue <- readTVar tQueue - when (S.length queue >= cnfMaxPipelineDepth cnf) - retry - - -- リクエストを讀む。パースできない場合は直ちに 400 Bad - -- Request 應答を設定し、それを出力してから切斷するやう - -- に ResponseWriter に通知する。 - hWaitForInput h (-1) - chunk <- B.hGetNonBlocking h 1024 - - let input = B.append soFar chunk - case parse requestP input of - (Success req , input') -> acceptParsableRequest req input' - (IllegalInput, _ ) -> acceptNonparsableRequest BadRequest - (ReachedEOF , _ ) -> if B.length input >= 1024 * 1024 then - -- ヘッダ長過ぎ - acceptNonparsableRequest RequestEntityTooLarge - else - acceptRequest input - - acceptNonparsableRequest :: StatusCode -> IO () - acceptNonparsableRequest status - = do itr <- newInteraction cnf host Nothing - let res = Response { - resVersion = HttpVersion 1 1 - , resStatus = status - , resHeaders = [] - } - atomically $ do writeItr itr itrResponse $ Just res - writeItr itr itrWillClose True - writeItr itr itrState Done - writeDefaultPage itr - postprocess itr - enqueue itr - - acceptParsableRequest :: Request -> ByteString -> IO () - acceptParsableRequest req soFar - = do itr <- newInteraction cnf host (Just req) - action - <- atomically $ - do preprocess itr - isErr <- readItrF itr itrResponse (isError . resStatus) - if isErr == Just True then - acceptSemanticallyInvalidRequest itr soFar - else - case findResource tree $ (reqURI . fromJust . itrRequest) itr of - Nothing -- Resource が無かった - -> acceptRequestForNonexistentResource itr soFar - - Just rsrcDef -- あった - -> acceptRequestForExistentResource itr soFar rsrcDef - action - - acceptSemanticallyInvalidRequest :: Interaction -> ByteString -> STM (IO ()) - acceptSemanticallyInvalidRequest itr soFar - = do writeItr itr itrState Done - writeDefaultPage itr - postprocess itr - enqueue itr - return $ acceptRequest soFar - - acceptRequestForNonexistentResource :: Interaction -> ByteString -> STM (IO ()) - acceptRequestForNonexistentResource itr soFar - = do let res = Response { - resVersion = HttpVersion 1 1 - , resStatus = NotFound - , resHeaders = [] - } - writeItr itr itrResponse $ Just res - writeItr itr itrState Done - writeDefaultPage itr - postprocess itr - enqueue itr - return $ acceptRequest soFar - - acceptRequestForExistentResource :: Interaction -> ByteString -> ResourceDef -> STM (IO ()) - acceptRequestForExistentResource itr soFar rsrcDef - = do requestHasBody <- readItr itr itrRequestHasBody id - writeItr itr itrState (if requestHasBody - then ExaminingHeader - else DecidingHeader) - enqueue itr - return $ do runResource rsrcDef itr - if requestHasBody then - observeRequest itr soFar - else - acceptRequest soFar - - observeRequest :: Interaction -> ByteString -> IO () - observeRequest itr soFar - = do isChunked <- atomically $ readItr itr itrRequestIsChunked id - if isChunked then - observeChunkedRequest itr soFar - else - observeNonChunkedRequest itr soFar - - observeChunkedRequest :: Interaction -> ByteString -> IO () - observeChunkedRequest itr soFar - = fail "FIXME: not implemented" - - observeNonChunkedRequest :: Interaction -> ByteString -> IO () - observeNonChunkedRequest itr soFar - = fail "FIXME: not implemented" -{- - = do action - <- atomically $ - do wantedM <- readItr itr itrReqBodyWanted id - if wantedM == Nothing then - do wasteAll <- readItr itr itrReqBodyWasteAll id - if wasteAll then - return $ wasteAllReqBody itr soFar - else - retry - else - -- 受信要求が來た。 - if B.empty soFar then - return $ receiveNonChunkedReqBody itr - else - do remaining <- readItr itr itrReqChunkRemaining fromJust - - let wanted = fromJust wanted - (chunk, input') = B.splitAt (min wanted remaining) soFar - newRemaining = remaining - B.length chunk - isOver = newRemaining == 0 - - writeItr itr itrReqChunkRemaining newRemaining - writeItr itr itrReqChunkIsOver isOver - writeItr itr itrReqBodyWanted (if isOver then - Nothing - else - Just wanted) - writeItr itr itrReceivedBody chunk - - if isOver then - return $ acceptRequest input' - else - return $ observeNonChunkedRequest itr input' - action - - receiveNonChunkedReqBody :: Interaction -> IO () - receiveNonChunkedReqBody itr - = do wanted <- atomically $ readItr itr itrReqBodyWanted fromJust - remaining <- atomically $ readItr itr itrReqChunkRemaining fromJust - - hWaitForInput h (-1) - chunk <- B.hGetNonBlocking h (min wanted remaining) - - let newRemaining = remaining - B.length chunk - isOver = newRemaining == 0 - - atomically $ do writeItr itr itrReqChunkRemaining newRemaining - writeItr itr itrReqChunkIsOver isOver - writeItr itr itrReqBodyWanted (if isOver then - Nothing - else - Just wanted) - writeItr itr itrReceivedBody chunk - - if isOver then - return $ acceptRequest B.empty - else - return $ observeNonChunkedRequest itr B.empty - - - wasteAllReqBody :: Interaction -> ByteString -> IO () - wasteAllReqBody itr soFar - = - --} - - enqueue :: Interaction -> STM () - enqueue itr = do queue <- readTVar tQueue - writeTVar tQueue (itr <| queue) \ No newline at end of file +import Data.Sequence.Unicode +import Data.Text (Text) +import Network.HTTP.Lucu.Config +import Network.HTTP.Lucu.Chunk +import Network.HTTP.Lucu.DefaultPage +import Network.HTTP.Lucu.HandleLike +import Network.HTTP.Lucu.Interaction +import Network.HTTP.Lucu.Postprocess +import Network.HTTP.Lucu.Preprocess +import Network.HTTP.Lucu.Request +import Network.HTTP.Lucu.Response +import Network.HTTP.Lucu.Resource.Tree +import Network.Socket +import Network.URI +import Prelude.Unicode +import System.IO (hPutStrLn, stderr) + +data Context h + = Context { + cConfig ∷ !Config + , cResTree ∷ !ResTree + , cFallbacks ∷ ![FallbackHandler] + , cHandle ∷ !h + , cPort ∷ !PortNumber + , cAddr ∷ !SockAddr + , cQueue ∷ !InteractionQueue + } + +requestReader ∷ HandleLike h + ⇒ Config + → ResTree + → [FallbackHandler] + → h + → PortNumber + → SockAddr + → InteractionQueue + → IO () +requestReader cnf tree fbs h port addr tQueue + = do input ← hGetLBS h + acceptRequest (Context cnf tree fbs h port addr tQueue) input + `catches` + [ Handler $ \ (_ ∷ IOException) → return () + , Handler $ \ e → case e of + ThreadKilled → return () + _ → hPutStrLn stderr (show e) + , Handler $ \ BlockedIndefinitelyOnSTM → hPutStrLn stderr "requestReader: blocked indefinitely" + , Handler $ \ (e ∷ SomeException) → hPutStrLn stderr (show e) + ] + +acceptRequest ∷ HandleLike h ⇒ Context h → Lazy.ByteString → IO () +acceptRequest ctx@(Context {..}) input + -- キューに最大パイプライン深度以上のリクエストが溜まってゐる時は、 + -- それが限度以下になるまで待つ。 + = do atomically $ + do queue ← readTVar cQueue + when (S.length queue ≥ cnfMaxPipelineDepth cConfig) $ + retry + -- リクエストを讀む。パースできない場合は直ちに 400 Bad + -- Request 應答を設定し、それを出力してから切斷するやうに + -- ResponseWriter に通知する。 + case LP.parse requestP input of + LP.Done input' req → acceptParsableRequest ctx req input' + LP.Fail _ _ _ → acceptNonparsableRequest ctx BadRequest + +acceptNonparsableRequest ∷ HandleLike h ⇒ Context h → StatusCode → IO () +acceptNonparsableRequest ctx@(Context {..}) sc + = do itr ← newInteraction cConfig cPort cAddr Nothing (Left sc) + atomically $ + do writeTVar (itrState itr) Done + writeDefaultPage itr + postprocess itr + enqueue ctx itr + +acceptParsableRequest ∷ HandleLike h + ⇒ Context h + → Request + → Lazy.ByteString + → IO () +acceptParsableRequest ctx@(Context {..}) req input + = do cert ← hGetPeerCert cHandle + itr ← newInteraction cConfig cPort cAddr cert (Right req) + join $ atomically + $ do isErr ← (isError ∘ resStatus) <$> readTVar (itrResponse itr) + if isErr then + acceptSemanticallyInvalidRequest ctx itr input + else + return $ acceptSemanticallyValidRequest ctx itr (reqURI req) input + +acceptSemanticallyInvalidRequest ∷ HandleLike h + ⇒ Context h + → Interaction + → Lazy.ByteString + → STM (IO ()) +acceptSemanticallyInvalidRequest ctx itr input + = do writeTVar (itrState itr) Done + writeDefaultPage itr + postprocess itr + enqueue ctx itr + return $ acceptRequest ctx input + +acceptSemanticallyValidRequest ∷ HandleLike h + ⇒ Context h + → Interaction + → URI + → Lazy.ByteString + → IO () +acceptSemanticallyValidRequest ctx@(Context {..}) itr uri input + = do rsrcM ← findResource cResTree cFallbacks uri + case rsrcM of + Nothing + → acceptRequestForNonexistentResource ctx itr input + Just (rsrcPath, rsrcDef) + → acceptRequestForExistentResource ctx itr input rsrcPath rsrcDef + +acceptRequestForNonexistentResource ∷ HandleLike h + ⇒ Context h + → Interaction + → Lazy.ByteString + → IO () +acceptRequestForNonexistentResource ctx itr input + = do atomically $ + do setResponseStatus itr NotFound + writeTVar (itrState itr) Done + writeDefaultPage itr + postprocess itr + enqueue ctx itr + acceptRequest ctx input + +acceptRequestForExistentResource ∷ HandleLike h + ⇒ Context h + → Interaction + → Lazy.ByteString + → [Text] + → ResourceDef + → IO () +acceptRequestForExistentResource ctx oldItr input rsrcPath rsrcDef + = do let itr = oldItr { itrResourcePath = Just rsrcPath } + atomically $ enqueue ctx itr + do _ ← runResource rsrcDef itr + if reqHasBody $ fromJust $ itrRequest itr then + observeRequest ctx itr input + else + acceptRequest ctx input + +observeRequest ∷ HandleLike h + ⇒ Context h + → Interaction + → Lazy.ByteString + → IO () +observeRequest ctx itr input + = case fromJust $ itrReqBodyLength itr of + Chunked + → observeChunkedRequest ctx itr input 0 + Fixed len + → observeNonChunkedRequest ctx itr input len + +observeChunkedRequest ∷ HandleLike h + ⇒ Context h + → Interaction + → Lazy.ByteString + → Int + → IO () +observeChunkedRequest ctx itr input remaining + = join $ + atomically $ + do isOver ← readTVar $ itrReqChunkIsOver itr + if isOver then + return $ acceptRequest ctx input + else + do wanted ← readTVar $ itrReqBodyWanted itr + case wanted of + 0 → do wasteAll ← readTVar $ itrReqBodyWasteAll itr + if wasteAll then + return $ wasteCurrentChunk ctx itr input remaining + else + retry + _ → return $ readCurrentChunk ctx itr input wanted remaining + +wasteCurrentChunk ∷ HandleLike h + ⇒ Context h + → Interaction + → Lazy.ByteString + → Int + → IO () +wasteCurrentChunk ctx itr input len + | len > 0 + = let input' = Lazy.drop (fromIntegral len) input + in + case LP.parse chunkFooterP input' of + LP.Done input'' _ + → observeChunkedRequest ctx itr input'' 0 + LP.Fail _ _ _ + → chunkWasMalformed itr + | otherwise + = seekNextChunk ctx itr input + +readCurrentChunk ∷ HandleLike h + ⇒ Context h + → Interaction + → Lazy.ByteString + → Int + → Int + → IO () +readCurrentChunk ctx itr input wanted remaining + | remaining > 0 + = do let bytesToRead = fromIntegral $ min wanted remaining + (chunk, input') = Lazy.splitAt bytesToRead input + actualReadBytes = fromIntegral $ Lazy.length chunk + newWanted = wanted - actualReadBytes + newRemaining = remaining - actualReadBytes + chunk' = S.fromList $ Lazy.toChunks chunk + updateStates = atomically $ + do writeTVar (itrReqBodyWanted itr) newWanted + oldBody ← readTVar $ itrReceivedBody itr + oldBodyLen ← readTVar $ itrReceivedBodyLen itr + writeTVar (itrReceivedBody itr) $ oldBody ⋈ chunk' + writeTVar (itrReceivedBodyLen itr) $ oldBodyLen + actualReadBytes + if newRemaining ≡ 0 then + case LP.parse chunkFooterP input' of + LP.Done input'' _ + → do updateStates + observeChunkedRequest ctx itr input'' 0 + LP.Fail _ _ _ + → chunkWasMalformed itr + else + do updateStates + observeChunkedRequest ctx itr input' newRemaining + | otherwise + = seekNextChunk ctx itr input + +seekNextChunk ∷ HandleLike h + ⇒ Context h + → Interaction + → Lazy.ByteString + → IO () +seekNextChunk ctx itr input + = case LP.parse chunkHeaderP input of + LP.Done input' len + | len ≡ 0 -- Final chunk + → case LP.parse chunkTrailerP input' of + LP.Done input'' _ + → do atomically $ + writeTVar (itrReqChunkIsOver itr) True + acceptRequest ctx input'' + LP.Fail _ _ _ + → chunkWasMalformed itr + | otherwise -- Non-final chunk + → observeChunkedRequest ctx itr input' len + LP.Fail _ _ _ + → chunkWasMalformed itr + +chunkWasMalformed ∷ Interaction → IO () +chunkWasMalformed itr + = atomically $ + do setResponseStatus itr BadRequest + writeTVar (itrWillClose itr) True + writeTVar (itrState itr) Done + writeDefaultPage itr + postprocess itr + +observeNonChunkedRequest ∷ HandleLike h + ⇒ Context h + → Interaction + → Lazy.ByteString + → Int + → IO () +observeNonChunkedRequest ctx itr input remaining + = join $ + atomically $ + do wanted ← readTVar $ itrReqBodyWanted itr + case wanted of + 0 → do wasteAll ← readTVar $ itrReqBodyWasteAll itr + if wasteAll then + return $ wasteNonChunkedRequestBody ctx itr input remaining + else + retry + _ → return $ readNonChunkedRequestBody ctx itr input wanted remaining + +wasteNonChunkedRequestBody ∷ HandleLike h + ⇒ Context h + → Interaction + → Lazy.ByteString + → Int + → IO () +wasteNonChunkedRequestBody ctx itr input remaining + = do let input' = Lazy.drop (fromIntegral remaining) input + atomically $ writeTVar (itrReqChunkIsOver itr) True + acceptRequest ctx input' + +readNonChunkedRequestBody ∷ HandleLike h + ⇒ Context h + → Interaction + → Lazy.ByteString + → Int + → Int + → IO () +readNonChunkedRequestBody ctx itr input wanted remaining + = do let bytesToRead = min wanted remaining + (chunk, input') = Lazy.splitAt (fromIntegral bytesToRead) input + actualReadBytes = fromIntegral $ Lazy.length chunk + newWanted = wanted - actualReadBytes + newRemaining = remaining - actualReadBytes + isOver = actualReadBytes < bytesToRead ∨ newRemaining ≡ 0 + chunk' = S.fromList $ Lazy.toChunks chunk + atomically $ + do writeTVar (itrReqChunkIsOver itr) isOver + writeTVar (itrReqBodyWanted itr) newWanted + writeTVar (itrReceivedBody itr) chunk' + writeTVar (itrReceivedBodyLen itr) actualReadBytes + if isOver then + acceptRequest ctx input' + else + observeNonChunkedRequest ctx itr input' newRemaining + +enqueue ∷ HandleLike h ⇒ Context h → Interaction → STM () +enqueue (Context {..}) itr + = do queue ← readTVar cQueue + writeTVar cQueue (itr ⊲ queue)