diff --git a/src/algorithms.py b/src/algorithms.py index 37da7b0..b30a728 100644 --- a/src/algorithms.py +++ b/src/algorithms.py @@ -1,37 +1,38 @@ # -*- coding: utf-8 -*- -from time import time from collections import deque import numpy as np -import math,random,logging +import math, random from concurrent.futures import ProcessPoolExecutor, as_completed -import multiprocessing as mp -from collections import defaultdict - from utils import * -def generate_parameters_random_walk(workers): +def generate_parameters_random_walk(): + """ + For each layer and node, compute the number of edges incident to the node with weight + larger than the average weight of all edges in that layer. + Store as a dictionary (layer -> node -> number of such neighbours) + """ logging.info('Loading distances_nets from disk...') sum_weights = {} amount_edges = {} - + layer = 0 - while(isPickle('distances_nets_weights-layer-'+str(layer))): - logging.info('Executing layer {}...'.format(layer)) - weights = restoreVariableFromDisk('distances_nets_weights-layer-'+str(layer)) - - for k,list_weights in weights.iteritems(): - if(layer not in sum_weights): + while is_pickle('distances_nets_weights-layer-' + str(layer)): + logging.info('Executing layer {}...'.format(layer)) + weights = restore_variable_from_disk('distances_nets_weights-layer-' + str(layer)) + + for node, list_weights in weights.iteritems(): + if layer not in sum_weights: sum_weights[layer] = 0 - if(layer not in amount_edges): + if layer not in amount_edges: amount_edges[layer] = 0 for w in list_weights: sum_weights[layer] += w amount_edges[layer] += 1 - + logging.info('Layer {} executed.'.format(layer)) layer += 1 @@ -40,45 +41,58 @@ def generate_parameters_random_walk(workers): average_weight[layer] = sum_weights[layer] / amount_edges[layer] logging.info("Saving average_weights on disk...") - saveVariableOnDisk(average_weight,'average_weight') + save_variable_on_disk(average_weight, 'average_weight') amount_neighbours = {} layer = 0 - while(isPickle('distances_nets_weights-layer-'+str(layer))): - logging.info('Executing layer {}...'.format(layer)) - weights = restoreVariableFromDisk('distances_nets_weights-layer-'+str(layer)) + while is_pickle('distances_nets_weights-layer-' + str(layer)): + logging.info('Executing layer {}...'.format(layer)) + weights = restore_variable_from_disk('distances_nets_weights-layer-' + str(layer)) amount_neighbours[layer] = {} - for k,list_weights in weights.iteritems(): + for node, list_weights in weights.iteritems(): cont_neighbours = 0 for w in list_weights: - if(w > average_weight[layer]): + if w > average_weight[layer]: cont_neighbours += 1 - amount_neighbours[layer][k] = cont_neighbours + amount_neighbours[layer][node] = cont_neighbours logging.info('Layer {} executed.'.format(layer)) layer += 1 logging.info("Saving amount_neighbours on disk...") - saveVariableOnDisk(amount_neighbours,'amount_neighbours') + save_variable_on_disk(amount_neighbours, 'amount_neighbours') + -def chooseNeighbor(v,graphs,alias_method_j,alias_method_q,layer): +def choose_neighbour(v, graphs, alias_method_j, alias_method_q, layer): v_list = graphs[layer][v] - idx = alias_draw(alias_method_j[layer][v],alias_method_q[layer][v]) + idx = alias_draw(alias_method_j[layer][v], alias_method_q[layer][v]) v = v_list[idx] return v -def exec_random_walk(graphs,alias_method_j,alias_method_q,v,walk_length,amount_neighbours): +def exec_random_walk(graphs, alias_method_j, alias_method_q, v, walk_length, amount_neighbours): + """ + Execute a single random walk starting at a given node. + Args: + graphs: + alias_method_j: + alias_method_q: + v: + walk_length: + amount_neighbours: + + Returns: + + """ original_v = v t0 = time() - initialLayer = 0 - layer = initialLayer - + initial_layer = 0 + layer = initial_layer path = deque() path.append(v) @@ -86,90 +100,94 @@ def exec_random_walk(graphs,alias_method_j,alias_method_q,v,walk_length,amount_n while len(path) < walk_length: r = random.random() - if(r < 0.3): - v = chooseNeighbor(v,graphs,alias_method_j,alias_method_q,layer) - path.append(v) + if r < 0.3: + v = choose_neighbour(v, graphs, alias_method_j, alias_method_q, layer) + path.append(v) else: r = random.random() - limiar_moveup = prob_moveup(amount_neighbours[layer][v]) - if(r > limiar_moveup): - if(layer > initialLayer): - layer = layer - 1 + if r > prob_moveup(amount_neighbours[layer][v]): + if layer > initial_layer: + layer = layer - 1 else: - if((layer + 1) in graphs and v in graphs[layer + 1]): + if (layer + 1) in graphs and v in graphs[layer + 1]: layer = layer + 1 t1 = time() - logging.info('RW - vertex {}. Time : {}s'.format(original_v,(t1-t0))) + logging.info('RW - vertex {}. Time : {}s'.format(original_v, (t1 - t0))) return path -def exec_ramdom_walks_for_chunck(vertices,graphs,alias_method_j,alias_method_q,walk_length,amount_neighbours): +def exec_random_walks_for_chunk(vertices, graphs, alias_method_j, alias_method_q, walk_length, amount_neighbours): walks = deque() for v in vertices: - walks.append(exec_random_walk(graphs,alias_method_j,alias_method_q,v,walk_length,amount_neighbours)) + walks.append(exec_random_walk(graphs, alias_method_j, alias_method_q, v, walk_length, amount_neighbours)) return walks -def generate_random_walks_large_graphs(num_walks,walk_length,workers,vertices): +def generate_random_walks_large_graphs(num_walks, walk_length, workers, vertices): logging.info('Loading distances_nets from disk...') - graphs = restoreVariableFromDisk('distances_nets_graphs') - alias_method_j = restoreVariableFromDisk('nets_weights_alias_method_j') - alias_method_q = restoreVariableFromDisk('nets_weights_alias_method_q') - amount_neighbours = restoreVariableFromDisk('amount_neighbours') + graphs = restore_variable_from_disk('distances_nets_graphs') + alias_method_j = restore_variable_from_disk('nets_weights_alias_method_j') + alias_method_q = restore_variable_from_disk('nets_weights_alias_method_q') + amount_neighbours = restore_variable_from_disk('amount_neighbours') logging.info('Creating RWs...') t0 = time() - - walks = deque() - initialLayer = 0 - - parts = workers - - with ProcessPoolExecutor(max_workers=workers) as executor: - - for walk_iter in range(num_walks): - random.shuffle(vertices) - logging.info("Execution iteration {} ...".format(walk_iter)) - walk = exec_ramdom_walks_for_chunck(vertices,graphs,alias_method_j,alias_method_q,walk_length,amount_neighbours) - walks.extend(walk) - logging.info("Iteration {} executed.".format(walk_iter)) + walks = deque() + for walk_iter in range(num_walks): + random.shuffle(vertices) + logging.info("Execution iteration {} ...".format(walk_iter)) + walk = exec_random_walks_for_chunk(vertices, graphs, alias_method_j, alias_method_q, walk_length, + amount_neighbours) + walks.extend(walk) + logging.info("Iteration {} executed.".format(walk_iter)) t1 = time() - logging.info('RWs created. Time : {}m'.format((t1-t0)/60)) + logging.info('RWs created. Time : {}m'.format((t1 - t0) / 60)) logging.info("Saving Random Walks on disk...") save_random_walks(walks) -def generate_random_walks(num_walks,walk_length,workers,vertices): +def generate_random_walks(num_walks, walk_length, workers, vertices): + """ + Execute and save random walks. + Args: + num_walks: int + number of walks to be performed on each node + walk_length: int + length of each random walk + workers: int + number of workers for parallel execution + vertices: list + starting nodes for the random walks + """ logging.info('Loading distances_nets on disk...') - graphs = restoreVariableFromDisk('distances_nets_graphs') - alias_method_j = restoreVariableFromDisk('nets_weights_alias_method_j') - alias_method_q = restoreVariableFromDisk('nets_weights_alias_method_q') - amount_neighbours = restoreVariableFromDisk('amount_neighbours') + graphs = restore_variable_from_disk('distances_nets_graphs') + alias_method_j = restore_variable_from_disk('nets_weights_alias_method_j') + alias_method_q = restore_variable_from_disk('nets_weights_alias_method_q') + amount_neighbours = restore_variable_from_disk('amount_neighbours') logging.info('Creating RWs...') t0 = time() - + walks = deque() - initialLayer = 0 - if(workers > num_walks): + if workers > num_walks: workers = num_walks with ProcessPoolExecutor(max_workers=workers) as executor: futures = {} for walk_iter in range(num_walks): random.shuffle(vertices) - job = executor.submit(exec_ramdom_walks_for_chunck,vertices,graphs,alias_method_j,alias_method_q,walk_length,amount_neighbours) + job = executor.submit(exec_random_walks_for_chunk, vertices, graphs, alias_method_j, alias_method_q, + walk_length, amount_neighbours) futures[job] = walk_iter - #part += 1 logging.info("Receiving results...") for job in as_completed(futures): walk = job.result() @@ -178,36 +196,35 @@ def generate_random_walks(num_walks,walk_length,workers,vertices): walks.extend(walk) del futures[job] - t1 = time() - logging.info('RWs created. Time: {}m'.format((t1-t0)/60)) + logging.info('RWs created. Time: {}m'.format((t1 - t0) / 60)) logging.info("Saving Random Walks on disk...") save_random_walks(walks) + def save_random_walks(walks): with open('random_walks.txt', 'w') as file: for walk in walks: line = '' for v in walk: - line += str(v)+' ' + line += str(v) + ' ' line += '\n' file.write(line) - return + def prob_moveup(amount_neighbours): x = math.log(amount_neighbours + math.e) - p = (x / ( x + 1)) + p = (x / (x + 1)) return p - def alias_draw(J, q): - ''' + """ Draw sample from a non-uniform discrete distribution using alias sampling. - ''' + """ K = len(J) - kk = int(np.floor(np.random.rand()*K)) + kk = int(np.floor(np.random.rand() * K)) if np.random.rand() < q[kk]: return kk else: diff --git a/src/algorithms_distances.py b/src/algorithms_distances.py index 67db2e1..19ffc1c 100644 --- a/src/algorithms_distances.py +++ b/src/algorithms_distances.py @@ -1,539 +1,602 @@ # -*- coding: utf-8 -*- -from time import time from collections import deque import numpy as np -import math,logging +import math from fastdtw import fastdtw from concurrent.futures import ProcessPoolExecutor, as_completed -from collections import defaultdict from utils import * import os -limiteDist = 20 -def getDegreeListsVertices(g,vertices,calcUntilLayer): - degreeList = {} +def get_degree_lists_vertices(g, vertices, calc_until_layer, is_directed, in_degrees, out_degrees): + degree_list = {} for v in vertices: - degreeList[v] = getDegreeLists(g,v,calcUntilLayer) + degree_list[v] = get_degree_lists(g, v, calc_until_layer, is_directed, in_degrees, out_degrees) - return degreeList + return degree_list -def getCompactDegreeListsVertices(g,vertices,maxDegree,calcUntilLayer): - degreeList = {} - - for v in vertices: - degreeList[v] = getCompactDegreeLists(g,v,maxDegree,calcUntilLayer) - - return degreeList +def get_compact_degree_lists_vertices(g, vertices, calc_until_layer, is_directed, in_degrees, out_degrees): + degree_list = {} -def getCompactDegreeLists(g, root, maxDegree,calcUntilLayer): + for v in vertices: + degree_list[v] = get_compact_degree_lists(g, v, calc_until_layer, is_directed, in_degrees, out_degrees) + + return degree_list + + +def get_compact_degree_lists(g, root, calc_until_layer, is_directed, in_degrees, out_degrees): + """ + Perform BFS to compute *compact* degree sequences at each k-distance ring around a given node. + Args: + g: dict + the graph dictionary + root: int + the initial node + calc_until_layer: int + the maximum distance + is_directed: boolean + whether the graph is directed + in_degrees: dict + (weighted) incoming degree per node (directed graphs only) + out_degrees: dict + (weighted outgoing degree per node (directed graphs only) + Returns: + A dictionary mapping depths (int) to CHANGE HERE + """ t0 = time() - listas = {} - vetor_marcacao = [0] * (max(g) + 1) + lists = {} + vectors = [0] * (max(g) + 1) - # Marcar s e inserir s na fila Q queue = deque() queue.append(root) - vetor_marcacao[root] = 1 - l = {} - - ## Variáveis de controle de distância + vectors[root] = 1 + + l_in= {} + l_out = {} + depth = 0 - pendingDepthIncrease = 0 - timeToDepthIncrease = 1 + pending_depth_increase = 0 + time_to_depth_increase = 1 while queue: vertex = queue.popleft() - timeToDepthIncrease -= 1 + time_to_depth_increase -= 1 + + if is_directed: + degree_in = in_degrees.get(vertex, 0) + degree_out = out_degrees.get(vertex, 0) + else: + degree_in = degree_out = len(g[vertex]) - d = len(g[vertex]) - if(d not in l): - l[d] = 0 - l[d] += 1 + if degree_in not in l_in: + l_in[degree_in] = 0 + l_in[degree_in] += 1 + if degree_out not in l_out: + l_out[degree_out] = 0 + l_out[degree_out] += 1 for v in g[vertex]: - if(vetor_marcacao[v] == 0): - vetor_marcacao[v] = 1 + if vectors[v] == 0: + vectors[v] = 1 queue.append(v) - pendingDepthIncrease += 1 + pending_depth_increase += 1 - if(timeToDepthIncrease == 0): + if time_to_depth_increase == 0: - list_d = [] - for degree,freq in l.iteritems(): - list_d.append((degree,freq)) - list_d.sort(key=lambda x: x[0]) - listas[depth] = np.array(list_d,dtype=np.int32) + lp = _finalise_degree_seq_compact(l_in, l_out, is_directed) + lists[depth] = lp - l = {} + l_in = {} + l_out = {} - if(calcUntilLayer == depth): + if calc_until_layer == depth: break depth += 1 - timeToDepthIncrease = pendingDepthIncrease - pendingDepthIncrease = 0 - + time_to_depth_increase = pending_depth_increase + pending_depth_increase = 0 t1 = time() - logging.info('BFS vertex {}. Time: {}s'.format(root,(t1-t0))) - - return listas - - -def getDegreeLists(g, root, calcUntilLayer): + logging.info('BFS vertex {}. Time: {}s'.format(root, (t1 - t0))) + + return lists + + +def get_degree_lists(g, root, calc_until_layer, is_directed, in_degrees, out_degrees): + """ + Perform BFS to compute degree sequences at each k-distance ring around a given node. + Args: + g: dict + the graph dictionary + root: int + the initial node + calc_until_layer: int + the maximum distance + is_directed: boolean + whether the graph is directed + in_degrees: dict + (weighted) incoming degree per node (directed graphs only) + out_degrees: dict + (weighted outgoing degree per node (directed graphs only) + Returns: + A dictionary mapping depths (int) to: + a) ordered degree sequences (np.array), if undirected, or + b) pairs of ordered in-degree and out-degree sequences (np.array), if directed + """ t0 = time() - listas = {} - vetor_marcacao = [0] * (max(g) + 1) + lists = {} + vectors = [0] * (max(g) + 1) - # Marcar s e inserir s na fila Q queue = deque() queue.append(root) - vetor_marcacao[root] = 1 - + vectors[root] = 1 + + l = [] - l = deque() - - ## Variáveis de controle de distância depth = 0 - pendingDepthIncrease = 0 - timeToDepthIncrease = 1 + pending_depth_increase = 0 + time_to_depth_increase = 1 while queue: vertex = queue.popleft() - timeToDepthIncrease -= 1 + time_to_depth_increase -= 1 - l.append(len(g[vertex])) + if is_directed: + degree = in_degrees.get(vertex, 0), out_degrees.get(vertex, 0) + else: + degree = len(g[vertex]) + + l.append(degree) for v in g[vertex]: - if(vetor_marcacao[v] == 0): - vetor_marcacao[v] = 1 + if vectors[v] == 0: + vectors[v] = 1 queue.append(v) - pendingDepthIncrease += 1 + pending_depth_increase += 1 - if(timeToDepthIncrease == 0): + if time_to_depth_increase == 0: - lp = np.array(l,dtype='float') - lp = np.sort(lp) - listas[depth] = lp - l = deque() + lp = _finalise_degree_seq(l, is_directed) + lists[depth] = lp + l = [] - if(calcUntilLayer == depth): + if calc_until_layer == depth: break depth += 1 - timeToDepthIncrease = pendingDepthIncrease - pendingDepthIncrease = 0 - + time_to_depth_increase = pending_depth_increase + pending_depth_increase = 0 t1 = time() - logging.info('BFS vertex {}. Time: {}s'.format(root,(t1-t0))) + logging.info('BFS vertex {}. Time: {}s'.format(root, (t1 - t0))) + + return lists + + +def _finalise_degree_seq(l, is_directed): + if is_directed: + lp_in = [] + lp_out = [] + for in_degree, out_degree in l: + lp_in.append(in_degree) + lp_out.append(out_degree) + lp_in = np.array(lp_in, dtype='float') + lp_out = np.array(lp_out, dtype='float') + lp_in = np.sort(lp_in) + lp_out = np.sort(lp_out) + return lp_in, lp_out + else: + lp = np.array(l, dtype='float') + lp = np.sort(lp) + return lp - return listas +def _finalise_degree_seq_compact(l_in, l_out, is_directed): + if is_directed: + list_d = [] + for degree, freq in l_in.iteritems(): + list_d.append((degree, freq)) + list_d.sort(key=lambda x: x[0]) + lp_in = np.array(list_d, dtype=np.int32) -def cost(a,b): - ep = 0.5 - m = max(a,b) + ep - mi = min(a,b) + ep - return ((m/mi) - 1) + list_d = [] + for degree, freq in l_out.iteritems(): + list_d.append((degree, freq)) + list_d.sort(key=lambda x: x[0]) + lp_out = np.array(list_d, dtype=np.int32) + + return lp_in, lp_out + + else: + list_d = [] + for degree, freq in l_in.iteritems(): + list_d.append((degree, freq)) + list_d.sort(key=lambda x: x[0]) + lp = np.array(list_d, dtype=np.int32) -def cost_min(a,b): + return lp + + +def cost(a, b): ep = 0.5 - m = max(a[0],b[0]) + ep - mi = min(a[0],b[0]) + ep - return ((m/mi) - 1) * min(a[1],b[1]) + m = max(a, b) + ep + mi = min(a, b) + ep + return ((m / mi) - 1) -def cost_max(a,b): +def cost_max(a, b): ep = 0.5 - m = max(a[0],b[0]) + ep - mi = min(a[0],b[0]) + ep - return ((m/mi) - 1) * max(a[1],b[1]) - -def preprocess_degreeLists(): - - logging.info("Recovering degreeList from disk...") - degreeList = restoreVariableFromDisk('degreeList') - - logging.info("Creating compactDegreeList...") - - dList = {} - dFrequency = {} - for v,layers in degreeList.iteritems(): - dFrequency[v] = {} - for layer,degreeListLayer in layers.iteritems(): - dFrequency[v][layer] = {} - for degree in degreeListLayer: - if(degree not in dFrequency[v][layer]): - dFrequency[v][layer][degree] = 0 - dFrequency[v][layer][degree] += 1 - for v,layers in dFrequency.iteritems(): - dList[v] = {} - for layer,frequencyList in layers.iteritems(): - list_d = [] - for degree,freq in frequencyList.iteritems(): - list_d.append((degree,freq)) - list_d.sort(key=lambda x: x[0]) - dList[v][layer] = np.array(list_d,dtype='float') - - logging.info("compactDegreeList created!") - - saveVariableOnDisk(dList,'compactDegreeList') - -def verifyDegrees(degrees,degree_v_root,degree_a,degree_b): - - if(degree_b == -1): + m = max(a[0], b[0]) + ep + mi = min(a[0], b[0]) + ep + return ((m / mi) - 1) * max(a[1], b[1]) + + +def _verify_degrees(degree_v_root, degree_a, degree_b): + if degree_b == -1: degree_now = degree_a - elif(degree_a == -1): + elif degree_a == -1: degree_now = degree_b - elif(abs(degree_b - degree_v_root) < abs(degree_a - degree_v_root)): + elif abs(degree_b - degree_v_root) < abs(degree_a - degree_v_root): degree_now = degree_b else: degree_now = degree_a - return degree_now + return degree_now -def get_vertices(v,degree_v,degrees,a_vertices): - a_vertices_selected = 2 * math.log(a_vertices,2) - #logging.info("Selecionando {} próximos ao vértice {} ...".format(int(a_vertices_selected),v)) + +def _get_vertices(v, degree_v, degrees, a_vertices): + a_vertices_selected = 2 * math.log(a_vertices, 2) vertices = deque() try: - c_v = 0 + c_v = 0 for v2 in degrees[degree_v]['vertices']: - if(v != v2): + if v != v2: vertices.append(v2) c_v += 1 - if(c_v > a_vertices_selected): + if c_v > a_vertices_selected: raise StopIteration - if('before' not in degrees[degree_v]): + if 'before' not in degrees[degree_v]: degree_b = -1 else: degree_b = degrees[degree_v]['before'] - if('after' not in degrees[degree_v]): + if 'after' not in degrees[degree_v]: degree_a = -1 else: degree_a = degrees[degree_v]['after'] - if(degree_b == -1 and degree_a == -1): + if degree_b == -1 and degree_a == -1: raise StopIteration - degree_now = verifyDegrees(degrees,degree_v,degree_a,degree_b) + degree_now = _verify_degrees(degree_v, degree_a, degree_b) while True: for v2 in degrees[degree_now]['vertices']: - if(v != v2): + if v != v2: vertices.append(v2) c_v += 1 - if(c_v > a_vertices_selected): + if c_v > a_vertices_selected: raise StopIteration - if(degree_now == degree_b): - if('before' not in degrees[degree_b]): + if degree_now == degree_b: + if 'before' not in degrees[degree_b]: degree_b = -1 else: degree_b = degrees[degree_b]['before'] else: - if('after' not in degrees[degree_a]): + if 'after' not in degrees[degree_a]: degree_a = -1 else: degree_a = degrees[degree_a]['after'] - - if(degree_b == -1 and degree_a == -1): + + if degree_b == -1 and degree_a == -1: raise StopIteration - degree_now = verifyDegrees(degrees,degree_v,degree_a,degree_b) + degree_now = _verify_degrees(degree_v, degree_a, degree_b) except StopIteration: - #logging.info("Vértice {} - próximos selecionados.".format(v)) return list(vertices) - return list(vertices) - -def splitDegreeList(part,c,G,compactDegree): - if(compactDegree): +def split_degree_list(part, c, G, compact_degree, a_vertices): + if compact_degree: logging.info("Recovering compactDegreeList from disk...") - degreeList = restoreVariableFromDisk('compactDegreeList') + degree_list = restore_variable_from_disk('compactDegreeList') else: logging.info("Recovering degreeList from disk...") - degreeList = restoreVariableFromDisk('degreeList') + degree_list = restore_variable_from_disk('degreeList') logging.info("Recovering degree vector from disk...") - degrees = restoreVariableFromDisk('degrees_vector') + degrees = restore_variable_from_disk('degrees_vector') - degreeListsSelected = {} + degree_lists_selected = {} vertices = {} - a_vertices = len(G) for v in c: - nbs = get_vertices(v,len(G[v]),degrees,a_vertices) + nbs = _get_vertices(v, len(G[v]), degrees, a_vertices) vertices[v] = nbs - degreeListsSelected[v] = degreeList[v] + degree_lists_selected[v] = degree_list[v] for n in nbs: - degreeListsSelected[n] = degreeList[n] - - saveVariableOnDisk(vertices,'split-vertices-'+str(part)) - saveVariableOnDisk(degreeListsSelected,'split-degreeList-'+str(part)) - - -def calc_distances(part, compactDegree = False): - - vertices = restoreVariableFromDisk('split-vertices-'+str(part)) - degreeList = restoreVariableFromDisk('split-degreeList-'+str(part)) + degree_lists_selected[n] = degree_list[n] + + save_variable_on_disk(vertices, 'split-vertices-' + str(part)) + save_variable_on_disk(degree_lists_selected, 'split-degreeList-' + str(part)) + + +def calc_distances(part, compact_degree=False, is_directed=False): + """ + Compute the structural distance between pairs of nodes within a precomputed chunk of given ID. + Only considers pairs that have previously been identified as (roughly) similar in terms of undirected degree. + Args: + part: int + index of the chunk of vertices to be used + compact_degree: boolean + indicating whether to use the compact degree optimisation + is_directed: boolean + whether the graph is directed + + Returns: + None (stores pickle on disk) + """ + vertices = restore_variable_from_disk('split-vertices-' + str(part)) + degree_list = restore_variable_from_disk('split-degreeList-' + str(part)) distances = {} - if compactDegree: + if compact_degree: dist_func = cost_max else: dist_func = cost - for v1,nbs in vertices.iteritems(): - lists_v1 = degreeList[v1] + for v1, nbs in vertices.iteritems(): + lists_v1 = degree_list[v1] for v2 in nbs: t00 = time() - lists_v2 = degreeList[v2] - - max_layer = min(len(lists_v1),len(lists_v2)) - distances[v1,v2] = {} + lists_v2 = degree_list[v2] - for layer in range(0,max_layer): - dist, path = fastdtw(lists_v1[layer],lists_v2[layer],radius=1,dist=dist_func) + max_layer = min(len(lists_v1), len(lists_v2)) + distances[v1, v2] = {} - distances[v1,v2][layer] = dist + for layer in range(0, max_layer): + distances[v1, v2][layer] = _deg_seq_dist(lists_v1[layer], lists_v2[layer], dist_func, is_directed) t11 = time() - logging.info('fastDTW between vertices ({}, {}). Time: {}s'.format(v1,v2,(t11-t00))) - - - preprocess_consolides_distances(distances) - saveVariableOnDisk(distances,'distances-'+str(part)) - return - -def calc_distances_all(vertices,list_vertices,degreeList,part, compactDegree = False): - + logging.info('fastDTW between vertices ({}, {}). Time: {}s'.format(v1, v2, (t11 - t00))) + + _consolidate_distances(distances) + save_variable_on_disk(distances, 'distances-' + str(part)) + + +def calc_distances_all(vertices, list_vertices, degree_list, part, compact_degree=False, is_directed=False): + """ + Compute the structural distance between any pair of nodes. + Args: + vertices: list + a chunk of vertices to compute distances between + list_vertices: list + a list of lists, containing for each vertex all other vertices to be compared with + degree_list: dict + nested dictionary (vertex -> layer -> degree sequence) + part: int + index of the current chunk of vertices + compact_degree: boolean + indicating whether to use the compact degree optimisation + is_directed: boolean + whether the graph is directed + + Returns: + None (stores pickle on disk) + """ distances = {} cont = 0 - if compactDegree: + if compact_degree: dist_func = cost_max else: dist_func = cost for v1 in vertices: - lists_v1 = degreeList[v1] + lists_v1 = degree_list[v1] for v2 in list_vertices[cont]: - lists_v2 = degreeList[v2] - - max_layer = min(len(lists_v1),len(lists_v2)) - distances[v1,v2] = {} - - for layer in range(0,max_layer): - #t0 = time() - dist, path = fastdtw(lists_v1[layer],lists_v2[layer],radius=1,dist=dist_func) - #t1 = time() - #logging.info('D ({} , {}), Tempo fastDTW da camada {} : {}s . Distância: {}'.format(v1,v2,layer,(t1-t0),dist)) - distances[v1,v2][layer] = dist - - - cont += 1 - - preprocess_consolides_distances(distances) - saveVariableOnDisk(distances,'distances-'+str(part)) - return - - -def selectVertices(layer,fractionCalcDists): - previousLayer = layer - 1 + lists_v2 = degree_list[v2] - logging.info("Recovering distances from disk...") - distances = restoreVariableFromDisk('distances') + max_layer = min(len(lists_v1), len(lists_v2)) + distances[v1, v2] = {} - threshold = calcThresholdDistance(previousLayer,distances,fractionCalcDists) + for layer in range(max_layer): + distances[v1, v2][layer] = _deg_seq_dist(lists_v1[layer], lists_v2[layer], dist_func, is_directed) - logging.info('Selecting vertices...') - - vertices_selected = deque() - - for vertices,layers in distances.iteritems(): - if(previousLayer not in layers): - continue - if(layers[previousLayer] <= threshold): - vertices_selected.append(vertices) + cont += 1 - distances = {} + _consolidate_distances(distances) + save_variable_on_disk(distances, 'distances-' + str(part)) - logging.info('Vertices selected.') - return vertices_selected +def _deg_seq_dist(seq1, seq2, dist_func, is_directed): + if is_directed: + # independent 2-dim dtw + seq1_in, seq1_out = seq1 + seq2_in, seq2_out = seq2 + dist_in, path_in = fastdtw(seq1_in, seq2_in, radius=1, dist=dist_func) + dist_out, path_out = fastdtw(seq1_out, seq2_out, radius=1, dist=dist_func) + return dist_in + dist_out + else: + dist, path = fastdtw(seq1, seq2, radius=1, dist=dist_func) + return dist -def preprocess_consolides_distances(distances, startLayer = 1): +def _consolidate_distances(distances): + """ + In-place summing up of distances along the layers + Args: + distances: nested dictionary (v1, v2 -> layer -> distance) + Returns: + None (stores pickle on disk) + """ logging.info('Consolidating distances...') - for vertices,layers in distances.iteritems(): - keys_layers = sorted(layers.keys()) - startLayer = min(len(keys_layers),startLayer) - for layer in range(0,startLayer): - keys_layers.pop(0) - + for vertices, distance_by_layer in distances.iteritems(): + layers = sorted(distance_by_layer.keys()) + start_layer = min(len(layers), 1) + for layer in range(0, start_layer): + layers.pop(0) - for layer in keys_layers: - layers[layer] += layers[layer - 1] + for layer in layers: + distance_by_layer[layer] += distance_by_layer[layer - 1] logging.info('Distances consolidated.') -def exec_bfs_compact(G,workers,calcUntilLayer): - +def exec_bfs_compact(G, workers, calc_until_layer, is_directed, in_degrees, out_degrees, embedding_vertices): futures = {} - degreeList = {} + degree_list = {} t0 = time() - vertices = G.keys() + vertices = G.keys() if embedding_vertices is None else embedding_vertices parts = workers - chunks = partition(vertices,parts) - - logging.info('Capturing larger degree...') - maxDegree = 0 - for v in vertices: - if(len(G[v]) > maxDegree): - maxDegree = len(G[v]) - logging.info('Larger degree captured') + chunks = partition(vertices, parts) with ProcessPoolExecutor(max_workers=workers) as executor: part = 1 for c in chunks: - job = executor.submit(getCompactDegreeListsVertices,G,c,maxDegree,calcUntilLayer) + job = executor.submit(get_compact_degree_lists_vertices, G, c, calc_until_layer, is_directed, in_degrees, + out_degrees) futures[job] = part part += 1 for job in as_completed(futures): dl = job.result() - v = futures[job] - degreeList.update(dl) + degree_list.update(dl) - logging.info("Saving degreeList on disk...") - saveVariableOnDisk(degreeList,'compactDegreeList') + logging.info('Saving degreeList on disk ... (is_directed={})'.format(is_directed)) + save_variable_on_disk(degree_list, 'compactDegreeList') t1 = time() - logging.info('Execution time - BFS: {}m'.format((t1-t0)/60)) - + logging.info('Execution time - BFS: {}m'.format((t1 - t0) / 60)) - return - -def exec_bfs(G,workers,calcUntilLayer): +def exec_bfs(G, workers, calc_until_layer, is_directed, in_degrees, out_degrees, embedding_vertices): futures = {} - degreeList = {} + degree_list = {} t0 = time() - vertices = G.keys() + vertices = G.keys() if embedding_vertices is None else embedding_vertices parts = workers - chunks = partition(vertices,parts) + chunks = partition(vertices, parts) with ProcessPoolExecutor(max_workers=workers) as executor: part = 1 for c in chunks: - job = executor.submit(getDegreeListsVertices,G,c,calcUntilLayer) + job = executor.submit(get_degree_lists_vertices, G, c, calc_until_layer, is_directed, in_degrees, + out_degrees) futures[job] = part part += 1 for job in as_completed(futures): dl = job.result() - v = futures[job] - degreeList.update(dl) + degree_list.update(dl) - logging.info("Saving degreeList on disk...") - saveVariableOnDisk(degreeList,'degreeList') + logging.info('Saving degreeList on disk ... (is_directed={})'.format(is_directed)) + save_variable_on_disk(degree_list, 'degreeList') t1 = time() - logging.info('Execution time - BFS: {}m'.format((t1-t0)/60)) - - - return + logging.info('Execution time - BFS: {}m'.format((t1 - t0) / 60)) def generate_distances_network_part1(workers): + """ + Load and merge distances from all chunks. Rearrange them grouped by layer: + layer -> v1, v2 -> distance + Args: + workers: int + Number of chunks, needed for loading distances from disk + """ parts = workers weights_distances = {} - for part in range(1,parts + 1): - + for part in range(1, parts + 1): + logging.info('Executing part {}...'.format(part)) - distances = restoreVariableFromDisk('distances-'+str(part)) - - for vertices,layers in distances.iteritems(): - for layer,distance in layers.iteritems(): + distances = restore_variable_from_disk('distances-' + str(part)) + + for vertices, distance_by_layer in distances.iteritems(): + for layer, distance in distance_by_layer.iteritems(): vx = vertices[0] vy = vertices[1] - if(layer not in weights_distances): + if layer not in weights_distances: weights_distances[layer] = {} - weights_distances[layer][vx,vy] = distance + weights_distances[layer][vx, vy] = distance logging.info('Part {} executed.'.format(part)) - for layer,values in weights_distances.iteritems(): - saveVariableOnDisk(values,'weights_distances-layer-'+str(layer)) - return + for layer, values in weights_distances.iteritems(): + save_variable_on_disk(values, 'weights_distances-layer-' + str(layer)) + def generate_distances_network_part2(workers): + """ + Construct the skeleton of the context graph: a symmetric directed layer graph + with edges between each pair of nodes for which a distance (at that layer) exists. + Args: + workers: int + Number of chunks, needed for loading distances from disk + """ parts = workers graphs = {} - for part in range(1,parts + 1): + for part in range(1, parts + 1): logging.info('Executing part {}...'.format(part)) - distances = restoreVariableFromDisk('distances-'+str(part)) + distances = restore_variable_from_disk('distances-' + str(part)) - for vertices,layers in distances.iteritems(): - for layer,distance in layers.iteritems(): + for vertices, distance_by_layer in distances.iteritems(): + for layer, distance in distance_by_layer.iteritems(): vx = vertices[0] vy = vertices[1] - if(layer not in graphs): + if layer not in graphs: graphs[layer] = {} - if(vx not in graphs[layer]): - graphs[layer][vx] = [] - if(vy not in graphs[layer]): - graphs[layer][vy] = [] + if vx not in graphs[layer]: + graphs[layer][vx] = [] + if vy not in graphs[layer]: + graphs[layer][vy] = [] graphs[layer][vx].append(vy) graphs[layer][vy].append(vx) logging.info('Part {} executed.'.format(part)) - for layer,values in graphs.iteritems(): - saveVariableOnDisk(values,'graphs-layer-'+str(layer)) + for layer, values in graphs.iteritems(): + save_variable_on_disk(values, 'graphs-layer-' + str(layer)) - return def generate_distances_network_part3(): - + """ + Create a probability weight for each edge in the layer graph. Weights are stored in a separate dictionary + per layer, of the form (node -> list of weights) [N.B. order of weights = order of neighbours] + Also execute some preprocessing for the alias method. + """ layer = 0 - while(isPickle('graphs-layer-'+str(layer))): - graphs = restoreVariableFromDisk('graphs-layer-'+str(layer)) - weights_distances = restoreVariableFromDisk('weights_distances-layer-'+str(layer)) + while is_pickle('graphs-layer-' + str(layer)): + graphs = restore_variable_from_disk('graphs-layer-' + str(layer)) + weights_distances = restore_variable_from_disk('weights_distances-layer-' + str(layer)) logging.info('Executing layer {}...'.format(layer)) alias_method_j = {} alias_method_q = {} weights = {} - - for v,neighbors in graphs.iteritems(): + + for v, neighbors in graphs.iteritems(): e_list = deque() sum_w = 0.0 - for n in neighbors: - if (v,n) in weights_distances: - wd = weights_distances[v,n] + if (v, n) in weights_distances: + wd = weights_distances[v, n] else: - wd = weights_distances[n,v] + wd = weights_distances[n, v] w = np.exp(-float(wd)) e_list.append(w) sum_w += w @@ -544,97 +607,111 @@ def generate_distances_network_part3(): alias_method_j[v] = J alias_method_q[v] = q - saveVariableOnDisk(weights,'distances_nets_weights-layer-'+str(layer)) - saveVariableOnDisk(alias_method_j,'alias_method_j-layer-'+str(layer)) - saveVariableOnDisk(alias_method_q,'alias_method_q-layer-'+str(layer)) + save_variable_on_disk(weights, 'distances_nets_weights-layer-' + str(layer)) + save_variable_on_disk(alias_method_j, 'alias_method_j-layer-' + str(layer)) + save_variable_on_disk(alias_method_q, 'alias_method_q-layer-' + str(layer)) logging.info('Layer {} executed.'.format(layer)) layer += 1 logging.info('Weights created.') - return - def generate_distances_network_part4(): + """ + Merge the (unweighted) directed context graphs into a single graph containing + all layers, represented as a dictionary (layer -> node -> list of neighbours) + """ logging.info('Consolidating graphs...') graphs_c = {} layer = 0 - while(isPickle('graphs-layer-'+str(layer))): + while (is_pickle('graphs-layer-' + str(layer))): logging.info('Executing layer {}...'.format(layer)) - graphs = restoreVariableFromDisk('graphs-layer-'+str(layer)) + graphs = restore_variable_from_disk('graphs-layer-' + str(layer)) graphs_c[layer] = graphs logging.info('Layer {} executed.'.format(layer)) layer += 1 - logging.info("Saving distancesNets on disk...") - saveVariableOnDisk(graphs_c,'distances_nets_graphs') + save_variable_on_disk(graphs_c, 'distances_nets_graphs') logging.info('Graphs consolidated.') - return + def generate_distances_network_part5(): + """ + Merge the dictionaries holding the precomputed J-values of the alias method into a + single dictionary containing all layers (layer -> node -> J) + """ alias_method_j_c = {} layer = 0 - while(isPickle('alias_method_j-layer-'+str(layer))): - logging.info('Executing layer {}...'.format(layer)) - alias_method_j = restoreVariableFromDisk('alias_method_j-layer-'+str(layer)) + while is_pickle('alias_method_j-layer-' + str(layer)): + logging.info('Executing layer {}...'.format(layer)) + alias_method_j = restore_variable_from_disk('alias_method_j-layer-' + str(layer)) alias_method_j_c[layer] = alias_method_j logging.info('Layer {} executed.'.format(layer)) layer += 1 logging.info("Saving nets_weights_alias_method_j on disk...") - saveVariableOnDisk(alias_method_j_c,'nets_weights_alias_method_j') + save_variable_on_disk(alias_method_j_c, 'nets_weights_alias_method_j') - return def generate_distances_network_part6(): + """ + Merge the dictionaries holding the precomputed q-values of the alias method into a + single dictionary containing all layers (layer -> node -> q) + """ alias_method_q_c = {} layer = 0 - while(isPickle('alias_method_q-layer-'+str(layer))): - logging.info('Executing layer {}...'.format(layer)) - alias_method_q = restoreVariableFromDisk('alias_method_q-layer-'+str(layer)) + while is_pickle('alias_method_q-layer-' + str(layer)): + logging.info('Executing layer {}...'.format(layer)) + alias_method_q = restore_variable_from_disk('alias_method_q-layer-' + str(layer)) alias_method_q_c[layer] = alias_method_q logging.info('Layer {} executed.'.format(layer)) layer += 1 logging.info("Saving nets_weights_alias_method_q on disk...") - saveVariableOnDisk(alias_method_q_c,'nets_weights_alias_method_q') + save_variable_on_disk(alias_method_q_c, 'nets_weights_alias_method_q') - return def generate_distances_network(workers): + """ + Construct the layered context graphs in six steps, each of which + is implemented in a separate method. + Args: + workers: int + Number of chunks + """ t0 = time() logging.info('Creating distance network...') - os.system("rm "+returnPathStruc2vec()+"/../pickles/weights_distances-layer-*.pickle") + os.system("rm " + return_path_struc2vec() + "/../pickles/weights_distances-layer-*.pickle") with ProcessPoolExecutor(max_workers=1) as executor: - job = executor.submit(generate_distances_network_part1,workers) + job = executor.submit(generate_distances_network_part1, workers) job.result() t1 = time() - t = t1-t0 + t = t1 - t0 logging.info('- Time - part 1: {}s'.format(t)) t0 = time() - os.system("rm "+returnPathStruc2vec()+"/../pickles/graphs-layer-*.pickle") + os.system("rm " + return_path_struc2vec() + "/../pickles/graphs-layer-*.pickle") with ProcessPoolExecutor(max_workers=1) as executor: - job = executor.submit(generate_distances_network_part2,workers) + job = executor.submit(generate_distances_network_part2, workers) job.result() t1 = time() - t = t1-t0 + t = t1 - t0 logging.info('- Time - part 2: {}s'.format(t)) logging.info('distance network created.') logging.info('Transforming distances into weights...') t0 = time() - os.system("rm "+returnPathStruc2vec()+"/../pickles/distances_nets_weights-layer-*.pickle") - os.system("rm "+returnPathStruc2vec()+"/../pickles/alias_method_j-layer-*.pickle") - os.system("rm "+returnPathStruc2vec()+"/../pickles/alias_method_q-layer-*.pickle") + os.system("rm " + return_path_struc2vec() + "/../pickles/distances_nets_weights-layer-*.pickle") + os.system("rm " + return_path_struc2vec() + "/../pickles/alias_method_j-layer-*.pickle") + os.system("rm " + return_path_struc2vec() + "/../pickles/alias_method_q-layer-*.pickle") with ProcessPoolExecutor(max_workers=1) as executor: job = executor.submit(generate_distances_network_part3) job.result() t1 = time() - t = t1-t0 + t = t1 - t0 logging.info('- Time - part 3: {}s'.format(t)) t0 = time() @@ -642,7 +719,7 @@ def generate_distances_network(workers): job = executor.submit(generate_distances_network_part4) job.result() t1 = time() - t = t1-t0 + t = t1 - t0 logging.info('- Time - part 4: {}s'.format(t)) t0 = time() @@ -650,7 +727,7 @@ def generate_distances_network(workers): job = executor.submit(generate_distances_network_part5) job.result() t1 = time() - t = t1-t0 + t = t1 - t0 logging.info('- Time - part 5: {}s'.format(t)) t0 = time() @@ -658,18 +735,16 @@ def generate_distances_network(workers): job = executor.submit(generate_distances_network_part6) job.result() t1 = time() - t = t1-t0 + t = t1 - t0 logging.info('- Time - part 6: {}s'.format(t)) - - return def alias_setup(probs): - ''' + """ Compute utility lists for non-uniform sampling from discrete distributions. Refer to https://hips.seas.harvard.edu/blog/2013/03/03/the-alias-method-efficient-sampling-with-many-discrete-outcomes/ for details - ''' + """ K = len(probs) q = np.zeros(K) J = np.zeros(K, dtype=np.int) @@ -677,7 +752,7 @@ def alias_setup(probs): smaller = [] larger = [] for kk, prob in enumerate(probs): - q[kk] = K*prob + q[kk] = K * prob if q[kk] < 1.0: smaller.append(kk) else: diff --git a/src/graph.py b/src/graph.py index d488275..315392c 100644 --- a/src/graph.py +++ b/src/graph.py @@ -1,287 +1,316 @@ -#!/usr/bin/env python # -*- coding: utf-8 -*- """Graph utilities.""" -import logging -import sys -import math from io import open -from os import path -from time import time -from glob import glob -from six.moves import range, zip, zip_longest -from six import iterkeys -from collections import defaultdict, Iterable -from multiprocessing import cpu_count -import random -from random import shuffle -from itertools import product,permutations -import collections - -from concurrent.futures import ProcessPoolExecutor - -from multiprocessing import Pool -from multiprocessing import cpu_count - -#novas importações -import numpy as np -import operator - - -class Graph(defaultdict): - """Efficient basic implementation of nx `Graph' – Undirected graphs with self loops""" - def __init__(self): - super(Graph, self).__init__(list) - - def nodes(self): - return self.keys() - - def adjacency_iter(self): - return self.iteritems() - - def subgraph(self, nodes={}): - subgraph = Graph() - - for n in nodes: - if n in self: - subgraph[n] = [x for x in self[n] if x in nodes] - - return subgraph - - def make_undirected(self): - - t0 = time() - - for v in self.keys(): - for other in self[v]: - if v != other: - self[other].append(v) - - t1 = time() - #logger.info('make_directed: added missing edges {}s'.format(t1-t0)) - - self.make_consistent() - return self - - def make_consistent(self): - t0 = time() - for k in iterkeys(self): - self[k] = list(sorted(set(self[k]))) - - t1 = time() - #logger.info('make_consistent: made consistent in {}s'.format(t1-t0)) - - #self.remove_self_loops() - - return self - - def remove_self_loops(self): - - removed = 0 - t0 = time() - - for x in self: - if x in self[x]: - self[x].remove(x) - removed += 1 - - t1 = time() - - #logger.info('remove_self_loops: removed {} loops in {}s'.format(removed, (t1-t0))) - return self - - def check_self_loops(self): - for x in self: - for y in self[x]: - if x == y: - return True - - return False - - def has_edge(self, v1, v2): - if v2 in self[v1] or v1 in self[v2]: - return True - return False - - def degree(self, nodes=None): - if isinstance(nodes, Iterable): - return {v:len(self[v]) for v in nodes} - else: - return len(self[nodes]) - - def order(self): - "Returns the number of nodes in the graph" - return len(self) - - def number_of_edges(self): - "Returns the number of nodes in the graph" - return sum([self.degree(x) for x in self.keys()])/2 - - def number_of_nodes(self): - "Returns the number of nodes in the graph" - return self.order() - - def gToDict(self): - d = {} - for k,v in self.iteritems(): - d[k] = v - return d - - def printAdjList(self): - for key,value in self.iteritems(): - print (key,":",value) +from algorithms import * +from algorithms_distances import * +from collections import defaultdict + + +class Graph(): + def __init__(self, d, is_directed, workers, bfs_workers = None, until_layer=None, in_degrees=None, out_degrees=None, + embedding_vertices=None): + + self.G = d + self.num_vertices = number_of_nodes_(d) + self.num_edges = number_of_edges_(d, is_directed) + self.is_directed = is_directed + self.workers = workers + self.bfs_workers = bfs_workers + self.calc_until_layer = until_layer + self.in_degrees = in_degrees + self.out_degrees = out_degrees + self.embedding_vertices = embedding_vertices + logging.info('Graph - is_directed: {}'.format(self.is_directed)) + logging.info('Graph - Number of vertices: {}'.format(self.num_vertices)) + logging.info('Graph - Number of edges: {}'.format(self.num_edges)) + + def preprocess_neighbors_with_bfs(self): + + workers = self.workers if self.bfs_workers is None else self.bfs_workers + + with ProcessPoolExecutor(max_workers=workers) as executor: + job = executor.submit(exec_bfs, self.G, workers, self.calc_until_layer, self.is_directed, + self.in_degrees, self.out_degrees, self.embedding_vertices) + + job.result() + + return + + def preprocess_neighbors_with_bfs_compact(self): + + workers = self.workers if self.bfs_workers is None else self.bfs_workers + + with ProcessPoolExecutor(max_workers=workers) as executor: + job = executor.submit(exec_bfs_compact, self.G, workers, self.calc_until_layer, self.is_directed, + self.in_degrees, self.out_degrees, self.embedding_vertices) + + job.result() + + return + + def create_vectors(self): + """ + Create an ordering of all network vertices by (undirected) degree. + + Note for future improvements: It may be worth using k-d trees to improve this for the directed case. + """ + logging.info("Creating degree vectors...") + degrees = {} + degrees_sorted = set() + G = self.G + + vertices = G.keys() if self.embedding_vertices is None else self.embedding_vertices + + for v in vertices: + degree = len(G[v]) + degrees_sorted.add(degree) + if degree not in degrees: + degrees[degree] = {} + degrees[degree]['vertices'] = deque() + degrees[degree]['vertices'].append(v) + degrees_sorted = np.array(list(degrees_sorted), dtype='int') + degrees_sorted = np.sort(degrees_sorted) + + l = len(degrees_sorted) + for index, degree in enumerate(degrees_sorted): + if index > 0: + degrees[degree]['before'] = degrees_sorted[index - 1] + if index < (l - 1): + degrees[degree]['after'] = degrees_sorted[index + 1] + logging.info("Degree vectors created.") + logging.info("Saving degree vectors...") + save_variable_on_disk(degrees, 'degrees_vector') + def calc_distances_all_vertices(self, compact_degree=False): + logging.info("Using compactDegree: {}".format(compact_degree)) + if self.calc_until_layer: + logging.info("Calculations until layer: {}".format(self.calc_until_layer)) -def clique(size): - return from_adjlist(permutations(range(1,size+1))) + futures = {} -# http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python -def grouper(n, iterable, padvalue=None): - "grouper(3, 'abcdefg', 'x') --> ('a','b','c'), ('d','e','f'), ('g','x','x')" - return zip_longest(*[iter(iterable)]*n, fillvalue=padvalue) + vertices = list(reversed(sorted(self.G.keys() if self.embedding_vertices is None else self.embedding_vertices))) -def parse_adjacencylist(f): - adjlist = [] - for l in f: - if l and l[0] != "#": - introw = [int(x) for x in l.strip().split()] - row = [introw[0]] - row.extend(set(sorted(introw[1:]))) - adjlist.extend([row]) - - return adjlist + if compact_degree: + logging.info("Recovering degreeList from disk...") + degree_list = restore_variable_from_disk('compactDegreeList') + else: + logging.info("Recovering compactDegreeList from disk...") + degree_list = restore_variable_from_disk('degreeList') -def parse_adjacencylist_unchecked(f): - adjlist = [] - for l in f: - if l and l[0] != "#": - adjlist.extend([[int(x) for x in l.strip().split()]]) - return adjlist + parts = self.workers + chunks = partition(vertices, parts) -def load_adjacencylist(file_, undirected=False, chunksize=10000, unchecked=True): + t0 = time() - if unchecked: - parse_func = parse_adjacencylist_unchecked - convert_func = from_adjlist_unchecked - else: - parse_func = parse_adjacencylist - convert_func = from_adjlist + with ProcessPoolExecutor(max_workers=self.workers) as executor: - adjlist = [] + part = 1 + for c in chunks: + logging.info("Executing part {}...".format(part)) + list_v = [] + for v in c: + list_v.append([vd for vd in vertices if vd > v]) + job = executor.submit(calc_distances_all, c, list_v, degree_list, part, compact_degree=compact_degree, + is_directed=self.is_directed) + futures[job] = part + part += 1 - t0 = time() + logging.info("Receiving results...") - with open(file_) as f: - with ProcessPoolExecutor(max_workers=cpu_count()) as executor: - total = 0 - for idx, adj_chunk in enumerate(executor.map(parse_func, grouper(int(chunksize), f))): - adjlist.extend(adj_chunk) - total += len(adj_chunk) - - t1 = time() + for job in as_completed(futures): + job.result() + r = futures[job] + logging.info("Part {} Completed.".format(r)) - logging.info('Parsed {} edges with {} chunks in {}s'.format(total, idx, t1-t0)) + logging.info('Distances calculated.') + t1 = time() + logging.info('Time : {}m'.format((t1 - t0) / 60)) - t0 = time() - G = convert_func(adjlist) - t1 = time() + return - logging.info('Converted edges to graph in {}s'.format(t1-t0)) + def calc_distances(self, compact_degree=False): - if undirected: - t0 = time() - G = G.make_undirected() - t1 = time() - logging.info('Made graph undirected in {}s'.format(t1-t0)) + logging.info("Using compactDegree: {}".format(compact_degree)) + if self.calc_until_layer: + logging.info("Calculations until layer: {}".format(self.calc_until_layer)) - return G + futures = {} + G = self.G + vertices = G.keys() if self.embedding_vertices is None else self.embedding_vertices + a_vertices = len(vertices) -def load_edgelist(file_, undirected=True): - G = Graph() - with open(file_) as f: - for l in f: - if(len(l.strip().split()[:2]) > 1): - x, y = l.strip().split()[:2] - x = int(x) - y = int(y) - G[x].append(y) - if undirected: - G[y].append(x) - else: - x = l.strip().split()[:2] - x = int(x[0]) - G[x] = [] - - G.make_consistent() - return G + parts = self.workers + chunks = partition(vertices, parts) + with ProcessPoolExecutor(max_workers=1) as executor: -def load_matfile(file_, variable_name="network", undirected=True): - mat_varables = loadmat(file_) - mat_matrix = mat_varables[variable_name] + logging.info("Split degree List...") + part = 1 + for c in chunks: + job = executor.submit(split_degree_list, part, c, G, compact_degree, a_vertices) + job.result() + logging.info("degreeList {} completed.".format(part)) + part += 1 - return from_numpy(mat_matrix, undirected) + with ProcessPoolExecutor(max_workers=self.workers) as executor: + part = 1 + for _ in chunks: + logging.info("Executing part {}...".format(part)) + job = executor.submit(calc_distances, part, compact_degree=compact_degree, is_directed=self.is_directed) + futures[job] = part + part += 1 -def from_networkx(G_input, undirected=True): - G = Graph() + logging.info("Receiving results...") + for job in as_completed(futures): + job.result() + r = futures[job] + logging.info("Part {} completed.".format(r)) - for idx, x in enumerate(G_input.nodes_iter()): - for y in iterkeys(G_input[x]): - G[x].append(y) + return - if undirected: - G.make_undirected() + def create_distances_network(self): - return G + with ProcessPoolExecutor(max_workers=1) as executor: + job = executor.submit(generate_distances_network, self.workers) + job.result() -def from_numpy(x, undirected=True): - G = Graph() + return - if issparse(x): - cx = x.tocoo() - for i,j,v in zip(cx.row, cx.col, cx.data): - G[i].append(j) - else: - raise Exception("Dense matrices not yet supported.") + def preprocess_parameters_random_walk(self): - if undirected: - G.make_undirected() + with ProcessPoolExecutor(max_workers=1) as executor: + job = executor.submit(generate_parameters_random_walk) - G.make_consistent() - return G + job.result() + return -def from_adjlist(adjlist): - G = Graph() - - for row in adjlist: - node = row[0] - neighbors = row[1:] - G[node] = list(sorted(set(neighbors))) + def simulate_walks(self, num_walks, walk_length): - return G + vertices = self.G.keys() if self.embedding_vertices is None else self.embedding_vertices + # for large graphs, it is serially executed, because of memory use. + if len(self.embedding_vertices if self.embedding_vertices is not None else self.G) > 500000: -def from_adjlist_unchecked(adjlist): - G = Graph() - - for row in adjlist: - node = row[0] - neighbors = row[1:] - G[node] = neighbors + with ProcessPoolExecutor(max_workers=1) as executor: + job = executor.submit(generate_random_walks_large_graphs, num_walks, walk_length, self.workers, + vertices) - return G + job.result() + + else: + + with ProcessPoolExecutor(max_workers=1) as executor: + job = executor.submit(generate_random_walks, num_walks, walk_length, self.workers, vertices) + + job.result() + + return + + +def load_edgelist(file_, directed=False, weighted=False): + """ + Loads an edgelist into a symmetric dictionary (the skeleton). When specified directed=True, also stores + dictionaries with incoming and outgoing degree of each node. + Args: + file_: str + the path of the edgelist file + directed: boolean + whether the graph is directed + weighted: boolean + whether the graph is weighted + + Returns: (dict, dict, dict) + Returns skeleton, in_degrees, out_degrees. The latter two are empty if directed=False. + """ + skeleton = defaultdict(list) + in_degrees = {} + out_degrees = {} + with open(file_) as f: + for l in f: + if len(l.strip().split()[:2]) > 1: + if weighted: + x, y, w = l.strip().split()[:3] + w = float(w) + else: + x, y = l.strip().split()[:2] + w = 1 + + x = int(x) + y = int(y) + + if x not in skeleton: + skeleton[x] = [] + if y not in skeleton: + skeleton[y] = [] + + if not directed or x < y: + skeleton[x].append(y) + skeleton[y].append(x) + + if directed: + in_degrees[y] = in_degrees.get(y, 0) + w + out_degrees[x] = out_degrees.get(x, 0) + w + + else: + x = l.strip().split()[:2] + x = int(x[0]) + if x not in skeleton: + skeleton[x] = [] + + skeleton = verify_consistency_(skeleton, directed) + + return skeleton, in_degrees, out_degrees + + +def verify_consistency_(skeleton, is_directed): + """ + Remove duplicates from the graph skeleton and print a warning message if any duplicates were found, + as this will cause in_degrees and out_degrees to carry wrong values. + Args: + skeleton: dict + the graph dictionary + + Returns: dict + the graph dictionary without duplicate neighbours + """ + logging.info('Verifying consistency of edgelist ...') + cleaned_skeleton = remove_duplicates_(skeleton) + if is_directed: + for k, v in skeleton.iteritems(): + if len(v) != len(cleaned_skeleton[k]): + print('WARNING: The edgelist file contains duplicates. Directed degrees will not be accurate.') + print('Example duplicates amongst the neighbours of node {}'.format(k)) + break + return cleaned_skeleton + + +def remove_duplicates_(graph_dict): + """ + Remove duplicates in the neighbourhood lists. + """ + d = defaultdict(list) + for k in graph_dict.iterkeys(): + d[k] = sorted(set(graph_dict[k])) # sorted: returns a list + return d -def from_dict(d): - G = Graph() - for k,v in d.iteritems(): - G[k] = v +def number_of_nodes_(graph_dict): + """ + Returns the number of nodes in a graph represented by a dictionary. + """ + return len(graph_dict) - return G +def number_of_edges_(graph_dict, is_directed): + """ + Returns the number of edges in a graph represented by a dictionary. + """ + degree_sum = sum(len(graph_dict[node]) for node in graph_dict.keys()) + return degree_sum if is_directed else degree_sum / 2 diff --git a/src/main.py b/src/main.py index 9c37f04..4403657 100644 --- a/src/main.py +++ b/src/main.py @@ -1,129 +1,154 @@ -#!/usr/bin/python +#!/usr/bin/env python2 # -*- coding: utf-8 -*- import argparse, logging -import numpy as np -import struc2vec +import graph from gensim.models import Word2Vec from gensim.models.word2vec import LineSentence -from time import time -import graph +logging.basicConfig(filename='struc2vec.log', filemode='w', level=logging.DEBUG, format='%(asctime)s %(message)s') -logging.basicConfig(filename='struc2vec.log',filemode='w',level=logging.DEBUG,format='%(asctime)s %(message)s') def parse_args(): - ''' - Parses the struc2vec arguments. - ''' - parser = argparse.ArgumentParser(description="Run struc2vec.") + """ + Parses the struc2vec arguments. + """ + parser = argparse.ArgumentParser(description="Run struc2vec.") + + parser.add_argument('--input', metavar='EDGELIST_FILE', nargs='?', default='graph/karate.edgelist', + help='Input graph path') + + parser.add_argument('--output', metavar='EMB_FILE', nargs='?', default='emb/karate.emb', + help='Embeddings path') + + parser.add_argument('--embed-subset', metavar='NODELIST_FILE', type=str, default=None, + help='Only compute embeddings for nodes in a given nodelist. ' + 'Still uses the entire graph as context for the embedding.') + + parser.add_argument('--dimensions', type=int, default=128, + help='Number of dimensions. Default is 128.') + + parser.add_argument('--walk-length', type=int, default=80, + help='Length of walk per source. Default is 80.') - parser.add_argument('--input', nargs='?', default='graph/karate.edgelist', - help='Input graph path') + parser.add_argument('--num-walks', type=int, default=10, + help='Number of walks per source. Default is 10.') - parser.add_argument('--output', nargs='?', default='emb/karate.emb', - help='Embeddings path') + parser.add_argument('--window-size', type=int, default=10, + help='Context size for optimization. Default is 10.') - parser.add_argument('--dimensions', type=int, default=128, - help='Number of dimensions. Default is 128.') + parser.add_argument('--until-layer', type=int, default=None, + help='Calculation until the layer.') - parser.add_argument('--walk-length', type=int, default=80, - help='Length of walk per source. Default is 80.') + parser.add_argument('--iter', default=5, type=int, + help='Number of epochs in SGD') - parser.add_argument('--num-walks', type=int, default=10, - help='Number of walks per source. Default is 10.') + parser.add_argument('--workers', type=int, default=4, + help='Number of parallel workers. Default is 8.') - parser.add_argument('--window-size', type=int, default=10, - help='Context size for optimization. Default is 10.') + parser.add_argument('--bfs-workers', type=int, default=None, + help='Number of parallel workers only for BFS stage. Default is using --workers.') - parser.add_argument('--until-layer', type=int, default=None, - help='Calculation until the layer.') + parser.add_argument('--weighted', dest='weighted', action='store_true', + help='Boolean specifying (un)weighted. Default is unweighted.') + parser.set_defaults(weighted=False) - parser.add_argument('--iter', default=5, type=int, - help='Number of epochs in SGD') + parser.add_argument('--directed', dest='directed', action='store_true', + help='Graph is (un)directed. Default is undirected.') + parser.set_defaults(directed=False) - parser.add_argument('--workers', type=int, default=4, - help='Number of parallel workers. Default is 8.') + parser.add_argument('--OPT1', default=False, type=bool, + help='optimization 1') + parser.add_argument('--OPT2', default=False, type=bool, + help='optimization 2') + parser.add_argument('--OPT3', default=False, type=bool, + help='optimization 3') + return parser.parse_args() - parser.add_argument('--weighted', dest='weighted', action='store_true', - help='Boolean specifying (un)weighted. Default is unweighted.') - parser.add_argument('--unweighted', dest='unweighted', action='store_false') - parser.set_defaults(weighted=False) - parser.add_argument('--directed', dest='directed', action='store_true', - help='Graph is (un)directed. Default is undirected.') - parser.add_argument('--undirected', dest='undirected', action='store_false') - parser.set_defaults(directed=False) +def read_graph(args): + """ + Reads the input network. + """ + logging.info(" - Loading graph...") + graph_dict, in_degrees, out_degrees = graph.load_edgelist(args.input, args.directed, args.weighted) + logging.info(" - Graph loaded.") + return graph_dict, in_degrees, out_degrees - parser.add_argument('--OPT1', default=False, type=bool, - help='optimization 1') - parser.add_argument('--OPT2', default=False, type=bool, - help='optimization 2') - parser.add_argument('--OPT3', default=False, type=bool, - help='optimization 3') - return parser.parse_args() -def read_graph(): - ''' - Reads the input network. - ''' - logging.info(" - Loading graph...") - G = graph.load_edgelist(args.input,undirected=True) - logging.info(" - Graph loaded.") - return G +def read_embedding_set(args): + """ + Reads a nodelist with vertices to be embedded. + """ + vertices = set() + with open(args.embed_subset, 'r') as f: + for line in f: + vertex = int(line.strip()) + vertices.add(vertex) + return list(vertices) + def learn_embeddings(): - ''' - Learn embeddings by optimizing the Skipgram objective using SGD. - ''' - logging.info("Initializing creation of the representations...") - walks = LineSentence('random_walks.txt') - model = Word2Vec(walks, size=args.dimensions, window=args.window_size, min_count=0, hs=1, sg=1, workers=args.workers, iter=args.iter) - model.wv.save_word2vec_format(args.output) - logging.info("Representations created.") - - return + """ + Learn embeddings by optimizing the Skipgram objective using SGD. + """ + logging.info("Initializing creation of the representations...") + walks = LineSentence('random_walks.txt') + model = Word2Vec(walks, size=args.dimensions, window=args.window_size, min_count=0, hs=1, sg=1, + workers=args.workers, iter=args.iter) + model.wv.save_word2vec_format(args.output) + logging.info("Representations created.") + + return + def exec_struc2vec(args): - ''' - Pipeline for representational learning for all nodes in a graph. - ''' - if(args.OPT3): - until_layer = args.until_layer - else: - until_layer = None + """ + Pipeline for representational learning for all nodes in a graph. + """ + if args.weighted and not args.directed: + raise NotImplementedError('edge weights are only implemented for directed graphs') - G = read_graph() - G = struc2vec.Graph(G, args.directed, args.workers, untilLayer = until_layer) + if args.OPT3: + until_layer = args.until_layer + else: + until_layer = None - if(args.OPT1): - G.preprocess_neighbors_with_bfs_compact() - else: - G.preprocess_neighbors_with_bfs() + if args.embed_subset: + embedding_vertices = read_embedding_set(args) + else: + embedding_vertices = None - if(args.OPT2): - G.create_vectors() - G.calc_distances(compactDegree = args.OPT1) - else: - G.calc_distances_all_vertices(compactDegree = args.OPT1) + graph_dict, in_degrees, out_degrees = read_graph(args) # in_degrees = out_degrees = {} if not args.directed + G = graph.Graph(graph_dict, args.directed, args.workers, bfs_workers=args.bfs_workers, until_layer=until_layer, + in_degrees=in_degrees, + out_degrees=out_degrees, embedding_vertices=embedding_vertices) + if args.OPT1: + G.preprocess_neighbors_with_bfs_compact() + else: + G.preprocess_neighbors_with_bfs() - G.create_distances_network() - G.preprocess_parameters_random_walk() + if args.OPT2: + G.create_vectors() + G.calc_distances(compact_degree=args.OPT1) + else: + G.calc_distances_all_vertices(compact_degree=args.OPT1) - G.simulate_walks(args.num_walks, args.walk_length) + G.create_distances_network() + G.preprocess_parameters_random_walk() + G.simulate_walks(args.num_walks, args.walk_length) - return G + return G -def main(args): - G = exec_struc2vec(args) +def main(args): + exec_struc2vec(args) - learn_embeddings() + learn_embeddings() if __name__ == "__main__": - args = parse_args() - main(args) - + args = parse_args() + main(args) diff --git a/src/struc2vec.py b/src/struc2vec.py deleted file mode 100644 index 05887d6..0000000 --- a/src/struc2vec.py +++ /dev/null @@ -1,245 +0,0 @@ -# -*- coding: utf-8 -*- - -import numpy as np -import random,sys,logging -from concurrent.futures import ProcessPoolExecutor, as_completed -from multiprocessing import Manager -from time import time -from collections import deque - -from utils import * -from algorithms import * -from algorithms_distances import * -import graph - - -class Graph(): - def __init__(self, g, is_directed, workers, untilLayer = None): - - logging.info(" - Converting graph to dict...") - self.G = g.gToDict() - logging.info("Graph converted.") - - self.num_vertices = g.number_of_nodes() - self.num_edges = g.number_of_edges() - self.is_directed = is_directed - self.workers = workers - self.calcUntilLayer = untilLayer - logging.info('Graph - Number of vertices: {}'.format(self.num_vertices)) - logging.info('Graph - Number of edges: {}'.format(self.num_edges)) - - - def preprocess_neighbors_with_bfs(self): - - with ProcessPoolExecutor(max_workers=self.workers) as executor: - job = executor.submit(exec_bfs,self.G,self.workers,self.calcUntilLayer) - - job.result() - - return - - def preprocess_neighbors_with_bfs_compact(self): - - with ProcessPoolExecutor(max_workers=self.workers) as executor: - job = executor.submit(exec_bfs_compact,self.G,self.workers,self.calcUntilLayer) - - job.result() - - return - - def preprocess_degree_lists(self): - - with ProcessPoolExecutor(max_workers=self.workers) as executor: - job = executor.submit(preprocess_degreeLists) - - job.result() - - return - - - def create_vectors(self): - logging.info("Creating degree vectors...") - degrees = {} - degrees_sorted = set() - G = self.G - for v in G.keys(): - degree = len(G[v]) - degrees_sorted.add(degree) - if(degree not in degrees): - degrees[degree] = {} - degrees[degree]['vertices'] = deque() - degrees[degree]['vertices'].append(v) - degrees_sorted = np.array(list(degrees_sorted),dtype='int') - degrees_sorted = np.sort(degrees_sorted) - - l = len(degrees_sorted) - for index, degree in enumerate(degrees_sorted): - if(index > 0): - degrees[degree]['before'] = degrees_sorted[index - 1] - if(index < (l - 1)): - degrees[degree]['after'] = degrees_sorted[index + 1] - logging.info("Degree vectors created.") - logging.info("Saving degree vectors...") - saveVariableOnDisk(degrees,'degrees_vector') - - - def calc_distances_all_vertices(self,compactDegree = False): - - logging.info("Using compactDegree: {}".format(compactDegree)) - if(self.calcUntilLayer): - logging.info("Calculations until layer: {}".format(self.calcUntilLayer)) - - futures = {} - - count_calc = 0 - - vertices = list(reversed(sorted(self.G.keys()))) - - if(compactDegree): - logging.info("Recovering degreeList from disk...") - degreeList = restoreVariableFromDisk('compactDegreeList') - else: - logging.info("Recovering compactDegreeList from disk...") - degreeList = restoreVariableFromDisk('degreeList') - - parts = self.workers - chunks = partition(vertices,parts) - - t0 = time() - - with ProcessPoolExecutor(max_workers = self.workers) as executor: - - part = 1 - for c in chunks: - logging.info("Executing part {}...".format(part)) - list_v = [] - for v in c: - list_v.append([vd for vd in degreeList.keys() if vd > v]) - job = executor.submit(calc_distances_all, c, list_v, degreeList,part, compactDegree = compactDegree) - futures[job] = part - part += 1 - - - logging.info("Receiving results...") - - for job in as_completed(futures): - job.result() - r = futures[job] - logging.info("Part {} Completed.".format(r)) - - logging.info('Distances calculated.') - t1 = time() - logging.info('Time : {}m'.format((t1-t0)/60)) - - return - - - def calc_distances(self, compactDegree = False): - - logging.info("Using compactDegree: {}".format(compactDegree)) - if(self.calcUntilLayer): - logging.info("Calculations until layer: {}".format(self.calcUntilLayer)) - - futures = {} - #distances = {} - - count_calc = 0 - - G = self.G - vertices = G.keys() - - parts = self.workers - chunks = partition(vertices,parts) - - with ProcessPoolExecutor(max_workers = 1) as executor: - - logging.info("Split degree List...") - part = 1 - for c in chunks: - job = executor.submit(splitDegreeList,part,c,G,compactDegree) - job.result() - logging.info("degreeList {} completed.".format(part)) - part += 1 - - - with ProcessPoolExecutor(max_workers = self.workers) as executor: - - part = 1 - for c in chunks: - logging.info("Executing part {}...".format(part)) - job = executor.submit(calc_distances, part, compactDegree = compactDegree) - futures[job] = part - part += 1 - - logging.info("Receiving results...") - for job in as_completed(futures): - job.result() - r = futures[job] - logging.info("Part {} completed.".format(r)) - - - return - - def consolide_distances(self): - - distances = {} - - parts = self.workers - for part in range(1,parts + 1): - d = restoreVariableFromDisk('distances-'+str(part)) - preprocess_consolides_distances(distances) - distances.update(d) - - - preprocess_consolides_distances(distances) - saveVariableOnDisk(distances,'distances') - - - def create_distances_network(self): - - with ProcessPoolExecutor(max_workers=1) as executor: - job = executor.submit(generate_distances_network,self.workers) - - job.result() - - return - - def preprocess_parameters_random_walk(self): - - with ProcessPoolExecutor(max_workers=1) as executor: - job = executor.submit(generate_parameters_random_walk,self.workers) - - job.result() - - return - - - def simulate_walks(self,num_walks,walk_length): - - # for large graphs, it is serially executed, because of memory use. - if(len(self.G) > 500000): - - with ProcessPoolExecutor(max_workers=1) as executor: - job = executor.submit(generate_random_walks_large_graphs,num_walks,walk_length,self.workers,self.G.keys()) - - job.result() - - else: - - with ProcessPoolExecutor(max_workers=1) as executor: - job = executor.submit(generate_random_walks,num_walks,walk_length,self.workers,self.G.keys()) - - job.result() - - - return - - - - - - - - - - diff --git a/src/utils.py b/src/utils.py index b236857..1fd3004 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,50 +1,51 @@ # -*- coding: utf-8 -*- from time import time -import logging,inspect +import logging, inspect import cPickle as pickle from itertools import islice import os.path dir_f = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) -folder_pickles = dir_f+"/../pickles/" +folder_pickles = dir_f + "/../pickles/" -def returnPathStruc2vec(): + +def return_path_struc2vec(): return dir_f -def isPickle(fname): - return os.path.isfile(dir_f+'/../pickles/'+fname+'.pickle') -def chunks(data, SIZE=10000): +def is_pickle(fname): + return os.path.isfile(dir_f + '/../pickles/' + fname + '.pickle') + + +def chunks(data, size=10000): it = iter(data) - for i in xrange(0, len(data), SIZE): - yield {k:data[k] for k in islice(it, SIZE)} + for i in xrange(0, len(data), size): + yield {k: data[k] for k in islice(it, size)} + def partition(lst, n): division = len(lst) / float(n) - return [ lst[int(round(division * i)): int(round(division * (i + 1)))] for i in xrange(n) ] + return [lst[int(round(division * i)): int(round(division * (i + 1)))] for i in xrange(n)] + -def restoreVariableFromDisk(name): +def restore_variable_from_disk(name): logging.info('Recovering variable...') t0 = time() val = None with open(folder_pickles + name + '.pickle', 'rb') as handle: val = pickle.load(handle) t1 = time() - logging.info('Variable recovered. Time: {}m'.format((t1-t0)/60)) + logging.info('Variable recovered. Time: {}m'.format((t1 - t0) / 60)) return val -def saveVariableOnDisk(f,name): + +def save_variable_on_disk(f, name): logging.info('Saving variable on disk...') t0 = time() with open(folder_pickles + name + '.pickle', 'wb') as handle: pickle.dump(f, handle, protocol=pickle.HIGHEST_PROTOCOL) t1 = time() - logging.info('Variable saved. Time: {}m'.format((t1-t0)/60)) + logging.info('Variable saved. Time: {}m'.format((t1 - t0) / 60)) return - - - - -