Version française
Home     About     Download     Resources     Contact us    
Browse thread
Smart ways to implement worker threads
[ Home ] [ Index: by date | by threads ]
[ Search: ]

[ Message by date: previous | next ] [ Message in thread: previous | next ] [ Thread: previous | next ]
Date: -- (:)
From: David McClain <dbm@r...>
Subject: Re: [Caml-list] Smart ways to implement worker threads
It is too bad I don't want to learn CML but use Ocaml. The CML examples
from the book don't translate into ocaml since the interface is just a
little bit different and those differences are what throws me off. I

That may appear to be the case from only a cursory review of CML. But  
I find that OCaml's notions of Events, Channels, etc, correspond  
quite closely to what John Reppy describes.

The whole point of Reppy's work was to show how "Events" could be  
made into functional objects, with operations for combination among  
them.

I don't have the "sort" routine translated, but here is some Lisp  
code that attempts to provide the multiple-readers / single-writer  
locks as might be used in a database application. It demonstrates the  
use of wrap, sync, etc...

-----------------------------------
;; rwgate.lisp -- Multiple Reader/Single Writer using Reppy's Channels
;;
;; DM/MCFA  01/00
;; ----------------------------------------------------

;; Package definition for the reader/writer gate.  Only the lock
;; constructor, the two event wrappers and the two synchronous entry
;; points are exported.
(defpackage "RWGATE"
   (:use
    "USEFUL-MACROS"
    "COMMON-LISP"
    "REPPY-CHANNELS"
    "SPM"
    "LISPWORKS")
   (:export
    "MAKE-LOCK"
    "WRAP-RDLOCKEVT"
    "WRAP-WRLOCKEVT"
    "WITH-READLOCK"
    "WITH-WRITELOCK"))

;; Everything below is defined in the RWGATE package.
(in-package "RWGATE")

;; ---------------------------------------------------------------
;; This package implements a multiple-reader/single-writer lock
;; protocol using the amazing capabilities of the Reppy channels.
;;
;; Rule of engagement:
;;
;; 1. A lock is available for reading if no write locks are in place,
;;    or else the read lock requestor is equal to the write lock holder.
;;
;; 2. A lock is available for writing if no read locks and no write locks
;;    are in place,
;;    or else the write lock requestor is equal to the write lock holder,
;;    or else the write lock requestor is equal to every read lock holder.
;;
;; These rules ensure that multiple readers can run, while only one
;; writer can run. No requirements for nesting of read/write lock
;; requests. That is, a writer can request a read lock and vice versa,
;; and issue lock releases in any order.
;;
;; A lock holder can request any number of additional locks. The lock will
;; actually be released when an equal number of releases of like kind
;; have been obtained. For every write lock there is a write release,
;; and for every read lock there is a read release.
;;
;; The Reppy protocol is protected with UNWIND-PROTECT to ensure that
;; locks held are released on exit from the function block being executed
;; within the province of a lock. Lock releases are handled transparently
;; to the user.
;;
;; ---------------------------------------------------------------
;; Lock server protocol with event combinators

;; State record for one multiple-reader/single-writer gate.  The slots
;; are read and updated only by the request handlers installed by
;; MAKE-LOCK.
(defclass rw-lock (<serviceable-protocol-mixin>)
   (;; count of read grants currently outstanding
    (rdlocks
     :initform 0
     :accessor rw-lock-rdlocks)
    ;; count of write grants currently outstanding
    (wrlocks
     :initform 0
     :accessor rw-lock-wrlocks)
    ;; requestor most recently granted a write lock; reset to NIL when
    ;; the write count drops back to zero
    (wrowner
     :initform nil
     :accessor rw-lock-wrowner)
    ;; readers parked because a write lock was outstanding
    (rdqueue
     :initform nil
     :accessor rw-lock-rdqueue)
    ;; one entry per outstanding read grant (a requestor may appear
    ;; several times)
    (rdowners
     :initform nil
     :accessor rw-lock-rdowners)
    ;; writers parked until the gate becomes available
    (wrqueue
     :initform nil
     :accessor rw-lock-wrqueue)))

(defun make-lock ()
   ;; Build a fresh RW-LOCK whose :HANDLERS list maps the four request
   ;; keywords (:READ, :RELEASE-READ, :WRITE, :RELEASE-WRITE) to
   ;; handler closures taking (REQ GATE WHO).
   ;;
   ;; GATE is the RW-LOCK whose counters and queues the handler updates.
   ;; WHO identifies the requestor and is also what the grant is
   ;; delivered to: a grant is signalled by (spawn #'send who t).
   ;; NOTE(review): WRAP-LOCKEVT in this file passes a freshly made
   ;; reply channel to SERVICE-REQUEST; presumably that is what arrives
   ;; here as WHO -- confirm against SERVICE-REQUEST.  If so, the
   ;; "requestor already owns the lock" tests can only match when a
   ;; caller reuses the same channel.
   (make-instance
    'rw-lock
    :handlers (list

               :read
               #'(lambda (req gate who)
                   (declare (ignore req))
                   (labels ((take-it ()
                              ;; record the grant and notify the requestor
                              (incf (rw-lock-rdlocks gate))
                              (push who (rw-lock-rdowners gate))
                              (spawn #'send who t)))
                     (cond ((eq who (rw-lock-wrowner gate))
                            ;; we own a write lock already so go ahead...
                            (take-it))

                           ((plusp (rw-lock-wrlocks gate))
                            ;; outstanding write lock so enqueue in
                            ;; pending readers queue...
                            (push who (rw-lock-rdqueue gate)))

                           (t
                            ;; no outstanding writer so take it...
                            (take-it)))))

               :release-read
               #'(lambda (req gate who)
                   (declare (ignore req))
                   ;; drop exactly one ownership entry for this requestor
                   (removef (rw-lock-rdowners gate) who :count 1)
                   (when (and (zerop (decf (rw-lock-rdlocks gate)))
                              (zerop (rw-lock-wrlocks gate)))
                     ;; no more readers and no more writers
                     ;; (a writer might have been me...)
                     ;; so go ahead and start writers
                     ;; there should be no pending readers
                     ;; since there were no writers
                     (let ((writer (pop (rw-lock-wrqueue gate))))
                       (when writer
                         (incf (rw-lock-wrlocks gate))
                         (setf (rw-lock-wrowner gate) writer)
                         (spawn #'send writer t)))))

               :write
               #'(lambda (req gate who)
                   (declare (ignore req))
                   (labels ((take-it ()
                              ;; record the grant and notify the requestor
                              (incf (rw-lock-wrlocks gate))
                              (setf (rw-lock-wrowner gate) who)
                              (spawn #'send who t)))
                     (cond ((and (zerop (rw-lock-rdlocks gate))
                                 (zerop (rw-lock-wrlocks gate)))
                            ;; gate available so take it
                            (take-it))

                           ((eq who (rw-lock-wrowner gate))
                            ;; gate already owned by requestor
                            ;; so incr lock count and tell him its okay...
                            (take-it))

                           ((and (zerop (rw-lock-wrlocks gate))
                                 (every #'(lambda (rdr)
                                            (eq rdr who))
                                        (rw-lock-rdowners gate)))
                            ;; only one reader and it is me...
                            ;; but I may be in the list numerous times...
                            ;; so go ahead and grab a write lock.
                            ;;
                            ;; The write count must also be zero here:
                            ;; EVERY is true for an empty owner list, so
                            ;; without that test a second writer would be
                            ;; admitted while a different writer (with no
                            ;; readers) still holds the gate.
                            (take-it))

                           (t
                            ;; gate not available -- put caller on
                            ;; waiting writers queue
                            (conc1f (rw-lock-wrqueue gate) who)))))

               :release-write
               #'(lambda (req gate who)
                   (declare (ignore req who))
                   (labels
                       ((run-writer ()
                          ;; Grant the gate to the writer at the head of
                          ;; the writers queue.  Returns T if a writer
                          ;; was started, NIL if the queue was empty.
                          (let ((writer (pop (rw-lock-wrqueue gate))))
                            (if writer
                                (progn
                                  (incf (rw-lock-wrlocks gate))
                                  (setf (rw-lock-wrowner gate) writer)
                                  (spawn #'send writer t)
                                  t)
                                nil)))
                        (run-readers ()
                          ;; Grant read locks to every queued reader at
                          ;; once.  Returns T if any reader was started,
                          ;; NIL if the queue was empty.
                          (let ((readers (rw-lock-rdqueue gate)))
                            (if readers
                                (progn
                                  (setf (rw-lock-rdqueue gate) nil)
                                  (appendf (rw-lock-rdowners gate) readers)
                                  (incf (rw-lock-rdlocks gate)
                                        (length readers))
                                  (dolist (reader readers)
                                    (spawn #'send reader t))
                                  t)
                                nil))))
                     (when (zerop (decf (rw-lock-wrlocks gate)))
                       ;; no more writers (was only me anyway...)
                       (setf (rw-lock-wrowner gate) nil)
                       (if (zerop (rw-lock-rdlocks gate))
                           ;; if no active readers either
                           ;; then it is a toss up whether to
                           ;; start writers or readers
                           (if (zerop (random 2)) ;; add some non-determinism
                               (unless (run-writer)
                                 (run-readers))
                               (unless (run-readers)
                                 (run-writer)))
                           ;; but if I was a reader too,
                           ;; then it is only safe to start other
                           ;; readers.
                           (run-readers)))))
               )))

(defun wrap-lockEvt (lock fn args req rel)
   ;; Return a GUARD event that acquires LOCK, runs FN, then releases LOCK.
   ;;
   ;; REQ and REL are the request keywords used to acquire and release
   ;; (e.g. :READ / :RELEASE-READ).  When the guard thunk runs it makes
   ;; a fresh reply channel and spawns a REQ service request on LOCK
   ;; carrying that channel.  The resulting event fires when something
   ;; is received on the channel; FN is then applied to ARGS inside an
   ;; UNWIND-PROTECT whose cleanup spawns the REL request.
   ;;
   ;; The WRAP-ABORT handler covers the case where this event is not the
   ;; one chosen: it spawns a thread that waits for the grant on the
   ;; reply channel and then issues the REL request, so the lock is
   ;; handed back.
   (guard
    (lambda ()
      (let ((reply-channel (make-channel)))
        (flet ((request-lock ()
                 (service-request req lock reply-channel))
               (return-lock ()
                 (service-request rel lock reply-channel)))
          ;; ask for the lock without blocking the caller
          (spawn #'request-lock)
          (let ((granted-evt
                 (wrap (recvEvt reply-channel)
                       (lambda (grant)
                         (declare (ignore grant))
                         (unwind-protect
                              (apply fn args)
                           (spawn #'return-lock))))))
            (wrap-abort
             granted-evt
             (lambda ()
               (spawn (lambda ()
                        (recv reply-channel)
                        (return-lock)))))))))))

(defmethod wrap-rdLockEvt ((lock rw-lock) fn &rest args)
   ;; Event form of a read-locked call: builds the WRAP-LOCKEVT event
   ;; that requests :READ on LOCK, applies FN to ARGS, and then requests
   ;; :RELEASE-READ.
   (let ((request :read)
         (release :release-read))
     (wrap-lockEvt lock fn args request release)))

(defmethod wrap-wrLockEvt ((lock rw-lock) fn &rest args)
   ;; Event form of a write-locked call: builds the WRAP-LOCKEVT event
   ;; that requests :WRITE on LOCK, applies FN to ARGS, and then requests
   ;; :RELEASE-WRITE.
   (let ((request :write)
         (release :release-write))
     (wrap-lockEvt lock fn args request release)))

(defmethod with-readlock ((lock rw-lock) fn &rest args)
   ;; Synchronous read-locked call: builds the read-lock event for FN
   ;; and ARGS via WRAP-RDLOCKEVT and returns the result of SYNC on it.
   (let ((read-evt (apply #'wrap-rdLockEvt lock fn args)))
     (sync read-evt)))

(defmethod with-writelock ((lock rw-lock) fn &rest args)
   ;; Synchronous write-locked call: builds the write-lock event for FN
   ;; and ARGS via WRAP-WRLOCKEVT and returns the result of SYNC on it.
   (let ((write-evt (apply #'wrap-wrLockEvt lock fn args)))
     (sync write-evt)))
-----------------------------------

Dr. David McClain
Chief Technical Officer
Refined Audiometrics Laboratory
4391 N. Camino Ferreo
Tucson, AZ  85750

email: dbm@refined-audiometrics.com
phone: 1.520.390.3995
web: http://refined-audiometrics.com