Version française
Home     About     Download     Resources     Contact us    
Browse thread
wrap_abort missing in thread library
[ Home ] [ Index: by date | by threads ]
[ Search: ]

[ Message by date: previous | next ] [ Message in thread: previous | next ] [ Thread: previous | next ]
Date: -- (:)
From: Pierre CREGUT - FT . BD/CNET/DTL/MSV <pierre.cregut@c...>
Subject: wrap_abort missing in thread library
The OCAML threads API implements the basic primitives of CML but some
features are missing :
- choose [] raises an exception and is not a neutral element
- wrap_abort (wrapAbort in CML) is not implemented. 
    [wrap abort ev fn] fn is executed after synchronisation of the 
    embedding event if ev was not selected.

The first one is useful to describe guarded events that may not exist :

  if "i have sthing to send" then send channel sthing else choose []

The second one is useful to destroy some computations that will not be used.
The following example implements a timeout of d second that triggers a unit
event if selected. If not, the waiting thread is destroyed as soon as
another event is chosen and not when the waiting time has elapsed.

  let timeout d =
    let mk_event () =
      let chan = Event.new_channel () in
      let timer () = Thread.delay d; Event.poll (Event.send chan ()) in
      let tid = Thread.create timer () in
      Event.wrap_abort (Event.receive chan) (fun () -> Thread.kill tid) in
    Event.guard mk_event

A patch addresing those two issues follows. It must be applied 
on top of Ocaml2.02 threads library. It would be nice if something similar
could be added to the official library or if I could get an explanation
telling me how to program without it.

Pierre Crégut

-- 
Pierre Cregut - pierre.cregut@cnet.francetelecom.fr - +33 2 96 05 16 28
FT.CNET - DTL/MSV - 2 avenue Pierre Marzin - 22307 Lannion Cedex - France

=========================================================================

diff -Nbaur /home/lsun561/Logiciels/ocaml-2.02/otherlibs/threads/event.ml threads/event.ml
--- /home/lsun561/Logiciels/ocaml-2.02/otherlibs/threads/event.ml	Mon Feb  8 15:20:31 1999
+++ threads/event.ml	Wed Jun 23 08:55:53 1999
@@ -21,9 +21,11 @@
     result: unit -> 'a }
       (* Return the result of the communication *)
 
+type 'a behavior = int ref -> Condition.t -> int -> 'a basic_event
 type 'a event =
     Communication of (int ref -> Condition.t -> int -> 'a basic_event)
   | Choose of 'a event list
+  | WrapAbort of 'a event * (unit -> unit)
   | Guard of (unit -> 'a event)
 
 (* Communication channels *)
@@ -51,13 +53,23 @@
 
 let masterlock = Mutex.create()
 
-let basic_sync genev =
+let do_aborts abort_env genev performed = 
+  if abort_env <> [] then begin
+    if performed >= 0 then begin
+      let ids_done = snd genev.(performed) in
+      List.iter 
+	(fun (id,f) -> if not (List.mem id ids_done) then f ())
+	abort_env
+    end else List.iter (fun (_,f) -> f ()) abort_env
+  end
+
+let basic_sync abort_env genev =
   let performed = ref (-1) in
   let condition = Condition.create() in
   let bev = Array.create (Array.length genev)
-                         (genev.(0) performed condition 0) in
+                         (fst (genev.(0)) performed condition 0) in
   for i = 1 to Array.length genev - 1 do
-    bev.(i) <- genev.(i) performed condition i
+    bev.(i) <- (fst genev.(i)) performed condition i
   done;
   (* See if any of the events is already activable *)
   let rec poll_events i =
@@ -73,12 +85,15 @@
   end;
   Mutex.unlock masterlock;
   (* Extract the result *)
-  bev.(!performed).result()
+  let num = !performed in
+  let result = bev.(!performed).result() in
+  do_aborts abort_env genev num; result
 
 (* Apply a random permutation on an array *)
 
 let scramble_array a =
   let len = Array.length a in
+  if len = 0 then invalid_arg "Event.choose";
   for i = len - 1 downto 1 do
     let j = Random.int (i + 1) in
     let temp = a.(i) in a.(i) <- a.(j); a.(j) <- temp
@@ -87,24 +102,41 @@
 
 (* Main synchronization function *)
 
-let rec flatten_event ev accu =
+let gensym = let count = ref 0 in fun () -> incr count; !count
+
+let rec flatten_event 
+      (abort_list : int list) 
+      (accu : ('a behavior * int list) list)
+      (accu_abort : (int * (unit -> unit)) list) 
+      ev =
   match ev with
-    Communication bev -> bev :: accu
-  | Choose evl -> List.fold_right flatten_event evl accu
-  | Guard fn -> flatten_event (fn ()) accu
+     Communication bev -> ((bev,abort_list) :: accu) , accu_abort
+  | WrapAbort (ev,fn) ->
+      let id = gensym () in 
+      flatten_event (id :: abort_list) accu ((id,fn)::accu_abort) ev
+  | Choose evl ->
+      let rec flatten_list accu' accu_abort'= function
+	 ev :: l ->
+	   let (accu'',accu_abort'') = 
+	     flatten_event abort_list accu' accu_abort' ev in
+	   flatten_list accu'' accu_abort'' l
+       | [] -> (accu',accu_abort') in
+      flatten_list accu accu_abort evl
+  | Guard fn -> flatten_event abort_list accu accu_abort (fn ()) 
 
 let sync ev =
-  basic_sync(scramble_array(Array.of_list(flatten_event ev [])))
+  let evl,abort_env = flatten_event [] [] [] ev in
+  basic_sync abort_env (scramble_array(Array.of_list evl))
 
 (* Event polling -- like sync, but non-blocking *)
 
-let basic_poll genev =
+let basic_poll abort_env genev =
   let performed = ref (-1) in
   let condition = Condition.create() in
   let bev = Array.create(Array.length genev)
-                        (genev.(0) performed condition 0) in
+                        (fst genev.(0) performed condition 0) in
   for i = 1 to Array.length genev - 1 do
-    bev.(i) <- genev.(i) performed condition i
+    bev.(i) <- fst genev.(i) performed condition i
   done;
   (* See if any of the events is already activable *)
   let rec poll_events i =
@@ -116,16 +148,19 @@
   if ready then begin
     (* Extract the result *)
     Mutex.unlock masterlock;
-    Some(bev.(!performed).result())
+    let result = Some(bev.(!performed).result()) in
+    do_aborts abort_env genev !performed; result
   end else begin
     (* Cancel the communication offers *)
     performed := 0;
     Mutex.unlock masterlock;
+    do_aborts abort_env genev (-1);
     None
   end
 
 let poll ev =
-  basic_poll(scramble_array(Array.of_list(flatten_event ev [])))
+  let evl,abort_env = flatten_event [] [] [] ev in
+  basic_poll abort_env (scramble_array(Array.of_list evl))
 
 (* Remove all communication opportunities already synchronized *)
 
@@ -201,9 +236,9 @@
         None -> invalid_arg "Event.receive"
       | Some res -> res) })
 
-let choose = function
-    [] -> invalid_arg "Event.choose"
-  | evl -> Choose evl
+let choose evl = Choose evl
+
+let wrap_abort ev fn = WrapAbort(ev,fn)
 
 let guard fn = Guard fn
 
@@ -217,6 +252,8 @@
           result = (fun () -> fn(bev.result())) })
   | Choose evl ->
       Choose(List.map (fun ev -> wrap ev fn) evl)
+  | WrapAbort (ev,f') ->
+      WrapAbort (wrap ev fn, f')
   | Guard gu ->
       Guard(fun () -> wrap (gu()) fn)
 
diff -Nbaur /home/lsun561/Logiciels/ocaml-2.02/otherlibs/threads/event.mli threads/event.mli
--- /home/lsun561/Logiciels/ocaml-2.02/otherlibs/threads/event.mli	Tue Dec  9 10:10:23 1997
+++ threads/event.mli	Wed Jun 23 08:55:58 1999
@@ -42,6 +42,10 @@
         (* [wrap ev fn] returns the event that performs the same communications
            as [ev], then applies the post-processing function [fn]
            on the return value. *)
+val wrap_abort: 'a event -> (unit -> unit) -> 'a event
+        (* [wrap ev fn] returns the event that performs the same communications
+           as [ev], but if it is not selected the function fn is executed
+	   after the synchronisation. *)
 val guard: (unit -> 'a event) -> 'a event
         (* [guard fn] returns the event that, when synchronized, computes
            [fn()] and behaves as the resulting event. This allows to