Example: Post Office

We present, to end this chapter, a slightly more complete example of a concurrent program: modelling a common queue at a number of counters at a post office.

As always in concurrent programming the problems are posed metaphorically, but replace the counters of the post office by a collection of printers and you have the solution to a genuine problem in computing.

Here the policy of service that we propose; it is well tried and tested, rather than original: each client takes a number when he arrives; when a clerk has finished serving a client, he calls for a number. When his number is called, the client goes to the corresponding counter.

Organization of development.
We distinguish in our development resources, and agents. The former are: the number dispenser, the number announcer, and the windows. The latter are: the clerks and the clients. The resources are modeled by objects which manage their own mutual exclusion mechanisms. The agents are modelled by functions executed by a thread. When an agent wishes to modify or examine the state of an object, it does not itself have to know about or manipulate mutexes, which allows a simplified organization for access to sensitive data, and avoids oversights in the coding of the agents.

The Components

The Dispenser.
The number dispenser contains two fields: a counter and a mutex. The only method provided by the distributor is the taking of a new number.

# class dispenser () =
val mutable n = 0
val m = Mutex.create()
method take () = let r = Mutex.lock m ; n <- n+1 ; n
in Mutex.unlock m ; r
end ;;
class dispenser :
unit ->
object val m : Mutex.t val mutable n : int method take : unit -> int end
The mutex prevents two clients from taking a number at the same time. Note the way in which we use an intermediate variable (r) to guarantee that the number calculated in the critical section is the same as the one return by the method call.

The Announcer.
The announcer contains three fields: an integer (the client number being called); a mutex and a condition variable. The two methods are: (wait) which reads the number, and (call), which modifies it.

# class announcer () =
val mutable nclient = 0
val m = Mutex.create()
val c = Condition.create()

method wait n =
Mutex.lock m;
while n > nclient do Condition.wait c m done;
Mutex.unlock m;

method call () =
let r = Mutex.lock m ;
nclient <- nclient+1 ;
in Condition.broadcast c ;
Mutex.unlock m ;
end ;;
The condition variable is used to put the clients to sleep, waiting for their number. They are all woken up when the method call is invoked. Reading or writing access to the called number is protected by the mutex.

The window.
The window consists of five fields: a fixed window number (variable ncounter); the number of the client being waited for (variable nclient); a boolean (variable available); a mutex, and a condition variable.

It offers eight methods, of which two are private: two simple access methods (methods get_ncounter and get_nclient): a group of three methods simulating the waiting period of the clerk between two clients (private method wait and public methods await_arrival, await_departure); a group of three methods simulate the occupation of the window (private method set_available and methods arrive, depart).

# class counter (i:int) =
val ncounter = i
val mutable nclient = 0
val mutable available = true
val m = Mutex.create()
val c = Condition.create()

method get_ncounter = ncounter
method get_nclient = nclient

method private wait f =
Mutex.lock m ;
while f () do Condition.wait c m done ;
Mutex.unlock m

method wait_arrival n = nclient <- n ; self#wait (fun () -> available)
method wait_departure () = self#wait (fun () -> not available)

method private set_available b =
Mutex.lock m ;
available <- b ;
Condition.signal c ;
Mutex.unlock m
method arrive () = self#set_available false
method leave () = self#set_available true

end ;;

A post office.
We collect these three resources in a record type:

# type office = { d : dispenser ; a : announcer ; cs : counter array } ;;

Clients and Clerks

The behaviour of the system as a whole will depend on the three following parameters:

# let service_delay = 1.7 ;;
# let arrival_delay = 1.7 ;;
# let counter_delay = 0.5 ;;

Each represents the maximum value of the range from which each effective value will be randomly chosen. The first parameter models the time taken to serve a client; the second, the delay between the arrival of clients in the post office; the last, the time it takes a clerk to call a new client after the last one has left.

The Clerk.
The work of a clerk consists of looping indefinitely over the following sequence:
  1. Call for a number.
  2. Wait for the arrival of a client holding the called number.
  3. Wait for the departure of the client occupying his counter.
Adding some output, we get the function:

# let clerk ((a:announcer), (c:counter)) =
while true do
let n = a#call ()
in Printf.printf "Counter %d calls %d\n" c#get_ncounter n ;
c#wait_arrival n ;
c#wait_departure () ;
Thread.delay (Random.float counter_delay)
done ;;
val clerk : announcer * counter -> unit = <fun>

The Client.
A client executes the following sequence:
  1. Take a waiting number.
  2. Wait until his number is called.
  3. Go to the window having called for the number to obtain service.
The only slightly complex activity of the client is to find the counter where they are expected.

We give, for this, the auxiliary function:

# let find_counter n cs =
let i = ref 0 in while cs.(!i)#get_ncounter <> n do incr i done ; !i ;;
val find_counter : 'a -> < get_ncounter : 'a; .. > array -> int = <fun>

Adding some output, the principal function of the client is:

# let client o =
let n = o.d#take()
in Printf.printf "Arrival of client %d\n" n ; flush stdout ;
o.a#wait n ;
let ic = find_counter n o.cs
in o.cs.(ic)#arrive () ;
Printf.printf "Client %d occupies window %d\n" n ic ;
flush stdout ;
Thread.delay (Random.float service_delay) ;
o.cs.(ic)#leave () ;
Printf.printf "Client %d leaves\n" n ; flush stdout ;;
val client : office -> unit = <fun>

The System

The main programme of the application creates a post office and its clerks (each clerk is a process) then launches a process which creates an infinite stream of clients (each client is also a process).

# let main () =
let o =
{ d = new dispenser();
a = new announcer();
cs = (let cs0 = Array.create 5 (new counter 0) in
for i=0 to 4 do cs0.(i) <- new counter i done;
in for i=0 to 4 do ignore (Thread.create clerk (o.a, o.cs.(i))) done ;
let create_clients o = while true do
ignore (Thread.create client o) ;
Thread.delay (Random.float arrival_delay)
in ignore (Thread.create create_clients o) ;
Thread.sleep () ;;
val main : unit -> unit = <fun>
The last instruction puts the process associated with the program to sleep in order to pass control immediately to the other active processes of the application.

