16 import Control.Arrow.ArrowIO
17 import Control.Concurrent
18 import Control.Concurrent.STM
19 import Control.Exception
21 import Control.Monad.Trans
24 import qualified Data.Set as S
27 import Rakka.Storage.DefaultPage
28 import Subversion.Types
29 import System.Directory
30 import System.FilePath
32 import System.Log.Logger
33 import System.Posix.Files
34 import System.Posix.Types
35 import System.Posix.IO
36 import Subversion.FileSystem
37 import Subversion.Repository
38 import Text.HyperEstraier hiding (WriteLock)
40 logger = "Rakka.Storage"
45 stoRepository :: !Repository
46 , stoIndexChan :: !(TChan IndexReq)
52 | SearchIndex !Condition !(TMVar [(PageName, RevNum)])
55 mkStorage :: FilePath -> Repository -> (Page -> IO Document) -> IO Storage
56 mkStorage lsdir repos mkDraft
57 = do chan <- startIndexManager lsdir repos mkDraft
66 getPage :: MonadIO m => Storage -> PageName -> Maybe RevNum -> m (Maybe Page)
67 getPage = ((liftIO .) .) . getPage' . stoRepository
70 getPage' :: Repository -> PageName -> Maybe RevNum -> IO (Maybe Page)
71 getPage' repos name rev
72 = loadDefaultPage name -- FIXME
75 putPage :: MonadIO m => Storage -> Page -> RevNum -> m ()
76 putPage sto page oldRev
77 = error "FIXME: not implemented"
80 getPageA :: ArrowIO a => Storage -> a (PageName, Maybe RevNum) (Maybe Page)
81 getPageA = arrIO2 . getPage
84 putPageA :: ArrowIO a => Storage -> a (Page, RevNum) ()
85 putPageA = arrIO2 . putPage
88 searchPages :: MonadIO m => Storage -> Condition -> m [(PageName, RevNum)]
91 do var <- newEmptyTMVarIO
92 atomically $ writeTChan (stoIndexChan sto) (SearchIndex cond var)
93 atomically $ takeTMVar var
96 syncIndex :: Storage -> IO ()
98 = atomically $ writeTChan (stoIndexChan sto) SyncIndex
101 findAllPages :: Repository -> RevNum -> IO (Set PageName)
102 findAllPages _ 0 = findAllDefaultPages
103 findAllPages repos rev
104 = findAllDefaultPages -- FIXME
107 findChangedPages :: Repository -> RevNum -> RevNum -> IO (Set PageName)
108 findChangedPages repos 0 newRev = findAllPages repos newRev
109 findChangedPages repos oldRev newRev
110 = fail "FIXME: not impl"
113 getCurrentRevNum :: Repository -> IO RevNum
114 getCurrentRevNum repos
115 = getRepositoryFS repos >>= getYoungestRev
118 startIndexManager :: FilePath -> Repository -> (Page -> IO Document) -> IO (TChan IndexReq)
119 startIndexManager lsdir repos mkDraft
120 = do chan <- newTChanIO
121 index <- openIndex indexDir revFile
122 forkIO (loop chan index)
125 indexDir = lsdir </> "index"
126 revFile = lsdir </> "indexRev"
128 loop :: TChan IndexReq -> Database -> IO ()
130 = do req <- atomically $ readTChan chan
133 -> syncIndex' index revFile repos mkDraft
135 -> do result <- searchIndex index cond
136 atomically $ putTMVar var result
140 -- casket を R/W モードで開く。成功したらそのまま返し、失敗したら
141 -- indexDir と revFile を削除してから casket を R/W モードで開く。
142 openIndex :: FilePath -> FilePath -> IO Database
143 openIndex indexDir revFile
144 = do ret <- openDatabase indexDir (Writer [])
147 -> do debugM logger ("Opened an H.E. index on " ++ indexDir)
151 -> do warningM logger ("Failed to open an H.E. index on "
152 ++ indexDir ++ ": " ++ show err)
154 indexExists <- doesDirectoryExist indexDir
156 $ removeDirectoryRecursive indexDir
158 revFileExists <- doesFileExist revFile
162 Right index <- openDatabase indexDir (Writer [Create []])
163 addAttrIndex index "@uri" SeqIndex
164 addAttrIndex index "rakka:revision" SeqIndex
165 noticeM logger ("Created an H.E. index on " ++ indexDir)
170 syncIndex' :: Database -> FilePath -> Repository -> (Page -> IO Document) -> IO ()
171 syncIndex' index revFile repos mkDraft
172 = updateIndexRev revFile $ \ oldRev ->
173 do debugM logger ("The index revision is currently " ++ show oldRev)
175 newRev <- getCurrentRevNum repos
176 debugM logger ("The repository revision is currently " ++ show newRev)
178 when (newRev /= oldRev) (syncIndex'' oldRev newRev)
181 syncIndex'' :: RevNum -> RevNum -> IO ()
182 syncIndex'' oldRev newRev
183 = do pages <- findChangedPages repos oldRev newRev
184 mapM_ (updateIndex index repos mkDraft newRev) (S.toList pages)
187 searchIndex :: Database -> Condition -> IO [(PageName, RevNum)]
188 searchIndex index cond
189 = searchDatabase index cond >>= mapM fromId
191 fromId :: DocumentID -> IO (PageName, RevNum)
193 = do uri <- getDocURI index docId
194 rev <- getDocAttr index docId "rakka:revision"
195 >>= return . read . fromJust
196 return (decodePageName $ uriPath uri, rev)
199 updateIndex :: Database
201 -> (Page -> IO Document)
205 updateIndex index repos mkDraft rev name
206 = do pageM <- getPage' repos name (Just rev)
210 -> do docIdM <- getDocIdByURI index (mkRakkaURI name)
213 Just docId -> do removeDocument index docId [CleaningRemove]
214 infoM logger ("Removed page " ++ name ++ " from the index")
216 -> do draft <- mkDraft page
217 putDocument index draft [CleaningPut]
218 infoM logger ("Indexed page " ++ name ++ " of revision " ++ show (pageRevision page))
221 updateIndexRev :: FilePath -> (RevNum -> IO RevNum) -> IO ()
222 updateIndexRev revFile f = bracket acquireLock releaseLock update
226 = do fd <- openFd revFile ReadWrite (Just stdFileMode) defaultFileFlags
227 waitToSetLock fd (WriteLock, AbsoluteSeek, 0, 0)
230 releaseLock :: Fd -> IO ()
232 = setLock fd (Unlock, AbsoluteSeek, 0, 0)
234 update :: Fd -> IO ()
236 = do fdSeek fd AbsoluteSeek 0
237 size <- return . fromIntegral . fileSize =<< getFdStatus fd
238 (revStr, gotSize) <- fdRead fd size
239 when (size /= gotSize) $ fail ("read " ++ show gotSize ++
240 " bytes but expected " ++ show size ++ " bytes")
242 let rev = case revStr of
248 let revStr' = show rev' ++ "\n"
249 size' = fromIntegral $ length revStr'
251 fdSeek fd AbsoluteSeek 0
253 wroteSize <- fdWrite fd revStr'
254 when (size' /= wroteSize) $ fail ("wrote " ++ show wroteSize ++
255 " bytes but expected " ++ show size' ++ " bytes")