
Synchronization of Processes

In the setting of processes sharing a common zone of memory, the word ``concurrency'' carries its full meaning: the various processes involved compete for access to the unique resource of the memory. To the problem of sharing resources is added that of the lack of control over the interleaving and the execution times of the concurrent processes.

The system which manages the collection of processes can at any moment interrupt a calculation in progress. Thus when two processes cooperate, they must be able to guarantee the integrity of the manipulations of certain shared data. For this, a process should be able to remain owner of these data as long as it has not completed a calculation or any other operation (for example, an acquisition of data from a peripheral). To guarantee the exclusivity of access to the data to a single process, we set up a mechanism called mutual exclusion.

Critical Section and Mutual Exclusion

The mechanisms of mutual exclusion are implemented with the help of particular data structures called mutexes. The operations on mutexes are limited to creating, taking (locking), and releasing (unlocking) them. A mutex is the smallest item of data shared by a collection of concurrent processes. Its manipulation is always exclusive. To the notion of exclusivity of manipulation of a mutex is added that of exclusivity of possession: only the process which has taken a mutex can free it; if other processes wish to use the mutex, then they must wait for it to be released by the process that is holding it.

Mutex Module

Module Mutex is used to create mutexes between processes related by mutual exclusion on an area of memory. We will illustrate their use with two small classic examples of concurrency.

The functions of creation, locking, and unlocking of mutexes are:

# Mutex.create ;;
- : unit -> Mutex.t = <fun>
# Mutex.lock ;;
- : Mutex.t -> unit = <fun>
# Mutex.unlock ;;
- : Mutex.t -> unit = <fun>
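
As a minimal sketch of these three functions working together, the following program lets four threads increment a shared counter. The names counter, counter_lock and add_many are chosen for this illustration only, and the program assumes it is built with the threads library.

```ocaml
(* Four threads each add 1000 to a shared counter. The mutex makes every
   read-modify-write step exclusive, so no increment is lost. *)
let counter = ref 0
let counter_lock = Mutex.create ()

let add_many n =
  for _i = 1 to n do
    Mutex.lock counter_lock;      (* enter the critical section *)
    counter := !counter + 1;
    Mutex.unlock counter_lock     (* leave the critical section *)
  done

let () =
  let workers = List.init 4 (fun _ -> Thread.create add_many 1000) in
  List.iter Thread.join workers;  (* wait for all four threads *)
  Printf.printf "counter = %d\n" !counter
```

The critical section is kept as short as possible: only the update of the counter is placed between lock and unlock.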


There exists a variant of mutex locking that is non-blocking:

# Mutex.try_lock;;
- : Mutex.t -> bool = <fun>
If the mutex is already locked, the function returns false. Otherwise, the function locks the mutex and returns true.
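
The non-blocking variant can be sketched as follows. The helper try_with_lock and the mutex name chopstick are hypothetical names introduced for this example; the helper runs an action only if the mutex happens to be free and reports whether it did so.

```ocaml
let chopstick = Mutex.create ()

(* Hypothetical helper: run [f] only if [m] is free right now.
   Returns true if [f] was run, false if the mutex was busy. *)
let try_with_lock m f =
  if Mutex.try_lock m then begin
    f ();
    Mutex.unlock m;
    true
  end else
    false

(* First attempt is made while the mutex is held, second once it is free. *)
let results =
  Mutex.lock chopstick;
  let while_held = try_with_lock chopstick ignore in
  Mutex.unlock chopstick;
  let when_free = try_with_lock chopstick ignore in
  (while_held, when_free)

let () =
  Printf.printf "while held: %b, when free: %b\n" (fst results) (snd results)
```

A caller that receives false is free to do something else and retry later, instead of being suspended as it would be with Mutex.lock.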

The Dining Philosophers

This little story, due to Dijkstra, illustrates a pure problem of resource allocation. It goes as follows:

``Five oriental philosophers divide their time between study and coming to the refectory to eat a bowl of rice. The room devoted to feeding the philosophers contains nothing but a single round table on which there is a large dish of rice (always full), five bowls, and five chopsticks.''




Figure 19.1: The Table of the Dining Philosophers


As we can see in Figure 19.1, a philosopher who takes the two chopsticks beside his bowl stops his neighbours from doing the same. When he puts down one of his chopsticks, his famished neighbour can grab it. If need be, the latter must wait until the other chopstick is available. Here the chopsticks are the resources to be allocated.

To simplify things, we suppose that each philosopher habitually comes to the same place at the table. We model the five chopsticks as five mutexes stored in a vector b.

# let b =
let b0 = Array.make 5 (Mutex.create()) in
for i=1 to 4 do b0.(i) <- Mutex.create() done;
b0 ;;
val b : Mutex.t array = [|<abstr>; <abstr>; <abstr>; <abstr>; <abstr>|]


Eating and meditation are simulated by a suspension of processes.

# let meditation = Thread.delay
and eating = Thread.delay ;;
val meditation : float -> unit = <fun>
val eating : float -> unit = <fun>


We model a philosopher by a function which executes an infinite sequence of actions from Dijkstra's story. Taking a chopstick is simulated by the acquisition of a mutex, so only one philosopher can hold a given chopstick at a time. We introduce a little time of reflection between taking and dropping each of the two chopsticks, while a number of output commands track the activity of the philosopher.


# let philosopher i =
let ii = (i+1) mod 5
in while true do
meditation 3. ;
Mutex.lock b.(i);
Printf.printf "Philosopher (%d) takes his left-hand chopstick" i ;
Printf.printf " and meditates a little while more\n";
meditation 0.2;
Mutex.lock b.(ii);
Printf.printf "Philosopher (%d) takes his right-hand chopstick\n" i;
eating 0.5;
Mutex.unlock b.(i);
Printf.printf "Philosopher (%d) puts down his left-hand chopstick" i;
Printf.printf " and goes back to meditating\n";
meditation 0.15;
Mutex.unlock b.(ii);
Printf.printf "Philosopher (%d) puts down his right-hand chopstick\n" i
done ;;
val philosopher : int -> unit = <fun>


We can test this little program by executing:

for i=0 to 4 do ignore (Thread.create philosopher i) done ;
while true do Thread.delay 5. done ;;


In the infinite while loop, we suspend the main process in order to give the philosopher processes more chances to run. The delays in the activity loop are chosen arbitrarily, with the aim of creating some disparity in the parallel execution of the processes.

Problems of the naïve solution.
A terrible thing can happen to our philosophers: they all arrive at the same time and seize the chopstick on their left. In this case we are in a situation of deadlock: each philosopher waits for a chopstick that his neighbour will never release. None of the philosophers can eat, and all of them starve.

To avoid this, the philosophers can put down a chopstick if they do not manage to take the second one. This is highly courteous, but still allows two philosophers to gang up against a third to stop him from eating, by not letting go of their chopsticks, except the ones that their other neighbour has given them. There exist numerous solutions to this problem. One of them is the object of the exercise on page ??.
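
One of the numerous solutions can be sketched as follows; it is a well-known technique, not necessarily the one asked for in the exercise. Every philosopher takes the lower-numbered of his two chopsticks first, so a cycle of philosophers each waiting for his neighbour cannot form. The names seats, chopsticks, meals and philosopher_ordered are assumptions of this sketch, and the loop is bounded so that the program terminates.

```ocaml
let seats = 5
let chopsticks = Array.init seats (fun _ -> Mutex.create ())
let meals = Array.make seats 0   (* meals.(i) is written only by philosopher i *)

let philosopher_ordered rounds i =
  let left = i and right = (i + 1) mod seats in
  (* Always lock the lower-numbered chopstick first: a global order on the
     resources rules out a circular wait. *)
  let first = min left right and second = max left right in
  for _round = 1 to rounds do
    Mutex.lock chopsticks.(first);
    Thread.delay 0.001;                 (* a short meditation *)
    Mutex.lock chopsticks.(second);
    meals.(i) <- meals.(i) + 1;         (* eating *)
    Mutex.unlock chopsticks.(second);
    Mutex.unlock chopsticks.(first)
  done

let () =
  let threads =
    List.init seats (fun i -> Thread.create (philosopher_ordered 3) i) in
  List.iter Thread.join threads;
  Array.iteri
    (fun i n -> Printf.printf "Philosopher (%d) ate %d times\n" i n)
    meals
```

With this ordering only the last philosopher reaches for his right-hand chopstick first; that single asymmetry is enough to break the deadlock. It does not by itself make the sharing fair.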

Producers and Consumers I

The producers-consumers pair is a classic example of concurrent programming. A group of processes, designated the producers, is in charge of storing data in a queue; a second group, the consumers, is in charge of removing it. Each intervening party excludes the others.

We implement this scheme using a queue shared between the producers and the consumers. To guarantee the proper operation of the system, the queue is manipulated in mutual exclusion in order to guarantee the integrity of the operations of addition and removal.

f is the shared queue, and m is the mutex.

# let f = Queue.create () and m = Mutex.create () ;;
val f : '_a Queue.t = <abstr>
val m : Mutex.t = <abstr>


We divide the activity of a producer into two parts: creating a product (function produce) and storing a product (function store). Only the operation of storage needs the mutex.

# let produce i p d =
incr p ;
Thread.delay d ;
Printf.printf "Producer (%d) has produced %d\n" i !p ;
flush stdout ;;
val produce : int -> int ref -> float -> unit = <fun>

# let store i p =
Mutex.lock m ;
Queue.add (i,!p) f ;
Printf.printf "Producer (%d) has added its %dth product\n" i !p ;
flush stdout ;
Mutex.unlock m ;;
val store : int -> int ref -> unit = <fun>


The code of the producer is an endless loop of creation and storage. We introduce a random delay at the end of each iteration in order to desynchronize the execution.

# let producer i =
let p = ref 0 and d = Random.float 2.
in while true do
produce i p d ;
store i p ;
Thread.delay (Random.float 2.5)
done ;;
val producer : int -> unit = <fun>


The only operation of the consumer is the retrieval of an element of the queue, taking care that the product is actually there.

# let consumer i =
while true do
Mutex.lock m ;
( try
let ip, p = Queue.take f
in Printf.printf "The consumer(%d) " i ;
Printf.printf "has taken product (%d,%d)\n" ip p ;
flush stdout ;
with
Queue.Empty ->
Printf.printf "The consumer(%d) " i ;
print_string "has returned empty-handed\n" ) ;
Mutex.unlock m ;
Thread.delay (Random.float 2.5)
done ;;
val consumer : int -> unit = <fun>
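
The consumer above releases the mutex on its normal path and when Queue.Empty is caught, but any other exception raised inside the critical section would leave m locked. A minimal sketch of a safer pattern, assuming OCaml 4.08 or later for Fun.protect, is given below. The helper with_lock and the names products, products_lock, store_locked and take_locked are hypothetical and belong to this example only.

```ocaml
(* Hypothetical helper, not part of the Mutex module: run [f] with [m]
   locked and unlock [m] on every exit path, including exceptions. *)
let with_lock m f =
  Mutex.lock m;
  Fun.protect ~finally:(fun () -> Mutex.unlock m) f

let products : (int * int) Queue.t = Queue.create ()
let products_lock = Mutex.create ()

let store_locked item =
  with_lock products_lock (fun () -> Queue.add item products)

let take_locked () =
  with_lock products_lock (fun () ->
    try Some (Queue.take products) with Queue.Empty -> None)

let () =
  store_locked (0, 1);
  (match take_locked () with
   | Some (ip, p) -> Printf.printf "took product (%d,%d)\n" ip p
   | None -> print_string "returned empty-handed\n");
  (* Queue.take raises Queue.Empty here; the mutex is released anyway. *)
  (try with_lock products_lock (fun () -> ignore (Queue.take products))
   with Queue.Empty -> ());
  flush stdout
```

Returning an option from take_locked keeps the test for an empty queue inside the critical section while leaving the printing outside it.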


The following test program creates four producers and four consumers.

for i = 0 to 3 do
ignore (Thread.create producer i);
ignore (Thread.create consumer i)
done ;
while true do Thread.delay 5. done ;;


Waiting and Synchronization

The relation of mutual exclusion is not ``fine'' enough to describe synchronization between processes. It is not rare that the work of a process depends on the completion of an action by another process, thus modifying a certain condition. It is therefore desirable that the processes should be able to communicate the fact that this condition might have changed, indicating to the waiting processes to test it again. The different processes are thus in a relation of mutual exclusion with communication.

In the preceding example, a consumer, rather than returning empty-handed, could wait until a producer came to resupply the stock. The producer could then signal to the waiting consumer that there is something to take. The model of waiting on a condition to take a mutex is known as a semaphore.

Semaphores.
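A semaphore is an integer variable s which can only take non-negative values. Once s is initialised, the only operations allowed are wait(s) and signal(s), written P(s) and V(s), respectively. They are defined as follows, s corresponding to the number of available resources of a given type: wait(s) decrements s if s > 0, and otherwise suspends the calling process; signal(s) wakes up one process suspended on s if there is one, and otherwise increments s. A semaphore which only takes the values 0 or 1 is called a binary semaphore.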

Condition Module

The functions of the module Condition implement the primitives of putting to sleep and waking up processes on a signal. A signal, in this case, is a variable shared by a collection of processes. Its type is abstract and the manipulation functions are:
create : unit -> Condition.t, which creates a new signal.

signal : Condition.t -> unit, which wakes up one of the processes waiting on a signal.

broadcast : Condition.t -> unit, which wakes up all of the processes waiting on a signal.

wait : Condition.t -> Mutex.t -> unit, which suspends the calling process on the signal passed as the first argument. The second argument is a mutex used to protect the manipulation of the signal; the caller must hold it. It is released while the process sleeps, and locked again before the function returns.
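
The semaphore described earlier can be sketched with these primitives. This is one possible construction under stated assumptions, not the only one: the type semaphore and the functions make_semaphore, sem_wait and sem_signal are names invented for this example, and sem_signal always increments the counter and then wakes one sleeper, which has the same observable effect as the textbook definition.

```ocaml
type semaphore = {
  mutable count : int;      (* number of available resources, never negative *)
  lock : Mutex.t;           (* protects count *)
  nonzero : Condition.t;    (* signalled when count is incremented *)
}

let make_semaphore n =
  if n < 0 then invalid_arg "make_semaphore";
  { count = n; lock = Mutex.create (); nonzero = Condition.create () }

(* wait(s), also written P(s): sleep until a resource is available. *)
let sem_wait s =
  Mutex.lock s.lock;
  while s.count = 0 do Condition.wait s.nonzero s.lock done;
  s.count <- s.count - 1;
  Mutex.unlock s.lock

(* signal(s), also written V(s): give a resource back, wake one sleeper. *)
let sem_signal s =
  Mutex.lock s.lock;
  s.count <- s.count + 1;
  Condition.signal s.nonzero;
  Mutex.unlock s.lock

(* Demonstration: five workers share two resources. *)
let s = make_semaphore 2
let stats_lock = Mutex.create ()
let inside = ref 0 and max_inside = ref 0

let worker () =
  sem_wait s;
  Mutex.lock stats_lock;
  incr inside;
  if !inside > !max_inside then max_inside := !inside;
  Mutex.unlock stats_lock;
  Thread.delay 0.01;                       (* use the resource *)
  Mutex.lock stats_lock; decr inside; Mutex.unlock stats_lock;
  sem_signal s

let () =
  let ts = List.init 5 (fun _ -> Thread.create worker ()) in
  List.iter Thread.join ts;
  Printf.printf "at most %d workers held a resource at once\n" !max_inside
```

The while loop around Condition.wait re-tests the counter after every wake-up, since another process may have taken the resource in the meantime.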

Producers and Consumers II

We revisit the example of producers and consumers by using the mechanism of condition variables to put to sleep a consumer arriving when the storehouse is empty.

To implement synchronization between waiting consumers and production, we declare:


# let c = Condition.create () ;;
val c : Condition.t = <abstr>


We modify the storage function of the producer by adding to it the sending of a signal:

# let store2 i p =
Mutex.lock m ;
Queue.add (i,!p) f ;
Printf.printf "Producer (%d) has added its %dth product\n" i !p ;
flush stdout ;
Condition.signal c ;
Mutex.unlock m ;;
val store2 : int -> int ref -> unit = <fun>
# let producer2 i =
let p = ref 0 in
let d = Random.float 2.
in while true do
produce i p d;
store2 i p;
Thread.delay (Random.float 2.5)
done ;;
val producer2 : int -> unit = <fun>


The activity of the consumer takes place in two phases: waiting until a product is available, then taking the product. The mutex is taken when the wait is finished and it is released when the consumer has taken its product. The wait takes place on the variable c.

# let wait2 i =
Mutex.lock m ;
while Queue.length f = 0 do
Printf.printf "Consumer (%d) is waiting\n" i ;
Condition.wait c m
done ;;
val wait2 : int -> unit = <fun>
# let take2 i =
let ip, p = Queue.take f in
Printf.printf "Consumer (%d) " i ;
Printf.printf "takes product (%d, %d)\n" ip p ;
flush stdout ;
Mutex.unlock m ;;
val take2 : int -> unit = <fun>
# let consumer2 i =
while true do
wait2 i;
take2 i;
Thread.delay (Random.float 2.5)
done ;;
val consumer2 : int -> unit = <fun>

Note that once wait2 has returned, take2 no longer needs to check that a product exists. Condition.wait returns only after locking the mutex m again, and the loop in wait2 re-tests the queue while holding it, so no other consumer can steal the new product before take2 removes it.
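
The example above never uses broadcast. A minimal sketch of where it helps, under assumptions of our own, is a bounded variant in which the producers stop and the consumers must be told to go home. The flag closed and the names q, lock, changed, consumed, producer_n and consumer_until_closed are introduced for this illustration only.

```ocaml
let q : int Queue.t = Queue.create ()
let lock = Mutex.create ()
let changed = Condition.create ()
let closed = ref false     (* protected by lock *)
let consumed = ref 0       (* protected by lock *)

let producer_n n =
  for k = 1 to n do
    Mutex.lock lock;
    Queue.add k q;
    Condition.signal changed;       (* one product: wake one consumer *)
    Mutex.unlock lock
  done

let consumer_until_closed () =
  let running = ref true in
  while !running do
    Mutex.lock lock;
    (* Sleep while there is nothing to take and production may continue. *)
    while Queue.is_empty q && not !closed do
      Condition.wait changed lock
    done;
    if Queue.is_empty q then running := false   (* closed and drained *)
    else begin ignore (Queue.take q); incr consumed end;
    Mutex.unlock lock
  done

let () =
  let consumers =
    List.init 3 (fun _ -> Thread.create consumer_until_closed ()) in
  let producers = List.init 2 (fun _ -> Thread.create producer_n 10) in
  List.iter Thread.join producers;
  Mutex.lock lock;
  closed := true;
  Condition.broadcast changed;      (* wake every sleeping consumer *)
  Mutex.unlock lock;
  List.iter Thread.join consumers;
  Printf.printf "consumed %d products\n" !consumed
```

A single signal at shutdown would wake only one consumer and leave the others asleep forever, which is why broadcast is used for a change that concerns all of them.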

Readers and Writers

Here is another classic example of concurrent processes in which the agents do not have the same behaviour with respect to the shared data.

A writer and some readers operate on some shared data. The action of the writer may cause the data to be momentarily inconsistent, while the readers only have a passive action. The difficulty arises from the fact that we do not wish to prohibit multiple readers from examining the data simultaneously. One solution to this problem is to keep a counter of the number of readers in the process of accessing the data. Writing is not allowed unless the number of readers is 0.

The data is symbolized by the integer data which takes the value 0 or 1. The value 0 indicates that the data is ready for reading:


# let data = ref 0 ;;
val data : int ref = {contents=0}


Operations on the counter n are protected by the mutex m:

# let n = ref 0 ;;
val n : int ref = {contents=0}
# let m = Mutex.create () ;;
val m : Mutex.t = <abstr>
# let cpt_incr () = Mutex.lock m ; incr n ; Mutex.unlock m ;;
val cpt_incr : unit -> unit = <fun>
# let cpt_decr () = Mutex.lock m ; decr n ; Mutex.unlock m ;;
val cpt_decr : unit -> unit = <fun>
# let c = Condition.create () ;;
val c : Condition.t = <abstr>
# let cpt_signal () = Mutex.lock m ;
if !n=0 then Condition.signal c ;
Mutex.unlock m ;;
val cpt_signal : unit -> unit = <fun>


The readers update the counter and emit the signal c when no more readers are present. This is how they indicate to the writer that it may come into action. The condition c is created before cpt_signal so that the readers signal the same condition variable as the one on which the writer waits.

# let read i =
cpt_incr () ;
Printf.printf "Reader (%d) read (data=%d)\n" i !data ;
Thread.delay (Random.float 1.5) ;
Printf.printf "Reader (%d) has finished reading\n" i ;
cpt_decr () ;
cpt_signal () ;;
val read : int -> unit = <fun>

# let reader i = while true do read i ; Thread.delay (Random.float 1.5) done ;;
val reader : int -> unit = <fun>


The writer needs to block the counter to prevent the readers from accessing the shared data. But it can only do so if the counter is 0, otherwise it waits for the signal indicating that this is the case.

# let write () =
Mutex.lock m ;
while !n<>0 do Condition.wait c m done ;
print_string "The writer is writing\n" ; flush stdout ;
data := 1 ; Thread.delay (Random.float 1.) ; data := 0 ;
Mutex.unlock m ;;
val write : unit -> unit = <fun>

# let writer () =
while true do write () ; Thread.delay (Random.float 1.5) done ;;
val writer : unit -> unit = <fun>


We create a writer and six readers to test these functions.

ignore (Thread.create writer ());
for i=0 to 5 do ignore(Thread.create reader i) done;
while true do Thread.delay 5. done ;;


This solution guarantees that the writer and the readers cannot have access to the data at the same time. On the other hand, nothing guarantees that the writer will ever ``fulfill his office'': as long as readers keep overlapping, the counter never returns to 0, and we are again confronted with a case of starvation.
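
One common way to reduce this risk, sketched here under our own assumptions and not taken from the program above, is to give priority to the writer: new readers stay out as soon as a writer is waiting. The record rw_lock and the functions make_rw, read_lock, read_unlock, write_lock and write_unlock are hypothetical names for this illustration, and the loops are bounded so that the program terminates.

```ocaml
type rw_lock = {
  m : Mutex.t;
  changed : Condition.t;
  mutable readers : int;           (* readers currently reading *)
  mutable writing : bool;          (* true while the writer is at work *)
  mutable waiting_writers : int;   (* writers asleep in write_lock *)
}

let make_rw () =
  { m = Mutex.create (); changed = Condition.create ();
    readers = 0; writing = false; waiting_writers = 0 }

let read_lock l =
  Mutex.lock l.m;
  (* A reader stays out while a writer is working or waiting. *)
  while l.writing || l.waiting_writers > 0 do Condition.wait l.changed l.m done;
  l.readers <- l.readers + 1;
  Mutex.unlock l.m

let read_unlock l =
  Mutex.lock l.m;
  l.readers <- l.readers - 1;
  if l.readers = 0 then Condition.broadcast l.changed;
  Mutex.unlock l.m

let write_lock l =
  Mutex.lock l.m;
  l.waiting_writers <- l.waiting_writers + 1;
  while l.writing || l.readers > 0 do Condition.wait l.changed l.m done;
  l.waiting_writers <- l.waiting_writers - 1;
  l.writing <- true;
  Mutex.unlock l.m

let write_unlock l =
  Mutex.lock l.m;
  l.writing <- false;
  Condition.broadcast l.changed;
  Mutex.unlock l.m

let l = make_rw ()
let data = ref 0
let bad = Array.make 6 0      (* bad.(i): inconsistent reads seen by reader i *)
let writes_done = ref 0       (* written only by the writer *)

let reader_n rounds i =
  for _r = 1 to rounds do
    read_lock l;
    if !data <> 0 then bad.(i) <- bad.(i) + 1;
    Thread.delay 0.001;
    read_unlock l
  done

let writer_n rounds =
  for _r = 1 to rounds do
    write_lock l;
    data := 1; Thread.delay 0.001; data := 0;
    incr writes_done;
    write_unlock l
  done

let () =
  let w = Thread.create writer_n 5 in
  let rs = List.init 6 (fun i -> Thread.create (reader_n 20) i) in
  List.iter Thread.join (w :: rs);
  Printf.printf "writes completed: %d\n" !writes_done
```

The price of this choice is that the starvation moves to the other side: a steady stream of writers could now keep the readers waiting.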

