import Control.Concurrent
import Control.Concurrent.Chan
import Control.Concurrent.STM
import Control.Concurrent.STM.TVar
import Control.Exception (finally)
import Control.Monad (replicateM, when)
import Data.Array
import System.Time

{- Sample usage:

> q <- atomically $ emptySTM 3 (undefined :: Int) -- three-element Int queue
> pushSTM q 5
> pushSTM q 6
> pushSTM q 7
> pushSTM q 8 -- hangs because full; hit C-c
> peekSTM q
Just 5
> takeSTM q
5
> takeSTM q
6
> takeSTM q
7
> peekSTM q
Nothing

or

> :m +Control.Concurrent
> do { forkIO $ pushSTM q 1; pushSTM q 2 }
> sequence [takeSTM q, takeSTM q]
[2,1] -- in my tests; could be [1,2]

or

> peekSTM q
Nothing
> do { pollTimeoutSTM q onesecond >>= print; pushSTM q 6 }
Nothing -- printed after hanging for a second
> takeSTM q
6
> peekSTM q
Nothing
> do { forkIO $ (pollTimeoutSTM q onesecond >>= print); pushSTM q 6 }
Just 6
> peekSTM q
Nothing
-}

-- | A fixed-capacity FIFO queue stored as a ring buffer of 'TVar's.
data ArrayBlockingQueueSTM e = ArrayBlockingQueueSTM
  { shead :: TVar Int            -- ^ index of the oldest element
  , stail :: TVar Int            -- ^ index of the next slot to write
  , sused :: TVar Int            -- ^ number of elements currently stored
  , slen  :: Int                 -- ^ capacity (number of slots)
  , sa    :: Array Int (TVar e)  -- ^ the slots, indexed from 0 to @slen - 1@
  }

-- | Remove and return the oldest element, blocking while the queue is empty.
takeSTM :: ArrayBlockingQueueSTM e -> IO e
takeSTM abq =
  atomically $
    -- A blocking read never yields Nothing; mapping it to 'retry' keeps the
    -- function total instead of relying on an incomplete pattern match.
    readHeadElementSTM abq True True >>= maybe retry return

-- | Return the oldest element without removing it, or 'Nothing' if the queue
-- is empty. Never blocks.
peekSTM :: ArrayBlockingQueueSTM e -> IO (Maybe e)
peekSTM abq = atomically (readHeadElementSTM abq False False)

-- | Create an empty queue with @len@ slots. Every slot is initialised with
-- @proto@; that value is never returned by the read operations because the
-- used-count starts at zero.
emptySTM :: Int -> e -> STM (ArrayBlockingQueueSTM e)
emptySTM len proto = do
  headVar <- newTVar 0
  tailVar <- newTVar 0
  usedVar <- newTVar 0
  slots <- replicateM len (newTVar proto)
  return
    ArrayBlockingQueueSTM
      { shead = headVar
      , stail = tailVar
      , sused = usedVar
      , slen = len
      , sa = listArray (0, len - 1) slots
      }

-- | Append an element at the tail, blocking while the queue is full.
pushSTM :: ArrayBlockingQueueSTM e -> e -> IO ()
pushSTM abq e = atomically $ do
  u <- readTVar (sused abq)
  -- '>=' rather than '==' so that a queue created with a zero or negative
  -- capacity blocks instead of dividing by zero or indexing out of bounds.
  if u >= len
    then retry {- TODO allow nonblocking -}
    else do
      t <- readTVar (stail abq)
      writeTVar (sa abq ! t) e
      -- Force the new index and count so no thunk chain builds up in the TVars.
      writeTVar (stail abq) $! (t + 1) `mod` len
      writeTVar (sused abq) $! u + 1
  where
    len = slen abq

-- | Read the element at the head of the queue.
--
-- The first flag (@remove@) selects whether the element is also dequeued;
-- the second (@block@) selects whether an empty queue causes the transaction
-- to 'retry' (@True@) or to return 'Nothing' (@False@).
readHeadElementSTM :: ArrayBlockingQueueSTM e -> Bool -> Bool -> STM (Maybe e)
readHeadElementSTM abq remove block = do
  u <- readTVar (sused abq)
  if u == 0
    then if block then retry else return Nothing
    else do
      h <- readTVar (shead abq)
      e <- readTVar (sa abq ! h)
      when remove $ do
        writeTVar (shead abq) $! (h + 1) `mod` slen abq
        writeTVar (sused abq) $! u - 1
      return (Just e)

-- | Fork a thread that writes @()@ to the returned channel once @timeout@
-- has elapsed.
startTimer :: TimeDiff -> IO (TChan ())
startTimer timeout = do
  c <- newTChanIO
  _ <- forkIO (timer c timeout)
  return c

-- | Sleep for @timeout@, then signal on the channel.
timer :: TChan () -> TimeDiff -> IO ()
timer c timeout = do
  threadDelay (timeDiffMicros timeout)
  atomically (writeTChan c ())

-- | Total length of a 'TimeDiff' in microseconds, suitable for 'threadDelay'.
--
-- Every field contributes, using the same 30-day month and 365-day year
-- convention as 'normalizeTimeDiff'. Reading only 'tdSec' of a normalized
-- value would drop whole minutes, hours and days. The result is clamped to
-- the range @[0, maxBound]@ of 'Int'.
timeDiffMicros :: TimeDiff -> Int
timeDiffMicros td = fromInteger (max 0 (min (toInteger (maxBound :: Int)) micros))
  where
    days =
      toInteger (tdDay td)
        + 30 * toInteger (tdMonth td)
        + 365 * toInteger (tdYear td)
    secs =
      toInteger (tdSec td)
        + 60 * (toInteger (tdMin td) + 60 * (toInteger (tdHour td) + 24 * days))
    micros = secs * 1000000 + tdPicosec td `div` 1000000

-- | Remove and return the oldest element, waiting at most @timeout@ for one
-- to arrive. Returns 'Nothing' if the timeout expires first.
pollTimeoutSTM :: ArrayBlockingQueueSTM e -> TimeDiff -> IO (Maybe e)
pollTimeoutSTM abq timeout = do
  expired <- newTChanIO
  tid <- forkIO (timer expired timeout)
  -- The queue is tried first so that an element which is already available
  -- wins over a timer that has also fired. The timer thread is killed
  -- afterwards so it does not linger when an element arrives early
  -- (killing a finished thread is harmless).
  atomically
    ( readHeadElementSTM abq True True
        `orElse` (readTChan expired >> return Nothing)
    )
    `finally` killThread tid

-- | A 'TimeDiff' of exactly one second.
onesecond :: TimeDiff
onesecond = diffClockTimes (TOD 1 0) (TOD 0 0)