Smart ways to implement worker threads
From: Goswin von Brederlow <goswin-v-b@w...>
Subject: Re: [Caml-list] Smart ways to implement worker threads
Hi,

So I've been thinking about and testing some possibilities.

It is now clear that I need 2 queues. One queue for jobs the worker
threads are to do and one queue for results from the worker threads.

The queue for the worker thread seems easy. Combine a Queue.t, Mutex.t
and Condition.t:

type 'a t = {
  data : 'a Queue.t;
  mutex : Mutex.t;
  condition : Condition.t;
}

let take q =
  Mutex.lock q.mutex;
  while Queue.is_empty q.data do
    Condition.wait q.condition q.mutex
  done;
  let d = Queue.take q.data
  in
    Mutex.unlock q.mutex;
    d

let add q d =
  Mutex.lock q.mutex;
  Queue.add d q.data;  (* Queue.add takes the element first *)
  Condition.signal q.condition;
  Mutex.unlock q.mutex

and so on. This is the standard mechanism for a thread-safe queue, as
used in most languages.
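
To make the two-queue setup concrete, here is a minimal sketch of a
small worker pool built on the queue above. The names create, job,
Square, Stop, worker and run_pool are made up for the illustration and
are not part of the original code; the Stop sentinel is just one
possible way to shut the workers down.

```ocaml
(* Build with the threads library, e.g.
   ocamlfind ocamlopt -package threads.posix -linkpkg workers.ml *)

type 'a t = {
  data : 'a Queue.t;
  mutex : Mutex.t;
  condition : Condition.t;
}

let create () =
  { data = Queue.create ();
    mutex = Mutex.create ();
    condition = Condition.create () }

(* Blocks until an element is available. *)
let take q =
  Mutex.lock q.mutex;
  while Queue.is_empty q.data do
    Condition.wait q.condition q.mutex
  done;
  let d = Queue.take q.data in
  Mutex.unlock q.mutex;
  d

let add q d =
  Mutex.lock q.mutex;
  Queue.add d q.data;
  Condition.signal q.condition;
  Mutex.unlock q.mutex

(* Hypothetical job type: Stop is a sentinel telling a worker to exit. *)
type job = Square of int | Stop

(* Each worker pulls jobs until it sees Stop and pushes its results. *)
let worker jobs results =
  let rec loop () =
    match take jobs with
    | Stop -> ()
    | Square n -> add results (n * n); loop ()
  in
  loop ()

let run_pool ~workers inputs =
  let jobs = create () and results = create () in
  let threads =
    List.init workers
      (fun _ -> Thread.create (fun () -> worker jobs results) ())
  in
  List.iter (fun n -> add jobs (Square n)) inputs;
  (* One sentinel per worker so that every thread terminates. *)
  List.iter (fun _ -> add jobs Stop) threads;
  List.iter Thread.join threads;
  (* All workers are done, so exactly one result per input is queued.
     Results arrive in no particular order, hence the sort. *)
  List.sort compare (List.map (fun _ -> take results) inputs)
```

In this sketch the main thread simply blocks in take on the results
queue, which is exactly what a main thread sitting in select cannot
afford to do.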


Now I have been thinking about the other queue, for reporting the
results from the worker thread. The thing is that the main thread is
stuck in a select waiting for new requests to come in or for the send
buffer to clear so it can send out more replies (or more of a large
reply). So wouldn't it be nice if I could use select on a queue?

Well, in Linux there is the eventfd() system call.

So instead of using Condition.t to signal something was put into the
queue I use the eventfd.

type 'a t = {
  data : 'a Queue.t;
  mutex : Mutex.t;
  fd : EventFD.t;
}

let take q =
  let num = EventFD.read q.fd
  in
    Mutex.lock q.mutex;
    let res = Queue.take q.data
    in
      Mutex.unlock q.mutex;
      (* We only take one element, restore counter *)
      EventFD.write q.fd (Int64.pred num);
      res

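let take_all q =
  (* Assumes EventFD.read returns an int64, as in take above. *)
  let num = Int64.to_int (EventFD.read q.fd) in
  let rec loop acc = function
      0 -> List.rev acc
    | n -> loop ((Queue.take q.data) :: acc) (n - 1)
  in
    Mutex.lock q.mutex;
    (* Start with an empty accumulator. *)
    let res = loop [] num
    in
      Mutex.unlock q.mutex;
      res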

let process q f =
  (* Assumes EventFD.read returns an int64, as in take above. *)
  let num = Int64.to_int (EventFD.read q.fd) in
  let rec loop = function
      0 -> ()
    | n ->
        Mutex.lock q.mutex;
        let res = Queue.take q.data
        in
          Mutex.unlock q.mutex;
          f res;
          loop (n - 1)
  in
    loop num

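let add q d =
    Mutex.lock q.mutex;
    Queue.add d q.data;  (* Queue.add takes the element first *)
    (* 1L assumes the counter is an int64, as in take above. *)
    EventFD.write q.fd 1L;
    Mutex.unlock q.mutex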


Now I have a queue that I can include in select. The take function can
be used by the worker threads. The main thread can use either take_all
or process to save on syscalls or stick with take to ensure the queue
does not starve the other FDs.
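
The EventFD module is not part of the OCaml standard library, so the
following is only a sketch of the same idea using a plain Unix.pipe as
a stand-in for the eventfd: one byte in the pipe stands for one queued
element, and the read end can be handed to Unix.select. The names
create, collect and demo are made up for the illustration. Unlike an
eventfd, a pipe has a bounded buffer, so add can block if the consumer
falls far behind.

```ocaml
(* Needs the unix and threads libraries, e.g.
   ocamlfind ocamlopt -package unix,threads.posix -linkpkg selq.ml *)

type 'a t = {
  data : 'a Queue.t;
  mutex : Mutex.t;
  rd : Unix.file_descr;  (* readable while elements are pending *)
  wr : Unix.file_descr;  (* one byte is written per element *)
}

let create () =
  let rd, wr = Unix.pipe () in
  { data = Queue.create (); mutex = Mutex.create (); rd; wr }

let add q d =
  Mutex.lock q.mutex;
  Queue.add d q.data;
  Mutex.unlock q.mutex;
  (* Written after the element is queued, so a woken reader finds it. *)
  ignore (Unix.write q.wr (Bytes.make 1 'x') 0 1)

(* Blocks until an element is available, then consumes one byte and
   one element. *)
let take q =
  let buf = Bytes.create 1 in
  let n = Unix.read q.rd buf 0 1 in
  assert (n = 1);
  Mutex.lock q.mutex;
  let d = Queue.take q.data in
  Mutex.unlock q.mutex;
  d

(* Main-loop sketch: wait in select until the queue is readable, then
   take a single element. A real server would list its sockets next
   to q.rd and dispatch on whichever descriptor became ready. *)
let collect q expected =
  let rec loop acc remaining =
    if remaining = 0 then List.rev acc
    else
      match Unix.select [q.rd] [] [] (-1.0) with
      | [], _, _ -> loop acc remaining
      | _ :: _, _, _ -> loop (take q :: acc) (remaining - 1)
  in
  loop [] expected

let demo () =
  let q = create () in
  let producer =
    Thread.create (fun () -> List.iter (add q) [1; 2; 3]) ()
  in
  let got = collect q 3 in
  Thread.join producer;
  Unix.close q.rd;
  Unix.close q.wr;
  got
```

Taking one element per select round, as collect does, corresponds to
the "stick with take" option: the queue gets no more attention per
iteration than any other descriptor.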

Also I think I can use the EventFD to terminate a queue and wake up all
listeners. Closing the EventFD should wake up any thread stuck in read
and prevent any threads from becoming stuck in the future. I haven't
tested that yet, though.
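
Since the close-to-wake behaviour is untested (and the Linux close(2)
man page cautions against closing a descriptor that another thread may
be using in a system call), here is a sketch of a different, more
conventional way to terminate the Condition-based queue: a closed flag
plus Condition.broadcast. This is not the eventfd approach described
above; the closed field and the functions create, close and demo are
assumptions made for the illustration, and take now returns an option.

```ocaml
(* Build with the threads library, e.g.
   ocamlfind ocamlopt -package threads.posix -linkpkg closeq.ml *)

type 'a t = {
  data : 'a Queue.t;
  mutex : Mutex.t;
  condition : Condition.t;
  mutable closed : bool;
}

let create () =
  { data = Queue.create ();
    mutex = Mutex.create ();
    condition = Condition.create ();
    closed = false }

let add q d =
  Mutex.lock q.mutex;
  if q.closed then begin
    Mutex.unlock q.mutex;
    invalid_arg "add: queue is closed"
  end;
  Queue.add d q.data;
  Condition.signal q.condition;
  Mutex.unlock q.mutex

(* Returns None once the queue is closed and fully drained. *)
let take q =
  Mutex.lock q.mutex;
  while Queue.is_empty q.data && not q.closed do
    Condition.wait q.condition q.mutex
  done;
  let d =
    if Queue.is_empty q.data then None else Some (Queue.take q.data)
  in
  Mutex.unlock q.mutex;
  d

(* Wakes every waiting thread; pending elements can still be taken. *)
let close q =
  Mutex.lock q.mutex;
  q.closed <- true;
  Condition.broadcast q.condition;
  Mutex.unlock q.mutex

(* Three consumers drain the queue and exit when it is closed. *)
let demo () =
  let q = create () in
  let count = ref 0 and count_mutex = Mutex.create () in
  let rec consume () =
    match take q with
    | None -> ()
    | Some _ ->
        Mutex.lock count_mutex;
        incr count;
        Mutex.unlock count_mutex;
        consume ()
  in
  let threads = List.init 3 (fun _ -> Thread.create consume ()) in
  List.iter (add q) [1; 2; 3; 4; 5];
  close q;
  List.iter Thread.join threads;
  !count
```

The broadcast matters here: Condition.signal would wake only one of
the waiting threads and leave the others blocked.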

Kind regards,
        Goswin