diff --git a/CHANGES.rst b/CHANGES.rst index 8a269f76..af1a6fd4 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -23,6 +23,7 @@ New features and enhancements * ``cdm_mapper.utils.mapping_functions``: new mapping function `convert_to_decimal` (:pull:`370`) * ``test_data``: add MAROB test data (:pull:`370`) * ``mdf_reader.read_data``: new parameter "delimiter" (:pull:`370`) +* ``cdm_mapper.map_model``'s output now has attribute "attrs" where column names are stored under key "columns" (:pull:`379`) Breaking changes ^^^^^^^^^^^^^^^^ @@ -39,6 +40,8 @@ Breaking changes * ``DataBundle`` now converts all iterables of `pd.DataFrame`/`pd.Series` to `ParquetStreamReader` when initialized (:pull:`348`) * all main functions in `common.select` now return a tuple of 4 (selected values, rejected values, original indexes of selected values, original indexes of rejected values) (:pull:`348`) * move `ParquetStreamReader` and all corresponding methods to `common.iterables` to handle chunking outside of `mdf_reader`/`cdm_mapper`/`core`/`metmetpy` (:issue:`349`, :pull:`348`) +* `cdm_mapper.read_tables`: if "suffix" is None no suffix is selected instead of the wildcard "*" (:pull:`379`) +* `ParquetStreamReader.empty` is now a property instead of an instance method (:pull:`379`) Internal changes ^^^^^^^^^^^^^^^^ @@ -46,6 +49,7 @@ Internal changes * use pre-defined `Literal` constants in `cdm_reader_mapper.properties` (:pull:`363`) * `mdf_reader.utils.utilities.read_csv`: parameter `columns` to `column_names` (:pull:`363`) * introduce post-processing decorator that handles both `pd.DataFrame` and `ParquetStreamReader` (:pull:`348`) +* `cdm_mapper.mapper._map_data_model` now returns a tuple of DataFrame and columns (:pull:`379`) 2.2.1 (2026-01-23) ------------------ diff --git a/cdm_reader_mapper/cdm_mapper/mapper.py b/cdm_reader_mapper/cdm_mapper/mapper.py index 7d4b64bc..639ac87a 100755 --- a/cdm_reader_mapper/cdm_mapper/mapper.py +++ b/cdm_reader_mapper/cdm_mapper/mapper.py @@ -377,7 +377,9 @@ def _map_data_model( table_df = 
table_df.astype(object) all_tables.append(table_df) - return pd.concat(all_tables, axis=1, join="outer").reset_index(drop=True) + tables_df = pd.concat(all_tables, axis=1, join="outer").reset_index(drop=True) + columns = tables_df.columns + return tables_df, columns def map_model( @@ -428,9 +430,13 @@ def map_model( ------- cdm_tables: pandas.DataFrame DataFrame with MultiIndex columns (cdm_table, column_name). + + Note + ---- + Column names will be written to `cdm_tables.attrs`. """ - @process_function(data_only=True) + @process_function() def _map_model(): return ProcessFunction( data=data, @@ -469,11 +475,13 @@ def _map_model(): cdm_tables = _prepare_cdm_tables(imodel_maps.keys()) - result = _map_model() + results = _map_model() + + result, columns = tuple(results) - if isinstance(result, pd.DataFrame): - return pd.DataFrame(result) - elif isinstance(result, ParquetStreamReader): + if isinstance(result, (pd.DataFrame, ParquetStreamReader)): + result = pd.DataFrame(result) if isinstance(result, pd.DataFrame) else result + result.attrs["columns"] = columns return result raise ValueError( diff --git a/cdm_reader_mapper/cdm_mapper/reader.py b/cdm_reader_mapper/cdm_mapper/reader.py index 5c4204ea..6d851658 100755 --- a/cdm_reader_mapper/cdm_mapper/reader.py +++ b/cdm_reader_mapper/cdm_mapper/reader.py @@ -124,10 +124,14 @@ def _read_multiple_files( **kwargs, ) -> list[pd.DataFrame]: if suffix is None: - suffix = "" + suffix_pattern = "*" + elif suffix == "*": + suffix_pattern = "*" + else: + suffix_pattern = f"*{suffix}" # See if there's anything at all: - pattern = get_filename([prefix, f"*{suffix}"], path=inp_dir, extension=extension) + pattern = get_filename([prefix, suffix_pattern], path=inp_dir, extension=extension) files = glob.glob(pattern) if len(files) == 0: @@ -147,7 +151,7 @@ def _read_multiple_files( if prefix: _pattern = [prefix] + _pattern if suffix: - _pattern = _pattern + [f"*{suffix}"] + _pattern = _pattern + [suffix_pattern] pattern_ = 
get_filename(_pattern, path=inp_dir, extension=extension) paths_ = glob.glob(pattern_) if len(paths_) != 1: diff --git a/cdm_reader_mapper/common/io_files.py b/cdm_reader_mapper/common/io_files.py index 18ea7199..7fbf1198 100755 --- a/cdm_reader_mapper/common/io_files.py +++ b/cdm_reader_mapper/common/io_files.py @@ -2,8 +2,7 @@ from __future__ import annotations -import os - +from pathlib import Path from typing import Sequence @@ -64,4 +63,6 @@ def get_filename( name = separator.join(filter(bool, pattern)) filename = f"{name}{extension}" - return os.path.join(path, filename) + + p = Path(path) + return str(p / filename) diff --git a/cdm_reader_mapper/common/iterators.py b/cdm_reader_mapper/common/iterators.py index 9f28e727..34aa4460 100755 --- a/cdm_reader_mapper/common/iterators.py +++ b/cdm_reader_mapper/common/iterators.py @@ -87,6 +87,8 @@ def __init__( self._generator = self._factory() + self.attrs = {} + def __iter__(self): """Allows: for df in reader: ...""" return self @@ -138,6 +140,7 @@ def copy(self): self._generator, new_gen = itertools.tee(self._generator) return ParquetStreamReader(new_gen) + @property def empty(self): """Return True if stream is empty.""" copy_stream = self.copy() @@ -315,14 +318,14 @@ def _process_chunks( if len(keys) == 1: output_non_data = output_non_data[keys[0]] - if isinstance(output_non_data, list) and len(output_non_data) == 1: - output_non_data = output_non_data[0] - if isinstance(non_data_proc, Callable): output_non_data = non_data_proc( output_non_data, *non_data_proc_args, **non_data_proc_kwargs ) + if isinstance(output_non_data, list) and len(output_non_data) == 1: + output_non_data = output_non_data[0] + # If no data outputs at all if temp_dirs is None: return output_non_data diff --git a/cdm_reader_mapper/core/databundle.py b/cdm_reader_mapper/core/databundle.py index bde599c9..25820343 100755 --- a/cdm_reader_mapper/core/databundle.py +++ b/cdm_reader_mapper/core/databundle.py @@ -267,11 +267,11 @@ def 
select_where_all_false( -------- Select without overwriting the old data. - >>> db_selected = db.select_where_all_true() + >>> db_selected = db.select_where_all_false() Select valid values only with overwriting the old data. - >>> db.select_where_all_true(inplace=True) + >>> db.select_where_all_false(inplace=True) >>> df_selected = db.data See Also @@ -319,7 +319,7 @@ def select_where_entry_isin( -------- Select without overwriting the old data. - >>> db_selected = db.select_from_list( + >>> db_selected = db.select_where_entry_isin( ... selection={("c1", "B1"): [26, 41]}, ... ) @@ -371,11 +371,11 @@ def select_where_index_isin( -------- Select without overwriting the old data. - >>> db_selected = db.select_from_index([0, 2, 4]) + >>> db_selected = db.select_where_index_isin([0, 2, 4]) Select with overwriting the old data. - >>> db.select_from_index(index=[0, 2, 4], inplace=True) + >>> db.select_where_index_isin(index=[0, 2, 4], inplace=True) >>> df_selected = db.data See Also @@ -414,7 +414,7 @@ def split_by_boolean_true( -------- Split DataBundle. - >>> db_true, db_false = db.split_where_all_true() + >>> db_true, db_false = db.split_by_boolean_true() See Also -------- @@ -458,7 +458,7 @@ def split_by_boolean_false( -------- Split DataBundle. - >>> db_false, db_true = db.split_where_all_false() + >>> db_false, db_true = db.split_by_boolean_false() See Also -------- @@ -505,7 +505,7 @@ def split_by_column_entries( -------- Split DataBundle. - >>> db_isin, db_isnotin = db.split_where_entry_isin( + >>> db_isin, db_isnotin = db.split_by_column_entries( ... selection={("c1", "B1"): [26, 41]}, ... ) @@ -554,7 +554,7 @@ def split_by_index( -------- Split DataBundle. 
- >>> db_isin, db_isnotin = db.select_from_index([0, 2, 4]) + >>> db_isin, db_isnotin = db.split_by_index([0, 2, 4]) See Also -------- @@ -803,7 +803,7 @@ def map_model(self, imodel=None, inplace=False, **kwargs) -> DataBundle | None: db_ = self._get_db(inplace) _tables = map_model(db_._data, imodel, **kwargs) db_._mode = "tables" - db_._columns = _tables.columns + db_._columns = _tables.attrs["columns"] db_._data = _tables return self._return_db(db_, inplace) diff --git a/tests/test_common.py b/tests/test_common.py index 962fa0bc..3bebdec9 100755 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -1201,12 +1201,12 @@ def test_copy_closed_stream_raises(): def test_empty_returns_true_if_empty(): reader = ParquetStreamReader(lambda: iter([])) - assert reader.empty() is True + assert reader.empty is True def test_empty_returns_false_if_not_empty(): reader = ParquetStreamReader(lambda: iter(make_chunks())) - assert reader.empty() is False + assert reader.empty is False def test_reset_index_continuous_index():