From: pho Date: Fri, 26 Oct 2007 22:42:26 +0000 (+0900) Subject: record before experiment X-Git-Url: https://git.cielonegro.org/gitweb.cgi?p=Rakka.git;a=commitdiff_plain;h=57831112f1aec09a5891260b5ff7e0f9ea5577e8 record before experiment darcs-hash:20071026224226-62b54-2f3b21fb880d0268037a1d300a94d6bef897681a.gz --- diff --git a/Rakka/Storage.hs b/Rakka/Storage.hs index 6b0e098..078feda 100644 --- a/Rakka/Storage.hs +++ b/Rakka/Storage.hs @@ -8,16 +8,21 @@ module Rakka.Storage , getPageA , putPageA + + , searchPages ) where import Control.Arrow.ArrowIO +import Control.Concurrent import Control.Concurrent.STM import Control.Exception import Control.Monad import Control.Monad.Trans +import Data.Maybe import Data.Set (Set) import qualified Data.Set as S +import Network.URI import Rakka.Page import Rakka.Storage.DefaultPage import Subversion.Types @@ -37,30 +42,23 @@ logger = "Rakka.Storage" data Storage = Storage { - stoIndexRevLocked :: !(TVar Bool) - , stoIndexRevFile :: !FilePath - , stoIndexDB :: !Database - , stoRepository :: !Repository - , stoMakeDraft :: !(Page -> IO Document) + stoRepository :: !Repository + , stoIndexChan :: !(TChan IndexReq) } +data IndexReq + = SyncIndex + | SearchIndex !Condition !(TMVar [(PageName, RevNum)]) + + mkStorage :: FilePath -> Repository -> (Page -> IO Document) -> IO Storage mkStorage lsdir repos mkDraft - = do let indexDir = lsdir "index" - revFile = lsdir "indexRev" - - revLocked <- newTVarIO False - indexDB <- openIndex indexDir revFile - + = do chan <- startIndexManager lsdir repos mkDraft let sto = Storage { - stoIndexRevLocked = revLocked - , stoIndexRevFile = revFile - , stoIndexDB = indexDB - , stoRepository = repos - , stoMakeDraft = mkDraft + stoRepository = repos + , stoIndexChan = chan } - syncIndex sto return sto @@ -87,21 +85,56 @@ putPageA :: ArrowIO a => Storage -> a (Page, RevNum) () putPageA = arrIO2 . putPage -findAllPages :: Storage -> RevNum -> IO (Set PageName) -findAllPages _ 0 = findAllDefaultPages -findAllPages sto rev +searchPages :: MonadIO m => Storage -> Condition -> m [(PageName, RevNum)] +searchPages sto cond + = liftIO $ + do var <- newEmptyTMVarIO + atomically $ writeTChan (stoIndexChan sto) (SearchIndex cond var) + atomically $ takeTMVar var + + +syncIndex :: Storage -> IO () +syncIndex sto + = atomically $ writeTChan (stoIndexChan sto) SyncIndex + + +findAllPages :: Repository -> RevNum -> IO (Set PageName) +findAllPages _ 0 = findAllDefaultPages +findAllPages repos rev = findAllDefaultPages -- FIXME -findChangedPages :: Storage -> RevNum -> RevNum -> IO (Set PageName) -findChangedPages sto 0 newRev = findAllPages sto newRev -findChangedPages sto oldRev newRev +findChangedPages :: Repository -> RevNum -> RevNum -> IO (Set PageName) +findChangedPages repos 0 newRev = findAllPages repos newRev +findChangedPages repos oldRev newRev = fail "FIXME: not impl" -getCurrentRevNum :: Storage -> IO RevNum -getCurrentRevNum sto - = getRepositoryFS (stoRepository sto) >>= getYoungestRev +getCurrentRevNum :: Repository -> IO RevNum +getCurrentRevNum repos + = getRepositoryFS repos >>= getYoungestRev + + +startIndexManager :: FilePath -> Repository -> (Page -> IO Document) -> IO (TChan IndexReq) +startIndexManager lsdir repos mkDraft + = do chan <- newTChanIO + index <- openIndex indexDir revFile + forkIO (loop chan index) + return chan + where + indexDir = lsdir "index" + revFile = lsdir "indexRev" + + loop :: TChan IndexReq -> Database -> IO () + loop chan index + = do req <- atomically $ readTChan chan + case req of + SyncIndex + -> syncIndex' index revFile repos mkDraft + SearchIndex cond var + -> do result <- searchIndex index cond + atomically $ putTMVar var result + loop chan index -- casket を R/W モードで開く。成功したらそのまま返し、失敗したら @@ -110,12 +143,12 @@ openIndex :: FilePath -> FilePath -> IO Database openIndex indexDir revFile = do ret <- openDatabase indexDir (Writer []) case ret of - Right db - -> do debugM logger ("Opened an H.E. database on " ++ indexDir) - return db + Right index + -> do debugM logger ("Opened an H.E. index on " ++ indexDir) + return index Left err - -> do warningM logger ("Failed to open an H.E. database on " + -> do warningM logger ("Failed to open an H.E. index on " ++ indexDir ++ ": " ++ show err) indexExists <- doesDirectoryExist indexDir @@ -126,64 +159,77 @@ openIndex indexDir revFile when revFileExists $ removeFile revFile - Right db <- openDatabase indexDir (Writer [Create []]) - noticeM logger ("Created an H.E. database on " ++ indexDir) + Right index <- openDatabase indexDir (Writer [Create []]) + addAttrIndex index "@uri" SeqIndex + addAttrIndex index "rakka:revision" SeqIndex + noticeM logger ("Created an H.E. index on " ++ indexDir) - return db + return index -syncIndex :: Storage -> IO () -syncIndex sto - = updateIndexRev sto $ \ oldRev -> +syncIndex' :: Database -> FilePath -> Repository -> (Page -> IO Document) -> IO () +syncIndex' index revFile repos mkDraft + = updateIndexRev revFile $ \ oldRev -> do debugM logger ("The index revision is currently " ++ show oldRev) - newRev <- getCurrentRevNum sto + newRev <- getCurrentRevNum repos debugM logger ("The repository revision is currently " ++ show newRev) - when (newRev /= oldRev) (syncIndex' oldRev newRev) + when (newRev /= oldRev) (syncIndex'' oldRev newRev) return newRev where - syncIndex' :: RevNum -> RevNum -> IO () - syncIndex' oldRev newRev - = do pages <- findChangedPages sto oldRev newRev - mapM_ (updateIndex sto newRev) (S.toList pages) + syncIndex'' :: RevNum -> RevNum -> IO () + syncIndex'' oldRev newRev + = do pages <- findChangedPages repos oldRev newRev + mapM_ (updateIndex index repos mkDraft newRev) (S.toList pages) -updateIndex :: Storage -> RevNum -> PageName -> IO () -updateIndex sto rev name - = do pageM <- getPage sto name (Just rev) +searchIndex :: Database -> Condition -> IO [(PageName, RevNum)] +searchIndex index cond + = searchDatabase index cond >>= mapM fromId + where + fromId :: DocumentID -> IO (PageName, RevNum) + fromId docId + = do uri <- getDocURI index docId + rev <- getDocAttr index docId "rakka:revision" + >>= return . read . fromJust + return (decodePageName $ uriPath uri, rev) + + +updateIndex :: Database + -> Repository + -> (Page -> IO Document) + -> RevNum + -> PageName + -> IO () +updateIndex index repos mkDraft rev name + = do pageM <- getPage' repos name (Just rev) case pageM of -- ページが削除された Nothing - -> do docIdM <- getDocIdByURI (stoIndexDB sto) (mkRakkaURI name) + -> do docIdM <- getDocIdByURI index (mkRakkaURI name) case docIdM of Nothing -> return () - Just docId -> do removeDocument (stoIndexDB sto) docId [CleaningRemove] + Just docId -> do removeDocument index docId [CleaningRemove] infoM logger ("Removed page " ++ name ++ " from the index") Just page - -> do draft <- stoMakeDraft sto page - putDocument (stoIndexDB sto) draft [CleaningPut] + -> do draft <- mkDraft page + putDocument index draft [CleaningPut] infoM logger ("Indexed page " ++ name ++ " of revision " ++ show (pageRevision page)) -updateIndexRev :: Storage -> (RevNum -> IO RevNum) -> IO () -updateIndexRev sto f = bracket acquireLock releaseLock update +updateIndexRev :: FilePath -> (RevNum -> IO RevNum) -> IO () +updateIndexRev revFile f = bracket acquireLock releaseLock update where acquireLock :: IO Fd acquireLock - = do atomically $ do revLocked <- readTVar (stoIndexRevLocked sto) - if revLocked then - retry - else - writeTVar (stoIndexRevLocked sto) True - fd <- openFd (stoIndexRevFile sto) ReadWrite (Just stdFileMode) defaultFileFlags + = do fd <- openFd revFile ReadWrite (Just stdFileMode) defaultFileFlags waitToSetLock fd (WriteLock, AbsoluteSeek, 0, 0) return fd releaseLock :: Fd -> IO () releaseLock fd - = do setLock fd (Unlock, AbsoluteSeek, 0, 0) - atomically $ writeTVar (stoIndexRevLocked sto) False + = setLock fd (Unlock, AbsoluteSeek, 0, 0) update :: Fd -> IO () update fd