[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[no subject]
From: |
Mathieu Othacehe |
Date: |
Wed, 6 Jan 2021 15:06:41 -0500 (EST) |
branch: wip-offload
commit ca7a7ca9894eb4f0afb1ed91f9701a4e050645a1
Author: Mathieu Othacehe <othacehe@gnu.org>
AuthorDate: Wed Dec 2 11:13:33 2020 +0100
Add remote build support.
* src/cuirass/remote.scm: New file.
* src/cuirass/remote-server.scm: New file.
* src/cuirass/remote-worker.scm: New file.
* bin/remote-server.in: New file.
* bin/remote-worker.in: New file.
* Makefile.am (bin_SCRIPTS): Add new binaries,
(dist_pkgmodule_DATA): add new files,
(EXTRA_DIST): add new binaries,
(bin/remote-server, bin/remote-worker): new targets.
* .gitignore: Add new binaries.
* bin/cuirass.in (%options): Add "--build-remote" option,
(show-help): document it,
(main): honor it.
* src/cuirass/base.scm (with-build-offload-thread): New macro,
(%build-remote?, %build-offload-channel): new parameters,
(make-build-offload-thread): new procedure,
(build-derivations/offload): new procedure,
(restart-builds): use it to offload builds when %build-remote? is set,
(build-packages): ditto.
---
.gitignore | 2 +
Makefile.am | 20 +-
bin/cuirass.in | 161 +++++++-------
bin/remote-server.in | 29 +++
bin/remote-worker.in | 29 +++
src/cuirass/base.scm | 36 ++-
src/cuirass/database.scm | 121 +++++++---
src/cuirass/http.scm | 41 ++--
src/cuirass/metrics.scm | 80 +++----
src/cuirass/remote-server.scm | 497 ++++++++++++++++++++++++++++++++++++++++++
src/cuirass/remote-worker.scm | 382 ++++++++++++++++++++++++++++++++
src/cuirass/remote.scm | 437 +++++++++++++++++++++++++++++++++++++
src/cuirass/templates.scm | 64 +++++-
src/schema.sql | 18 +-
src/sql/upgrade-17.sql | 2 +-
src/sql/upgrade-18.sql | 10 +
src/sql/upgrade-19.sql | 11 +
tests/database.scm | 3 +-
tests/http.scm | 6 -
19 files changed, 1754 insertions(+), 195 deletions(-)
diff --git a/.gitignore b/.gitignore
index beabf29..7cd0e1f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,8 @@
/bin/cuirass
/bin/cuirass-send-events
/bin/evaluate
+/bin/remote-server
+/bin/remote-worker
/build-aux/config.guess
/build-aux/config.sub
/build-aux/install-sh
diff --git a/Makefile.am b/Makefile.am
index 72cb5a6..59d2c25 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -22,7 +22,13 @@
# You should have received a copy of the GNU General Public License
# along with Cuirass. If not, see <http://www.gnu.org/licenses/>.
-bin_SCRIPTS = bin/cuirass bin/cuirass-send-events bin/evaluate
+bin_SCRIPTS = \
+ bin/cuirass \
+ bin/cuirass-send-events \
+ bin/evaluate \
+ bin/remote-server \
+ bin/remote-worker
+
noinst_SCRIPTS = pre-inst-env
guilesitedir = $(datarootdir)/guile/site/@GUILE_EFFECTIVE_VERSION@
@@ -48,6 +54,9 @@ dist_pkgmodule_DATA = \
src/cuirass/http.scm \
src/cuirass/logging.scm \
src/cuirass/metrics.scm \
+ src/cuirass/remote.scm \
+ src/cuirass/remote-server.scm \
+ src/cuirass/remote-worker.scm \
src/cuirass/send-events.scm \
src/cuirass/ui.scm \
src/cuirass/utils.scm \
@@ -86,7 +95,9 @@ dist_sql_DATA = \
src/sql/upgrade-14.sql \
src/sql/upgrade-15.sql \
src/sql/upgrade-16.sql \
- src/sql/upgrade-17.sql
+ src/sql/upgrade-17.sql \
+ src/sql/upgrade-18.sql \
+ src/sql/upgrade-19.sql
dist_css_DATA = \
src/static/css/cuirass.css \
@@ -167,6 +178,8 @@ EXTRA_DIST = \
bin/cuirass.in \
bin/cuirass-send-events.in \
bin/evaluate.in \
+ bin/remote-server.in \
+ bin/remote-worker.in \
bootstrap \
build-aux/guix.scm \
src/cuirass/config.scm.in \
@@ -227,6 +240,9 @@ generate_file = \
bin/cuirass: $(srcdir)/bin/cuirass.in
bin/cuirass-send-events: $(srcdir)/bin/cuirass-send-events.in
bin/evaluate: $(srcdir)/bin/evaluate.in
+bin/remote-server: $(srcdir)/bin/remote-server.in
+bin/remote-worker: $(srcdir)/bin/remote-worker.in
+
$(bin_SCRIPTS): Makefile
$(generate_file); chmod +x $@
src/cuirass/config.scm: $(srcdir)/src/cuirass/config.scm.in Makefile
diff --git a/bin/cuirass.in b/bin/cuirass.in
index fb0c0fe..20c2447 100644
--- a/bin/cuirass.in
+++ b/bin/cuirass.in
@@ -59,6 +59,7 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@"
-I, --interval=N Wait N seconds between each poll
-Q, --queue-size=N Set the writer queue size to N elements.
--log-queries=FILE Log SQL queries in FILE.
+ --build-remote Use the remote build mechanism
--use-substitutes Allow usage of pre-built substitutes
--record-events Record events for distribution
--threads=N Use up to N kernel threads
@@ -77,6 +78,7 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@"
(listen (value #t))
(interval (single-char #\I) (value #t))
(queue-size (single-char #\Q) (value #t))
+ (build-remote (value #f))
(use-substitutes (value #f))
(threads (value #t))
(fallback (value #f))
@@ -103,6 +105,7 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@"
(%package-database (option-ref opts 'database (%package-database)))
(%package-cachedir
(option-ref opts 'cache-directory (%package-cachedir)))
+ (%build-remote? (option-ref opts 'build-remote #f))
(%use-substitutes? (option-ref opts 'use-substitutes #f))
(%fallback? (option-ref opts 'fallback #f))
(%record-events? (option-ref opts 'record-events #f))
@@ -146,84 +149,86 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@"
(lambda ()
(with-database
(with-queue-writer-worker
- (and specfile
- (let ((new-specs (save-module-excursion
- (lambda ()
- (set-current-module (make-user-module '()))
- (primitive-load specfile)))))
- (for-each db-add-specification new-specs)))
-
- (when queries-file
- (log-message "Enable SQL query logging.")
- (db-log-queries queries-file))
-
- (if one-shot?
- (process-specs (db-get-specifications))
- (let ((exit-channel (make-channel)))
- (start-watchdog)
- (if (option-ref opts 'web #f)
- (begin
- (spawn-fiber
- (essential-task
- 'web exit-channel
- (lambda ()
- (run-cuirass-server #:host host #:port port)))
- #:parallel? #t)
-
- (spawn-fiber
- (essential-task
- 'monitor exit-channel
- (lambda ()
- (while #t
- (log-monitoring-stats)
- (sleep 600))))))
-
- (begin
- (clear-build-queue)
-
- ;; If Cuirass was stopped during an evaluation,
- ;; abort it. Builds that were not registered
- ;; during this evaluation will be registered
- ;; during the next evaluation.
- (db-abort-pending-evaluations)
-
- ;; First off, restart builds that had not
- ;; completed or were not even started on a
- ;; previous run.
- (spawn-fiber
- (essential-task
- 'restart-builds exit-channel
- (lambda ()
- (restart-builds))))
-
- (spawn-fiber
- (essential-task
- 'build exit-channel
- (lambda ()
- (while #t
- (process-specs (db-get-specifications))
- (log-message
- "next evaluation in ~a seconds" interval)
- (sleep interval)))))
-
- (spawn-fiber
- (essential-task
- 'metrics exit-channel
- (lambda ()
- (while #t
- (with-time-logging
- "Metrics update"
- (db-update-metrics))
- (sleep 3600)))))
-
- (spawn-fiber
- (essential-task
- 'monitor exit-channel
- (lambda ()
- (while #t
- (log-monitoring-stats)
- (sleep 600)))))))
- (primitive-exit (get-message exit-channel)))))))
+ (and specfile
+ (let ((new-specs (save-module-excursion
+ (lambda ()
+ (set-current-module
+ (make-user-module '()))
+ (primitive-load specfile)))))
+ (for-each db-add-specification new-specs)))
+
+ (when queries-file
+ (log-message "Enable SQL query logging.")
+ (db-log-queries queries-file))
+
+ (if one-shot?
+ (process-specs (db-get-specifications))
+ (let ((exit-channel (make-channel)))
+ (start-watchdog)
+ (if (option-ref opts 'web #f)
+ (begin
+ (spawn-fiber
+ (essential-task
+ 'web exit-channel
+ (lambda ()
+ (run-cuirass-server #:host host
+ #:port port)))
+ #:parallel? #t)
+
+ (spawn-fiber
+ (essential-task
+ 'monitor exit-channel
+ (lambda ()
+ (while #t
+ (log-monitoring-stats)
+ (sleep 600))))))
+
+ (begin
+ (clear-build-queue)
+
+ ;; If Cuirass was stopped during an evaluation,
+ ;; abort it. Builds that were not registered
+ ;; during this evaluation will be registered
+ ;; during the next evaluation.
+ (db-abort-pending-evaluations)
+
+ ;; First off, restart builds that had not
+ ;; completed or were not even started on a
+ ;; previous run.
+ (spawn-fiber
+ (essential-task
+ 'restart-builds exit-channel
+ (lambda ()
+ (restart-builds))))
+
+ (spawn-fiber
+ (essential-task
+ 'build exit-channel
+ (lambda ()
+ (while #t
+ (process-specs (db-get-specifications))
+ (log-message
+ "next evaluation in ~a seconds" interval)
+ (sleep interval)))))
+
+ (spawn-fiber
+ (essential-task
+ 'metrics exit-channel
+ (lambda ()
+ (while #t
+ (with-time-logging
+ "Metrics update"
+ (db-update-metrics))
+ (sleep 3600)))))
+
+ (spawn-fiber
+ (essential-task
+ 'monitor exit-channel
+ (lambda ()
+ (while #t
+ (log-monitoring-stats)
+ (sleep 600)))))))
+ (primitive-exit (get-message exit-channel)))))))
;; Most of our code is I/O so preemption doesn't matter much (it
;; could help while we're doing SQL requests, for instance, but it
diff --git a/bin/remote-server.in b/bin/remote-server.in
new file mode 100644
index 0000000..6425d51
--- /dev/null
+++ b/bin/remote-server.in
@@ -0,0 +1,29 @@
+#!/bin/sh
+# -*- scheme -*-
+# @configure_input@
+#GUILE_LOAD_PATH="@PACKAGE_LOAD_PATH@${GUILE_LOAD_PATH:+:}$GUILE_LOAD_PATH"
+#GUILE_LOAD_COMPILED_PATH="@PACKAGE_LOAD_COMPILED_PATH@${GUILE_LOAD_COMPILED_PATH:+:}$GUILE_LOAD_COMPILED_PATH"
+exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@"
+!#
+;;; remote-server.in -- Remote build server.
+;;; Copyright © 2020 Mathieu Othacehe <othacehe@gnu.org>
+;;;
+;;; This file is part of Cuirass.
+;;;
+;;; Cuirass is free software: you can redistribute it and/or modify
+;;; it under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation, either version 3 of the License, or
+;;; (at your option) any later version.
+;;;
+;;; Cuirass is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with Cuirass. If not, see <http://www.gnu.org/licenses/>.
+
+(use-modules (cuirass remote-server))
+
+(define* (main #:optional (args (command-line)))
+ (remote-server (cdr args)))
diff --git a/bin/remote-worker.in b/bin/remote-worker.in
new file mode 100644
index 0000000..8a3830c
--- /dev/null
+++ b/bin/remote-worker.in
@@ -0,0 +1,29 @@
+#!/bin/sh
+# -*- scheme -*-
+# @configure_input@
+#GUILE_LOAD_PATH="@PACKAGE_LOAD_PATH@${GUILE_LOAD_PATH:+:}$GUILE_LOAD_PATH"
+#GUILE_LOAD_COMPILED_PATH="@PACKAGE_LOAD_COMPILED_PATH@${GUILE_LOAD_COMPILED_PATH:+:}$GUILE_LOAD_COMPILED_PATH"
+exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@"
+!#
+;;; remote-worker.in -- Remote build worker.
+;;; Copyright © 2020 Mathieu Othacehe <othacehe@gnu.org>
+;;;
+;;; This file is part of Cuirass.
+;;;
+;;; Cuirass is free software: you can redistribute it and/or modify
+;;; it under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation, either version 3 of the License, or
+;;; (at your option) any later version.
+;;;
+;;; Cuirass is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with Cuirass. If not, see <http://www.gnu.org/licenses/>.
+
+(use-modules (cuirass remote-worker))
+
+(define* (main #:optional (args (command-line)))
+ (remote-worker (cdr args)))
diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index 5d3a456..b074f4f 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -22,8 +22,10 @@
(define-module (cuirass base)
#:use-module (fibers)
+ #:use-module (fibers channels)
#:use-module (cuirass logging)
#:use-module (cuirass database)
+ #:use-module (cuirass remote)
#:use-module (cuirass utils)
#:use-module ((cuirass config) #:select (%localstatedir))
#:use-module (gnu packages)
@@ -36,9 +38,13 @@
#:use-module ((guix config) #:select (%state-directory))
#:use-module (git)
#:use-module (ice-9 binary-ports)
+ #:use-module ((ice-9 suspendable-ports)
+ #:select (current-read-waiter
+ current-write-waiter))
#:use-module (ice-9 format)
#:use-module (ice-9 match)
#:use-module (ice-9 popen)
+ #:use-module (ice-9 ports internal)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 receive)
#:use-module (ice-9 regex)
@@ -58,6 +64,8 @@
fetch-inputs
compile
evaluate
+ build-derivations&
+ set-build-successful!
clear-build-queue
cancel-old-builds
restart-builds
@@ -70,6 +78,7 @@
%package-cachedir
%gc-root-directory
%gc-root-ttl
+ %build-remote?
%use-substitutes?
%fallback?))
@@ -102,6 +111,10 @@
(define time-monotonic time-tai))
(else #t))
+(define %build-remote?
+ ;; Define whether to use the remote build mechanism.
+ (make-parameter #f))
+
(define %use-substitutes?
;; Define whether to use substitutes
(make-parameter #f))
@@ -429,7 +442,7 @@ Essentially this procedure inverts the inversion-of-control that
(lambda _
(close-port output)))))
- (values (non-blocking-port input)
+ (values input
(lambda ()
(match (atomic-box-ref result)
((? condition? c)
@@ -446,7 +459,7 @@ Essentially this procedure inverts the inversion-of-control that
;; Our shuffling algorithm is simple: we sort by .drv file name. :-)
(sort drv string<?))
-(define (set-build-successful! drv)
+(define* (set-build-successful! drv #:optional log)
"Update the build status of DRV as successful and register any eventual
build products."
(let* ((build (db-get-build drv))
@@ -456,7 +469,8 @@ build products."
(when (and spec build)
(create-build-outputs build
(assq-ref spec #:build-outputs))))
- (db-update-build-status! drv (build-status succeeded)))
+ (db-update-build-status! drv (build-status succeeded)
+ #:log-file log))
(define (update-build-statuses! store lst)
"Update the build status of the derivations listed in LST, which have just
@@ -584,7 +598,7 @@ updating the database accordingly."
(log-message "bogus build-started event for '~a'" drv)))
(('build-remote drv host _ ...)
(log-message "'~a' offloaded to '~a'" drv host)
- (db-update-build-machine! drv host))
+ (db-update-build-worker! drv host))
(('build-succeeded drv _ ...)
(if (valid? drv)
(begin
@@ -642,7 +656,8 @@ started)."
;; Those in VALID can be restarted. If some of them were built in the
;; meantime behind our back, that's fine: 'spawn-builds' will DTRT.
(log-message "restarting ~a pending builds" (length valid))
- (spawn-builds store valid)
+ (unless (%build-remote?)
+ (spawn-builds store valid))
(log-message "done with restarted builds"))))
(define (create-build-outputs build product-specs)
@@ -682,16 +697,19 @@ by PRODUCT-SPECS."
(define (build-packages store jobs eval-id)
"Build JOBS and return a list of Build results."
(define derivations
- (with-time-logging
- (format #f "evaluation ~a registration" eval-id)
- (db-register-builds jobs eval-id)))
+ (let* ((name (db-get-evaluation-specification eval-id))
+ (specification (db-get-specification name)))
+ (with-time-logging
+ (format #f "evaluation ~a registration" eval-id)
+ (db-register-builds jobs eval-id specification))))
(log-message "evaluation ~a registered ~a new derivations"
eval-id (length derivations))
(db-set-evaluation-status eval-id
(evaluation-status succeeded))
- (spawn-builds store derivations)
+ (unless (%build-remote?)
+ (spawn-builds store derivations))
(let* ((results (filter-map (cut db-get-build <>) derivations))
(status (map (cut assq-ref <> #:status) results))
diff --git a/src/cuirass/database.scm b/src/cuirass/database.scm
index 4ef5229..236f192 100644
--- a/src/cuirass/database.scm
+++ b/src/cuirass/database.scm
@@ -24,6 +24,7 @@
(define-module (cuirass database)
#:use-module (cuirass logging)
#:use-module (cuirass config)
+ #:use-module (cuirass remote)
#:use-module (cuirass utils)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
@@ -60,7 +61,7 @@
db-add-build-product
db-register-builds
db-update-build-status!
- db-update-build-machine!
+ db-update-build-worker!
db-get-output
db-get-inputs
db-get-build
@@ -82,6 +83,9 @@
db-get-evaluation-specification
db-get-build-product-path
db-get-build-products
+ db-add-worker
+ db-get-workers
+ db-clear-workers
db-get-evaluation-summary
db-get-checkouts
read-sql-file
@@ -92,6 +96,7 @@
;; Constants.
SQLITE_CONSTRAINT_PRIMARYKEY
SQLITE_CONSTRAINT_UNIQUE
+ SQLITE_BUSY_SNAPSHOT
;; Parameters.
%package-database
%package-schema-file
@@ -106,6 +111,9 @@
with-database
with-queue-writer-worker))
+;; Maximum priority for a Build or Specification.
+(define max-priority 9)
+
(define (%sqlite-exec db sql . args)
"Evaluate the given SQL query with the given ARGS. Return the list of
rows."
@@ -441,7 +449,7 @@ table."
(sqlite-exec db "\
INSERT OR IGNORE INTO Specifications (name, load_path_inputs, \
package_path_inputs, proc_input, proc_file, proc, proc_args, \
-build_outputs) \
+build_outputs, priority) \
VALUES ("
(assq-ref spec #:name) ", "
(assq-ref spec #:load-path-inputs) ", "
@@ -450,7 +458,8 @@ build_outputs) \
(assq-ref spec #:proc-file) ", "
(symbol->string (assq-ref spec #:proc)) ", "
(assq-ref spec #:proc-args) ", "
- (assq-ref spec #:build-outputs) ");")
+ (assq-ref spec #:build-outputs) ", "
+ (or (assq-ref spec #:priority) max-priority) ");")
(let ((spec-id (last-insert-rowid db)))
(for-each (lambda (input)
(db-add-input (assq-ref spec #:name) input))
@@ -504,7 +513,7 @@ SELECT * FROM Specifications ORDER BY name DESC;")))
(match rows
(() specs)
((#(name load-path-inputs package-path-inputs proc-input proc-file proc
- proc-args build-outputs)
+ proc-args build-outputs priority)
. rest)
(loop rest
(cons `((#:name . ,name)
@@ -518,7 +527,8 @@ SELECT * FROM Specifications ORDER BY name DESC;")))
(#:proc-args . ,(with-input-from-string proc-args read))
(#:inputs . ,(db-get-inputs name))
(#:build-outputs .
- ,(with-input-from-string build-outputs read)))
+ ,(with-input-from-string build-outputs read))
+ (#:priority . ,priority))
specs)))))))
(define-enumeration evaluation-status
@@ -622,15 +632,19 @@ string."
;; Extended error codes (see <sqlite3.h>).
;; XXX: This should be defined by (sqlite3).
+(define SQLITE_BUSY 5)
(define SQLITE_CONSTRAINT 19)
(define SQLITE_CONSTRAINT_PRIMARYKEY
(logior SQLITE_CONSTRAINT (ash 6 8)))
(define SQLITE_CONSTRAINT_UNIQUE
(logior SQLITE_CONSTRAINT (ash 8 8)))
+(define SQLITE_BUSY_SNAPSHOT
+ (logior SQLITE_BUSY (ash 2 8)))
(define-enumeration build-status
;; Build status as expected by Hydra's API. Note: the negative values are
;; Cuirass' own extensions.
+ (submitted -3)
(scheduled -2)
(started -1)
(succeeded 0)
@@ -662,7 +676,7 @@ Return #f otherwise. BUILD outputs are stored in the OUTPUTS table."
(with-db-writer-worker-thread/force db
(sqlite-exec db "
INSERT INTO Builds (derivation, evaluation, job_name, system, nix_name, log,
-status, timestamp, starttime, stoptime)
+status, priority, max_silent, timeout, timestamp, starttime, stoptime)
VALUES ("
(assq-ref build #:derivation) ", "
(assq-ref build #:eval-id) ", "
@@ -672,9 +686,12 @@ VALUES ("
(assq-ref build #:log) ", "
(or (assq-ref build #:status)
(build-status scheduled)) ", "
- (or (assq-ref build #:timestamp) 0) ", "
- (or (assq-ref build #:starttime) 0) ", "
- (or (assq-ref build #:stoptime) 0) ");")
+ (assq-ref build #:priority) ", "
+ (or (assq-ref build #:max-silent) 0) ", "
+ (or (assq-ref build #:timeout) 0) ", "
+ (or (assq-ref build #:timestamp) 0) ", "
+ (or (assq-ref build #:starttime) 0) ", "
+ (or (assq-ref build #:stoptime) 0) ");")
(let* ((derivation (assq-ref build #:derivation))
(outputs (assq-ref build #:outputs))
(new-outputs (filter-map (cut db-add-output derivation <>)
@@ -702,7 +719,7 @@ path) VALUES ("
(assq-ref product #:path) ");")
(last-insert-rowid db)))
-(define (db-register-builds jobs eval-id)
+(define (db-register-builds jobs eval-id specification)
(define (new-outputs? outputs)
(let ((new-outputs
(filter-map (match-lambda
@@ -712,16 +729,23 @@ path) VALUES ("
outputs)))
(not (null? new-outputs))))
+ (define (build-priority priority)
+ (let ((spec-priority (assq-ref specification #:priority)))
+ (+ (* spec-priority 10) priority)))
+
(define (register job)
- (let* ((name (assq-ref job #:job-name))
- (drv (assq-ref job #:derivation))
- (job-name (assq-ref job #:job-name))
- (system (assq-ref job #:system))
- (nix-name (assq-ref job #:nix-name))
- (log (assq-ref job #:log))
- (period (assq-ref job #:period))
- (outputs (assq-ref job #:outputs))
- (cur-time (time-second (current-time time-utc))))
+ (let* ((name (assq-ref job #:job-name))
+ (drv (assq-ref job #:derivation))
+ (job-name (assq-ref job #:job-name))
+ (system (assq-ref job #:system))
+ (nix-name (assq-ref job #:nix-name))
+ (log (assq-ref job #:log))
+ (period (assq-ref job #:period))
+ (priority (or (assq-ref job #:priority) max-priority))
+ (max-silent (assq-ref job #:max-silent-time))
+ (timeout (assq-ref job #:timeout))
+ (outputs (assq-ref job #:outputs))
+ (cur-time (time-second (current-time time-utc))))
(and (new-outputs? outputs)
(let ((build `((#:derivation . ,drv)
(#:eval-id . ,eval-id)
@@ -734,12 +758,15 @@ path) VALUES ("
(#:log . ,(or log ""))
(#:status . ,(build-status scheduled))
+ (#:priority . ,(build-priority priority))
+ (#:max-silent . ,max-silent)
+ (#:timeout . ,timeout)
(#:outputs . ,outputs)
(#:timestamp . ,cur-time)
(#:starttime . 0)
(#:stoptime . 0))))
(if period
- (let* ((spec (db-get-evaluation-specification eval-id))
+ (let* ((spec (assq-ref specification #:name))
(time
(db-get-time-since-previous-build job-name spec))
(add-build? (cond
@@ -803,10 +830,10 @@ log file for DRV."
(#:event . ,(assq-ref status-names
status)))))))))
-(define* (db-update-build-machine! drv machine)
- "Update the database so that DRV's machine is MACHINE."
+(define* (db-update-build-worker! drv worker)
+ "Update the database so that DRV's worker is WORKER."
(with-db-writer-worker-thread db
- (sqlite-exec db "UPDATE Builds SET machine=" machine
+ (sqlite-exec db "UPDATE Builds SET worker=" worker
"WHERE derivation=" drv ";")))
(define (db-get-output path)
@@ -955,6 +982,8 @@ CASE WHEN :borderlowid IS NULL THEN
;; before those in 'scheduled' state (-2).
(('order . 'status+submission-time)
"Builds.status DESC, Builds.timestamp DESC, Builds.rowid ASC")
+ (('order . 'priority+timestamp)
+ "Builds.priority DESC, Builds.timestamp ASC")
(_ "Builds.rowid DESC"))))
;; XXX: Make sure that all filters are covered by an index.
@@ -965,10 +994,12 @@ CASE WHEN :borderlowid IS NULL THEN
(derivation . "Builds.derivation = :derivation")
(job . "Builds.job_name = :job")
(system . "Builds.system = :system")
+ (worker . "Builds.worker = :worker")
(evaluation . "Builds.evaluation = :evaluation")
(status . ,(match (assq-ref filters 'status)
(#f #f)
('done "Builds.status >= 0")
+ ('scheduled "Builds.status = -2")
('started "Builds.status = -1")
('pending "Builds.status < 0")
('succeeded "Builds.status = 0")
@@ -1031,7 +1062,8 @@ GROUP_CONCAT(Outputs.name), GROUP_CONCAT(Outputs.path),
GROUP_CONCAT(BP.rowid), GROUP_CONCAT(BP.type), GROUP_CONCAT(BP.file_size),
GROUP_CONCAT(BP.checksum), GROUP_CONCAT(BP.path) FROM
(SELECT Builds.derivation, Builds.rowid, Builds.timestamp, Builds.starttime,
- Builds.stoptime, Builds.log, Builds.status, Builds.job_name,
+ Builds.stoptime, Builds.log, Builds.status, Builds.priority,
+ Builds.max_silent, Builds.timeout, Builds.job_name,
Builds.system, Builds.nix_name, Builds.evaluation,
Specifications.name
FROM Builds
@@ -1070,7 +1102,8 @@ ORDER BY ~a;"
(sqlite-fold-right
(lambda (row result)
(match row
- (#(derivation id timestamp starttime stoptime log status job-name
+ (#(derivation id timestamp starttime stoptime log status
+ priority max-silent timeout job-name
system nix-name eval-id specification
outputs-name outputs-path
products-id products-type products-file-size
@@ -1082,6 +1115,9 @@ ORDER BY ~a;"
(#:stoptime . ,stoptime)
(#:log . ,log)
(#:status . ,status)
+ (#:priority . ,priority)
+ (#:max-silent . ,max-silent)
+ (#:timeout . ,timeout)
(#:job-name . ,job-name)
(#:system . ,system)
(#:nix-name . ,nix-name)
@@ -1413,3 +1449,38 @@ WHERE build = " build-id))
(#:checksum . ,checksum)
(#:path . ,path))
products)))))))
+
+(define (db-add-worker worker)
+ "Insert WORKER into Workers table."
+ (with-db-writer-worker-thread db
+ (sqlite-exec db "\
+INSERT OR REPLACE INTO Workers (name, address, systems, last_seen)
+VALUES ("
+ (worker-name worker) ", "
+ (worker-address worker) ", "
+ (string-join (worker-systems worker) ",") ", "
+ (worker-last-seen worker) ");")
+ (last-insert-rowid db)))
+
+(define (db-get-workers)
+ "Return the workers in Workers table."
+ (with-db-worker-thread db
+ (let loop ((rows (sqlite-exec db "
+SELECT name, address, systems, last_seen from Workers"))
+ (workers '()))
+ (match rows
+ (() (reverse workers))
+ ((#(name address systems last-seen)
+ . rest)
+ (loop rest
+ (cons (worker
+ (name name)
+ (address address)
+ (systems (string-split systems #\,))
+ (last-seen last-seen))
+ workers)))))))
+
+(define (db-clear-workers)
+ "Remove all workers from Workers table."
+ (with-db-writer-worker-thread db
+ (sqlite-exec db "DELETE FROM Workers;")))
diff --git a/src/cuirass/http.scm b/src/cuirass/http.scm
index 99dc2ce..3ac7ef9 100644
--- a/src/cuirass/http.scm
+++ b/src/cuirass/http.scm
@@ -28,6 +28,7 @@
#:use-module (cuirass metrics)
#:use-module (cuirass utils)
#:use-module (cuirass logging)
+ #:use-module (cuirass remote)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-11)
#:use-module (srfi srfi-26)
@@ -446,26 +447,11 @@ Hydra format."
(#:link . ,(string-append "/jobset/" (assq-ref build #:specification)))))))
(respond-build-not-found id))))
(('GET "build" (= string->number id) "log" "raw")
- (let ((build (and id (db-get-build id))))
- (if build
- (match (assq-ref build #:outputs)
- (((_ (#:path . (? string? output))) _ ...)
- ;; Redirect to a /log URL, which is assumed to be served
- ;; by 'guix publish'.
- (let ((uri (string->uri-reference
- (string-append "/log/"
- (basename output)))))
- (respond (build-response #:code 302
- #:headers `((location . ,uri)))
- #:body "")))
- (()
- ;; Not entry for ID in the 'Outputs' table.
- (respond-json-with-error
- 500
- (format #f "Outputs of build ~a are unknown." id)))
- (#f
- (respond-build-not-found id)))
- (respond-build-not-found id))))
+ (let* ((build (and id (db-get-build id)))
+ (log (and build (assq-ref build #:log))))
+ (if (and log (file-exists? log))
+ (respond-gzipped-file log)
+ (respond-not-found (uri->string (request-uri request))))))
(('GET "output" id)
(let ((output (db-get-output
(string-append (%store-prefix) "/" id))))
@@ -661,6 +647,21 @@ Hydra format."
(respond-json-with-error 500 "No build found.")))
(respond-json-with-error 500 "Query parameter not provided."))))
+ (('GET "workers")
+ (respond-html
+ (html-page
+ "Workers status"
+ (let ((workers (db-get-workers)))
+ (workers-status
+ workers
+ (map (lambda (worker)
+ (let ((name (worker-name worker)))
+ (db-get-builds `((worker . ,name)
+ (status . started)
+ (order . status+submission-time)))))
+ workers)))
+ '())))
+
(('GET "metrics")
(respond-html
(metrics-page)))
diff --git a/src/cuirass/metrics.scm b/src/cuirass/metrics.scm
index cd6a066..9a0fd14 100644
--- a/src/cuirass/metrics.scm
+++ b/src/cuirass/metrics.scm
@@ -329,42 +329,44 @@ timestamp) VALUES ("
(define (db-update-metrics)
"Compute and update all available metrics in database."
(with-db-writer-worker-thread/force db
- ;; We can not update all evaluations metrics for performance reasons.
- ;; Limit to the evaluations that were added during the past three days.
- (let ((specifications
- (map (cut assq-ref <> #:name) (db-get-specifications)))
- (evaluations (db-latest-evaluations)))
- (sqlite-exec db "BEGIN TRANSACTION;")
-
- (db-update-metric 'builds-per-day)
- (db-update-metric 'new-derivations-per-day)
- (db-update-metric 'pending-builds)
-
- ;; Update specification related metrics.
- (for-each (lambda (spec)
- (db-update-metric
- 'average-10-last-eval-duration-per-spec spec)
- (db-update-metric
- 'average-100-last-eval-duration-per-spec spec)
- (db-update-metric
- 'average-eval-duration-per-spec spec)
-
- (db-update-metric
- 'percentage-failure-10-last-eval-per-spec spec)
- (db-update-metric
- 'percentage-failure-100-last-eval-per-spec spec)
- (db-update-metric
- 'percentage-failed-eval-per-spec spec))
- specifications)
-
- ;; Update evaluation related metrics.
- (for-each (lambda (evaluation)
- (db-update-metric
- 'average-eval-build-start-time evaluation)
- (db-update-metric
- 'average-eval-build-complete-time evaluation)
- (db-update-metric
- 'evaluation-completion-speed evaluation))
- evaluations)
-
- (sqlite-exec db "COMMIT;"))))
+ (catch-sqlite-error
+ ;; We can not update all evaluations metrics for performance reasons.
+ ;; Limit to the evaluations that were added during the past three days.
+ (let ((specifications
+ (map (cut assq-ref <> #:name) (db-get-specifications)))
+ (evaluations (db-latest-evaluations)))
+ (sqlite-exec db "BEGIN TRANSACTION;")
+
+ (db-update-metric 'builds-per-day)
+ (db-update-metric 'new-derivations-per-day)
+ (db-update-metric 'pending-builds)
+
+ ;; Update specification related metrics.
+ (for-each (lambda (spec)
+ (db-update-metric
+ 'average-10-last-eval-duration-per-spec spec)
+ (db-update-metric
+ 'average-100-last-eval-duration-per-spec spec)
+ (db-update-metric
+ 'average-eval-duration-per-spec spec)
+
+ (db-update-metric
+ 'percentage-failure-10-last-eval-per-spec spec)
+ (db-update-metric
+ 'percentage-failure-100-last-eval-per-spec spec)
+ (db-update-metric
+ 'percentage-failed-eval-per-spec spec))
+ specifications)
+
+ ;; Update evaluation related metrics.
+ (for-each (lambda (evaluation)
+ (db-update-metric
+ 'average-eval-build-start-time evaluation)
+ (db-update-metric
+ 'average-eval-build-complete-time evaluation)
+ (db-update-metric
+ 'evaluation-completion-speed evaluation))
+ evaluations)
+
+ (sqlite-exec db "COMMIT;"))
+ (on SQLITE_BUSY_SNAPSHOT => #f))))
diff --git a/src/cuirass/remote-server.scm b/src/cuirass/remote-server.scm
new file mode 100644
index 0000000..5fb7633
--- /dev/null
+++ b/src/cuirass/remote-server.scm
@@ -0,0 +1,497 @@
+;;; remote-server.scm -- Remote build server.
+;;; Copyright © 2020 Mathieu Othacehe <othacehe@gnu.org>
+;;;
+;;; This file is part of Cuirass.
+;;;
+;;; GNU Guix is free software; you can redistribute it and/or modify it
+;;; under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation; either version 3 of the License, or (at
+;;; your option) any later version.
+;;;
+;;; GNU Guix is distributed in the hope that it will be useful, but
+;;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with GNU Guix. If not, see <http://www.gnu.org/licenses/>.
+
+(define-module (cuirass remote-server)
+ #:use-module (cuirass base)
+ #:use-module (cuirass config)
+ #:use-module (cuirass database)
+ #:use-module (cuirass logging)
+ #:use-module (cuirass remote)
+ #:use-module (cuirass utils)
+ #:use-module (gcrypt pk-crypto)
+ #:use-module (guix avahi)
+ #:use-module (guix base32)
+ #:use-module (guix base64)
+ #:use-module (guix config)
+ #:use-module (guix derivations)
+ #:use-module (guix records)
+ #:use-module (guix packages)
+ #:use-module (guix pki)
+ #:use-module (guix scripts)
+ #:use-module ((guix store)
+ #:select (current-build-output-port
+ ensure-path
+ store-protocol-error?
+ with-store))
+ #:use-module (guix ui)
+ #:use-module (guix utils)
+ #:use-module (guix workers)
+ #:use-module (guix build download)
+ #:use-module (guix build syscalls)
+ #:use-module ((guix build utils) #:select (mkdir-p))
+ #:use-module (gcrypt hash)
+ #:use-module (gcrypt pk-crypto)
+ #:use-module (simple-zmq)
+ #:use-module (rnrs bytevectors)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-26)
+ #:use-module (srfi srfi-34)
+ #:use-module (srfi srfi-37)
+ #:use-module (ice-9 atomic)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 q)
+ #:use-module (ice-9 rdelim)
+ #:use-module (ice-9 regex)
+ #:use-module (ice-9 threads)
+
+ #:export (remote-server))
+
+;; Indicate if the process has to be stopped. Set to #t by the SIGINT handler
+;; and polled by the Avahi publish thread through its #:stop-loop? callback.
+(define %stop-process?
+ (make-atomic-box #f))
+
+;; Directory given with '--cache'; build log file names are derived from it
+;; with 'log-path'.
+(define %cache-directory
+ (make-parameter #f))
+
+;; URL given with '--trigger-substitute-url', or #f. When set, the narinfo of
+;; each output of a successful build is requested from it.
+(define %trigger-substitute-url
+ (make-parameter #f))
+
+;; Key read from the '--private-key' file.
+(define %private-key
+ (make-parameter #f))
+
+;; Key read from the '--public-key' file.
+(define %public-key
+ (make-parameter #f))
+
+;; Name under which the server is advertised with Avahi.
+(define service-name
+ "Cuirass remote server")
+
+(define (show-help)
+ ;; Print the usage message of the 'remote-server' command on the current
+ ;; output port. The options listed here are the ones declared in %OPTIONS.
+ (format #t (G_ "Usage: remote-server [OPTION]...
+Start a remote build server.\n"))
+ (display (G_ "
+ -b, --backend-port=PORT listen worker connections on PORT"))
+ (display (G_ "
+ -l, --log-port=PORT listen build logs on PORT"))
+ (display (G_ "
+ -p, --publish-port=PORT publish substitutes on PORT"))
+ (display (G_ "
+ -D, --database=DB Use DB to read and store build results"))
+ (display (G_ "
+ -c, --cache=DIRECTORY cache built items to DIRECTORY"))
+ (display (G_ "
+ -t, --trigger-substitute-url=URL
+ trigger substitute baking at URL"))
+ (display (G_ "
+ -u, --user=USER change privileges to USER as soon as possible"))
+ (display (G_ "
+ --public-key=FILE use FILE as the public key for signatures"))
+ (display (G_ "
+ --private-key=FILE use FILE as the private key for signatures"))
+ (newline)
+ (display (G_ "
+ -h, --help display this help and exit"))
+ (display (G_ "
+ -V, --version display version information and exit"))
+ (newline)
+ (show-bug-report-information))
+
+(define %options
+  ;; Command-line options of 'remote-server', as SRFI-37 option records.
+  ;; Apart from '--help' and '--version', which exit, each processor adds one
+  ;; entry to the RESULT association list.
+  (list (option '(#\h "help") #f #f
+                (lambda _
+                  (show-help)
+                  (exit 0)))
+        (option '(#\V "version") #f #f
+                (lambda _
+                  ;; Report the name of this command, as shown in the usage
+                  ;; message, rather than that of 'guix publish'.
+                  (show-version-and-exit "remote-server")))
+        (option '(#\b "backend-port") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'backend-port (string->number* arg) result)))
+        (option '(#\l "log-port") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'log-port (string->number* arg) result)))
+        (option '(#\p "publish-port") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'publish-port (string->number* arg) result)))
+        (option '(#\D "database") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'database arg result)))
+        (option '(#\c "cache") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'cache arg result)))
+        (option '(#\t "trigger-substitute-url") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'trigger-substitute-url arg result)))
+        (option '(#\u "user") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'user arg result)))
+        (option '("public-key") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'public-key-file arg result)))
+        (option '("private-key") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'private-key-file arg result)))))
+
+(define %default-options
+ ;; Initial option alist handed to 'args-fold*'. There is no default for
+ ;; 'cache', 'database', 'trigger-substitute-url' and 'user': looking them up
+ ;; returns #f unless they are given on the command line.
+ `((backend-port . 5555)
+ (log-port . 5556)
+ (publish-port . 5557)
+ (public-key-file . ,%public-key-file)
+ (private-key-file . ,%private-key-file)))
+
+
+;;;
+;;; Build workers.
+;;;
+
+(define %workers
+ ;; Set of connected workers: a hash table mapping worker names to <worker>
+ ;; records. Entries are refreshed upon 'worker-ready' and 'worker-ping'
+ ;; messages and dropped by 'remove-unresponsive-workers!'.
+ (make-hash-table))
+
+(define (pop-build name)
+  "Return a scheduled build for the worker called NAME, or #f when this worker
+is unknown or when no build is scheduled for the system that was drawn.  The
+system is picked at random among the systems supported by the worker."
+  (define (pick-one items)
+    (list-ref items (random (length items))))
+
+  (match (hash-ref %workers name)
+    (#f #f)
+    (known-worker
+     (let* ((system (pick-one (worker-systems known-worker)))
+            (candidates (db-get-builds `((status . scheduled)
+                                         (system . ,system)
+                                         (order . priority+timestamp)
+                                         (nr . 1)))))
+       (match candidates
+         ((build) build)
+         (() #f))))))
+
+(define (remove-unresponsive-workers!)
+  "Remove from %WORKERS every worker that has not been seen for more than
+%WORKER-TIMEOUT."
+  (define (expired? worker)
+    (> (- (current-time) (worker-last-seen worker))
+       (%worker-timeout)))
+
+  ;; Collect the names first so that the table is not modified while it is
+  ;; being traversed.
+  (define stale-names
+    (hash-fold (lambda (name worker names)
+                 (if (expired? worker)
+                     (cons name names)
+                     names))
+               '()
+               %workers))
+
+  (for-each (cut hash-remove! %workers <>) stale-names))
+
+(define* (read-worker-exp exp #:key reply-worker)
+ "Read the given EXP sent by a worker. REPLY-WORKER is a procedure that can
+be used to reply to the worker."
+ ;; Register BASE-WORKER, a worker sexp, in %WORKERS with its last-seen field
+ ;; set to the current time. PROC is called with the worker name beforehand.
+ (define (update-workers! base-worker proc)
+ (let* ((worker* (worker
+ (inherit (sexp->worker base-worker))
+ (last-seen (current-time))))
+ (name (worker-name worker*)))
+ (proc name)
+ (hash-set! %workers name worker*)))
+
+ ;; NOTE(review): there is no catch-all clause, so a message that matches none
+ ;; of the patterns below raises a match error.
+ (match (zmq-read-message exp)
+ (('worker-ready worker)
+ (update-workers! worker
+ (lambda (name)
+ (log-message (G_ "Worker `~a' is ready.") name))))
+ (('worker-request-work name)
+ ;; Always answer: either with a build request or with a no-build message.
+ (let ((build (pop-build name)))
+ (if build
+ (let ((derivation (assq-ref build #:derivation))
+ (priority (assq-ref build #:priority))
+ (timeout (assq-ref build #:timeout))
+ (max-silent (assq-ref build #:max-silent)))
+ ;; Record the assignment in the database before replying.
+ (db-update-build-worker! derivation name)
+ (db-update-build-status! derivation (build-status submitted))
+ (reply-worker
+ (zmq-build-request-message derivation
+ #:priority priority
+ #:timeout timeout
+ #:max-silent max-silent)))
+ (reply-worker
+ (zmq-no-build-message)))))
+ (('worker-ping worker)
+ (update-workers! worker (const #t))
+ ;; Rewrite the whole workers table of the database from %WORKERS.
+ (db-clear-workers)
+ (hash-for-each (lambda (key value)
+ (db-add-worker value))
+ %workers))
+ (('build-started ('drv drv) ('worker worker))
+ (let ((log-file (log-path (%cache-directory) drv)))
+ (log-message "build started: '~a' on ~a." drv worker)
+ (db-update-build-worker! drv worker)
+ (db-update-build-status! drv (build-status started)
+ #:log-file log-file)))))
+
+
+;;;
+;;; Fetch workers.
+;;;
+
+;; Return the ZMQ endpoint shared by the proxy, which binds a PUSH socket to
+;; it, and the fetch workers, which connect PULL sockets to it.
+(define (zmq-fetch-workers-endpoint)
+ "inproc://fetch-workers")
+
+(define (zmq-fetch-worker-socket)
+  "Return a new PULL socket connected to the fetch workers endpoint.  A fetch
+worker reads the messages dispatched to it from this socket."
+  (let ((pull-socket (zmq-create-socket %zmq-context ZMQ_PULL)))
+    (zmq-connect pull-socket (zmq-fetch-workers-endpoint))
+    pull-socket))
+
+;; Like 'url-fetch', but with the current output and error ports bound to
+;; void ports so that nothing the download writes to them is displayed.
+(define (url-fetch* url file)
+ (parameterize ((current-output-port (%make-void-port "w"))
+ (current-error-port (%make-void-port "w")))
+ (url-fetch url file)))
+
+(define (publish-narinfo-url publish-url store-hash)
+  "Return the URL of the narinfo file for STORE-HASH on PUBLISH-URL.  Only the
+part of STORE-HASH that precedes its first dash is used."
+  (let* ((dash (string-index store-hash #\-))
+         (hash (and dash (string-take store-hash dash))))
+    (format #f "~a/~a.narinfo" publish-url hash)))
+
+;; Like 'ensure-path', but log a message and return #f instead of raising when
+;; a store protocol error occurs.
+(define (ensure-path* store output)
+ (guard (c ((store-protocol-error? c)
+ (log-message "Failed to add ~a to store." output)
+ #f))
+ (ensure-path store output)))
+
+(define (add-to-store outputs url)
+  "Ensure that each of OUTPUTS, a list of derivation outputs, is present in
+the store, with URL used as a substitute server.  Build output is discarded."
+  (parameterize ((current-build-output-port (%make-void-port "w")))
+    (with-store store
+      (set-build-options* store url)
+      (for-each (cut ensure-path* store <>)
+                (map derivation-output-path outputs)))))
+
+(define (trigger-substitutes-baking outputs url)
+  "Request from the publish server at URL the narinfo of each of OUTPUTS, a
+list of derivation outputs.  Each narinfo is downloaded to a temporary file."
+  (define (narinfo-url output)
+    (publish-narinfo-url
+     url
+     (strip-store-prefix (derivation-output-path output))))
+
+  (for-each (lambda (output)
+              (let ((target (narinfo-url output)))
+                (call-with-temporary-output-file
+                 (lambda (tmp-file _port)
+                   (url-fetch* target tmp-file)))))
+            outputs))
+
+(define (need-fetching? message)
+  "Return #t when MESSAGE is a 'build-succeeded' or 'build-failed' message, the
+two kinds of messages handled by the fetch workers, and #f otherwise."
+  (match (zmq-read-message message)
+    (((or 'build-succeeded 'build-failed) _ ...) #t)
+    (_ #f)))
+
+(define* (run-fetch message)
+ "Read MESSAGE, a 'build-succeeded' or 'build-failed' message. On success,
+add the outputs of the derivation to the store with the URL found in MESSAGE
+as a substitute server, request their narinfos from %TRIGGER-SUBSTITUTE-URL
+when it is set, then call 'set-build-successful!'. On failure, record the
+failed status of the build in the database."
+ ;; Return the list of outputs of DRV, or the empty list when a system error
+ ;; occurs while reading the derivation file.
+ (define (build-outputs drv)
+ (catch 'system-error
+ (lambda ()
+ (map (match-lambda
+ ((output-name . output)
+ output))
+ (derivation-outputs
+ (read-derivation-from-file drv))))
+ (const '())))
+
+ (match (zmq-read-message message)
+ (('build-succeeded ('drv drv) ('url url) _ ...)
+ (let ((outputs (build-outputs drv)))
+ (add-to-store outputs url)
+ (when (%trigger-substitute-url)
+ (trigger-substitutes-baking outputs (%trigger-substitute-url)))
+ (log-message "build succeeded: '~a'" drv)
+ (set-build-successful! drv)))
+ ;; The URL carried by a failure message is not used.
+ (('build-failed ('drv drv) ('url url) _ ...)
+ (log-message "build failed: '~a'" drv)
+ (db-update-build-status! drv (build-status failed)))))
+
+(define (start-fetch-worker name)
+ "Start a fetch worker thread with the given NAME. This worker takes care of
+downloading build outputs. It communicates with the remote server using a ZMQ
+socket."
+ (call-with-new-thread
+ (lambda ()
+ (set-thread-name name)
+ (let ((socket (zmq-fetch-worker-socket)))
+ ;; Process messages forever, one at a time.
+ ;; NOTE(review): only single-part messages match; anything else raises
+ ;; a match error in this thread.
+ (let loop ()
+ (match (zmq-get-msg-parts-bytevector socket)
+ ((message)
+ (run-fetch (bv->string message))))
+ (loop))))))
+
+
+;;;
+;;; ZMQ connection.
+;;;
+
+;; ZMQ context from which all the sockets of this module are created.
+(define %zmq-context
+ (zmq-create-context))
+
+(define (zmq-backend-endpoint backend-port)
+  "Return the ZMQ endpoint string for TCP connections on BACKEND-PORT, on all
+the network interfaces."
+  (let ((port (number->string backend-port)))
+    (string-append "tcp://*:" port)))
+
+(define (zmq-start-proxy backend-port)
+  "Bind a ROUTER socket to BACKEND-PORT to talk to the workers and a PUSH
+socket to the fetch workers endpoint, then process worker messages forever.
+Messages for which 'need-fetching?' returns true are forwarded to the fetch
+workers; the other ones are handled with 'read-worker-exp'.  This procedure
+does not return."
+  (let* ((build-socket (zmq-create-socket %zmq-context ZMQ_ROUTER))
+         (fetch-socket (zmq-create-socket %zmq-context ZMQ_PUSH))
+         (poll-items (list (poll-item build-socket ZMQ_POLLIN))))
+
+    (zmq-bind-socket build-socket (zmq-backend-endpoint backend-port))
+    (zmq-bind-socket fetch-socket (zmq-fetch-workers-endpoint))
+
+    ;; Do not use the built-in zmq-proxy: the first frame of each message,
+    ;; which identifies the sending worker, is needed to address replies.
+    (let loop ()
+      ;; Wake up at least every second so that unresponsive workers get
+      ;; removed even when no message is received.
+      (let ((items (zmq-poll* poll-items 1000)))
+        (when (zmq-socket-ready? items build-socket)
+          (match (zmq-get-msg-parts-bytevector build-socket)
+            ((peer _ payload)
+             (let ((reply-worker
+                    (lambda (message)
+                      (zmq-send-msg-parts-bytevector
+                       build-socket
+                       (list peer
+                             (zmq-empty-delimiter)
+                             (string->bv message))))))
+               (if (need-fetching? (bv->string payload))
+                   (zmq-send-bytevector fetch-socket payload)
+                   (read-worker-exp (bv->string payload)
+                                    #:reply-worker reply-worker))))
+            (_
+             ;; Messages come from the network: do not let one with an
+             ;; unexpected number of frames stop the server.
+             (log-message "ignoring malformed message from worker."))))
+        (remove-unresponsive-workers!)
+        (loop)))))
+
+
+;;;
+;;; Entry point.
+;;;
+
+;; The PID of the publish process. Set by 'remote-server' and read by the
+;; SIGINT handler.
+(define %publish-pid
+ (make-atomic-box #f))
+
+;; The thread running the Avahi publish service. Set by 'remote-server' and
+;; joined by the SIGINT handler.
+(define %avahi-thread
+ (make-atomic-box #f))
+
+(define (signal-handler)
+  "Install a SIGINT handler that stops the publish process and the Avahi
+event loop before exiting with status 1."
+  (define (shutdown signum)
+    (let ((publish-pid (atomic-box-ref %publish-pid))
+          (avahi-thread (atomic-box-ref %avahi-thread)))
+      ;; Ask the Avahi thread to leave its loop.
+      (atomic-box-set! %stop-process? #t)
+
+      (when publish-pid
+        (kill publish-pid SIGHUP)
+        (waitpid publish-pid))
+
+      (when avahi-thread
+        (join-thread avahi-thread))
+
+      (exit 1)))
+
+  (sigaction SIGINT shutdown))
+
+(define (gather-user-privileges user)
+ "Switch to the identity of USER, a user name. Exit with an error message
+when a 'misc-error' is raised in the process, for instance by 'getpw'."
+ (catch 'misc-error
+ (lambda ()
+ (let ((user (getpw user)))
+ ;; Drop the supplementary groups and the group ID first, then the user
+ ;; ID.
+ (setgroups #())
+ (setgid (passwd:gid user))
+ (setuid (passwd:uid user))))
+ (lambda (key proc message args . rest)
+ ;; USER is the name given by the caller here, not the passwd entry.
+ (leave (G_ "user '~a' not found: ~a~%")
+ user (apply format #f message args)))))
+
+(define (remote-server args)
+ ;; Entry point of the 'remote-server' command. Parse ARGS, start the publish
+ ;; server, advertise the service with Avahi, start receiving build logs and
+ ;; fetch workers, then run the ZMQ proxy loop.
+ (signal-handler)
+
+ (with-error-handling
+ (let* ((opts (args-fold* args %options
+ (lambda (opt name arg result)
+ (leave (G_ "~A: unrecognized option~%") name))
+ (lambda (arg result)
+ (leave (G_ "~A: extraneous argument~%") arg))
+ %default-options))
+ (backend-port (assoc-ref opts 'backend-port))
+ (log-port (assoc-ref opts 'log-port))
+ (publish-port (assoc-ref opts 'publish-port))
+ (cache (assoc-ref opts 'cache))
+ (database (assoc-ref opts 'database))
+ (trigger-substitute-url (assoc-ref opts 'trigger-substitute-url))
+ (user (assoc-ref opts 'user))
+ (public-key
+ (read-file-sexp
+ (assoc-ref opts 'public-key-file)))
+ (private-key
+ (read-file-sexp
+ (assoc-ref opts 'private-key-file))))
+
+ ;; NOTE(review): 'cache' and 'database' have no default value, so CACHE
+ ;; and DATABASE are #f when the options are omitted; 'log-path' passes
+ ;; the cache directory to 'string-append' -- confirm that '--cache' is
+ ;; meant to be mandatory.
+ (parameterize ((%cache-directory cache)
+ (%trigger-substitute-url trigger-substitute-url)
+ (%package-database database)
+ (%public-key public-key)
+ (%private-key private-key))
+ ;; The key files are read above, before the identity is changed.
+ (when user
+ (gather-user-privileges user))
+
+ (atomic-box-set!
+ %publish-pid
+ (publish-server publish-port
+ #:public-key public-key
+ #:private-key private-key))
+
+ ;; The log and publish ports are advertised in the TXT record so that
+ ;; workers discovering the service know where to connect.
+ (atomic-box-set!
+ %avahi-thread
+ (avahi-publish-service-thread
+ service-name
+ #:type remote-server-service-type
+ #:port backend-port
+ #:stop-loop? (lambda ()
+ (atomic-box-ref %stop-process?))
+ #:txt (list (string-append "log-port="
+ (number->string log-port))
+ (string-append "publish-port="
+ (number->string publish-port)))))
+
+ (receive-logs log-port (%cache-directory))
+
+ (with-database
+ ;; Start four fetch workers, named fetch-worker-0 to fetch-worker-3.
+ (for-each (lambda (number)
+ (start-fetch-worker
+ (string-append "fetch-worker-"
+ (number->string number))))
+ (iota 4))
+
+ (zmq-start-proxy backend-port))))))
diff --git a/src/cuirass/remote-worker.scm b/src/cuirass/remote-worker.scm
new file mode 100644
index 0000000..d4ed022
--- /dev/null
+++ b/src/cuirass/remote-worker.scm
@@ -0,0 +1,382 @@
+;;; remote-worker.scm -- Remote build worker.
+;;; Copyright © 2020 Mathieu Othacehe <othacehe@gnu.org>
+;;;
+;;; This file is part of Cuirass.
+;;;
+;;; GNU Guix is free software; you can redistribute it and/or modify it
+;;; under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation; either version 3 of the License, or (at
+;;; your option) any later version.
+;;;
+;;; GNU Guix is distributed in the hope that it will be useful, but
+;;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with GNU Guix. If not, see <http://www.gnu.org/licenses/>.
+
+(define-module (cuirass remote-worker)
+ #:use-module (cuirass base)
+ #:use-module (cuirass remote)
+ #:use-module (gcrypt pk-crypto)
+ #:use-module (guix avahi)
+ #:use-module (guix config)
+ #:use-module (guix derivations)
+ #:use-module (guix diagnostics)
+ #:use-module (guix pki)
+ #:use-module (guix records)
+ #:use-module (guix scripts)
+ #:use-module (guix serialization)
+ #:use-module ((guix store)
+ #:select (current-build-output-port
+ store-error?
+ store-protocol-error?
+ store-protocol-error-message
+ with-store))
+ #:use-module (guix ui)
+ #:use-module (guix utils)
+ #:use-module (guix build syscalls)
+ #:use-module (guix build utils)
+ #:use-module (guix scripts publish)
+ #:use-module (simple-zmq)
+ #:use-module (rnrs bytevectors)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-11)
+ #:use-module (srfi srfi-26)
+ #:use-module (srfi srfi-34)
+ #:use-module (srfi srfi-37)
+ #:use-module (ice-9 atomic)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 threads)
+
+ #:export (remote-worker))
+
+;; Indicate if the process has to be stopped. Set to #t by the SIGINT handler
+;; and polled by the Avahi browsing loop through its #:stop-loop? callback.
+(define %stop-process?
+ (make-atomic-box #f))
+
+(define (show-help)
+  ;; Print the usage message of the 'remote-worker' command on the current
+  ;; output port.  The short option letters must agree with the ones declared
+  ;; in %OPTIONS: '-s' selects the server and '-S' the systems.
+  (format #t (G_ "Usage: remote-worker [OPTION]...
+Start a remote build worker.\n"))
+  (display (G_ "
+ -a, --address=ADDRESS use ADDRESS as the address of this worker"))
+  (display (G_ "
+ -w, --workers=COUNT start COUNT parallel workers"))
+  (display (G_ "
+ -p, --publish-port=PORT publish substitutes on PORT"))
+  (display (G_ "
+ -s, --server=SERVER connect to SERVER"))
+  (display (G_ "
+ -S, --systems=SYSTEMS list of supported SYSTEMS"))
+  (display (G_ "
+ --public-key=FILE use FILE as the public key for signatures"))
+  (display (G_ "
+ --private-key=FILE use FILE as the private key for signatures"))
+  (newline)
+  (display (G_ "
+ -h, --help display this help and exit"))
+  (display (G_ "
+ -V, --version display version information and exit"))
+  (newline)
+  (show-bug-report-information))
+
+(define %options
+  ;; Command-line options of 'remote-worker', as SRFI-37 option records.
+  ;; Apart from '--help' and '--version', which exit, each processor adds one
+  ;; entry to the RESULT association list.
+  (list (option '(#\h "help") #f #f
+                (lambda _
+                  (show-help)
+                  (exit 0)))
+        (option '(#\V "version") #f #f
+                (lambda _
+                  ;; Report the name of this command, as shown in the usage
+                  ;; message, rather than that of 'guix publish'.
+                  (show-version-and-exit "remote-worker")))
+        (option '(#\a "address") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'address arg result)))
+        (option '(#\w "workers") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'workers (string->number* arg) result)))
+        (option '(#\p "publish-port") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'publish-port (string->number* arg) result)))
+        (option '(#\s "server") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'server arg result)))
+        (option '(#\S "systems") #t #f
+                (lambda (opt name arg result)
+                  ;; SYSTEMS is a comma-separated list.
+                  (alist-cons 'systems
+                              (string-split arg #\,) result)))
+        (option '("public-key") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'public-key-file arg result)))
+        (option '("private-key") #t #f
+                (lambda (opt name arg result)
+                  (alist-cons 'private-key-file arg result)))))
+
+(define %default-options
+ ;; Initial option alist handed to 'args-fold*'. There is no default for
+ ;; 'address' and 'server': looking them up returns #f unless they are given
+ ;; on the command line.
+ `((workers . 1)
+ (publish-port . 5558)
+ (systems . ,(list (%current-system)))
+ (public-key-file . ,%public-key-file)
+ (private-key-file . ,%private-key-file)))
+
+
+;;;
+;;; ZMQ connection.
+;;;
+
+;; ZMQ context from which the sockets of this module are created.
+(define %zmq-context
+ (zmq-create-context))
+
+(define (zmq-backend-endpoint address port)
+  "Return the ZMQ endpoint of the build server reachable over TCP at ADDRESS,
+a string, and PORT, a number."
+  (let ((authority (string-append address ":" (number->string port))))
+    (string-append "tcp://" authority)))
+
+(define (zmq-dealer-socket)
+ "Return a new, unconnected ZMQ DEALER socket. Callers connect it to the
+backend endpoint of the build server."
+ (zmq-create-socket %zmq-context ZMQ_DEALER))
+
+
+;;;
+;;; Worker.
+;;;
+
+;; The port of the local publish server. Set by 'remote-worker' from the
+;; '--publish-port' option and read by 'local-publish-url'.
+(define %local-publish-port
+ (make-atomic-box #f))
+
+(define (local-publish-url address)
+  "Return the URL of the local publish server, built from ADDRESS and from the
+port stored in %LOCAL-PUBLISH-PORT."
+  (publish-url address (atomic-box-ref %local-publish-port)))
+
+(define* (run-build drv server
+ #:key
+ reply
+ timeout
+ max-silent
+ worker)
+ "Build DRV and send messages upon build start, failure or completion to the
+build server SERVER using the REPLY procedure. TIMEOUT and MAX-SILENT are
+passed to the store as build options; WORKER is the <worker> record of the
+calling worker.
+
+The publish server of the build server is added to the list of the store
+substitutes-urls. This way derivations that are not present on the worker can
+still be substituted."
+ (with-store store
+ (let ((address (server-address server))
+ (log-port (server-log-port server))
+ (publish-url (server-publish-url server))
+ (local-publish-url (worker-publish-url worker))
+ (name (worker-name worker)))
+ (set-build-options* store publish-url
+ #:timeout timeout
+ #:max-silent max-silent)
+ (reply (zmq-build-started-message drv name))
+ ;; A store protocol error raised below is reported as a build failure.
+ (guard (c ((store-protocol-error? c)
+ (info (G_ "Derivation `~a' build failed: ~a~%")
+ drv (store-protocol-error-message c))
+ (reply (zmq-build-failed-message drv local-publish-url))))
+ (let ((result
+ (let-values (((port finish)
+ (build-derivations& store (list drv))))
+ ;; Send the content of PORT to the log port of the server, then
+ ;; get the build result by calling FINISH.
+ (send-log address log-port drv port)
+ (close-port port)
+ (finish))))
+ ;; The URL of the local publish server is sent in both cases.
+ (if result
+ (begin
+ (info (G_ "Derivation ~a build succeeded.~%") drv)
+ (reply (zmq-build-succeeded-message drv local-publish-url)))
+ (begin
+ (info (G_ "Derivation ~a build failed.~%") drv)
+ (reply
+ (zmq-build-failed-message drv local-publish-url)))))))))
+
+(define* (run-command command server
+ #:key
+ reply worker)
+ "Run COMMAND. SERVER is the build server that sent the command. REPLY is a
+procedure that can be used to reply to this server and WORKER is the <worker>
+record of the calling worker."
+ ;; NOTE(review): there is no catch-all clause, so a command that matches
+ ;; none of the patterns below raises a match error.
+ (match (zmq-read-message command)
+ ;; PRIORITY, TIMESTAMP and SYSTEM are part of the message but are not used
+ ;; here.
+ (('build ('drv drv)
+ ('priority priority)
+ ('timeout timeout)
+ ('max-silent max-silent)
+ ('timestamp timestamp)
+ ('system system))
+ (info (G_ "Building `~a' derivation.~%") drv)
+ (run-build drv server
+ #:reply reply
+ #:worker worker
+ #:timeout timeout
+ #:max-silent max-silent))
+ (('no-build)
+ #t)))
+
+;; Start a thread that sends a ping message describing WORKER to SERVER every
+;; 60 seconds, over a DEALER socket of its own. Return the new thread.
+(define (worker-ping worker server)
+ (define (ping socket)
+ ;; The message is preceded by an empty frame.
+ (zmq-send-msg-parts-bytevector
+ socket
+ (list (make-bytevector 0)
+ (string->bv
+ (zmq-worker-ping (worker->sexp worker))))))
+
+ (call-with-new-thread
+ (lambda ()
+ (let* ((socket (zmq-dealer-socket))
+ (address (server-address server))
+ (port (server-port server))
+ (endpoint (zmq-backend-endpoint address port)))
+ (zmq-connect socket endpoint)
+ (let loop ()
+ (ping socket)
+ (sleep 60)
+ (loop))))))
+
+(define (start-worker worker server)
+ "Fork a process for WORKER, a <worker> record. The child connects a DEALER
+socket to SERVER, announces itself, then repeatedly requests work and runs the
+command it receives; it replies on the same socket. Return the PID of the
+child to the parent."
+ (define (reply socket)
+ (lambda (message)
+ (zmq-send-msg-parts-bytevector
+ socket
+ (list (zmq-empty-delimiter) (string->bv message)))))
+
+ (define (ready socket)
+ (zmq-send-msg-parts-bytevector
+ socket
+ (list (make-bytevector 0)
+ (string->bv
+ (zmq-worker-ready-message (worker->sexp worker))))))
+
+ (define (request-work socket)
+ (let ((name (worker-name worker)))
+ (zmq-send-msg-parts-bytevector
+ socket
+ (list (make-bytevector 0)
+ (string->bv (zmq-worker-request-work-message name))))))
+
+ (match (primitive-fork)
+ ;; Child process: this branch loops forever.
+ (0
+ (set-thread-name (worker-name worker))
+ (let* ((socket (zmq-dealer-socket))
+ (address (server-address server))
+ (port (server-port server))
+ (endpoint (zmq-backend-endpoint address port)))
+ (zmq-connect socket endpoint)
+ (ready socket)
+ (worker-ping worker server)
+ (let loop ()
+ (request-work socket)
+ ;; The answer is expected to be made of an empty frame followed by the
+ ;; command; any other shape raises a match error.
+ (match (zmq-get-msg-parts-bytevector socket '())
+ ((empty command)
+ (run-command (bv->string command) server
+ #:reply (reply socket)
+ #:worker worker)))
+ ;; Wait ten seconds before asking for more work.
+ (sleep 10)
+ (loop))))
+ ;; Parent process.
+ (pid pid)))
+
+
+;;;
+;;; Entry point.
+;;;
+
+;; The PID of the publish process. Set by 'remote-worker' and read by the
+;; SIGINT handler.
+(define %publish-pid
+ (make-atomic-box #f))
+
+;; The list of the PIDs of the worker processes forked so far. It is extended
+;; by 'add-to-worker-pids!' and read by the SIGINT handler.
+(define %worker-pids
+ (make-atomic-box '()))
+
+;; Load FILE with 'load*' in a fresh user module that uses (cuirass remote)
+;; and return the result.
+;; NOTE(review): the caller uses the result as a <server> record, so FILE
+;; presumably evaluates to one -- confirm.
+(define (load-server file)
+ (let ((user-module (make-user-module '((cuirass remote)))))
+ (load* file user-module)))
+
+(define (add-to-worker-pids! pid)
+  "Atomically add PID to the list of worker PIDs stored in %WORKER-PIDS."
+  ;; Reading the box and then setting it are two separate operations: an
+  ;; update made in between would be lost.  Use a compare-and-swap loop
+  ;; instead, retrying with the value that was actually found in the box.
+  (let loop ((pids (atomic-box-ref %worker-pids)))
+    (let ((previous (atomic-box-compare-and-swap! %worker-pids
+                                                  pids
+                                                  (cons pid pids))))
+      (unless (eq? previous pids)
+        (loop previous)))))
+
+(define (signal-handler)
+ "Install a SIGINT handler that kills the publish process and the worker
+processes, waits for them and exits with status 1."
+ (sigaction SIGINT
+ (lambda (signum)
+ (let ((publish-pid (atomic-box-ref %publish-pid))
+ (worker-pids (atomic-box-ref %worker-pids)))
+ ;; Ask the Avahi browsing loop, if any, to stop.
+ (atomic-box-set! %stop-process? #t)
+
+ ;; PUBLISH-PID is #f as long as the publish server is not started, hence
+ ;; the check below.
+ (for-each (lambda (pid)
+ (when pid
+ (kill pid SIGKILL)
+ (waitpid pid)))
+ (cons publish-pid worker-pids))
+
+ (exit 1)))))
+
+(define (remote-worker args)
+  "Entry point of the 'remote-worker' command.  Parse ARGS, start the local
+publish server, then fork the requested number of build workers: for the
+server described by the '--server' file when it is given, and otherwise for
+each remote server discovered with Avahi."
+  (signal-handler)
+
+  (with-error-handling
+    (let* ((opts (args-fold* args %options
+                             (lambda (opt name arg result)
+                               (leave (G_ "~A: unrecognized option~%") name))
+                             (lambda (arg result)
+                               (leave (G_ "~A: extraneous argument~%") arg))
+                             %default-options))
+           (address (assoc-ref opts 'address))
+           (workers (assoc-ref opts 'workers))
+           (publish-port (assoc-ref opts 'publish-port))
+           (server (assoc-ref opts 'server))
+           (systems (assoc-ref opts 'systems))
+           (public-key
+            (read-file-sexp
+             (assoc-ref opts 'public-key-file)))
+           (private-key
+            (read-file-sexp
+             (assoc-ref opts 'private-key-file))))
+
+      (define (start-workers! address build-server)
+        ;; Fork WORKERS worker processes that advertise ADDRESS and take
+        ;; their work from BUILD-SERVER, and record their PIDs.
+        (for-each
+         (lambda (n)
+           (add-to-worker-pids!
+            (start-worker (worker
+                           (address address)
+                           (publish-url (local-publish-url address))
+                           (name (generate-worker-name))
+                           (systems systems))
+                          build-server)))
+         (iota workers)))
+
+      ;; Validate the options before forking anything: 'leave' exits this
+      ;; process, which would leave an already started publish server behind.
+      (when (and server (not address))
+        (leave (G_ "Address must be set when server is provided.~%")))
+
+      (atomic-box-set! %local-publish-port publish-port)
+
+      (atomic-box-set!
+       %publish-pid
+       (publish-server publish-port
+                       #:public-key public-key
+                       #:private-key private-key))
+
+      (if server
+          (begin
+            (start-workers! address (load-server server))
+            ;; The workers run in child processes: keep the parent alive so
+            ;; that the SIGINT handler can stop them.
+            (while #t
+              (sleep 1)))
+          (avahi-browse-service-thread
+           (lambda (action service)
+             (case action
+               ((new-service)
+                ;; Without '--address', use the local address through which
+                ;; the service was discovered.
+                (start-workers!
+                 (or address
+                     (avahi-service-local-address service))
+                 (avahi-service->server service)))))
+           #:ignore-local? #f
+           #:types (list remote-server-service-type)
+           #:stop-loop? (lambda ()
+                          (atomic-box-ref %stop-process?)))))))
diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
new file mode 100644
index 0000000..32f65bb
--- /dev/null
+++ b/src/cuirass/remote.scm
@@ -0,0 +1,437 @@
+;;; remote.scm -- Build on remote machines.
+;;; Copyright © 2020 Mathieu Othacehe <othacehe@gnu.org>
+;;;
+;;; This file is part of Cuirass.
+;;;
+;;; GNU Guix is free software; you can redistribute it and/or modify it
+;;; under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation; either version 3 of the License, or (at
+;;; your option) any later version.
+;;;
+;;; GNU Guix is distributed in the hope that it will be useful, but
+;;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with GNU Guix. If not, see <http://www.gnu.org/licenses/>.
+
+(define-module (cuirass remote)
+ #:use-module (cuirass logging)
+ #:use-module (guix avahi)
+ #:use-module (guix config)
+ #:use-module (guix derivations)
+ #:use-module (guix records)
+ #:use-module (guix store)
+ #:use-module (guix ui)
+ #:use-module (guix utils)
+ #:use-module (guix build download)
+ #:use-module (guix build syscalls)
+ #:use-module ((guix build utils) #:select (dump-port mkdir-p))
+ #:use-module (guix scripts publish)
+ #:use-module (simple-zmq)
+ #:use-module (zlib)
+ #:use-module (rnrs bytevectors)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-11)
+ #:use-module (srfi srfi-26)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 rdelim)
+ #:use-module (ice-9 threads)
+ #:export (worker
+ worker?
+ worker-address
+ worker-name
+ worker-publish-url
+ worker-systems
+ worker-last-seen
+ worker->sexp
+ sexp->worker
+ generate-worker-name
+ %worker-timeout
+
+ server
+ server?
+ server-address
+ server-port
+ server-log-port
+ server-publish-url
+ publish-url
+ avahi-service->server
+
+ publish-server
+ set-build-options*
+
+ strip-store-prefix
+ log-path
+ receive-logs
+ send-log
+
+ zmq-poll*
+ zmq-socket-ready?
+ zmq-empty-delimiter
+
+ zmq-build-request-message
+ zmq-no-build-message
+ zmq-build-started-message
+ zmq-build-failed-message
+ zmq-build-succeeded-message
+ zmq-worker-ping
+ zmq-worker-ready-message
+ zmq-worker-request-work-message
+ zmq-read-message
+
+ remote-server-service-type))
+
+
+;;;
+;;; Workers.
+;;;
+
+;; Description of a build worker. PUBLISH-URL defaults to #f and LAST-SEEN to
+;; 0. SYSTEMS is the list of systems the worker can build for.
+(define-record-type* <worker>
+ worker make-worker
+ worker?
+ (address worker-address)
+ (name worker-name)
+ (publish-url worker-publish-url
+ (default #f))
+ (systems worker-systems)
+ (last-seen worker-last-seen
+ (default 0)))
+
+(define (worker->sexp worker)
+  "Return an sexp describing WORKER.  The publish URL of WORKER is not part of
+the result."
+  (list 'worker
+        (list 'address (worker-address worker))
+        (list 'name (worker-name worker))
+        (list 'systems (worker-systems worker))
+        (list 'last-seen (worker-last-seen worker))))
+
+(define (sexp->worker sexp)
+  "Return the <worker> record described by SEXP, an sexp such as those
+returned by 'worker->sexp'."
+  (match sexp
+    (('worker ('address the-address)
+              ('name the-name)
+              ('systems the-systems)
+              ('last-seen the-last-seen))
+     (worker
+      (address the-address)
+      (name the-name)
+      (systems the-systems)
+      (last-seen the-last-seen)))))
+
+;; Random state used by 'random-string', seeded from the PID of the process
+;; and the current time in seconds.
+(define %seed
+ (seed->random-state
+ (logxor (getpid) (car (gettimeofday)))))
+
+(define (integer->alphanumeric-char n)
+  "Map N, an integer in the [0..61] range, to an alphanumeric character: 0 to
+9 map to the digits, 10 to 35 to the upper-case letters and 36 to 61 to the
+lower-case letters.  Raise an error when N is outside of this range."
+  (cond ((< n 0)
+         ;; A negative N would otherwise be mapped to a character located
+         ;; before #\0, which is not alphanumeric.
+         (error "integer out of bounds" n))
+        ((< n 10)
+         (integer->char (+ (char->integer #\0) n)))
+        ((< n 36)
+         (integer->char (+ (char->integer #\A) (- n 10))))
+        ((< n 62)
+         (integer->char (+ (char->integer #\a) (- n 36))))
+        (else
+         (error "integer out of bounds" n))))
+
+(define (random-string len)
+  "Return a string of LEN alphanumeric characters drawn at random from %SEED."
+  ;; Each iteration draws one character and conses it in front of the ones
+  ;; drawn so far.
+  (do ((remaining len (- remaining 1))
+       (chars '() (cons (integer->alphanumeric-char (random 62 %seed))
+                        chars)))
+      ((zero? remaining)
+       (list->string chars))))
+
+(define (generate-worker-name)
+ "Return a new worker name made of the host name, a dash and four random
+alphanumeric characters."
+ (string-append (gethostname) "-" (random-string 4)))
+
+;; Time after which a worker that did not show up is considered unresponsive.
+;; The remote server compares it with a difference of 'current-time' values.
+(define %worker-timeout
+ (make-parameter 120))
+
+
+;;;
+;;; Server.
+;;;
+
+;; Description of a remote build server as seen by a worker: ADDRESS and PORT
+;; locate its ZMQ backend, LOG-PORT is the port build logs are sent to and
+;; PUBLISH-URL is the URL of its publish server. No field has a default value.
+(define-record-type* <server>
+ server make-server
+ server?
+ (address server-address)
+ (port server-port)
+ (log-port server-log-port)
+ (publish-url server-publish-url))
+
+(define (publish-url address port)
+  "Return the HTTP URL of the publish server listening at ADDRESS, a string,
+and PORT, a number."
+  (let ((authority (string-append address ":" (number->string port))))
+    (string-append "http://" authority)))
+
+(define (avahi-service->params service)
+  "Return an association list with the #:log-port and the #:publish-url of the
+remote server advertised by SERVICE, an Avahi service.  Both are computed from
+the 'log-port' and 'publish-port' entries of the TXT record of SERVICE."
+  (define (service-txt->params txt)
+    ;; Parse TXT, a list of "KEY=VALUE" strings, into an association list
+    ;; with symbols as keys and strings as values.  Entries that do not have
+    ;; this exact shape are skipped instead of raising a match error.
+    (fold (lambda (param params)
+            (match (string-split param #\=)
+              ((key value)
+               (cons (cons (string->symbol key) value)
+                     params))
+              (_ params)))
+          '()
+          txt))
+
+  (define (number-param params param)
+    (string->number (assq-ref params param)))
+
+  (let* ((address (avahi-service-address service))
+         (txt (avahi-service-txt service))
+         (params (service-txt->params txt))
+         (log-port (number-param params 'log-port))
+         (publish-port (number-param params 'publish-port))
+         (publish-url (publish-url address publish-port)))
+    `((#:log-port . ,log-port)
+      (#:publish-url . ,publish-url))))
+
+(define (avahi-service->server service)
+  "Return the <server> record corresponding to SERVICE, an Avahi service.  The
+address and port are those of SERVICE; the log port and publish URL are the
+ones returned by 'avahi-service->params'."
+  (let ((params (avahi-service->params service)))
+    (server
+     (address (avahi-service-address service))
+     (port (avahi-service-port service))
+     (log-port (assq-ref params #:log-port))
+     (publish-url (assq-ref params #:publish-url)))))
+
+
+;;;
+;;; Store publishing.
+;;;
+
+(define* (set-build-options* store url
+ #:key
+ timeout
+ max-silent)
+ "Set the build options of STORE: enable substitutes, fallback and
+keep-going, and add URL in front of the default substitute URLs. TIMEOUT and
+MAX-SILENT are passed as the #:timeout and #:max-silent-time options; both are
+#f when omitted."
+ (set-build-options store
+ #:use-substitutes? #t
+ #:fallback? #t
+ #:keep-going? #t
+ #:timeout timeout
+ #:max-silent-time max-silent
+ #:verbosity 1
+ #:substitute-urls
+ (cons url %default-substitute-urls)))
+
+(define* (publish-server port
+                         #:key
+                         public-key
+                         private-key)
+  "Fork a process running a publish server that listens on PORT, on all the
+network interfaces, and return its PID.  PUBLIC-KEY and PRIVATE-KEY are used
+to sign narinfos.  The standard output and error of the new process are
+redirected to /tmp/publish.log."
+  (define (redirect-output! file)
+    ;; Make file descriptors 1 and 2 refer to FILE, opened for writing.
+    (let ((log-file (open-file file "w")))
+      (close-fdes 1)
+      (close-fdes 2)
+      (dup2 (fileno log-file) 1)
+      (dup2 (fileno log-file) 2)
+      (close-port log-file)))
+
+  (match (primitive-fork)
+    ;; Child process.
+    (0
+     (parameterize ((%public-key public-key)
+                    (%private-key private-key))
+       (with-store store
+         (redirect-output! "/tmp/publish.log")
+         (let ((socket (open-server-socket
+                        (make-socket-address AF_INET INADDR_ANY port))))
+           (run-publish-server socket store
+                               #:compressions
+                               (list %default-gzip-compression))))))
+    ;; Parent process.
+    (child-pid child-pid)))
+
+
+;;;
+;;; Logs.
+;;;
+
+(define (strip-store-prefix file)
+  ;; Given a file name like "/gnu/store/…-foo-1.2/bin/foo", return
+  ;; "/bin/foo".  When nothing follows the store item name, as in
+  ;; "/gnu/store/…-foo-1.2", return that item name, "…-foo-1.2".
+  (let* ((prefix-length (+ 1 (string-length %store-directory)))
+         (relative (string-drop file prefix-length))
+         (slash (string-index relative #\/)))
+    (if slash
+        (string-drop relative slash)
+        relative)))
+
+(define (log-path cache derivation)
+  "Return the name of the log file of DERIVATION under the CACHE directory:
+the part of the 'strip-store-prefix' result for DERIVATION that precedes its
+first dash, followed by \".log.gz\"."
+  (let* ((item (strip-store-prefix derivation))
+         (dash (string-index item #\-))
+         (hash (and dash (string-take item dash))))
+    (string-append cache "/" hash ".log.gz")))
+
+(define (receive-logs port cache)
+  "Start a thread that listens for TCP connections on PORT.  Each accepted
+client is sent a '(log-server (version 0))' greeting and is then handled in
+its own thread: a '(log (version 0) (derivation DRV))' header is read from it
+and the data that follows is written to the file returned by 'log-path' for
+CACHE and DRV."
+  (define (valid-derivation? derivation)
+    ;; DERIVATION comes from the network and is used to build a file name.
+    ;; Only accept a string that is longer than the store prefix and whose
+    ;; remainder contains a dash and no '/', so that 'log-path' cannot be made
+    ;; to designate a file outside of CACHE.
+    (let ((prefix-length (+ 1 (string-length %store-directory))))
+      (and (string? derivation)
+           (> (string-length derivation) prefix-length)
+           (let ((base (string-drop derivation prefix-length)))
+             (and (string-index base #\-)
+                  (not (string-index base #\/)))))))
+
+  (define (read-log port)
+    ;; Read the log header from PORT and dump the rest of PORT to the
+    ;; corresponding log file.  Return #f when the header is invalid.
+    (match (false-if-exception (read port))
+      (('log ('version 0)
+             ('derivation (? valid-derivation? derivation)))
+       (let ((file (log-path cache derivation)))
+         (call-with-output-file file
+           (lambda (output)
+             (dump-port port output)))))
+      (_
+       (log-message "invalid log received.~%")
+       #f)))
+
+  (define (wait-for-client port proc)
+    ;; Accept connections on PORT forever, calling PROC on each client socket
+    ;; once the greeting has been sent.
+    (let ((sock (socket AF_INET SOCK_STREAM 0)))
+      (setsockopt sock SOL_SOCKET SO_REUSEADDR 1)
+      (bind sock AF_INET INADDR_ANY port)
+      (listen sock 1024)
+      (while #t
+        (match (select (list sock) '() '() 60)
+          (((_) () ())
+           (match (accept sock)
+             ((client . address)
+              (write '(log-server (version 0)) client)
+              (force-output client)
+              (proc client))))
+          ((() () ())
+           #f)))))
+
+  (define (client-handler client)
+    (call-with-new-thread
+     (lambda ()
+       (set-thread-name
+        (string-append "log-server-"
+                       (number->string (port->fdes client))))
+       ;; Close CLIENT even when reading or storing the log fails, so that
+       ;; a faulty transfer does not leak the connection.
+       (dynamic-wind
+         (const #t)
+         (lambda ()
+           (read-log client))
+         (lambda ()
+           (close-port client))))))
+
+  (call-with-new-thread
+   (lambda ()
+     (set-thread-name "log-server")
+     (wait-for-client port client-handler))))
+
+(define* (send-log address port derivation log)
+  "Connect to the log server at ADDRESS, an IPv4 address string, and PORT,
+wait for its greeting, then send a '(log (version 0) (derivation
+DERIVATION))' header followed by the content of the LOG input port, written
+through 'call-with-gzip-output-port'.  Return #f if the greeting is invalid or
+if nothing is received within 10 seconds."
+  (let* ((sock (socket AF_INET SOCK_STREAM 0))
+         (in-addr (inet-pton AF_INET address))
+         (addr (make-socket-address AF_INET in-addr port)))
+    (connect sock addr)
+    ;; Wait up to 10 seconds for the server greeting.
+    (match (select (list sock) '() '() 10)
+      (((_) () ())
+       (match (read sock)
+         (('log-server ('version version ...))
+          (let ((header `(log
+                          (version 0)
+                          (derivation ,derivation))))
+            (write header sock)
+            (call-with-gzip-output-port sock
+              (lambda (sock-compressed)
+                (dump-port log sock-compressed)))
+            (close-port sock)))
+         (x
+          (log-message "invalid handshake ~s.~%" x)
+          (close-port sock)
+          #f)))
+      ((() () ())                                 ;timeout
+       ;; LOG is the input port argument here, not a logging procedure:
+       ;; report the error with 'log-message' like the handshake branch.
+       (log-message "timeout while sending files to ~a.~%" port)
+       (close-port sock)
+       #f))))
+
+
+;;;
+;;; ZMQ.
+;;;
+
+;; ZMQ context created once, when this module is loaded.
+(define %zmq-context (zmq-create-context))
+
+(define (EINTR-safe proc)
+  "Wrap PROC in a procedure taking the same arguments that calls PROC again
+whenever it throws a 'zmq-error' exception whose errno is EINTR.  Any other
+'zmq-error' exception is thrown again unchanged."
+  (letrec ((retrying
+            (lambda args
+              (catch 'zmq-error
+                (lambda ()
+                  (apply proc args))
+                (lambda (key errno . rest)
+                  (if (= errno EINTR)
+                      (apply retrying args)
+                      (apply throw key errno rest)))))))
+    retrying))
+
+;; Variant of ZMQ-POLL, obtained through EINTR-SAFE, that is called again
+;; when it fails with an EINTR error.
+(define zmq-poll* (EINTR-safe zmq-poll))
+
+(define (zmq-socket-ready? items socket)
+  "Return the first element of ITEMS, a list returned by a 'zmq-poll' call,
+whose socket is SOCKET, or #f if there is none."
+  (define (item-for-socket? item)
+    (eq? (poll-item-socket item) socket))
+
+  (find item-for-socket? items))
+
+(define (zmq-read-message msg)
+  "Return the first datum obtained by calling 'read' on the string MSG."
+  (let ((input (open-input-string msg)))
+    (read input)))
+
+(define (zmq-empty-delimiter)
+ "Return an empty ZMQ delimiter, a zero-length bytevector, used to format
+message envelopes."
+ (make-bytevector 0))
+
+;; ZMQ Messages.
+(define* (zmq-build-request-message drv
+                                    #:key
+                                    priority
+                                    timeout
+                                    max-silent
+                                    timestamp
+                                    system)
+  "Return, as a string, the written form of a 'build' message requesting the
+build of DRV for SYSTEM.  PRIORITY, TIMEOUT, MAX-SILENT and TIMESTAMP are
+included in the message as is."
+  (let ((request (list 'build
+                       (list 'drv drv)
+                       (list 'priority priority)
+                       (list 'timeout timeout)
+                       (list 'max-silent max-silent)
+                       (list 'timestamp timestamp)
+                       (list 'system system))))
+    (format #f "~s" request)))
+
+(define (zmq-no-build-message)
+  "Return, as a string, the 'no-build' message telling that no builds are
+available."
+  (format #f "~s" (list 'no-build)))
+
+(define (zmq-build-started-message drv worker)
+  "Return, as a string, the 'build-started' message telling that the build of
+DRV has started on WORKER."
+  (let ((message (list 'build-started
+                       (list 'drv drv)
+                       (list 'worker worker))))
+    (format #f "~s" message)))
+
+(define* (zmq-build-failed-message drv url #:optional log)
+  "Return, as a string, the 'build-failed' message telling that the build of
+DRV has failed.  URL and LOG, which defaults to #f, are included in the
+message as is."
+  (let ((message (list 'build-failed
+                       (list 'drv drv)
+                       (list 'url url)
+                       (list 'log log))))
+    (format #f "~s" message)))
+
+(define* (zmq-build-succeeded-message drv url #:optional log)
+  "Return, as a string, the 'build-succeeded' message telling that the build
+of DRV is done.  URL and LOG, which defaults to #f, are included in the
+message as is."
+  (let ((message (list 'build-succeeded
+                       (list 'drv drv)
+                       (list 'url url)
+                       (list 'log log))))
+    (format #f "~s" message)))
+
+(define (zmq-worker-ping worker)
+  "Return, as a string, the 'worker-ping' message telling that WORKER is
+alive."
+  (format #f "~s" (list 'worker-ping worker)))
+
+(define (zmq-worker-ready-message worker)
+  "Return, as a string, the 'worker-ready' message telling that WORKER is
+ready."
+  (format #f "~s" (list 'worker-ready worker)))
+
+(define (zmq-worker-request-work-message name)
+  "Return, as a string, the 'worker-request-work' message telling that the
+worker called NAME is requesting work."
+  (format #f "~s" (list 'worker-request-work name)))
+
+;; Service type string of the remote server.
+;; NOTE(review): presumably the type under which the remote server is
+;; advertised and looked up with Avahi -- confirm against the callers.
+(define remote-server-service-type
+ "_remote-server._tcp")
diff --git a/src/cuirass/templates.scm b/src/cuirass/templates.scm
index 70737fc..e55e1cb 100644
--- a/src/cuirass/templates.scm
+++ b/src/cuirass/templates.scm
@@ -34,6 +34,7 @@
#:use-module ((guix utils) #:select (string-replace-substring))
#:use-module ((cuirass database) #:select (build-status
evaluation-status))
+ #:use-module (cuirass remote)
#:export (html-page
specifications-table
evaluation-info-table
@@ -42,7 +43,8 @@
build-details
evaluation-build-table
running-builds-table
- global-metrics-content))
+ global-metrics-content
+ workers-status))
(define (navigation-items navigation)
(match navigation
@@ -137,6 +139,9 @@ system whose names start with " (code "guile-") ":" (br)
(href "/metrics"))
"Global metrics")
(a (@ (class "dropdown-item")
+ (href "/workers"))
+ "Workers status")
+ (a (@ (class "dropdown-item")
(href "/status"))
"Running builds")))
(li (@ (class "nav-item"))
@@ -293,10 +298,8 @@ system whose names start with " (code "guile-") ":" (br)
(time->string (assq-ref build #:stoptime))
"—")))
(tr (th "Log file")
- (td ,(if completed?
- `(a (@ (href "/build/" ,(assq-ref build #:id) "/log/raw"))
- "raw")
- "—")))
+ (td (a (@ (href "/build/" ,(assq-ref build #:id) "/log/raw"))
+ "raw")))
(tr (th "Derivation")
(td (pre ,(assq-ref build #:derivation))))
(tr (th "Outputs")
@@ -515,10 +518,8 @@ and BUILD-MAX are global minimal and maximal (stoptime,
rowid) pairs."
(td ,(assq-ref build #:job))
(td ,(assq-ref build #:nixname))
(td ,(assq-ref build #:system))
- (td ,(if completed?
- `(a (@ (href "/build/" ,(assq-ref build #:id) "/log/raw"))
- "raw")
- "—"))))
+ (td (a (@ (href "/build/" ,(assq-ref build #:id) "/log/raw"))
+ "raw"))))
(define (build-id build)
(match build
@@ -810,7 +811,9 @@ and BUILD-MAX are global minimal and maximal row
identifiers."
(td ,(assq-ref build #:job-name))
(td ,(time->string
(assq-ref build #:starttime)))
- (td ,(assq-ref build #:system))))
+ (td ,(assq-ref build #:system))
+ (td (a (@ (href "/build/" ,(assq-ref build #:id) "/log/raw"))
+ "raw"))))
`((p (@ (class "lead")) "Running builds")
(table
@@ -820,7 +823,8 @@ and BUILD-MAX are global minimal and maximal row
identifiers."
`((thead (tr (th (@ (scope "col")) "ID")
(th (@ (scope "col")) "Job")
(th (@ (scope "col")) "Queued at")
- (th (@ (scope "col")) "System")))
+ (th (@ (scope "col")) "System")
+ (th (@ (scope "col")) "Log")))
(tbody
,(map build-row builds)))))))
@@ -1013,3 +1017,41 @@ completed builds divided by the time required to build
them.")
#:title "Pending builds"
#:labels '("Pending builds")
#:colors (list "#3e95cd")))))
+
+(define (workers-status workers builds)
+  "Return the SXML content of the \"Workers status\" page.  WORKERS is a list
+of workers and BUILDS a parallel list: each of its elements is the list of
+builds displayed under the worker at the same position in WORKERS."
+  (define (build-row build)
+    ;; Return the table row describing BUILD, an association list.
+    `(tr
+      (th (@ (scope "row"))
+          (a (@ (href "/build/" ,(assq-ref build #:id) "/details"))
+             ,(assq-ref build #:id)))
+      (td ,(assq-ref build #:job-name))
+      (td ,(time->string
+            (assq-ref build #:starttime)))
+      (td ,(assq-ref build #:system))
+      (td (a (@ (href "/build/" ,(assq-ref build #:id) "/log/raw"))
+             "raw"))))
+
+  (define (worker-header worker)
+    ;; Code point 128994 is the "large green circle" character.  The '~^'
+    ;; directive stops before the separator on the last system so that no
+    ;; space is left in front of the closing parenthesis.
+    `((p ,(integer->char 128994)
+         " "
+         (b ,(worker-name worker))
+         ,(format #f " (~a, ~{~a~^ ~})"
+                  (worker-address worker)
+                  (worker-systems worker)))))
+
+  (define (worker-table worker builds)
+    ;; Return the header of WORKER followed by the table of its BUILDS, or by
+    ;; a table with a single "Idle" heading when BUILDS is empty.
+    `(,@(worker-header worker)
+      (table
+       (@ (class "table table-sm table-hover table-striped"))
+       ,@(if (null? builds)
+             ;; A 'th' cell must live in a row, itself in a table section.
+             `((thead (tr (th (@ (scope "col")) "Idle"))))
+             `((thead (tr (th (@ (scope "col")) "ID")
+                          (th (@ (scope "col")) "Job")
+                          (th (@ (scope "col")) "Queued at")
+                          (th (@ (scope "col")) "System")
+                          (th (@ (scope "col")) "Log")))
+               (tbody
+                ,(map build-row builds)))))))
+
+  `((p (@ (class "lead")) "Workers status")
+    ,@(map worker-table workers builds)))
diff --git a/src/schema.sql b/src/schema.sql
index 51d0c80..761b48f 100644
--- a/src/schema.sql
+++ b/src/schema.sql
@@ -7,8 +7,9 @@ CREATE TABLE Specifications (
proc_input TEXT NOT NULL, -- name of the input containing the proc that
does the evaluation
proc_file TEXT NOT NULL, -- file containing the procedure that does the
evaluation, relative to proc_input
proc TEXT NOT NULL, -- defined in proc_file
- proc_args TEXT NOT NULL, -- passed to proc
- build_outputs TEXT NOT NULL --specify what build outputs should be made
available for download
+ proc_args TEXT NOT NULL, -- passed to proc
+ build_outputs TEXT NOT NULL, --specify what build outputs should be made
available for download
+ priority INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE Inputs (
@@ -61,10 +62,13 @@ CREATE TABLE Builds (
evaluation INTEGER NOT NULL,
job_name TEXT NOT NULL,
system TEXT NOT NULL,
- machine TEXT, --optional, machine performing the build.
+ worker TEXT, --optional, worker performing the build.
nix_name TEXT NOT NULL,
log TEXT NOT NULL,
status INTEGER NOT NULL,
+ priority INTEGER NOT NULL DEFAULT 0,
+ max_silent INTEGER NOT NULL DEFAULT 0,
+ timeout INTEGER NOT NULL DEFAULT 0,
timestamp INTEGER NOT NULL,
starttime INTEGER NOT NULL,
stoptime INTEGER NOT NULL,
@@ -96,6 +100,13 @@ CREATE TABLE Events (
event_json TEXT NOT NULL
);
+CREATE TABLE Workers (
+ name TEXT NOT NULL PRIMARY KEY,
+ address TEXT NOT NULL,
+ systems TEXT NOT NULL,
+ last_seen INTEGER NOT NULL
+);
+
-- XXX: All queries targeting Builds and Outputs tables *must* be covered by
-- an index. It is also preferable for the other tables.
CREATE INDEX Builds_status_index ON Builds (status);
@@ -106,6 +117,7 @@ CREATE INDEX Builds_timestamp_stoptime on Builds(timestamp,
stoptime);
CREATE INDEX Builds_stoptime on Builds(stoptime DESC);
CREATE INDEX Builds_stoptime_id on Builds(stoptime DESC, id DESC);
CREATE INDEX Builds_status_ts_id on Builds(status DESC, timestamp DESC, id
ASC);
+CREATE INDEX Builds_priority_timestamp on Builds(priority DESC, timestamp ASC);
CREATE INDEX Evaluations_status_index ON Evaluations (id, status);
CREATE INDEX Evaluations_specification_index ON Evaluations (specification, id
DESC);
diff --git a/src/sql/upgrade-17.sql b/src/sql/upgrade-17.sql
index f74bb92..065ca5f 100644
--- a/src/sql/upgrade-17.sql
+++ b/src/sql/upgrade-17.sql
@@ -1,5 +1,5 @@
BEGIN TRANSACTION;
-ALTER TABLE Builds ADD machine TEXT DEFAULT NULL;
+ALTER TABLE Builds ADD worker TEXT DEFAULT NULL;
COMMIT;
diff --git a/src/sql/upgrade-18.sql b/src/sql/upgrade-18.sql
new file mode 100644
index 0000000..13b9f01
--- /dev/null
+++ b/src/sql/upgrade-18.sql
@@ -0,0 +1,10 @@
+BEGIN TRANSACTION;
+
+CREATE TABLE Workers (
+ name TEXT NOT NULL PRIMARY KEY,
+ address TEXT NOT NULL,
+ systems TEXT NOT NULL,
+ last_seen INTEGER NOT NULL
+);
+
+COMMIT;
diff --git a/src/sql/upgrade-19.sql b/src/sql/upgrade-19.sql
new file mode 100644
index 0000000..4213e11
--- /dev/null
+++ b/src/sql/upgrade-19.sql
@@ -0,0 +1,11 @@
+BEGIN TRANSACTION;
+
+ALTER TABLE Specifications ADD priority INTEGER NOT NULL DEFAULT 0;
+
+ALTER TABLE Builds ADD priority INTEGER NOT NULL DEFAULT 0;
+ALTER TABLE Builds ADD max_silent INTEGER NOT NULL DEFAULT 0;
+ALTER TABLE Builds ADD timeout INTEGER NOT NULL DEFAULT 0;
+
+CREATE INDEX Builds_priority_timestamp on Builds(priority DESC, timestamp ASC);
+
+COMMIT;
diff --git a/tests/database.scm b/tests/database.scm
index 73b347c..d5fa060 100644
--- a/tests/database.scm
+++ b/tests/database.scm
@@ -47,7 +47,8 @@
(#:tag . #f)
(#:commit . #f)
(#:no-compile? . #f))))
- (#:build-outputs . ())))
+ (#:build-outputs . ())
+ (#:priority . 9)))
(define (make-dummy-checkouts fakesha1 fakesha2)
`(((#:commit . ,fakesha1)
diff --git a/tests/http.scm b/tests/http.scm
index 8642425..02f4b08 100644
--- a/tests/http.scm
+++ b/tests/http.scm
@@ -218,12 +218,6 @@
(object->json-string build-query-result)
json->scm)))
- (test-equal "/build/1/log/raw"
- `(302 ,(string->uri-reference "/log/fake-1.0"))
- (let ((response (http-get (test-cuirass-uri "/build/1/log/raw"))))
- (list (response-code response)
- (response-location response))))
-
(test-equal "/build/42"
404
(response-code (http-get (test-cuirass-uri "/build/42"))))