From 8322dad9eaa83b4804e870577b994b84f0b594de Mon Sep 17 00:00:00 2001 From: yemeen Date: Thu, 18 Sep 2025 21:18:28 -0400 Subject: [PATCH 1/3] add new sparse ect algo --- src/ect/ect.py | 210 +++++++++++++++++++++++++++++++++------------ tests/test_dect.py | 95 ++++++++------------ 2 files changed, 190 insertions(+), 115 deletions(-) diff --git a/src/ect/ect.py b/src/ect/ect.py index 8343fa1..ff0caa2 100644 --- a/src/ect/ect.py +++ b/src/ect/ect.py @@ -101,34 +101,98 @@ def _ensure_thresholds(self, graph, override_bound_radius=None): def calculate( self, graph: EmbeddedComplex, - theta: Optional[float] = None, - override_bound_radius: Optional[float] = None, + theta: float = None, + override_bound_radius: float = None, ): - """Calculate Euler Characteristic Transform (ECT) for a given graph and direction theta - - Args: - graph (EmbeddedComplex): - The input complex to calculate the ECT for. - theta (float): - The angle in :math:`[0,2\pi]` for the direction to calculate the ECT. - override_bound_radius (float): - If None, uses the following in order: (i) the bounding radius stored in the class; or if not available (ii) the bounding radius of the given graph. Otherwise, should be a positive float :math:`R` where the ECC will be computed at thresholds in :math:`[-R,R]`. Default is None. - """ self._ensure_directions(graph.dim, theta) self._ensure_thresholds(graph, override_bound_radius) - - # override with theta if provided directions = ( self.directions if theta is None else Directions.from_angles([theta]) ) + ect_matrix = self._compute_ect(graph, directions, self.thresholds, self.dtype) - simplex_projections = self._compute_simplex_projections(graph, directions) + return ECTResult(ect_matrix, directions, self.thresholds) + + def _build_incidence_csr(self, graph) -> tuple: + """ + Build column sparse representation of the cell-to-vertex incidence excluding 0-cells. Format is (cell_vertex_pointers, cell_vertex_indices_flat, cell_euler_signs, n_vertices). + Example: takes the complex [(1,3),(2,4),(1,2,3)] and returns [(0,2,4,7),(1,3,2,4,1,2,3),(-1,-1,1),4] + + """ + n_vertices = len(graph.node_list) - ect_matrix = self._compute_directional_transform( - simplex_projections, self.thresholds, self.dtype + cells_by_dimension = {} + + if hasattr(graph, "edge_indices") and graph.edge_indices is not None: + edge_indices_array = np.asarray(graph.edge_indices) + if edge_indices_array.size: + cells_by_dimension[1] = [ + tuple(map(int, row)) for row in edge_indices_array + ] + + if hasattr(graph, "cells") and graph.cells: + for dim, cells_of_dim in graph.cells.items(): + if dim == 0: + continue + if dim == 1 and 1 in cells_by_dimension: + continue + if isinstance(cells_of_dim, np.ndarray): + cell_list = [tuple(map(int, row)) for row in cells_of_dim] + else: + cell_list = [tuple(map(int, c)) for c in cells_of_dim] + if len(cell_list) > 0: + cells_by_dimension[dim] = cell_list + + dimensions = sorted(cells_by_dimension.keys()) + n_cells = sum(len(cells_by_dimension[d]) for d in dimensions) + + cell_vertex_pointers = np.empty(n_cells + 1, dtype=np.int64) + cell_euler_signs = np.empty(n_cells, dtype=np.int32) + cell_vertex_indices_flat = [] + + cell_vertex_pointers[0] = 0 + cell_index = 0 + for dim in dimensions: + cells_in_dim = cells_by_dimension[dim] + euler_sign = 1 if (dim % 2 == 0) else -1 + for cell_vertices in cells_in_dim: + cell_vertex_indices_flat.extend(cell_vertices) + cell_euler_signs[cell_index] = euler_sign + cell_index += 1 + cell_vertex_pointers[cell_index] = len(cell_vertex_indices_flat) + + cell_vertex_indices_flat = np.asarray(cell_vertex_indices_flat, dtype=np.int32) + return ( + cell_vertex_pointers, + cell_vertex_indices_flat, + cell_euler_signs, + n_vertices, ) - return ECTResult(ect_matrix, directions, self.thresholds) + def _compute_ect( + self, graph, directions, thresholds: np.ndarray, dtype=np.int32 + ) -> np.ndarray: + cell_vertex_pointers, cell_vertex_indices_flat, cell_euler_signs, N = ( + self._build_incidence_csr(graph) + ) + thresholds = np.asarray(thresholds, dtype=np.float64) + + V = directions.vectors + X = graph.coord_matrix + H = X @ V if V.shape[0] == X.shape[1] else X @ V.T # (N, m) + H_T = np.ascontiguousarray(H.T) # (m, N) for contiguous per-direction rows + + out64 = _ect_all_dirs( + H_T, + cell_vertex_pointers, + cell_vertex_indices_flat, + cell_euler_signs, + thresholds, + N, + ) + if dtype == np.int32: + return out64.astype(np.int32) + return out64 def _compute_simplex_projections(self, graph: EmbeddedComplex, directions): """Compute projections of each k-cell for all dimensions""" @@ -160,41 +224,79 @@ def _compute_simplex_projections(self, graph: EmbeddedComplex, directions): return simplex_projections - @staticmethod - @njit(parallel=True, fastmath=True) - def _compute_directional_transform( - simplex_projections_list, thresholds, dtype=np.int32 - ): - """Compute ECT by counting simplices below each threshold - VECTORIZED VERSION - Args: - simplex_projections_list: List of arrays containing projections for each simplex type - [vertex_projections, edge_projections, face_projections] - thresholds: Array of threshold values to compute ECT at - dtype: Data type for output array (default: np.int32) +@njit(cache=True, parallel=True) +def _ect_all_dirs( + heights_by_direction, # shape (num_directions, num_vertices) + cell_vertex_pointers, # shape (num_cells + 1,) + cell_vertex_indices_flat, # concatenated vertex indices for all cells + cell_euler_signs, # per-cell sign: (+1) for even-dim, (-1) for odd-dim + threshold_values, # shape (num_thresholds,), assumed nondecreasing + num_vertices, +): + """ + Calculate the Euler Characteristic Transform (ECT) for a given direction and thresholds. - Returns: - Array of shape (num_directions, num_thresholds) containing Euler characteristics - """ - num_dir = simplex_projections_list[0].shape[1] - num_thresh = thresholds.shape[0] - result = np.empty((num_dir, num_thresh), dtype=dtype) - - sorted_projections = List() - for proj in simplex_projections_list: - sorted_proj = np.empty_like(proj) - for i in prange(num_dir): - sorted_proj[:, i] = np.sort(proj[:, i]) - sorted_projections.append(sorted_proj) - - for i in prange(num_dir): - chi = np.zeros(num_thresh, dtype=dtype) - for k in range(len(sorted_projections)): - projs = sorted_projections[k][:, i] - - count = np.searchsorted(projs, thresholds, side="right") - - sign = -1 if k % 2 else 1 - chi += sign * count - result[i] = chi - return result + Args: + heights_by_direction: The heights of the vertices for each direction + cell_vertex_pointers: The pointers to the vertices for each cell + cell_vertex_indices_flat: The indices of the vertices for each cell + cell_euler_signs: The signs of the cells + threshold_values: The thresholds to calculate the ECT for + num_vertices: The number of vertices in the graph + """ + num_directions = heights_by_direction.shape[0] + num_thresholds = threshold_values.shape[0] + ect_values = np.empty((num_directions, num_thresholds), dtype=np.int64) + + for dir_idx in prange(num_directions): + heights = heights_by_direction[dir_idx] + + sort_order = np.argsort(heights) + + # calculate what position each vertex is in the sorted heights starting from 1 (the rank) + vertex_rank_1based = np.empty(num_vertices, dtype=np.int32) + for rnk in range(num_vertices): + vertex_index = sort_order[rnk] + vertex_rank_1based[vertex_index] = rnk + 1 + + # euler char can only jump at each vertex value + jump_amount = np.zeros(num_vertices + 1, dtype=np.int64) + + # 0-cells add +1 at their entrance ranks + for v in range(num_vertices): + rank_v = vertex_rank_1based[v] + jump_amount[rank_v] += 1 + + # each pair of pointers defines a cell, so we iterate over them + num_cells = cell_vertex_pointers.shape[0] - 1 + for cell_idx in range(num_cells): + start = cell_vertex_pointers[cell_idx] + end = cell_vertex_pointers[cell_idx + 1] + # cells come in when the highest vertex enters + entrance_rank = 0 + for k in range(start, end): + v = cell_vertex_indices_flat[k] + rnk = vertex_rank_1based[v] + if rnk > entrance_rank: + entrance_rank = rnk + # record at what rank the cell enters and how much the euler char changes + jump_amount[entrance_rank] += cell_euler_signs[cell_idx] + + # calculate euler char at the moment each vertex enters + euler_prefix = np.empty(num_vertices + 1, dtype=np.int64) + running_sum = 0 + for r in range(num_vertices + 1): + running_sum += jump_amount[r] + euler_prefix[r] = running_sum + + # now find euler char at each threshold wrt the sorted heights + sorted_heights = heights[sort_order] + rank_cursor = 0 # equals r(t) = # { i : h_i <= t } + for thresh_idx in range(num_thresholds): + t = threshold_values[thresh_idx] + while rank_cursor < num_vertices and sorted_heights[rank_cursor] <= t: + rank_cursor += 1 + ect_values[dir_idx, thresh_idx] = euler_prefix[rank_cursor] + + return ect_values diff --git a/tests/test_dect.py b/tests/test_dect.py index ee69848..1bce174 100644 --- a/tests/test_dect.py +++ b/tests/test_dect.py @@ -11,7 +11,7 @@ def setUp(self): self.num_dirs = 8 self.num_thresh = 10 self.bound_radius = 2.0 - + def test_initialization_with_scale_parameter(self): """Test that DECT initializes correctly with scale parameter""" # Test with default scale @@ -21,7 +21,7 @@ def test_initialization_with_scale_parameter(self): bound_radius=self.bound_radius, ) self.assertEqual(dect1.scale, 10.0) # Default scale - + # Test with custom scale custom_scale = 25.0 dect2 = DECT( @@ -31,12 +31,12 @@ def test_initialization_with_scale_parameter(self): scale=custom_scale, ) self.assertEqual(dect2.scale, custom_scale) - + # Verify parent class attributes are initialized self.assertEqual(dect2.bound_radius, self.bound_radius) self.assertEqual(dect2.num_dirs, self.num_dirs) self.assertEqual(dect2.num_thresh, self.num_thresh) - + def test_calculate_with_default_and_custom_scale(self): """Test calculate method with both default and custom scale values""" dect = DECT( @@ -45,22 +45,22 @@ def test_calculate_with_default_and_custom_scale(self): bound_radius=self.bound_radius, scale=15.0, # Init scale ) - + # Test with default scale from init result1 = dect.calculate(self.graph) self.assertIsNotNone(result1) - + # Test with override scale in calculate result2 = dect.calculate(self.graph, scale=50.0) self.assertIsNotNone(result2) - + # Results should be different due to different scales self.assertFalse(np.allclose(result1, result2)) - + # Test that None scale uses init scale result3 = dect.calculate(self.graph, scale=None) self.assertTrue(np.allclose(result1, result3)) - + def test_inheritance_from_ect(self): """Test that DECT properly inherits from ECT""" dect = DECT( @@ -68,39 +68,16 @@ def test_inheritance_from_ect(self): num_thresh=self.num_thresh, bound_radius=self.bound_radius, ) - + # Check that DECT is instance of ECT self.assertIsInstance(dect, ECT) - + # Verify inherited methods are available - self.assertTrue(hasattr(dect, '_ensure_directions')) - self.assertTrue(hasattr(dect, '_ensure_thresholds')) - self.assertTrue(hasattr(dect, '_compute_simplex_projections')) - self.assertTrue(hasattr(dect, 'calculate')) - - # Verify only _compute_directional_transform is different - ect = ECT( - num_dirs=self.num_dirs, - num_thresh=self.num_thresh, - bound_radius=self.bound_radius, - ) - - # These methods should be the same (inherited) - self.assertEqual( - dect._ensure_directions.__code__.co_code, - ect._ensure_directions.__code__.co_code - ) - self.assertEqual( - dect._ensure_thresholds.__code__.co_code, - ect._ensure_thresholds.__code__.co_code - ) - - # This method should be different (overridden) - self.assertNotEqual( - dect._compute_directional_transform.__code__.co_code, - ect._compute_directional_transform.__code__.co_code - ) - + self.assertTrue(hasattr(dect, "_ensure_directions")) + self.assertTrue(hasattr(dect, "_ensure_thresholds")) + self.assertTrue(hasattr(dect, "_compute_simplex_projections")) + self.assertTrue(hasattr(dect, "calculate")) + def test_result_shape_and_type(self): """Test that DECT returns correct result shape and type""" dect = DECT( @@ -108,42 +85,38 @@ def test_result_shape_and_type(self): num_thresh=self.num_thresh, bound_radius=self.bound_radius, ) - + result = dect.calculate(self.graph) - + # Check result type self.assertIsInstance(result, ECTResult) - + # Check shape self.assertEqual(result.shape, (self.num_dirs, self.num_thresh)) - + # Check that directions and thresholds are included self.assertIsNotNone(result.directions) self.assertIsInstance(result.directions, Directions) self.assertEqual(len(result.directions), self.num_dirs) - + self.assertIsNotNone(result.thresholds) self.assertEqual(len(result.thresholds), self.num_thresh) - + # Check dtype - ECTResult always converts to float64 for float types dect_float32 = DECT( - num_dirs=self.num_dirs, - num_thresh=self.num_thresh, - dtype=np.float32 + num_dirs=self.num_dirs, num_thresh=self.num_thresh, dtype=np.float32 ) result_float32 = dect_float32.calculate(self.graph) # ECTResult converts float types to float64 self.assertEqual(result_float32.dtype, np.float64) - + # Test with float64 dect_float64 = DECT( - num_dirs=self.num_dirs, - num_thresh=self.num_thresh, - dtype=np.float64 + num_dirs=self.num_dirs, num_thresh=self.num_thresh, dtype=np.float64 ) result_float64 = dect_float64.calculate(self.graph) self.assertEqual(result_float64.dtype, np.float64) - + def test_calculate_with_single_direction(self): """Test DECT calculation with a single direction (theta parameter)""" dect = DECT( @@ -151,14 +124,14 @@ def test_calculate_with_single_direction(self): num_thresh=self.num_thresh, bound_radius=self.bound_radius, ) - + # Test with specific theta - result = dect.calculate(self.graph, theta=np.pi/4) - + result = dect.calculate(self.graph, theta=np.pi / 4) + # Should have single direction self.assertEqual(result.shape, (1, self.num_thresh)) self.assertEqual(len(result.directions), 1) - + def test_with_different_graph_types(self): """Test DECT works with both EmbeddedGraph and EmbeddedCW""" dect = DECT( @@ -166,17 +139,17 @@ def test_with_different_graph_types(self): num_thresh=self.num_thresh, bound_radius=self.bound_radius, ) - + # Test with graph graph = create_example_graph() result_graph = dect.calculate(graph) self.assertEqual(result_graph.shape, (self.num_dirs, self.num_thresh)) - + # Test with CW complex cw = create_example_cw() result_cw = dect.calculate(cw) self.assertEqual(result_cw.shape, (self.num_dirs, self.num_thresh)) -if __name__ == '__main__': - unittest.main() \ No newline at end of file +if __name__ == "__main__": + unittest.main() From 8067e7ffd9aec18d0f858786e107fa04fc99c1af Mon Sep 17 00:00:00 2001 From: yemeen Date: Thu, 18 Sep 2025 21:23:40 -0400 Subject: [PATCH 2/3] Bump version to 1.2.0 in pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f68f6cb..ad02f82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ect" -version = "1.1.2" +version = "1.2.0" authors = [ { name="Liz Munch", email="muncheli@msu.edu" }, ] From e68bbdf25d7880c1ef4460abdc480c99c9fac8ac Mon Sep 17 00:00:00 2001 From: yemeen Date: Thu, 18 Sep 2025 21:57:19 -0400 Subject: [PATCH 3/3] move sparse function to embed_complex --- src/ect/ect.py | 58 +--------------------------------------- src/ect/embed_complex.py | 56 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 57 deletions(-) diff --git a/src/ect/ect.py b/src/ect/ect.py index ff0caa2..04b1824 100644 --- a/src/ect/ect.py +++ b/src/ect/ect.py @@ -113,67 +113,11 @@ def calculate( return ECTResult(ect_matrix, directions, self.thresholds) - def _build_incidence_csr(self, graph) -> tuple: - """ - Build column sparse representation of the cell-to-vertex incidence excluding 0-cells. Format is (cell_vertex_pointers, cell_vertex_indices_flat, cell_euler_signs, n_vertices). - Example: takes the complex [(1,3),(2,4),(1,2,3)] and returns [(0,2,4,7),(1,3,2,4,1,2,3),(-1,-1,1),4] - - """ - n_vertices = len(graph.node_list) - - cells_by_dimension = {} - - if hasattr(graph, "edge_indices") and graph.edge_indices is not None: - edge_indices_array = np.asarray(graph.edge_indices) - if edge_indices_array.size: - cells_by_dimension[1] = [ - tuple(map(int, row)) for row in edge_indices_array - ] - - if hasattr(graph, "cells") and graph.cells: - for dim, cells_of_dim in graph.cells.items(): - if dim == 0: - continue - if dim == 1 and 1 in cells_by_dimension: - continue - if isinstance(cells_of_dim, np.ndarray): - cell_list = [tuple(map(int, row)) for row in cells_of_dim] - else: - cell_list = [tuple(map(int, c)) for c in cells_of_dim] - if len(cell_list) > 0: - cells_by_dimension[dim] = cell_list - - dimensions = sorted(cells_by_dimension.keys()) - n_cells = sum(len(cells_by_dimension[d]) for d in dimensions) - - cell_vertex_pointers = np.empty(n_cells + 1, dtype=np.int64) - cell_euler_signs = np.empty(n_cells, dtype=np.int32) - cell_vertex_indices_flat = [] - - cell_vertex_pointers[0] = 0 - cell_index = 0 - for dim in dimensions: - cells_in_dim = cells_by_dimension[dim] - euler_sign = 1 if (dim % 2 == 0) else -1 - for cell_vertices in cells_in_dim: - cell_vertex_indices_flat.extend(cell_vertices) - cell_euler_signs[cell_index] = euler_sign - cell_index += 1 - cell_vertex_pointers[cell_index] = len(cell_vertex_indices_flat) - - cell_vertex_indices_flat = np.asarray(cell_vertex_indices_flat, dtype=np.int32) - return ( - cell_vertex_pointers, - cell_vertex_indices_flat, - cell_euler_signs, - n_vertices, - ) - def _compute_ect( self, graph, directions, thresholds: np.ndarray, dtype=np.int32 ) -> np.ndarray: cell_vertex_pointers, cell_vertex_indices_flat, cell_euler_signs, N = ( - self._build_incidence_csr(graph) + graph._build_incidence_csr() ) thresholds = np.asarray(thresholds, dtype=np.float64) diff --git a/src/ect/embed_complex.py b/src/ect/embed_complex.py index 38b7756..fe234cc 100644 --- a/src/ect/embed_complex.py +++ b/src/ect/embed_complex.py @@ -782,6 +782,62 @@ def _get_nice_interval(self, range_size): return nice_interval * magnitude + def _build_incidence_csr(self) -> tuple: + """ + Build column sparse representation of the cell-to-vertex incidence excluding 0-cells. Format is (cell_vertex_pointers, cell_vertex_indices_flat, cell_euler_signs, n_vertices). + Example: takes the complex [(1,3),(2,4),(1,2,3)] and returns [(0,2,4,7),(1,3,2,4,1,2,3),(-1,-1,1),4] + + """ + n_vertices = len(self.node_list) + + cells_by_dimension = {} + + if hasattr(self, "edge_indices") and self.edge_indices is not None: + edge_indices_array = np.asarray(self.edge_indices) + if edge_indices_array.size: + cells_by_dimension[1] = [ + tuple(map(int, row)) for row in edge_indices_array + ] + + if hasattr(self, "cells") and self.cells: + for dim, cells_of_dim in self.cells.items(): + if dim == 0: + continue + if dim == 1 and 1 in cells_by_dimension: + continue + if isinstance(cells_of_dim, np.ndarray): + cell_list = [tuple(map(int, row)) for row in cells_of_dim] + else: + cell_list = [tuple(map(int, c)) for c in cells_of_dim] + if len(cell_list) > 0: + cells_by_dimension[dim] = cell_list + + dimensions = sorted(cells_by_dimension.keys()) + n_cells = sum(len(cells_by_dimension[d]) for d in dimensions) + + cell_vertex_pointers = np.empty(n_cells + 1, dtype=np.int64) + cell_euler_signs = np.empty(n_cells, dtype=np.int32) + cell_vertex_indices_flat = [] + + cell_vertex_pointers[0] = 0 + cell_index = 0 + for dim in dimensions: + cells_in_dim = cells_by_dimension[dim] + euler_sign = 1 if (dim % 2 == 0) else -1 + for cell_vertices in cells_in_dim: + cell_vertex_indices_flat.extend(cell_vertices) + cell_euler_signs[cell_index] = euler_sign + cell_index += 1 + cell_vertex_pointers[cell_index] = len(cell_vertex_indices_flat) + + cell_vertex_indices_flat = np.asarray(cell_vertex_indices_flat, dtype=np.int32) + return ( + cell_vertex_pointers, + cell_vertex_indices_flat, + cell_euler_signs, + n_vertices, + ) + EmbeddedGraph = EmbeddedComplex EmbeddedCW = EmbeddedComplex