Version française
Home     About     Download     Resources     Contact us    
Browse thread
Faking concurrency using Unix forks and pipes
[ Home ] [ Index: by date | by threads ]
[ Search: ]

[ Message by date: previous | next ] [ Message in thread: previous | next ] [ Thread: previous | next ]
Date: -- (:)
From: Thomas Fischbacher <tf@f...>
Subject: Re: [Caml-list] Faking concurrency using Unix forks and pipes
Jon Harrop wrote:

> Has anyone implemented a parallel map function in OCaml using Unix forks, 
> pipes and maybe marshalling?
> 
> This seems like an easy way to get concurrency in OCaml...

That is indeed an exercise I like to pose to my PhD students.
(Of course, the question whether this really makes that much
sense is a different issue...)


Here is my own suggestion how to do it:


let compute_uniform_workload_forked
     ?(bailout=
	(fun str ->
	   let () = Printf.fprintf stderr "AIEE! %s\n%!" str in
	     exit 1))
     ~fun_combine
     v_work =
   let bailout s dummy = let _ = bailout s in dummy in
     (* Note that we use the "bailout" function in two different places 
where it expects
        different return types. Hence, we have to bend over backwards to 
get the type
        system to accept what we actually want to do...
     *)
   let nr_processes = Array.length v_work in
   let rec setup_childs nr_process child_info =
     if nr_process = nr_processes
     then List.rev child_info (* This ensures we get the data in proper 
order. *)
     else
       let (fd_read,fd_write) = Unix.socketpair Unix.PF_UNIX 
Unix.SOCK_STREAM 0 in
       let pid = Unix.fork () in
	if pid == (-1) (* fork failure *)
	then
	  bailout "fork() failure!" child_info
	else
	  if pid == 0 (* We are the child - compute our share and exit *)
	then
	  let () = Unix.close fd_read in
	  let s_write = Unix.out_channel_of_descr fd_write in
	  let result = v_work.(nr_process) () in
	  let () = Marshal.to_channel s_write result [] in
	    exit 0
	else
	  (* We are the parent *)
	  let () = Unix.close fd_write in
	  let s_read = Unix.in_channel_of_descr fd_read in
	    setup_childs (1+nr_process) ((s_read,pid)::child_info)
   in
   let all_childs_info = setup_childs 1 [] in
     (* Note that it is important that we start counting at 1 here, as 
the parent will do
        chunk #0!
     *)
   let result_chunk0 = v_work.(0) () in
     (* Note that we just do assume that all pieces of the computation 
take the same time.
        We are not trying to be overly sophisticated, fetching data from 
the fastest
        child first. Also, if we wanted a more powerful tool to compute 
with forked processes,
        we might want to divide the big task in a more fine-grained way 
and hand out sub-tasks
        to processes through a scheduler that takes care of when which 
process finishes
        which sub-task. For now, this is overkill.
     *)
   let rec collect_child_results have child_info_todo =
     match child_info_todo with
       | [] -> have
       | ((s_read,pid)::child_info_todo_next) ->
	  let contrib = Marshal.from_channel s_read in
	  let (returned_pid,status) = Unix.waitpid [] pid in
	    if status <> Unix.WEXITED 0
	    then
	      bailout "Child failure!\n%!" have
	    else
	      collect_child_results
		(fun_combine contrib have)
		child_info_todo_next
   in collect_child_results result_chunk0 all_childs_info
;;


(* ---
(* === Example === *)
let sum_of_inverse_squares =
   compute_uniform_workload_forked
     ~fun_combine:(fun a b -> a+.b)
     (let nr_processes=4 in
      let ranges=split_range nr_processes 1 100000 in
      let work subrange_start subrange_end =
        let () = Printf.printf "PID: %d SUB-RANGE %d - %d\n%!"
	 (Unix.getpid()) subrange_start subrange_end
        in
        let rec walk n sum =
	 if n = subrange_end then sum
	 else walk (1+n) (let fn = float_of_int n in sum +. 1.0/.(fn*.fn))
        in walk subrange_start 0.0
      in
        (Array.init nr_processes
	  (fun n ->
	     let (r_s,r_e) = ranges.(n) in
	       fun () -> work r_s r_e)))
;;

(* This gives: 1.64492406679822967
    The full sum would be pi^2/6 = 1.64493406684822641
*)
--- *)

-- 
best regards,
Thomas Fischbacher
tf@functionality.de