Browse thread
Faking concurrency using Unix forks and pipes
[
Home
]
[ Index:
by date
|
by threads
]
[ Message by date: previous | next ] [ Message in thread: previous | next ] [ Thread: previous | next ]
[ Message by date: previous | next ] [ Message in thread: previous | next ] [ Thread: previous | next ]
| Date: | -- (:) |
| From: | Thomas Fischbacher <tf@f...> |
| Subject: | Re: [Caml-list] Faking concurrency using Unix forks and pipes |
Jon Harrop wrote:
> Has anyone implemented a parallel map function in OCaml using Unix forks,
> pipes and maybe marshalling?
>
> This seems like an easy way to get concurrency in OCaml...
That is indeed an exercise I like to pose to my PhD students.
(Of course, the question whether this really makes that much
sense is a different issue...)
Here is my own suggestion for how to do it:
(** [compute_uniform_workload_forked ?bailout ~fun_combine v_work] evaluates
    every thunk of [v_work] in its own Unix process and folds the results
    into a single value.

    Chunk 0 is computed by the calling process itself; every other chunk
    [i] is computed by a forked child, which marshals its result back over
    a socket pair.  Results are combined strictly in index order: starting
    from the result of chunk 0 as accumulator [acc], each child result
    [contrib] is folded in as [fun_combine contrib acc].

    @param bailout called with a message when forking fails or when a child
      does not deliver a result or does not exit with status 0.  The default
      prints the message to stderr and exits with status 1.  If [bailout]
      returns, its result is ignored and the value accumulated so far is
      returned (remaining children are then neither read nor waited for).
    @param fun_combine combines one child result with the accumulator.
    @param v_work the non-empty array of thunks to evaluate.  Results must
      be marshallable without flags (in particular, no closures).
    @raise Invalid_argument if [v_work] is empty: there is no chunk for the
      parent and no neutral element to return. *)
let compute_uniform_workload_forked
    ?(bailout =
      (fun str ->
         let () = Printf.fprintf stderr "AIEE! %s\n%!" str in
         exit 1))
    ~fun_combine
    v_work =
  (* [bailout] is needed in two places that expect different result types
     (a list of children and an accumulated result).  A function parameter
     is monomorphic, so we wrap it: the wrapper discards whatever [bailout]
     returns and hands back a caller-supplied dummy of the right type. *)
  let bailout s dummy =
    let () = ignore (bailout s) in
    dummy
  in
  let nr_processes = Array.length v_work in
  if nr_processes = 0 then
    (* Without this guard the child-setup loop below would never reach its
       stop condition and would keep forking. *)
    invalid_arg "compute_uniform_workload_forked: empty workload array"
  else
    let rec setup_childs nr_process child_info =
      if nr_process >= nr_processes then
        (* Reverse so that children are collected in index order. *)
        List.rev child_info
      else
        let (fd_read, fd_write) =
          Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in
        (* Unix.fork signals failure by raising Unix_error; it never
           returns -1. *)
        let fork_result =
          try Some (Unix.fork ()) with Unix.Unix_error _ -> None in
        match fork_result with
        | None ->
          let () = Unix.close fd_read in
          let () = Unix.close fd_write in
          bailout "fork() failure!" (List.rev child_info)
        | Some 0 ->
          (* We are the child: compute our share, send it, and exit.
             Every exception is caught on purpose: the child must never
             unwind into the caller's continuation, or it would go on
             executing the parent's program as a second process. *)
          let exit_code =
            try
              let () = Unix.close fd_read in
              let s_write = Unix.out_channel_of_descr fd_write in
              let result : 'result = v_work.(nr_process) () in
              let () = Marshal.to_channel s_write result [] in
              let () = flush s_write in
              0
            with e ->
              let () =
                Printf.fprintf stderr "AIEE! child for chunk %d failed: %s\n%!"
                  nr_process (Printexc.to_string e)
              in
              2
          in
          (* NOTE(review): [exit] runs at_exit handlers and flushes channel
             buffers inherited from the parent; kept as in the original. *)
          exit exit_code
        | Some pid ->
          (* We are the parent: keep only the reading end. *)
          let () = Unix.close fd_write in
          let s_read = Unix.in_channel_of_descr fd_read in
          setup_childs (1 + nr_process) ((s_read, pid) :: child_info)
    in
    (* Start counting at 1: the parent itself computes chunk 0. *)
    let all_childs_info = setup_childs 1 [] in
    let result_chunk0 : 'result = v_work.(0) () in
    (* All chunks are assumed to take about the same time, so children are
       simply read in index order rather than fastest-first; there is no
       scheduler handing out finer-grained sub-tasks. *)
    let rec collect_child_results have child_info_todo =
      match child_info_todo with
      | [] -> have
      | (s_read, pid) :: child_info_todo_next ->
        (* A child that died before (or while) writing yields End_of_file
           or Failure here; treat both as a missing contribution.  The
           annotation ties the unmarshalled value to the thunks' type. *)
        let contrib =
          try Some (Marshal.from_channel s_read : 'result)
          with End_of_file | Failure _ -> None
        in
        let () = close_in s_read in
        let (_, status) = Unix.waitpid [] pid in
        (match contrib, status with
         | Some value, Unix.WEXITED 0 ->
           collect_child_results
             (fun_combine value have)
             child_info_todo_next
         | None, _
         | Some _, (Unix.WEXITED _ | Unix.WSIGNALED _ | Unix.WSTOPPED _) ->
           bailout "Child failure!" have)
    in
    collect_child_results result_chunk0 all_childs_info
;;
(* ---
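(* === Example ===
   Assumes a helper [split_range n lo hi], not shown in this post, that
   returns an array of n (start, end) sub-ranges; [work] below treats the
   end of each sub-range as exclusive. *)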
let sum_of_inverse_squares =
compute_uniform_workload_forked
~fun_combine:(fun a b -> a+.b)
(let nr_processes=4 in
let ranges=split_range nr_processes 1 100000 in
let work subrange_start subrange_end =
let () = Printf.printf "PID: %d SUB-RANGE %d - %d\n%!"
(Unix.getpid()) subrange_start subrange_end
in
let rec walk n sum =
if n = subrange_end then sum
else walk (1+n) (let fn = float_of_int n in sum +. 1.0/.(fn*.fn))
in walk subrange_start 0.0
in
(Array.init nr_processes
(fun n ->
let (r_s,r_e) = ranges.(n) in
fun () -> work r_s r_e)))
;;
(* This gives: 1.64492406679822967
The full sum would be pi^2/6 = 1.64493406684822641
*)
--- *)
--
best regards,
Thomas Fischbacher
tf@functionality.de