X-Git-Url: http://git.cielonegro.org/gitweb.cgi?a=blobdiff_plain;f=Network%2FHTTP%2FLucu%2FRequestReader.hs;h=9307c8dcba499b1a3adeeb920ba0fe6238c59b37;hb=3fe5ca3;hp=b0c22be45d93ab9e36612f7d635b4b10df955492;hpb=e624f0db8c4610b36da9e4463a656e0cb8a104dd;p=Lucu.git diff --git a/Network/HTTP/Lucu/RequestReader.hs b/Network/HTTP/Lucu/RequestReader.hs index b0c22be..9307c8d 100644 --- a/Network/HTTP/Lucu/RequestReader.hs +++ b/Network/HTTP/Lucu/RequestReader.hs @@ -1,76 +1,71 @@ +{-# LANGUAGE + BangPatterns + , UnboxedTuples + , UnicodeSyntax + #-} module Network.HTTP.Lucu.RequestReader - ( requestReader -- Config -> ResTree -> Handle -> HostName -> InteractionQueue -> IO () + ( requestReader ) where - import Control.Concurrent.STM import Control.Exception import Control.Monad import qualified Data.ByteString.Lazy.Char8 as B import Data.ByteString.Lazy.Char8 (ByteString) -import Data.Map as M -import Data.Map (Map) import Data.Maybe import qualified Data.Sequence as S -import Data.Sequence (Seq, (<|), ViewR(..)) -import Network +import Data.Sequence ((<|)) +import GHC.Conc (unsafeIOToSTM) +import Network.Socket import Network.HTTP.Lucu.Config +import Network.HTTP.Lucu.Chunk import Network.HTTP.Lucu.DefaultPage -import Network.HTTP.Lucu.HttpVersion +import Network.HTTP.Lucu.HandleLike import Network.HTTP.Lucu.Interaction -import Network.HTTP.Lucu.Parser import Network.HTTP.Lucu.Postprocess import Network.HTTP.Lucu.Preprocess import Network.HTTP.Lucu.Request import Network.HTTP.Lucu.Response -import Network.HTTP.Lucu.Resource +import Network.HTTP.Lucu.Resource.Tree import Prelude hiding (catch) -import System.IO - -import GHC.Conc (unsafeIOToSTM) - -requestReader :: Config -> ResTree -> Handle -> HostName -> InteractionQueue -> IO () -requestReader cnf tree h host tQueue - = do catch (acceptRequest B.empty) $ \ exc -> - case exc of - IOException _ -> return () - AsyncException ThreadKilled -> return () - BlockedIndefinitely -> putStrLn "requestReader: blocked indefinitely" - _ -> print exc +import System.IO (stderr) + +requestReader :: HandleLike h => Config -> ResTree -> [FallbackHandler] -> h -> PortNumber -> SockAddr -> InteractionQueue -> IO () +requestReader !cnf !tree !fbs !h !port !addr !tQueue + = do input <- hGetLBS h + acceptRequest input + `catches` + [ Handler (( \ _ -> return () ) :: IOException -> IO ()) + , Handler ( \ ThreadKilled -> return () ) + , Handler ( \ BlockedIndefinitelyOnSTM -> hPutStrLn stderr "requestReader: blocked indefinitely" ) + , Handler (( \ e -> hPutStrLn stderr (show e) ) :: SomeException -> IO ()) + ] where acceptRequest :: ByteString -> IO () - acceptRequest soFar + acceptRequest input -- キューに最大パイプライン深度以上のリクエストが溜まってゐる -- 時は、それが限度以下になるまで待つ。 - = do atomically $ do queue <- readTVar tQueue + = {-# SCC "acceptRequest" #-} + do atomically $ do queue <- readTVar tQueue when (S.length queue >= cnfMaxPipelineDepth cnf) retry -- リクエストを讀む。パースできない場合は直ちに 400 Bad -- Request 應答を設定し、それを出力してから切斷するやう -- に ResponseWriter に通知する。 - hWaitForInput h (-1) - chunk <- B.hGetNonBlocking h 1024 - - let input = B.append soFar chunk case parse requestP input of - (Success req , input') -> acceptParsableRequest req input' - (IllegalInput, _ ) -> acceptNonparsableRequest BadRequest - (ReachedEOF , _ ) -> if B.length input >= 1024 * 1024 then - -- ヘッダ長過ぎ - acceptNonparsableRequest RequestEntityTooLarge - else - acceptRequest input + (# Success req , input' #) -> acceptParsableRequest req input' + (# IllegalInput, _ #) -> acceptNonparsableRequest BadRequest + (# ReachedEOF , _ #) -> acceptNonparsableRequest BadRequest acceptNonparsableRequest :: StatusCode -> IO () acceptNonparsableRequest status - = do itr <- newInteraction cnf host Nothing - let res = Response { - resVersion = HttpVersion 1 1 - , resStatus = status - , resHeaders = [] - } - atomically $ do writeItr itr itrResponse $ Just res + = {-# SCC "acceptNonparsableRequest" #-} + do itr <- newInteraction cnf port addr Nothing Nothing + atomically $ do updateItr itr itrResponse + $ \ res -> res { + resStatus = status + } writeItr itr itrWillClose True writeItr itr itrState Done writeDefaultPage itr @@ -78,140 +73,227 @@ requestReader cnf tree h host tQueue enqueue itr acceptParsableRequest :: Request -> ByteString -> IO () - acceptParsableRequest req soFar - = do itr <- newInteraction cnf host (Just req) + acceptParsableRequest req input + = {-# SCC "acceptParsableRequest" #-} + do cert <- hGetPeerCert h + itr <- newInteraction cnf port addr cert (Just req) action <- atomically $ do preprocess itr - isErr <- readItrF itr itrResponse (isError . resStatus) - if isErr == Just True then - acceptSemanticallyInvalidRequest itr soFar + isErr <- readItr itr itrResponse (isError . resStatus) + if isErr then + acceptSemanticallyInvalidRequest itr input else - case findResource tree $ (reqURI . fromJust . itrRequest) itr of - Nothing -- Resource が無かった - -> acceptRequestForNonexistentResource itr soFar + do rsrcM <- unsafeIOToSTM $ findResource tree fbs $ reqURI req + case rsrcM of + Nothing -- Resource が無かった + -> acceptRequestForNonexistentResource itr input - Just rsrcDef -- あった - -> acceptRequestForExistentResource itr soFar rsrcDef + Just (rsrcPath, rsrcDef) -- あった + -> acceptRequestForExistentResource itr input rsrcPath rsrcDef action acceptSemanticallyInvalidRequest :: Interaction -> ByteString -> STM (IO ()) - acceptSemanticallyInvalidRequest itr soFar - = do writeItr itr itrState Done + acceptSemanticallyInvalidRequest itr input + = {-# SCC "acceptSemanticallyInvalidRequest" #-} + do writeItr itr itrState Done writeDefaultPage itr postprocess itr enqueue itr - return $ acceptRequest soFar + return $ acceptRequest input acceptRequestForNonexistentResource :: Interaction -> ByteString -> STM (IO ()) - acceptRequestForNonexistentResource itr soFar - = do let res = Response { - resVersion = HttpVersion 1 1 - , resStatus = NotFound - , resHeaders = [] - } - writeItr itr itrResponse $ Just res + acceptRequestForNonexistentResource itr input + = {-# SCC "acceptRequestForNonexistentResource" #-} + do updateItr itr itrResponse + $ \res -> res { + resStatus = NotFound + } writeItr itr itrState Done writeDefaultPage itr postprocess itr enqueue itr - return $ acceptRequest soFar - - acceptRequestForExistentResource :: Interaction -> ByteString -> ResourceDef -> STM (IO ()) - acceptRequestForExistentResource itr soFar rsrcDef - = do requestHasBody <- readItr itr itrRequestHasBody id - writeItr itr itrState (if requestHasBody - then ExaminingHeader - else DecidingHeader) + return $ acceptRequest input + + acceptRequestForExistentResource :: Interaction -> ByteString -> [String] -> ResourceDef -> STM (IO ()) + acceptRequestForExistentResource oldItr input rsrcPath rsrcDef + = {-# SCC "acceptRequestForExistentResource" #-} + do let itr = oldItr { itrResourcePath = Just rsrcPath } + requestHasBody <- readItr itr itrRequestHasBody id enqueue itr - return $ do runResource rsrcDef itr + return $ do _ <- runResource rsrcDef itr if requestHasBody then - observeRequest itr soFar + observeRequest itr input else - acceptRequest soFar + acceptRequest input observeRequest :: Interaction -> ByteString -> IO () - observeRequest itr soFar - = do isChunked <- atomically $ readItr itr itrRequestIsChunked id + observeRequest itr input + = {-# SCC "observeRequest" #-} + do isChunked <- atomically $ readItr itr itrRequestIsChunked id if isChunked then - observeChunkedRequest itr soFar + observeChunkedRequest itr input else - observeNonChunkedRequest itr soFar + observeNonChunkedRequest itr input observeChunkedRequest :: Interaction -> ByteString -> IO () - observeChunkedRequest itr soFar - = fail "FIXME: not implemented" + observeChunkedRequest itr input + = {-# SCC "observeChunkedRequest" #-} + do action + <- atomically $ + do isOver <- readItr itr itrReqChunkIsOver id + if isOver then + return $ acceptRequest input + else + do wantedM <- readItr itr itrReqBodyWanted id + if wantedM == Nothing then + do wasteAll <- readItr itr itrReqBodyWasteAll id + if wasteAll then + -- 破棄要求が來た + do remainingM <- readItr itr itrReqChunkRemaining id + if fmap (> 0) remainingM == Just True then + -- 現在のチャンクをまだ + -- 讀み終へてゐない + do let (_, input') = B.splitAt (fromIntegral + $ fromJust remainingM) input + (# footerR, input'' #) = parse chunkFooterP input' + + if footerR == Success () then + -- チャンクフッタを正常に讀めた + do writeItr itr itrReqChunkRemaining $ Just 0 + + return $ observeChunkedRequest itr input'' + else + return $ chunkWasMalformed itr + else + -- 次のチャンクを讀み始める + seekNextChunk itr input + else + -- 要求がまだ來ない + retry + else + -- 受信要求が來た + do remainingM <- readItr itr itrReqChunkRemaining id + if fmap (> 0) remainingM == Just True then + -- 現在のチャンクをまだ讀み + -- 終へてゐない + do let wanted = fromJust wantedM + remaining = fromJust remainingM + bytesToRead = fromIntegral $ min wanted remaining + (chunk, input') = B.splitAt bytesToRead input + actualReadBytes = fromIntegral $ B.length chunk + newWanted = case wanted - actualReadBytes of + 0 -> Nothing + n -> Just n + newRemaining = Just $ remaining - actualReadBytes + updateStates + = do writeItr itr itrReqChunkRemaining newRemaining + writeItr itr itrReqBodyWanted newWanted + updateItr itr itrReceivedBody $ flip B.append chunk + updateItr itrReceivedBodyLen (+ actualReadBytes) itr + + if newRemaining == Just 0 then + -- チャンクフッタを讀む + case parse chunkFooterP input' of + (# Success _, input'' #) + -> do updateStates + return $ observeChunkedRequest itr input'' + (# _, _ #) + -> return $ chunkWasMalformed itr + else + -- まだチャンクの終はりに達してゐない + do updateStates + return $ observeChunkedRequest itr input' + else + -- 次のチャンクを讀み始める + seekNextChunk itr input + action + + seekNextChunk :: Interaction -> ByteString -> STM (IO ()) + seekNextChunk itr input + = {-# SCC "seekNextChunk" #-} + case parse chunkHeaderP input of + -- 最終チャンク (中身が空) + (# Success 0, input' #) + -> case parse chunkTrailerP input' of + (# Success _, input'' #) + -> do writeItr itr itrReqChunkLength $ Nothing + writeItr itr itrReqChunkRemaining $ Nothing + writeItr itr itrReqChunkIsOver True + + return $ acceptRequest input'' + (# _, _ #) + -> return $ chunkWasMalformed itr + -- 最終でないチャンク + (# Success len, input' #) + -> do writeItr itr itrReqChunkLength $ Just len + writeItr itr itrReqChunkRemaining $ Just len + + return $ observeChunkedRequest itr input' + -- チャンクヘッダがをかしい + (# _, _ #) + -> return $ chunkWasMalformed itr + + chunkWasMalformed :: Interaction -> IO () + chunkWasMalformed itr + = {-# SCC "chunkWasMalformed" #-} + atomically $ do updateItr itr itrResponse + $ \ res -> res { + resStatus = BadRequest + } + writeItr itr itrWillClose True + writeItr itr itrState Done + writeDefaultPage itr + postprocess itr observeNonChunkedRequest :: Interaction -> ByteString -> IO () - observeNonChunkedRequest itr soFar - = fail "FIXME: not implemented" -{- - = do action + observeNonChunkedRequest itr input + = {-# SCC "observeNonChunkedRequest" #-} + do action <- atomically $ do wantedM <- readItr itr itrReqBodyWanted id if wantedM == Nothing then do wasteAll <- readItr itr itrReqBodyWasteAll id if wasteAll then - return $ wasteAllReqBody itr soFar + -- 破棄要求が來た + do remainingM <- readItr itr itrReqChunkRemaining id + + let (_, input') = if remainingM == Nothing then + (B.takeWhile (\ _ -> True) input, B.empty) + else + B.splitAt (fromIntegral $ fromJust remainingM) input + + writeItr itr itrReqChunkRemaining $ Just 0 + writeItr itr itrReqChunkIsOver True + + return $ acceptRequest input' else + -- 要求がまだ来ない retry else - -- 受信要求が來た。 - if B.empty soFar then - return $ receiveNonChunkedReqBody itr - else - do remaining <- readItr itr itrReqChunkRemaining fromJust - - let wanted = fromJust wanted - (chunk, input') = B.splitAt (min wanted remaining) soFar - newRemaining = remaining - B.length chunk - isOver = newRemaining == 0 - - writeItr itr itrReqChunkRemaining newRemaining - writeItr itr itrReqChunkIsOver isOver - writeItr itr itrReqBodyWanted (if isOver then - Nothing - else - Just wanted) - writeItr itr itrReceivedBody chunk - - if isOver then - return $ acceptRequest input' - else - return $ observeNonChunkedRequest itr input' - action + -- 受信要求が來た + do remainingM <- readItr itr itrReqChunkRemaining id - receiveNonChunkedReqBody :: Interaction -> IO () - receiveNonChunkedReqBody itr - = do wanted <- atomically $ readItr itr itrReqBodyWanted fromJust - remaining <- atomically $ readItr itr itrReqChunkRemaining fromJust - - hWaitForInput h (-1) - chunk <- B.hGetNonBlocking h (min wanted remaining) - - let newRemaining = remaining - B.length chunk - isOver = newRemaining == 0 - - atomically $ do writeItr itr itrReqChunkRemaining newRemaining - writeItr itr itrReqChunkIsOver isOver - writeItr itr itrReqBodyWanted (if isOver then - Nothing - else - Just wanted) - writeItr itr itrReceivedBody chunk - - if isOver then - return $ acceptRequest B.empty - else - return $ observeNonChunkedRequest itr B.empty + let wanted = fromJust wantedM + bytesToRead = fromIntegral $ maybe wanted (min wanted) remainingM + (chunk, input') = B.splitAt bytesToRead input + actualReadBytes = fromIntegral $ B.length chunk + newRemaining = (- actualReadBytes) <$> remainingM + isOver = actualReadBytes < bytesToRead ∨ newRemaining ≡ Just 0 + writeItr itr itrReqChunkRemaining newRemaining + writeItr itr itrReqChunkIsOver isOver + writeItr itr itrReqBodyWanted Nothing + writeItr itr itrReceivedBody chunk + writeItr itrReceivedBody actualReadBytes - wasteAllReqBody :: Interaction -> ByteString -> IO () - wasteAllReqBody itr soFar - = - --} + if isOver then + return $ acceptRequest input' + else + return $ observeNonChunkedRequest itr input' + action enqueue :: Interaction -> STM () - enqueue itr = do queue <- readTVar tQueue + enqueue itr = {-# SCC "enqueue" #-} + do queue <- readTVar tQueue writeTVar tQueue (itr <| queue) \ No newline at end of file