Concurrency for services
Date: 2005-07-13 (11:27)
From: Gerd Stolpmann <info@g...>
Subject: Re: [Caml-list] Concurrency for services
On Wednesday, 13.07.2005, at 00:40 +0200, Christophe TROESTLER wrote:
> Hi,
> I have been confronted a couple of times to the following situation
> and I am looking for advice for a good (the best, if possible :)
> reusable solution.
> Imagine you want to build a server library (e.g. a Lpd daemon, a file
> server, a web service,...).  In broad terms, you can describe it as a
> protocol for which you will handle the various "events" by callbacks.
> You do not want to wire any concurrency model in your library but
> instead provide the user with the appropriate functions so it can use
> his favorite concurrency schema.  My question is: what is the best way
> to do that?

For Nethttpd, the soon-to-be-published web server component, I had the
same problem. Event-driven code is the most general, so the core of the
protocol engine was written in this style (manually, without twister).
On top of that there are several encapsulations, and the user can also
choose multi-threading or multi-processing (at least in principle).
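
As a minimal sketch of what an event-driven core with encapsulations on
top can look like (this is not Nethttpd's API; the names [engine], [feed]
and [finish] and the line-based protocol are assumptions made purely for
illustration): the engine below only turns input chunks into events. It
never blocks and never touches a socket, so a select loop, a thread or a
forked process could each drive it.

```ocaml
(* Hypothetical line-based protocol engine, for illustration only. *)
type event =
  | Line of string   (* a complete line arrived, without its '\n' *)
  | Closed           (* the input was closed *)

(* Bytes received so far that are not yet terminated by '\n'. *)
type engine = { buffer : string }

let create () = { buffer = "" }

(* Feed one chunk of input. Returns the new engine state together with
   the events produced by this chunk, in order of arrival. *)
let feed eng chunk =
  let data = eng.buffer ^ chunk in
  let len = String.length data in
  let rec split acc start =
    match String.index_from_opt data start '\n' with
    | Some i ->
        split (Line (String.sub data start (i - start)) :: acc) (i + 1)
    | None ->
        ({ buffer = String.sub data start (len - start) }, List.rev acc)
  in
  split [] 0

(* Signal end of input; an unterminated last line is still delivered. *)
let finish eng =
  if eng.buffer = "" then [ Closed ] else [ Line eng.buffer; Closed ]
```

Whoever owns the descriptor reads from it and calls [feed]; the choice
between threads, processes or a single event loop stays outside the
engine.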

> Perhaps naively, I am thinking along these lines.  There are three
> points where a possibility of concurrency naturally offers itself:
> 1. several threads/processes can listen (run "accept") on the same
>    socket;
> 2. each time a connection is accepted, one may decide to process it in
>    a new thread/process (or send it to a pool of threads/processes,...);
> 3. each time one calls a callback (that only makes sense if several
>    callbacks can be called independently for a single connection).

The common property is that you have to establish a socket descriptor in
some kind of container. The multi-processing model is the most
restrictive one because of the process boundaries and because the
descriptors can only be shared by inheriting them from a common parent
process. You cannot hide the properties of the service containers from
the user.

> It seems to me that 3. belongs to the design of the library and cannot
> easily be abstracted.  1. and 2. however are basically the same from
> one library to the next.  For 2., one has to distinguish whether one
> uses threads or processes to know when the accept socket has to be
> closed.  So one ends up with 3 functions:
> val socket : unit -> Unix.file_descr
> val accept_fork :
>   fork:((Unix.file_descr -> unit) -> int * Unix.file_descr) ->
>   'a connection_handler -> Unix.inet_addr -> unit
> val accept_threads :
>   ?thread:((unit -> unit) -> unit) ->
>   'a connection_handler -> Unix.inet_addr -> unit
> The [connection_handler] is an abstract type that is created by a
> [handle_connection] which states which callback(s) to use.  The file
> descriptor in [fork] is a communication channel for the son to send
> messages to the father (the father may need to react to some commands
> in the protocol e.g. "shutdown").  A simple usage is
> let () = accept_threads (handle_connection f) (socket())
> Of course, there usually will be many optional arguments to tailor the
> behavior of these functions.  I wonder:
> i. are the above functions able to take care of about any concurrency
>    model one can imagine (new thread/process, pool of threads,
>    dispatching to a set of machines,...);
> ii. are they designed well enough so that they can form a signature on
>     which one can build usual concurrency models through functors?

These functions outline what to do, but aren't complete enough. I see
the following limitations:

- Missing parameters for timeouts
- Missing parameters for controlling the speed of starting and stopping
- Missing parameters for error behaviour: What to do if one container
  does not respond?
- Many services listen on several sockets, and not only Internet sockets
- Missing hooks that could be executed on startup/shutdown of a container
- A means to broadcast messages is very helpful
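
To make the list concrete, here is a hedged sketch of a configuration
record that would carry most of these parameters. Every type and field
name is hypothetical and chosen only to mirror the points above; [addr]
is a stand-in for Unix.sockaddr so that the fragment has no dependencies,
and broadcasting is left out because it is an operation rather than a
setting.

```ocaml
(* All names here are illustrative assumptions, not an existing API. *)
type addr =
  | Inet of string * int   (* host, port *)
  | Local of string        (* path of a Unix domain socket *)

(* What to do if one container does not respond. *)
type unresponsive_policy = Ignore | Restart_container | Shutdown_all

type config = {
  services : addr list;               (* several sockets, not only Internet *)
  timeout : float;                    (* seconds *)
  max_starts_per_second : int;        (* speed of starting containers *)
  max_stops_per_second : int;         (* speed of stopping containers *)
  on_unresponsive : unresponsive_policy;
  on_startup : unit -> unit;          (* hook run when a container starts *)
  on_shutdown : unit -> unit;         (* hook run when a container stops *)
}

let default_config = {
  services = [];
  timeout = 30.0;
  max_starts_per_second = 5;
  max_stops_per_second = 5;
  on_unresponsive = Restart_container;
  on_startup = (fun () -> ());
  on_shutdown = (fun () -> ());
}

(* Reject configurations that no controller could run. *)
let validate c =
  if c.services = [] then Error "no service address configured"
  else if c.timeout <= 0.0 then Error "timeout must be positive"
  else if c.max_starts_per_second <= 0 || c.max_stops_per_second <= 0 then
    Error "start/stop rates must be positive"
  else Ok ()
```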

For a general solution I would prefer an abstract type [controller].
That could be a class type like:

class type controller =
object
  method type_of_container : [`Process|`Thread|...]
  method services : Unix.sockaddr list
  method on_container_startup : unit -> unit
  method on_container_shutdown : unit -> unit
  method serve : unit -> unit
  method shutdown : unit -> unit   (* shutdown all services *)
end

class type container =
object
  method controller : controller
  method broadcast : message -> unit
  method accept : unit -> [ `Connection of Unix.sockaddr * Unix.file_descr
                          | `Message of message
                          | `Shutdown ]
end

And implementations are classes:

class process_controller : 
  config:config -> handler:(container->unit) -> controller
class thread_controller : 
  config:config -> handler:(container->unit) -> controller

The idea is that [config] describes the properties of every
implementation. There can be common configuration options (like the list
of socket addresses), and options that are only applicable for a certain
type of controller (e.g. timeouts). The [handler] is called when the
container is established (i.e. a process or thread). The handler is
typically a loop that calls [accept] until it receives a shutdown
message. With [broadcast], containers can send messages to all other
containers.

An example would be:

let rec my_handler cont =
  match cont # accept() with
    | `Connection (servaddr,fd) ->
         (* Got a new connection for service [servaddr] as descriptor [fd] *)
         my_handler cont
    | `Message msg ->
         (* Got a new message *)
         my_handler cont
    | `Shutdown ->
         (* Leave the loop; the container terminates *)
         ()

let my_service =
  new process_controller
    ~config:(object
               method services = [ Unix.ADDR_INET ... ]
               method on_startup = (fun () -> ...)
               method on_shutdown = (fun () -> ...)
               method min_processes = 10
               method max_processes = 20
               method timeout = 30
             end)
    ~handler:my_handler in
my_service # serve()

It is easy to see that one only needs to instantiate thread_controller
instead of process_controller to get a multi-threading server.
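
The same property, namely that the handler depends only on the container
interface, can be illustrated without any real processes or threads. The
sketch below is an assumption-laden stand-in: [scripted_container] is a
hypothetical in-memory container that replays a fixed list of events,
messages are plain strings, and service addresses and descriptors are
replaced by a string and an int. The handler has the same shape as
[my_handler] above, except that it returns a log of what it saw.

```ocaml
type message = string   (* assumption: messages are plain strings *)

type accepted =
  [ `Connection of string * int   (* stand-ins for sockaddr and file_descr *)
  | `Message of message
  | `Shutdown ]

class type container = object
  method accept : unit -> accepted
end

(* Hypothetical in-memory container: replays [script], then reports
   `Shutdown forever. *)
class scripted_container (script : accepted list) : container = object
  val mutable pending = script
  method accept () =
    match pending with
    | [] -> `Shutdown
    | ev :: rest -> pending <- rest; ev
end

(* Loop until `Shutdown, recording each event in arrival order. *)
let rec my_handler (cont : container) log =
  match cont#accept () with
  | `Connection (servaddr, fd) ->
      my_handler cont (Printf.sprintf "conn %s #%d" servaddr fd :: log)
  | `Message msg ->
      my_handler cont (("msg " ^ msg) :: log)
  | `Shutdown ->
      List.rev log
```

Nothing in [my_handler] mentions how the container was created, which is
why exchanging the controller class does not require touching it.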

One problem with this approach is that it is not easy to unify two
container handlers that implement services independently in order to get
a handler that serves both, i.e. the function

val union : controller -> controller -> controller

is non-trivial.
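
One way to see the difficulty (this is an interpretation, not something
the types above provide): a handler of type container->unit is an opaque
loop that calls [accept] itself, so two such loops would compete for the
same events. If handlers were instead given as per-event callbacks, a
union could dispatch by service address. The sketch below assumes exactly
that hypothetical callback form; [handler_spec] and its fields are
invented for illustration, and addresses and descriptors are represented
by strings and ints.

```ocaml
(* Hypothetical callback-style description of a service handler. *)
type handler_spec = {
  services : string list;                  (* addresses this handler serves *)
  on_connection : string -> int -> unit;   (* service address, descriptor *)
  on_message : string -> unit;
}

(* Combine two handlers: connections go to whichever side declared the
   service address (the first one wins if both did); messages go to both. *)
let union a b = {
  services = a.services @ b.services;
  on_connection =
    (fun addr fd ->
       if List.mem addr a.services then a.on_connection addr fd
       else b.on_connection addr fd);
  on_message = (fun msg -> a.on_message msg; b.on_message msg);
}
```

This only works because the callbacks expose where each event should go;
with opaque handler loops, or with whole controllers that also own
configuration and lifecycle hooks, no such dispatch point is available.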

Btw, I would like to include such a feature into OCamlnet. A sponsor
would be very welcome.

Gerd Stolpmann * Viktoriastr. 45 * 64293 Darmstadt * Germany 
gerd@gerd-stolpmann.de          http://www.gerd-stolpmann.de
Telefon: 06151/153855                  Telefax: 06151/997714