Tuppence Tour of Haskell Concurrency Constructs

Paul R. Brown @ 2007-10-20

One of the little widgets that I need for an experiment is a sequence number generator, and it's a good way to get a tuppence (i.e., less than half a nickel) tour of Haskell's concurrency constructs with a little lesson on laziness thrown in for spice.


The generator should produce unique Int values on demand and support concurrent access, and I want to try out a couple of methods, one that uses GHC's baseline concurrency capabilities and one that uses STM. (Also, I'd like a better feel for the concepts in practice, so this makes a good exercise!)

Take One: Asynchronous Channels

The base GHC concurrency packages (Control.Concurrent and its descendants) include a great set of building blocks: one-place buffers (MVar), asynchronous channels (Chan) that can be combined into one-to-many broadcast channels, and quantity semaphores (QSem and QSemN).

The design I have in mind uses two asynchronous channels, one for requests and one for responses. All clients of the generator receive responses on the one channel, which means that one might get the number that another one requested, but that's no big deal.

First, one data structure for the requests and one for the client view of the generator:

import Control.Concurrent ( ThreadId, forkIO )
import Control.Concurrent.Chan ( Chan, newChan, writeChan, readChan )

data Request = Get
             | Set { initial_value :: Int }
             | Die

data Generator = Generator { thread_id :: ThreadId,
                             request_channel :: Chan Request, 
                             response_channel :: Chan Int }

Then some functions (whose signatures I'll match with the STM version below) to manipulate the generator:

reset :: Generator -> Int -> IO ()
reset g i = writeChan (request_channel g) (Set i)

get_next :: Generator -> IO Int
get_next g = writeChan (request_channel g) Get >> 
             readChan (response_channel g)

stop :: Generator -> IO ()
stop g = writeChan (request_channel g) Die

Each client function is implemented in terms of sending the request data structure to the generator on the request_channel and then, in the case of the Get operation, blocking to read a value from the response_channel.

Finally, the request-handling event loop in a separate lightweight thread:

new_counter :: IO Generator
new_counter = do { in_chan <- newChan
                 ; out_chan <- newChan
                 ; tid <- forkIO $ loop in_chan out_chan 0
                 ; return $ Generator tid in_chan out_chan }

loop :: Chan Request -> Chan Int -> Int -> IO ()
loop ic oc i = do { req <- readChan ic
                  ; case req of 
                      Die -> return ()
                      (Set n) ->
                          (loop ic oc n)
                      Get ->
                          do { writeChan oc i
                             ; loop ic oc $! (i+1) } }

A similar pattern (request channel, response channel or one-place buffer (MVar) either pre-set or passed with the request, tail-recursive event loop) works for a wide variety of problems and presents a reasonable API for clients.
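As a sketch of the MVar variant mentioned above — a hypothetical alternative (the names here are mine, not part of the API above) where each request carries its own one-place reply buffer, so one client can never receive a number that another client requested:

```haskell
import Control.Concurrent ( forkIO )
import Control.Concurrent.Chan ( Chan, newChan, readChan, writeChan )
import Control.Concurrent.MVar ( MVar, newEmptyMVar, putMVar, takeMVar )

-- Each request carries its own one-place reply buffer.
newtype GetReq = GetReq (MVar Int)

new_mvar_counter :: IO (Chan GetReq)
new_mvar_counter = do
  ch <- newChan
  _  <- forkIO (serve ch 0)
  return ch

-- Tail-recursive event loop: answer each request on its own MVar.
serve :: Chan GetReq -> Int -> IO ()
serve ch i = do
  GetReq reply <- readChan ch
  putMVar reply i
  serve ch $! (i + 1)

-- The client makes a fresh reply buffer, sends it with the request,
-- and blocks until the loop fills it in.
get_next_mvar :: Chan GetReq -> IO Int
get_next_mvar ch = do
  reply <- newEmptyMVar
  writeChan ch (GetReq reply)
  takeMVar reply
```

The trade-off is one MVar allocation per request in exchange for per-client replies instead of a shared response channel.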

The single-threadedness of the loop makes it intuitively easy to conclude that it does the right thing (returns unique values to clients), but it's easy enough to check with some experiments in ghci:

> :m + Control.Concurrent
> g <- new_counter
> get_next g
> forkIO $ sequence_ $ replicate 10000000 (get_next g)
ThreadId 111
> forkIO $ sequence_ $ replicate 10000000 (get_next g)
ThreadId 112
> forkIO $ sequence_ $ replicate 10000000 (get_next g)
ThreadId 113
[... wait a while ...]
> get_next g

That's the right answer: the counter has advanced by one initial call plus three batches of ten million. Another experiment will show how fair the scheduler is with multiple client threads:

> g <- new_counter
> :set -fno-print-bind-result -fglasgow-exts

By the way, [TAB]-completion in ghci means that the above can be obtained by typing:

:set -fno-pr[TAB] -fgl[TAB]

The -fno-print-bind-result flag prevents ghci from spoiling our attempts to be lazy by printing (and thus evaluating) bound results, and -fglasgow-exts lets us attach a type annotation to the binding to specify what kind of channel we're creating. (Normally, the compiler would infer it from context, but that won't work in ghci.)

> tid::(Chan (Int,Int)) <- newChan
> mapM_ (\n -> (forkIO $ sequence_ $ replicate 1000 (get_next g >>= (writeChan tid).((,) n)))) [1..10]

The last expression looks dense, but it breaks down simply. In plain English: fork 10 threads numbered 1 through 10, and on each thread, do the following 1000 times: get a sequence number from the generator and then send the ordered pair of the thread's number and the sequence number on the tid channel. (The (,) function makes an ordered pair out of its arguments.) To inspect the contents of the channel once the threads are done:

> x <- getChanContents tid

(Without the -fno-print-bind-result above, this would run forever.) And now a couple of quick checks:

> :m + Data.List
> let y = take 10000 x
> length $ nub $ map snd y
> [ length $ filter (((==) j).fst) y | j <- [1..10] ]
> let w = [ length $ nub (take 10 (drop k (map fst y))) | k <- [0..9990] ]
> [ length $ filter ((==) j) w | j <- [1..10] ]

So, most of the time, all 10 client threads get a turn within any window of 10 consecutive results.
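The sliding-window check above can be packaged as a small pure function. A sketch (the name window_diversity is mine, not from the session above):

```haskell
import Data.List ( nub, tails )

-- For each full window of 10 consecutive results, count how many
-- distinct thread numbers appear in it. A count of 10 means every
-- client thread got a turn inside that window.
window_diversity :: [Int] -> [Int]
window_diversity ids =
  [ length (nub w) | w <- map (take 10) (tails ids), length w == 10 ]
```

Feeding it `map fst y` from the session reproduces the histogram computed by hand above.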

A Quick Word About Laziness

The $! operator is strict application: it evaluates its right-hand argument (to weak head normal form) before applying the function on its left. Without it, this happens:

> g <- new_counter
> get_next g
> sequence_ $ replicate 100000000 (get_next g)
> get_next g
*** Exception: stack overflow

The reason lies in Haskell's laziness. By the time the second get_next is evaluated, there are 100,000,000 queued-up (i+1) thunks waiting to be evaluated, because no one ever actually consumed the values passed back on the response channel. This is just the way Haskell works: you can tell the runtime about a ridiculous computation, and it won't complain until you ask for the result. The $! ensures that (i+1) is evaluated each time a Get is handled.
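A minimal, self-contained illustration of the difference (the names are mine): const ignores its second argument, so plain application never touches an undefined thunk, while $! forces it first:

```haskell
import Control.Exception ( SomeException, evaluate, try )

-- Plain application: const never looks at its second argument,
-- so the undefined thunk is never forced.
lazy_ok :: Int
lazy_ok = const 1 undefined

-- Strict application: ($!) evaluates the argument to weak head
-- normal form before calling const, so undefined blows up here.
forced :: IO (Either SomeException Int)
forced = try (evaluate (const 1 $! undefined))
```

In ghci, lazy_ok evaluates to 1, while forced comes back as a Left holding the exception from forcing undefined.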

Take Two: TVar

Haskell's STM (software transactional memory; read/watch Simon Peyton Jones on the subject) implementation provides another set of building blocks in the form of atomically mutable cells (TVar), asynchronous channels (TChan), one-place buffers (TMVar), and arrays (TArray).

The sequence generator implementation with the same API but using a TVar to hold the current value is shorter and simpler (no backing thread) than the one above that uses channels:

import Control.Concurrent.STM
import Control.Concurrent.STM.TVar
import Control.Monad

data Generator = Generator { counter :: TVar Int }

new_counter :: Int -> IO Generator
new_counter i = (atomically $ newTVar i) >>= (return . Generator)

get_next :: Generator -> IO Int
get_next g = atomically $ do { n <- readTVar $ counter g
                             ; (writeTVar (counter g)) $! (n+1)
                             ; return n }

reset :: Generator -> Int -> IO ()
reset g n = atomically $ writeTVar (counter g) n

Note the $! in get_next; it's there for the same reason as in the Chan version.
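Because STM actions compose, richer atomic operations fall out almost for free. Here's a hypothetical extension (get_block is my name, not part of the API above) that claims a block of n consecutive numbers in a single transaction; the Generator definition is repeated so the snippet stands alone:

```haskell
import Control.Concurrent.STM

data Generator = Generator { counter :: TVar Int }

new_counter :: Int -> IO Generator
new_counter i = fmap Generator (atomically (newTVar i))

-- Claim n consecutive numbers atomically; no other transaction can
-- take a number inside the returned range (lo, hi).
get_block :: Generator -> Int -> IO (Int, Int)
get_block g n = atomically $ do
  lo <- readTVar (counter g)
  writeTVar (counter g) $! (lo + n)
  return (lo, lo + n - 1)
```

Doing the same with the channel-based design would mean adding a new Request constructor and teaching the event loop about it.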

The same set of basic verifications as above ends with:

> [ length $ filter ((==) j) w | j <- [1..10] ]

In other words, each of the 10 threads took a contiguous block of 1000 numbers from the sequence, because the scheduler had no reason to switch between them. With a slightly different spawning of clients that adds a yield after each request, we get more regular results:

> mapM_ (\n -> (forkIO $ sequence_ $ replicate 1000 (get_next g >>= (writeChan tid).((,) n) >> yield))) [1..10]
[... same steps ...]
> (map fst y) == (concat $ replicate 1000 [1..10])


The STM version is probably the one that I'll use, but I also need some more complicated components where the channel-based design will work well.

