The suggested exercises allow you to try different types of distributed applications. The first offers a new network service for setting the time on client machines. The second exercise shows how to use resources on different machines to distribute a calculation.

Service: Clock

This exercise consists of implementing a ``clock'' service that gives the time to any client. The idea is to have a reference machine to set the time for different machines on a network.

  1. Define a protocol for transmitting a date containing the day, month, hour, minute, and second.

    (* Date record carried by the clock protocol: day (jour), month (mois),
       hour (heure), minute and second (seconde). *)
    # type horloge = { jour:int; mois:int; heure:int; minute:int; seconde:int } ;;
    type horloge =
    { jour: int;
    mois: int;
    heure: int;
    minute: int;
    seconde: int }
    # let encode date =
        (* Packs the five fields into a 5-byte string, one byte per field, in
           the order day, month, hour, minute, second. *)
        let fields =
          [| date.jour; date.mois; date.heure; date.minute; date.seconde |] in
        let str = String.create (Array.length fields) in
        Array.iteri (fun i v -> str.[i] <- char_of_int v) fields ;
        str ;;
    val encode : horloge -> string = <fun>
    # let decode str =
        (* Rebuilds a date from the first five bytes of str: byte 0 is the day,
           then month, hour, minute and second. *)
        let field i = int_of_char str.[i] in
        { jour = field 0 ;
          mois = field 1 ;
          heure = field 2 ;
          minute = field 3 ;
          seconde = field 4 } ;;
    val decode : string -> horloge = <fun>

  2. Write the function or the class for the service reusing one of the generic servers presented in the Chapter. The service sends date information over each accepted connection, then closes the socket. We use the function main_serveur:

    # main_serveur ;;
    - : (in_channel -> out_channel -> 'a) -> unit = <fun>

    # let horloge_service ic oc =
        (* Service body handed to main_serveur: writes the current local date,
           encoded by encode, on the output channel.  The input channel ic is
           not used.  Any exception ends the treatment with a message on
           stdout. *)
        try
          let date = Unix.localtime (Unix.time ()) in
          let date_horloge =
            { jour = date.Unix.tm_mday ;
              mois = date.Unix.tm_mon + 1 ;   (* tm_mon is numbered from 0 *)
              heure = date.Unix.tm_hour ;
              minute = date.Unix.tm_min ;
              seconde = date.Unix.tm_sec } in
          output_string oc (encode date_horloge) ;
          flush oc
        with _ -> print_endline "Fin du traitement"; flush stdout

    (* Server entry point: passes horloge_service to main_serveur. *)
    let main_horloge () = main_serveur horloge_service ;;
    val horloge_service : 'a -> out_channel -> unit = <fun>
    val main_horloge : unit -> unit = <fun>

  3. Write the client, which sets the clock every hour. We use the function main_client:

    # main_client ;;
    - : (in_channel -> out_channel -> 'a) -> unit = <fun>

    # let client_horloge ic oc =
        (* Client body handed to main_client: once per hour, reads a 5-byte
           date from ic, decodes it and prints "BIP".  On any exception the
           connection is shut down and the exception is re-raised. *)
        try
          let date = ref { jour=0; mois=0; heure=0; minute=0; seconde=0 } in
          while true do
            (* Fresh buffer on each pass; a string literal would be a single
               shared value mutated in place. *)
            let buffer = String.create 5 in
            (* really_input waits for all 5 bytes and raises End_of_file
               otherwise, so decode never sees a partial or stale message. *)
            really_input ic buffer 0 5 ;
            date := decode buffer ;
            print_endline "BIP";
            flush stdout ;
            Unix.sleep 3600
          done
        with
          exn -> shutdown_connection ic ; raise exn ;;
    val client_horloge : in_channel -> 'a -> unit = <fun>

    (* Client entry point: passes client_horloge to main_client. *)
    # let main_horloge () = main_client client_horloge ;;
    val main_horloge : unit -> unit = <fun>

  4. Keep track of time differences when requests are sent. One can measure the time elapsed between the connection request and the reception of the response. We assume that this delay is twice the time taken by the response itself, and correct the result accordingly.

A Network Coffee Machine

We can build a little service that simulates a beverage vending machine. A summary description of the protocol between the client and service is as follows: The server may also respond with an error message if it has not understood a request, does not have enough change, etc. A client request always contains just one piece of information.

The exchanges between client and server are in the form of strings of characters. The different components of a message are separated by a colon (:) and all strings end in :$\n.

The service function communicates with the coffee machine by using a file to pass commands and a hash table for recovering drinks and change.

This exercise will make use of sockets, lightweight processes with a little concurrency, and objects.

  1. Rewrite the function establish_server using the primitives in ThreadUnix. We reuse the functions hostaddr and my_inet_addr from this chapter.

    # val hostaddr : string -> Unix.inet_addr = <fun>
    val my_inet_addr : unit -> Unix.inet_addr = <fun>

    let establish_server f saddr =
      (* Binds a stream socket to saddr and serves forever: each accepted
         connection is wrapped in an input and an output channel and handled
         by f in a newly created thread. *)
      let sock = ThreadUnix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
      Unix.bind sock saddr;
      Unix.listen sock 5;
      while true do
        let (s,_) = ThreadUnix.accept sock in
        let ic = Unix.in_channel_of_descr s
        and oc = Unix.out_channel_of_descr s in
        ignore (Thread.create (f ic) oc)
      done ;;
    val establish_server :
    (in_channel -> out_channel -> 'a) -> Unix.sockaddr -> unit = <fun>

  2. Write two functions, get_request and send_answer. The first function reads and decodes a request and the second formats and sends a response built from a list of strings of characters.

    # let read fd =
        (* Reads at most 1024 bytes from the descriptor fd with the
           ThreadUnix primitive and returns exactly the bytes received. *)
        let buf = String.create 1024 in
        let n = ThreadUnix.read fd buf 0 1024 in
        String.sub buf 0 n ;;
    val read : Unix.file_descr -> string = <fun>

    # let get_request fd =
        (* Reads one message and returns its single field.  Only the text
           before the first '$' is considered; it is split on ':' and must
           yield exactly one component, otherwise Failure "BadRequestFormat"
           is raised.  Not_found escapes when the message has no '$'. *)
        let msg = read fd in
        let stop = String.index msg '$' in
        let payload = String.sub msg 0 stop in
        let fields = Str.split (Str.regexp "[:]") payload in
        match fields with
          [field] -> field
        | _ -> failwith "BadRequestFormat" ;;
    val get_request : Unix.file_descr -> string = <fun>
    We redefine input-output functions using those of ThreadUnix, then use them for get_request and send_answer. Since it comes up often, we also define a function send_cancel that sends an error message.

    # let write fd s =
        (* Writes the whole string s on fd through ThreadUnix.write; raises
           Failure "I/O error" when fewer bytes than requested were written. *)
        let expected = String.length s in
        let written = ThreadUnix.write fd s 0 expected in
        if written < expected then failwith "I/O error" ;;
    val write : Unix.file_descr -> string -> unit = <fun>

    # let send_answer fd ss =
        (* Formats the fields ss as one message (fields joined by ':' and
           terminated by ":$\n") and writes it on fd. *)
        let rec mk_answer = function
            [] -> ":$\n"
          | [s] -> s ^ ":$\n"
          | s::ss -> s ^ ":" ^ (mk_answer ss)
        in
        write fd (mk_answer ss) ;;
    val send_answer : Unix.file_descr -> string list -> unit = <fun>

    # (* Sends the fixed error message "cancel:$\n" on fd. *)
      let send_cancel fd = write fd "cancel:$\n" ;;
    val send_cancel : Unix.file_descr -> unit = <fun>

  3. Write a class cmd_fifo to manage pending commands. Each new command is assigned a unique number. For this purpose, implement a class num_cmd_gen.

    # class cmd_fifo =
        (* Queue of pending commands shared between threads.  Each entry is
           (command number, drink number, amount paid); access to the queue
           is guarded by the mutex m. *)
        object
          val n = new num_cmd_gen
          val f = (Queue.create (): (int*int*int) Queue.t)
          val m = Mutex.create ()
          val c = Condition.create ()

          (* Queues a command and returns the number assigned to it. *)
          method add num_drink paid =
            let num_cmd = n#get() in
            Mutex.lock m ;
            Queue.add (num_cmd, num_drink, paid) f ;
            Mutex.unlock m ;
            Condition.signal c ;
            num_cmd

          (* Blocks until a command is queued, then removes and returns the
             oldest one. *)
          method wait () =
            Mutex.lock m ;
            (* Test the queue around the wait: a signal sent before this call
               would otherwise be lost, and Queue.take would raise Empty after
               a wakeup with nothing queued. *)
            while Queue.length f = 0 do Condition.wait c m done ;
            let cmd = Queue.take f in
            Mutex.unlock m ;
            cmd
        end ;;
    class cmd_fifo :
      object
        val c : Condition.t
        val f : (int * int * int) Queue.t
        val m : Mutex.t
        val n : num_cmd_gen
        method add : int -> int -> int
        method wait : unit -> int * int * int
      end

    # class num_cmd_gen =
        (* Generator of command numbers: successive calls to get return
           1, 2, 3, ...  The counter is updated under the mutex m. *)
        object
          val mutable x = 0
          val m = Mutex.create ()

          method get() =
            Mutex.lock m ;
            x <- x+1 ;
            (* Copy the counter while the lock is still held. *)
            let r = x in
            Mutex.unlock m ;
            r
        end ;;
    class num_cmd_gen :
    object val m : Mutex.t val mutable x : int method get : unit -> int end

  4. Write a class ready_table for stocking the machine with drinks.

    # class ready_table size =
        (* Table of finished commands, indexed by command number; each entry
           holds (drink name, change).  size is the initial size given to
           Hashtbl.create. *)
        object
          val t = (Hashtbl.create size : (int, (string * int)) Hashtbl.t)
          val m = Mutex.create ()
          val c = Condition.create ()

          (* Records the result of command num_cmd and wakes every waiter. *)
          method add num_cmd num_drink change =
            Mutex.lock m ;
            Hashtbl.add t num_cmd (num_drink, change) ;
            Mutex.unlock m ;
            Condition.broadcast c

          (* Blocks until the result of num_cmd is present, removes it from
             the table and returns it. *)
          method wait num_cmd =
            Mutex.lock m;
            while not(Hashtbl.mem t num_cmd) do Condition.wait c m done ;
            let cmd = Hashtbl.find t num_cmd in
            Hashtbl.remove t num_cmd ;
            Mutex.unlock m ;
            cmd
        end ;;
    class ready_table :
      int ->
      object
        val c : Condition.t
        val m : Mutex.t
        val t : (int, string * int) Hashtbl.t
        method add : int -> string -> int -> unit
        method wait : int -> string * int
      end

  5. Write the class machine that models the coffee machine. The class contains a method run that loops through the sequence: wait for a command, then execute it, as long as there remain drinks available. Define a type drink_descr indicating, for each drink: its name, the quantity in stock, the quantity that will remain after satisfying pending commands, and its price. We can use an auxiliary function array_index which returns the index of the first element in a table satisfying a criterion passed as a parameter.

    # class machine (f_cmd0:cmd_fifo) (t_ready0:ready_table) =
        (* Model of the coffee machine.  Commands arrive through f_cmd0 and
           their results are published in t_ready0. *)
        object
          val f_cmd = f_cmd0
          val t_ready = t_ready0
          val mutable nb_available_drinks = 0
          val drinks_table =
            [| { name="cafe"; real_stock=10; virtual_stock=10; price=300 };
               { name="the"; real_stock=5; virtual_stock=5; price=250 };
               { name="chocolat"; real_stock=10; virtual_stock=10; price=250 } |]
          val mutable cash = 0
          val m = Mutex.create()

          initializer nb_available_drinks <- Array.length drinks_table

          method get_drink_price i = drinks_table.(i).price

          (* Index of the drink named s; Not_found (from array_index) when no
             drink has that name. *)
          method get_drink_index s =
            array_index drinks_table (fun d -> d.name = s)

          (* Names of the drinks whose real stock is not exhausted, in table
             order. *)
          method get_menu () =
            let f d ns = if d.real_stock > 0 then d.name :: ns else ns in
            Array.fold_right f drinks_table []

          (* Gives back one unit of virtual stock for the drink. *)
          method cancel_cmd num_drink =
            let drink = drinks_table.(num_drink) in
            drink.virtual_stock <- drink.virtual_stock+1

          method set_cmd num_drink paid = f_cmd#add num_drink paid

          method wait_cmd num_cmd = t_ready#wait num_cmd

          (* Takes one unit out of the real stock; a drink reaching zero
             lowers the count of available drinks. *)
          method deliver_drink num_drink =
            let drink = drinks_table.(num_drink) in
            drink.real_stock <- drink.real_stock-1 ;
            if drink.real_stock = 0 then nb_available_drinks <- nb_available_drinks-1

          (* Main loop: while some drink is available, takes the next command
             and either accepts it (drink name and change) or answers
             "cancel" with no change. *)
          method run() =
            while nb_available_drinks>0 do
              let (num_cmd, num_drink, amount) = f_cmd#wait () in
              let drink = drinks_table.(num_drink) in
              let change = amount - drink.price in
              Mutex.lock m ;
              (* Accept only if a unit is still unreserved and the cash held
                 covers the change to give back. *)
              if (drink.virtual_stock > 0) && (cash >= change)
              then
                begin
                  drink.virtual_stock <- drink.virtual_stock-1 ;
                  cash <- cash + drink.price ;
                  t_ready#add num_cmd drink.name change
                end
              else t_ready#add num_cmd "cancel" 0 ;
              Mutex.unlock m
            done
        end ;;
    class machine :
      cmd_fifo ->
      ready_table ->
      object
        val mutable cash : int
        val drinks_table : drink_descr array
        val f_cmd : cmd_fifo
        val m : Mutex.t
        val mutable nb_available_drinks : int
        val t_ready : ready_table
        method cancel_cmd : int -> unit
        method deliver_drink : int -> unit
        method get_drink_index : string -> int
        method get_drink_price : int -> int
        method get_menu : unit -> string list
        method run : unit -> unit
        method set_cmd : int -> int -> int
        method wait_cmd : int -> string * int
      end

    (* Description of one drink held by the machine. *)
    # type drink_descr =
    { name : string;                 (* name of the drink *)
    mutable real_stock : int;        (* quantity in stock *)
    mutable virtual_stock : int;     (* quantity left once pending commands are satisfied *)
    price : int } ;;                 (* price of the drink *)

    # let array_index t f =
        (* Index of the first element of t satisfying f.
           Raises Not_found when no element does (including when t is
           empty). *)
        let n = Array.length t in
        let rec search i =
          if i >= n then raise Not_found
          else if f t.(i) then i
          else search (i + 1)
        in
        search 0 ;;
    val array_index : 'a array -> ('a -> bool) -> int = <fun>

  6. Write the service function waiter.

    # let waiter mach ic oc =
        (* Serves one client on the channels ic/oc: sends the menu, reads the
           chosen drink, sends its price, reads the payment, queues the
           command and sends back the drink name and the change.  The
           failures listed in the handler are answered with a cancel message.
           Afterwards the channels are closed and the thread exits. *)
        let f_in = Unix.descr_of_in_channel ic in
        let f_out = Unix.descr_of_out_channel oc in
        (try
           send_answer f_out (mach#get_menu()) ;
           let drink_name = get_request f_in in
           let num_drink = mach#get_drink_index drink_name in
           let drink_price = mach#get_drink_price num_drink in
           send_answer f_out [string_of_int drink_price] ;
           let paid = int_of_string (get_request f_in) in
           if paid < drink_price then failwith"NotEnough" ;
           let num_cmd = mach#set_cmd num_drink paid in
           let drink_name, change = mach#wait_cmd num_cmd in
           (* The machine answers "cancel" when it refuses a command; nothing
              leaves the stock in that case.  ignore keeps the result type of
              deliver_drink unconstrained. *)
           if drink_name <> "cancel" then ignore (mach#deliver_drink num_drink) ;
           send_answer f_out [drink_name; (string_of_int change)]
         with
           Not_found -> send_cancel f_out
         | Failure("int_of_string") -> send_cancel f_out
         | Failure("I/O error") -> send_cancel f_out
         | Failure("NotEnough") -> send_cancel f_out
         | Failure("BadRequestFormat") -> send_cancel f_out
        ) ;
        close_in ic ;
        flush oc ;
        close_out oc ;
        Thread.exit () ;;
    val waiter :
    < deliver_drink : 'a -> 'b; get_drink_index : string -> 'a;
    get_drink_price : 'a -> int; get_menu : unit -> string list;
    set_cmd : 'a -> int -> 'c; wait_cmd : 'c -> string * int; .. > ->
    in_channel -> out_channel -> unit = <fun>

  7. Write the principal function main that obtains a port number for the service from the command line and performs a number of initialization tasks. In particular, the coffee machine executes in its own lightweight process (thread).

    # let main () =
        (* Entry point: expects the service port as first command-line
           argument, starts the machine loop in its own thread, then serves
           clients with waiter.  Prints a usage line and exits with status 1
           when the argument is missing. *)
        if Array.length Sys.argv < 2
        then
          begin
            Printf.eprintf "usage : %s port\n" Sys.argv.(0) ;
            exit 1
          end
        else
          begin
            let port = int_of_string Sys.argv.(1) in
            let f_cmd = new cmd_fifo in
            let t_ready = new ready_table 13 in
            let mach = new machine f_cmd t_ready in
            ignore (Thread.create mach#run ()) ;
            establish_server (waiter mach) (Unix.ADDR_INET (my_inet_addr (), port))
          end ;;
    val main : unit -> unit = <fun>

