Previous Contents Next

Synchronous Communication

Module Event from the thread library implements the communication of assorted values between two processes through particular ``communication channels''. The effective communication of the value is synchronized through send and receive events.

This model of communication synchronized by events allows the transfer through typed channels of the values of the language, including closures, objects, and events.

It is described in [Rep99].

Synchronization using Communication Events

The primitive communication events are: So as to implement the physical action with which they are associated, two events should be synchronized. For this purpose, we introduce an operation of synchronization (sync) on events. The sending and receiving of a value are not effective unless the two communicating processes are in phase. If a single process wishes to synchronize itself, the operation gets blocked, waiting for the second process to perform its synchronization. This implies that a sender wishing to synchronize the sending of a value (sync (send c v)) can find itself blocked waiting for a synchronization from a receiver (sync (receive c)).

Transmitted Values

The communication channels through which the exchanged values travel are typed: Nothing prevents us from creating multiple channels for communicating each type of value. As this communication takes place between Objective CAML threads, any value of the language can be sent on a channel of the same type. This is useful for closures, objects, and also events, for a ``relayed'' synchronization request.

Module Event

The values encapsulated in communication events travel through communication channels of the abstract data type 'a channel. The creation function for channels is:

# Event.new_channel ;;
- : unit -> 'a Event.channel = <fun>


Send and receive events are created by a function call:

# Event.send ;;
- : 'a Event.channel -> 'a -> unit Event.event = <fun>
# Event.receive ;;
- : 'a Event.channel -> 'a Event.event = <fun>


We can consider the functions send and receive as constructors of the abstract type 'a event. The event constructed by send does not preserve the information about the type of the value to transmit (type unit Event.event). On the other hand, the receive event takes account of it to recover the value during a synchronization. These functions are non-blocking in the sense that the transmission of a value does not take place until the time of the synchronization of two processes by the function:

# Event.sync ;;
- : 'a Event.event -> 'a = <fun>
This function may be blocking for the sender and the receiver.

There is a non-blocking version:

# Event.poll ;;
- : 'a Event.event -> 'a option = <fun>


This function verifies that another process is waiting for synchronization.

If this is the case, it performs the transmissions, and returns the value Some v, if v is the value associated with the event, and None otherwise. The received message, extracted by the function sync, can be the result of a more or less complicated process, triggering other exchanges of messages.

Example of synchronization.
We define three threads. The first, t1, sends a chain of characters on channel c (function g) shared by all the processes. The two others t2 and t3 wait for a value on the same channel. Here are the functions executed by the different processes:


# let c = Event.new_channel ();;
val c : '_a Event.channel = <abstr>
# let f () =
let ids = string_of_int (Thread.id (Thread.self ()))
in print_string ("-------- before -------" ^ ids) ; print_newline() ;
let e = Event.receive c
in print_string ("-------- during -------" ^ ids) ; print_newline() ;
let v = Event.sync e
in print_string (v ^ " " ^ ids ^ " ") ;
print_string ("-------- after -------" ^ ids) ; print_newline() ;;
val f : unit -> unit = <fun>
# let g () =
let ids = string_of_int (Thread.id (Thread.self ()))
in print_string ("Start of " ^ ids ^ "\n");
let e2 = Event.send c "hello"
in Event.sync e2 ;
print_string ("End of " ^ ids) ;
print_newline () ;;
val g : unit -> unit = <fun>


The three processes are created and executed:

# let t1,t2,t3 = Thread.create f (), Thread.create f (), Thread.create g ();;
val t1 : Thread.t = <abstr>
val t2 : Thread.t = <abstr>
val t3 : Thread.t = <abstr>
# Thread.delay 1.0;;
Start of 5
-------- before -------6
-------- during -------6
hello 6 -------- after -------6
-------- before -------7
-------- during -------7
End of 5
- : unit = <unknown constructor>


The transmission may block. The trace of t1 is displayed after the synchronization traces of t2 and t3. Only one of the two processes t1 or t2 is really terminated, as the following calls show:

# Thread.kill t1;;
- : unit = ()
# Thread.kill t2;;
Uncaught exception: Failure("Thread.kill: killed thread")



Previous Contents Next