@@ -114,19 +114,19 @@ builder db@Database{..} keys = do
114
114
val <- case fromMaybe (fromRight undefined idKey, Dirty Nothing ) status of
115
115
(_, Clean r) -> pure r
116
116
(_, Running force val _) -> do
117
- liftIO $ modifyIORef toForce (force: )
117
+ liftIO $ modifyIORef toForce (Wait force : )
118
118
pure val
119
119
(key, Dirty s) -> do
120
120
act <- unliftAIO (refresh db key id s)
121
121
let (force, val) = splitIO (join act)
122
122
liftIO $ Ids. insert databaseValues id (key, Running force val s)
123
- liftIO $ modifyIORef toForce (force: )
123
+ liftIO $ modifyIORef toForce (Spawn force: )
124
124
pure val
125
125
126
126
pure (id , val)
127
127
128
128
toForceList <- liftIO $ readIORef toForce
129
- waitAll <- unliftAIO $ mapConcurrentlyAIO_ sequence_ $ increasingChunks toForceList
129
+ waitAll <- unliftAIO $ mapConcurrentlyAIO_ id toForceList
130
130
case toForceList of
131
131
[] -> return $ Left results
132
132
_ -> return $ Right $ do
@@ -204,7 +204,7 @@ updateReverseDeps
204
204
-> [Id ] -- ^ Previous direct dependencies of Id
205
205
-> IntSet -- ^ Current direct dependencies of Id
206
206
-> IO ()
207
- updateReverseDeps myId db prev new = uninterruptibleMask_ $ withLock (databaseReverseDepsLock db) $ do
207
+ updateReverseDeps myId db prev new = withLock (databaseReverseDepsLock db) $ uninterruptibleMask_ $ do
208
208
forM_ prev $ \ d ->
209
209
unless (d `Set.member` new) $
210
210
doOne (Set. delete myId) d
@@ -263,21 +263,23 @@ cleanupAsync ref = uninterruptibleMask_ $ do
263
263
mapM_ (\ a -> throwTo (asyncThreadId a) AsyncCancelled ) asyncs
264
264
mapM_ waitCatch asyncs
265
265
266
+ data Wait a
267
+ = Wait { justWait :: ! a }
268
+ | Spawn { justWait :: ! a }
269
+ deriving Functor
266
270
267
- mapConcurrentlyAIO_ :: (a -> IO () ) -> [a ] -> AIO ()
271
+ waitOrSpawn :: Wait (IO a ) -> IO (Either (IO a ) (Async a ))
272
+ waitOrSpawn (Wait io) = pure $ Left io
273
+ waitOrSpawn (Spawn io) = Right <$> async io
274
+
275
+ mapConcurrentlyAIO_ :: (a -> IO () ) -> [Wait a ] -> AIO ()
268
276
mapConcurrentlyAIO_ _ [] = pure ()
269
- mapConcurrentlyAIO_ f [one] = liftIO $ f one
277
+ mapConcurrentlyAIO_ f [one] = liftIO $ justWait $ fmap f one
270
278
mapConcurrentlyAIO_ f many = do
271
279
ref <- AIO ask
272
- liftIO $ uninterruptibleMask $ \ restore -> do
273
- asyncs <- liftIO $ traverse async (map (restore . f) many)
280
+ waits <- liftIO $ uninterruptibleMask $ \ restore -> do
281
+ waits <- liftIO $ traverse waitOrSpawn (map (fmap (restore . f)) many)
282
+ let asyncs = rights waits
274
283
liftIO $ atomicModifyIORef'_ ref (asyncs ++ )
275
- traverse_ wait asyncs
276
-
277
- -- >>> increasingChunks [1..20]
278
- -- [[1,2],[3,4,5,6],[7,8,9,10,11,12,13,14],[15,16,17,18,19,20]]
279
- increasingChunks :: [a ] -> [[a ]]
280
- increasingChunks = go 2 where
281
- go :: Int -> [a ] -> [[a ]]
282
- go _ [] = []
283
- go n xx = let (chunk, rest) = splitAt n xx in chunk : go (min 10 (n* 2 )) rest
284
+ return waits
285
+ liftIO $ traverse_ (either id wait) waits
0 commit comments