-- | Page storage for Rakka: access to pages kept in a Subversion
-- repository, plus a HyperEstraier full-text index that is maintained
-- by a dedicated background thread.
module Rakka.Storage
    ( Storage

    , mkStorage -- private

    , getPage
    , putPage

    , getPageA
    , putPageA

    , searchPages
    )
    where

import Control.Arrow.ArrowIO
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Trans
import Data.Maybe
import Data.Set (Set)
import qualified Data.Set as S
import Network.URI
import Rakka.Page
import Rakka.Storage.DefaultPage
import Subversion.Types
import System.Directory
import System.FilePath
import System.IO
import System.Log.Logger
import System.Posix.Files
import System.Posix.Types
import System.Posix.IO
import Subversion.FileSystem
import Subversion.Repository
import Text.HyperEstraier hiding (WriteLock)


-- | Name of the logger used by every log message in this module.
logger :: String
logger = "Rakka.Storage"


-- | Handle to the page storage: the repository itself and the channel
-- through which requests are sent to the index manager thread.
data Storage
    = Storage {
        stoRepository :: !Repository
      , stoIndexChan  :: !(TChan IndexReq)
      }


-- | Requests understood by the index manager thread.
data IndexReq
    = SyncIndex
      -- ^ Bring the index up to date with the repository.
    | SearchIndex !Condition !(TMVar [(PageName, RevNum)])
      -- ^ Run a search; the result is put into the given 'TMVar'.


-- | Create a 'Storage'. This starts the index manager thread (using
-- @lsdir@ as the directory holding the index and its revision file)
-- and immediately enqueues a 'SyncIndex' request.
mkStorage :: FilePath -> Repository -> (Page -> IO Document) -> IO Storage
mkStorage lsdir repos mkDraft
    = do chan <- startIndexManager lsdir repos mkDraft
         let sto = Storage {
                     stoRepository = repos
                   , stoIndexChan  = chan
                   }
         syncIndex sto
         return sto


-- | Fetch a page by name, optionally at a specific revision.
getPage :: MonadIO m => Storage -> PageName -> Maybe RevNum -> m (Maybe Page)
getPage sto name rev
    = liftIO $ getPage' (stoRepository sto) name rev


-- | 'IO' worker behind 'getPage'. Currently the repository and the
-- revision are ignored and only the default pages are consulted.
getPage' :: Repository -> PageName -> Maybe RevNum -> IO (Maybe Page)
getPage' repos name rev
    = loadDefaultPage name -- FIXME


-- | Store a page. Not implemented yet: calling this raises an error.
putPage :: MonadIO m => Storage -> Page -> RevNum -> m ()
putPage sto page oldRev
    = error "FIXME: not implemented"


-- | Arrow version of 'getPage'.
getPageA :: ArrowIO a => Storage -> a (PageName, Maybe RevNum) (Maybe Page)
getPageA = arrIO2 . getPage


-- | Arrow version of 'putPage'.
putPageA :: ArrowIO a => Storage -> a (Page, RevNum) ()
putPageA = arrIO2 . putPage


-- | Search the index. The request is handed to the index manager
-- thread and this call blocks until that thread puts the result into
-- the reply variable.
searchPages :: MonadIO m => Storage -> Condition -> m [(PageName, RevNum)]
searchPages sto cond
    = liftIO $
      do var <- newEmptyTMVarIO
         atomically $ writeTChan (stoIndexChan sto) (SearchIndex cond var)
         atomically $ takeTMVar var


-- | Enqueue a 'SyncIndex' request; does not wait for it to be handled.
syncIndex :: Storage -> IO ()
syncIndex sto
    = atomically $ writeTChan (stoIndexChan sto) SyncIndex


-- | Names of all pages at the given revision. Currently both cases
-- return just the set of default pages.
findAllPages :: Repository -> RevNum -> IO (Set PageName)
findAllPages _     0   = findAllDefaultPages
findAllPages repos rev = findAllDefaultPages -- FIXME


-- | Names of the pages changed between two revisions. Starting from
-- revision 0 means "every page"; any other starting revision is not
-- implemented yet and fails.
findChangedPages :: Repository -> RevNum -> RevNum -> IO (Set PageName)
findChangedPages repos 0      newRev = findAllPages repos newRev
findChangedPages repos oldRev newRev = fail "FIXME: not impl"


-- | Youngest revision number of the repository's filesystem.
getCurrentRevNum :: Repository -> IO RevNum
getCurrentRevNum repos
    = getRepositoryFS repos >>= getYoungestRev


-- | Open the index under @lsdir@ and fork the thread that serves
-- 'IndexReq's; returns the channel on which requests are accepted.
startIndexManager :: FilePath
                  -> Repository
                  -> (Page -> IO Document)
                  -> IO (TChan IndexReq)
startIndexManager lsdir repos mkDraft
    = do chan  <- newTChanIO
         index <- openIndex indexDir revFile
         _     <- forkIO (loop chan index)
         return chan
    where
      -- Both paths live directly under lsdir.
      indexDir = lsdir </> "index"
      revFile  = lsdir </> "indexRev"

      -- Serve requests one at a time, forever.
      loop :: TChan IndexReq -> Database -> IO ()
      loop chan index
          = do req <- atomically $ readTChan chan
               case req of
                 SyncIndex
                     -> syncIndex' index revFile repos mkDraft
                 SearchIndex cond var
                     -> do result <- searchIndex index cond
                           atomically $ putTMVar var result
               loop chan index


-- | Open the casket in R/W mode. If that succeeds, return it as is;
-- if it fails, delete @indexDir@ and @revFile@ and then open the
-- casket in R/W mode again, this time creating it. A failure of the
-- second attempt is reported together with its cause.
openIndex :: FilePath -> FilePath -> IO Database
openIndex indexDir revFile
    = do ret <- openDatabase indexDir (Writer [])
         case ret of
           Right index
               -> do debugM logger ("Opened an H.E. index on " ++ indexDir)
                     return index

           Left err
               -> do warningM logger ("Failed to open an H.E. index on "
                                      ++ indexDir ++ ": " ++ show err)

                     indexExists <- doesDirectoryExist indexDir
                     when indexExists
                              $ removeDirectoryRecursive indexDir

                     -- The stored revision describes the index we just
                     -- threw away, so it has to go as well.
                     revFileExists <- doesFileExist revFile
                     when revFileExists
                              $ removeFile revFile

                     created <- openDatabase indexDir (Writer [Create []])
                     case created of
                       Left err'
                           -> fail ("Failed to create an H.E. index on "
                                    ++ indexDir ++ ": " ++ show err')
                       Right index
                           -> do addAttrIndex index "@uri"           SeqIndex
                                 addAttrIndex index "rakka:revision" SeqIndex
                                 noticeM logger ("Created an H.E. index on " ++ indexDir)
                                 return index


-- | Bring the index up to date with the repository. Runs under the
-- revision-file lock taken by 'updateIndexRev'; when the stored
-- revision differs from the repository's, every changed page is
-- re-indexed and the new revision is written back.
syncIndex' :: Database
           -> FilePath
           -> Repository
           -> (Page -> IO Document)
           -> IO ()
syncIndex' index revFile repos mkDraft
    = updateIndexRev revFile $ \ oldRev ->
      do debugM logger ("The index revision is currently " ++ show oldRev)

         newRev <- getCurrentRevNum repos
         debugM logger ("The repository revision is currently " ++ show newRev)

         when (newRev /= oldRev) (syncIndex'' oldRev newRev)
         return newRev
    where
      syncIndex'' :: RevNum -> RevNum -> IO ()
      syncIndex'' oldRev newRev
          = do pages <- findChangedPages repos oldRev newRev
               mapM_ (updateIndex index repos mkDraft newRev) (S.toList pages)


-- | Run a search and map every hit to its page name and revision.
-- The revision is read from the document's @rakka:revision@
-- attribute, which is expected to be present and numeric.
searchIndex :: Database -> Condition -> IO [(PageName, RevNum)]
searchIndex index cond
    = searchDatabase index cond >>= mapM fromId
    where
      fromId :: DocumentID -> IO (PageName, RevNum)
      fromId docId
          = do uri <- getDocURI index docId
               rev <- fmap (read . fromJust)
                           (getDocAttr index docId "rakka:revision")
               return (decodePageName $ uriPath uri, rev)


-- | Refresh the index entry of a single page at the given revision:
-- the page is (re-)indexed if it can be loaded, and its document is
-- removed from the index otherwise.
updateIndex :: Database
            -> Repository
            -> (Page -> IO Document)
            -> RevNum
            -> PageName
            -> IO ()
updateIndex index repos mkDraft rev name
    = do pageM <- getPage' repos name (Just rev)
         case pageM of
           -- The page has been deleted.
           Nothing
               -> do docIdM <- getDocIdByURI index (mkRakkaURI name)
                     case docIdM of
                       Nothing
                           -> return ()
                       Just docId
                           -> do removeDocument index docId [CleaningRemove]
                                 infoM logger ("Removed page " ++ name ++ " from the index")
           Just page
               -> do draft <- mkDraft page
                     putDocument index draft [CleaningPut]
                     infoM logger ("Indexed page " ++ name ++ " of revision "
                                   ++ show (pageRevision page))


-- | Read the revision stored in @revFile@ (an empty file counts as
-- revision 0), pass it to @f@, and overwrite the file with the
-- revision @f@ returns. The whole read-modify-write cycle runs while
-- holding a write lock on the file. Fails if the file cannot be read
-- or written in full.
--
-- The descriptor is closed on every exit path, including when taking
-- the lock itself fails, so repeated calls do not leak descriptors.
updateIndexRev :: FilePath -> (RevNum -> IO RevNum) -> IO ()
updateIndexRev revFile f
    = bracket openRevFile closeFd lockedUpdate
    where
      openRevFile :: IO Fd
      openRevFile
          = openFd revFile ReadWrite (Just stdFileMode) defaultFileFlags

      -- Hold the lock exactly for the duration of the update.
      lockedUpdate :: Fd -> IO ()
      lockedUpdate fd
          = bracket (acquireLock fd)
                    (const $ releaseLock fd)
                    (const $ update fd)

      acquireLock :: Fd -> IO ()
      acquireLock fd
          = waitToSetLock fd (WriteLock, AbsoluteSeek, 0, 0)

      releaseLock :: Fd -> IO ()
      releaseLock fd
          = setLock fd (Unlock, AbsoluteSeek, 0, 0)

      update :: Fd -> IO ()
      update fd
          = do fdSeek fd AbsoluteSeek 0
               size <- return . fromIntegral . fileSize =<< getFdStatus fd
               (revStr, gotSize) <- fdRead fd size
               when (size /= gotSize)
                        $ fail ("read " ++ show gotSize ++
                                " bytes but expected " ++ show size ++ " bytes")

               let rev = case revStr of
                           "" -> 0
                           _  -> read revStr

               rev' <- f rev

               let revStr' = show rev' ++ "\n"
                   size'   = fromIntegral $ length revStr'

               -- Rewind and truncate so that a shorter number does
               -- not leave stale digits behind.
               fdSeek fd AbsoluteSeek 0
               setFdSize fd 0
               wroteSize <- fdWrite fd revStr'
               when (size' /= wroteSize)
                        $ fail ("wrote " ++ show wroteSize ++
                                " bytes but expected " ++ show size' ++ " bytes")