1 module Rakka.Storage.Impl
7 import Control.Concurrent
8 import Control.Concurrent.STM
9 import Control.Exception
13 import qualified Data.Set as S
16 import Rakka.Storage.DefaultPage
17 import Rakka.Storage.Types
18 import Subversion.Types
19 import System.Directory
20 import System.FilePath
22 import System.Log.Logger
23 import System.Posix.Files
24 import System.Posix.Types
25 import System.Posix.IO
26 import Subversion.FileSystem
27 import Subversion.Repository
28 import Text.HyperEstraier hiding (WriteLock)
30 logger = "Rakka.Storage"
33 getPage' :: Repository -> PageName -> Maybe RevNum -> IO (Maybe Page)
34 getPage' repos name rev
35 = loadDefaultPage name -- FIXME
38 findAllPages :: Repository -> RevNum -> IO (Set PageName)
39 findAllPages _ 0 = findAllDefaultPages
40 findAllPages repos rev
41 = findAllDefaultPages -- FIXME
44 findChangedPages :: Repository -> RevNum -> RevNum -> IO (Set PageName)
45 findChangedPages repos 0 newRev = findAllPages repos newRev
46 findChangedPages repos oldRev newRev
47 = fail "FIXME: not impl"
50 getCurrentRevNum :: Repository -> IO RevNum
51 getCurrentRevNum repos
52 = getRepositoryFS repos >>= getYoungestRev
55 startIndexManager :: FilePath -> Repository -> (Page -> IO Document) -> IO (TChan IndexReq)
56 startIndexManager lsdir repos mkDraft
57 = do chan <- newTChanIO
58 index <- openIndex indexDir revFile
59 forkIO (loop chan index)
62 indexDir = lsdir </> "index"
63 revFile = lsdir </> "indexRev"
65 loop :: TChan IndexReq -> Database -> IO ()
67 = do req <- atomically $ readTChan chan
70 -> do noticeM logger "Rebuilding the H.E. index..."
72 removeDirectoryRecursive indexDir
73 index' <- openIndex indexDir revFile
74 syncIndex' index' revFile repos mkDraft
78 -> do syncIndex' index revFile repos mkDraft
82 -> do result <- searchIndex index cond
83 atomically $ putTMVar var result
87 -- casket を R/W モードで開く。成功したらそのまま返し、失敗したら
88 -- indexDir と revFile を削除してから casket を R/W モードで開く。
89 openIndex :: FilePath -> FilePath -> IO Database
90 openIndex indexDir revFile
91 = do ret <- openDatabase indexDir (Writer [])
94 -> do debugM logger ("Opened an H.E. index on " ++ indexDir)
98 -> do noticeM logger ("Failed to open an H.E. index on "
99 ++ indexDir ++ ": " ++ show err)
101 indexExists <- doesDirectoryExist indexDir
103 $ removeDirectoryRecursive indexDir
105 revFileExists <- doesFileExist revFile
109 Right index <- openDatabase indexDir (Writer [Create []])
110 addAttrIndex index "@uri" SeqIndex
111 addAttrIndex index "rakka:revision" SeqIndex
112 noticeM logger ("Created an H.E. index on " ++ indexDir)
117 syncIndex' :: Database -> FilePath -> Repository -> (Page -> IO Document) -> IO ()
118 syncIndex' index revFile repos mkDraft
119 = updateIndexRev revFile $ \ oldRev ->
120 do debugM logger ("The index revision is currently " ++ show oldRev)
122 newRev <- getCurrentRevNum repos
123 debugM logger ("The repository revision is currently " ++ show newRev)
125 when (oldRev == 0 || newRev /= oldRev)
126 $ syncIndex'' oldRev newRev
129 syncIndex'' :: RevNum -> RevNum -> IO ()
130 syncIndex'' oldRev newRev
131 = do pages <- findChangedPages repos oldRev newRev
132 mapM_ (updateIndex index repos mkDraft newRev) (S.toList pages)
135 searchIndex :: Database -> Condition -> IO [(PageName, RevNum)]
136 searchIndex index cond
137 = searchDatabase index cond >>= mapM fromId
139 fromId :: DocumentID -> IO (PageName, RevNum)
141 = do uri <- getDocURI index docId
142 rev <- getDocAttr index docId "rakka:revision"
143 >>= return . read . fromJust
144 return (decodePageName $ uriPath uri, rev)
147 updateIndex :: Database
149 -> (Page -> IO Document)
153 updateIndex index repos mkDraft rev name
154 = do pageM <- getPage' repos name (Just rev)
158 -> do docIdM <- getDocIdByURI index (mkRakkaURI name)
161 Just docId -> do removeDocument index docId [CleaningRemove]
162 infoM logger ("Removed page " ++ name ++ " from the index")
164 -> do draft <- mkDraft page
165 putDocument index draft [CleaningPut]
166 infoM logger ("Indexed page " ++ name ++ " of revision " ++ show (pageRevision page))
169 updateIndexRev :: FilePath -> (RevNum -> IO RevNum) -> IO ()
170 updateIndexRev revFile f = bracket acquireLock releaseLock update
174 = do fd <- openFd revFile ReadWrite (Just stdFileMode) defaultFileFlags
175 waitToSetLock fd (WriteLock, AbsoluteSeek, 0, 0)
178 releaseLock :: Fd -> IO ()
180 = setLock fd (Unlock, AbsoluteSeek, 0, 0)
182 update :: Fd -> IO ()
184 = do fdSeek fd AbsoluteSeek 0
185 size <- return . fromIntegral . fileSize =<< getFdStatus fd
186 (revStr, gotSize) <- fdRead fd size
187 when (size /= gotSize) $ fail ("read " ++ show gotSize ++
188 " bytes but expected " ++ show size ++ " bytes")
190 let rev = case revStr of
196 let revStr' = show rev' ++ "\n"
197 size' = fromIntegral $ length revStr'
199 fdSeek fd AbsoluteSeek 0
201 wroteSize <- fdWrite fd revStr'
202 when (size' /= wroteSize) $ fail ("wrote " ++ show wroteSize ++
203 " bytes but expected " ++ show size' ++ " bytes")