+{-# LANGUAGE
+ CPP
+ , DoAndIfThenElse
+ , OverloadedStrings
+ , RecordWildCards
+ , ScopedTypeVariables
+ , UnicodeSyntax
+ #-}
module Network.HTTP.Lucu.RequestReader
- ( requestReader -- Config -> ResTree -> Handle -> HostName -> InteractionQueue -> IO ()
+ ( requestReader
)
where
-
-import Control.Concurrent.STM
-import Control.Exception
-import Control.Monad
-import qualified Data.ByteString.Lazy.Char8 as B
-import Data.ByteString.Lazy.Char8 (ByteString)
-import Data.Map as M
-import Data.Map (Map)
-import Data.Maybe
+import Control.Concurrent
+import Control.Concurrent.STM
+import Control.Exception hiding (block)
+import Control.Monad
+import Control.Monad.Trans.Maybe
+import qualified Data.Attoparsec.Lazy as LP
+import qualified Data.ByteString as Strict
+import qualified Data.ByteString.Lazy as Lazy
+import Data.List
+import Data.Maybe
+import Data.Monoid.Unicode
import qualified Data.Sequence as S
-import Data.Sequence (Seq, (<|), ViewR(..))
-import Network
-import Network.HTTP.Lucu.Config
-import Network.HTTP.Lucu.DefaultPage
-import Network.HTTP.Lucu.HttpVersion
-import Network.HTTP.Lucu.Interaction
-import Network.HTTP.Lucu.Parser
-import Network.HTTP.Lucu.Postprocess
-import Network.HTTP.Lucu.Preprocess
-import Network.HTTP.Lucu.Request
-import Network.HTTP.Lucu.Response
-import Network.HTTP.Lucu.Resource
-import Prelude hiding (catch)
-import System.IO
-
-import GHC.Conc (unsafeIOToSTM)
-
-requestReader :: Config -> ResTree -> Handle -> HostName -> InteractionQueue -> IO ()
-requestReader cnf tree h host tQueue
- = do catch (acceptRequest B.empty) $ \ exc ->
- case exc of
- IOException _ -> return ()
- AsyncException ThreadKilled -> return ()
- BlockedIndefinitely -> putStrLn "requestReader: blocked indefinitely"
- _ -> print exc
+import qualified Data.Text as T
+import Network.HTTP.Lucu.Abortion
+import Network.HTTP.Lucu.Config
+import Network.HTTP.Lucu.Chunk
+import Network.HTTP.Lucu.Dispatcher.Internal
+import Network.HTTP.Lucu.HandleLike
+import Network.HTTP.Lucu.Interaction
+import Network.HTTP.Lucu.Preprocess
+import Network.HTTP.Lucu.Request
+import Network.HTTP.Lucu.Response
+import Network.HTTP.Lucu.Resource.Internal
+import Network.HTTP.Lucu.Utils
+import Network.Socket
+import Prelude.Unicode
+import System.IO (hPutStrLn, stderr)
+
+data Context h
+ = Context {
+ cConfig ∷ !Config
+ , cHostMap ∷ !HostMap
+ , cHandle ∷ !h
+ , cPort ∷ !PortNumber
+ , cAddr ∷ !SockAddr
+ , cQueue ∷ !InteractionQueue
+ }
+
+data ChunkReceivingState
+ = Initial
+ | InChunk !Int -- ^Number of remaining octets in the current
+ -- chunk. It's always positive.
+
+requestReader ∷ (HostMapper hm, HandleLike h)
+ ⇒ Config
+ → hm
+ → h
+ → PortNumber
+ → SockAddr
+ → InteractionQueue
+ → IO ()
+requestReader cnf hm h port addr tQueue
+ = do input ← hGetLBS h
+ acceptRequest (Context cnf (hostMap hm) h port addr tQueue) input
+ `catches`
+ [ Handler handleAsyncE
+ , Handler handleOthers
+ ]
+ `finally`
+ enqueue' tQueue EndOfInteraction
+ where
+ handleAsyncE ∷ AsyncException → IO ()
+ handleAsyncE ThreadKilled = return ()
+ handleAsyncE e = dump e
+
+ handleOthers ∷ SomeException → IO ()
+ handleOthers = dump
+
+ dump ∷ Exception e ⇒ e → IO ()
+ dump e
+ = do hPutStrLn stderr "Lucu: requestReader caught an exception:"
+ hPutStrLn stderr $ show e
+
+acceptRequest ∷ HandleLike h ⇒ Context h → Lazy.ByteString → IO ()
+acceptRequest ctx@(Context {..}) input
+ = do atomically $
+ do queue ← readTVar cQueue
+ when (S.length queue ≥ cnfMaxPipelineDepth cConfig)
+ -- Too many requests in the pipeline...
+ retry
+ if Lazy.null input then
+ return ()
+ else
+ case LP.parse request input of
+ LP.Done input' req → acceptParsableRequest ctx req input'
+ LP.Fail _ _ _ → acceptNonparsableRequest ctx
+
+acceptNonparsableRequest ∷ HandleLike h ⇒ Context h → IO ()
+acceptNonparsableRequest ctx@(Context {..})
+ = do syi ← mkSyntacticallyInvalidInteraction cConfig
+ enqueue ctx syi
+
+acceptParsableRequest ∷ HandleLike h
+ ⇒ Context h
+ → Request
+ → Lazy.ByteString
+ → IO ()
+acceptParsableRequest ctx@(Context {..}) req input
+ = do let ar = preprocess (cnfServerHost cConfig) cPort req
+ if isError $ arInitialStatus ar then
+ acceptSemanticallyInvalidRequest ctx ar input
+ else
+ do rsrc ← runMaybeT $ dispatch (reqURI $ arRequest ar) cHostMap
+ case rsrc of
+ Nothing
+ → do let ar' = ar {
+ arInitialStatus = fromStatusCode NotFound
+ }
+ acceptSemanticallyInvalidRequest ctx ar' input
+ Just (path, def)
+ → acceptRequestForResource ctx ar input path def
+
+acceptSemanticallyInvalidRequest ∷ HandleLike h
+ ⇒ Context h
+ → AugmentedRequest
+ → Lazy.ByteString
+ → IO ()
+acceptSemanticallyInvalidRequest ctx@(Context {..}) ar input
+ = do sei ← mkSemanticallyInvalidInteraction cConfig ar
+ enqueue ctx sei
+ acceptRequest ctx input
+
+acceptRequestForResource ∷ HandleLike h
+ ⇒ Context h
+ → AugmentedRequest
+ → Lazy.ByteString
+ → [Strict.ByteString]
+ → Resource
+ → IO ()
+acceptRequestForResource ctx@(Context {..}) ar@(AugmentedRequest {..}) input rsrcPath rsrcDef
+ = do
+#if defined(HAVE_SSL)
+ cert ← hGetPeerCert cHandle
+ ni ← mkNormalInteraction cConfig cAddr cert ar rsrcPath
+#else
+ ni ← mkNormalInteraction cConfig cAddr ar rsrcPath
+#endif
+ tid ← spawnRsrc rsrcDef ni
+ enqueue ctx ni
+ if reqMustHaveBody arRequest then
+ waitForReceiveBodyReq ctx ni tid input
+ else
+ acceptRequest ctx input
+
+waitForReceiveBodyReq ∷ HandleLike h
+ ⇒ Context h
+ → NormalInteraction
+ → ThreadId
+ → Lazy.ByteString
+ → IO ()
+waitForReceiveBodyReq ctx ni@(NI {..}) rsrcTid input
+ = case fromJust niReqBodyLength of
+ Chunked
+ → waitForReceiveChunkedBodyReqForTheFirstTime ctx ni rsrcTid input
+ Fixed len
+ → waitForReceiveNonChunkedBodyReqForTheFirstTime ctx ni input len
+
+-- Toooooo long name for a function...
+waitForReceiveChunkedBodyReqForTheFirstTime ∷ HandleLike h
+ ⇒ Context h
+ → NormalInteraction
+ → ThreadId
+ → Lazy.ByteString
+ → IO ()
+waitForReceiveChunkedBodyReqForTheFirstTime ctx ni@(NI {..}) rsrcTid input
+ = join $
+ atomically $
+ do req ← takeTMVar niReceiveBodyReq
+ case req of
+ ReceiveBody wanted
+ → do putTMVar niSendContinue niExpectedContinue
+ return $ readCurrentChunk ctx ni rsrcTid wanted input Initial
+ WasteAll
+ → do putTMVar niSendContinue False
+ return $ wasteAllChunks ctx rsrcTid input Initial
+
+waitForReceiveChunkedBodyReq ∷ HandleLike h
+ ⇒ Context h
+ → NormalInteraction
+ → ThreadId
+ → Lazy.ByteString
+ → ChunkReceivingState
+ → IO ()
+waitForReceiveChunkedBodyReq ctx ni@(NI {..}) rsrcTid input st
+ = do req ← atomically $ takeTMVar niReceiveBodyReq
+ case req of
+ ReceiveBody wanted
+ → readCurrentChunk ctx ni rsrcTid wanted input st
+ WasteAll
+ → wasteAllChunks ctx rsrcTid input st
+
+wasteAllChunks ∷ HandleLike h
+ ⇒ Context h
+ → ThreadId
+ → Lazy.ByteString
+ → ChunkReceivingState
+ → IO ()
+wasteAllChunks ctx rsrcTid = go
where
- acceptRequest :: ByteString -> IO ()
- acceptRequest soFar
- -- キューに最大パイプライン深度以上のリクエストが溜まってゐる
- -- 時は、それが限度以下になるまで待つ。
- = do atomically $ do queue <- readTVar tQueue
- when (S.length queue >= cnfMaxPipelineDepth cnf)
- retry
-
- -- リクエストを讀む。パースできない場合は直ちに 400 Bad
- -- Request 應答を設定し、それを出力してから切斷するやう
- -- に ResponseWriter に通知する。
- hWaitForInput h (-1)
- chunk <- B.hGetNonBlocking h 1024
-
- let input = B.append soFar chunk
- case parse requestP input of
- (Success req , input') -> acceptParsableRequest req input'
- (IllegalInput, _ ) -> acceptNonparsableRequest BadRequest
- (ReachedEOF , _ ) -> if B.length input >= 1024 * 1024 then
- -- ヘッダ長過ぎ
- acceptNonparsableRequest RequestEntityTooLarge
- else
- acceptRequest input
-
- acceptNonparsableRequest :: StatusCode -> IO ()
- acceptNonparsableRequest status
- = do itr <- newInteraction cnf host Nothing
- let res = Response {
- resVersion = HttpVersion 1 1
- , resStatus = status
- , resHeaders = []
- }
- atomically $ do writeItr itr itrResponse $ Just res
- writeItr itr itrWillClose True
- writeItr itr itrState Done
- writeDefaultPage itr
- postprocess itr
- enqueue itr
-
- acceptParsableRequest :: Request -> ByteString -> IO ()
- acceptParsableRequest req soFar
- = do itr <- newInteraction cnf host (Just req)
- action
- <- atomically $
- do preprocess itr
- isErr <- readItrF itr itrResponse (isError . resStatus)
- if isErr == Just True then
- acceptSemanticallyInvalidRequest itr soFar
- else
- case findResource tree $ (reqURI . fromJust . itrRequest) itr of
- Nothing -- Resource が無かった
- -> acceptRequestForNonexistentResource itr soFar
-
- Just rsrcDef -- あった
- -> acceptRequestForExistentResource itr soFar rsrcDef
- action
-
- acceptSemanticallyInvalidRequest :: Interaction -> ByteString -> STM (IO ())
- acceptSemanticallyInvalidRequest itr soFar
- = do writeItr itr itrState Done
- writeDefaultPage itr
- postprocess itr
- enqueue itr
- return $ acceptRequest soFar
-
- acceptRequestForNonexistentResource :: Interaction -> ByteString -> STM (IO ())
- acceptRequestForNonexistentResource itr soFar
- = do let res = Response {
- resVersion = HttpVersion 1 1
- , resStatus = NotFound
- , resHeaders = []
- }
- writeItr itr itrResponse $ Just res
- writeItr itr itrState Done
- writeDefaultPage itr
- postprocess itr
- enqueue itr
- return $ acceptRequest soFar
-
- acceptRequestForExistentResource :: Interaction -> ByteString -> ResourceDef -> STM (IO ())
- acceptRequestForExistentResource itr soFar rsrcDef
- = do requestHasBody <- readItr itr itrRequestHasBody id
- writeItr itr itrState (if requestHasBody
- then ExaminingHeader
- else DecidingHeader)
- enqueue itr
- return $ do runResource rsrcDef itr
- if requestHasBody then
- observeRequest itr soFar
- else
- acceptRequest soFar
-
- observeRequest :: Interaction -> ByteString -> IO ()
- observeRequest itr soFar
- = do isChunked <- atomically $ readItr itr itrRequestIsChunked id
- if isChunked then
- observeChunkedRequest itr soFar
- else
- observeNonChunkedRequest itr soFar
-
- observeChunkedRequest :: Interaction -> ByteString -> IO ()
- observeChunkedRequest itr soFar
- = fail "FIXME: not implemented"
-
- observeNonChunkedRequest :: Interaction -> ByteString -> IO ()
- observeNonChunkedRequest itr soFar
- = fail "FIXME: not implemented"
-{-
- = do action
- <- atomically $
- do wantedM <- readItr itr itrReqBodyWanted id
- if wantedM == Nothing then
- do wasteAll <- readItr itr itrReqBodyWasteAll id
- if wasteAll then
- return $ wasteAllReqBody itr soFar
- else
- retry
- else
- -- 受信要求が來た。
- if B.empty soFar then
- return $ receiveNonChunkedReqBody itr
- else
- do remaining <- readItr itr itrReqChunkRemaining fromJust
-
- let wanted = fromJust wanted
- (chunk, input') = B.splitAt (min wanted remaining) soFar
- newRemaining = remaining - B.length chunk
- isOver = newRemaining == 0
-
- writeItr itr itrReqChunkRemaining newRemaining
- writeItr itr itrReqChunkIsOver isOver
- writeItr itr itrReqBodyWanted (if isOver then
- Nothing
- else
- Just wanted)
- writeItr itr itrReceivedBody chunk
-
- if isOver then
- return $ acceptRequest input'
- else
- return $ observeNonChunkedRequest itr input'
- action
-
- receiveNonChunkedReqBody :: Interaction -> IO ()
- receiveNonChunkedReqBody itr
- = do wanted <- atomically $ readItr itr itrReqBodyWanted fromJust
- remaining <- atomically $ readItr itr itrReqChunkRemaining fromJust
-
- hWaitForInput h (-1)
- chunk <- B.hGetNonBlocking h (min wanted remaining)
-
- let newRemaining = remaining - B.length chunk
- isOver = newRemaining == 0
-
- atomically $ do writeItr itr itrReqChunkRemaining newRemaining
- writeItr itr itrReqChunkIsOver isOver
- writeItr itr itrReqBodyWanted (if isOver then
- Nothing
- else
- Just wanted)
- writeItr itr itrReceivedBody chunk
-
- if isOver then
- return $ acceptRequest B.empty
- else
- return $ observeNonChunkedRequest itr B.empty
-
-
- wasteAllReqBody :: Interaction -> ByteString -> IO ()
- wasteAllReqBody itr soFar
- =
-
--}
-
- enqueue :: Interaction -> STM ()
- enqueue itr = do queue <- readTVar tQueue
- writeTVar tQueue (itr <| queue)
\ No newline at end of file
+ go ∷ Lazy.ByteString → ChunkReceivingState → IO ()
+ go input Initial
+ = case LP.parse chunkHeader input of
+ LP.Done input' chunkLen
+ | chunkLen ≡ 0 → gotFinalChunk input'
+ | otherwise → gotChunk input' chunkLen
+ LP.Fail _ eCtx e
+ → chunkWasMalformed rsrcTid eCtx e
+ "wasteAllChunks: chunkHeader"
+ go input (InChunk chunkLen)
+ = gotChunk input chunkLen
+
+ gotChunk ∷ Lazy.ByteString → Int → IO ()
+ gotChunk input chunkLen
+ = let input' = Lazy.drop (fromIntegral chunkLen) input
+ in
+ case LP.parse chunkFooter input' of
+ LP.Done input'' _
+ → go input'' Initial
+ LP.Fail _ eCtx e
+ → chunkWasMalformed rsrcTid eCtx e
+ "wasteAllChunks: chunkFooter"
+
+ gotFinalChunk ∷ Lazy.ByteString → IO ()
+ gotFinalChunk input
+ = case LP.parse chunkTrailer input of
+ LP.Done input' _
+ → acceptRequest ctx input'
+ LP.Fail _ eCtx e
+ → chunkWasMalformed rsrcTid eCtx e
+ "wasteAllChunks: chunkTrailer"
+
+readCurrentChunk ∷ HandleLike h
+ ⇒ Context h
+ → NormalInteraction
+ → ThreadId
+ → Int
+ → Lazy.ByteString
+ → ChunkReceivingState
+ → IO ()
+readCurrentChunk ctx ni@(NI {..}) rsrcTid wanted = go
+ where
+ go ∷ Lazy.ByteString → ChunkReceivingState → IO ()
+ go input Initial
+ = case LP.parse chunkHeader input of
+ LP.Done input' chunkLen
+ | chunkLen ≡ 0
+ → gotFinalChunk input'
+ | otherwise
+ → gotChunk input' chunkLen
+ LP.Fail _ eCtx e
+ → chunkWasMalformed rsrcTid eCtx e
+ "readCurrentChunk: chunkHeader"
+ go input (InChunk chunkLen)
+ = gotChunk input chunkLen
+
+ gotChunk ∷ Lazy.ByteString → Int → IO ()
+ gotChunk input chunkLen
+ = do let bytesToRead = min wanted chunkLen
+ (block, input') = Lazy.splitAt (fromIntegral bytesToRead) input
+ block' = Strict.concat $ Lazy.toChunks block
+ actualReadBytes = Strict.length block'
+ chunkLen' = chunkLen - actualReadBytes
+ atomically $ putTMVar niReceivedBody block'
+ if chunkLen' ≡ 0 then
+ case LP.parse chunkFooter input' of
+ LP.Done input'' _
+ → waitForReceiveChunkedBodyReq ctx ni rsrcTid input'' Initial
+ LP.Fail _ eCtx e
+ → chunkWasMalformed rsrcTid eCtx e
+ "readCurrentChunk: chunkFooter"
+ else
+ waitForReceiveChunkedBodyReq ctx ni rsrcTid input' $ InChunk chunkLen'
+
+ gotFinalChunk ∷ Lazy.ByteString → IO ()
+ gotFinalChunk input
+ = do atomically $ putTMVar niReceivedBody (∅)
+ case LP.parse chunkTrailer input of
+ LP.Done input' _
+ → acceptRequest ctx input'
+ LP.Fail _ eCtx e
+ → chunkWasMalformed rsrcTid eCtx e
+ "readCurrentChunk: chunkTrailer"
+
+chunkWasMalformed ∷ ThreadId → [String] → String → String → IO ()
+chunkWasMalformed tid eCtx e msg
+ = let abo = mkAbortion BadRequest [("Connection", "close")]
+ $ Just
+ $ "chunkWasMalformed: "
+ ⊕ T.pack msg
+ ⊕ ": "
+ ⊕ T.pack (intercalate ", " eCtx)
+ ⊕ ": "
+ ⊕ T.pack e
+ in
+ throwTo tid abo
+
+waitForReceiveNonChunkedBodyReqForTheFirstTime ∷ HandleLike h
+ ⇒ Context h
+ → NormalInteraction
+ → Lazy.ByteString
+ → Int
+ → IO ()
+waitForReceiveNonChunkedBodyReqForTheFirstTime ctx ni@(NI {..}) input bodyLen
+ = join $
+ atomically $
+ do req ← takeTMVar niReceiveBodyReq
+ case req of
+ ReceiveBody wanted
+ → do putTMVar niSendContinue niExpectedContinue
+ return $ readNonChunkedRequestBody ctx ni input bodyLen wanted
+ WasteAll
+ → do putTMVar niSendContinue False
+ return $ wasteNonChunkedRequestBody ctx input bodyLen
+
+waitForReceiveNonChunkedBodyReq ∷ HandleLike h
+ ⇒ Context h
+ → NormalInteraction
+ → Lazy.ByteString
+ → Int
+ → IO ()
+waitForReceiveNonChunkedBodyReq ctx ni@(NI {..}) input bodyLen
+ = do req ← atomically $ takeTMVar niReceiveBodyReq
+ case req of
+ ReceiveBody wanted
+ → readNonChunkedRequestBody ctx ni input bodyLen wanted
+ WasteAll
+ → wasteNonChunkedRequestBody ctx input bodyLen
+
+wasteNonChunkedRequestBody ∷ HandleLike h
+ ⇒ Context h
+ → Lazy.ByteString
+ → Int
+ → IO ()
+wasteNonChunkedRequestBody ctx input bodyLen
+ = do let input' = Lazy.drop (fromIntegral bodyLen) input
+ acceptRequest ctx input'
+
+readNonChunkedRequestBody ∷ HandleLike h
+ ⇒ Context h
+ → NormalInteraction
+ → Lazy.ByteString
+ → Int
+ → Int
+ → IO ()
+readNonChunkedRequestBody ctx ni@(NI {..}) input bodyLen wanted
+ | bodyLen ≡ 0 = gotEndOfRequest
+ | otherwise = gotBody
+ where
+ gotBody ∷ IO ()
+ gotBody
+ = do let bytesToRead = min wanted bodyLen
+ (block, input') = Lazy.splitAt (fromIntegral bytesToRead) input
+ block' = Strict.concat $ Lazy.toChunks block
+ actualReadBytes = Strict.length block'
+ bodyLen' = bodyLen - actualReadBytes
+ atomically $ putTMVar niReceivedBody block'
+ waitForReceiveNonChunkedBodyReq ctx ni input' bodyLen'
+
+ gotEndOfRequest ∷ IO ()
+ gotEndOfRequest
+ = do atomically $ putTMVar niReceivedBody (∅)
+ acceptRequest ctx input
+
+enqueue ∷ (HandleLike h, Interaction i) ⇒ Context h → i → IO ()
+enqueue (Context {..}) = enqueue' cQueue
+
+enqueue' ∷ Interaction i ⇒ InteractionQueue → i → IO ()
+enqueue' tQueue itr
+ = atomically $
+ do queue ← readTVar tQueue
+ writeTVar tQueue (toInteraction itr ⊲ queue)