Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,6 @@ cython_debug/
tests/data/local/**
!tests/data/local/**/
!tests/data/local/**/.gitkeep

#configuration files
.vscode/launch.json
3 changes: 3 additions & 0 deletions docs/training_data_tools/points_to_raster.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Points to raster

::: eis_toolkit.training_data_tools.points_to_raster
3 changes: 3 additions & 0 deletions docs/training_data_tools/random_sampling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Random sampling

::: eis_toolkit.training_data_tools.random_sampling
157 changes: 157 additions & 0 deletions eis_toolkit/training_data_tools/points_to_raster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from numbers import Number

import geopandas
import numpy as np
from beartype import beartype
from beartype.typing import Literal, Optional, Tuple, Union
from rasterio import profiles, transform
from scipy.ndimage import binary_dilation

from eis_toolkit.exceptions import EmptyDataFrameException, NonMatchingCrsException
from eis_toolkit.utilities.checks.raster import check_raster_profile


def _get_kernel_size(radius: int) -> tuple[int, int]:
    """Return the side length of a square kernel for ``radius`` and the radius itself.

    The kernel extends ``radius`` cells to each side of its center cell, so its
    side length is ``2 * radius + 1``.

    Args:
        radius: Number of cells from the kernel center to its edge.

    Returns:
        A tuple ``(size, radius)`` where ``size`` is the kernel side length.
    """
    return 2 * radius + 1, radius


def _create_grid(size: int, radius) -> tuple[np.ndarray, np.ndarray]:
    """Create two square coordinate grids of offsets relative to a kernel center.

    Both grids contain the integer offsets ``-radius .. size - radius - 1``.

    Args:
        size: Side length of the grids.
        radius: Offset of the center cell from the grid edge.

    Returns:
        A tuple of two ``(size, size)`` arrays. The first one varies along axis 0
        (constant within a row), the second one varies along axis 1 (constant
        within a column).
    """
    offsets = np.arange(-radius, size - radius)
    # "ij" indexing yields the axis-0 grid first and the axis-1 grid second.
    along_rows, along_cols = np.meshgrid(offsets, offsets, indexing="ij")
    return along_rows, along_cols


def _basic_kernel(radius: int, value: Number) -> np.ndarray:
    """Build a square kernel holding ``value`` inside a disc and zero elsewhere.

    Args:
        radius: Radius of the disc in cells; the kernel side length is derived from it.
        value: Value written to every cell whose offset from the center satisfies
            ``dx**2 + dy**2 <= radius**2``.

    Returns:
        A float array of shape ``(size, size)`` with ``value`` inside the disc and 0 outside.
    """
    side = _get_kernel_size(radius)[0]
    offsets_a, offsets_b = _create_grid(side, radius)

    kernel = np.zeros((side, side))
    # Squared Euclidean distance from the center decides disc membership.
    inside_disc = (offsets_a**2 + offsets_b**2) <= radius**2
    kernel[inside_disc] = value
    return kernel


def _create_local_buffer(
    array: np.ndarray,
    radius: int,
    target_value: Number,
) -> np.ndarray:
    """Dilate the cells equal to ``target_value`` with a disc-shaped structuring element.

    Args:
        array: Input array. Arrays with three or more dimensions are squeezed first.
        radius: Radius of the disc used for the dilation, in cells.
        target_value: Cells equal to this value are the seeds of the dilation.

    Returns:
        The result of ``binary_dilation``: a mask marking the seed cells and their buffer.
    """
    # NOTE(review): the kernel is filled with target_value itself, so a
    # target_value of 0 produces an all-zero structuring element - confirm
    # that callers never pass 0 here.
    footprint = _basic_kernel(radius, target_value)

    if array.ndim >= 3:
        array = np.squeeze(array)

    seeds = array == target_value
    return binary_dilation(seeds, structure=footprint)


def _create_buffer_around_labels(
    array: np.ndarray,
    radius: int = 1,
    target_value: int = 1,
    buffer: Optional[str] = None,
    overwrite_nodata: bool = False,
) -> np.ndarray:
    """Buffer the cells equal to ``target_value`` and merge the buffer into ``array``.

    Args:
        array: Input array containing the labelled cells.
        radius: Buffer radius in cells.
        target_value: Label value to buffer; also the value written into the buffer.
        buffer: How buffered cells are combined with existing non-zero cells of ``array``.
            'avg': mean of the existing value and ``target_value``.
            'max': larger of the existing value and ``target_value``.
            'min': smaller of the existing value and ``target_value``.
            Any other value: buffered cells are set to ``target_value`` regardless of
            what ``array`` held.
        overwrite_nodata: If False, cells that are NaN in ``array`` stay NaN in the output.

    Returns:
        The array with the buffer applied.
    """
    dilated = _create_local_buffer(
        array=np.copy(array),
        radius=radius,
        target_value=target_value,
    )

    if buffer in ("avg", "max", "min"):
        # Buffer as values: target_value inside the dilated mask, 0 outside.
        stamped = np.where(dilated, target_value, 0)
        if buffer == "max":
            result = np.where(array != 0, np.maximum(array, stamped), stamped)
        else:
            both_set = (array != 0) & (stamped != 0)
            if buffer == "avg":
                merged = (array + stamped) * 0.5
            else:
                merged = np.minimum(array, stamped)
            # Where only one of the two is non-zero, the sum equals that value.
            result = np.where(both_set, merged, array + stamped)
    else:
        result = np.where(dilated, target_value, array)

    if overwrite_nodata is False:
        result = np.where(np.isnan(array), np.nan, result)

    return result


def _point_to_raster(raster_array, raster_meta, geodataframe, attribute, radius, buffer):
    """Burn point geometries into ``raster_array`` and optionally buffer them.

    Args:
        raster_array: Array to write into; it is modified in place by the point assignment.
        raster_meta: Mapping providing "width", "height" and "transform".
        geodataframe: Point geometries to rasterize.
        attribute: Column whose values are written to the raster, or None to write 1.
        radius: Buffer radius in cells, or None for no buffering.
        buffer: Overlap handling mode passed to ``_create_buffer_around_labels``.

    Returns:
        The raster array with point values (and buffers, if requested) applied.
    """
    width = raster_meta.get("width")
    height = raster_meta.get("height")
    raster_transform = raster_meta.get("transform")

    # Raster extent derived from the affine transform coefficients.
    left = raster_transform[2]
    top = raster_transform[5]
    right = left + width * raster_transform[0]
    bottom = top + height * raster_transform[4]

    geodataframe = geodataframe.cx[left:right, bottom:top]

    rows, cols = transform.rowcol(raster_transform, geodataframe.geometry.x, geodataframe.geometry.y)
    rows = np.asarray(rows, dtype=int).reshape(-1)
    cols = np.asarray(cols, dtype=int).reshape(-1)

    # The bounding-box selection may keep points lying exactly on the right or
    # bottom edge; those map to row == height / col == width and would raise an
    # IndexError. Negative indices would silently wrap around. Keep only points
    # that fall on a real cell.
    inside = (rows >= 0) & (rows < height) & (cols >= 0) & (cols < width)

    if attribute is not None:
        values = np.asarray(geodataframe[attribute])[inside]
        # Sorted unique values give a deterministic buffering order.
        unique_values = np.unique(values).tolist()
    else:
        values = 1
        unique_values = [1]

    raster_array[rows[inside], cols[inside]] = values

    if radius is not None:
        for target_value in unique_values:
            raster_array = _create_buffer_around_labels(raster_array, radius, target_value, buffer)

    return raster_array


@beartype
def points_to_raster(
    geodataframe: geopandas.GeoDataFrame,
    raster_profile: Union[profiles.Profile, dict],
    attribute: Optional[str] = None,
    radius: Optional[int] = None,
    buffer: Optional[Literal["min", "avg", "max"]] = None,
) -> Tuple[np.ndarray, Union[profiles.Profile, dict]]:
    """Convert a point data set into a raster.

    Assigns the attribute value (or 1 if no attribute is given) to the pixels
    corresponding to the points and 0 elsewhere.

    Args:
        geodataframe: The point geometries to be converted into a raster.
        raster_profile: Raster profile providing the "crs", "width", "height" and
            "transform" of the output grid.
        attribute: Name of the column whose values are assigned to the pixels.
            If None, the value 1 is assigned.
        radius: Radius of the buffer applied around each point, in pixels.
            If None, no buffer is created.
        buffer: How the raster value is resolved where a buffer overlaps pixels
            that already hold a different non-zero value.
            'avg': average of the two values
            'min': minimum of the two values
            'max': maximum of the two values

    Returns:
        A tuple containing the output raster as a NumPy array and the raster profile.

    Raises:
        EmptyDataFrameException: The input GeoDataFrame is empty.
        NonMatchingCrsException: The raster and geodataframe are not in the same CRS.
    """

    if geodataframe.empty:
        raise EmptyDataFrameException("Expected geodataframe to contain geometries.")

    profile_crs = raster_profile.get("crs")
    if profile_crs != geodataframe.crs:
        raise NonMatchingCrsException("Expected coordinate systems to match between raster and GeoDataFrame.")

    check_raster_profile(raster_profile=raster_profile)

    # Start from an all-zero grid of shape (height, width).
    grid_shape = (raster_profile.get("height"), raster_profile.get("width"))
    blank_raster = np.zeros(grid_shape)

    rasterized = _point_to_raster(blank_raster, raster_profile, geodataframe, attribute, radius, buffer)

    return rasterized, raster_profile
90 changes: 90 additions & 0 deletions eis_toolkit/training_data_tools/random_sampling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from numbers import Number

import geopandas as gpd
import numpy as np
import rasterio
import rasterio.transform
from beartype import beartype
from beartype.typing import Tuple, Union
from rasterio import profiles
from shapely.geometry import Point

from eis_toolkit.exceptions import EmptyDataException


def _random_sampling(
    indices: np.ndarray,
    values: np.ndarray,
    sample_number: Number,
    random_seed: int,
) -> np.ndarray:
    """Randomly pick indices of cells whose value is 0.

    Args:
        indices: Cell indices, with the same shape as ``values``.
        values: Cell values; cells equal to 0 are the sampling candidates.
        sample_number: Maximum number of indices to pick. Non-integer numbers are
            truncated to an integer.
        random_seed: Seed for the random number generator.

    Returns:
        The sampled candidate indices, drawn without replacement. At most
        ``sample_number`` indices are returned, and never more than the number
        of candidates.
    """
    candidates = indices[values == 0]

    # The sample size handed to ``choice`` must be an integer; ``sample_number``
    # may be any Number (e.g. 2.0), so coerce it before clamping.
    sample_size = min(candidates.shape[0], int(sample_number))

    # A local legacy RandomState produces the same draw as seeding the global
    # generator with ``np.random.seed`` but leaves the global state untouched.
    rng = np.random.RandomState(random_seed)
    chosen = rng.choice(candidates.shape[0], sample_size, replace=False)

    return candidates[chosen]


@beartype
def generate_negatives(
    raster_array: np.ndarray,
    raster_meta: Union[profiles.Profile, dict],
    sample_number: Number,
    random_seed: int = 48,
) -> Tuple[gpd.GeoDataFrame, np.ndarray, Union[profiles.Profile, dict]]:
    """Generate probable negatives from raster array with marked positives.

    Negatives are sampled at random among the cells whose value is 0 and are
    marked with -1 in a copy of the input array.

    Args:
        raster_array: Raster array with marked positives.
        raster_meta: Raster metadata; its "transform" and "crs" entries are used.
        sample_number: Maximum number of negatives to be generated.
        random_seed: Seed for generating random negatives.

    Returns:
        A tuple containing a GeoDataFrame with the sampled negatives as points,
        the output raster as a NumPy array and the raster metadata.

    Raises:
        EmptyDataException: The raster array is empty.
    """

    if raster_array.size == 0:
        raise EmptyDataException

    out_array = np.copy(raster_array)
    n_rows, n_cols = out_array.shape[0], out_array.shape[1]

    # Column vectors of flat cell indices and the matching cell values.
    flat_indices = np.arange(n_rows * n_cols).reshape(-1, 1)
    flat_values = out_array.reshape(-1, 1)

    picked = _random_sampling(
        indices=flat_indices, values=flat_values, sample_number=sample_number, random_seed=random_seed
    )
    picked = picked.reshape(-1)

    # Convert flat indices back to (row, col) positions.
    row, col = np.divmod(picked, n_cols)

    out_array[row, col] = -1

    x, y = rasterio.transform.xy(raster_meta["transform"], row, col)

    points = [Point(x_coord, y_coord) for x_coord, y_coord in zip(x, y)]

    sample_negative = gpd.GeoDataFrame(geometry=points)
    sample_negative.set_crs(raster_meta["crs"], allow_override=True, inplace=True)

    return sample_negative, out_array, raster_meta
Loading
Loading