REVIEW NOTES (text before the first "Index:" line is ignored by patch(1))

Purpose: replace the scheduler's list-based timeout list and fd list with
integer-keyed priority queues (rbtree or llrbtree back end, selected with
cond-expand), and add a "waiting" queue so that timeouts and I/O are only
polled once the ready queue has drained.

This copy was re-flowed from a whitespace-collapsed extraction:
  * Indentation and runs of blanks (also inside string literals) are
    reconstructed; line breaks were checked against the hunk line counts.
  * Text between '<' and '>' had been stripped.  Restored: "#<<EOF",
    "#ifdef HAVE_ERRNO_H", "# include <errno.h>", "# include <sys/stat.h>"
    (inferred from the fstat() use), "t-k-<?" and the record type name
    "<int-priority-queue>" (the name itself is a guess -- confirm).
  * One span could not be recovered and is marked EXTRACTION GAP below; the
    line count of hunk "-90,13 +109,328" therefore cannot be confirmed.

Fixes relative to the submitted patch (all in place, hunk counts unchanged):
  * ##sys#all-threads: timeout fold is seeded with the accumulator `i', not
    with the exhausted list tail `l' (ready/waiting threads were dropped).
  * (ordered) define-llrbtree-code: `left:' uses int-priority-queue-left-set!
    (the record of that branch has no parent field).
  * ##sys#unblock-threads-for-i/o: uses (timeout-queue-next) so the llrbtree
    back end, which keeps the minimum in ##sys#timeout-list-head, works.
  * ##sys#thread-basic-unblock!: removes a pending timeout entry from the
    queue instead of only clearing slot 4 (stale entries caused a second,
    spurious wake-up).
  * Leftover debugging: `print' -> `dbg'; the printing dbg macro stays
    commented out; `map' used for effect -> `for-each'.

Index: scheduler.scm
===================================================================
--- scheduler.scm	(Revision 12496)
+++ scheduler.scm	(Arbeitskopie)
@@ -31,12 +31,30 @@
   (disable-interrupts)
   (usual-integrations)
   (disable-warning var)
-  (hide ##sys#ready-queue-head ##sys#ready-queue-tail ##sys#timeout-list
+  (hide ##sys#ready-queue-head ##sys#ready-queue-tail
+        ##sys#waiting-queue-head ##sys#waiting-queue-tail
+        ##sys#timeout-list
         ##sys#update-thread-state-buffer ##sys#restore-thread-state-buffer
         ##sys#remove-from-ready-queue ##sys#unblock-threads-for-i/o ##sys#force-primordial
         ##sys#fdset-input-set ##sys#fdset-output-set ##sys#fdset-clear
         ##sys#fdset-select-timeout ##sys#fdset-restore
-        ##sys#clear-i/o-state-for-thread!)
+        ##sys#clear-i/o-state-for-thread!
+
+        make-int-priority-queue-entry int-priority-queue-entry?
+        int-priority-queue-color int-priority-queue-color-set!
+        int-priority-queue-parent int-priority-queue-parent-set!
+        int-priority-queue-left int-priority-queue-left-set!
+        int-priority-queue-right int-priority-queue-right-set!
+        int-priority-queue-index int-priority-queue-index-set!
+        int-priority-queue-value int-priority-queue-value-set!
+        int-priority-queue-before? int-priority-queue-match? int-priority-queue-index-before?
+        int-priority-queue-init! int-priority-queue->rbtree
+        int-priority-queue-lookup int-priority-queue-node-fold int-priority-queue-node-for-each
+        int-priority-queue-node-insert! int-priority-queue-remove! int-priority-queue-reposition!
+        int-priority-queue-empty? int-priority-queue-singleton?
+        int-priority-queue-delete-min! int-priority-queue-delete!
+        ##sys#fd-list-add-thread!
+        )
   (foreign-declare #<<EOF
 #ifdef HAVE_ERRNO_H
 # include <errno.h>
@@ -44,6 +62,7 @@
 #else
 # define C_signal_interrupted_p C_SCHEME_FALSE
 #endif
+# include <sys/stat.h>
 
 #ifdef _WIN32
 # if _MSC_VER > 1300
@@ -90,13 +109,328 @@
  (hygienic-macros
   (define-syntax dbg
     (syntax-rules ()
-      ((_ . _) #f))) )
+      ((_ . _) #f)))
+  #;(define-syntax dbg
+    (syntax-rules ()
+      ((_ x ...) (begin (print x ...) (flush-output (current-output-port)))))) )
  (else
   (define-macro (dbg . args) #f)
   #;(define-macro (dbg . args)
     `(print "DBG: " ,@args) ) ) )
 
 
+(cond-expand
+ (rbtree
+  (include "rbtree.scm")
+
+  ;; We shall replace that with a lolevel structure once.
+
+  (define-record-type <int-priority-queue>
+    (make-int-priority-queue-entry color parent left right index value)
+    int-priority-queue-entry?
+    (color int-priority-queue-color int-priority-queue-color-set!)
+    (parent int-priority-queue-parent int-priority-queue-parent-set!)
+    (left int-priority-queue-left int-priority-queue-left-set!)
+    (right int-priority-queue-right int-priority-queue-right-set!)
+    (index int-priority-queue-index int-priority-queue-index-set!)
+    (value int-priority-queue-value int-priority-queue-value-set!))
+
+  (define (int-priority-queue-before? node1 node2) ;; ordering function
+    (fx< (int-priority-queue-index node1) (int-priority-queue-index node2)))
+
+  (define (int-priority-queue-match? key node)
+    (eqv? key (int-priority-queue-index node)))
+
+  (define (int-priority-queue-index-before? key node)
+    (fx< key (int-priority-queue-index node)))
+
+  (define-rbtree
+    int-priority-queue-init! ;; defined by define-rbtree
+    int-priority-queue->rbtree ;; defined by define-rbtree
+    int-priority-queue-lookup ;; defined by define-rbtree
+    int-priority-queue-node-fold ;; defined by define-rbtree
+    int-priority-queue-node-for-each ;; defined by define-rbtree
+    int-priority-queue-node-insert! ;; defined by define-rbtree
+    int-priority-queue-remove! ;; defined by define-rbtree
+    int-priority-queue-reposition! ;; defined by define-rbtree
+    int-priority-queue-empty? ;; defined by define-rbtree
+    int-priority-queue-singleton? ;; defined by define-rbtree
+    int-priority-queue-match?
+    int-priority-queue-index-before?
+    int-priority-queue-before?
+    int-priority-queue-color
+    int-priority-queue-color-set!
+    int-priority-queue-parent
+    int-priority-queue-parent-set!
+    int-priority-queue-left
+    int-priority-queue-left-set!
+    int-priority-queue-right
+    int-priority-queue-right-set!
+    int-priority-queue-right
+    int-priority-queue-right-set!
+    #f
+    #f)
+
+  (define-inline (make-queue-entry k v)
+    (make-int-priority-queue-entry #f #f #f #f k v))
+
+  (define-inline (make-int-priority-queue)
+    (int-priority-queue-init! (make-int-priority-queue-entry #f #f #f #f #f #f)))
+
+  (define ##sys#timeout-list (make-int-priority-queue))
+
+  (define-inline (##sys#timeout-list-empty?) (int-priority-queue-empty? ##sys#timeout-list))
+
+  (define-inline (timeout-queue-next) (int-priority-queue-right ##sys#timeout-list))
+
+  (define-inline (timeout-queue-unqueue!)
+    (int-priority-queue-remove! (timeout-queue-next)))
+
+  (define-inline (timeout-queue-insert-entry! entry)
+    (int-priority-queue-node-insert! ##sys#timeout-list entry))
+
+  (define-inline (timeout-queue-remove-entry! entry)
+    (int-priority-queue-remove! entry))
+
+  ) ;; rbtree
+ (else
+
+  ;; Sorry for that, I don't know any better yet.
+
+  (define-syntax define-macro
+    (syntax-rules ()
+      ((_ (name . llist) body ...)
+       (define-syntax name
+         (lambda (x r c)
+           (apply (lambda llist body ...) (cdr x)))))
+      ((_ name . body)
+       (define-syntax name
+         (lambda (x r c) (cdr x))))))
+
+  (define-macro (define-llrbtree-code
+                  features
+                  update
+                  update+
+                  update!
+                  init-root-node!
+                  t-lookup
+                  t-min
+                  t-fold
+                  t-for-each
+                  t-insert
+                  t-delete
+                  t-delete-min
+                  t-empty?
+                  t-k-eq?
+                  t-k-<?
+  ;; EXTRACTION GAP: the remaining define-llrbtree-code parameters and the
+  ;; macro body were lost with the stripped "<...>" span; restore them from
+  ;; the author's original patch.  The next line is reconstructed.
+  (define-record-type <int-priority-queue>
+    (make-int-priority-queue-entry color left right index value)
+    int-priority-queue-entry?
+    (color int-priority-queue-color int-priority-queue-color-set!)
+    (left int-priority-queue-left int-priority-queue-left-set!)
+    (right int-priority-queue-right int-priority-queue-right-set!)
+    (index int-priority-queue-index int-priority-queue-index-set!)
+    (value int-priority-queue-value int-priority-queue-value-set!))
+
+  (define-inline (make-queue-entry k v)
+    (make-int-priority-queue-entry #f #f #f k v))
+
+  (define-llrbtree-code
+    ()
+    (args #f)
+    (args #f)
+    ((node . args)
+     `(let ((node ,node))
+        . ,(let loop ((args args))
+             (if (null? args)
+                 '(node)
+                 (cons
+                  (case (car args)
+                    ((color:) `(int-priority-queue-color-set! node ,(cadr args)))
+                    ((left:) `(int-priority-queue-left-set! node ,(cadr args)))
+                    ((right:) `(int-priority-queue-right-set! node ,(cadr args)))
+                    (else (error (format "unbrauchbar ~a" args))))
+                  (loop (cddr args)))))))
+    int-priority-queue-init! ;; defined
+    int-priority-queue-lookup ;; defined
+    #f ;; no min defined
+    int-priority-queue-node-fold ;; defined
+    int-priority-queue-node-for-each ;; defined
+    int-priority-queue-node-insert! ;; defined
+    int-priority-queue-node-delete! ;; delete by node defined
+    int-priority-queue-delete-min! ;; defined
+    int-priority-queue-empty? ;; defined
+    ((k n)
+     `(fx= ,k (int-priority-queue-index ,n)))
+    ((k n)
+     `(fx< ,k (int-priority-queue-index ,n)))
+    ((n1 n2)
+     `(fx<= (int-priority-queue-index ,n1) (int-priority-queue-index ,n2)))
+    int-priority-queue-left
+    int-priority-queue-left-set!
+    int-priority-queue-right
+    int-priority-queue-right-set!
+    int-priority-queue-color
+    int-priority-queue-color-set!
+    #f)
+
+  (define-llrbtree-code
+    (ordered)
+    (args #f)
+    (args #f)
+    ((node . args)
+     `(let ((node ,node))
+        . ,(let loop ((args args))
+             (if (null? args)
+                 '(node)
+                 (cons
+                  (case (car args)
+                    ((color:) `(int-priority-queue-color-set! node ,(cadr args)))
+                    ((left:) `(int-priority-queue-left-set! node ,(cadr args))) ; this record has no parent slot
+                    ((right:) `(int-priority-queue-right-set! node ,(cadr args)))
+                    (else (error (format "unbrauchbar ~a" args))))
+                  (loop (cddr args)))))))
+    #f ;; no init defined
+    #f ;; no lookup defined
+    #f ;; no min defined
+    #f ;; no fold defined
+    #f ;; no for-each defined
+    #f ;; no insert defined
+    int-priority-queue-delete! ;; delete by key defined
+    #f ;; no delete-min defined
+    #f ;; no empty? defined
+    ((k n)
+     `(fx= ,k (int-priority-queue-index ,n)))
+    ((k n)
+     `(fx< ,k (int-priority-queue-index ,n)))
+    ((n1 n2)
+     `(fx< (int-priority-queue-index ,n1) (int-priority-queue-index ,n2)))
+    int-priority-queue-left
+    int-priority-queue-left-set!
+    int-priority-queue-right
+    int-priority-queue-right-set!
+    int-priority-queue-color
+    int-priority-queue-color-set!
+    #f)
+
+  (define ##sys#timeout-list (int-priority-queue-init! (make-queue-entry #f #f)))
+
+  (define ##sys#timeout-list-head #f)
+
+  (define-inline (##sys#timeout-list-empty?) (not ##sys#timeout-list-head))
+
+  (define-inline (timeout-queue-next) ##sys#timeout-list-head)
+
+  (define-inline (timeout-queue-unqueue!)
+    (set! ##sys#timeout-list-head (int-priority-queue-delete-min! ##sys#timeout-list)))
+
+  (define-inline (timeout-queue-remove-entry! entry)
+    (if (eq? ##sys#timeout-list-head entry)
+        (timeout-queue-unqueue!)
+        (int-priority-queue-node-delete! ##sys#timeout-list entry)))
+
+  (define-inline (timeout-queue-insert-entry! entry)
+    (cond
+     ((not ##sys#timeout-list-head)
+      (set! ##sys#timeout-list-head entry))
+     ((fx< (int-priority-queue-index entry)
+           (int-priority-queue-index ##sys#timeout-list-head))
+      (int-priority-queue-node-insert! ##sys#timeout-list ##sys#timeout-list-head)
+      (set! ##sys#timeout-list-head entry))
+     (else (int-priority-queue-node-insert! ##sys#timeout-list entry))))
+
+  ))
+
+(define (make-int-priority-queue)
+  (int-priority-queue-init! (make-queue-entry #f #f)))
+
+(define ##sys#fd-list (make-int-priority-queue))
+
+(define-inline (##sys#fd-list-empty?) (int-priority-queue-empty? ##sys#fd-list))
+
+(cond-expand
+ (rbtree
+  (define-inline (fd-list-remove-entry! entry)
+    (int-priority-queue-remove! entry)))
+ (else
+  (define-inline (fd-list-remove-entry! entry)
+    (int-priority-queue-node-delete! ##sys#fd-list entry))))
+
+(define ##sys#ready-queue-head '())
+(define ##sys#ready-queue-tail '())
+
+(define (##sys#ready-queue) ##sys#ready-queue-head)
+
+(define-inline (##sys#ready-queue-empty?) (eq? '() ##sys#ready-queue-head))
+
+(define (##sys#add-to-ready-queue thread)
+  (##sys#setslot thread 3 'ready)
+  (let ((new-pair (cons thread '())))
+    (cond ((##sys#ready-queue-empty?)
+           (set! ##sys#ready-queue-head new-pair))
+          (else (set-cdr! ##sys#ready-queue-tail new-pair)) )
+    (set! ##sys#ready-queue-tail new-pair) ) )
+
+(define-inline (##sys#remove-from-ready-queue)
+  (let ((first-pair ##sys#ready-queue-head))
+    (and (not (null? first-pair))
+         (let ((first-cdr (cdr first-pair)))
+           (set! ##sys#ready-queue-head first-cdr)
+           (when (eq? '() first-cdr) (set! ##sys#ready-queue-tail '()))
+           (car first-pair) ) ) ) )
+
+(define ##sys#waiting-queue-head '())
+(define ##sys#waiting-queue-tail '())
+
+(define (##sys#waiting-queue) ##sys#waiting-queue-head)
+
+(define-inline (##sys#waiting-queue-empty?) (eq? '() ##sys#waiting-queue-head))
+
+(define-inline (##sys#add-to-waiting-queue thread)
+  (##sys#setslot thread 3 'ready)
+  (let ((new-pair (cons thread '())))
+    (cond ((##sys#waiting-queue-empty?)
+           (set! ##sys#waiting-queue-head new-pair))
+          (else (set-cdr! ##sys#waiting-queue-tail new-pair)) )
+    (set! ##sys#waiting-queue-tail new-pair) ) )
+
+(define-inline (##sys#release-waiting-queue)
+  (set! ##sys#ready-queue-head ##sys#waiting-queue-head)
+  (set! ##sys#ready-queue-tail ##sys#waiting-queue-tail)
+  (set! ##sys#waiting-queue-head '())
+  (set! ##sys#waiting-queue-tail '()))
+
 (define (##sys#schedule)
   (define (switch thread)
     (dbg "switching to " thread)
@@ -106,88 +440,38 @@
     (##core#inline "C_set_initial_timer_interrupt_period" (##sys#slot thread 9))
     ((##sys#slot thread 1)) )
   (let* ([ct ##sys#current-thread]
-         [eintr #f]
          [cts (##sys#slot ct 3)] )
     (dbg "scheduling, current: " ct ", ready: " ##sys#ready-queue-head)
     (##sys#update-thread-state-buffer ct)
     ;; Put current thread on ready-queue:
     (when (or (eq? cts 'running) (eq? cts 'ready)) ; should ct really be 'ready? - normally not.
       (##sys#setislot ct 13 #f) ; clear timeout-unblock flag
-      (##sys#add-to-ready-queue ct) )
-    (let loop1 ()
-      ;; Unblock threads waiting for timeout:
-      (unless (null? ##sys#timeout-list)
-        (let ([now (##sys#fudge 16)])
-          (dbg "timeout (" now ") list: " ##sys#timeout-list)
-          (let loop ([lst ##sys#timeout-list])
-            (if (null? lst)
-                (set! ##sys#timeout-list '())
-                (let* ([tmo1 (caar lst)]
-                       [tto (cdar lst)]
-                       [tmo2 (##sys#slot tto 4)] )
-                  (dbg " " tto " -> " tmo2)
-                  (if (eq? tmo1 tmo2)
-                      (if (>= now tmo1)
-                          (begin
-                            (##sys#setislot tto 13 #t) ; mark as being unblocked by timeout
-                            (##sys#clear-i/o-state-for-thread! tto)
-                            ;;(pp `(CLEARED: ,tto ,@##sys#fd-list) ##sys#standard-error) ;***
-                            (##sys#thread-basic-unblock! tto)
-                            (loop (cdr lst)) )
-                          (begin
-                            (set! ##sys#timeout-list lst)
-                            ;; If there are no threads blocking on a select call (fd-list)
-                            ;; but there are threads in the timeout list then sleep for
-                            ;; the number of milliseconds of next thread to wake up.
-                            (when (and (null? ##sys#ready-queue-head)
-                                       (null? ##sys#fd-list)
-                                       (pair? ##sys#timeout-list))
-                              (let ([tmo1 (caar ##sys#timeout-list)])
-                                (set! eintr
-                                      (and (not (##core#inline "C_msleep" (fxmax 0 (- tmo1 now))))
-                                           (foreign-value "C_signal_interrupted_p" bool) ) ) ) ) ) )
-                      (loop (cdr lst)) ) ) ) ) ) )
-      ;; Unblock threads blocked by I/O:
-      (if eintr
-          (##sys#force-primordial)
-          (begin
-            (unless (null? ##sys#fd-list)
-              (##sys#unblock-threads-for-i/o) ) ) )
+      (##sys#add-to-waiting-queue ct) )
+    (let loop1 ((check #f))
+      (when (or check (##sys#ready-queue-empty?))
+        ;; Unblock threads waiting for timeout:
+        (unless (##sys#timeout-list-empty?)
+          (if (##sys#unblock-threads-for-timeout!)
+              (##sys#force-primordial)))
+        ;; Unblock threads blocked by I/O:
+        (unless (##sys#fd-list-empty?)
+          (##sys#unblock-threads-for-i/o) ) )
       ;; Fetch and activate next ready thread:
       (let loop2 ()
         (let ([nt (##sys#remove-from-ready-queue)])
           (cond [(not nt)
-                 (if (and (null? ##sys#timeout-list) (null? ##sys#fd-list))
+                 (if (and check (##sys#timeout-list-empty?) (##sys#fd-list-empty?))
                      (##sys#signal-hook #:runtime-error "deadlock")
-                     (loop1) ) ]
+                     (begin
+                       (##sys#release-waiting-queue)
+                       (loop1 #t)) ) ]
                 [(eq? (##sys#slot nt 3) 'ready) (switch nt)]
-                [else (loop2)] ) ) ) ) ) )
+                [else (loop2)] ) ) ) ) ))
 
 (define (##sys#force-primordial)
   (dbg "primordial thread forced due to interrupt")
   (##sys#thread-unblock! ##sys#primordial-thread) )
 
-(define ##sys#ready-queue-head '())
-(define ##sys#ready-queue-tail '())
-
-(define (##sys#ready-queue) ##sys#ready-queue-head)
-
-(define (##sys#add-to-ready-queue thread)
-  (##sys#setslot thread 3 'ready)
-  (let ((new-pair (cons thread '())))
-    (cond ((eq? '() ##sys#ready-queue-head)
-           (set! ##sys#ready-queue-head new-pair))
-          (else (set-cdr! ##sys#ready-queue-tail new-pair)) )
-    (set! ##sys#ready-queue-tail new-pair) ) )
-
-(define (##sys#remove-from-ready-queue)
-  (let ((first-pair ##sys#ready-queue-head))
-    (and (not (null? first-pair))
-         (let ((first-cdr (cdr first-pair)))
-           (set! ##sys#ready-queue-head first-cdr)
-           (when (eq? '() first-cdr) (set! ##sys#ready-queue-tail '()))
-           (car first-pair) ) ) ) )
-
 (define (##sys#update-thread-state-buffer thread)
   (let ([buf (##sys#slot thread 5)])
     (##sys#setslot buf 0 ##sys#dynamic-winds)
@@ -215,33 +499,54 @@
           (##sys#schedule) ) ) ; expected not to return!
       (oldhook reason state) ) ) )
 
-(define ##sys#timeout-list '())
-
 (define (##sys#remove-from-timeout-list t)
-  (let loop ((l ##sys#timeout-list) (prev #f))
-    (if (null? l)
-        l
-        (let ((h (##sys#slot l 0))
-              (r (##sys#slot l 1)))
-          (if (eq? (##sys#slot h 1) t)
-              (if prev
-                  (set-cdr! prev r)
-                  (set! ##sys#timeout-list r))
-              (loop r l))))))
+  (let ((entry (##sys#slot t 4)))
+    (when entry
+      (timeout-queue-remove-entry! entry)
+      (##sys#setislot t 4 #f))))
 
 (define (##sys#thread-block-for-timeout! t tm)
   (dbg t " blocks for " tm)
-  ;; This should really use a balanced tree:
-  (let loop ([tl ##sys#timeout-list] [prev #f])
-    (if (or (null? tl) (< tm (caar tl)))
-        (if prev
-            (set-cdr! prev (cons (cons tm t) tl))
-            (set! ##sys#timeout-list (cons (cons tm t) tl)) )
-        (loop (cdr tl) tl) ) )
+  ;; It wouldn't hurt if the thread structure where prepared to be
+  ;; moved between thread queues, however that too much of a change at
+  ;; once.
+  (let ((ton (make-queue-entry tm t)))
+    (##sys#setslot t 4 ton)
+    (timeout-queue-insert-entry! ton))
   (##sys#setslot t 3 'blocked)
-  (##sys#setislot t 13 #f)
-  (##sys#setislot t 4 tm) )
+  (##sys#setislot t 13 #f) )
 
+(define (##sys#unblock-threads-for-timeout!)
+  (let ([now (##sys#fudge 16)])
+    (dbg "timeout (" now ") list: " (##sys#timeout-list-empty?))
+    (let loop ()
+      (unless (##sys#timeout-list-empty?)
+        (let* ((entry (timeout-queue-next))
+               (tmo (int-priority-queue-index entry)))
+          (dbg " " now " -> " tmo)
+          (if (>= now tmo)
+              (let ((tto (int-priority-queue-value entry)))
+                (if (not (eq? (##sys#slot tto 4) entry))
+                    (dbg "(not (eq? (##sys#slot " (##sys#slot tto 4) " 4) " entry ")) "))
+                (timeout-queue-unqueue!)
+                (##sys#setislot tto 4 #f)
+                (##sys#setislot tto 13 #t) ; mark as being unblocked by timeout
+                (##sys#clear-i/o-state-for-thread! tto)
+                ;;(pp `(CLEARED: ,tto ,@##sys#fd-list) ##sys#standard-error) ;***
+                (##sys#thread-basic-unblock! tto)
+                (loop) ) ))))
+    ;; If there are no threads blocking on a select call (fd-list) but
+    ;; there are threads in the timeout list then sleep for the number
+    ;; of milliseconds of next thread to wake up and return #t if
+    ;; interupted.
+    (and (##sys#ready-queue-empty?)
+         (##sys#waiting-queue-empty?)
+         (##sys#fd-list-empty?)
+         (not (##sys#timeout-list-empty?))
+         (let ([tmo (int-priority-queue-index (timeout-queue-next))])
+           (and (not (##core#inline "C_msleep" (fxmax 0 (- tmo now))))
+                (foreign-value "C_signal_interrupted_p" bool) ) ) )))
+
 (define (##sys#thread-block-for-termination! t t2)
   (dbg t " blocks for " t2)
   (let ([state (##sys#slot t2 3)])
@@ -255,7 +560,6 @@
     (dbg "killing: " t " -> " s ", recipients: " (##sys#slot t 12))
     (##sys#abandon-mutexes t)
     (##sys#setslot t 3 s)
-    (##sys#setislot t 4 #f)
     (##sys#setislot t 11 #f)
     (##sys#setislot t 8 '())
     (##sys#remove-from-timeout-list t)
@@ -272,7 +576,10 @@
 (define (##sys#thread-basic-unblock! t)
   (dbg "unblocking: " t)
   (##sys#setislot t 11 #f)
-  (##sys#setislot t 4 #f)
+  (if (##sys#slot t 4)
+      (begin
+        (dbg "##sys#thread-basic-unblock! timout slot is still set!")
+        (##sys#remove-from-timeout-list t))) ; also dequeue the entry, not just clear slot 4
   (##sys#add-to-ready-queue t) )
 
 (define ##sys#default-exception-handler
@@ -307,8 +614,15 @@
 
 ;;; `select()'-based blocking:
 
-(define ##sys#fd-list '())
+(define (##sys#empty-fd-list!) (set! ##sys#fd-list (make-int-priority-queue)))
 
+(define (##sys#fd-list-add-thread! fd t)
+  (let ((entry (int-priority-queue-lookup ##sys#fd-list fd)))
+    (if entry
+        (if (not (memq t (int-priority-queue-value entry)))
+            (int-priority-queue-value-set! entry (cons t (int-priority-queue-value entry))))
+        (int-priority-queue-node-insert! ##sys#fd-list (make-queue-entry fd (list t))))))
+
 (define ##sys#fdset-select-timeout
   (foreign-lambda* int ([bool to] [unsigned-long tm])
     "struct timeval timeout;"
@@ -342,13 +656,7 @@
 
 (define (##sys#thread-block-for-i/o! t fd i/o)
   (dbg t " blocks for I/O " fd)
-  (let loop ([lst ##sys#fd-list])
-    (if (null? lst)
-        (set! ##sys#fd-list (cons (list fd t) ##sys#fd-list))
-        (let ([a (car lst)])
-          (if (fx= fd (car a))
-              (##sys#setslot a 1 (cons t (cdr a)))
-              (loop (cdr lst)) ) ) ) )
+  (##sys#fd-list-add-thread! fd t)
   (case i/o
     ((#t #:input) (##sys#fdset-input-set fd))
     ((#f #:output) (##sys#fdset-output-set fd))
@@ -359,102 +667,167 @@
   (##sys#setislot t 13 #f)
   (##sys#setslot t 11 (cons fd i/o)) )
 
+(define-foreign-variable error-bad-file int "(errno == EBADF)")
+
 (define (##sys#unblock-threads-for-i/o)
-  (dbg "fd-list: " ##sys#fd-list)
-  (let* ([to? (pair? ##sys#timeout-list)]
-         [rq? (pair? ##sys#ready-queue-head)]
+  (dbg "fd-list: " (int-priority-queue-node-fold
+                    (lambda (n i) (cons (cons (int-priority-queue-index n) (int-priority-queue-value n)) i))
+                    '()
+                    ##sys#fd-list))
+  (let* ([to? (not (##sys#timeout-list-empty?))]
+         [rq? (not (and (##sys#ready-queue-empty?) (##sys#waiting-queue-empty?)))]
          [n (##sys#fdset-select-timeout ; we use FD_SETSIZE, but really should use max fd
              (or rq? to?)
              (if (and to? (not rq?)) ; no thread was unblocked by timeout, so wait
-                 (let* ([tmo1 (caar ##sys#timeout-list)]
+                 (let* ([entry (timeout-queue-next)] ; valid for both tree back ends
+                        [tmo (int-priority-queue-index entry)]
                         [now (##sys#fudge 16)])
-                   (fxmax 0 (- tmo1 now)) )
+                   (fxmax 0 (- tmo now)) )
                  0) ) ] ) ; otherwise immediate timeout.
     (dbg n " fds ready")
-    (cond [(eq? -1 n)
-           (##sys#force-primordial)]
+    (cond [(eq? n 0) (##sys#fdset-restore)]
+          [(eq? -1 n)
+           (cond
+            (error-bad-file
+             (##sys#call-with-current-continuation
+              (lambda (exit)
+                (int-priority-queue-node-for-each
+                 (lambda (node)
+                   (define fd (int-priority-queue-index node))
+                   (define ts (int-priority-queue-value node))
+                   (dbg "check bad " fd)
+                   (let ((bad ((foreign-lambda*
+                                bool ((integer fd))
+                                "struct stat buf;"
+                                "int i = ( (fstat(fd, &buf) == -1 && errno == EBADF) ? 1 : 0);"
+                                "return(i);")
+                               fd)))
+                     (when bad
+                       (dbg "bad is " fd)
+                       (##sys#fdset-clear fd)
+                       (fd-list-remove-entry! node)
+                       (for-each
+                        (lambda (thread)
+                          (thread-signal!
+                           thread
+                           (##sys#make-structure
+                            'condition
+                            '(exn i/o) ;; better? '(exn i/o net)
+                            (list '(exn . message) "bad file descriptor"
+                                  '(exn . arguments) (list fd)
+                                  '(exn . location) thread) )))
+                        ts)
+                       (exit #t))))
+                 ##sys#fd-list)))
+             (##sys#fdset-restore)
+             (##sys#unblock-threads-for-i/o))
+            (else (##sys#force-primordial))) ]
           [(fx> n 0)
-           (set! ##sys#fd-list
-             (let loop ([n n] [lst ##sys#fd-list])
-               (if (or (zero? n) (null? lst))
-                   lst
-                   (let* ([a (car lst)]
-                          [fd (car a)]
-                          [inf (##core#inline "C_fd_test_input" fd)]
-                          [outf (##core#inline "C_fd_test_output" fd)] )
-                     (dbg "fd " fd " ready: input=" inf ", output=" outf)
-                     (if (or inf outf)
-                         (let loop2 ([threads (cdr a)])
-                           (if (null? threads)
-                               (begin
-                                 (##sys#fdset-clear fd)
-                                 (loop (sub1 n) (cdr lst)) )
-                               (let* ([t (car threads)]
-                                      [p (##sys#slot t 11)] )
-                                 (when (and (pair? p)
-                                            (eq? fd (car p))
-                                            (not (##sys#slot t 13) ) ) ; not unblocked by timeout
-                                   (##sys#thread-basic-unblock! t) )
-                                 (loop2 (cdr threads)) ) ) )
-                         (cons a (loop n (cdr lst))) ) ) ) ) ) ] )
-    (##sys#fdset-restore) ) )
+           (for-each
+            (lambda (e) (fd-list-remove-entry! e))
+            (##sys#call-with-current-continuation
+             (lambda (exit)
+               (int-priority-queue-node-fold
+                (lambda (node init)
+                  (define fd (int-priority-queue-index node))
+                  (define threads (int-priority-queue-value node))
+                  (if (zero? n) (exit init)
+                      (let* ([inf (##core#inline "C_fd_test_input" fd)]
+                             [outf (##core#inline "C_fd_test_output" fd)] )
+                        (dbg "fd " fd " ready: input=" inf ", output=" outf)
+                        (if (or inf outf)
+                            (begin
+                              (for-each
+                               (lambda (t)
+                                 (let* ((p (##sys#slot t 11)) )
+                                   (when (and (pair? p)
+                                              (eq? fd (car p))
+                                              (not (##sys#slot t 13) ) ) ; not unblocked by timeout
+                                     (##sys#thread-basic-unblock! t) ) ))
+                               threads)
+                              (##sys#fdset-clear fd)
+                              (set! n (sub1 n))
+                              (cons node init))
+                            init))))
+                '()
+                ##sys#fd-list))))
+           (##sys#fdset-restore) ] ) ) )
 
-
 ;;; Clear I/O state for unblocked thread
 
 (define (##sys#clear-i/o-state-for-thread! t)
   (when (pair? (##sys#slot t 11))
-    (let ((fd (##sys#slot (##sys#slot t 11) 0)))
-      (set! ##sys#fd-list
-        (let loop ([lst ##sys#fd-list])
-          (if (null? lst)
-              '()
-              (let* ([a (##sys#slot lst 0)]
-                     [fd2 (##sys#slot a 0)] )
-                (if (eq? fd fd2)
-                    (let ((ts (##sys#delq t (##sys#slot a 1)))) ; remove from fd-list entry
-                      (cond ((null? ts)
-                             ;;(pp `(CLEAR FD: ,fd ,t) ##sys#standard-error)
-                             (##sys#fdset-clear fd) ; no more threads waiting for this fd
-                             (##sys#fdset-restore)
-                             (##sys#slot lst 1) )
-                            (else
-                             (##sys#setslot a 1 ts) ; fd-list entry is list with t removed
-                             lst) ) )
-                    (cons a (loop (##sys#slot lst 1)))))))))))
+    (let* ((fd (##sys#slot (##sys#slot t 11) 0))
+           (entry (int-priority-queue-lookup ##sys#fd-list fd)))
+      (when entry
+        (let ((ts (##sys#delq t (int-priority-queue-value entry)))) ; remove from fd-list entry
+          (cond ((null? ts)
+                 ;;(pp `(CLEAR FD: ,fd ,t) ##sys#standard-error)
+                 (##sys#fdset-clear fd) ; no more threads waiting for this fd
+                 (##sys#fdset-restore)
+                 (fd-list-remove-entry! entry))
+                (else
+                 (int-priority-queue-value-set! entry ts)) ) ))))) ; fd-list entry is list with t removed
 
 
 ;;; Get list of all threads that are ready or waiting for timeout or waiting for I/O:
 ;
 ; (contributed by Joerg Wittenberger)
 
-(define (##sys#all-threads #!optional
+(cond-expand
+ (rbtree
+ (define (##sys#all-threads #!optional
                            (cns (lambda (queue arg val init)
                                   (cons val init)))
                            (init '()))
   (let loop ((l ##sys#ready-queue-head) (i init))
     (if (pair? l)
         (loop (cdr l) (cns 'ready #f (car l) i))
-        (let loop ((l ##sys#fd-list) (i i))
+        (let loop ((l ##sys#waiting-queue-head) (i i))
           (if (pair? l)
-              (loop (cdr l)
-                    (let ((fd (caar l)))
-                      (let loop ((l (cdar l)))
-                        (if (null? l) i
-                            (cns 'i/o fd (car l) (loop (cdr l)))))))
-              (let loop ((l ##sys#timeout-list) (i i))
-                (if (pair? l)
-                    (loop (cdr l) (cns 'timeout (caar l) (cdar l) i))
-                    i)))))))
+              (loop (cdr l) (cns 'waiting #f (car l) i))
+              (int-priority-queue-node-fold
+               (lambda (n i)
+                 (fold (lambda (t i) (cns 'i/o (int-priority-queue-index n) t i))
+                       i (int-priority-queue-value n)))
+               (int-priority-queue-node-fold
+                (lambda (n i)
+                  (cns 'timeout (int-priority-queue-index n) (int-priority-queue-value n) i))
+                i ##sys#timeout-list) ; seed with the accumulator, not the exhausted list
+               ##sys#fd-list)))))))
+ (else
+ (define (##sys#all-threads #!optional
+                           (cns (lambda (queue arg val init)
+                                  (cons val init)))
+                           (init '()))
+  (let loop ((l ##sys#ready-queue-head) (i init))
+    (if (pair? l)
+        (loop (cdr l) (cns 'ready #f (car l) i))
+        (let loop ((l ##sys#waiting-queue-head) (i i))
+          (if (pair? l)
+              (loop (cdr l) (cns 'waiting #f (car l) i))
+              (int-priority-queue-node-fold
+               (lambda (n i)
+                 (fold (lambda (t i) (cns 'i/o (int-priority-queue-index n) t i))
+                       i (int-priority-queue-value n)))
+               (let ((r (int-priority-queue-node-fold
+                         (lambda (n i)
+                           (cns 'timeout (int-priority-queue-index n) (int-priority-queue-value n) i))
+                         i ##sys#timeout-list))) ; seed with the accumulator, not the exhausted list
+                 (let ((n ##sys#timeout-list-head))
+                   (if n
+                       (cns
+                        'timeout (int-priority-queue-index n) (int-priority-queue-value n) r)
+                       r)))
+               ##sys#fd-list))))))))
 
-
 ;;; Remove all waiting threads from the relevant queues with the exception of the current thread:
-
+#|
 (define (##sys#fetch-and-clear-threads)
   (let ([all (vector ##sys#ready-queue-head ##sys#ready-queue-tail ##sys#fd-list ##sys#timeout-list)])
     (set! ##sys#ready-queue-head '())
     (set! ##sys#ready-queue-tail '())
-    (set! ##sys#fd-list '())
+    (##sys#empty-fd-list!)
     (set! ##sys#timeout-list '())
     all) )
 
@@ -466,26 +839,17 @@
     (set! ##sys#ready-queue-tail (##sys#slot vec 1))
     (set! ##sys#fd-list (##sys#slot vec 2))
     (set! ##sys#timeout-list (##sys#slot vec 3)) )
+|#
 
-
 ;;; Unblock thread cleanly:
 
 (define (##sys#thread-unblock! t)
   (when (eq? 'blocked (##sys#slot t 3))
     (##sys#remove-from-timeout-list t)
-    (set! ##sys#fd-list
-      (let loop ([fdl ##sys#fd-list])
-        (if (null? fdl)
-            '()
-            (let ([a (##sys#slot fdl 0)])
-              (cons
-               (cons (##sys#slot a 0)
-                     (##sys#delq t (##sys#slot a 1)) )
-               (loop (##sys#slot fdl 1)) ) ) ) ) )
+    (##sys#clear-i/o-state-for-thread! t)
     (##sys#setislot t 12 '())
     (##sys#thread-basic-unblock! t) ) )
 
-
 ;;; Multithreaded breakpoints
 
 (define (##sys#break-entry name args)
Index: rules.make
===================================================================
--- rules.make	(Revision 12496)
+++ rules.make	(Arbeitskopie)
@@ -1159,7 +1159,7 @@
 regex.c: $(SRCDIR)regex.scm
 	$(CHICKEN) $< $(CHICKEN_LIBRARY_OPTIONS) $(CHICKEN_PCRE_LIBRARY_OPTIONS) -output-file $@
 scheduler.c: $(SRCDIR)scheduler.scm
-	$(CHICKEN) $< $(CHICKEN_LIBRARY_OPTIONS) -output-file $@
+	$(CHICKEN) $< $(CHICKEN_LIBRARY_OPTIONS) -extend llrbtree.scm -output-file $@ -feature llrbtree
 profiler.c: $(SRCDIR)profiler.scm
 	$(CHICKEN) $< $(CHICKEN_LIBRARY_OPTIONS) -output-file $@
 stub.c: $(SRCDIR)stub.scm