Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "ect"
version = "1.1.2"
version = "1.2.0"
authors = [
{ name="Liz Munch", email="muncheli@msu.edu" },
]
Expand Down
158 changes: 102 additions & 56 deletions src/ect/ect.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,34 +101,42 @@ def _ensure_thresholds(self, graph, override_bound_radius=None):
def calculate(
self,
graph: EmbeddedComplex,
theta: Optional[float] = None,
override_bound_radius: Optional[float] = None,
theta: float = None,
override_bound_radius: float = None,
):
"""Calculate Euler Characteristic Transform (ECT) for a given graph and direction theta

Args:
graph (EmbeddedComplex):
The input complex to calculate the ECT for.
theta (float):
The angle in :math:`[0,2\pi]` for the direction to calculate the ECT.
override_bound_radius (float):
If None, uses the following in order: (i) the bounding radius stored in the class; or if not available (ii) the bounding radius of the given graph. Otherwise, should be a positive float :math:`R` where the ECC will be computed at thresholds in :math:`[-R,R]`. Default is None.
"""
self._ensure_directions(graph.dim, theta)
self._ensure_thresholds(graph, override_bound_radius)

# override with theta if provided
directions = (
self.directions if theta is None else Directions.from_angles([theta])
)
ect_matrix = self._compute_ect(graph, directions, self.thresholds, self.dtype)

simplex_projections = self._compute_simplex_projections(graph, directions)
return ECTResult(ect_matrix, directions, self.thresholds)

ect_matrix = self._compute_directional_transform(
simplex_projections, self.thresholds, self.dtype
def _compute_ect(
self, graph, directions, thresholds: np.ndarray, dtype=np.int32
) -> np.ndarray:
cell_vertex_pointers, cell_vertex_indices_flat, cell_euler_signs, N = (
graph._build_incidence_csr()
)

return ECTResult(ect_matrix, directions, self.thresholds)
thresholds = np.asarray(thresholds, dtype=np.float64)

V = directions.vectors
X = graph.coord_matrix
H = X @ V if V.shape[0] == X.shape[1] else X @ V.T # (N, m)
H_T = np.ascontiguousarray(H.T) # (m, N) for contiguous per-direction rows

out64 = _ect_all_dirs(
H_T,
cell_vertex_pointers,
cell_vertex_indices_flat,
cell_euler_signs,
thresholds,
N,
)
if dtype == np.int32:
return out64.astype(np.int32)
return out64

def _compute_simplex_projections(self, graph: EmbeddedComplex, directions):
"""Compute projections of each k-cell for all dimensions"""
Expand Down Expand Up @@ -160,41 +168,79 @@ def _compute_simplex_projections(self, graph: EmbeddedComplex, directions):

return simplex_projections

@staticmethod
@njit(parallel=True, fastmath=True)
def _compute_directional_transform(
simplex_projections_list, thresholds, dtype=np.int32
):
"""Compute ECT by counting simplices below each threshold - VECTORIZED VERSION

Args:
simplex_projections_list: List of arrays containing projections for each simplex type
[vertex_projections, edge_projections, face_projections]
thresholds: Array of threshold values to compute ECT at
dtype: Data type for output array (default: np.int32)

Returns:
Array of shape (num_directions, num_thresholds) containing Euler characteristics
"""
num_dir = simplex_projections_list[0].shape[1]
num_thresh = thresholds.shape[0]
result = np.empty((num_dir, num_thresh), dtype=dtype)

sorted_projections = List()
for proj in simplex_projections_list:
sorted_proj = np.empty_like(proj)
for i in prange(num_dir):
sorted_proj[:, i] = np.sort(proj[:, i])
sorted_projections.append(sorted_proj)

for i in prange(num_dir):
chi = np.zeros(num_thresh, dtype=dtype)
for k in range(len(sorted_projections)):
projs = sorted_projections[k][:, i]

count = np.searchsorted(projs, thresholds, side="right")

sign = -1 if k % 2 else 1
chi += sign * count
result[i] = chi
return result
@njit(cache=True, parallel=True)
def _ect_all_dirs(
heights_by_direction, # shape (num_directions, num_vertices)
cell_vertex_pointers, # shape (num_cells + 1,)
cell_vertex_indices_flat, # concatenated vertex indices for all cells
cell_euler_signs, # per-cell sign: (+1) for even-dim, (-1) for odd-dim
threshold_values, # shape (num_thresholds,), assumed nondecreasing
num_vertices,
):
"""
Calculate the Euler Characteristic Transform (ECT) for a given direction and thresholds.

Args:
heights_by_direction: The heights of the vertices for each direction
cell_vertex_pointers: The pointers to the vertices for each cell
cell_vertex_indices_flat: The indices of the vertices for each cell
cell_euler_signs: The signs of the cells
threshold_values: The thresholds to calculate the ECT for
num_vertices: The number of vertices in the graph
"""
num_directions = heights_by_direction.shape[0]
num_thresholds = threshold_values.shape[0]
ect_values = np.empty((num_directions, num_thresholds), dtype=np.int64)

for dir_idx in prange(num_directions):
heights = heights_by_direction[dir_idx]

sort_order = np.argsort(heights)

# calculate what position each vertex is in the sorted heights starting from 1 (the rank)
vertex_rank_1based = np.empty(num_vertices, dtype=np.int32)
for rnk in range(num_vertices):
vertex_index = sort_order[rnk]
vertex_rank_1based[vertex_index] = rnk + 1

# euler char can only jump at each vertex value
jump_amount = np.zeros(num_vertices + 1, dtype=np.int64)

# 0-cells add +1 at their entrance ranks
for v in range(num_vertices):
rank_v = vertex_rank_1based[v]
jump_amount[rank_v] += 1

# each pair of pointers defines a cell, so we iterate over them
num_cells = cell_vertex_pointers.shape[0] - 1
for cell_idx in range(num_cells):
start = cell_vertex_pointers[cell_idx]
end = cell_vertex_pointers[cell_idx + 1]
# cells come in when the highest vertex enters
entrance_rank = 0
for k in range(start, end):
v = cell_vertex_indices_flat[k]
rnk = vertex_rank_1based[v]
if rnk > entrance_rank:
entrance_rank = rnk
# record at what rank the cell enters and how much the euler char changes
jump_amount[entrance_rank] += cell_euler_signs[cell_idx]

# calculate euler char at the moment each vertex enters
euler_prefix = np.empty(num_vertices + 1, dtype=np.int64)
running_sum = 0
for r in range(num_vertices + 1):
running_sum += jump_amount[r]
euler_prefix[r] = running_sum

# now find euler char at each threshold wrt the sorted heights
sorted_heights = heights[sort_order]
rank_cursor = 0 # equals r(t) = # { i : h_i <= t }
for thresh_idx in range(num_thresholds):
t = threshold_values[thresh_idx]
while rank_cursor < num_vertices and sorted_heights[rank_cursor] <= t:
rank_cursor += 1
ect_values[dir_idx, thresh_idx] = euler_prefix[rank_cursor]

return ect_values
56 changes: 56 additions & 0 deletions src/ect/embed_complex.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,62 @@ def _get_nice_interval(self, range_size):

return nice_interval * magnitude

def _build_incidence_csr(self) -> tuple:
"""
Build column sparse representation of the cell-to-vertex incidence excluding 0-cells. Format is (cell_vertex_pointers, cell_vertex_indices_flat, cell_euler_signs, n_vertices).
Example: takes the complex [(1,3),(2,4),(1,2,3)] and returns [(0,2,4,7),(1,3,2,4,1,2,3),(-1,-1,1),4]

"""
n_vertices = len(self.node_list)

cells_by_dimension = {}

if hasattr(self, "edge_indices") and self.edge_indices is not None:
edge_indices_array = np.asarray(self.edge_indices)
if edge_indices_array.size:
cells_by_dimension[1] = [
tuple(map(int, row)) for row in edge_indices_array
]

if hasattr(self, "cells") and self.cells:
for dim, cells_of_dim in self.cells.items():
if dim == 0:
continue
if dim == 1 and 1 in cells_by_dimension:
continue
if isinstance(cells_of_dim, np.ndarray):
cell_list = [tuple(map(int, row)) for row in cells_of_dim]
else:
cell_list = [tuple(map(int, c)) for c in cells_of_dim]
if len(cell_list) > 0:
cells_by_dimension[dim] = cell_list

dimensions = sorted(cells_by_dimension.keys())
n_cells = sum(len(cells_by_dimension[d]) for d in dimensions)

cell_vertex_pointers = np.empty(n_cells + 1, dtype=np.int64)
cell_euler_signs = np.empty(n_cells, dtype=np.int32)
cell_vertex_indices_flat = []

cell_vertex_pointers[0] = 0
cell_index = 0
for dim in dimensions:
cells_in_dim = cells_by_dimension[dim]
euler_sign = 1 if (dim % 2 == 0) else -1
for cell_vertices in cells_in_dim:
cell_vertex_indices_flat.extend(cell_vertices)
cell_euler_signs[cell_index] = euler_sign
cell_index += 1
cell_vertex_pointers[cell_index] = len(cell_vertex_indices_flat)

cell_vertex_indices_flat = np.asarray(cell_vertex_indices_flat, dtype=np.int32)
return (
cell_vertex_pointers,
cell_vertex_indices_flat,
cell_euler_signs,
n_vertices,
)


EmbeddedGraph = EmbeddedComplex
EmbeddedCW = EmbeddedComplex
Loading
Loading