1 module Rakka.Storage.Impl
7 import Control.Concurrent
8 import Control.Concurrent.STM
9 import Control.Exception
13 import qualified Data.Set as S
16 import Rakka.Storage.DefaultPage
17 import Rakka.Storage.Types
18 import Subversion.Types
19 import System.Directory
20 import System.FilePath
22 import System.Log.Logger
23 import System.Posix.Files
24 import System.Posix.Types
25 import System.Posix.IO
26 import Subversion.FileSystem
27 import Subversion.Repository
28 import Text.HyperEstraier hiding (WriteLock)
32 logger = "Rakka.Storage"
35 getPage' :: Repository -> PageName -> Maybe RevNum -> IO (Maybe Page)
36 getPage' _repos name _rev
37 = loadDefaultPage name -- FIXME
40 findAllPages :: Repository -> RevNum -> IO (Set PageName)
41 findAllPages _ 0 = findAllDefaultPages
42 findAllPages _repos _rev
43 = findAllDefaultPages -- FIXME
46 findChangedPages :: Repository -> RevNum -> RevNum -> IO (Set PageName)
47 findChangedPages repos 0 newRev = findAllPages repos newRev
48 findChangedPages _repos _oldRev _newRev
49 = fail "FIXME: not impl"
52 getCurrentRevNum :: Repository -> IO RevNum
53 getCurrentRevNum repos
54 = getRepositoryFS repos >>= getYoungestRev
57 startIndexManager :: FilePath -> Repository -> (Page -> IO Document) -> IO (TChan IndexReq)
58 startIndexManager lsdir repos mkDraft
59 = do chan <- newTChanIO
60 index <- openIndex indexDir revFile
61 forkIO (loop chan index)
64 indexDir = lsdir </> "index"
65 revFile = lsdir </> "indexRev"
67 loop :: TChan IndexReq -> Database -> IO ()
69 = do req <- atomically $ readTChan chan
72 -> do noticeM logger "Rebuilding the H.E. index..."
74 removeDirectoryRecursive indexDir
75 index' <- openIndex indexDir revFile
76 syncIndex' index' revFile repos mkDraft
80 -> do syncIndex' index revFile repos mkDraft
84 -> do result <- searchIndex index cond
85 atomically $ putTMVar var result
89 -- casket を R/W モードで開く。成功したらそのまま返し、失敗したら
90 -- indexDir と revFile を削除してから casket を R/W モードで開く。
91 openIndex :: FilePath -> FilePath -> IO Database
92 openIndex indexDir revFile
93 = do ret <- openDatabase indexDir (Writer [])
96 -> do debugM logger ("Opened an H.E. index on " ++ indexDir)
100 -> do noticeM logger ("Failed to open an H.E. index on "
101 ++ indexDir ++ ": " ++ show err)
103 indexExists <- doesDirectoryExist indexDir
105 $ removeDirectoryRecursive indexDir
107 revFileExists <- doesFileExist revFile
111 Right index <- openDatabase indexDir (Writer [Create []])
112 addAttrIndex index "@uri" SeqIndex
113 addAttrIndex index "rakka:revision" SeqIndex
114 noticeM logger ("Created an H.E. index on " ++ indexDir)
119 syncIndex' :: Database -> FilePath -> Repository -> (Page -> IO Document) -> IO ()
120 syncIndex' index revFile repos mkDraft
121 = updateIndexRev revFile $ \ oldRev ->
122 do debugM logger ("The index revision is currently " ++ show oldRev)
124 newRev <- getCurrentRevNum repos
125 debugM logger ("The repository revision is currently " ++ show newRev)
127 when (oldRev == 0 || newRev /= oldRev)
128 $ syncIndex'' oldRev newRev
131 syncIndex'' :: RevNum -> RevNum -> IO ()
132 syncIndex'' oldRev newRev
133 = do pages <- findChangedPages repos oldRev newRev
134 mapM_ (updateIndex index repos mkDraft newRev) (S.toList pages)
137 searchIndex :: Database -> Condition -> IO [(PageName, RevNum)]
138 searchIndex index cond
139 = searchDatabase index cond >>= mapM fromId
141 fromId :: DocumentID -> IO (PageName, RevNum)
143 = do uri <- getDocURI index docId
144 rev <- getDocAttr index docId "rakka:revision"
145 >>= return . read . fromJust
146 return (decodePageName $ uriPath uri, rev)
149 updateIndex :: Database
151 -> (Page -> IO Document)
155 updateIndex index repos mkDraft rev name
156 = do pageM <- getPage' repos name (Just rev)
160 -> do docIdM <- getDocIdByURI index (mkRakkaURI name)
163 Just docId -> do removeDocument index docId [CleaningRemove]
164 infoM logger ("Removed page " ++ name ++ " from the index")
166 -> do draft <- mkDraft page
167 putDocument index draft [CleaningPut]
168 infoM logger ("Indexed page " ++ name ++ " of revision " ++ show (pageRevision page))
171 updateIndexRev :: FilePath -> (RevNum -> IO RevNum) -> IO ()
172 updateIndexRev revFile f = bracket acquireLock releaseLock update
176 = do fd <- openFd revFile ReadWrite (Just stdFileMode) defaultFileFlags
177 waitToSetLock fd (WriteLock, AbsoluteSeek, 0, 0)
180 releaseLock :: Fd -> IO ()
182 = setLock fd (Unlock, AbsoluteSeek, 0, 0)
184 update :: Fd -> IO ()
186 = do fdSeek fd AbsoluteSeek 0
187 size <- return . fromIntegral . fileSize =<< getFdStatus fd
188 (revStr, gotSize) <- fdRead fd size
189 when (size /= gotSize) $ fail ("read " ++ show gotSize ++
190 " bytes but expected " ++ show size ++ " bytes")
192 let rev = case revStr of
198 let revStr' = show rev' ++ "\n"
199 size' = fromIntegral $ length revStr'
201 fdSeek fd AbsoluteSeek 0
203 wroteSize <- fdWrite fd revStr'
204 when (size' /= wroteSize) $ fail ("wrote " ++ show wroteSize ++
205 " bytes but expected " ++ show size' ++ " bytes")