
Concurrent Processes

With an application composed of many concurrent processes, we lose the convenience offered by the determinism of sequential programs. For processes sharing the same zone of memory, the result of the following program cannot be deduced from reading it.

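main program:       let x = ref 1;;

process P           process Q
x := !x + 1;;       x := !x * 2;;

At the end of the execution of P and Q, the reference x can contain 2, 3 or 4, depending on the order in which the processes execute: 4 if P runs entirely before Q, 3 if Q runs entirely before P, and 2 if both processes read x before either writes its result.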
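
The possible outcomes can be checked by enumerating the interleavings by hand. The sketch below is only a sequential model, not a concurrent program: it assumes that each process performs exactly two atomic steps (read x, then write the computed value back), and the names step, run and interleave are introduced here for illustration.

```ocaml
(* Each process is modelled as two atomic steps: read x into a private
   register, then write the computed value back. The int is a process
   id: 0 stands for P, 1 stands for Q. *)
type step = Read of int | Write of int

(* Operation each process applies to the value it has read. *)
let apply pid v = if pid = 0 then v + 1 else v * 2

(* Run one schedule (a list of steps) from x = 1; return the final x. *)
let run schedule =
  let x = ref 1 in
  let reg = Array.make 2 0 in
  List.iter
    (function
      | Read pid -> reg.(pid) <- !x
      | Write pid -> x := apply pid reg.(pid))
    schedule;
  !x

(* All interleavings of two lists that keep the order inside each list. *)
let rec interleave a b =
  match a, b with
  | [], l | l, [] -> [l]
  | s :: ss, t :: ts ->
      List.map (fun l -> s :: l) (interleave ss b)
      @ List.map (fun l -> t :: l) (interleave a ts)

(* Distinct final values of x over the six possible schedules. *)
let outcomes =
  interleave [Read 0; Write 0] [Read 1; Write 1]
  |> List.map run
  |> List.sort_uniq compare
```

Under this model, outcomes evaluates to [2; 3; 4], which matches the three values given above.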

This indeterminism applies also to terminations. When the memory state depends on the execution of each parallel process, an application can fail to terminate on a particular execution, and terminate on another. To provide some control over the execution, the processes must be synchronized.

For processes that use distinct memory areas but communicate with each other, the interaction depends on the type of communication. For the following example we introduce two communication primitives: send, which sends a value to a named destination, and receive, which receives a value from a given process. Let P and Q be two communicating processes:

process P                   process Q
let x = ref 1;;             let y = ref 1;;
send(Q,!x);                 y := !y + 3;
x := !x * 2;                y := !y + receive(P);
send(Q,!x);                 send(P,!y);
x := !x + receive(Q);       y := !y + receive(P);

In the case of a transient communication, process Q can miss the messages of P. We fall back into the non-determinism of the preceding model.

With asynchronous communication, the communication channel stores the values that have been transmitted, so only reception is blocking. Process P can be waiting for Q, even if the latter has not yet read the two messages from P. However, this does not prevent it from transmitting.
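
The asynchronous case can be sketched sequentially with one FIFO mailbox per receiver. This is only an illustration under stated assumptions: the mailboxes to_p and to_q and the definitions of send and receive below are hypothetical stand-ins for the primitives of the example, and the code follows one possible schedule by hand instead of running real processes.

```ocaml
(* One mailbox per receiving process. A send never blocks: it only
   enqueues the value. *)
let to_p : int Queue.t = Queue.create ()
let to_q : int Queue.t = Queue.create ()

let send mailbox v = Queue.add v mailbox

(* A real receive would block on an empty mailbox. In this sequential
   sketch the schedule below only receives when a value is present;
   otherwise Queue.take would raise Queue.Empty. *)
let receive mailbox = Queue.take mailbox

let x = ref 1
let y = ref 1

let () =
  (* P runs up to its blocking receive: both sends return at once. *)
  send to_q !x;
  x := !x * 2;
  send to_q !x;
  (* Q runs to completion: both messages from P wait in its mailbox. *)
  y := !y + 3;
  y := !y + receive to_q;
  send to_p !y;
  y := !y + receive to_q;
  (* P resumes: the value sent by Q is now available. *)
  x := !x + receive to_p
```

With this schedule no message is lost, and the run ends with x and y both equal to 7.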

We can classify concurrent applications into five categories according to the program units that compose them:
  1. unrelated;
  2. related, but without synchronization;
  3. related, with mutual exclusion;
  4. related, with mutual exclusion and communication;
  5. related, without mutual exclusion, and with synchronous communication.
The difficulty of implementation comes principally from these last categories. Now we will see how to resolve these difficulties by using the Objective CAML libraries.

Compilation with Threads

The Objective CAML thread library is divided into five modules, of which the first four each define an abstract type.

This library is not part of the execution library of Objective CAML. Its use requires the option -custom, both for compiling programs and for constructing a new toplevel, with the commands:
$ ocamlc -thread -custom threads.cma files.ml -cclib -lthreads
$ ocamlmktop -thread -custom -o threadtop threads.cma -cclib -lthreads
The Threads library is not usable with the native compiler unless the platform implements threads conforming to the POSIX 1003 standard. In that case we compile executables by adding the libraries unix.a and pthread.a:

$ ocamlc -thread -custom threads.cma files.ml -cclib -lthreads \
  -cclib -lunix -cclib -lpthread
$ ocamlmktop -thread -custom threads.cma files.ml -cclib -lthreads \
  -cclib -lunix -cclib -lpthread
$ ocamlopt -thread threads.cmxa files.ml -cclib -lthreads \
  -cclib -lunix -cclib -lpthread

Module Thread

The Objective CAML Thread module contains the primitives for the creation and management of threads. We will not present it exhaustively; for instance, the file I/O operations have been described in the preceding chapter.

A thread is created through a call to:

# Thread.create ;;
- : ('a -> 'b) -> 'a -> Thread.t = <fun>
The first argument, of type 'a -> 'b, corresponds to the function executed by the created process; the second argument, of type 'a, is the argument required by the executed function; the result of the call is the descriptor associated with the process. The process thus created is automatically destroyed when the associated function terminates.

Knowing its descriptor, we can ask for the execution of a process and wait for it to finish by using the function join. Here is a usage example:

# let f_proc1 () = for i=0 to 10 do Printf.printf "(%d)" i; flush stdout done;
print_newline() ;;
val f_proc1 : unit -> unit = <fun>
# let t1 = Thread.create f_proc1 () ;;
val t1 : Thread.t = <abstr>
# Thread.join t1 ;;
(0)(1)(2)(3)(4)(5)(6)(7)(8)(9)(10)
- : unit = ()


Warning


The result of the execution of a process is not recovered by the parent process, but lost when the child process terminates.
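
Since the value returned by the thread function is discarded, one common workaround is to have the thread store its result in shared memory that the parent reads after join. The sketch below is one possible way to do this, not part of the Thread module: the helper spawn_with_result is a hypothetical name, and the code assumes the program is linked with the threads library.

```ocaml
(* Assumes linking with the threads library. *)

(* Hypothetical helper: run [f x] in a new thread and store its result
   in a reference that the caller can read once the thread is joined. *)
let spawn_with_result f x =
  let result = ref None in
  let t = Thread.create (fun () -> result := Some (f x)) () in
  (t, result)

let () =
  let t, result = spawn_with_result (fun n -> n * n) 7 in
  (* Wait for the thread to finish before reading the reference. *)
  Thread.join t;
  match !result with
  | Some v -> Printf.printf "result = %d\n" v
  | None -> print_endline "the thread did not produce a result"
```

The reference is read only after Thread.join returns, so the parent never observes a partially computed result. If the function raises an exception, the reference stays at None.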


We can also abruptly interrupt the execution of a process whose descriptor we know, using the function kill. For instance, we create a process which is immediately interrupted:

# let n = ref 0 ;;
val n : int ref = {contents=0}
# let f_proc1 () = while true do incr n done ;;
val f_proc1 : unit -> unit = <fun>
# let go () =
    n := 0 ;
    let t1 = Thread.create f_proc1 () in
    Thread.kill t1 ;
    Printf.printf "n = %d\n" !n ;;
val go : unit -> unit = <fun>
# go () ;;
n = 0
- : unit = ()


A process can put an end to its own activity by calling the function:

# Thread.exit ;;
- : unit -> unit = <fun>


It can suspend its activity for a given time by a call to:

# Thread.delay ;;
- : float -> unit = <fun>


The argument stands for the number of seconds to wait.

Let us return to the previous example and add timing. We create a first process t1 whose associated function f_proc2 in turn creates a process t2 executing f_proc1; f_proc2 then waits for d seconds and terminates t2. When t1 terminates, we print the contents of n.

# let f_proc2 d =
    n := 0 ;
    let t2 = Thread.create f_proc1 ()
    in Thread.delay d ;
       Thread.kill t2 ;;
val f_proc2 : float -> unit = <fun>
# let t1 = Thread.create f_proc2 0.25
in Thread.join t1 ; Printf.printf "n = %d\n" !n ;;
n = 128827
- : unit = ()
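
Recent OCaml releases no longer provide Thread.kill, so the same timing experiment may have to be written differently there. The sketch below swaps in a different technique, cooperative stopping: the worker polls a shared flag and ends by itself. The names stop, counter and run_for are hypothetical, and the code assumes the program is linked with the threads library.

```ocaml
(* Assumes linking with the threads library. *)

let n = ref 0
let stop = ref false   (* hypothetical flag set by the controlling thread *)

(* Worker: count until asked to stop. Thread.yield gives the other
   threads a chance to run on every iteration. *)
let counter () =
  while not !stop do
    incr n;
    Thread.yield ()
  done

(* Start the worker, let it run for [d] seconds, ask it to stop, wait
   for it to terminate, and return the final count. *)
let run_for d =
  n := 0;
  stop := false;
  let t = Thread.create counter () in
  Thread.delay d;
  stop := true;
  Thread.join t;
  !n
```

A call such as Printf.printf "n = %d\n" (run_for 0.25) plays the role of the example above. The value printed depends on the machine and on scheduling, so no particular count should be expected.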


