# # add_file "merkle_dir.py" # # add_file "monotone.py" # # add_file "nonce.py" # # add_file "remote.py" # # patch "dumb.py" # from [0d4c80029f21d2b36b9422a5716a7243d4f3a84f] # to [f27c38a9c6c0949562af24eb429ef70655e387ec] # # patch "merkle_dir.py" # from [] # to [83b65f2294aa41de080fdf12c3345ff70f6204ca] # # patch "monotone.py" # from [] # to [5f7331d14d941ef0f6d5e6be9526e02ab7059ec5] # # patch "nonce.py" # from [] # to [a50e3bcf7d5d440201724817060fa4c163eb7dce] # # patch "remote.py" # from [] # to [9f1f211f97f2e689b647cd30dd6cacaa22b16a1e] # ======================================================================== --- dumb.py 0d4c80029f21d2b36b9422a5716a7243d4f3a84f +++ dumb.py f27c38a9c6c0949562af24eb429ef70655e387ec @@ -1,12 +1,8 @@ -import subprocess import sha from sets import Set import os import os.path from cStringIO import StringIO -import re -import zlib -import glob # What's on disk: # We write all interesting data out to a file DATA @@ -46,6 +42,9 @@ # some are unused?) # finally, there's a file VERSION, which contains the sha1 of the string # "\0\0...\0", plus a newline. +# finally2, there's a file called DATA_LENGTH, which contains the size of +# the file DATA in bytes. (this is used to synthesize offsets into it for +# remotely appended data.) # How a pull works: # -- pull VERSION, see if it matches ours. (note down what it says for @@ -94,6 +93,13 @@ # -- fetch byte ranges (sync, but perhaps should be able to pipeline # fetching multiple different ranges?) +# so: "append and then return" +# "fetch all of these files and then return" +# "atomically replace all of these files and then return" +# "fetch this list of byte ranges from a given file and then return" +# "query if this file exists and then return" (may just attempt fetching +# it and see what happens) +# "delete file" # locking: # writers: @@ -120,335 +126,8 @@ # or even just start the pull over...? # TODO: -# NEED more tightly packed disk format! -# (maybe make pull-only, not sync? so can do linear reads of "new stuff"?) -# (push via rsync or the like?) -# -- puller, syncer (no pusher) -# -- with some sort of abstract pipelined IO class -# -- don't pull full revision stuff into memory -# -- compress packets on disk # -- cat revision and packet commands -> automate? 
-# -- don't pull full packets into memory
-# -- store stuff in tree under given id -- so can skip loading all the
-# revision info if revision is already there
-class MonotoneError (Exception):
-    pass
-
-class Monotone:
-    def __init__(self, db, executable="monotone"):
-        self.db = db
-        self.executable = executable
-
-    def init_db(self):
-        self.run_monotone(["db", "init"])
-
-    def ensure_db(self):
-        if not os.path.exists(self.db):
-            self.init_db()
-
-    def revisions_list(self):
-        output = self.run_monotone(["automate", "select", "i:"])
-        return output.split()
-
-    def get_revision(self, rid):
-        return self.run_monotone(["cat", "revision", rid])
-
-    def get_revision_packet(self, rid):
-        return self.run_monotone(["rdata", rid])
-
-    def get_file_packet(self, fid):
-        return self.run_monotone(["fdata", fid])
-
-    def get_file_delta_packet(self, old_fid, new_fid):
-        return self.run_monotone(["fdelta", old_fid, new_fid])
-
-    def get_manifest_packet(self, mid):
-        return self.run_monotone(["mdata", mid])
-
-    def get_manifest_delta_packet(self, old_mid, new_mid):
-        return self.run_monotone(["mdelta", old_mid, new_mid])
-
-    def get_cert_packets(self, rid):
-        output = self.run_monotone(["certs", rid])
-        packets = []
-        curr_packet = ""
-        for line in output.strip().split("\n"):
-            curr_packet += line + "\n"
-            if line == "[end]":
-                packets.append(curr_packet)
-                curr_packet = ""
-        assert not curr_packet
-        return packets
-
-    # returns output as a string, raises an error on error
-    def run_monotone(self, args):
-        process = subprocess.Popen([self.executable, "--db", self.db] + args,
-                                   stdin=subprocess.PIPE,
-                                   stdout=subprocess.PIPE,
-                                   stderr=subprocess.PIPE)
-        stdout, stderr = process.communicate()
-        if process.returncode:
-            raise MonotoneError, stderr
-        return stdout
-
-    # feeds stuff into 'monotone read'
-    def feed(self, iterator):
-        process = subprocess.Popen([self.executable, "--db", self.db, "read"],
-                                   stdin=subprocess.PIPE,
-                                   stdout=subprocess.PIPE,
-                                   stderr=subprocess.PIPE)
-        # this is technically broken; we might deadlock.
-        # subprocess.Popen.communicate uses threads to do this; that'd be
-        # better.
-        for chunk in iterator:
-            process.stdin.write(chunk)
-        process.stdin.close()
-        stdout, stderr = process.communicate()
-        if process.returncode:
-            raise MonotoneError, stderr
-
-    # copied wholesale from viewmtn (08fd7bf8143512bfcabe5f65cf40013e10b89d28)'s
-    # monotone.py. hacked to remove the []s from hash values, and to leave in
-    # ("from", "") tuples.
-    def basic_io_parser(self, data):
-        "returns a list of lists of (key, value) tuples."
-        basic_io_hex_re = re.compile(r'^ *(\S+) \[([0-9A-Za-z]*)\]$')
-        basic_io_string_re = re.compile(r'^ *(\S+) (\".*)$')
-        def unescape_string_value(str):
-            rv = ""
-            is_terminated = False
-            in_escape = False
-            if str[0] != '"':
-                raise Exception("basic_io parse error; not a string.")
-            for c in str[1:]:
-                if in_escape:
-                    if c != '\\' and c != '\"':
-                        raise Exception("basic_io parse error; expected \" or \\")
-                    rv += c
-                    in_escape = False
-                else:
-                    if c == '\\': in_escape = True
-                    if c == '"':
-                        if is_terminated:
-                            raise Exception("basic_io parse error; string ends twice!")
-                        is_terminated = True
-                    else: rv += c
-            return is_terminated, rv
-        rv = []
-        stanza = []
-        ongoing_string = None
-        for line in data.split('\n'):
-            if not ongoing_string:
-                if line == '' and len(stanza) != 0:
-                    rv.append(stanza)
-                    stanza = []
-                m = basic_io_hex_re.match(line)
-                if m:
-                    key, value = m.groups()
-                    stanza.append((key, value))
-                    continue
-                m = basic_io_string_re.match(line)
-                if m:
-                    key, value = m.groups()
-                    is_terminated, e_value = unescape_string_value(value)
-                    if not is_terminated: ongoing_string = value
-                    else: stanza.append((key, e_value))
-                    continue
-            else:
-                ongoing_string += '\n' + line
-                is_terminated, e_value = unescape_string_value(ongoing_string)
-                if is_terminated:
-                    stanza.append((key, e_value))
-                    ongoing_string = None
-        return rv
-
-
-# class MerkleDir:
-#     # how deeply do we nest our directories? rule of thumb is that you want
-#     # around (certs + revisions) to be less than 256**(depth + 1).
-#     # At depth = 2, 256**(depth+1) is 16777216. That should suffice.
-#     depth = 2
-#     index_name = "HASHES"
-#     def __init__(self, directory):
-#         self.dir = directory
-#         self.ids = Set()
-#         for dir, subdirs, files in os.walk(self.dir):
-#             for f in files:
-#                 if self.ishash(f):
-#                     self.ids.add(f)
-#     def ishash(self, string):
-#         return len(string) == sha.digestsize*2
-#     def hash(self, string):
-#         return sha.new(string).hexdigest()
-#     def dir_for_text(self, textid):
-#         pieces = []
-#         for i in range(self.depth):
-#             pieces.append(textid[2*i:2*i+2])
-#         return os.path.join(*pieces)
-#     def add(self, text):
-#         textid = self.hash(text)
-#         if textid in self.ids:
-#             return
-#         textdir = os.path.join(self.dir, self.dir_for_text(textid))
-#         if not os.path.exists(textdir):
-#             os.makedirs(textdir)
-#         handle = open(os.path.join(textdir, textid), "w")
-#         handle.write(text)
-#         handle.close()
-#         self.ids.add(textid)
-#     def rehash(self):
-#         for dir, subdirs, files in os.walk(self.dir, topdown=0):
-#             files = [f for f in files if self.ishash(f)]
-#             index_path = os.path.join(dir, self.index_name)
-#             index_handle = open(index_path, "w")
-#             if files:
-#                 assert not subdirs
-#                 for f in files:
-#                     index_handle.write("file %s\n" % (f,))
-#             elif subdirs:
-#                 assert not files
-#                 for d in subdirs:
-#                     subindex = open(os.path.join(dir, d, self.index_name), "r").read()
-#                     index_handle.write("dir %s %s\n" % (d, self.hash(subindex)))
-#             else:
-#                 assert 0
-#     # returns an iterator over strings; all strings when concatenated == all
-#     # data put into this tree when concatenated
-#     def all_data(self):
-#         for dir, subdirs, files in os.walk(self.dir):
-#             for f in files:
-#                 if self.ishash(f):
-#                     handle = open(f, "r")
-#                     for chunk in iter(lambda: f.read(4096), ""):
-#                         yield chunk
-
-
-class MerkleDir:
-    data_file = "DATA"
-    index_file = "INDEX"
-    hashes_prefix = "HASHES_"
-
-    def __init__(self, directory):
-        self.dir = directory
-        if not os.path.isdir(self.dir):
-            os.makedirs(self.dir)
-        if not os.path.exists(os.path.join(self.dir, self.data_file)):
-            open(os.path.join(self.dir, self.data_file), "w").close()
-        if not os.path.exists(os.path.join(self.dir, self.index_file)):
-            open(os.path.join(self.dir, self.index_file), "w").close()
-        # dict: id -> (offset, length)
-        self.index_write_handle = open(os.path.join(self.dir, self.index_file), "a")
-        self.data_write_handle = open(os.path.join(self.dir, self.data_file), "ab")
-        self.add_open = 0
-        self.reread_index()
-
-    # returns an iterator over (id, offset, length)
-    def chunk_locations(self):
-        self.flush()
-        handle = open(os.path.join(self.dir, self.index_file))
-        for line in handle:
-            id, offset, length = line.split()
-            yield (id, offset, length)
-
-    # returns an iterator over data chunks
-    def chunks(self):
-        self.flush()
-        handle = open(os.path.join(self.dir, self.data_file))
-        curr_offset = 0
-        for id, offset, length in self.chunk_locations():
-            assert curr_offset == offset
-            cdata = self.data_file.read(length)
-            curr_offset += length
-            yield zlib.decompress(cdata)
-
-    def reread_index(self):
-        self.ids = {}
-        for id, offset, length in self.chunk_locations():
-            self.ids[id] = (int(offset), int(length))
-
-    def add_to_index(self, id, offset, length):
-        assert not self.ids.has_key(id)
-        self.ids[id] = (offset, length)
-        self.index_write_handle.write("%s %s %s\n" % (id, offset, length))
-
-    def add(self, id):
-        assert not self.add_open
-        if self.ids.has_key(id):
-            return None
-        else:
-            self.add_open = 1
-            return MerkleAdder(self, id)
-
-    def flush(self):
-        self.data_write_handle.flush()
-        self.index_write_handle.flush()
-
-    def rehash(self):
-        self.flush()
-        old_hashes = glob.glob(os.path.join(self.dir, self.hashes_prefix + "*"))
-        for old in old_hashes:
-            os.unlink(old)
-        # We only do two levels of merkle hashing; with a branching factor of
-        # 256, this gives 65536 bins for the actual ids, which should give us
-        # a reasonable loading factor even for large repos.
-        self.binned_indexes = {}
-        for id in self.ids.iterkeys():
-            bin = id[:2]
-            if not self.binned_indexes.has_key(bin):
-                self.binned_indexes[bin] = []
-            self.binned_indexes[bin].append(id)
-        bin_hashes = {}
-        for bin, ids in self.binned_indexes.iteritems():
-            handle = HashWriter(os.path.join(self.dir, self.hashes_prefix + bin))
-            ids.sort()
-            for id in ids:
-                handle.write("chunk %s %s %s\n" % ((id,) + self.ids[id]))
-            handle.close()
-            bin_hashes[bin] = handle.hash()
-        root_hashes = ""
-        for bin, hash in bin_hashes.iteritems():
-            root_hashes += "subtree %s %s %s" % (bin, hash, self.hashes_prefix + bin)
-        open(os.path.join(self.dir, self.hashes_prefix), "wb").write(zlib.compress(root_hashes))
-
-class HashWriter:
-    def __init__(self, filename):
-        self.sha = sha.new()
-        self.file = open(filename, "wb")
-        self.compressor = zlib.compressobj()
-    def write(self, data):
-        self.sha.update(data)
-        self.file.write(self.compressor.compress(data))
-    def close(self):
-        self.file.write(self.compressor.flush())
-        self.file.close()
-    def hash(self):
-        return self.sha.hexdigest()
-
-class MerkleAdder:
-    def __init__(self, store, id):
-        self.store = store
-        self.offset = self.store.data_write_handle.tell()
-        self.id = id
-        self.compressor = zlib.compressobj()
-    def write(self, data):
-        compressed_data = self.compressor.compress(data)
-        self.store.data_write_handle.write(compressed_data)
-    def close(self):
-        last_data = self.compressor.flush()
-        self.store.data_write_handle.write(last_data)
-        length = self.store.data_write_handle.tell() - self.offset
-        self.store.add_to_index(self.id, self.offset, length)
-        self.store.add_open = 0
-

 def do_import(monotone, dir):
     monotone.ensure_db()
     md = MerkleDir(dir)
@@ -456,7 +135,7 @@

 def do_export(monotone, dir):
     md = MerkleDir(dir)
-    for rid in monotone.revisions_list():
+    for rid in monotone.toposort(monotone.revisions_list()):
         certs = monotone.get_cert_packets(rid)
         for cert in certs:
             handle = md.add(sha.new(cert).hexdigest())
========================================================================
--- merkle_dir.py
+++ merkle_dir.py 83b65f2294aa41de080fdf12c3345ff70f6204ca
@@ -0,0 +1,134 @@
+import sha
+import os
+import os.path
+import zlib
+import glob
+
+class LockError(Exception):
+    pass
+
+class MerkleDir:
+    data_file = "DATA"
+    index_file = "INDEX"
+    data_length_file = "DATA_LENGTH"
+    lock_file = "__lock"
+    hashes_prefix = "HASHES_"
+
+    def __init__(self, directory):
+        self.dir = directory
+        if not os.path.isdir(self.dir):
+            os.makedirs(self.dir)
+        if not os.path.exists(os.path.join(self.dir, self.data_file)):
+            open(os.path.join(self.dir, self.data_file), "w").close()
+        if not os.path.exists(os.path.join(self.dir, self.index_file)):
+            open(os.path.join(self.dir, self.index_file), "w").close()
+        # dict: id -> (offset, length)
+        self.index_write_handle = open(os.path.join(self.dir, self.index_file), "a")
+        self.data_write_handle = open(os.path.join(self.dir, self.data_file), "ab")
+        self.add_open = 0
+        self.reread_index()
+
+    def __del__(self):
+        self.flush()
+        self.unlock()
+
+    # returns an iterator over (id, offset, length)
+    def chunk_locations(self):
+        self.flush()
+        handle = open(os.path.join(self.dir, self.index_file))
+        for line in handle:
+            id, offset, length = line.split()
+            yield (id, int(offset), int(length))
+
+    # returns an iterator over data chunks
+    def chunks(self):
+        self.flush()
+        handle = open(os.path.join(self.dir, self.data_file))
+        curr_offset = 0
+        for id, offset, length in self.chunk_locations():
+            assert curr_offset == offset
+            cdata = handle.read(length)
+            curr_offset += length
+            yield zlib.decompress(cdata)
+
+    def reread_index(self):
+        self.ids = {}
+        for id, offset, length in self.chunk_locations():
+            self.ids[id] = (int(offset), int(length))
+
+    def add_to_index(self, id, offset, length):
+        assert not self.ids.has_key(id)
+        self.ids[id] = (offset, length)
+        self.index_write_handle.write("%s %s %s\n" % (id, offset, length))
+
+    def add(self, id):
+        assert not self.add_open
+        if self.ids.has_key(id):
+            return None
+        else:
+            self.add_open = 1
+            return MerkleAdder(self, id)
+
+    def flush(self):
+        length = self.data_write_handle.tell()
+        open(os.path.join(self.dir, self.data_length_file), "w").write(str(length))
+        self.data_write_handle.flush()
+        self.index_write_handle.flush()
+
+    def rehash(self):
+        self.flush()
+        old_hashes = glob.glob(os.path.join(self.dir, self.hashes_prefix + "*"))
+        for old in old_hashes:
+            os.unlink(old)
+        # We only do two levels of merkle hashing; with a branching factor of
+        # 256, this gives 65536 bins for the actual ids, which should give us
+        # a reasonable loading factor even for large repos.
+        self.binned_indexes = {}
+        for id in self.ids.iterkeys():
+            bin = id[:2]
+            if not self.binned_indexes.has_key(bin):
+                self.binned_indexes[bin] = []
+            self.binned_indexes[bin].append(id)
+        bin_hashes = {}
+        for bin, ids in self.binned_indexes.iteritems():
+            handle = HashWriter(os.path.join(self.dir, self.hashes_prefix + bin))
+            ids.sort()
+            for id in ids:
+                handle.write("chunk %s %s %s\n" % ((id,) + self.ids[id]))
+            handle.close()
+            bin_hashes[bin] = handle.hash()
+        root_hashes = ""
+        for bin, hash in bin_hashes.iteritems():
+            root_hashes += "subtree %s %s %s\n" % (bin, hash, self.hashes_prefix + bin)
+        open(os.path.join(self.dir, self.hashes_prefix), "wb").write(zlib.compress(root_hashes))
+
+class HashWriter:
+    def __init__(self, filename):
+        self.sha = sha.new()
+        self.file = open(filename, "wb")
+        self.compressor = zlib.compressobj()
+    def write(self, data):
+        self.sha.update(data)
+        self.file.write(self.compressor.compress(data))
+    def close(self):
+        self.file.write(self.compressor.flush())
+        self.file.close()
+    def hash(self):
+        return self.sha.hexdigest()
+
+class MerkleAdder:
+    def __init__(self, store, id):
+        self.store = store
+        self.offset = self.store.data_write_handle.tell()
+        self.id = id
+        self.compressor = zlib.compressobj()
+    def write(self, data):
+        compressed_data = self.compressor.compress(data)
+        self.store.data_write_handle.write(compressed_data)
+    def close(self):
+        last_data = self.compressor.flush()
+        self.store.data_write_handle.write(last_data)
+        length = self.store.data_write_handle.tell() - self.offset
+        self.store.add_to_index(self.id, self.offset, length)
+        self.store.add_open = 0
+
========================================================================
--- monotone.py
+++ monotone.py 5f7331d14d941ef0f6d5e6be9526e02ab7059ec5
@@ -0,0 +1,143 @@
+import subprocess
+import os.path
+import re
+
+class MonotoneError (Exception):
+    pass
+
+class Monotone:
+    def __init__(self, db, executable="monotone"):
+        self.db = db
+        self.executable = executable
+
+    def init_db(self):
+        self.run_monotone(["db", "init"])
+
+    def ensure_db(self):
+        if not os.path.exists(self.db):
+            self.init_db()
+
+    def revisions_list(self):
+        output = self.run_monotone(["automate", "select", "i:"])
+        return output.split()
+
+    def toposort(self, revisions):
+        output = self.run_monotone(["automate", "toposort", "address@hidden"],
+                                   "\n".join(revisions) + "\n")
+        sorted = output.split()
+        assert len(sorted) == len(revisions)
+        return sorted
+
+    def get_revision(self, rid):
+        return self.run_monotone(["cat", "revision", rid])
+
+    def get_revision_packet(self, rid):
+        return self.run_monotone(["rdata", rid])
+
+    def get_file_packet(self, fid):
+        return self.run_monotone(["fdata", fid])
+
+    def get_file_delta_packet(self, old_fid, new_fid):
+        return self.run_monotone(["fdelta", old_fid, new_fid])
+
+    def get_manifest_packet(self, mid):
+        return self.run_monotone(["mdata", mid])
+
+    def get_manifest_delta_packet(self, old_mid, new_mid):
+        return self.run_monotone(["mdelta", old_mid, new_mid])
+
+    def get_cert_packets(self, rid):
+        output = self.run_monotone(["certs", rid])
+        packets = []
+        curr_packet = ""
+        for line in output.strip().split("\n"):
+            curr_packet += line + "\n"
+            if line == "[end]":
+                packets.append(curr_packet)
+                curr_packet = ""
+        assert not curr_packet
+        return packets
+
+    # returns output as a string, raises an error on error
+    def run_monotone(self, args, input=None):
+        process = subprocess.Popen([self.executable, "--db", self.db] + args,
+                                   stdin=subprocess.PIPE,
+                                   stdout=subprocess.PIPE,
+                                   stderr=subprocess.PIPE)
+        stdout, stderr = process.communicate(input)
+        if process.returncode:
+            raise MonotoneError, stderr
+        return stdout
+
+    # feeds stuff into 'monotone read'
+    def feed(self, iterator):
+        process = subprocess.Popen([self.executable, "--db", self.db, "read"],
+                                   stdin=subprocess.PIPE,
+                                   stdout=subprocess.PIPE,
+                                   stderr=subprocess.PIPE)
+        # this is technically broken; we might deadlock.
+        # subprocess.Popen.communicate uses threads to do this; that'd be
+        # better.
+        for chunk in iterator:
+            process.stdin.write(chunk)
+        process.stdin.close()
+        stdout, stderr = process.communicate()
+        if process.returncode:
+            raise MonotoneError, stderr
+
+    # copied wholesale from viewmtn (08fd7bf8143512bfcabe5f65cf40013e10b89d28)'s
+    # monotone.py. hacked to remove the []s from hash values, and to leave in
+    # ("from", "") tuples.
+    def basic_io_parser(self, data):
+        "returns a list of lists of (key, value) tuples."
+        basic_io_hex_re = re.compile(r'^ *(\S+) \[([0-9A-Za-z]*)\]$')
+        basic_io_string_re = re.compile(r'^ *(\S+) (\".*)$')
+        def unescape_string_value(str):
+            rv = ""
+            is_terminated = False
+            in_escape = False
+            if str[0] != '"':
+                raise Exception("basic_io parse error; not a string.")
+            for c in str[1:]:
+                if in_escape:
+                    if c != '\\' and c != '\"':
+                        raise Exception("basic_io parse error; expected \" or \\")
+                    rv += c
+                    in_escape = False
+                else:
+                    if c == '\\': in_escape = True
+                    if c == '"':
+                        if is_terminated:
+                            raise Exception("basic_io parse error; string ends twice!")
+                        is_terminated = True
+                    else: rv += c
+            return is_terminated, rv
+        rv = []
+        stanza = []
+        ongoing_string = None
+        for line in data.split('\n'):
+            if not ongoing_string:
+                if line == '' and len(stanza) != 0:
+                    rv.append(stanza)
+                    stanza = []
+                m = basic_io_hex_re.match(line)
+                if m:
+                    key, value = m.groups()
+                    stanza.append((key, value))
+                    continue
+                m = basic_io_string_re.match(line)
+                if m:
+                    key, value = m.groups()
+                    is_terminated, e_value = unescape_string_value(value)
+                    if not is_terminated: ongoing_string = value
+                    else: stanza.append((key, e_value))
+                    continue
+            else:
+                ongoing_string += '\n' + line
+                is_terminated, e_value = unescape_string_value(ongoing_string)
+                if is_terminated:
+                    stanza.append((key, e_value))
+                    ongoing_string = None
+        return rv
+
+
========================================================================
--- nonce.py
+++ nonce.py a50e3bcf7d5d440201724817060fa4c163eb7dce
@@ -0,0 +1,10 @@
+import os
+import random
+
+nonce_size = 16
+
+def nonce():
+    try:
+        return os.urandom(nonce_size)
+    except:
+        return "".join([chr(random.randrange(256)) for i in range(nonce_size)])
========================================================================
--- remote.py
+++ remote.py 9f1f211f97f2e689b647cd30dd6cacaa22b16a1e
@@ -0,0 +1,73 @@
+# interface to remote (dumb) servers
+
+import os
+import os.path
+
+class ReadableServer:
+    # All operations are blocking
+
+    # takes an iterable of filenames
+    # returns a map {filename -> contents of file}
+    def fetch(self, filenames):
+        raise NotImplementedError
+
+    # bytes is an iterable of pairs (offset, length)
+    # this is a generator
+    # it yields nested tuples ((offset, length), data)
+    def fetch_bytes(self, filename, bytes):
+        raise NotImplementedError
+
+    def exists(self, filename):
+        raise NotImplementedError
+
+
+class WriteableServer (ReadableServer):
+    # returns None
+    def append(self, filename, data):
+        raise NotImplementedError
+
+    # files is a map {filename -> contents of file}
+    # this operation must be atomic
+    def replace(self, files):
+        raise NotImplementedError
+
+    def delete(self, filename):
+        raise NotImplementedError
+
+
+class LocalServer (WriteableServer):
+    def __init__(self, dir):
+        self.dir = dir
+
+    def fetch(self, filenames):
+        files = {}
+        for fn in filenames:
+            f = open(os.path.join(self.dir, fn), "rb")
+            files[fn] = f.read()
+            f.close()
+        return files
+
+    def fetch_bytes(self, filename, bytes):
+        f = open(os.path.join(self.dir, filename), "rb")
+        for offset, length in bytes:
+            f.seek(offset)
+            yield ((offset, length), f.read(length))
+
+    def exists(self, filename):
+        return os.path.exists(os.path.join(self.dir, filename))
+
+    def append(self, filename, data):
+        f = open(os.path.join(self.dir, filename), "ab")
+        f.write(data)
+        f.close()
+
+    def replace(self, filenames):
+        for fn, data in filenames.iteritems():
+            tmpname = os.path.join(self.dir, "_tmp")
+            tmph = open(tmpname, "wb")
+            tmph.write(data)
+            tmph.close()
+            os.rename(tmpname, os.path.join(self.dir, fn))
+
+    def delete(self, filename):
+        os.unlink(os.path.join(self.dir, filename))
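
The comment blocks in dumb.py above describe the pull in terms of a few primitive server operations (fetch whole files, fetch byte ranges, check existence, append). The sketch below is illustrative only and not part of this patch: it shows one way a puller might combine DATA_LENGTH, INDEX, and fetch_bytes() from remote.py to retrieve just the chunks appended since the last sync. The helper name pull_new_chunks and the known_length bookkeeping are hypothetical, and locking, VERSION checking, and error handling are omitted.

# Rough sketch (not part of this patch): pull only newly appended chunks
# from a dumb server laid out as DATA + INDEX + DATA_LENGTH, using the
# primitives defined in remote.py. pull_new_chunks and known_length are
# made-up names for illustration.

import zlib
from remote import LocalServer

def pull_new_chunks(server, known_length):
    # DATA_LENGTH holds the current size of DATA on the server; anything
    # past the length we recorded last time is newly appended data.
    remote_length = int(server.fetch(["DATA_LENGTH"])["DATA_LENGTH"])
    if remote_length <= known_length:
        return known_length, []
    # INDEX lines are "<id> <offset> <length>"; keep the entries whose
    # offset falls inside the newly appended region.
    wanted = []
    for line in server.fetch(["INDEX"])["INDEX"].splitlines():
        id, offset, length = line.split()
        offset, length = int(offset), int(length)
        if offset >= known_length:
            wanted.append((offset, length))
    # Fetch just those byte ranges from DATA; each chunk is a complete
    # zlib stream (MerkleAdder flushes its compressor per chunk), so it
    # can be decompressed independently.
    chunks = []
    for (offset, length), cdata in server.fetch_bytes("DATA", wanted):
        chunks.append(zlib.decompress(cdata))
    return remote_length, chunks

# e.g. against a local directory standing in for the remote side:
#   server = LocalServer("/path/to/dumb/dir")
#   new_length, chunks = pull_new_chunks(server, 0)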