From 3b36ad86fbd3de5353e2cc45e0c03fdc417d6f3a Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Mon, 26 Aug 2019 06:27:56 -0700 Subject: [PATCH 01/23] distributed working locally --- .../distributed_ondisk/distributed_kmeans.py | 404 ++++++++++++++ benchs/distributed_ondisk/rpc.py | 496 ++++++++++++++++++ benchs/distributed_ondisk/run_on_cluster.bash | 63 +++ 3 files changed, 963 insertions(+) create mode 100644 benchs/distributed_ondisk/distributed_kmeans.py create mode 100644 benchs/distributed_ondisk/rpc.py create mode 100644 benchs/distributed_ondisk/run_on_cluster.bash diff --git a/benchs/distributed_ondisk/distributed_kmeans.py b/benchs/distributed_ondisk/distributed_kmeans.py new file mode 100644 index 0000000000..c33e7c5eec --- /dev/null +++ b/benchs/distributed_ondisk/distributed_kmeans.py @@ -0,0 +1,404 @@ +#! /usr/bin/env python3 +""" +Simple distributed kmeans implementation Relies on an abstraction +for the training matrix, that can be sharded over several machines. +""" + +import faiss +import time +import numpy as np +import sys +import pdb +import argparse + +from scipy.sparse import csc_matrix + +from multiprocessing.dummy import Pool as ThreadPool + +import rpc + + + + +class DatasetAssign: + """Wrapper for a matrix that offers a function to assign the vectors + to centroids. All other implementations offer the same interface""" + + def __init__(self, x): + self.x = np.ascontiguousarray(x, dtype='float32') + + def count(self): + return self.x.shape[0] + + def dim(self): + return self.x.shape[1] + + def get_subset(self, indices): + return self.x[indices] + + def perform_search(self, centroids): + index = faiss.IndexFlatL2(self.x.shape[1]) + index.add(centroids) + return index.search(self.x, 1) + + def assign_to(self, centroids, weights=None): + D, I = self.perform_search(centroids) + + I = I.ravel() + D = D.ravel() + n = len(self.x) + if weights is None: + weights = np.ones(n, dtype='float32') + nc = len(centroids) + m = csc_matrix((weights, I, np.arange(n + 1)), + shape=(nc, n)) + sum_per_centroid = m * self.x + + return I, D, sum_per_centroid + + +class DatasetAssignGPU(DatasetAssign): + """ GPU version of the previous """ + + def __init__(self, x, gpu_id, verbose=False): + DatasetAssign.__init__(self, x) + index = faiss.IndexFlatL2(x.shape[1]) + if gpu_id >= 0: + self.index = faiss.index_cpu_to_gpu( + faiss.StandardGpuResources(), + gpu_id, index) + else: + # -1 -> assign to all GPUs + self.index = faiss.index_cpu_to_all_gpus(index) + + + def perform_search(self, centroids): + self.index.reset() + self.index.add(centroids) + return self.index.search(self.x, 1) + + +class DatasetAssignDispatch: + """dispatches to several other DatasetAssigns and combines the + results""" + + def __init__(self, xes, in_parallel): + self.xes = xes + self.d = xes[0].dim() + if not in_parallel: + self.imap = map + else: + self.pool = ThreadPool(len(self.xes)) + self.imap = self.pool.imap + self.sizes = list(map(lambda x: x.count(), self.xes)) + self.cs = np.cumsum([0] + self.sizes) + + def count(self): + return self.cs[-1] + + def dim(self): + return self.d + + def get_subset(self, indices): + res = np.zeros((len(indices), self.d), dtype='float32') + nos = np.searchsorted(self.cs[1:], indices, side='right') + + def handle(i): + mask = nos == i + sub_indices = indices[mask] - self.cs[i] + subset = self.xes[i].get_subset(sub_indices) + res[mask] = subset + + list(self.imap(handle, range(len(self.xes)))) + return res + + def assign_to(self, centroids, weights=None): + src = self.imap( + 
lambda x: x.assign_to(centroids, weights), + self.xes + ) + I = [] + D = [] + sum_per_centroid = None + for Ii, Di, sum_per_centroid_i in src: + I.append(Ii) + D.append(Di) + if sum_per_centroid is None: + sum_per_centroid = sum_per_centroid_i + else: + sum_per_centroid += sum_per_centroid_i + return np.hstack(I), np.hstack(D), sum_per_centroid + + +def imbalance_factor(k , assign): + return faiss.imbalance_factor(len(assign), k, faiss.swig_ptr(assign)) + + +def reassign_centroids(hassign, centroids, rs=None): + """ reassign centroids when some of them collapse """ + if rs is None: + rs = np.random + k, d = centroids.shape + nsplit = 0 + empty_cents = np.where(hassign == 0)[0] + + if empty_cents.size == 0: + return 0 + + fac = np.ones(d) + fac[::2] += 1 / 1024. + fac[1::2] -= 1 / 1024. + + # this is a single pass unless there are more than k/2 + # empty centroids + while empty_cents.size > 0: + # choose which centroids to split + probas = hassign.astype('float') - 1 + probas[probas < 0] = 0 + probas /= probas.sum() + nnz = (probas > 0).sum() + + nreplace = min(nnz, empty_cents.size) + cjs = rs.choice(k, size=nreplace, p=probas) + + for ci, cj in zip(empty_cents[:nreplace], cjs): + + c = centroids[cj] + centroids[ci] = c * fac + centroids[cj] = c / fac + + hassign[ci] = hassign[cj] // 2 + hassign[cj] -= hassign[ci] + nsplit += 1 + + empty_cents = empty_cents[nreplace:] + + return nsplit + + +def kmeans(k, data, niter=25, seed=1234, checkpoint=None): + """Pure python kmeans implementation. Follows the Faiss C++ version + quite closely, but takes a DatasetAssign instead of a training data + matrix. Also redo is not implemented. """ + n, d = data.count(), data.dim() + + print(("Clustering %d points in %dD to %d clusters, " + + "%d iterations seed %d") % (n, d, k, niter, seed)) + + rs = np.random.RandomState(seed) + print("preproc...") + t0 = time.time() + # initialization + perm = rs.choice(n, size=k, replace=False) + centroids = data.get_subset(perm) + + print(" done") + t_search_tot = 0 + obj = [] + for i in range(niter): + t0s = time.time() + + print('assigning', end='\r', flush=True) + assign, D, sums = data.assign_to(centroids) + + print('compute centroids', end='\r', flush=True) + + # pdb.set_trace() + + t_search_tot += time.time() - t0s; + + err = D.sum() + obj.append(err) + + hassign = np.bincount(assign, minlength=k) + + fac = hassign.reshape(-1, 1).astype('float32') + fac[fac == 0] = 1 # quiet warning + + centroids = sums / fac + + nsplit = reassign_centroids(hassign, centroids, rs) + + print((" Iteration %d (%.2f s, search %.2f s): " + "objective=%g imbalance=%.3f nsplit=%d") % ( + i, (time.time() - t0), t_search_tot, + err, imbalance_factor (k, assign), + nsplit) + ) + + if checkpoint is not None: + print('storing centroids in', checkpoint) + np.save(checkpoint, centroids) + + return centroids + + +class AssignServer(rpc.Server): + """ Assign version that can be exposed via RPC """ + id_counter = 0 + + def __init__(self, s, assign): + rpc.Server.__init__(self, s, AssignServer.id_counter) + AssignServer.id_counter + self.assign = assign + + def __getattr__(self, f): + return getattr(self.assign, f) + + + +def bvecs_mmap(fname): + x = np.memmap(fname, dtype='uint8', mode='r') + d = x[:4].view('int32')[0] + return x.reshape(-1, d + 4)[:, 4:] + + +def ivecs_mmap(fname): + a = np.memmap(fname, dtype='int32', mode='r') + d = a[0] + return a.reshape(-1, d + 1)[:, 1:] + +def fvecs_mmap(fname): + return ivecs_mmap(fname).view('float32') + + +def do_test(todo): + testdata = 
'/datasets01_101/simsearch/041218/bigann/bigann_learn.bvecs' + + x = bvecs_mmap(testdata) + + # bad distribution to stress-test split code + xx = x[:100000].copy() + xx[:50000] = x[0] + + todo = sys.argv[1:] + + if "0" in todo: + # reference C++ run + km = faiss.Kmeans(x.shape[1], 1000, niter=20, verbose=True) + km.train(xx.astype('float32')) + + if "1" in todo: + # using the Faiss c++ implementation + data = DatasetAssign(xx) + kmeans(1000, data, 20) + + if "2" in todo: + # use the dispatch object (on local datasets) + data = DatasetAssignDispatch([ + DatasetAssign(xx[20000 * i : 20000 * (i + 1)]) + for i in range(5) + ], False + ) + kmeans(1000, data, 20) + + if "3" in todo: + # same, with GPU + ngpu = faiss.get_num_gpus() + print('using %d GPUs' % ngpu) + data = DatasetAssignDispatch([ + DatasetAssignGPU(xx[100000 * i // ngpu: 100000 * (i + 1) // ngpu], i) + for i in range(ngpu) + ], True + ) + kmeans(1000, data, 20) + + +def main(): + parser = argparse.ArgumentParser() + + def aa(*args, **kwargs): + group.add_argument(*args, **kwargs) + + group = parser.add_argument_group('general options') + aa('--test', default='', help='perform tests (comma-separated numbers)') + + aa('--k', default=0, type=int, help='nb centroids') + aa('--seed', default=1234, type=int, help='random seed') + aa('--niter', default=20, type=int, help='nb iterations') + aa('--gpu', default=-2, type=int, help='GPU to use (-2:none, -1: all)') + + group = parser.add_argument_group('I/O options') + aa('--indata', default='', + help='data file to load (supported formats fvecs, bvecs, npy') + aa('--i0', default=0, type=int, help='first vector to keep') + aa('--i1', default=-1, type=int, help='last vec to keep + 1') + aa('--out', default='', help='file to store centroids') + aa('--store_each_iteration', default=False, action='store_true', + help='store centroid checkpoints') + + group = parser.add_argument_group('server options') + aa('--server', action='store_true', default=False, help='run server') + aa('--port', default=12345, type=int, help='server port') + aa('--when_ready', default=None, help='store host:port to this file when ready') + aa('--ipv4', default=False, action='store_true', help='force ipv4') + + group = parser.add_argument_group('client options') + aa('--client', action='store_true', default=False, help='run client') + aa('--servers', default='', help='list of server:port separated by spaces') + + args = parser.parse_args() + + if args.test: + do_test(args.test.split(',')) + return + + # prepare data matrix (either local or remote) + if args.indata: + print('loading ', args.indata) + if args.indata.endswith('.bvecs'): + x = bvecs_mmap(args.indata) + elif args.indata.endswith('.fvecs'): + x = fvecs_mmap(args.indata) + elif args.indata.endswith('.npy'): + x = np.load(args.indata, mmap_mode='r') + else: + assert False + + if args.i1 == -1: + args.i1 = len(x) + x = x[args.i0:args.i1] + if args.gpu == -2: + data = DatasetAssign(x) + else: + print('moving to GPU') + data = DatasetAssignGPU(x, args.gpu) + + elif args.client: + print('connecting to servers') + + def connect_client(hostport): + host, port = hostport.split(':') + port = int(port) + client = rpc.Client(host, port, v6=not args.ipv4) + print('client %s:%d ready' % (host, port)) + return client + + hostports = args.servers.strip().split(' ') + pool = ThreadPool(len(hostports)) + + data = DatasetAssignDispatch( + pool.map(connect_client, hostports), + True + ) + else: + assert False + + if args.server: + print('starting server') + rpc.run_server( + lambda 
s: AssignServer(s, data), + args.port, report_to_file=args.when_ready, + v6=not args.ipv4) + + else: + print('running kmeans') + centroids = kmeans(args.k, data, niter=args.niter, seed=args.seed, + checkpoint=args.out if args.store_each_iteration else None) + if args.out != '': + print('writing centroids to', args.out) + np.save(args.out, centroids) + + +if __name__ == '__main__': + main() diff --git a/benchs/distributed_ondisk/rpc.py b/benchs/distributed_ondisk/rpc.py new file mode 100644 index 0000000000..fba0ae82bd --- /dev/null +++ b/benchs/distributed_ondisk/rpc.py @@ -0,0 +1,496 @@ +# @nolint +# old code, not worthwhile to lint + +""" +Simplistic RPC implementation. +Exposes all functions of a Server object. + +Uses pickle for serialization and the socket interface. +""" + +import os,pdb,pickle,time,errno,sys,_thread,traceback,socket,threading,gc + + +# default +PORT=12032 + + +######################################################################### +# simple I/O functions + + + +def inline_send_handle(f,conn): + st=os.fstat(f.fileno()) + size=st.st_size + pickle.dump(size,conn) + conn.write(f.read(size)) + +def inline_send_string(s,conn): + size=len(s) + pickle.dump(size,conn) + conn.write(s) + + +if False: + + def inline_send(filename,conn): + inline_send_handle(open(filename,'r'),conn) + + def inline_recv_handle(f,conn): + size=pickle.load(conn) + rd=0 + while rd65536: sz=65536 + buf=conn.read(sz) + f.write(buf) + rd+=len(buf) + + def inline_recv(filename,conn): + inline_recv_handle(open(filename,"w"),conn) + + + +class FileSock: + " wraps a socket so that it is usable by pickle/cPickle " + + def __init__(self,sock): + self.sock=sock + self.nr=0 + + def write(self,buf): + # print("sending %d bytes"%len(buf)) + #self.sock.sendall(buf) + # print("...done") + bs = 512 * 1024 + ns = 0 + while ns < len(buf): + sent = self.sock.send(buf[ns:ns + bs]) + ns += sent + + + def read(self,bs=512*1024): + #if self.nr==10000: pdb.set_trace() + self.nr+=1 + # print("read bs=%d"%bs) + b = [] + nb = 0 + while len(b) Date: Mon, 26 Aug 2019 06:48:44 -0700 Subject: [PATCH 02/23] simplify RPC code --- .../distributed_ondisk/distributed_kmeans.py | 9 +- benchs/distributed_ondisk/rpc.py | 643 ++++++------------ 2 files changed, 202 insertions(+), 450 deletions(-) diff --git a/benchs/distributed_ondisk/distributed_kmeans.py b/benchs/distributed_ondisk/distributed_kmeans.py index c33e7c5eec..7dafd6fcb2 100644 --- a/benchs/distributed_ondisk/distributed_kmeans.py +++ b/benchs/distributed_ondisk/distributed_kmeans.py @@ -235,11 +235,9 @@ def kmeans(k, data, niter=25, seed=1234, checkpoint=None): class AssignServer(rpc.Server): """ Assign version that can be exposed via RPC """ - id_counter = 0 - def __init__(self, s, assign): - rpc.Server.__init__(self, s, AssignServer.id_counter) - AssignServer.id_counter + def __init__(self, s, assign, log_prefix=''): + rpc.Server.__init__(self, s, log_prefix=log_prefix) self.assign = assign def __getattr__(self, f): @@ -386,8 +384,9 @@ def connect_client(hostport): if args.server: print('starting server') + log_prefix = f"{rpc.socket.gethostname()}:{args.port}" rpc.run_server( - lambda s: AssignServer(s, data), + lambda s: AssignServer(s, data, log_prefix=log_prefix), args.port, report_to_file=args.when_ready, v6=not args.ipv4) diff --git a/benchs/distributed_ondisk/rpc.py b/benchs/distributed_ondisk/rpc.py index fba0ae82bd..f7ff359aee 100644 --- a/benchs/distributed_ondisk/rpc.py +++ b/benchs/distributed_ondisk/rpc.py @@ -1,5 +1,6 @@ -# @nolint -# old code, not 
worthwhile to lint +# Local Variables: +# pythonx-indent-level: 4 + """ Simplistic RPC implementation. @@ -20,477 +21,229 @@ -def inline_send_handle(f,conn): - st=os.fstat(f.fileno()) - size=st.st_size - pickle.dump(size,conn) - conn.write(f.read(size)) - -def inline_send_string(s,conn): - size=len(s) - pickle.dump(size,conn) - conn.write(s) - - -if False: - - def inline_send(filename,conn): - inline_send_handle(open(filename,'r'),conn) - - def inline_recv_handle(f,conn): - size=pickle.load(conn) - rd=0 - while rd65536: sz=65536 - buf=conn.read(sz) - f.write(buf) - rd+=len(buf) - - def inline_recv(filename,conn): - inline_recv_handle(open(filename,"w"),conn) +def inline_send_handle(f, conn): + st = os.fstat(f.fileno()) + size = st.st_size + pickle.dump(size, conn) + conn.write(f.read(size)) +def inline_send_string(s, conn): + size = len(s) + pickle.dump(size, conn) + conn.write(s) class FileSock: - " wraps a socket so that it is usable by pickle/cPickle " - - def __init__(self,sock): - self.sock=sock - self.nr=0 - - def write(self,buf): - # print("sending %d bytes"%len(buf)) - #self.sock.sendall(buf) - # print("...done") - bs = 512 * 1024 - ns = 0 - while ns < len(buf): - sent = self.sock.send(buf[ns:ns + bs]) - ns += sent - - - def read(self,bs=512*1024): - #if self.nr==10000: pdb.set_trace() - self.nr+=1 - # print("read bs=%d"%bs) - b = [] - nb = 0 - while len(b) Date: Mon, 26 Aug 2019 07:43:48 -0700 Subject: [PATCH 03/23] works with SLURM --- .../distributed_ondisk/distributed_kmeans.py | 5 +- benchs/distributed_ondisk/rpc.py | 4 - benchs/distributed_ondisk/run_on_cluster.bash | 81 ++++++++++++++++--- 3 files changed, 75 insertions(+), 15 deletions(-) diff --git a/benchs/distributed_ondisk/distributed_kmeans.py b/benchs/distributed_ondisk/distributed_kmeans.py index 7dafd6fcb2..7c6c2156b3 100644 --- a/benchs/distributed_ondisk/distributed_kmeans.py +++ b/benchs/distributed_ondisk/distributed_kmeans.py @@ -368,15 +368,16 @@ def aa(*args, **kwargs): def connect_client(hostport): host, port = hostport.split(':') port = int(port) + print('connecting %s:%d' % (host, port)) client = rpc.Client(host, port, v6=not args.ipv4) print('client %s:%d ready' % (host, port)) return client hostports = args.servers.strip().split(' ') - pool = ThreadPool(len(hostports)) + # pool = ThreadPool(len(hostports)) data = DatasetAssignDispatch( - pool.map(connect_client, hostports), + list(map(connect_client, hostports)), True ) else: diff --git a/benchs/distributed_ondisk/rpc.py b/benchs/distributed_ondisk/rpc.py index f7ff359aee..c3ccfaf5e9 100644 --- a/benchs/distributed_ondisk/rpc.py +++ b/benchs/distributed_ondisk/rpc.py @@ -1,7 +1,3 @@ -# Local Variables: -# pythonx-indent-level: 4 - - """ Simplistic RPC implementation. Exposes all functions of a Server object. 
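For reference, the usage pattern that `rpc.py` keeps through this simplification is: a server subclass wraps an arbitrary object and forwards attribute lookups, so a client can call the object's methods as if they were local, with arguments and results pickled over the socket. A minimal sketch of that pattern (the wrapped object, host name and port below are placeholders, not part of the patch):

```python
import rpc

class ObjectServer(rpc.Server):
    """Expose all methods of a wrapped object over the RPC socket."""
    def __init__(self, s, obj):
        rpc.Server.__init__(self, s)
        self.obj = obj

    def __getattr__(self, f):
        # unknown attributes are looked up on the wrapped object,
        # so every public method becomes remotely callable
        return getattr(self.obj, f)

# server side (blocking):
#   rpc.run_server(lambda s: ObjectServer(s, my_object), 12345)
# client side:
#   client = rpc.Client("some_host", 12345, v6=False)
#   client.some_method(x)   # executed on the server, result pickled back
```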
diff --git a/benchs/distributed_ondisk/run_on_cluster.bash b/benchs/distributed_ondisk/run_on_cluster.bash index 247cc95c65..441515eb02 100644 --- a/benchs/distributed_ondisk/run_on_cluster.bash +++ b/benchs/distributed_ondisk/run_on_cluster.bash @@ -6,6 +6,9 @@ todo=$1 # the training data of the Deep1B dataset testdata=/datasets01_101/simsearch/041218/deep1b/learn.fvecs +nvec=1000000 +k=4000 + if [ -z "$todo" ]; then echo "nothing to do" @@ -13,14 +16,14 @@ if [ -z "$todo" ]; then elif [ $todo == test_kmeans_0 ]; then # non distributed baseline python distributed_kmeans.py \ - --indata $testdata --i1 1000000 \ - --k 4000 + --indata $testdata --i1 $nvec \ + --k $k elif [ $todo == test_kmeans_1 ]; then # using all the machine's GPUs python distributed_kmeans.py \ - --indata $testdata --i1 1000000 \ - --k 4000 --gpu -1 + --indata $testdata --i1 $nvec \ + --k $k --gpu -1 elif [ $todo == test_kmeans_2 ]; then # distrbuted run, with one local server per GPU @@ -33,9 +36,9 @@ elif [ $todo == test_kmeans_2 ]; then hostports='' for((gpu=0;gpu Date: Mon, 26 Aug 2019 17:05:31 +0200 Subject: [PATCH 04/23] Create README.md --- benchs/distributed_ondisk/README.md | 45 +++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 benchs/distributed_ondisk/README.md diff --git a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md new file mode 100644 index 0000000000..ecab18bf99 --- /dev/null +++ b/benchs/distributed_ondisk/README.md @@ -0,0 +1,45 @@ +# Distributed on-disk index for 1T-scale datasets + +This is code corresponding to the description in [Indexing 1T vectors](https://github.com/facebookresearch/faiss/wiki/Indexing-1T-vectors). +All the code is in python 3 (and not compatible with Python 2). +The current code uses the Deep1B dataset for demonstration purposes, but can scale to 1000x larger. + +## Distributed k-means + +To cluster 200M vectors to 10M centroids, it is useful to have a distriubuted k-means implementation. + +The distributed k-means implementation here is based on 3 files: + +- [`rpc.py`](rpc.py) is a very simple remote procedure call implementation based on sockets and pickle. +It exposes the methods of an object on the server side so that they can be called from the client as if the object was local. + +- [`distributed_kmeans.py`](distributed_kmeans.py) contains the k-means implementation. +The main loop of k-means is re-implemented in python but follows closely the Faiss C++ implementation, and should not be significantly less efficient. +It relies on a `DatasetAssign` object that does the assignement to centrtoids, which is the bulk of the computation. +The object can be a Faiss CPU index, a GPU index or a set of remote GPU or CPU indexes. + +- [`run_on_cluster.bash`](run_on_cluster.bash) contains the shell code to run the distributed k-means on a cluster. + +The distributed k-means works with a Python install that contains faiss and scipy (for sparse matrices). + +### Local tests + +Edit `distibuted_kmeans.py` to point `testdata` to your local copy of the dataset. + +Then, 4 levels of sanity check can be run: +```bash +# reference Faiss C++ run +python distributed_kmeans.py --test 0 +# using the Python implementation +python distributed_kmeans.py --test 1 +# use the dispatch object (on local datasets) +python distributed_kmeans.py --test 2 +# same, with GPUs +python distributed_kmeans.py --test 3 +``` +The output should look like [This gist](https://gist.github.com/mdouze/ffa01fe666a9325761266fe55ead72ad). 
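The pure-Python k-means can also be exercised directly from an interpreter on a small random dataset, which is convenient for debugging. A minimal sketch, assuming `distributed_kmeans.py` is on the Python path and faiss/scipy are installed (the sizes are arbitrary):

```python
import numpy as np
from distributed_kmeans import DatasetAssign, kmeans

xt = np.random.rand(10000, 32).astype('float32')  # toy training set
data = DatasetAssign(xt)            # local, in-RAM assignment backend
centroids = kmeans(100, data, niter=10)
print(centroids.shape)              # (100, 32)
```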
+ + + + + From e1e5c224a914ffdf1889691e3ceef6befbd52dbf Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Mon, 26 Aug 2019 17:16:43 +0200 Subject: [PATCH 05/23] Update README.md --- benchs/distributed_ondisk/README.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md index ecab18bf99..0bf39061b5 100644 --- a/benchs/distributed_ondisk/README.md +++ b/benchs/distributed_ondisk/README.md @@ -21,6 +21,9 @@ The object can be a Faiss CPU index, a GPU index or a set of remote GPU or CPU i - [`run_on_cluster.bash`](run_on_cluster.bash) contains the shell code to run the distributed k-means on a cluster. The distributed k-means works with a Python install that contains faiss and scipy (for sparse matrices). +It clusters the training data of Deep1B, this can be changed easily to any file in fvecs, bvecs or npy format that contains the training set. +The training vectors may be too large to fit in RAM, but they are memory-mapped so that should not be a problem. +The file is also assumed to be accessible from all server machines with eg. a distributed file system. ### Local tests @@ -39,6 +42,28 @@ python distributed_kmeans.py --test 3 ``` The output should look like [This gist](https://gist.github.com/mdouze/ffa01fe666a9325761266fe55ead72ad). +### Distributed sanity check + +To run the distributed k-means, `distibuted_kmeans.py` has to be run both on the servers (`--server` option) and client sides (`--client` option). +Edit the top of `run_on_cluster.bash` to set the path of the data to cluster. + +Sanity checks can be run with +```bash +# non distributed baseline +bash run_on_cluster.bash test_kmeans_0 +# using all the machine's GPUs +bash run_on_cluster.bash test_kmeans_1 +# distrbuted run, with one local server per GPU +bash run_on_cluster.bash test_kmeans_2 +``` +The test `test_kmeans_2` simulates a distributed run on a single machine by starting one server process per GPU and connecting to the servers via the rpc protocol. +The output should look like [this gist](https://gist.github.com/mdouze/5b2dc69b74579ecff04e1686a277d32e). + +### Distributed run + + + + From 4cea2665a789b807330a6cd4b579d8659977af0a Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Mon, 26 Aug 2019 18:20:54 +0200 Subject: [PATCH 06/23] Update README.md --- benchs/distributed_ondisk/README.md | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md index 0bf39061b5..dd060d8c8c 100644 --- a/benchs/distributed_ondisk/README.md +++ b/benchs/distributed_ondisk/README.md @@ -7,6 +7,8 @@ The current code uses the Deep1B dataset for demonstration purposes, but can sca ## Distributed k-means To cluster 200M vectors to 10M centroids, it is useful to have a distriubuted k-means implementation. +The distribution simply consists in splitting the training vectors across machines (servers) and have them do the assignment. +The master/client then synthesizes the results and updates the centroids. The distributed k-means implementation here is based on 3 files: @@ -59,12 +61,28 @@ bash run_on_cluster.bash test_kmeans_2 The test `test_kmeans_2` simulates a distributed run on a single machine by starting one server process per GPU and connecting to the servers via the rpc protocol. The output should look like [this gist](https://gist.github.com/mdouze/5b2dc69b74579ecff04e1686a277d32e). 
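On the client side, each server is reached through an `rpc.Client`, which exposes the same `DatasetAssign` interface as a local dataset; a `DatasetAssignDispatch` then concatenates the remote shards and queries them in parallel. Roughly, with placeholder host names and port:

```python
import rpc
from distributed_kmeans import DatasetAssignDispatch, kmeans

# each host runs distributed_kmeans.py with --server --port 12345
# on its own slice of the training data
clients = [rpc.Client(host, 12345, v6=False)
           for host in ("host1", "host2")]

data = DatasetAssignDispatch(clients, True)  # True = query shards in parallel
centroids = kmeans(4000, data, niter=20)
```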
-### Distributed run +### Distributed run +The way the script can be distributed depends on the cluster's scheduling system. +Here we use Slurm, but it should be relatively easy to adapt to any scheduler that can allocate a set of matchines and start the same exectuable on all of them. +The command +``` +bash run_on_cluster.bash slurm_distributed_kmeans +``` +asks SLURM for 5 machines with 4 GPUs each with the `srun` command. +All 5 machines run the script with the `slurm_within_kmeans_server` option. +They determine the number of servers and their own server id via the `SLURM_NPROCS` and `SLURM_PROCID` environment variables. +All machines start `distributed_kmeans.py` in server mode for the slice of the dataset they are responsible for. +In addition, the machine #0 also starts the client. +The client knows who are the other servers via the variable `SLURM_JOB_NODELIST`. +It connects to all clients and performs the clustering. +Note that the jobs must be killed at the end of the k-means run. +The output should look like [this gist](https://gist.github.com/mdouze/8d25e89fb4af5093057cae0f917da6cd). +For a real run, one would also store the output centroids with the `--out filename` option. From 95d92843ce2ddee04c3e00bceb1a33069d0e31ea Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Tue, 27 Aug 2019 00:54:14 -0700 Subject: [PATCH 07/23] fix slurm nodelist --- benchs/distributed_ondisk/run_on_cluster.bash | 31 +++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/benchs/distributed_ondisk/run_on_cluster.bash b/benchs/distributed_ondisk/run_on_cluster.bash index 441515eb02..06a14f7735 100644 --- a/benchs/distributed_ondisk/run_on_cluster.bash +++ b/benchs/distributed_ondisk/run_on_cluster.bash @@ -102,11 +102,36 @@ elif [ $todo == slurm_within_kmeans_server ]; then --server --gpu -1 \ --port $port --ipv4 & + # Slurm has a somewhat convoluted way of specifying the nodes + # assigned to each task. This is to parse the SLURM_TASKS_PER_NODE variable + function parse_tasks_per_node () { + local blocks=$1 + for block in ${blocks//,/ }; do + if [ ${block/x/} != $block ]; then + tpn=${block%(*} + repeat=${block#*x} + repeat=${repeat%?} + for((i=0;i Date: Tue, 27 Aug 2019 11:05:22 +0200 Subject: [PATCH 08/23] Update README.md --- benchs/distributed_ondisk/README.md | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md index dd060d8c8c..2953fbf76e 100644 --- a/benchs/distributed_ondisk/README.md +++ b/benchs/distributed_ondisk/README.md @@ -6,7 +6,7 @@ The current code uses the Deep1B dataset for demonstration purposes, but can sca ## Distributed k-means -To cluster 200M vectors to 10M centroids, it is useful to have a distriubuted k-means implementation. +To cluster 500M vectors to 10M centroids, it is useful to have a distriubuted k-means implementation. The distribution simply consists in splitting the training vectors across machines (servers) and have them do the assignment. The master/client then synthesizes the results and updates the centroids. @@ -81,8 +81,29 @@ All machines start `distributed_kmeans.py` in server mode for the slice of the d In addition, the machine #0 also starts the client. The client knows who are the other servers via the variable `SLURM_JOB_NODELIST`. It connects to all clients and performs the clustering. -Note that the jobs must be killed at the end of the k-means run. 
The output should look like [this gist](https://gist.github.com/mdouze/8d25e89fb4af5093057cae0f917da6cd). -For a real run, one would also store the output centroids with the `--out filename` option. +### Run used for deep1B + +For the real run, we run the clustering on 50M vectors to 1M centroids. +This is just a matter of using as many machines / GPUs as possible in setting the output centroids with the `--out filename` option. +Then run +``` +bash run_on_cluster.bash deep1b_clustering +``` +In this implementation, the overhead of transmitting the data is non-negligible and so is the centroid computation stage. +This is due to the inefficient Python implementation and the RPC protocol that is not optimized for broadcast / gather (like MPI). +However, it is a simple implementation that should run on most clusters. + +## Making the trained index + +After the centroids are obtained, an empty trained index must be constructed. +This is done by: + +- applying a pre-processing stage (a random rotation) to balance the dimensions of the vectors + +- wrapping the centroids into a HNSW index to speed up the CPU-based assignment of vectors + +- training the 6-bit scalar quantizer used to encode the vectors + From 941164f4345e36633cd7fa06c72cb4de14dc1dc7 Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Tue, 27 Aug 2019 11:35:29 +0200 Subject: [PATCH 09/23] Update README.md --- benchs/distributed_ondisk/README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md index 2953fbf76e..22a69c8426 100644 --- a/benchs/distributed_ondisk/README.md +++ b/benchs/distributed_ondisk/README.md @@ -92,6 +92,15 @@ Then run ``` bash run_on_cluster.bash deep1b_clustering ``` + +The last lines of output read like: +``` + Iteration 19 (898.92 s, search 875.71 s): objective=1.33601e+07 imbalance=1.303 nsplit=0 + 0: writing centroids to /checkpoint/matthijs/ondisk_distributed/1M_centroids.npy +``` + +This means that the total training time was 899s, of which 876s were used for computation. +However, the computation includes the I/O overhead to the assignment servers. In this implementation, the overhead of transmitting the data is non-negligible and so is the centroid computation stage. This is due to the inefficient Python implementation and the RPC protocol that is not optimized for broadcast / gather (like MPI). However, it is a simple implementation that should run on most clusters. From bbb5907336ba365d077f32f6a094d4da4b57189b Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Tue, 27 Aug 2019 11:54:35 +0200 Subject: [PATCH 10/23] Update README.md --- benchs/distributed_ondisk/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md index 22a69c8426..840886563d 100644 --- a/benchs/distributed_ondisk/README.md +++ b/benchs/distributed_ondisk/README.md @@ -110,9 +110,11 @@ However, it is a simple implementation that should run on most clusters. After the centroids are obtained, an empty trained index must be constructed. This is done by: -- applying a pre-processing stage (a random rotation) to balance the dimensions of the vectors +- applying a pre-processing stage (a random rotation) to balance the dimensions of the vectors. This can be done after clustering, the clusters are just rotated as well. 
- wrapping the centroids into a HNSW index to speed up the CPU-based assignment of vectors - training the 6-bit scalar quantizer used to encode the vectors +This is performed by the script `make_trained_index.py`. + From a497b6c9114a873eb573c4b0d158f312641978a0 Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Tue, 27 Aug 2019 02:54:58 -0700 Subject: [PATCH 11/23] added make_trained_index --- .../distributed_ondisk/make_trained_index.py | 48 +++++++++++++++++++ benchs/distributed_ondisk/run_on_cluster.bash | 46 ++++++++++++++---- 2 files changed, 84 insertions(+), 10 deletions(-) create mode 100644 benchs/distributed_ondisk/make_trained_index.py diff --git a/benchs/distributed_ondisk/make_trained_index.py b/benchs/distributed_ondisk/make_trained_index.py new file mode 100644 index 0000000000..345a40d879 --- /dev/null +++ b/benchs/distributed_ondisk/make_trained_index.py @@ -0,0 +1,48 @@ + +import numpy as np +import faiss + +deep1bdir = "/datasets01_101/simsearch/041218/deep1b/" +workdir = "/checkpoint/matthijs/ondisk_distributed/" + + +print('Load centroids') +centroids = np.load(workdir + '1M_centroids.npy') +ncent, d = centroids.shape + + +print('apply random rotation') +rrot = faiss.RandomRotationMatrix(d, d) +rrot.init(1234) +centroids = rrot.apply_py(centroids) + +print('make HNSW index as quantizer') +quantizer = faiss.IndexHNSWFlat(d, 32) +quantizer.hnsw.efSearch = 1024 +quantizer.hnsw.efConstruction = 200 +quantizer.add(centroids) + +print('build index') +index = faiss.IndexPreTransform( + rrot, + faiss.IndexIVFScalarQuantizer( + quantizer, d, ncent, faiss.ScalarQuantizer.QT_6bit + ) + ) + +def ivecs_mmap(fname): + a = np.memmap(fname, dtype='int32', mode='r') + d = a[0] + return a.reshape(-1, d + 1)[:, 1:] + +def fvecs_mmap(fname): + return ivecs_mmap(fname).view('float32') + + +print('finish training index') +xt = fvecs_mmap(deep1bdir + 'learn.fvecs') +xt = np.ascontiguousarray(xt[:256 * 1000], dtype='float32') +index.train(xt) + +print('write output') +faiss.write_index(index, workdir + 'trained.faissindex') diff --git a/benchs/distributed_ondisk/run_on_cluster.bash b/benchs/distributed_ondisk/run_on_cluster.bash index 06a14f7735..2fac741960 100644 --- a/benchs/distributed_ondisk/run_on_cluster.bash +++ b/benchs/distributed_ondisk/run_on_cluster.bash @@ -3,11 +3,23 @@ set -e todo=$1 +# other options can be transmitted +shift # the training data of the Deep1B dataset -testdata=/datasets01_101/simsearch/041218/deep1b/learn.fvecs -nvec=1000000 -k=4000 +deep1bdir=/datasets01_101/simsearch/041218/deep1b +traindata=$deep1bdir/learn.fvecs + +# this is for small tests +# nvec=1000000 +# k=4000 + +# for the real run +nvec=50000000 +k=1000000 + +# working directory for the real run +workdir=/checkpoint/matthijs/ondisk_distributed if [ -z "$todo" ]; then @@ -16,13 +28,13 @@ if [ -z "$todo" ]; then elif [ $todo == test_kmeans_0 ]; then # non distributed baseline python distributed_kmeans.py \ - --indata $testdata --i1 $nvec \ + --indata $traindata --i1 $nvec \ --k $k elif [ $todo == test_kmeans_1 ]; then # using all the machine's GPUs python distributed_kmeans.py \ - --indata $testdata --i1 $nvec \ + --indata $traindata --i1 $nvec \ --k $k --gpu -1 elif [ $todo == test_kmeans_2 ]; then @@ -44,7 +56,7 @@ elif [ $todo == test_kmeans_2 ]; then echo "start server $gpu for range $i0:$i1" python distributed_kmeans.py \ - --indata $testdata \ + --indata $traindata \ --i0 $i0 --i1 $i1 \ --server --gpu $gpu \ --port $port --ipv4 & @@ -86,7 +98,7 @@ elif [ $todo == slurm_within_kmeans_server ]; 
then if [ $rank != 0 ]; then python -u distributed_kmeans.py \ - --indata $testdata \ + --indata $traindata \ --i0 $i0 --i1 $i1 \ --server --gpu -1 \ --port $port --ipv4 @@ -97,7 +109,7 @@ elif [ $todo == slurm_within_kmeans_server ]; then trap 'kill -HUP 0' 0 python -u distributed_kmeans.py \ - --indata $testdata \ + --indata $traindata \ --i0 $i0 --i1 $i1 \ --server --gpu -1 \ --port $port --ipv4 & @@ -108,7 +120,7 @@ elif [ $todo == slurm_within_kmeans_server ]; then local blocks=$1 for block in ${blocks//,/ }; do if [ ${block/x/} != $block ]; then - tpn=${block%(*} + tpn="${block%(*}" repeat=${block#*x} repeat=${repeat%?} for((i=0;i Date: Tue, 27 Aug 2019 11:55:25 +0200 Subject: [PATCH 12/23] Update README.md --- benchs/distributed_ondisk/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md index 840886563d..da078541d9 100644 --- a/benchs/distributed_ondisk/README.md +++ b/benchs/distributed_ondisk/README.md @@ -116,5 +116,5 @@ This is done by: - training the 6-bit scalar quantizer used to encode the vectors -This is performed by the script `make_trained_index.py`. +This is performed by the script [`make_trained_index.py`](make_trained_index.py). From e7e8396bb63777f41a11b0bf127fad523e492c00 Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Tue, 27 Aug 2019 06:02:36 -0700 Subject: [PATCH 13/23] added vslice script --- .../distributed_ondisk/make_index_vslice.py | 112 ++++++++++++++++++ benchs/distributed_ondisk/run_on_cluster.bash | 46 ++++++- 2 files changed, 153 insertions(+), 5 deletions(-) create mode 100644 benchs/distributed_ondisk/make_index_vslice.py diff --git a/benchs/distributed_ondisk/make_index_vslice.py b/benchs/distributed_ondisk/make_index_vslice.py new file mode 100644 index 0000000000..dfd4e92b8d --- /dev/null +++ b/benchs/distributed_ondisk/make_index_vslice.py @@ -0,0 +1,112 @@ +import os +import time +import numpy as np +import faiss +import argparse +from multiprocessing.dummy import Pool as ThreadPool + +def ivecs_mmap(fname): + a = np.memmap(fname, dtype='int32', mode='r') + d = a[0] + return a.reshape(-1, d + 1)[:, 1:] + +def fvecs_mmap(fname): + return ivecs_mmap(fname).view('float32') + + +def produce_batches(args): + + x = fvecs_mmap(args.input) + + if args.i1 == -1: + args.i1 = len(x) + + print("Iterating on vectors %d:%d from %s by batches of size %d" % ( + args.i0, args.i1, args.input, args.bs)) + + for j0 in range(args.i0, args.i1, args.bs): + j1 = min(j0 + args.bs, args.i1) + yield np.arange(j0, j1), x[j0:j1] + + +def rate_limited_iter(l): + 'a thread pre-processes the next element' + pool = ThreadPool(1) + res = None + + def next_or_None(): + try: + return next(l) + except StopIteration: + return None + + while True: + res_next = pool.apply_async(next_or_None) + if res is not None: + res = res.get() + if res is None: + return + yield res + res = res_next + +deep1bdir = "/datasets01_101/simsearch/041218/deep1b/" +workdir = "/checkpoint/matthijs/ondisk_distributed/" + +def main(): + parser = argparse.ArgumentParser( + description='make index for a subset of the data') + + def aa(*args, **kwargs): + group.add_argument(*args, **kwargs) + + group = parser.add_argument_group('index type') + aa('--inputindex', + default=workdir + 'trained.faissindex', + help='empty input index to fill in') + aa('--nt', default=-1, type=int, help='nb of openmp threads to use') + + group = parser.add_argument_group('db options') + aa('--input', default=deep1bdir + "base.fvecs") + 
aa('--bs', default=2**18, type=int, + help='batch size for db access') + aa('--i0', default=0, type=int, help='lower bound to index') + aa('--i1', default=-1, type=int, help='upper bound of vectors to index') + + group = parser.add_argument_group('output') + aa('-o', default='/tmp/x', help='output index') + aa('--keepquantizer', default=False, action='store_true', + help='by default we remove the data from the quantizer to save space') + + args = parser.parse_args() + print('args=', args) + + print('start accessing data') + src = produce_batches(args) + + print('loading index', args.inputindex) + index = faiss.read_index(args.inputindex) + + if args.nt != -1: + faiss.omp_set_num_threads(args.nt) + + t0 = time.time() + ntot = 0 + for ids, x in rate_limited_iter(src): + print('add %d:%d (%.3f s)' % (ntot, ntot + ids.size, time.time() - t0)) + index.add_with_ids(np.ascontiguousarray(x, dtype='float32'), ids) + ntot += ids.size + + index_ivf = faiss.extract_index_ivf(index) + print('invlists stats: imbalance %.3f' % index_ivf.invlists.imbalance_factor()) + index_ivf.invlists.print_stats() + + if not args.keepquantizer: + print('resetting quantizer content') + index_ivf = faiss.extract_index_ivf(index) + index_ivf.quantizer.reset() + + print('store output', args.o) + faiss.write_index(index, args.o) + +if __name__ == '__main__': + main() diff --git a/benchs/distributed_ondisk/run_on_cluster.bash b/benchs/distributed_ondisk/run_on_cluster.bash index 2fac741960..1688bffc77 100644 --- a/benchs/distributed_ondisk/run_on_cluster.bash +++ b/benchs/distributed_ondisk/run_on_cluster.bash @@ -11,16 +11,16 @@ deep1bdir=/datasets01_101/simsearch/041218/deep1b traindata=$deep1bdir/learn.fvecs # this is for small tests -# nvec=1000000 -# k=4000 +nvec=1000000 +k=4000 # for the real run -nvec=50000000 -k=1000000 +# nvec=50000000 +# k=1000000 # working directory for the real run workdir=/checkpoint/matthijs/ondisk_distributed - +mkdir -p $workdir/{vslices,hslices} if [ -z "$todo" ]; then echo "nothing to do" @@ -171,6 +171,42 @@ elif [ $todo == deep1b_clustering ]; then -l bash $( realpath $0 ) slurm_within_kmeans_server \ --out $workdir/1M_centroids.npy +elif [ $todo == make_index_vslices ]; then + + # vslice: slice per database shards + + nvec=1000000000 + nslice=200 + + for((i=0;i $workdir/vslices/slice$i.bash < Date: Tue, 27 Aug 2019 16:01:54 +0200 Subject: [PATCH 14/23] Update README.md --- benchs/distributed_ondisk/README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md index da078541d9..42cafb24a4 100644 --- a/benchs/distributed_ondisk/README.md +++ b/benchs/distributed_ondisk/README.md @@ -118,3 +118,16 @@ This is done by: This is performed by the script [`make_trained_index.py`](make_trained_index.py). +## Building the index by slices + +We call the slices "vslisces" as they are vertical slices of the big matrix (see explanation in the wiki section [Split across datanbase partitions](https://github.com/facebookresearch/faiss/wiki/Indexing-1T-vectors#split-across-database-partitions) + +The script [make_index_vslice.py](make_index_vslice.py) makes an index for a subset of the vectors of the input data and stores it as an independent index. +It can be run in a brute-force parallel fashion, there is no constraint on ordering. 
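Stripped of batching and I/O threading, each slice job amounts to the following condensed sketch (the file names and slice bounds are placeholders):

```python
import numpy as np
import faiss
from make_index_vslice import fvecs_mmap

i0, i1 = 0, 5000000                                # vectors this job indexes
index = faiss.read_index('trained.faissindex')     # empty trained index
x = fvecs_mmap('base.fvecs')                       # memory-mapped database
index.add_with_ids(np.ascontiguousarray(x[i0:i1], dtype='float32'),
                   np.arange(i0, i1))
faiss.write_index(index, 'slice0.faissindex')
```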
+To run the script in parallel on a slurm cluster, use: +``` +bash run_on_cluster.bash make_index_vslices +``` + +## Splitting accross inverted lists + From b43ae78c81820eda4c9a4ee12d41dca2f54f1db0 Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Tue, 27 Aug 2019 16:26:31 +0200 Subject: [PATCH 15/23] Update README.md --- benchs/distributed_ondisk/README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md index 42cafb24a4..ceea1aef78 100644 --- a/benchs/distributed_ondisk/README.md +++ b/benchs/distributed_ondisk/README.md @@ -123,11 +123,23 @@ This is performed by the script [`make_trained_index.py`](make_trained_index.py) We call the slices "vslisces" as they are vertical slices of the big matrix (see explanation in the wiki section [Split across datanbase partitions](https://github.com/facebookresearch/faiss/wiki/Indexing-1T-vectors#split-across-database-partitions) The script [make_index_vslice.py](make_index_vslice.py) makes an index for a subset of the vectors of the input data and stores it as an independent index. +There are 200 slices of 5M vectors each for Deep1B. It can be run in a brute-force parallel fashion, there is no constraint on ordering. To run the script in parallel on a slurm cluster, use: ``` bash run_on_cluster.bash make_index_vslices ``` +For a real dataset, the data would be read from a DBMS. +In that case, reading the data and indexing it in parallel is worthwhile because reading is very slow. ## Splitting accross inverted lists +The 200 slices need to be merged together. +This is done with the script [merge_to_ondisk.py](merge_to_ondisk.py), that memory maps the 200 vertical slice indexes, extracts a subset of the inverted lists and writes them to a contiguous horizontal slice. +We slice the inverted lists into 50 horizontal slices. 
+This is run with +``` +bash run_on_cluster.bash make_index_hslices +``` + + From 2135f6b04de230b5cf850a427d7062e0085067f8 Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Tue, 27 Aug 2019 07:27:06 -0700 Subject: [PATCH 16/23] added merge_to_ondisk --- benchs/distributed_ondisk/merge_to_ondisk.py | 91 +++++++++++++++++++ benchs/distributed_ondisk/run_on_cluster.bash | 34 +++++++ 2 files changed, 125 insertions(+) create mode 100644 benchs/distributed_ondisk/merge_to_ondisk.py diff --git a/benchs/distributed_ondisk/merge_to_ondisk.py b/benchs/distributed_ondisk/merge_to_ondisk.py new file mode 100644 index 0000000000..70119b293a --- /dev/null +++ b/benchs/distributed_ondisk/merge_to_ondisk.py @@ -0,0 +1,91 @@ +import os +import faiss +import argparse +from multiprocessing.dummy import Pool as ThreadPool + +if __name__ == '__main__': + + parser = argparse.ArgumentParser() + + parser.add_argument('--inputs', nargs='*', required=True, + help='input indexes to merge') + parser.add_argument('--l0', type=int, default=0) + parser.add_argument('--l1', type=int, default=-1) + + parser.add_argument('--nt', default=-1, + help='nb threads') + + parser.add_argument('--output', required=True, + help='output index filename') + parser.add_argument('--outputIL', + help='output invfile filename') + + args = parser.parse_args() + + if args.nt != -1: + print('set nb of threads to', args.nt) + + + ils = faiss.InvertedListsPtrVector() + ils_dont_dealloc = [] + + pool = ThreadPool(20) + + def load_index(fname): + print("loading", fname) + try: + index = faiss.read_index(fname, faiss.IO_FLAG_MMAP | faiss.IO_FLAG_READ_ONLY) + except RuntimeError as e: + print('could not load %s: %s' % (fname, e)) + return fname, None + + print(" %d entries" % index.ntotal) + return fname, index + + index0 = None + + for fname, index in pool.imap(load_index, args.inputs): + if index is None: + continue + index_ivf = faiss.extract_index_ivf(index) + il = faiss.downcast_InvertedLists(index_ivf.invlists) + index_ivf.invlists = None + il.this.own() + ils_dont_dealloc.append(il) + if (args.l0, args.l1) != (0, -1): + print('restricting to lists %d:%d' % (args.l0, args.l1)) + # il = faiss.SliceInvertedLists(il, args.l0, args.l1) + + il.crop_invlists(args.l0, args.l1) + ils_dont_dealloc.append(il) + ils.push_back(il) + + if index0 is None: + index0 = index + + print("loaded %d invlists" % ils.size()) + + if not args.outputIL: + args.outputIL = args.output + '_invlists' + + il0 = ils.at(0) + + il = faiss.OnDiskInvertedLists( + il0.nlist, il0.code_size, + args.outputIL) + + print("perform merge") + + ntotal = il.merge_from(ils.data(), ils.size(), True) + + print("swap into index0") + + index0_ivf = faiss.extract_index_ivf(index0) + index0_ivf.nlist = il0.nlist + index0_ivf.ntotal = index0.ntotal = ntotal + index0_ivf.invlists = il + index0_ivf.own_invlists = False + + print("write", args.output) + + faiss.write_index(index0, args.output) diff --git a/benchs/distributed_ondisk/run_on_cluster.bash b/benchs/distributed_ondisk/run_on_cluster.bash index 1688bffc77..d4c9607bed 100644 --- a/benchs/distributed_ondisk/run_on_cluster.bash +++ b/benchs/distributed_ondisk/run_on_cluster.bash @@ -205,7 +205,41 @@ EOF done +elif [ $todo == make_index_hslices ]; then + # hslice: slice per inverted lists + + nlist=1000000 + nslice=50 + + for((i=0;i $workdir/hslices/slice$i.bash < Date: Tue, 27 Aug 2019 16:28:58 +0200 Subject: [PATCH 17/23] Update README.md --- benchs/distributed_ondisk/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md index ceea1aef78..a5de7d5cc9 100644 --- a/benchs/distributed_ondisk/README.md +++ b/benchs/distributed_ondisk/README.md @@ -142,4 +142,7 @@ This is run with bash run_on_cluster.bash make_index_hslices ``` +## Querying the index +At this point the index is ready. +The horizontal slices need to be loaded in the right order and combined into an index to be usable. From aa53510d1bc7f6c5e6a365784c759f08fdb5594b Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Wed, 28 Aug 2019 01:55:22 -0700 Subject: [PATCH 18/23] added combined_index.py --- benchs/distributed_ondisk/combined_index.py | 187 ++++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 benchs/distributed_ondisk/combined_index.py diff --git a/benchs/distributed_ondisk/combined_index.py b/benchs/distributed_ondisk/combined_index.py new file mode 100644 index 0000000000..50281c1668 --- /dev/null +++ b/benchs/distributed_ondisk/combined_index.py @@ -0,0 +1,187 @@ +import os +import faiss +import numpy as np + + +class CombinedIndex: + """ + combines a set of inverted lists into a hstack + masks part of those lists + adds these inverted lists to an empty index that contains + the info on how to perform searches + """ + + def __init__(self, invlist_fnames, empty_index_fname, + masked_index_fname=None): + + self.indexes = indexes = [] + ilv = faiss.InvertedListsPtrVector() + + for fname in invlist_fnames: + if os.path.exists(fname): + print('reading', fname, end='\r', flush=True) + index = faiss.read_index(fname) + indexes.append(index) + il = faiss.extract_index_ivf(index).invlists + else: + assert False + ilv.push_back(il) + print() + + self.big_il = faiss.VStackInvertedLists(ilv.size(), ilv.data()) + if masked_index_fname: + self.big_il_base = self.big_il + print('loading', masked_index_fname) + self.masked_index = faiss.read_index( + masked_index_fname, + faiss.IO_FLAG_MMAP | faiss.IO_FLAG_READ_ONLY) + self.big_il = faiss.MaskedInvertedLists( + faiss.extract_index_ivf(self.masked_index).invlists, + self.big_il_base) + + print('loading empty index', empty_index_fname) + self.index = faiss.read_index(empty_index_fname) + ntotal = self.big_il.compute_ntotal() + + print('replace invlists') + index_ivf = faiss.extract_index_ivf(self.index) + index_ivf.replace_invlists(self.big_il, False) + index_ivf.ntotal = self.index.ntotal = ntotal + index_ivf.parallel_mode = 1 # seems reasonable to do this all the time + + quantizer = faiss.downcast_index(index_ivf.quantizer) + quantizer.hnsw.efSearch = 1024 + + ############################################################ + # Expose fields and functions of the index as methods so that they + # can be called by RPC + + def search(self, x, k): + return self.index.search(x, k) + + def range_search(self, x, radius): + return self.index.range_search(x, radius) + + def transform_and_assign(self, xq): + index = self.index + + if isinstance(index, faiss.IndexPreTransform): + assert index.chain.size() == 1 + vt = index.chain.at(0) + xq = vt.apply_py(xq) + + # perform quantization + index_ivf = faiss.extract_index_ivf(index) + quantizer = index_ivf.quantizer + coarse_dis, list_nos = quantizer.search(xq, index_ivf.nprobe) + return xq, list_nos, coarse_dis + + + def ivf_search_preassigned(self, xq, list_nos, coarse_dis, k): + index_ivf = faiss.extract_index_ivf(self.index) + n, d = xq.shape + assert d == index_ivf.d + n2, d2 = list_nos.shape + assert list_nos.shape == coarse_dis.shape + assert n2 == n + assert d2 == 
index_ivf.nprobe + D = np.empty((n, k), dtype='float32') + I = np.empty((n, k), dtype='int64') + index_ivf.search_preassigned( + n, faiss.swig_ptr(xq), k, + faiss.swig_ptr(list_nos), faiss.swig_ptr(coarse_dis), + faiss.swig_ptr(D), faiss.swig_ptr(I), False) + return D, I + + + def ivf_range_search_preassigned(self, xq, list_nos, coarse_dis, radius): + index_ivf = faiss.extract_index_ivf(self.index) + n, d = xq.shape + assert d == index_ivf.d + n2, d2 = list_nos.shape + assert list_nos.shape == coarse_dis.shape + assert n2 == n + assert d2 == index_ivf.nprobe + res = faiss.RangeSearchResult(n) + + index_ivf.range_search_preassigned( + n, faiss.swig_ptr(xq), radius, + faiss.swig_ptr(list_nos), faiss.swig_ptr(coarse_dis), + res) + + lims = faiss.rev_swig_ptr(res.lims, n + 1).copy() + nd = int(lims[-1]) + D = faiss.rev_swig_ptr(res.distances, nd).copy() + I = faiss.rev_swig_ptr(res.labels, nd).copy() + return lims, D, I + + def set_nprobe(self, nprobe): + index_ivf = faiss.extract_index_ivf(self.index) + index_ivf.nprobe = nprobe + + def set_parallel_mode(self, pm): + index_ivf = faiss.extract_index_ivf(self.index) + index_ivf.parallel_mode = pm + + def get_ntotal(self): + return self.index.ntotal + + def set_prefetch_nthread(self, nt): + for idx in self.indexes: + il = faiss.downcast_InvertedLists( + faiss.extract_index_ivf(idx).invlists) + il.prefetch_nthread + il.prefetch_nthread = nt + + def set_omp_num_threads(self, nt): + faiss.omp_set_num_threads(nt) + +class CombinedIndexDeep1B(CombinedIndex): + """ loads a CombinedIndex with the data from the big photodna index """ + + def __init__(self): + # set some paths + workdir = "/checkpoint/matthijs/ondisk_distributed/" + + # empty index with the proper quantizer + indexfname = workdir + 'trained.faissindex' + + # index that has some invlists that override the big one + masked_index_fname = None + invlist_fnames = [ + '%s/hslices/slice%d.faissindex' % (workdir, i) + for i in range(50) + ] + CombinedIndex.__init__(self, invlist_fnames, indexfname, masked_index_fname) + + +def ivecs_read(fname): + a = np.fromfile(fname, dtype='int32') + d = a[0] + return a.reshape(-1, d + 1)[:, 1:].copy() + + +def fvecs_read(fname): + return ivecs_read(fname).view('float32') + + +if __name__ == '__main__': + import time + ci = CombinedIndexDeep1B() + print('loaded index of size ', ci.index.ntotal) + + deep1bdir = "/datasets01_101/simsearch/041218/deep1b/" + + xq = fvecs_read(deep1bdir + "deep1B_queries.fvecs") + gt_fname = deep1bdir + "deep1B_groundtruth.ivecs" + gt = ivecs_read(gt_fname) + + for nprobe in 1, 10, 100, 1000: + ci.set_nprobe(nprobe) + t0 = time.time() + D, I = ci.search(xq, 100) + t1 = time.time() + print('nprobe=%d 1-recall@1=%.4f t=%.2fs' % ( + nprobe, (I[:, 0] == gt[:, 0]).sum() / len(xq), + t1 - t0 + )) From 547b1b707af776c67aafad73fa5cf6b06ca8acb3 Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Wed, 28 Aug 2019 11:01:38 +0200 Subject: [PATCH 19/23] Update README.md --- benchs/distributed_ondisk/README.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md index a5de7d5cc9..bef1410d71 100644 --- a/benchs/distributed_ondisk/README.md +++ b/benchs/distributed_ondisk/README.md @@ -3,6 +3,7 @@ This is code corresponding to the description in [Indexing 1T vectors](https://github.com/facebookresearch/faiss/wiki/Indexing-1T-vectors). All the code is in python 3 (and not compatible with Python 2). 
The current code uses the Deep1B dataset for demonstration purposes, but can scale to 1000x larger. +To run it, download the Deep1B dataset as explained [here](../#getting-deep1b), and edit paths to the dataset in the scripts. ## Distributed k-means @@ -146,3 +147,22 @@ bash run_on_cluster.bash make_index_hslices At this point the index is ready. The horizontal slices need to be loaded in the right order and combined into an index to be usable. +This is done in the [combined_index.py](combined_index.py) script. +It provides a `CombinedIndexDeep1B` object that contains an index object that can be searched. +To test, run: +``` +python combined_index.py +``` +The output should look like: +``` +(faiss_1.5.2) matthijs@devfair0144:~/faiss_versions/faiss_1Tcode/faiss/benchs/distributed_ondisk$ python combined_index.py +reading /checkpoint/matthijs/ondisk_distributed//hslices/slice49.faissindex +loading empty index /checkpoint/matthijs/ondisk_distributed/trained.faissindex +replace invlists +loaded index of size 1000000000 +nprobe=1 1-recall@1=0.2904 t=12.35s +nnprobe=10 1-recall@1=0.6499 t=17.67s +nprobe=100 1-recall@1=0.8673 t=29.23s +nprobe=1000 1-recall@1=0.9132 t=129.58s +``` +ie. searching is a lot slower than from RAM. From ecb2dc34c404d79e5945c1ca69238d9cbf218aef Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Wed, 28 Aug 2019 12:04:01 +0200 Subject: [PATCH 20/23] Update README.md --- benchs/distributed_ondisk/README.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md index bef1410d71..9fd2051d95 100644 --- a/benchs/distributed_ondisk/README.md +++ b/benchs/distributed_ondisk/README.md @@ -166,3 +166,18 @@ nprobe=100 1-recall@1=0.8673 t=29.23s nprobe=1000 1-recall@1=0.9132 t=129.58s ``` ie. searching is a lot slower than from RAM. + +## Distributed query + +To reduce the bandwidth required from the machine that does the queries, it is possible to split the search accross several search servers. +This way, only the effective results are returned to the main machine. + +The search client and server are implemented in [`search_server.py`](search_server.py). +It can be used as a script to start a search server for `CombinedIndexDeep1B` or as a module to load the clients. + + + +## Conclusion + +This code contains the core components to make an index that scales up to 1T vectors. +There are a few simplifications wrt. the index that was effectively used in [Indexing 1T vectors](https://github.com/facebookresearch/faiss/wiki/Indexing-1T-vectors). From 7c7af916323f89dbbcb3e3b9bb79dbae3eb80134 Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Wed, 28 Aug 2019 14:24:28 +0200 Subject: [PATCH 21/23] Update README.md --- benchs/distributed_ondisk/README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/benchs/distributed_ondisk/README.md b/benchs/distributed_ondisk/README.md index 9fd2051d95..e1d91aed5b 100644 --- a/benchs/distributed_ondisk/README.md +++ b/benchs/distributed_ondisk/README.md @@ -175,7 +175,18 @@ This way, only the effective results are returned to the main machine. The search client and server are implemented in [`search_server.py`](search_server.py). It can be used as a script to start a search server for `CombinedIndexDeep1B` or as a module to load the clients. +The search servers can be started with +``` +bash run_on_cluster.bash run_search_servers +``` +(adjust to the number of servers that can be used). 
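This split builds on the fact that `CombinedIndex` exposes coarse quantization and inverted-list scanning as separate steps (`transform_and_assign` and `ivf_search_preassigned`), so scanning a subset of lists can be delegated to the server that holds them. For a single local index the decomposed search looks like this sketch (the query vectors are random placeholders; Deep1B vectors are 96-dimensional):

```python
import numpy as np
import combined_index

ci = combined_index.CombinedIndexDeep1B()
ci.set_nprobe(100)

xq = np.random.rand(10, 96).astype('float32')

# step 1: apply the pre-transform and select the nprobe nearest centroids
xqt, list_nos, coarse_dis = ci.transform_and_assign(xq)

# step 2: scan only the selected inverted lists -- this is the part that a
# remote search server can do for the lists it owns
D, I = ci.ivf_search_preassigned(xqt, list_nos, coarse_dis, 100)
```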
+ +Then an example of search client is [`distributed_query_demo.py`](distributed_query_demo.py). +It connects to the servers and assigns subsets of inverted lists to visit to each of them. +A typical output is [this gist](https://gist.github.com/mdouze/1585b9854a9a2437d71f2b2c3c05c7c5). +The number in MiB indicates the amount of data that is read from disk to perform the search. +In this case, the scale of the dataset is too small for the distributed search to have much impact, but on datasets > 10x larger, the difference becomes more significant. ## Conclusion From 3e1557559f53f650585520d9d85c4b3053583fa2 Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Wed, 28 Aug 2019 05:25:11 -0700 Subject: [PATCH 22/23] added distributed search demo --- .../distributed_query_demo.py | 65 +++++ benchs/distributed_ondisk/run_on_cluster.bash | 12 + benchs/distributed_ondisk/search_server.py | 242 ++++++++++++++++++ 3 files changed, 319 insertions(+) create mode 100644 benchs/distributed_ondisk/distributed_query_demo.py create mode 100644 benchs/distributed_ondisk/search_server.py diff --git a/benchs/distributed_ondisk/distributed_query_demo.py b/benchs/distributed_ondisk/distributed_query_demo.py new file mode 100644 index 0000000000..2e4c4e911d --- /dev/null +++ b/benchs/distributed_ondisk/distributed_query_demo.py @@ -0,0 +1,65 @@ +import os +import faiss +import numpy as np +import time +import rpc +import sys + +import combined_index +import search_server + +hostnames = sys.argv[1:] + +print("Load local index") +ci = combined_index.CombinedIndexDeep1B() + +print("connect to clients") +clients = [] +for host in hostnames: + client = rpc.Client(host, 12012, v6=False) + clients.append(client) + +# check if all servers respond +print("sizes seen by servers:", [cl.get_ntotal() for cl in clients]) + + +# aggregate all clients into a one that uses them all for speed +# note that it also requires a local index ci +sindex = search_server.SplitPerListIndex(ci, clients) +sindex.verbose = True + +# set reasonable parameters +ci.set_parallel_mode(1) +ci.set_prefetch_nthread(0) +ci.set_omp_num_threads(64) + +# initialize params +sindex.set_parallel_mode(1) +sindex.set_prefetch_nthread(0) +sindex.set_omp_num_threads(64) + +def ivecs_read(fname): + a = np.fromfile(fname, dtype='int32') + d = a[0] + return a.reshape(-1, d + 1)[:, 1:].copy() + +def fvecs_read(fname): + return ivecs_read(fname).view('float32') + + +deep1bdir = "/datasets01_101/simsearch/041218/deep1b/" + +xq = fvecs_read(deep1bdir + "deep1B_queries.fvecs") +gt_fname = deep1bdir + "deep1B_groundtruth.ivecs" +gt = ivecs_read(gt_fname) + + +for nprobe in 1, 10, 100, 1000: + sindex.set_nprobe(nprobe) + t0 = time.time() + D, I = sindex.search(xq, 100) + t1 = time.time() + print('nprobe=%d 1-recall@1=%.4f t=%.2fs' % ( + nprobe, (I[:, 0] == gt[:, 0]).sum() / len(xq), + t1 - t0 + )) diff --git a/benchs/distributed_ondisk/run_on_cluster.bash b/benchs/distributed_ondisk/run_on_cluster.bash index d4c9607bed..14c3bb9725 100644 --- a/benchs/distributed_ondisk/run_on_cluster.bash +++ b/benchs/distributed_ondisk/run_on_cluster.bash @@ -241,6 +241,18 @@ EOF done +elif [ $todo == run_search_servers ]; then + + nserv=3 + + srun -n$nserv \ + --time=48:00:00 \ + --cpus-per-task=64 --gres=gpu:0 --mem=100G \ + --constraint=pascal \ + --partition=priority --comment='priority is the only one that works' \ + -l python -u search_server.py --port 12012 + + else echo "unknown todo $todo" exit 1 diff --git a/benchs/distributed_ondisk/search_server.py 
b/benchs/distributed_ondisk/search_server.py new file mode 100644 index 0000000000..142388933a --- /dev/null +++ b/benchs/distributed_ondisk/search_server.py @@ -0,0 +1,242 @@ +import os +import time +import rpc +import combined_index + +import argparse + + +############################################################ +# Server implementation +############################################################ + + +class MyServer(rpc.Server): + """ Assign version that can be exposed via RPC """ + def __init__(self, s, index): + rpc.Server.__init__(self, s) + self.index = index + + def __getattr__(self, f): + return getattr(self.index, f) + +def main(): + parser = argparse.ArgumentParser() + + def aa(*args, **kwargs): + group.add_argument(*args, **kwargs) + + group = parser.add_argument_group('server options') + aa('--port', default=12012, type=int, help='server port') + aa('--when_ready_dir', default=None, + help='store host:port to this file when ready') + aa('--ipv4', default=False, action='store_true', help='force ipv4') + aa('--rank', default=0, type=int, + help='rank used as index in the client table') + + args = parser.parse_args() + + when_ready = None + if args.when_ready_dir: + when_ready = '%s/%d' % (args.when_ready_dir, args.rank) + + print('loading index') + + index = combined_index.CombinedIndexDeep1B() + + print('starting server') + rpc.run_server( + lambda s: MyServer(s, index), + args.port, report_to_file=when_ready, + v6=not args.ipv4) + +if __name__ == '__main__': + main() + + +############################################################ +# Client implementation +############################################################ + +from multiprocessing.dummy import Pool as ThreadPool +import faiss +import numpy as np + + +def connect_clients_clist(port=12012, ipv4=False): + """ get a list of servers from clist and attempts to connect to them + each client is an object that exposes the MyServer interface. 
+ + The servers should be started with + + bash run_search_servers.bash 10 + + (to get 10 servers) + """ + clients = [] + for l in os.popen('clist -Nrp --format tsv', 'r'): + l = l.strip() + if 'startedRunningAt' in l: + continue + fi = l.split('\t') + name = fi[7] + if 'SServ' in name: + host = fi[1] + jobid = fi[2] + if host == 'None': continue + print(jobid) + client = rpc.Client(host, port, v6=not ipv4) + clients.append(client) + return clients + + +class ResultHeap: + """ Combine query results from a sliced dataset (for k-nn search) """ + + def __init__(self, nq, k): + " nq: number of query vectors, k: number of results per query " + self.I = np.zeros((nq, k), dtype='int64') + self.D = np.zeros((nq, k), dtype='float32') + self.nq, self.k = nq, k + heaps = faiss.float_maxheap_array_t() + heaps.k = k + heaps.nh = nq + heaps.val = faiss.swig_ptr(self.D) + heaps.ids = faiss.swig_ptr(self.I) + heaps.heapify() + self.heaps = heaps + + def add_batch_result(self, D, I, i0): + assert D.shape == (self.nq, self.k) + assert I.shape == (self.nq, self.k) + I += i0 + self.heaps.addn_with_ids( + self.k, faiss.swig_ptr(D), + faiss.swig_ptr(I), self.k) + + def finalize(self): + self.heaps.reorder() + +def distribute_weights(weights, nbin): + """ assign a set of weights to a smaller set of bins to balance them """ + nw = weights.size + o = weights.argsort() + bins = np.zeros(nbin) + assign = np.ones(nw, dtype=int) + for i in o[::-1]: + b = bins.argmin() + assign[i] = b + bins[b] += weights[i] + return bins, assign + + + +class SplitPerListIndex: + """manages a local index, that does the coarse quantization and a set + of sub_indexes. The sub_indexes search a subset of the inverted + lists. The SplitPerListIndex merges results from the sub-indexes""" + + def __init__(self, index, sub_indexes): + self.index = index + self.code_size = faiss.extract_index_ivf(index.index).code_size + self.sub_indexes = sub_indexes + self.ni = len(self.sub_indexes) + # pool of threads. Each thread manages one sub-index. 
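+        # (one thread per sub-index lets the blocking per-server RPC calls
+        # run concurrently; ThreadPool comes from multiprocessing.dummy)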
+ self.pool = ThreadPool(self.ni) + self.verbose = False + + def set_nprobe(self, nprobe): + self.index.set_nprobe(nprobe) + self.pool.map( + lambda i: self.sub_indexes[i].set_nprobe(nprobe), + range(self.ni) + ) + + def set_omp_num_threads(self, nt): + faiss.omp_set_num_threads(nt) + self.pool.map( + lambda idx: idx.set_omp_num_threads(nt), + self.sub_indexes + ) + + def set_parallel_mode(self, pm): + self.index.set_parallel_mode(pm) + self.pool.map( + lambda idx: idx.set_parallel_mode(pm), + self.sub_indexes + ) + + def set_prefetch_nthread(self, nt): + self.index.set_prefetch_nthread(nt) + self.pool.map( + lambda idx: idx.set_prefetch_nthread(nt), + self.sub_indexes + ) + + def balance_lists(self, list_nos): + big_il = self.index.big_il + weights = np.array([big_il.list_size(int(i)) + for i in list_nos.ravel()]) + bins, assign = distribute_weights(weights, self.ni) + if self.verbose: + print('bins weight range %d:%d total %d (%.2f MiB)' % ( + bins.min(), bins.max(), bins.sum(), + bins.sum() * (self.code_size + 8) / 2 ** 20)) + self.nscan = bins.sum() + return assign.reshape(list_nos.shape) + + def search(self, x, k): + xqo, list_nos, coarse_dis = self.index.transform_and_assign(x) + assign = self.balance_lists(list_nos) + + def do_query(i): + sub_index = self.sub_indexes[i] + list_nos_i = list_nos.copy() + list_nos_i[assign != i] = -1 + t0 = time.time() + Di, Ii = sub_index.ivf_search_preassigned( + xqo, list_nos_i, coarse_dis, k) + #print(list_nos_i, Ii) + if self.verbose: + print('client %d: %.3f s' % (i, time.time() - t0)) + return Di, Ii + + rh = ResultHeap(x.shape[0], k) + + for Di, Ii in self.pool.imap(do_query, range(self.ni)): + #print("ADD", Ii, rh.I) + rh.add_batch_result(Di, Ii, 0) + rh.finalize() + return rh.D, rh.I + + def range_search(self, x, radius): + xqo, list_nos, coarse_dis = self.index.transform_and_assign(x) + assign = self.balance_lists(list_nos) + nq = len(x) + + def do_query(i): + sub_index = self.sub_indexes[i] + list_nos_i = list_nos.copy() + list_nos_i[assign != i] = -1 + t0 = time.time() + limi, Di, Ii = sub_index.ivf_range_search_preassigned( + xqo, list_nos_i, coarse_dis, radius) + if self.verbose: + print('slice %d: %.3f s' % (i, time.time() - t0)) + return limi, Di, Ii + + D = [[] for i in range(nq)] + I = [[] for i in range(nq)] + + sizes = np.zeros(nq, dtype=int) + for lims, Di, Ii in self.pool.imap(do_query, range(self.ni)): + for i in range(nq): + l0, l1 = lims[i:i + 2] + D[i].append(Di[l0:l1]) + I[i].append(Ii[l0:l1]) + sizes[i] += l1 - l0 + lims = np.zeros(nq + 1, dtype=int) + lims[1:] = np.cumsum(sizes) + D = np.hstack([j for i in D for j in i]) + I = np.hstack([j for i in I for j in i]) + return lims, D, I From 8b654827d21424b7b4b8c620079258791e977782 Mon Sep 17 00:00:00 2001 From: Matthijs Douze Date: Wed, 28 Aug 2019 05:26:27 -0700 Subject: [PATCH 23/23] fix --- benchs/distributed_ondisk/search_server.py | 26 ---------------------- 1 file changed, 26 deletions(-) diff --git a/benchs/distributed_ondisk/search_server.py b/benchs/distributed_ondisk/search_server.py index 142388933a..011d432b7d 100644 --- a/benchs/distributed_ondisk/search_server.py +++ b/benchs/distributed_ondisk/search_server.py @@ -63,32 +63,6 @@ def aa(*args, **kwargs): import numpy as np -def connect_clients_clist(port=12012, ipv4=False): - """ get a list of servers from clist and attempts to connect to them - each client is an object that exposes the MyServer interface. 
- - The servers should be started with - - bash run_search_servers.bash 10 - - (to get 10 servers) - """ - clients = [] - for l in os.popen('clist -Nrp --format tsv', 'r'): - l = l.strip() - if 'startedRunningAt' in l: - continue - fi = l.split('\t') - name = fi[7] - if 'SServ' in name: - host = fi[1] - jobid = fi[2] - if host == 'None': continue - print(jobid) - client = rpc.Client(host, port, v6=not ipv4) - clients.append(client) - return clients - class ResultHeap: """ Combine query results from a sliced dataset (for k-nn search) """