;; (C) 2008, 2009, 2010

;;; ------------------------------------------------------------------
;;; delay* / delay*thunk
;;;
;;; (delay*thunk THUNK NAME) returns a lazy promise.  When forced, THUNK
;;; is run while holding a mutex created for this promise.  After THUNK
;;; has returned, the variable `mutex` is set to #f, which serves as the
;;; "already computed" flag; the lock/unlock steps use a private copy of
;;; the mutex taken before the flag can be cleared.  If THUNK escapes
;;; with an exception the mutex is released by `dynamic-wind` and the
;;; flag is left untouched.
;;;
;;; (delay* EXPR NAME) is the syntactic front end: it wraps EXPR in a
;;; thunk and hands it, together with NAME, to `delay*thunk`.
;;; ------------------------------------------------------------------
(cond-expand
 ((or rscheme)
  (define (delay*thunk thunk name)
    (let ((mutex (make-mutex name)))
      (lazy
       (let ((tmp mutex)
             (result #f))
         (dynamic-wind
             (lambda () (mutex-lock! tmp))
             (lambda ()
               (if mutex
                   (begin
                     (set! result (call-with-values thunk eager))
                     (set! mutex #f)))
               result)
             (lambda () (mutex-unlock! tmp)))))))
  ;; NAME must be unquoted so that the macro argument (not a free
  ;; variable literally called `name`) reaches delay*thunk, exactly as
  ;; the !order and !start macros do.
  (define-macro (delay* expr name)
    `(delay*thunk (lambda () ,expr) ,name)))
 (else
  (define (delay*thunk thunk name)
    (let ((mutex (make-mutex (cons name '*single-use*))))
      (lazy
       (make-eager-promise
        (let ((m mutex)
              (result #f))
          (dynamic-wind
              (lambda () (mutex-lock! m))
              (lambda ()
                (if mutex
                    (begin
                      ;; All values returned by THUNK are collected in a list.
                      (set! result (call-with-values thunk list))
                      (set! mutex #f)))
                result)
              (lambda () (mutex-unlock! m)))))))
    ;; (let ((t (make-thread thunk name)))
    ;;   (delay (begin
    ;;            (if (eq? (thread-state t) 'created)
    ;;                (handle-exceptions ex #f (thread-start! t)))
    ;;            (thread-join! t))))
    )
  (define-syntax delay*
    (syntax-rules ()
      ((_ expr name) (delay*thunk (lambda () expr) name))))))

;;; ------------------------------------------------------------------
;;; (!%call-with-values TIMEOUT ABORT-HANDLER THUNK RECEIVER)
;;;
;;; Runs THUNK in a freshly started thread named "!call-with-values",
;;; joins that thread from the calling thread and passes what the join
;;; returns to RECEIVER.
;;;
;;; ABORT-HANDLER (a procedure of one argument, the worker thread, or
;;; #f): when given, a second thread named "!call-with-values-alive"
;;; joins the *calling* thread (ignoring any exception from that join)
;;; and then invokes ABORT-HANDLER on the worker.  That second thread
;;; is terminated once the join of the worker is left.
;;;
;;; TIMEOUT (or #f): when given, a timeout message for the calling
;;; thread is registered before the join and cancelled afterwards.  If
;;; the join is interrupted by a timeout object and ABORT-HANDLER is
;;; set, ABORT-HANDLER is applied to the worker and the timeout object
;;; is re-raised.
;;; ------------------------------------------------------------------
(define (!%call-with-values timeout abort-handler thunk receiver)
  (let* ((thread (thread-start! (make-thread thunk "!call-with-values")))
         (alive
          (and abort-handler
               (let ((parent (current-thread)))
                 (thread-start!
                  (make-thread
                   (lambda ()
                     ;; Wait for the caller to end, however it ends.
                     (guard (ex (else #f)) (thread-join! parent))
                     (abort-handler thread))
                   "!call-with-values-alive"))))))
    (call-with-values
        (cond
         (timeout
          (lambda ()
            (let ((pending #f))
              (dynamic-wind
                  (lambda ()
                    (set! pending
                          (register-timeout-message! timeout (current-thread))))
                  (if abort-handler
                      (lambda ()
                        (guard (ex ((timeout-object? ex)
                                    (abort-handler thread)
                                    (raise ex)))
                          (thread-join! thread)))
                      (lambda () (thread-join! thread)))
                  (lambda ()
                    (cancel-timeout-message! pending)
                    (if alive (thread-terminate! alive)))))))
         (abort-handler
          (lambda ()
            (dynamic-wind
                (lambda () #t)
                (lambda () (thread-join! thread))
                (lambda () (if alive (thread-terminate! alive))))))
         (else
          (lambda () (thread-join! thread))))
      receiver)))

;;; ------------------------------------------------------------------
;;; (!call-with-values THUNK RECEIVER [TIMEOUT [ABORT-HANDLER]])
;;;
;;; Like `call-with-values`, but THUNK is run in a separate thread via
;;; `!%call-with-values` whenever TIMEOUT or ABORT-HANDLER is true.
;;; TIMEOUT defaults to #f; ABORT-HANDLER defaults to
;;; `thread-signal-timeout!`.  Only when both are explicitly false is
;;; THUNK called directly in the current thread.
;;; ------------------------------------------------------------------
(define (!call-with-values thunk receiver . rest)
  (let ((timeout (if (pair? rest) (car rest) #f))
        (abort-handler
         (if (pair? rest)
             (let ((rest (cdr rest)))
               (if (pair? rest) (car rest) thread-signal-timeout!))
             thread-signal-timeout!)))
    (if (or timeout abort-handler)
        (!%call-with-values timeout abort-handler thunk receiver)
        (call-with-values thunk receiver))))

;;; ------------------------------------------------------------------
;;; (!order/thunk THUNK NAME)
;;;
;;; Starts THUNK immediately in a new thread called NAME and returns a
;;; promise; forcing the promise joins the thread.  An "uncaught
;;; exception" condition coming out of the join is unwrapped and its
;;; reason is raised instead.
;;; ------------------------------------------------------------------
(cond-expand
 ((or rscheme)
  (define (!order/thunk thunk name)
    (let ((thread (thread-start!
                   (make-thread thunk
                                ;; Non-string names are rendered with format.
                                (if (string? name) name (format "~a" name))))))
      (lazy
       (call-with-values
           (lambda ()
             (guard (ex ((uncaught-exception? ex)
                         (raise (uncaught-exception-reason ex))))
               (thread-join! thread)))
         eager)))))
 (else
  (define (!order/thunk thunk name)
    (let ((thread (thread-start! (make-thread thunk name))))
      (delay
        (guard (ex ((uncaught-exception? ex)
                    (raise (uncaught-exception-reason ex))))
          (thread-join! thread)))))))

;;; (!order EXPR NAME) -- syntactic front end of `!order/thunk`: EXPR is
;;; wrapped in a thunk which starts running at once in a thread NAME.
(cond-expand
 ((or rscheme)
  #|
  (define-syntax !order
    (syntax-form (expr name)
      (!order/thunk (lambda () expr) name)))
  |#
  (define-macro (!order expr name)
    `(!order/thunk (lambda () ,expr) ,name))
  )
 (else
  (define-syntax !order
    (syntax-rules ()
      ((_ expr name) (!order/thunk (lambda () expr) name))))))

;;; (!start EXPR NAME) -- starts a thread called NAME whose body is
;;; (yield-mutex-set! EXPR) and returns the started thread.
;;; NOTE(review): `yield-mutex-set!` is defined elsewhere; its effect is
;;; not visible from this file.
(cond-expand
 ((or rscheme)
  (define-macro (!start expr name)
    `(thread-start!
      (make-thread (lambda () (yield-mutex-set! ,expr)) ,name)))
  )
 (else
  (define-syntax !start
    (syntax-rules ()
      ((_ expr name)
       (thread-start!
        (make-thread (lambda () (yield-mutex-set! expr)) name)))))))

;;; (!apply PROC ARGS) -- applies PROC to the list ARGS in a new thread
;;; named "!apply" and returns what joining that thread returns.  An
;;; uncaught-exception wrapper is removed and its reason re-raised; any
;;; other exception is re-raised unchanged.
(define (!apply proc args)
  (guard (ex ((uncaught-exception? ex)
              (raise (uncaught-exception-reason ex)))
             (else (raise ex)))
    (thread-join!
     (thread-start!
      (make-thread (lambda () (apply proc args)) "!apply")))))

#|
(define (!apply proc args)
  (thread-join!
   (thread-start!
    (make-thread
     (lambda ()
       (guard
        (ex (else (log-condition (format "~s ~s" proc args) ex)
                  (raise (if (condition? ex) ex (format "~s ~s ~s" proc args ex)))))
        (apply proc args)))
     "!apply"))))
|#

;; Unique marker object stored in the exception slot of a worker record
;; for as long as no exception has been recorded (compared with `eq?`).
(define local-none '(local-none))

;;; ------------------------------------------------------------------
;;; (!map MAP ABORT-HANDLER EXCEPTION-HANDLER N TMO F ARGS)
;;;
;;; Creates one thread per element of ARGS (using the supplied MAP
;;; procedure to build the records), each computing (F arg), and
;;; collects completion messages through a mailbox.
;;;
;;; Each worker is described by a 4-slot vector (see below).
;;;
;;; ABORT-HANDLER selects what `abrt!` does with workers whose slot 0
;;; is still false:
;;;   #t                       -- leave them alone
;;;   thread-signal-timeout!   -- signal a timeout to the thread
;;;   thread-join!             -- join it, ignoring exceptions
;;;   any other procedure      -- call it on the thread via `!order`
;;;   anything else            -- terminate the thread
;;;
;;; EXCEPTION-HANDLER, if a procedure, is called with a worker's
;;; exception; if it returns at least one value the first value becomes
;;; that worker's result, otherwise the failure is counted.
;;;
;;; N is the requested number of results (#f: as many as there are
;;; workers; #t is treated specially, see the loop).  TMO is passed to
;;; `with-timeout!` in the watchdog thread.
;;;
;;; Returns a procedure (KEY F . INIT) for extracting the outcome:
;;;   missing:     (F n-still-missing)
;;;   conditions:  (apply F exceptions-of-records-with-slot-0-false)
;;;   fold:        left fold of F over valid results, seed (car INIT)
;;;   fold-right:  right fold of F over valid results, seed (car INIT)
;;;   values:      (apply F valid-results)
;;; ------------------------------------------------------------------
(define (!map map abort-handler exception-handler n tmo f args)
  (let* ((mbox (make-mailbox 'parallel-1))
         ;; Fresh list used as a unique "timed out" message (eq?-compared).
         (ltmo (and tmo (list (timeout-object))))
         (threads
          (map (lambda (arg)
                 (let* ((r (vector #f         ; 0 flag: result valid
                                   #f         ; 1 result (if valid)
                                   local-none ; 2 final exception (none if handled by exception-handler)
                                   #f         ; 3 worker thread
                                   ))
                        (t (make-thread
                            (lambda ()
                              (guard (ex (else (vector-set! r 2 ex)))
                                (vector-set! r 1 (f arg)))
                              ;; NOTE(review): slot 0 is set even when F
                              ;; raised; confirm whether "valid" is meant
                              ;; to mean "finished" here.
                              (vector-set! r 0 #t)
                              (vector-set! r 3 #f)
                              (send-message! mbox r))
                            "!map")))
                   (vector-set! r 3 t)
                   r))
               args))
         ;; Dispose of all workers that have not delivered yet.
         (abrt!
          (lambda ()
            (if (not (eq? abort-handler #t))
                (for-each
                 (lambda (e)
                   (if (not (vector-ref e 0))
                       (let ((t (vector-ref e 3)))
                         (vector-set! e 3 #f)
                         (if (thread? t)
                             (cond
                              ((eq? abort-handler thread-signal-timeout!)
                               (thread-signal-timeout! t))
                              ((eq? abort-handler thread-join!)
                               (guard (ex (else #f)) (thread-join! t)))
                              ((procedure? abort-handler)
                               (!order (abort-handler t) 'parallel-abandoned))
                              (else (thread-terminate! t)))))))
                 threads))))
         ;; Watchdog: under TMO, waits for the calling thread to end and
         ;; then aborts the workers; if that raises (e.g. on timeout) a
         ;; message is posted to the mailbox.
         ;; NOTE(review): presumably `debug` returns its second argument,
         ;; so the message posted is `ltmo` -- confirm.
         (alive
          (and (or tmo (not (eq? abort-handler #t)))
               (let ((parent (current-thread)))
                 (thread-start!
                  (make-thread
                   (lambda ()
                     (guard (ex (else (send-message! mbox (debug ex ltmo))))
                       (with-timeout!
                        tmo
                        (lambda ()
                          (guard (ex (else (debug ex #f)))
                            (thread-join! parent))
                          (or (eq? abort-handler #t) (abrt!))))))
                   (thread-name parent))))))
         (max (length threads))
         (results
          (lambda (missing)
            (lambda (key f . init)
              (case key
                ((missing:) (f missing))
                ((conditions:)          ; defaults to make-compound-condition
                 (apply f
                        (let loop ((threads threads))
                          (cond
                           ((null? threads) '())
                           ((vector-ref (car threads) 0) (loop (cdr threads)))
                           (else (cons (vector-ref (car threads) 2)
                                       (loop (cdr threads))))))))
                ((fold:)
                 (let loop ((threads threads) (init (car init)))
                   (if (null? threads)
                       init
                       (loop (cdr threads)
                             (if (vector-ref (car threads) 0)
                                 (f (vector-ref (car threads) 1) init)
                                 init)))))
                ((fold-right:)
                 (let loop ((threads threads))
                   (if (null? threads)
                       (car init)
                       (if (vector-ref (car threads) 0)
                           (f (vector-ref (car threads) 1) (loop (cdr threads)))
                           (loop (cdr threads))))))
                (else
                 ;; `conditions:` can never arrive here (handled above);
                 ;; an unknown KEY yields slot #f.
                 (let ((slot (case key
                               ((conditions:) 2)
                               ((values:) 1)
                               (else #f))))
                   (apply f
                          (let loop ((threads threads))
                            (cond
                             ((null? threads) '())
                             ((not (vector-ref (car threads) 0))
                              (loop (cdr threads)))
                             (else (cons (vector-ref (car threads) slot)
                                         (loop (cdr threads))))))))))))))
    (guard (ex (else (abrt!)
                     (if (thread? alive) (thread-terminate! alive))
                     (raise ex)))
      (for-each (lambda (t) (thread-start! (vector-ref t 3))) threads)
      ;; n:   results still wanted (#t stays #t)
      ;; err: countdown that ends the loop when it reaches 0; -1 when the
      ;;      caller passed N = #f, so it never reaches 0.
      ;; NOTE(review): `err` is decremented on successful results as well
      ;; as on unhandled failures -- confirm this is intended.
      (let loop ((n (or n max))
                 (err (if n (add1 (fx- max (if (number? n) n max))) -1)))
        (if (or (eqv? n 0) (eqv? err 0))
            (begin
              (abrt!)
              (if (thread? alive) (thread-terminate! alive))
              (results n))
            (let ((m (receive-message! mbox)))
              (cond
               ;; Timeout message from the watchdog.
               ((and ltmo (eq? m ltmo))
                (if (eq? n #t)
                    (loop 0 err)
                    (raise (timeout-object))))
               ;; Worker finished without recording an exception.
               ((eq? (vector-ref m 2) local-none)
                (loop (or (eq? n #t) (sub1 n)) (sub1 err)))
               ;; Worker failed: give EXCEPTION-HANDLER a chance.
               (else
                (if (procedure? exception-handler)
                    (call-with-values
                        (lambda () (exception-handler (vector-ref m 2)))
                      (lambda handled
                        (if (pair? handled)
                            (begin
                              (vector-set! m 1 (car handled))
                              (vector-set! m 2 #f)
                              (vector-set! m 0 #t)
                              (loop (or (eq? n #t) (sub1 n)) err))
                            (loop n (sub1 err)))))
                    (loop n (sub1 err)))))))))))

;;; ------------------------------------------------------------------
;;; (!mapfold* SRCLOCID MAKE-COMPOUND-CONDITION MAP ABORT-HANDLER
;;;            EXCEPTION-HANDLER N TMO MAPF FOLDF INIT ARGS)
;;;
;;; Runs (MAPF arg) for every element of ARGS in its own thread and
;;; folds the results, in order of completion, with FOLDF.
;;;
;;; FOLDF is called as (FOLDF n* value acc), where n* is #t if N is #t
;;; and otherwise the remaining count minus one; it must return two
;;; values: the new remaining count and the new accumulator.
;;;
;;; The loop ends when the remaining count is 0 or all workers have
;;; reported.  It returns the accumulator if the count is #t or 0;
;;; otherwise it raises (apply MAKE-COMPOUND-CONDITION ...) over slot 2
;;; of every record whose slot 0 is false.
;;;
;;; SRCLOCID names the mailbox.  ABORT-HANDLER, EXCEPTION-HANDLER and
;;; TMO are used as in `!map`.
;;; ------------------------------------------------------------------
(define (!mapfold* srclocid make-compound-condition map abort-handler
                   exception-handler n tmo mapf foldf init args)
  (let* ((mbox (make-mailbox srclocid))
         ;; Fresh list used as a unique "timed out" message (eq?-compared).
         (ltmo (and tmo (list (timeout-object))))
         (threads
          (map (lambda (arg)
                 (let* ((r (vector #f         ; 0 flag: result valid
                                   #f         ; 1 result (if valid)
                                   local-none ; 2 final exception (none if handled by exception-handler)
                                   #f         ; 3 worker thread
                                   ))
                        (t (make-thread
                            (lambda ()
                              (guard (ex (else (vector-set! r 2 ex)))
                                (vector-set! r 1 (mapf arg)))
                              ;; NOTE(review): slot 0 is set even when
                              ;; MAPF raised; confirm intent.
                              (vector-set! r 0 #t)
                              (vector-set! r 3 #f)
                              (send-message! mbox r))
                            "!mapfold")))
                   (vector-set! r 3 t)
                   r))
               args))
         ;; Dispose of all workers that have not delivered yet.
         (abrt!
          (lambda ()
            (if (not (eq? abort-handler #t))
                (for-each
                 (lambda (e)
                   (if (not (vector-ref e 0))
                       (let ((t (vector-ref e 3)))
                         (vector-set! e 3 #f)
                         (if (thread? t)
                             (cond
                              ((eq? abort-handler thread-signal-timeout!)
                               (thread-signal-timeout! t))
                              ((eq? abort-handler thread-join!)
                               (guard (ex (else #f)) (thread-join! t)))
                              ((procedure? abort-handler)
                               (!order (abort-handler t) 'parallel-abandoned))
                              (else (thread-terminate! t)))))))
                 threads))))
         ;; Watchdog: under TMO, waits for the calling thread to end and
         ;; then aborts the workers; if that raises, `ltmo` is posted to
         ;; the mailbox.
         (alive
          (and (or tmo (not (eq? abort-handler #t)))
               (let ((parent (current-thread)))
                 (thread-start!
                  (make-thread
                   (lambda ()
                     (guard (ex (else (send-message! mbox ltmo)))
                       (with-timeout!
                        tmo
                        (lambda ()
                          (guard (ex (else #f)) (thread-join! parent))
                          (or (eq? abort-handler #t) (abrt!))))))
                   (thread-name parent))))))
         (max (length threads)))
    (guard (ex (else (abrt!)
                     (if (thread? alive) (thread-terminate! alive))
                     (raise ex)))
      (for-each (lambda (t) (thread-start! (vector-ref t 3))) threads)
      (letrec
          ;; Fold one value in, then continue with one report fewer.
          ((cont
            (lambda (n v init total)
              (receive (n init) (foldf (or (eq? n #t) (sub1 n)) v init)
                (loop n (sub1 total) init))))
           ;; n: results still wanted; total: workers not yet reported.
           (loop
            (lambda (n total init)
              (if (or (eqv? n 0) (eqv? total 0))
                  (begin
                    (abrt!)
                    (if (thread? alive) (thread-terminate! alive))
                    (if (or (eq? n #t) (eqv? n 0))
                        init
                        (raise
                         (apply make-compound-condition
                                (let loop ((threads threads))
                                  (cond
                                   ((null? threads) '())
                                   ((vector-ref (car threads) 0)
                                    (loop (cdr threads)))
                                   (else (cons (vector-ref (car threads) 2)
                                               (loop (cdr threads))))))))))
                  (let ((m (receive-message! mbox)))
                    (cond
                     ;; Timeout message from the watchdog.
                     ((and ltmo (eq? m ltmo))
                      (if (eq? n #t)
                          (loop 0 total init)
                          (raise (timeout-object))))
                     ;; Worker finished without recording an exception.
                     ((eq? (vector-ref m 2) local-none)
                      (cont n (vector-ref m 1) init total))
                     ;; Worker failed: give EXCEPTION-HANDLER a chance.
                     (else
                      (if (procedure? exception-handler)
                          (call-with-values
                              (lambda () (exception-handler (vector-ref m 2)))
                            (lambda handled
                              (if (pair? handled)
                                  (begin
                                    (vector-set! m 1 (car handled))
                                    (vector-set! m 2 #f)
                                    (vector-set! m 0 #t)
                                    (cont n (car handled) init total))
                                  (loop n (sub1 total) init))))
                          (loop n (sub1 total) init)))))))))
        (loop (or n max) max init)))))

;;; ------------------------------------------------------------------
;;; (!mapfold MAPF FOLDF INIT DATA . KEYWORD-ARGS)
;;;
;;; Keyword front end of `!mapfold*`.  KEYWORD-ARGS is a flat list of
;;; key/value pairs:
;;;   count:               N                         (default #f)
;;;   timeout:             TMO                       (default #f)
;;;   map-handler:         passed as EXCEPTION-HANDLER (default #f)
;;;   fold-handler:        passed as ABORT-HANDLER     (default #f)
;;;   map:                 MAP procedure             (default `map`)
;;;   compound-condition:  MAKE-COMPOUND-CONDITION   (default `list`)
;;;   name:                SRCLOCID                  (default '!mapfold)
;;; An unknown key raises the remaining argument list.
;;; ------------------------------------------------------------------
(define (!mapfold mapf foldf init data . rest)
  (let ((srclocid '!mapfold)
        (n #f)
        (tmo #f)
        (map-handler #f)
        (fold-handler #f)
        (make-compound-condition list)
        (map map))
    (do ((rest rest (cddr rest)))
        ((null? rest))
      (case (car rest)
        ((count:) (set! n (cadr rest)))
        ((timeout:) (set! tmo (cadr rest)))
        ((map-handler:) (set! map-handler (cadr rest)))
        ((fold-handler:) (set! fold-handler (cadr rest)))
        ((map:) (set! map (cadr rest)))
        ((compound-condition:) (set! make-compound-condition (cadr rest)))
        ((name:) (set! srclocid (cadr rest)))
        (else (raise rest))))
    (!mapfold* srclocid make-compound-condition map
               fold-handler map-handler n tmo mapf foldf init data)))

;;; (map-parallel ABORT-HANDLER EXCEPTION-HANDLER N TMO F ARGS)
;;; Runs `!map` with the standard `map` and returns the valid results
;;; as a list (the `values:` selector applied to `list`).
(define (map-parallel abort-handler exception-handler n tmo f args)
  ((!map map abort-handler exception-handler n tmo f args) values: list))