[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[no subject]
From: Ludovic Courtès
Date: Fri, 30 Jun 2023 18:12:49 -0400 (EDT)
branch: master
commit 445198e2a0afcbfb30c2dd178fd9e093480d5718
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Fri Jun 30 23:59:53 2023 +0200
remote: Simplify interface to send and receive messages.
This hides serialization/deserialization, assembly of message parts, and
the actual send/receive operation behind 'send-message' and
'receive-message'.
* src/cuirass/remote.scm (zmq-remote-address)
(zmq-message-string, zmq-read-message): Remove.
(send-message, receive-message): New procedures.
* src/cuirass/remote.scm (build-request-message)
(no-build-message, build-started-message)
(build-failed-message, build-succeeded-message)
(worker-ping, worker-ready-message)
(worker-request-work-message)
(worker-request-info-message, server-info-message): Remove 'format'
call and return an sexp instead.
* src/cuirass/scripts/remote-server.scm (read-worker-exp):
Add #:peer-address. Change 'msg' to 'sexp'.
(need-fetching?): Remove call to 'zmq-read-message'. Remove
inappropriate use of 'else' keyword.
(run-fetch): Remove call to 'zmq-read-message'. Use 'receive-message'
instead of 'zmq-message-receive*' & co.
(zmq-start-proxy): Use 'receive-message' and 'send-message' instead of
'zmq-message-receive*', 'zmq-message-send' & co. Pass #:peer-address to
'read-worker-exp'.
* src/cuirass/scripts/remote-worker.scm (run-command): Remove call to
'zmq-read-message'.
(spawn-worker-ping)[ping]: Use 'send-message'.
(start-worker): Use 'send-message' and 'receive-message' instead of
the whole shebang.
---
src/cuirass/remote.scm | 88 ++++++++++++++++++++++-------------
src/cuirass/scripts/remote-server.scm | 61 +++++++++---------------
src/cuirass/scripts/remote-worker.scm | 54 +++++++++------------
3 files changed, 99 insertions(+), 104 deletions(-)
diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index 2193235..6124707 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -70,8 +70,6 @@
send-log
zmq-poll*
- zmq-message-receive*
- zmq-empty-delimiter
build-request-message
no-build-message
@@ -83,9 +81,9 @@
worker-request-work-message
worker-request-info-message
server-info-message
- zmq-remote-address
- zmq-message-string
- zmq-read-message
+
+ send-message
+ receive-message
remote-server-service-type))
@@ -385,20 +383,46 @@ retries a call to PROC."
(define zmq-message-receive*
(EINTR-safe zmq-message-receive))
-(define (zmq-remote-address message)
- (zmq-message-gets message "Peer-Address"))
-
-(define (zmq-message-string message)
- (bv->string
- (zmq-message-content message)))
-
-(define (zmq-read-message msg)
- (call-with-input-string msg read))
-
(define (zmq-empty-delimiter)
"Return an empty ZMQ delimiter used to format message envelopes."
(make-bytevector 0))
+(define* (send-message socket sexp
+ #:key recipient)
+ "Send SEXP over SOCKET, a ZMQ socket. When RECIPIENT is true, assume SOCKET
+is a ROUTER socket and use RECIPIENT, a bytevector, as the routing prefix of
+the message."
+ (let ((payload (list (zmq-empty-delimiter)
+ (string->bv (object->string sexp)))))
+ (zmq-send-msg-parts-bytevector socket
+ (if recipient
+ (cons recipient payload)
+ payload))))
+
+(define* (receive-message socket #:key router?)
+ "Read an sexp from SOCKET, a ZMQ socket, and return it. Return the
+unspecified value when reading a message without payload.
+
+When ROUTER? is true, assume messages received start with a routing
+prefix (the identity of the peer, as a bytevector), and return three values:
+the payload, the peer's identity (a bytevector), and the peer address."
+ (if router?
+ (match (zmq-message-receive* socket)
+ ((sender (= zmq-message-size 0) data)
+ (values (call-with-input-string (bv->string
+ (zmq-message-content data))
+ read)
+ (zmq-message-content sender)
+ (zmq-message-gets data "Peer-Address")))
+ ((sender #vu8())
+ (values *unspecified* sender)))
+ (match (zmq-get-msg-parts-bytevector socket '())
+ ((#vu8() data)
+ (call-with-input-string (bv->string data)
+ read))
+ ((#vu8())
+ *unspecified*))))
+
;; ZMQ Messages.
(define* (build-request-message drv
#:key
@@ -408,50 +432,50 @@ retries a call to PROC."
timestamp
system)
"Return a message requesting the build of DRV for SYSTEM."
- (format #f "~s" `(build (drv ,drv)
- (priority ,priority)
- (timeout ,timeout)
- (max-silent ,max-silent)
- (timestamp ,timestamp)
- (system ,system))))
+ `(build (drv ,drv)
+ (priority ,priority)
+ (timeout ,timeout)
+ (max-silent ,max-silent)
+ (timestamp ,timestamp)
+ (system ,system)))
(define (no-build-message)
"Return a message that indicates that no builds are available."
- (format #f "~s" `(no-build)))
+ `(no-build))
(define (build-started-message drv worker)
"Return a message that indicates that the build of DRV has started."
- (format #f "~s" `(build-started (drv ,drv) (worker ,worker))))
+ `(build-started (drv ,drv) (worker ,worker)))
(define* (build-failed-message drv url #:optional log)
"Return a message that indicates that the build of DRV has failed."
- (format #f "~s" `(build-failed (drv ,drv) (url ,url) (log ,log))))
+ `(build-failed (drv ,drv) (url ,url) (log ,log)))
(define* (build-succeeded-message drv url #:optional log)
"Return a message that indicates that the build of DRV is done."
- (format #f "~s" `(build-succeeded (drv ,drv) (url ,url) (log ,log))))
+ `(build-succeeded (drv ,drv) (url ,url) (log ,log)))
(define (worker-ping worker)
"Return a message that indicates that WORKER is alive."
- (format #f "~s" `(worker-ping ,worker)))
+ `(worker-ping ,worker))
(define (worker-ready-message worker)
"Return a message that indicates that WORKER is ready."
- (format #f "~s" `(worker-ready ,worker)))
+ `(worker-ready ,worker))
(define (worker-request-work-message name)
"Return a message that indicates that WORKER is requesting work."
- (format #f "~s" `(worker-request-work ,name)))
+ `(worker-request-work ,name))
(define (worker-request-info-message)
"Return a message requesting server information."
- (format #f "~s" '(worker-request-info)))
+ '(worker-request-info))
(define (server-info-message worker-address log-port publish-port)
"Return a message containing server information."
- (format #f "~s" `(server-info (worker-address ,worker-address)
- (log-port ,log-port)
- (publish-port ,publish-port))))
+ `(server-info (worker-address ,worker-address)
+ (log-port ,log-port)
+ (publish-port ,publish-port)))
(define remote-server-service-type
"_remote-server._tcp")
diff --git a/src/cuirass/scripts/remote-server.scm b/src/cuirass/scripts/remote-server.scm
index 385d5f6..5df0ae2 100644
--- a/src/cuirass/scripts/remote-server.scm
+++ b/src/cuirass/scripts/remote-server.scm
@@ -57,6 +57,7 @@
#:use-module (srfi srfi-26)
#:use-module (srfi srfi-34)
#:use-module (srfi srfi-37)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
#:use-module (ice-9 q)
@@ -227,8 +228,8 @@ and store the result inside the BOX."
(worker-systems worker))))
(db-get-pending-build system)))))
-(define* (read-worker-exp msg #:key reply-worker)
- "Read the given MSG sent by a worker. REPLY-WORKER is a procedure that can
+(define* (read-worker-exp sexp #:key peer-address reply-worker)
+ "Read the given SEXP sent by a worker. REPLY-WORKER is a procedure that can
be used to reply to the worker."
(define (update-worker! base-worker)
(let* ((worker* (worker
@@ -238,13 +239,12 @@ be used to reply to the worker."
(worker-name worker*))
(db-add-or-update-worker worker*)))
- (match (zmq-read-message
- (zmq-message-string msg))
+ (match sexp
(('worker-ready worker)
(update-worker! worker))
(('worker-request-info)
(reply-worker
- (server-info-message (zmq-remote-address msg) (%log-port) (%publish-port))))
+ (server-info-message peer-address (%log-port) (%publish-port))))
(('worker-request-work name)
(let ((worker (db-get-worker name)))
(when (and (%debug) worker)
@@ -358,7 +358,7 @@ at URL."
(define (need-fetching? message)
"Return #t if the received MESSAGE implies that some output fetching is
required and #f otherwise."
- (match (zmq-read-message message)
+ (match message
(('build-succeeded ('drv drv) _ ...)
(when (%debug)
(log-debug "fetching required for ~a (success)" drv))
@@ -367,7 +367,7 @@ required and #f otherwise."
(when (%debug)
(log-debug "fetching required for ~a (fail)" drv))
#t)
- (else #f)))
+ (_ #f)))
(define* (run-fetch message)
"Read MESSAGE and download the corresponding build outputs. If
@@ -383,7 +383,7 @@ directory."
(read-derivation-from-file drv))))
(const '())))
- (match (zmq-read-message message)
+ (match message
(('build-succeeded ('drv drv) ('url url) _ ...)
(let ((outputs (build-outputs drv)))
(log-info "fetching '~a' from ~a" drv url)
@@ -412,11 +412,8 @@ socket."
(set-thread-name name)
(let ((socket (zmq-fetch-worker-socket)))
(let loop ()
- (match (zmq-message-receive* socket)
- ((message)
- (run-fetch (bv->string
- (zmq-message-content message)))
- (atomic-box-fetch-and-dec! %fetch-queue-size)))
+ (run-fetch (receive-message socket))
+ (atomic-box-fetch-and-dec! %fetch-queue-size)
(loop))))))
@@ -484,32 +481,18 @@ frontend to the workers connected through the TCP backend."
(let* ((items (zmq-poll* poll-items 1000))
(start-time (current-time)))
(when (socket-ready? items build-socket)
- (match (zmq-message-receive* build-socket)
- ((worker empty rest)
- (let* ((command (bv->string (zmq-message-content rest)))
- (reply-worker
- (lambda (message)
- (zmq-message-send-parts
- build-socket
- (map zmq-msg-init
- (list (zmq-message-content worker)
- (zmq-empty-delimiter)
- (string->bv message)))))))
- (if (need-fetching? command)
- (let ((fetch-msg (zmq-msg-init
- (zmq-message-content rest))))
- (atomic-box-fetch-and-inc! %fetch-queue-size)
- (zmq-message-send fetch-socket fetch-msg))
- (read-worker-exp rest
- #:reply-worker reply-worker))))
- (x
- (log-error "Unexpected message: ~a." x)
- (for-each (lambda (msg)
- (log-error "~/content: ~a (~a)"
- (zmq-message-content msg)
- (false-if-exception
- (zmq-message-string msg))))
- x))))
+ (let* ((command sender sender-address
+ (receive-message build-socket #:router? #t))
+ (reply-worker (lambda (message)
+ (send-message build-socket message
+ #:recipient sender))))
+ (if (need-fetching? command)
+ (begin
+ (atomic-box-fetch-and-inc! %fetch-queue-size)
+ (send-message fetch-socket command))
+ (read-worker-exp command
+ #:peer-address sender-address
+ #:reply-worker reply-worker))))
(db-remove-unresponsive-workers (%worker-timeout))
(let ((delta (- (current-time) start-time)))
(when (> delta %loop-timeout)
diff --git a/src/cuirass/scripts/remote-worker.scm b/src/cuirass/scripts/remote-worker.scm
index 01eb943..69fe1e4 100644
--- a/src/cuirass/scripts/remote-worker.scm
+++ b/src/cuirass/scripts/remote-worker.scm
@@ -253,7 +253,7 @@ still be substituted."
reply worker)
"Run COMMAND. SERVICE-NAME is the name of the build server that sent the
command. REPLY is a procedure that can be used to reply to this server."
- (match (zmq-read-message command)
+ (match command
(('build ('drv drv)
('priority priority)
('timeout timeout)
@@ -275,10 +275,8 @@ command. REPLY is a procedure that can be used to reply to this server."
(define (spawn-worker-ping worker server)
"Spawn a thread that periodically pings SERVER."
(define (ping socket)
- (zmq-send-msg-parts-bytevector
- socket
- (list (make-bytevector 0)
- (string->bv (worker-ping (worker->sexp worker))))))
+ (send-message socket
+ (worker-ping (worker->sexp worker))))
(call-with-new-thread
(lambda ()
@@ -304,29 +302,19 @@ command. REPLY is a procedure that can be used to reply to this server."
and executing them. The worker can reply on the same socket."
(define (reply socket)
(lambda (message)
- (zmq-send-msg-parts-bytevector
- socket
- (list (zmq-empty-delimiter) (string->bv message)))))
+ (send-message socket message)))
(define (ready socket worker)
- (zmq-send-msg-parts-bytevector
- socket
- (list (make-bytevector 0)
- (string->bv
- (worker-ready-message (worker->sexp worker))))))
+ (send-message socket
+ (worker-ready-message (worker->sexp worker))))
(define (request-work socket worker)
(let ((name (worker-name worker)))
- (zmq-send-msg-parts-bytevector
- socket
- (list (make-bytevector 0)
- (string->bv (worker-request-work-message name))))))
+ (send-message socket
+ (worker-request-work-message name))))
(define (request-info socket)
- (zmq-send-msg-parts-bytevector
- socket
- (list (make-bytevector 0)
- (string->bv (worker-request-info-message)))))
+ (send-message socket (worker-request-info-message)))
(define (read-server-info socket)
;; Ignore the boostrap message sent due to ZMQ_PROBE_ROUTER option.
@@ -334,14 +322,12 @@ and executing them. The worker can reply on the same socket."
((empty) #f))
(request-info socket)
- (match (zmq-get-msg-parts-bytevector socket '())
- ((empty info)
- (match (zmq-read-message (bv->string info))
- (('server-info
- ('worker-address worker-address)
- ('log-port log-port)
- ('publish-port publish-port))
- (list worker-address log-port publish-port))))))
+ (match (receive-message socket)
+ (`(server-info
+ (worker-address ,worker-address)
+ (log-port ,log-port)
+ (publish-port ,publish-port))
+ (list worker-address log-port publish-port))))
(define (server-info->server info serv)
(match info
@@ -390,12 +376,14 @@ and executing them. The worker can reply on the same socket."
(begin
(log-info (G_ "~a: request work.") (worker-name wrk))
(request-work socket worker)
- (match (zmq-get-msg-parts-bytevector socket '())
- ((empty) ;server reconnect
+ (match (receive-message socket)
+ ((? unspecified?) ;server reconnect
(log-info (G_ "~a: received a bootstrap message.")
(worker-name wrk)))
- ((empty command)
- (run-command (bv->string command) server
+ (command
+ (log-debug (G_ "~a: received command: ~s")
+ (worker-name wrk) command)
+ (run-command command server
#:reply (reply socket)
#:worker worker)))))