guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

39/118: nix-copy-closure: Fix race condition


From: Ludovic Courtès
Subject: 39/118: nix-copy-closure: Fix race condition
Date: Tue, 19 May 2015 14:45:31 +0000

civodul pushed a commit to branch nix
in repository guix.

commit 04170d06bf7d17f882c01d3ab98885e0f3e46d2f
Author: Eelco Dolstra <address@hidden>
Date:   Thu Jul 10 11:51:22 2014 +0200

    nix-copy-closure: Fix race condition
    
    There is a long-standing race condition when copying a closure to a
    remote machine, particularly affecting build-remote.pl: the client
    first asks the remote machine which paths it already has, then copies
    over the missing paths. If the garbage collector kicks in on the
    remote machine between the first and second step, the already-present
    paths may be deleted. The missing paths may then refer to deleted
    paths, causing nix-copy-closure to fail. The client now performs both
    steps using a single remote Nix call (using ‘nix-store --serve’),
    locking all paths in the closure while querying.
    
    I changed the --serve protocol a bit (getting rid of QueryCommand), so
    this breaks the SSH substituter from older versions. But it was marked
    experimental anyway.
    
    Fixes #141.
---
 perl/lib/Nix/CopyClosure.pm              |   77 +++++++++++++++++++++++-
 src/download-via-ssh/download-via-ssh.cc |    8 +-
 src/nix-store/nix-store.cc               |   96 ++++++++++++++++-------------
 src/nix-store/serve-protocol.hh          |   15 ++---
 4 files changed, 136 insertions(+), 60 deletions(-)

diff --git a/perl/lib/Nix/CopyClosure.pm b/perl/lib/Nix/CopyClosure.pm
index 41ceabd..cba365a 100644
--- a/perl/lib/Nix/CopyClosure.pm
+++ b/perl/lib/Nix/CopyClosure.pm
@@ -4,6 +4,15 @@ use strict;
 use Nix::Config;
 use Nix::Store;
 use List::Util qw(sum);
+use IPC::Open2;
+
+
+sub readInt {
+    my ($from) = @_;
+    my $resp;
+    sysread($from, $resp, 8) == 8 or die "did not receive valid reply from 
remote host\n";
+    return unpack("L<x4", $resp);
+}
 
 
 sub copyTo {
@@ -20,14 +29,76 @@ sub copyTo {
         # Ignore exit status because this is just an optimisation.
     }
 
+    # Start ‘nix-store --serve’ on the remote host.
+    my ($from, $to);
+    my $pid = open2($from, $to, "ssh $sshHost @{$sshOpts} nix-store --serve 
--write");
+
+    # Do the handshake.
+    eval {
+        my $SERVE_MAGIC_1 = 0x390c9deb; # FIXME
+        my $clientVersion = 0x200;
+        syswrite($to, pack("L<x4L<x4", $SERVE_MAGIC_1, $clientVersion)) or die;
+        die "did not get valid handshake from remote host\n" if readInt($from) 
!= 0x5452eecb;
+        my $serverVersion = readInt($from);
+        die "unsupported server version\n" if $serverVersion < 0x200 || 
$serverVersion >= 0x300;
+    };
+    if ($@) {
+        chomp $@;
+        warn "$@; falling back to old closure copying method\n";
+        return oldCopyTo(address@hidden, @_);
+    }
+
+    # Send the "query valid paths" command with the "lock" option
+    # enabled. This prevens a race where the remote host
+    # garbage-collect paths that are already there.
+    my $req = pack("L<x4L<x4L<x4", 1, 1, scalar @closure);
+    for my $s (@closure) {
+        my $len = length $s;
+        $req .= pack("L<x4", $len);
+        $req .= $s;
+        $req .= "\000" x (8 - $len % 8) if $len % 8;
+    }
+    syswrite($to, $req) or die;
+
+    # Get back the set of paths that are already valid on the remote host.
+    my %present;
+    my $n = readInt($from);
+    while ($n--) {
+        my $len = readInt($from);
+        my $s;
+        sysread($from, $s, $len) == $len or die;
+        $present{$s} = 1;
+        sysread($from, $s, 8 - $len % 8) if $len % 8; # skip padding
+    }
+
+    my @missing = grep { !$present{$_} } @closure;
+    return if address@hidden;
+
+    # Send the "import paths" command.
+    syswrite($to, pack("L<x4", 4)) or die;
+    exportPaths(fileno($to), $sign, @missing);
+    readInt($from) == 1 or die;
+
+    # Shut down the server process.
+    close $to;
+    waitpid $pid, 0;
+}
+
+
+# For backwards compatibility with Nix <= 1.7. Will be removed
+# eventually.
+sub oldCopyTo {
+    my ($closure, $sshHost, $sshOpts, $storePaths, $compressor, $decompressor,
+        $includeOutputs, $dryRun, $sign, $progressViewer, $useSubstitutes) = 
@_;
+
     # Ask the remote host which paths are invalid.  Because of limits
     # to the command line length, do this in chunks.  Eventually,
     # we'll want to use ‘--from-stdin’, but we can't rely on the
     # target having this option yet.
-    my @missing = ();
+    my @missing;
     my $missingSize = 0;
-    while (scalar(@closure) > 0) {
-        my @ps = splice(@closure, 0, 1500);
+    while (scalar(@$closure) > 0) {
+        my @ps = splice(@$closure, 0, 1500);
         open(READ, "set -f; ssh $sshHost @{$sshOpts} nix-store 
--check-validity --print-invalid @ps|");
         while (<READ>) {
             chomp;
diff --git a/src/download-via-ssh/download-via-ssh.cc 
b/src/download-via-ssh/download-via-ssh.cc
index 6361e71..6cbcd98 100644
--- a/src/download-via-ssh/download-via-ssh.cc
+++ b/src/download-via-ssh/download-via-ssh.cc
@@ -58,7 +58,7 @@ static std::pair<FdSink, FdSource> connect(const string & 
conn)
 
 static void substitute(std::pair<FdSink, FdSource> & pipes, Path storePath, 
Path destPath)
 {
-    writeInt(cmdSubstitute, pipes.first);
+    writeInt(cmdDumpStorePath, pipes.first);
     writeString(storePath, pipes.first);
     pipes.first.flush();
     restorePath(destPath, pipes.second);
@@ -68,20 +68,20 @@ static void substitute(std::pair<FdSink, FdSource> & pipes, 
Path storePath, Path
 
 static void query(std::pair<FdSink, FdSource> & pipes)
 {
-    writeInt(cmdQuery, pipes.first);
     for (string line; getline(std::cin, line);) {
         Strings tokenized = tokenizeString<Strings>(line);
         string cmd = tokenized.front();
         tokenized.pop_front();
         if (cmd == "have") {
-            writeInt(qCmdHave, pipes.first);
+            writeInt(cmdQueryValidPaths, pipes.first);
+            writeInt(0, pipes.first); // don't lock
             writeStrings(tokenized, pipes.first);
             pipes.first.flush();
             PathSet paths = readStrings<PathSet>(pipes.second);
             foreach (PathSet::iterator, i, paths)
                 std::cout << *i << std::endl;
         } else if (cmd == "info") {
-            writeInt(qCmdInfo, pipes.first);
+            writeInt(cmdQueryPathInfos, pipes.first);
             writeStrings(tokenized, pipes.first);
             pipes.first.flush();
             while (1) {
diff --git a/src/nix-store/nix-store.cc b/src/nix-store/nix-store.cc
index 5bcb82f..849cb7e 100644
--- a/src/nix-store/nix-store.cc
+++ b/src/nix-store/nix-store.cc
@@ -869,8 +869,12 @@ static void opClearFailedPaths(Strings opFlags, Strings 
opArgs)
 /* Serve the nix store in a way usable by a restricted ssh user. */
 static void opServe(Strings opFlags, Strings opArgs)
 {
-    if (!opArgs.empty() || !opFlags.empty())
-        throw UsageError("no arguments or flags expected");
+    bool writeAllowed = false;
+    foreach (Strings::iterator, i, opFlags)
+        if (*i == "--write") writeAllowed = true;
+        else throw UsageError(format("unknown flag `%1%'") % *i);
+
+    if (!opArgs.empty()) throw UsageError("no arguments expected");
 
     FdSource in(STDIN_FILENO);
     FdSink out(STDOUT_FILENO);
@@ -883,50 +887,56 @@ static void opServe(Strings opFlags, Strings opArgs)
     out.flush();
     readInt(in); // Client version, unused for now
 
-    ServeCommand cmd = (ServeCommand) readInt(in);
-    switch (cmd) {
-        case cmdQuery:
-            while (true) {
-                QueryCommand qCmd;
-                try {
-                    qCmd = (QueryCommand) readInt(in);
-                } catch (EndOfFile & e) {
-                    break;
-                }
-                switch (qCmd) {
-                    case qCmdHave: {
-                        PathSet paths = readStorePaths<PathSet>(in);
-                        writeStrings(store->queryValidPaths(paths), out);
-                        break;
-                    }
-                    case qCmdInfo: {
-                        PathSet paths = readStorePaths<PathSet>(in);
-                        // !!! Maybe we want a queryPathInfos?
-                        foreach (PathSet::iterator, i, paths) {
-                            if (!store->isValidPath(*i))
-                                continue;
-                            ValidPathInfo info = store->queryPathInfo(*i);
-                            writeString(info.path, out);
-                            writeString(info.deriver, out);
-                            writeStrings(info.references, out);
-                            // !!! Maybe we want compression?
-                            writeLongLong(info.narSize, out); // downloadSize
-                            writeLongLong(info.narSize, out);
-                        }
-                        writeString("", out);
-                        break;
-                    }
-                    default:
-                        throw Error(format("unknown serve query `%1%'") % cmd);
+    while (true) {
+        ServeCommand cmd;
+        try {
+            cmd = (ServeCommand) readInt(in);
+        } catch (EndOfFile & e) {
+            break;
+        }
+
+        switch (cmd) {
+            case cmdQueryValidPaths: {
+                bool lock = readInt(in);
+                PathSet paths = readStorePaths<PathSet>(in);
+                if (lock && writeAllowed)
+                    for (auto & path : paths)
+                        store->addTempRoot(path);
+                writeStrings(store->queryValidPaths(paths), out);
+                out.flush();
+                break;
+            }
+            case cmdQueryPathInfos: {
+                PathSet paths = readStorePaths<PathSet>(in);
+                // !!! Maybe we want a queryPathInfos?
+                foreach (PathSet::iterator, i, paths) {
+                    if (!store->isValidPath(*i))
+                        continue;
+                    ValidPathInfo info = store->queryPathInfo(*i);
+                    writeString(info.path, out);
+                    writeString(info.deriver, out);
+                    writeStrings(info.references, out);
+                    // !!! Maybe we want compression?
+                    writeLongLong(info.narSize, out); // downloadSize
+                    writeLongLong(info.narSize, out);
                 }
+                writeString("", out);
                 out.flush();
+                break;
             }
-            break;
-        case cmdSubstitute:
-            dumpPath(readStorePath(in), out);
-            break;
-        default:
-            throw Error(format("unknown serve command `%1%'") % cmd);
+            case cmdDumpStorePath:
+                dumpPath(readStorePath(in), out);
+                out.flush();
+                break;
+            case cmdImportPaths:
+                if (!writeAllowed) throw Error("importing paths not allowed");
+                store->importPaths(false, in);
+                writeInt(1, out); // indicate success
+                out.flush();
+                break;
+            default:
+                throw Error(format("unknown serve command %1%") % cmd);
+        }
     }
 }
 
diff --git a/src/nix-store/serve-protocol.hh b/src/nix-store/serve-protocol.hh
index 69277bc..07ff4f7 100644
--- a/src/nix-store/serve-protocol.hh
+++ b/src/nix-store/serve-protocol.hh
@@ -2,23 +2,18 @@
 
 namespace nix {
 
-
 #define SERVE_MAGIC_1 0x390c9deb
 #define SERVE_MAGIC_2 0x5452eecb
 
-#define SERVE_PROTOCOL_VERSION 0x101
+#define SERVE_PROTOCOL_VERSION 0x200
 #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
 #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
 
-
 typedef enum {
-    cmdQuery = 0,
-    cmdSubstitute = 1,
+    cmdQueryValidPaths = 1,
+    cmdQueryPathInfos = 2,
+    cmdDumpStorePath = 3,
+    cmdImportPaths = 4,
 } ServeCommand;
 
-typedef enum {
-    qCmdHave = 0,
-    qCmdInfo = 1,
-} QueryCommand;
-
 }



reply via email to

[Prev in Thread] Current Thread [Next in Thread]