Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ New features and enhancements
* ``cdm_mapper.utils.mapping_functions``: new mapping function `convert_to_decimal` (:pull:`370`)
* ``test_data``: add MAROB test data (:pull:`370`)
* ``mdf_reader.read_data``: new parameter "delimiter" (:pull:`370`)
* ``cdm_mapper.map_model``'s output now stores the column names under the ``"columns"`` key of its ``attrs`` attribute (:pull:`379`)

Breaking changes
^^^^^^^^^^^^^^^^
Expand All @@ -39,13 +40,16 @@ Breaking changes
* ``DataBundle`` now converts all iterables of `pd.DataFrame`/`pd.Series` to `ParquetStreamReader` when initialized (:pull:`348`)
* all main functions in `common.select` now return a tuple of 4 (selected values, rejected values, original indexes of selected values, original indexes of rejected values) (:pull:`348`)
* move `ParquetStreamReader` and all corresponding methods to `common.iterables` to handle chunking outside of `mdf_reader`/`cdm_mapper`/`core`/`metmetpy` (:issue:`349`, :pull:`348`)
* `cdm_mapper.read_tables`: if "suffix" is None no suffix is selected instead of the wildcard "*" (:pull:`379`)
* `ParquetStreamReader.empty` is now a property instead of an instance method (:pull:`379`)

Internal changes
^^^^^^^^^^^^^^^^
* re-work internal structure for more readability and better performance (:pull:`360`)
* use pre-defined `Literal` constants in `cdm_reader_mapper.properties` (:pull:`363`)
* `mdf_reader.utils.utilities.read_csv`: parameter `columns` to `column_names` (:pull:`363`)
* introduce post-processing decorator that handles both `pd.DataFrame` and `ParquetStreamReader` (:pull:`348`)
* `cdm_mapper.mapper._map_data_model` now returns a tuple of DataFrame and columns (:pull:`379`)

2.2.1 (2026-01-23)
------------------
Expand Down
20 changes: 14 additions & 6 deletions cdm_reader_mapper/cdm_mapper/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,9 @@ def _map_data_model(
table_df = table_df.astype(object)
all_tables.append(table_df)

return pd.concat(all_tables, axis=1, join="outer").reset_index(drop=True)
tables_df = pd.concat(all_tables, axis=1, join="outer").reset_index(drop=True)
columns = tables_df.columns
return tables_df, columns


def map_model(
Expand Down Expand Up @@ -428,9 +430,13 @@ def map_model(
-------
cdm_tables: pandas.DataFrame
DataFrame with MultiIndex columns (cdm_table, column_name).

Note
----
Column names will be written to `cdm_tables.attrs`.
"""

@process_function(data_only=True)
@process_function()
def _map_model():
return ProcessFunction(
data=data,
Expand Down Expand Up @@ -469,11 +475,13 @@ def _map_model():

cdm_tables = _prepare_cdm_tables(imodel_maps.keys())

result = _map_model()
results = _map_model()

result, columns = tuple(results)

if isinstance(result, pd.DataFrame):
return pd.DataFrame(result)
elif isinstance(result, ParquetStreamReader):
if isinstance(result, (pd.DataFrame, ParquetStreamReader)):
result = pd.DataFrame(result) if isinstance(result, pd.DataFrame) else result
result.attrs["columns"] = columns
return result

raise ValueError(
Expand Down
10 changes: 7 additions & 3 deletions cdm_reader_mapper/cdm_mapper/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,14 @@ def _read_multiple_files(
**kwargs,
) -> list[pd.DataFrame]:
if suffix is None:
suffix = ""
suffix_pattern = "*"
elif suffix == "*":
suffix_pattern = "*"
else:
suffix_pattern = f"*{suffix}"

# See if there's anything at all:
pattern = get_filename([prefix, f"*{suffix}"], path=inp_dir, extension=extension)
pattern = get_filename([prefix, suffix_pattern], path=inp_dir, extension=extension)
files = glob.glob(pattern)

if len(files) == 0:
Expand All @@ -147,7 +151,7 @@ def _read_multiple_files(
if prefix:
_pattern = [prefix] + _pattern
if suffix:
_pattern = _pattern + [f"*{suffix}"]
_pattern = _pattern + [suffix_pattern]
pattern_ = get_filename(_pattern, path=inp_dir, extension=extension)
paths_ = glob.glob(pattern_)
if len(paths_) != 1:
Expand Down
7 changes: 4 additions & 3 deletions cdm_reader_mapper/common/io_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

from __future__ import annotations

import os

from pathlib import Path
from typing import Sequence


Expand Down Expand Up @@ -64,4 +63,6 @@ def get_filename(
name = separator.join(filter(bool, pattern))

filename = f"{name}{extension}"
return os.path.join(path, filename)

p = Path(path)
return str(p / filename)
9 changes: 6 additions & 3 deletions cdm_reader_mapper/common/iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ def __init__(

self._generator = self._factory()

self.attrs = {}

def __iter__(self):
"""Allows: for df in reader: ..."""
return self
Expand Down Expand Up @@ -138,6 +140,7 @@ def copy(self):
self._generator, new_gen = itertools.tee(self._generator)
return ParquetStreamReader(new_gen)

@property
def empty(self):
"""Return True if stream is empty."""
copy_stream = self.copy()
Expand Down Expand Up @@ -315,14 +318,14 @@ def _process_chunks(
if len(keys) == 1:
output_non_data = output_non_data[keys[0]]

if isinstance(output_non_data, list) and len(output_non_data) == 1:
output_non_data = output_non_data[0]

if isinstance(non_data_proc, Callable):
output_non_data = non_data_proc(
output_non_data, *non_data_proc_args, **non_data_proc_kwargs
)

if isinstance(output_non_data, list) and len(output_non_data) == 1:
output_non_data = output_non_data[0]

# If no data outputs at all
if temp_dirs is None:
return output_non_data
Expand Down
20 changes: 10 additions & 10 deletions cdm_reader_mapper/core/databundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,11 @@ def select_where_all_false(
--------
Select without overwriting the old data.

>>> db_selected = db.select_where_all_true()
>>> db_selected = db.select_where_all_false()

Select valid values only with overwriting the old data.

>>> db.select_where_all_true(inplace=True)
>>> db.select_where_all_false(inplace=True)
>>> df_selected = db.data

See Also
Expand Down Expand Up @@ -319,7 +319,7 @@ def select_where_entry_isin(
--------
Select without overwriting the old data.

>>> db_selected = db.select_from_list(
>>> db_selected = db.select_where_entry_isin(
... selection={("c1", "B1"): [26, 41]},
... )

Expand Down Expand Up @@ -371,11 +371,11 @@ def select_where_index_isin(
--------
Select without overwriting the old data.

>>> db_selected = db.select_from_index([0, 2, 4])
>>> db_selected = db.select_where_index_isin([0, 2, 4])

Select with overwriting the old data.

>>> db.select_from_index(index=[0, 2, 4], inplace=True)
>>> db.select_where_index_isin(index=[0, 2, 4], inplace=True)
>>> df_selected = db.data

See Also
Expand Down Expand Up @@ -414,7 +414,7 @@ def split_by_boolean_true(
--------
Split DataBundle.

>>> db_true, db_false = db.split_where_all_true()
>>> db_true, db_false = db.split_by_boolean_true()

See Also
--------
Expand Down Expand Up @@ -458,7 +458,7 @@ def split_by_boolean_false(
--------
Split DataBundle.

>>> db_false, db_true = db.split_where_all_false()
>>> db_false, db_true = db.split_by_boolean_false()

See Also
--------
Expand Down Expand Up @@ -505,7 +505,7 @@ def split_by_column_entries(
--------
Split DataBundle.

>>> db_isin, db_isnotin = db.split_where_entry_isin(
>>> db_isin, db_isnotin = db.split_by_column_entries(
... selection={("c1", "B1"): [26, 41]},
... )

Expand Down Expand Up @@ -554,7 +554,7 @@ def split_by_index(
--------
Split DataBundle.

>>> db_isin, db_isnotin = db.select_from_index([0, 2, 4])
>>> db_isin, db_isnotin = db.split_by_index([0, 2, 4])

See Also
--------
Expand Down Expand Up @@ -803,7 +803,7 @@ def map_model(self, imodel=None, inplace=False, **kwargs) -> DataBundle | None:
db_ = self._get_db(inplace)
_tables = map_model(db_._data, imodel, **kwargs)
db_._mode = "tables"
db_._columns = _tables.columns
db_._columns = _tables.attrs["columns"]
db_._data = _tables
return self._return_db(db_, inplace)

Expand Down
4 changes: 2 additions & 2 deletions tests/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1201,12 +1201,12 @@ def test_copy_closed_stream_raises():

def test_empty_returns_true_if_empty():
reader = ParquetStreamReader(lambda: iter([]))
assert reader.empty() is True
assert reader.empty is True


def test_empty_returns_false_if_not_empty():
reader = ParquetStreamReader(lambda: iter(make_chunks()))
assert reader.empty() is False
assert reader.empty is False


def test_reset_index_continuous_index():
Expand Down
Loading