diff --git a/include/rayforce.h b/include/rayforce.h index b21b1579..d87bfdfd 100644 --- a/include/rayforce.h +++ b/include/rayforce.h @@ -304,6 +304,51 @@ ray_t* ray_timestamp(int64_t val); ray_t* ray_guid(const uint8_t* bytes); ray_t* ray_typed_null(int8_t type); +/* ===== Null Sentinel Values ===== + * + * Per-type null encoding for nullable scalar types. Callers compare values + * directly (e.g. `x == NULL_I64`, `x != x` for NaN); there are no predicate + * macros or aliases. Temporal types (DATE/TIME/TIMESTAMP) reuse NULL_I32 or + * NULL_I64 based on their storage width. SYM null = sym ID 0; STR null = + * empty string (length 0); BOOL and U8 are non-nullable. + * + * Phase 1 added the constants and locked BOOL/U8 down as non-nullable. + * Phase 2 wired NULL_F64 into the CSV parser, ray_typed_null, and the + * I64→F64 UPDATE cast — null F64 slots now hold NaN alongside the + * nullmap bit. + * Phase 3a generalized this to integer / temporal types (I16, I32, I64, + * DATE, TIME, TIMESTAMP). Producer surface mirrors Phase 2 — CSV + * parser, ray_typed_null, cast_vec_copy_nulls, set_all_null, + * store_typed_elem (lang/internal.h), UPDATE atom broadcast (3 sites), + * UPDATE WHERE numeric-promo cast, group-by key scatter (serial + + * parallel + grpt TOP_N), pivot key scatter, linkop deref. The + * grouped-aggregation consumer (da_accum_row + scalar_accum_row) gained + * per-agg integer-null guards in the SUM/AVG/STDDEV/VAR/PROD/MIN/MAX/ + * FIRST/LAST arms — sentinel-compare (`v != precomputed_sentinel`) + * rather than nullmap consultation for cache-line efficiency; the + * tradeoff (a user-stored INT_MIN in a HAS_NULLS column is dropped) + * is bounded by dual encoding keeping the bitmap as source of truth. 
+ * Phase 3b closed the documented finalization gaps in the + * scalar and direct-array (DA) grouped accumulators: per-(group, agg) + * non-null counts (`nn_count[gid * n_aggs + a]`) drive AVG / VAR / + * STDDEV divisors and gate MIN / MAX / PROD / FIRST / LAST result + * emission — all-null groups now produce a typed null (NULL_F64 / + * NULL_I64 plus the nullmap bit) instead of leaking the accumulator + * seed (DBL_MAX / -DBL_MAX / 0 / product identity). FIRST/LAST also + * gained "skip null rows" semantics: a null prefix no longer advances + * acc->first_row[gid]. The multi-key radix HT (accum_from_entry, + * ~line 2155) still inherits the pre-existing nullable-agg gap noted + * at the sparse-path fallback (~line 5728). + * Through Phase 7 (full cutover) the bitmap bit `nullmap[0] & 1` is + * kept in sync with the sentinel value for atoms ("dual encoding"), so + * legacy bitmap-aware readers and new sentinel-aware readers agree. + * After Phase 7 the bitmap arm is reclaimed for inline stats and the + * bit becomes a pure optimization hint. */ +#define NULL_I16 ((int16_t)INT16_MIN) +#define NULL_I32 ((int32_t)INT32_MIN) +#define NULL_I64 ((int64_t)INT64_MIN) +#define NULL_F64 (__builtin_nan("")) + /* Null bitmap check for atoms — bit 0 of nullmap[0] marks typed nulls. * Also matches RAY_NULL_OBJ (the untyped null singleton). 
*/ #define RAY_ATOM_IS_NULL(x) (RAY_IS_NULL(x) || ((x)->type < 0 && ((x)->nullmap[0] & 1))) diff --git a/src/io/csv.c b/src/io/csv.c index e5f810bd..212b4fba 100644 --- a/src/io/csv.c +++ b/src/io/csv.c @@ -929,14 +929,17 @@ static void csv_parse_fn(void* arg, uint32_t worker_id, switch (ctx->col_types[c]) { case CSV_TYPE_BOOL: ((uint8_t*)ctx->col_data[c])[row] = 0; break; case CSV_TYPE_U8: ((uint8_t*)ctx->col_data[c])[row] = 0; break; - case CSV_TYPE_I16: ((int16_t*)ctx->col_data[c])[row] = 0; break; - case CSV_TYPE_I32: ((int32_t*)ctx->col_data[c])[row] = 0; break; - case CSV_TYPE_I64: ((int64_t*)ctx->col_data[c])[row] = 0; break; - case CSV_TYPE_F64: ((double*)ctx->col_data[c])[row] = 0.0; break; - case CSV_TYPE_DATE: ((int32_t*)ctx->col_data[c])[row] = 0; break; - case CSV_TYPE_TIME: ((int32_t*)ctx->col_data[c])[row] = 0; break; + /* Phase 3a dual encoding: integer/temporal nulls + * carry the width-correct INT_MIN sentinel in the + * payload alongside the nullmap bit. */ + case CSV_TYPE_I16: ((int16_t*)ctx->col_data[c])[row] = NULL_I16; break; + case CSV_TYPE_I32: ((int32_t*)ctx->col_data[c])[row] = NULL_I32; break; + case CSV_TYPE_I64: ((int64_t*)ctx->col_data[c])[row] = NULL_I64; break; + case CSV_TYPE_F64: ((double*)ctx->col_data[c])[row] = NULL_F64; break; + case CSV_TYPE_DATE: ((int32_t*)ctx->col_data[c])[row] = NULL_I32; break; + case CSV_TYPE_TIME: ((int32_t*)ctx->col_data[c])[row] = NULL_I32; break; case CSV_TYPE_TIMESTAMP: - ((int64_t*)ctx->col_data[c])[row] = 0; break; + ((int64_t*)ctx->col_data[c])[row] = NULL_I64; break; case CSV_TYPE_GUID: memset((uint8_t*)ctx->col_data[c] + (size_t)row * 16, 0, 16); break; @@ -946,8 +949,14 @@ static void csv_parse_fn(void* arg, uint32_t worker_id, break; default: break; } - ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); - my_had_null[c] = true; + /* BOOL/U8 are non-nullable (Phase 1 lockdown). Empty + * cells store the default 0/false above and skip the + * nullmap mark. 
*/ + if (ctx->col_types[c] != CSV_TYPE_BOOL && + ctx->col_types[c] != CSV_TYPE_U8) { + ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); + my_had_null[c] = true; + } } break; } @@ -963,19 +972,19 @@ static void csv_parse_fn(void* arg, uint32_t worker_id, switch (ctx->col_types[c]) { case CSV_TYPE_BOOL: { + /* BOOL is non-nullable (Phase 1). fast_bool returns 0 + * for empty / unparseable input; we store it as-is and + * never mark a nullmap bit. */ bool is_null; uint8_t v = fast_bool(fld, flen, &is_null); ((uint8_t*)ctx->col_data[c])[row] = v; - if (is_null) { - ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); - my_had_null[c] = true; - } break; } case CSV_TYPE_I64: { bool is_null; int64_t v = fast_i64(fld, flen, &is_null); - ((int64_t*)ctx->col_data[c])[row] = v; + /* Phase 3a dual encoding: payload is NULL_I64 whenever nullmap bit is set. */ + ((int64_t*)ctx->col_data[c])[row] = is_null ? NULL_I64 : v; if (is_null) { ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); my_had_null[c] = true; @@ -983,19 +992,19 @@ static void csv_parse_fn(void* arg, uint32_t worker_id, break; } case CSV_TYPE_U8: { + /* U8 is non-nullable (Phase 1). fast_i64 returns 0 for + * empty / unparseable input; we store it as-is and + * never mark a nullmap bit. */ bool is_null; int64_t v = fast_i64(fld, flen, &is_null); ((uint8_t*)ctx->col_data[c])[row] = (uint8_t)v; - if (is_null) { - ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); - my_had_null[c] = true; - } break; } case CSV_TYPE_I16: { bool is_null; int64_t v = fast_i64(fld, flen, &is_null); - ((int16_t*)ctx->col_data[c])[row] = (int16_t)v; + /* Phase 3a dual encoding: payload is NULL_I16 whenever nullmap bit is set. */ + ((int16_t*)ctx->col_data[c])[row] = is_null ? 
NULL_I16 : (int16_t)v; if (is_null) { ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); my_had_null[c] = true; @@ -1005,7 +1014,8 @@ static void csv_parse_fn(void* arg, uint32_t worker_id, case CSV_TYPE_I32: { bool is_null; int64_t v = fast_i64(fld, flen, &is_null); - ((int32_t*)ctx->col_data[c])[row] = (int32_t)v; + /* Phase 3a dual encoding: payload is NULL_I32 whenever nullmap bit is set. */ + ((int32_t*)ctx->col_data[c])[row] = is_null ? NULL_I32 : (int32_t)v; if (is_null) { ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); my_had_null[c] = true; @@ -1015,7 +1025,8 @@ static void csv_parse_fn(void* arg, uint32_t worker_id, case CSV_TYPE_F64: { bool is_null; double v = fast_f64(fld, flen, &is_null); - ((double*)ctx->col_data[c])[row] = v; + /* Phase 2 dual encoding: payload is NaN whenever nullmap bit is set. */ + ((double*)ctx->col_data[c])[row] = is_null ? NULL_F64 : v; if (is_null) { ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); my_had_null[c] = true; @@ -1025,7 +1036,8 @@ static void csv_parse_fn(void* arg, uint32_t worker_id, case CSV_TYPE_DATE: { bool is_null; int32_t v = fast_date(fld, flen, &is_null); - ((int32_t*)ctx->col_data[c])[row] = v; + /* Phase 3a dual encoding: payload is NULL_I32 whenever nullmap bit is set. */ + ((int32_t*)ctx->col_data[c])[row] = is_null ? NULL_I32 : v; if (is_null) { ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); my_had_null[c] = true; @@ -1035,7 +1047,8 @@ static void csv_parse_fn(void* arg, uint32_t worker_id, case CSV_TYPE_TIME: { bool is_null; int32_t v = fast_time(fld, flen, &is_null); - ((int32_t*)ctx->col_data[c])[row] = v; + /* Phase 3a dual encoding: payload is NULL_I32 whenever nullmap bit is set. */ + ((int32_t*)ctx->col_data[c])[row] = is_null ? 
NULL_I32 : v; if (is_null) { ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); my_had_null[c] = true; @@ -1045,7 +1058,8 @@ static void csv_parse_fn(void* arg, uint32_t worker_id, case CSV_TYPE_TIMESTAMP: { bool is_null; int64_t v = fast_timestamp(fld, flen, &is_null); - ((int64_t*)ctx->col_data[c])[row] = v; + /* Phase 3a dual encoding: payload is NULL_I64 whenever nullmap bit is set. */ + ((int64_t*)ctx->col_data[c])[row] = is_null ? NULL_I64 : v; if (is_null) { ctx->col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); my_had_null[c] = true; @@ -1122,14 +1136,17 @@ static void csv_parse_serial(const char* buf, size_t buf_size, switch (col_types[c]) { case CSV_TYPE_BOOL: ((uint8_t*)col_data[c])[row] = 0; break; case CSV_TYPE_U8: ((uint8_t*)col_data[c])[row] = 0; break; - case CSV_TYPE_I16: ((int16_t*)col_data[c])[row] = 0; break; - case CSV_TYPE_I32: ((int32_t*)col_data[c])[row] = 0; break; - case CSV_TYPE_I64: ((int64_t*)col_data[c])[row] = 0; break; - case CSV_TYPE_F64: ((double*)col_data[c])[row] = 0.0; break; - case CSV_TYPE_DATE: ((int32_t*)col_data[c])[row] = 0; break; - case CSV_TYPE_TIME: ((int32_t*)col_data[c])[row] = 0; break; + /* Phase 3a dual encoding: integer/temporal nulls + * carry the width-correct INT_MIN sentinel in the + * payload alongside the nullmap bit. 
*/ + case CSV_TYPE_I16: ((int16_t*)col_data[c])[row] = NULL_I16; break; + case CSV_TYPE_I32: ((int32_t*)col_data[c])[row] = NULL_I32; break; + case CSV_TYPE_I64: ((int64_t*)col_data[c])[row] = NULL_I64; break; + case CSV_TYPE_F64: ((double*)col_data[c])[row] = NULL_F64; break; + case CSV_TYPE_DATE: ((int32_t*)col_data[c])[row] = NULL_I32; break; + case CSV_TYPE_TIME: ((int32_t*)col_data[c])[row] = NULL_I32; break; case CSV_TYPE_TIMESTAMP: - ((int64_t*)col_data[c])[row] = 0; break; + ((int64_t*)col_data[c])[row] = NULL_I64; break; case CSV_TYPE_GUID: memset((uint8_t*)col_data[c] + (size_t)row * 16, 0, 16); break; @@ -1139,8 +1156,12 @@ static void csv_parse_serial(const char* buf, size_t buf_size, break; default: break; } - col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); - col_had_null[c] = true; + /* BOOL/U8 are non-nullable (Phase 1 lockdown). */ + if (col_types[c] != CSV_TYPE_BOOL && + col_types[c] != CSV_TYPE_U8) { + col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); + col_had_null[c] = true; + } } break; } @@ -1156,19 +1177,17 @@ static void csv_parse_serial(const char* buf, size_t buf_size, switch (col_types[c]) { case CSV_TYPE_BOOL: { + /* BOOL is non-nullable (Phase 1 lockdown). */ bool is_null; uint8_t v = fast_bool(fld, flen, &is_null); ((uint8_t*)col_data[c])[row] = v; - if (is_null) { - col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); - col_had_null[c] = true; - } break; } case CSV_TYPE_I64: { bool is_null; int64_t v = fast_i64(fld, flen, &is_null); - ((int64_t*)col_data[c])[row] = v; + /* Phase 3a dual encoding: payload is NULL_I64 whenever nullmap bit is set. */ + ((int64_t*)col_data[c])[row] = is_null ? NULL_I64 : v; if (is_null) { col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); col_had_null[c] = true; @@ -1176,19 +1195,17 @@ static void csv_parse_serial(const char* buf, size_t buf_size, break; } case CSV_TYPE_U8: { + /* U8 is non-nullable (Phase 1 lockdown). 
*/ bool is_null; int64_t v = fast_i64(fld, flen, &is_null); ((uint8_t*)col_data[c])[row] = (uint8_t)v; - if (is_null) { - col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); - col_had_null[c] = true; - } break; } case CSV_TYPE_I16: { bool is_null; int64_t v = fast_i64(fld, flen, &is_null); - ((int16_t*)col_data[c])[row] = (int16_t)v; + /* Phase 3a dual encoding: payload is NULL_I16 whenever nullmap bit is set. */ + ((int16_t*)col_data[c])[row] = is_null ? NULL_I16 : (int16_t)v; if (is_null) { col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); col_had_null[c] = true; @@ -1198,7 +1215,8 @@ static void csv_parse_serial(const char* buf, size_t buf_size, case CSV_TYPE_I32: { bool is_null; int64_t v = fast_i64(fld, flen, &is_null); - ((int32_t*)col_data[c])[row] = (int32_t)v; + /* Phase 3a dual encoding: payload is NULL_I32 whenever nullmap bit is set. */ + ((int32_t*)col_data[c])[row] = is_null ? NULL_I32 : (int32_t)v; if (is_null) { col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); col_had_null[c] = true; @@ -1208,7 +1226,8 @@ static void csv_parse_serial(const char* buf, size_t buf_size, case CSV_TYPE_F64: { bool is_null; double v = fast_f64(fld, flen, &is_null); - ((double*)col_data[c])[row] = v; + /* Phase 2 dual encoding: payload is NaN whenever nullmap bit is set. */ + ((double*)col_data[c])[row] = is_null ? NULL_F64 : v; if (is_null) { col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); col_had_null[c] = true; @@ -1218,7 +1237,8 @@ static void csv_parse_serial(const char* buf, size_t buf_size, case CSV_TYPE_DATE: { bool is_null; int32_t v = fast_date(fld, flen, &is_null); - ((int32_t*)col_data[c])[row] = v; + /* Phase 3a dual encoding: payload is NULL_I32 whenever nullmap bit is set. */ + ((int32_t*)col_data[c])[row] = is_null ? 
NULL_I32 : v; if (is_null) { col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); col_had_null[c] = true; @@ -1228,7 +1248,8 @@ static void csv_parse_serial(const char* buf, size_t buf_size, case CSV_TYPE_TIME: { bool is_null; int32_t v = fast_time(fld, flen, &is_null); - ((int32_t*)col_data[c])[row] = v; + /* Phase 3a dual encoding: payload is NULL_I32 whenever nullmap bit is set. */ + ((int32_t*)col_data[c])[row] = is_null ? NULL_I32 : v; if (is_null) { col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); col_had_null[c] = true; @@ -1238,7 +1259,8 @@ static void csv_parse_serial(const char* buf, size_t buf_size, case CSV_TYPE_TIMESTAMP: { bool is_null; int64_t v = fast_timestamp(fld, flen, &is_null); - ((int64_t*)col_data[c])[row] = v; + /* Phase 3a dual encoding: payload is NULL_I64 whenever nullmap bit is set. */ + ((int64_t*)col_data[c])[row] = is_null ? NULL_I64 : v; if (is_null) { col_nullmaps[c][row >> 3] |= (uint8_t)(1u << (row & 7)); col_had_null[c] = true; diff --git a/src/lang/internal.h b/src/lang/internal.h index ac69f3a7..5fdcbf41 100644 --- a/src/lang/internal.h +++ b/src/lang/internal.h @@ -277,8 +277,23 @@ static inline int64_t elem_as_i64(ray_t* elem) { * Returns 0 on success, -1 if the element type doesn't match. */ static inline int store_typed_elem(ray_t* vec, int64_t i, ray_t* elem) { if (RAY_ATOM_IS_NULL(elem)) { - int esz = ray_elem_size(vec->type); - memset((char*)ray_data(vec) + i * esz, 0, esz); + /* Phase 2/3a dual-encoding: payload must carry the width-correct + * sentinel alongside the nullmap bit. 
*/ + switch (vec->type) { + case RAY_F64: + ((double*)ray_data(vec))[i] = NULL_F64; break; + case RAY_I64: case RAY_TIMESTAMP: + ((int64_t*)ray_data(vec))[i] = NULL_I64; break; + case RAY_I32: case RAY_DATE: case RAY_TIME: + ((int32_t*)ray_data(vec))[i] = NULL_I32; break; + case RAY_I16: + ((int16_t*)ray_data(vec))[i] = NULL_I16; break; + default: { + int esz = ray_elem_size(vec->type); + memset((char*)ray_data(vec) + i * esz, 0, esz); + break; + } + } ray_vec_set_null(vec, i, true); return 0; } diff --git a/src/lang/parse.c b/src/lang/parse.c index 73ee640f..d95becb9 100644 --- a/src/lang/parse.c +++ b/src/lang/parse.c @@ -634,8 +634,14 @@ static ray_t* parse_vector(ray_parser_t *p) { } vec->len = count; for (int32_t i = 0; i < count; i++) { - if (RAY_ATOM_IS_NULL(elems[i])) + if (RAY_ATOM_IS_NULL(elems[i])) { ray_vec_set_null(vec, i, true); + /* Phase 2 dual-encoding: a non-F64 typed null (0Nl/0Ni/0Nh) + * carries i64 = 0, so the cast above wrote 0.0 to the slot. + * Overwrite with NULL_F64 so raw-payload consumers see NaN. + * Null F64 atoms already carry NULL_F64 from ray_typed_null. */ + d[i] = NULL_F64; + } ray_release(elems[i]); } return vec; diff --git a/src/ops/builtins.c b/src/ops/builtins.c index 655f7869..130000d9 100644 --- a/src/ops/builtins.c +++ b/src/ops/builtins.c @@ -755,6 +755,41 @@ static ray_t* cast_vec_copy_nulls(ray_t* vec, ray_t* val) { if (le[j] && RAY_ATOM_IS_NULL(le[j])) ray_vec_set_null(vec, j, true); } + /* Phase 2/3a dual encoding: when the destination has nulls, fill each + * null payload slot with the correct-width sentinel so consumers that + * read the raw payload (without consulting the bitmap) honor the null + * contract. Narrowing casts (Hazard 3) require writing the dest-width + * sentinel directly — propagating through the cast macro produces + * (int16_t)NULL_I32 = 0 etc., which collides with a legitimate value. 
*/ + if (vec->attrs & RAY_ATTR_HAS_NULLS) { + switch (vec->type) { + case RAY_F64: { + double* d = (double*)ray_data(vec); + for (int64_t j = 0; j < vec->len; j++) + if (ray_vec_is_null(vec, j)) d[j] = NULL_F64; + break; + } + case RAY_I64: case RAY_TIMESTAMP: { + int64_t* d = (int64_t*)ray_data(vec); + for (int64_t j = 0; j < vec->len; j++) + if (ray_vec_is_null(vec, j)) d[j] = NULL_I64; + break; + } + case RAY_I32: case RAY_DATE: case RAY_TIME: { + int32_t* d = (int32_t*)ray_data(vec); + for (int64_t j = 0; j < vec->len; j++) + if (ray_vec_is_null(vec, j)) d[j] = NULL_I32; + break; + } + case RAY_I16: { + int16_t* d = (int16_t*)ray_data(vec); + for (int64_t j = 0; j < vec->len; j++) + if (ray_vec_is_null(vec, j)) d[j] = NULL_I16; + break; + } + default: break; + } + } return vec; } @@ -1802,8 +1837,13 @@ ray_t* ray_enlist_fn(ray_t** args, int64_t n) { d[i] = (args[i]->type == -RAY_F64) ? args[i]->f64 : (double)args[i]->i64; vec->len = n; for (int64_t i = 0; i < n; i++) { - if (RAY_ATOM_IS_NULL(args[i])) + if (RAY_ATOM_IS_NULL(args[i])) { + /* Dual-encoding contract: the (double)NULL_I64 cast above + * produces a large finite value (~-9.22e18), not NaN. Stamp + * the F64 sentinel so the payload matches the bitmap bit. */ + d[i] = NULL_F64; ray_vec_set_null(vec, i, true); + } } return vec; } @@ -2602,8 +2642,26 @@ ray_t* ray_group_fn(ray_t* x) { * the first row index suffices. */ if (idx_vecs[g] && idx_vecs[g]->len > 0) { int64_t first_row = ((int64_t*)ray_data(idx_vecs[g]))[0]; - if (ray_vec_is_null(x, first_row)) + if (ray_vec_is_null(x, first_row)) { ray_vec_set_null(keys_vec, g, true); + /* Dual-encoding contract: the payload at slot g must hold + * the width-correct null sentinel so sentinel-aware readers + * agree with the bitmap. 
*/ + void* base = ray_data(keys_vec); + switch (key_type) { + case RAY_I64: case RAY_TIMESTAMP: + ((int64_t*)base)[g] = NULL_I64; break; + case RAY_I32: case RAY_DATE: case RAY_TIME: + ((int32_t*)base)[g] = NULL_I32; break; + case RAY_I16: + ((int16_t*)base)[g] = NULL_I16; break; + case RAY_F64: + ((double*)base)[g] = NULL_F64; break; + case RAY_F32: + ((float*)base)[g] = (float)NULL_F64; break; + default: break; /* SYM/BOOL/U8: no sentinel slot */ + } + } } } diff --git a/src/ops/expr.c b/src/ops/expr.c index 648a2551..bb913a64 100644 --- a/src/ops/expr.c +++ b/src/ops/expr.c @@ -295,6 +295,11 @@ bool try_linear_sumavg_input_i64(ray_graph_t* g, ray_t* tbl, ray_op_t* input_op, for (uint8_t i = 0; i < lin.n_terms; i++) { ray_t* col = ray_table_get_col(tbl, lin.syms[i]); if (!col || !type_is_linear_i64_col(col->type)) return false; + /* Phase 3a: scalar_sum_linear_i64_fn reads slots raw via + * scalar_i64_at; any nullable term would poison the sum with + * NULL_I{16,32,64} sentinels. Refuse the fast plan and let + * the caller fall back to the generic masked path. */ + if (col->attrs & RAY_ATTR_HAS_NULLS) return false; out_plan->term_ptrs[i] = ray_data(col); out_plan->term_types[i] = col->type; out_plan->coeff_i64[i] = lin.coeff_i64[i]; @@ -936,14 +941,15 @@ static void expr_full_fn(void* ctx, uint32_t worker_id, int64_t start, int64_t e /* Post-pass for the fused unary path: |INT64_MIN| and -INT64_MIN don't fit in * i64 (signed-overflow; k/q convention surfaces this as typed null). The * element-wise loop uses unsigned wrap, so any overflow position lands as - * INT64_MIN in data. Convert each such position to typed-null: zero data[i] - * (preserve "null position is 0" invariant) and set the null bit. Caller - * must invoke single-threaded — after pool dispatch joins. */ + * INT64_MIN in data. Post Phase 3a-1, INT64_MIN IS the canonical NULL_I64 + * sentinel — the dual-encoding contract requires the payload to *remain* + * INT64_MIN while the null bit is set. 
So we only need to flip the bitmap + * bit; the payload is already correct. Caller must invoke single-threaded + * — after pool dispatch joins. */ static void mark_i64_overflow_as_null(ray_t* result, int64_t off, int64_t len) { int64_t* d = (int64_t*)ray_data(result) + off; for (int64_t i = 0; i < len; i++) { - if (RAY_UNLIKELY(d[i] == INT64_MIN)) { - d[i] = 0; + if (RAY_UNLIKELY(d[i] == NULL_I64)) { ray_vec_set_null(result, off + i, true); } } @@ -1228,6 +1234,32 @@ static void set_all_null(ray_t* result, int64_t len) { } else { for (int64_t i = 0; i < len; i++) ray_vec_set_null(result, i, true); } + /* Phase 2/3a dual-encoding: results must also carry the matching + * width sentinel in every payload slot so raw-payload consumers see + * the null marker without consulting the bitmap. */ + switch (result->type) { + case RAY_F64: { + double* d = (double*)ray_data(result); + for (int64_t i = 0; i < len; i++) d[i] = NULL_F64; + break; + } + case RAY_I64: case RAY_TIMESTAMP: { + int64_t* d = (int64_t*)ray_data(result); + for (int64_t i = 0; i < len; i++) d[i] = NULL_I64; + break; + } + case RAY_I32: case RAY_DATE: case RAY_TIME: { + int32_t* d = (int32_t*)ray_data(result); + for (int64_t i = 0; i < len; i++) d[i] = NULL_I32; + break; + } + case RAY_I16: { + int16_t* d = (int16_t*)ray_data(result); + for (int64_t i = 0; i < len; i++) d[i] = NULL_I16; + break; + } + default: break; + } } /* Propagate null bitmaps for binary ops: null in either operand → null in result. 
*/ diff --git a/src/ops/group.c b/src/ops/group.c index e3a563b2..e19c410d 100644 --- a/src/ops/group.c +++ b/src/ops/group.c @@ -1273,7 +1273,7 @@ static void med_per_group_fn(void* ctx_v, uint32_t worker_id, } } if (actual == 0) { - c->out_data[g] = 0.0; + c->out_data[g] = NULL_F64; ray_vec_set_null(c->out, g, true); } else { c->out_data[g] = ray_median_dbl_inplace(slice, actual); @@ -2816,6 +2816,26 @@ static void radix_phase3_fn(void* ctx, uint32_t worker_id, int64_t start, int64_ if (null_mask & (int64_t)(1u << k)) { if (c->key_cols && c->key_cols[k]) grp_set_null(c->key_cols[k], di); + /* Phase 2/3a dual encoding: fill correct-width sentinel. */ + char* dst = c->key_dsts[k]; + uint8_t esz = c->key_esizes[k]; + size_t off = (size_t)di * esz; + int8_t kt = c->key_types[k]; + switch (kt) { + case RAY_F64: { + double v = NULL_F64; memcpy(dst + off, &v, 8); break; + } + case RAY_I64: case RAY_TIMESTAMP: { + int64_t v = NULL_I64; memcpy(dst + off, &v, 8); break; + } + case RAY_I32: case RAY_DATE: case RAY_TIME: { + int32_t v = NULL_I32; memcpy(dst + off, &v, 4); break; + } + case RAY_I16: { + int16_t v = NULL_I16; memcpy(dst + off, &v, 2); break; + } + default: break; + } continue; } int64_t kv = rkeys[k]; @@ -2877,7 +2897,7 @@ static void radix_phase3_fn(void* ctx, uint32_t worker_id, int64_t start, int64_ case OP_VAR: case OP_VAR_POP: case OP_STDDEV: case OP_STDDEV_POP: { bool insuf = (op == OP_VAR || op == OP_STDDEV) ? cnt <= 1 : cnt <= 0; - if (insuf) { v = 0.0; grp_set_null(ao->vec, di); break; } + if (insuf) { v = NULL_F64; grp_set_null(ao->vec, di); break; } double sum_val = sf ? ROW_RD_F64(row, ly->off_sum, s) : (double)ROW_RD_I64(row, ly->off_sum, s); double sq_val = ly->off_sumsq ? 
ROW_RD_F64(row, ly->off_sumsq, s) : 0.0; @@ -3051,6 +3071,12 @@ typedef struct { da_val_t* max_val; /* MAX [n_slots * n_aggs] */ double* sumsq_f64; /* sum-of-squares for STDDEV/VAR */ int64_t* count; /* group counts [n_slots] */ + int64_t* nn_count; /* per-(group, agg) non-null counts [n_slots * n_aggs]; + * incremented inside the F64 NaN-skip / integer + * sentinel-skip guards. Drives null-aware divisors + * (AVG/VAR/STDDEV) and all-null finalization + * (MIN/MAX/PROD/FIRST/LAST). NULL when none of the + * aggs needs null tracking (no HAS_NULLS columns). */ int64_t* first_row; /* min row index seen per slot (FIRST) [n_slots] */ int64_t* last_row; /* max row index seen per slot (LAST) [n_slots] */ /* Arena headers */ @@ -3059,6 +3085,7 @@ typedef struct { ray_t* _h_max; ray_t* _h_sumsq; ray_t* _h_count; + ray_t* _h_nn_count; ray_t* _h_first_row; ray_t* _h_last_row; } da_accum_t; @@ -3069,12 +3096,17 @@ static inline void da_accum_free(da_accum_t* a) { scratch_free(a->_h_max); scratch_free(a->_h_sumsq); scratch_free(a->_h_count); + scratch_free(a->_h_nn_count); scratch_free(a->_h_first_row); scratch_free(a->_h_last_row); } /* Unified agg result emitter — used by both DA and HT paths. - * Arrays indexed by [gi * n_aggs + a], counts by [gi]. */ + * Arrays indexed by [gi * n_aggs + a], counts by [gi]. nn_counts (if + * non-NULL) carries the per-(group, agg) non-null row count: AVG/VAR/ + * STDDEV use it as the divisor and MIN/MAX/PROD/FIRST/LAST emit a typed + * null when it is zero. Pass NULL to keep the legacy count[gid]-divisor + * behaviour (callers without HAS_NULLS aggs need not allocate it). 
*/ static void emit_agg_columns(ray_t** result, ray_graph_t* g, const ray_op_ext_t* ext, ray_t* const* agg_vecs, uint32_t grp_count, uint8_t n_aggs, @@ -3083,7 +3115,8 @@ static void emit_agg_columns(ray_t** result, ray_graph_t* g, const ray_op_ext_t* const int64_t* min_i64, const int64_t* max_i64, const int64_t* counts, const agg_affine_t* affine, - const double* sumsq_f64) { + const double* sumsq_f64, + const int64_t* nn_counts) { for (uint8_t a = 0; a < n_aggs; a++) { uint16_t agg_op = ext->agg_ops[a]; ray_t* agg_col = agg_vecs[a]; @@ -3105,6 +3138,11 @@ static void emit_agg_columns(ray_t** result, ray_graph_t* g, const ray_op_ext_t* new_col->len = (int64_t)grp_count; for (uint32_t gi = 0; gi < grp_count; gi++) { size_t idx = (size_t)gi * n_aggs + a; + /* nn_counts[idx] == 0 means the group is all-null for this + * agg column — null-aware operators (MIN/MAX/PROD/FIRST/LAST/ + * AVG/VAR/STDDEV) must surface a typed null instead of leaking + * the accumulator seed (DBL_MAX / -DBL_MAX / 0). */ + int64_t nn = nn_counts ? nn_counts[idx] : counts[gi]; if (out_type == RAY_F64) { double v; switch (agg_op) { @@ -3114,22 +3152,29 @@ static void emit_agg_columns(ray_t** result, ray_graph_t* g, const ray_op_ext_t* v += affine[a].bias_f64 * counts[gi]; break; case OP_PROD: + if (nn == 0) { v = NULL_F64; ray_vec_set_null(new_col, gi, true); break; } v = is_f64 ? sum_f64[idx] : (double)sum_i64[idx]; break; case OP_AVG: - v = is_f64 ? sum_f64[idx] / counts[gi] : (double)sum_i64[idx] / counts[gi]; + if (nn == 0) { v = NULL_F64; ray_vec_set_null(new_col, gi, true); break; } + v = is_f64 ? sum_f64[idx] / nn : (double)sum_i64[idx] / nn; if (affine && affine[a].enabled) v += affine[a].bias_f64; break; - case OP_MIN: v = is_f64 ? min_f64[idx] : (double)min_i64[idx]; break; - case OP_MAX: v = is_f64 ? max_f64[idx] : (double)max_i64[idx]; break; + case OP_MIN: + if (nn == 0) { v = NULL_F64; ray_vec_set_null(new_col, gi, true); break; } + v = is_f64 ? 
min_f64[idx] : (double)min_i64[idx]; break; + case OP_MAX: + if (nn == 0) { v = NULL_F64; ray_vec_set_null(new_col, gi, true); break; } + v = is_f64 ? max_f64[idx] : (double)max_i64[idx]; break; case OP_FIRST: case OP_LAST: + if (nn == 0) { v = NULL_F64; ray_vec_set_null(new_col, gi, true); break; } v = is_f64 ? sum_f64[idx] : (double)sum_i64[idx]; break; case OP_VAR: case OP_VAR_POP: case OP_STDDEV: case OP_STDDEV_POP: { - int64_t cnt = counts[gi]; + int64_t cnt = nn; bool insuf = (agg_op == OP_VAR || agg_op == OP_STDDEV) ? cnt <= 1 : cnt <= 0; - if (insuf) { v = 0.0; ray_vec_set_null(new_col, gi, true); break; } + if (insuf) { v = NULL_F64; ray_vec_set_null(new_col, gi, true); break; } double sum_val = is_f64 ? sum_f64[idx] : (double)sum_i64[idx]; double sq_val = sumsq_f64 ? sumsq_f64[idx] : 0.0; double mean = sum_val / cnt; @@ -3152,11 +3197,19 @@ static void emit_agg_columns(ray_t** result, ray_graph_t* g, const ray_op_ext_t* if (affine && affine[a].enabled) v += affine[a].bias_i64 * counts[gi]; break; - case OP_PROD: v = sum_i64[idx]; break; + case OP_PROD: + if (nn == 0) { v = NULL_I64; ray_vec_set_null(new_col, gi, true); break; } + v = sum_i64[idx]; break; case OP_COUNT: v = counts[gi]; break; - case OP_MIN: v = min_i64[idx]; break; - case OP_MAX: v = max_i64[idx]; break; - case OP_FIRST: case OP_LAST: v = sum_i64[idx]; break; + case OP_MIN: + if (nn == 0) { v = NULL_I64; ray_vec_set_null(new_col, gi, true); break; } + v = min_i64[idx]; break; + case OP_MAX: + if (nn == 0) { v = NULL_I64; ray_vec_set_null(new_col, gi, true); break; } + v = max_i64[idx]; break; + case OP_FIRST: case OP_LAST: + if (nn == 0) { v = NULL_I64; ray_vec_set_null(new_col, gi, true); break; } + v = sum_i64[idx]; break; default: v = 0; break; } ((int64_t*)ray_data(new_col))[gi] = v; @@ -3257,6 +3310,8 @@ typedef struct { uint8_t n_aggs; uint8_t need_flags; /* DA_NEED_* bitmask */ uint32_t agg_f64_mask; /* bitmask: bit a set if agg[a] is RAY_F64 */ + uint32_t agg_int_null_mask; /* 
bitmask: bit a set if agg[a] is an integer col with HAS_NULLS */ + int64_t* agg_int_null_sentinel; /* per-agg int sentinel (NULL_I64 etc) when bit set in mask */ bool all_sum; /* true when all ops are SUM/AVG/COUNT (no MIN/MAX/FIRST/LAST) */ uint32_t n_slots; const int64_t* match_idx; /* NULL = no selection */ @@ -3637,6 +3692,18 @@ static ray_t* materialize_broadcast_input(ray_t* src, int64_t nrows) { } } +/* Per-type integer null sentinel for an aggregation column. Returns 0 for + * non-nullable / non-integer types (BOOL, U8, SYM, F64) since 0 will never + * match a read_col_i64 value flagged as null via agg_int_null_mask. */ +static inline int64_t agg_int_null_sentinel_for(int8_t t) { + switch (t) { + case RAY_I64: case RAY_TIMESTAMP: return NULL_I64; + case RAY_I32: case RAY_DATE: case RAY_TIME: return (int64_t)NULL_I32; + case RAY_I16: return (int64_t)NULL_I16; + default: return 0; + } +} + /* ---- Scalar aggregate (n_keys==0): one flat scan, no GID, no hash ---- */ typedef struct { void** agg_ptrs; @@ -3652,6 +3719,9 @@ typedef struct { /* per-worker accumulators (1 slot each) */ da_accum_t* accums; uint32_t n_accums; + /* Phase 3a: per-agg integer-null sentinel + mask (mirrors da_ctx_t). */ + uint32_t agg_int_null_mask; + int64_t* agg_int_null_sentinel; } scalar_ctx_t; static inline int64_t scalar_i64_at(const void* ptr, int8_t type, int64_t r) { @@ -3711,6 +3781,11 @@ static void scalar_sum_linear_i64_fn(void* ctx, uint32_t worker_id, int64_t star static inline void scalar_accum_row(scalar_ctx_t* c, da_accum_t* acc, int64_t r) { uint8_t n_aggs = c->n_aggs; acc->count[0]++; + /* Per-(group, agg) non-null counters drive AVG/VAR/STDDEV divisors + * and all-null finalization for MIN/MAX/PROD/FIRST/LAST. Only + * allocated when at least one agg can produce a null + * (acc->nn_count != NULL). 
*/ + int64_t* nn = acc->nn_count; for (uint8_t a = 0; a < n_aggs; a++) { double fv; int64_t iv; if (c->agg_linear && c->agg_linear[a].enabled) { @@ -3732,30 +3807,64 @@ static inline void scalar_accum_row(scalar_ctx_t* c, da_accum_t* acc, int64_t r) } uint16_t op = c->agg_ops[a]; bool is_f = (c->agg_types[a] == RAY_F64); + /* Phase 3a dual encoding: NULL_I* sentinel = null. */ + bool int_null = !is_f && (c->agg_int_null_mask & (1u << a)) && + iv == c->agg_int_null_sentinel[a]; + bool is_null = is_f ? !(fv == fv) : int_null; if (op == OP_SUM || op == OP_AVG || op == OP_STDDEV || op == OP_STDDEV_POP || op == OP_VAR || op == OP_VAR_POP) { - if (is_f) acc->sum[a].f += fv; - else acc->sum[a].i += iv; - if (acc->sumsq_f64) acc->sumsq_f64[a] += fv * fv; + if (is_f) { + /* Phase 2 dual encoding: NaN payload = null, skip from sum/sumsq. */ + if (RAY_LIKELY(fv == fv)) { + acc->sum[a].f += fv; + if (acc->sumsq_f64) acc->sumsq_f64[a] += fv * fv; + if (nn) nn[a]++; + } + } else if (RAY_LIKELY(!int_null)) { + acc->sum[a].i += iv; + if (acc->sumsq_f64) acc->sumsq_f64[a] += fv * fv; + if (nn) nn[a]++; + } } else if (op == OP_PROD) { + /* "First non-null" marker: nn[a]==0 when nn is tracked, + * otherwise count[0]==1 (always non-null without nn). */ + bool first_seen = nn ? 
(nn[a] == 0) : (acc->count[0] == 1); if (is_f) { - if (acc->count[0] == 1) acc->sum[a].f = fv; - else acc->sum[a].f *= fv; - } else { - if (acc->count[0] == 1) acc->sum[a].i = iv; + if (fv == fv) { + if (first_seen) acc->sum[a].f = fv; + else acc->sum[a].f *= fv; + if (nn) nn[a]++; + } + } else if (RAY_LIKELY(!int_null)) { + if (first_seen) acc->sum[a].i = iv; else acc->sum[a].i = (int64_t)((uint64_t)acc->sum[a].i * (uint64_t)iv); + if (nn) nn[a]++; } } else if (op == OP_FIRST) { - if (acc->count[0] == 1) { - if (is_f) acc->sum[a].f = fv; else acc->sum[a].i = iv; + /* Only commit the value AND advance the "first non-null seen" + * marker when the row is non-null — otherwise a null at row 0 + * would block every later non-null row. */ + if (!is_null) { + bool first_seen = nn ? (nn[a] == 0) : (acc->count[0] == 1); + if (first_seen) { + if (is_f) acc->sum[a].f = fv; + else acc->sum[a].i = iv; + } + if (nn) nn[a]++; } } else if (op == OP_LAST) { - if (is_f) acc->sum[a].f = fv; else acc->sum[a].i = iv; + if (!is_null) { + if (is_f) acc->sum[a].f = fv; + else acc->sum[a].i = iv; + if (nn) nn[a]++; + } } else if (op == OP_MIN) { - if (is_f) { if (fv < acc->min_val[a].f) acc->min_val[a].f = fv; } - else { if (iv < acc->min_val[a].i) acc->min_val[a].i = iv; } + if (is_f) { if (fv == fv && fv < acc->min_val[a].f) acc->min_val[a].f = fv; } + else if (!int_null) { if (iv < acc->min_val[a].i) acc->min_val[a].i = iv; } + if (!is_null && nn) nn[a]++; } else if (op == OP_MAX) { - if (is_f) { if (fv > acc->max_val[a].f) acc->max_val[a].f = fv; } - else { if (iv > acc->max_val[a].i) acc->max_val[a].i = iv; } + if (is_f) { if (fv == fv && fv > acc->max_val[a].f) acc->max_val[a].f = fv; } + else if (!int_null) { if (iv > acc->max_val[a].i) acc->max_val[a].i = iv; } + if (!is_null && nn) nn[a]++; } } } @@ -3786,16 +3895,34 @@ static inline void da_accum_row(da_ctx_t* c, da_accum_t* acc, int32_t gid, int64 * COUNT-only queries have acc->sum==NULL; count[gid]++ above suffices. 
*/ if (!acc->sum) return; uint32_t f64m = c->agg_f64_mask; + uint32_t inm = c->agg_int_null_mask; + int64_t* nn = acc->nn_count; for (uint8_t a = 0; a < n_aggs; a++) { if (!c->agg_ptrs[a]) continue; size_t idx = base + a; - if (c->agg_strlen && c->agg_strlen[a]) + if (c->agg_strlen && c->agg_strlen[a]) { acc->sum[idx].i += group_strlen_at(c->agg_cols[a], r); - else if (f64m & (1u << a)) - acc->sum[idx].f += ((const double*)c->agg_ptrs[a])[r]; - else - acc->sum[idx].i += read_col_i64(c->agg_ptrs[a], r, - c->agg_types[a], 0); + if (nn) nn[idx]++; + } else if (f64m & (1u << a)) { + /* Phase 2 dual encoding: NaN payload = null, skip from sum. */ + double v = ((const double*)c->agg_ptrs[a])[r]; + if (RAY_LIKELY(v == v)) { acc->sum[idx].f += v; if (nn) nn[idx]++; } + } else { + /* Phase 3a dual encoding: NULL_I* sentinel = null, skip from sum. + * Only paid when the source column actually advertises nulls. + * + * Phase 3a hazard: this sentinel-compare drops user-stored INT_MIN + * values in HAS_NULLS columns. The plan accepted this tradeoff for + * the cache-line cost of nullmap consultation — dual encoding keeps + * the bitmap as source of truth, so the corruption is bounded to the + * narrow window where HAS_NULLS is set AND a non-null cell holds the + * sentinel value. */ + int64_t v = read_col_i64(c->agg_ptrs[a], r, c->agg_types[a], 0); + if (RAY_LIKELY(!((inm >> a) & 1) || v != c->agg_int_null_sentinel[a])) { + acc->sum[idx].i += v; + if (nn) nn[idx]++; + } + } } return; } @@ -3804,10 +3931,22 @@ static inline void da_accum_row(da_ctx_t* c, da_accum_t* acc, int32_t gid, int64 * dispatch is work-stealing: tasks may be claimed by a single worker * out of index order, so rows do NOT arrive in monotonic order within * a worker. Use explicit min/max comparison against r and update the - * stored value only when the new row beats the current bound. */ + * stored value only when the new row beats the current bound. 
+ * + * Multi-FIRST limitation: first_row[gid] is shared across all FIRST + * aggs in this group, so two FIRST aggs A and B on different columns + * with disjoint null patterns can race — whichever non-null lands + * first stakes first_row and the other agg never gets a chance. + * The result for the "loser" agg is a typed null (nn[idx] stays 0), + * which is strictly safer than the previous behaviour (leaked the + * 0 calloc seed) but still not the true first-non-null value. Fix + * would require per-(group, agg) first_row arrays. Out of scope for + * this phase; documented for future work. */ bool fl_take_first = (acc->first_row && r < acc->first_row[gid]); bool fl_take_last = (acc->last_row && r > acc->last_row[gid]); + bool first_advanced = false, last_advanced = false; + int64_t* nn = acc->nn_count; for (uint8_t a = 0; a < n_aggs; a++) { if (!c->agg_ptrs[a]) continue; size_t idx = base + a; @@ -3819,46 +3958,83 @@ static inline void da_accum_row(da_ctx_t* c, da_accum_t* acc, int32_t gid, int64 da_read_val(c->agg_ptrs[a], c->agg_types[a], 0, r, &fv, &iv); } uint16_t op = c->agg_ops[a]; + bool is_f = (c->agg_types[a] == RAY_F64); + /* Phase 3a dual encoding: NULL_I* sentinel = null. Bit set in + * agg_int_null_mask AND value equal to per-agg sentinel means + * this row is null for an integer aggregation column. */ + bool int_null = (c->agg_int_null_mask & (1u << a)) && + iv == c->agg_int_null_sentinel[a]; + bool is_null = is_f ? !(fv == fv) : int_null; if (op == OP_SUM || op == OP_AVG || op == OP_STDDEV || op == OP_STDDEV_POP || op == OP_VAR || op == OP_VAR_POP) { - if (c->agg_types[a] == RAY_F64) acc->sum[idx].f += fv; - else acc->sum[idx].i = (int64_t)((uint64_t)acc->sum[idx].i + (uint64_t)iv); - if (acc->sumsq_f64) acc->sumsq_f64[idx] += fv * fv; + if (is_f) { + /* Phase 2 dual encoding: NaN payload = null, skip from sum/sumsq. 
*/ + if (RAY_LIKELY(fv == fv)) { + acc->sum[idx].f += fv; + if (acc->sumsq_f64) acc->sumsq_f64[idx] += fv * fv; + if (nn) nn[idx]++; + } + } else if (RAY_LIKELY(!int_null)) { + acc->sum[idx].i = (int64_t)((uint64_t)acc->sum[idx].i + (uint64_t)iv); + if (acc->sumsq_f64) acc->sumsq_f64[idx] += fv * fv; + if (nn) nn[idx]++; + } } else if (op == OP_PROD) { - if (c->agg_types[a] == RAY_F64) { - if (acc->count[gid] == 1) acc->sum[idx].f = fv; - else acc->sum[idx].f *= fv; - } else { - if (acc->count[gid] == 1) acc->sum[idx].i = iv; + /* "First non-null" marker: nn[idx]==0 when nn is tracked, + * otherwise count[gid]==1 (always non-null without nn). */ + bool first_seen = nn ? (nn[idx] == 0) : (acc->count[gid] == 1); + if (is_f) { + if (fv == fv) { + if (first_seen) acc->sum[idx].f = fv; + else acc->sum[idx].f *= fv; + if (nn) nn[idx]++; + } + } else if (RAY_LIKELY(!int_null)) { + if (first_seen) acc->sum[idx].i = iv; else acc->sum[idx].i = (int64_t)((uint64_t)acc->sum[idx].i * (uint64_t)iv); + if (nn) nn[idx]++; } } else if (op == OP_FIRST) { - if (fl_take_first) { - if (c->agg_types[a] == RAY_F64) acc->sum[idx].f = fv; + /* Only stake the first-row claim when this row's value for the + * agg column is actually non-null — a null prefix would block + * later non-null rows otherwise. */ + if (fl_take_first && !is_null) { + if (is_f) acc->sum[idx].f = fv; else acc->sum[idx].i = iv; + first_advanced = true; + if (nn) nn[idx]++; } } else if (op == OP_LAST) { - if (fl_take_last) { - if (c->agg_types[a] == RAY_F64) acc->sum[idx].f = fv; + if (fl_take_last && !is_null) { + if (is_f) acc->sum[idx].f = fv; else acc->sum[idx].i = iv; + last_advanced = true; + if (nn) nn[idx]++; } } else if (op == OP_MIN) { - if (c->agg_types[a] == RAY_F64) { - if (fv < acc->min_val[idx].f) acc->min_val[idx].f = fv; - } else { + if (is_f) { + /* Phase 2 dual encoding: NaN comparisons are always false, but + * make the skip explicit. 
*/ + if (fv == fv && fv < acc->min_val[idx].f) acc->min_val[idx].f = fv; + } else if (!int_null) { if (iv < acc->min_val[idx].i) acc->min_val[idx].i = iv; } + if (!is_null && nn) nn[idx]++; } else if (op == OP_MAX) { - if (c->agg_types[a] == RAY_F64) { - if (fv > acc->max_val[idx].f) acc->max_val[idx].f = fv; - } else { + if (is_f) { + if (fv == fv && fv > acc->max_val[idx].f) acc->max_val[idx].f = fv; + } else if (!int_null) { if (iv > acc->max_val[idx].i) acc->max_val[idx].i = iv; } + if (!is_null && nn) nn[idx]++; } } - /* Commit row-index bounds after value writes. */ - if (fl_take_first) acc->first_row[gid] = r; - if (fl_take_last) acc->last_row[gid] = r; + /* Commit row-index bounds only when an OP_FIRST/OP_LAST actually + * accepted this row's value. An all-null row at the smallest index + * must NOT advance first_row[gid] — otherwise the next non-null row + * loses the FIRST race. */ + if (first_advanced) acc->first_row[gid] = r; + if (last_advanced) acc->last_row[gid] = r; } static void da_accum_fn(void* ctx, uint32_t worker_id, int64_t start, int64_t end) { @@ -3987,17 +4163,21 @@ static void da_merge_fn(void* ctx, uint32_t wid, int64_t start, int64_t end) { for (uint8_t a = 0; a < n_aggs; a++) { size_t idx = base + a; uint16_t aop = c->agg_ops ? c->agg_ops[a] : OP_SUM; + /* nn_count is per-(group, agg); count is per group. + * Fall back to count when nn_count is absent. */ + int64_t mnn = merged->nn_count ? merged->nn_count[idx] : merged->count[s]; + int64_t wnn = wa->nn_count ? 
wa->nn_count[idx] : wa->count[s]; if (aop == OP_FIRST) { - /* Keep worker 0 value; take from w only if merged has no data */ - if (merged->count[s] == 0 && wa->count[s] > 0) + /* Keep worker 0 value; take from w only if merged has no non-null value */ + if (mnn == 0 && wnn > 0) merged->sum[idx] = wa->sum[idx]; } else if (aop == OP_LAST) { - /* Overwrite with last worker that has data */ - if (wa->count[s] > 0) + /* Overwrite with last worker that has a non-null value */ + if (wnn > 0) merged->sum[idx] = wa->sum[idx]; } else if (aop == OP_PROD) { - if (wa->count[s] > 0) { - if (merged->count[s] == 0) + if (wnn > 0) { + if (mnn == 0) merged->sum[idx] = wa->sum[idx]; else if (agg_types[a] == RAY_F64) merged->sum[idx].f *= wa->sum[idx].f; @@ -4034,6 +4214,10 @@ static void da_merge_fn(void* ctx, uint32_t wid, int64_t start, int64_t end) { } } } + if (merged->nn_count && wa->nn_count) { + for (uint8_t a = 0; a < n_aggs; a++) + merged->nn_count[base + a] += wa->nn_count[base + a]; + } merged->count[s] += wa->count[s]; } } @@ -4852,13 +5036,31 @@ ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl, void* agg_ptrs[vla_aggs]; int8_t agg_types[vla_aggs]; + int64_t sc_int_null_sentinel[vla_aggs]; + uint32_t sc_int_null_mask = 0; + bool sc_any_nullable = false; for (uint8_t a = 0; a < n_aggs; a++) { if (agg_vecs[a]) { agg_ptrs[a] = ray_data(agg_vecs[a]); agg_types[a] = agg_vecs[a]->type; + sc_int_null_sentinel[a] = agg_int_null_sentinel_for(agg_vecs[a]->type); + /* Only set the int-null mask bit for storage types whose + * sentinel is meaningful. BOOL/U8/SYM use 0 as their default + * "sentinel" which collides with legitimate values + * (FALSE / zero byte / SYM id 0); gating those would silently + * drop real rows from SUM/MIN/MAX. F64 has its own NaN path. 
*/ + int8_t t = agg_vecs[a]->type; + bool is_sentinel_typed = (t == RAY_I16 || t == RAY_I32 || t == RAY_I64 || + t == RAY_DATE || t == RAY_TIME || t == RAY_TIMESTAMP); + if (is_sentinel_typed && (agg_vecs[a]->attrs & RAY_ATTR_HAS_NULLS)) + sc_int_null_mask |= (1u << a); + if ((agg_vecs[a]->attrs & RAY_ATTR_HAS_NULLS) && + (agg_vecs[a]->type == RAY_F64 || is_sentinel_typed)) + sc_any_nullable = true; } else { agg_ptrs[a] = NULL; agg_types[a] = 0; + sc_int_null_sentinel[a] = 0; } } @@ -4913,6 +5115,11 @@ ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl, sc_acc[w].count = (int64_t*)scratch_calloc(&sc_acc[w]._h_count, 1 * sizeof(int64_t)); if (!sc_acc[w].count) { alloc_ok = false; break; } + if (sc_any_nullable) { + sc_acc[w].nn_count = (int64_t*)scratch_calloc( + &sc_acc[w]._h_nn_count, n_aggs * sizeof(int64_t)); + if (!sc_acc[w].nn_count) { alloc_ok = false; break; } + } } if (!alloc_ok) { for (uint32_t w = 0; w < sc_n; w++) da_accum_free(&sc_acc[w]); @@ -4933,15 +5140,26 @@ ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl, .rowsel = rowsel, .accums = sc_acc, .n_accums = sc_n, + .agg_int_null_mask = sc_int_null_mask, + .agg_int_null_sentinel = sc_int_null_sentinel, }; /* Pick specialized tight loop when possible, else generic. * The specialized scalar_sum_*_fn variants don't honour * match_idx — they read data[r] directly — so they're only - * safe when no selection is in flight. */ + * safe when no selection is in flight. They also read the + * slot raw, so they require null-free input: Phase 3a stores + * NULL_I{16,32,64} sentinels in null slots which would poison + * the sum. Fall back to the generic masked path when the + * source vector advertises nulls. (try_linear_sumavg_input_i64 + * already refuses to build a linear plan when any term column + * has nulls, so agg_linear[0].enabled implies null-free.) 
*/ typedef void (*scalar_fn_t)(void*, uint32_t, int64_t, int64_t); scalar_fn_t sc_fn = scalar_accum_fn; - if (n_aggs == 1 && !match_idx && !rowsel && agg_ptrs[0] != NULL) { + bool agg0_has_nulls = (sc_int_null_mask & 1u) != 0 || + (agg_vecs[0] && agg_vecs[0]->type == RAY_F64 && + (agg_vecs[0]->attrs & RAY_ATTR_HAS_NULLS)); + if (n_aggs == 1 && !match_idx && !rowsel && agg_ptrs[0] != NULL && !agg0_has_nulls) { uint16_t op0 = ext->agg_ops[0]; int8_t t0 = agg_types[0]; if ((op0 == OP_SUM || op0 == OP_AVG) && @@ -4967,15 +5185,19 @@ ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl, if (need_flags & DA_NEED_SUM) { for (uint8_t a = 0; a < n_aggs; a++) { uint16_t merge_op = ext->agg_ops[a]; + /* nn_count is per-agg; count is per worker. Fall back + * to count when nn_count is absent (no nullable aggs). */ + int64_t mnn = m->nn_count ? m->nn_count[a] : m->count[0]; + int64_t wnn = wa->nn_count ? wa->nn_count[a] : wa->count[0]; if (merge_op == OP_FIRST) { - if (m->count[0] == 0 && wa->count[0] > 0) + if (mnn == 0 && wnn > 0) m->sum[a] = wa->sum[a]; } else if (merge_op == OP_LAST) { - if (wa->count[0] > 0) + if (wnn > 0) m->sum[a] = wa->sum[a]; } else if (merge_op == OP_PROD) { - if (wa->count[0] > 0) { - if (m->count[0] == 0) + if (wnn > 0) { + if (mnn == 0) m->sum[a] = wa->sum[a]; else if (agg_types[a] == RAY_F64) m->sum[a].f *= wa->sum[a].f; @@ -5016,6 +5238,10 @@ ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl, } } } + if (m->nn_count && wa->nn_count) { + for (uint8_t a = 0; a < n_aggs; a++) + m->nn_count[a] += wa->nn_count[a]; + } m->count[0] += wa->count[0]; } for (uint32_t w = 1; w < sc_n; w++) da_accum_free(&sc_acc[w]); @@ -5036,7 +5262,7 @@ ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl, (double*)m->sum, (int64_t*)m->sum, (double*)m->min_val, (double*)m->max_val, (int64_t*)m->min_val, (int64_t*)m->max_val, - m->count, agg_affine, m->sumsq_f64); + m->count, agg_affine, m->sumsq_f64, m->nn_count); da_accum_free(&sc_acc[0]); 
scratch_free(sc_hdr); for (uint8_t a = 0; a < n_aggs; a++) @@ -5199,16 +5425,38 @@ da_path:; void* agg_ptrs[vla_aggs]; int8_t agg_types[vla_aggs]; + int64_t da_int_null_sentinel[vla_aggs]; uint32_t agg_f64_mask = 0; + uint32_t da_int_null_mask = 0; + /* Phase 3 follow-up: track whether any agg column can produce + * a null so we can allocate per-(group, agg) non-null counts + * only when required. F64 with HAS_NULLS uses NaN-skip; sentinel- + * typed integers with HAS_NULLS use sentinel-skip. */ + bool da_any_nullable = false; for (uint8_t a = 0; a < n_aggs; a++) { if (agg_vecs[a]) { agg_ptrs[a] = ray_data(agg_vecs[a]); agg_types[a] = agg_vecs[a]->type; if (agg_vecs[a]->type == RAY_F64) agg_f64_mask |= (1u << a); + da_int_null_sentinel[a] = agg_int_null_sentinel_for(agg_vecs[a]->type); + /* Only set the int-null mask bit for storage types whose + * sentinel is meaningful. BOOL/U8/SYM use 0 as their default + * "sentinel" which collides with legitimate values + * (FALSE / zero byte / SYM id 0); gating those would silently + * drop real rows from SUM/MIN/MAX. F64 has its own NaN path. */ + int8_t t = agg_vecs[a]->type; + bool is_sentinel_typed = (t == RAY_I16 || t == RAY_I32 || t == RAY_I64 || + t == RAY_DATE || t == RAY_TIME || t == RAY_TIMESTAMP); + if (is_sentinel_typed && (agg_vecs[a]->attrs & RAY_ATTR_HAS_NULLS)) + da_int_null_mask |= (1u << a); + if ((agg_vecs[a]->attrs & RAY_ATTR_HAS_NULLS) && + (agg_vecs[a]->type == RAY_F64 || is_sentinel_typed)) + da_any_nullable = true; } else { agg_ptrs[a] = NULL; agg_types[a] = 0; + da_int_null_sentinel[a] = 0; } } @@ -5224,6 +5472,9 @@ da_path:; if (need_flags & DA_NEED_MIN) arrays_per_agg += 2; if (need_flags & DA_NEED_MAX) arrays_per_agg += 2; if (need_flags & DA_NEED_SUMSQ) arrays_per_agg += 1; + /* Phase 3 follow-up: nullable aggs add a per-(group, agg) + * non-null count array. ~8 bytes per (group, agg). 
*/ + if (da_any_nullable) arrays_per_agg += 1; uint64_t per_worker_bytes = (uint64_t)n_slots * (arrays_per_agg * n_aggs + 1u) * 8u; if ((uint64_t)da_n_workers * per_worker_bytes > DA_MEM_BUDGET) da_n_workers = 1; @@ -5268,6 +5519,11 @@ da_path:; accums[w].count = (int64_t*)scratch_calloc(&accums[w]._h_count, n_slots * sizeof(int64_t)); if (!accums[w].count) { alloc_ok = false; break; } + if (da_any_nullable) { + accums[w].nn_count = (int64_t*)scratch_calloc( + &accums[w]._h_nn_count, total * sizeof(int64_t)); + if (!accums[w].nn_count) { alloc_ok = false; break; } + } if (da_has_first_last) { accums[w].first_row = (int64_t*)scratch_alloc( &accums[w]._h_first_row, n_slots * sizeof(int64_t)); @@ -5312,6 +5568,8 @@ da_path:; .n_aggs = n_aggs, .need_flags = need_flags, .agg_f64_mask = agg_f64_mask, + .agg_int_null_mask = da_int_null_mask, + .agg_int_null_sentinel = da_int_null_sentinel, .all_sum = all_sum, .n_slots = n_slots, .match_idx = match_idx, @@ -5359,8 +5617,13 @@ da_path:; if (agg_types[a] == RAY_F64) merged->sum[idx].f += wa->sum[idx].f; else merged->sum[idx].i += wa->sum[idx].i; } else if (aop == OP_PROD) { - if (wa->count[s] > 0) { - if (merged->count[s] == 0) + /* Use per-(group, agg) non-null counts when + * available so an all-null worker doesn't + * fold a stale seed into the merged product. */ + int64_t mnn = merged->nn_count ? merged->nn_count[idx] : merged->count[s]; + int64_t wnn = wa->nn_count ? 
wa->nn_count[idx] : wa->count[s]; + if (wnn > 0) { + if (mnn == 0) merged->sum[idx] = wa->sum[idx]; else if (agg_types[a] == RAY_F64) merged->sum[idx].f *= wa->sum[idx].f; @@ -5401,6 +5664,10 @@ da_path:; } } } + if (merged->nn_count && wa->nn_count) { + for (size_t i = 0; i < total; i++) + merged->nn_count[i] += wa->nn_count[i]; + } for (uint32_t s = 0; s < n_slots; s++) merged->count[s] += wa->count[s]; } @@ -5429,15 +5696,17 @@ da_path:; for (uint8_t a = 0; a < n_aggs; a++) { size_t idx = base + a; uint16_t aop = ext->agg_ops[a]; + int64_t mnn = merged->nn_count ? merged->nn_count[idx] : merged->count[s]; + int64_t wnn = wa->nn_count ? wa->nn_count[idx] : wa->count[s]; if (aop == OP_FIRST) { - if (merged->count[s] == 0 && wa->count[s] > 0) + if (mnn == 0 && wnn > 0) merged->sum[idx] = wa->sum[idx]; } else if (aop == OP_LAST) { - if (wa->count[s] > 0) + if (wnn > 0) merged->sum[idx] = wa->sum[idx]; } else if (aop == OP_PROD) { - if (wa->count[s] > 0) { - if (merged->count[s] == 0) + if (wnn > 0) { + if (mnn == 0) merged->sum[idx] = wa->sum[idx]; else if (agg_types[a] == RAY_F64) merged->sum[idx].f *= wa->sum[idx].f; @@ -5475,6 +5744,10 @@ da_path:; } } } + if (merged->nn_count && wa->nn_count) { + for (size_t i = 0; i < total; i++) + merged->nn_count[i] += wa->nn_count[i]; + } for (uint32_t s = 0; s < n_slots; s++) merged->count[s] += wa->count[s]; } @@ -5490,6 +5763,7 @@ da_path:; da_val_t* da_max_val = merged->max_val; /* may be NULL if !DA_NEED_MAX */ double* da_sumsq = merged->sumsq_f64; /* may be NULL if !DA_NEED_SUMSQ */ int64_t* da_count = merged->count; + int64_t* da_nn_count = merged->nn_count; /* may be NULL when no agg can be null */ uint32_t all_grp_count = 0; for (uint32_t s = 0; s < n_slots; s++) @@ -5539,7 +5813,7 @@ da_path:; /* Agg columns — compact sparse DA arrays into dense, then emit */ size_t dense_total = (size_t)grp_count * n_aggs; ray_t *_h_dsum = NULL, *_h_dmin = NULL, *_h_dmax = NULL; - ray_t *_h_dsq = NULL, *_h_dcnt = NULL; + ray_t 
*_h_dsq = NULL, *_h_dcnt = NULL, *_h_dnn = NULL; da_val_t* dense_sum = da_sum ? (da_val_t*)scratch_alloc(&_h_dsum, dense_total * sizeof(da_val_t)) : NULL; da_val_t* dense_min_val = da_min_val ? (da_val_t*)scratch_alloc(&_h_dmin, dense_total * sizeof(da_val_t)) : NULL; da_val_t* dense_max_val = da_max_val ? (da_val_t*)scratch_alloc(&_h_dmax, dense_total * sizeof(da_val_t)) : NULL; @@ -5547,6 +5821,9 @@ da_path:; int64_t* dense_counts = grp_count ? (int64_t*)scratch_alloc(&_h_dcnt, grp_count * sizeof(int64_t)) : NULL; + int64_t* dense_nn_counts = (da_nn_count && grp_count) + ? (int64_t*)scratch_alloc(&_h_dnn, dense_total * sizeof(int64_t)) + : NULL; uint32_t gi = 0; for (uint32_t s = 0; s < n_slots; s++) { @@ -5559,6 +5836,7 @@ da_path:; if (dense_min_val) dense_min_val[di] = da_min_val[si]; if (dense_max_val) dense_max_val[di] = da_max_val[si]; if (dense_sumsq) dense_sumsq[di] = da_sumsq[si]; + if (dense_nn_counts) dense_nn_counts[di] = da_nn_count[si]; } gi++; } @@ -5567,11 +5845,13 @@ da_path:; (double*)dense_sum, (int64_t*)dense_sum, (double*)dense_min_val, (double*)dense_max_val, (int64_t*)dense_min_val, (int64_t*)dense_max_val, - dense_counts, agg_affine, dense_sumsq); + dense_counts, agg_affine, dense_sumsq, + dense_nn_counts); scratch_free(_h_dsum); scratch_free(_h_dmin); scratch_free(_h_dmax); scratch_free(_h_dsq); scratch_free(_h_dcnt); + scratch_free(_h_dnn); da_accum_free(&accums[0]); scratch_free(accums_hdr); for (uint8_t a = 0; a < n_aggs; a++) @@ -5602,8 +5882,21 @@ da_path:; if (op == OP_COUNT) continue; if (op != OP_SUM && op != OP_AVG) sp_eligible = false; - else - sp_need_sum = true; + else { + /* Phase 3a: the single-key sparse aggregation path reads agg + * slots raw via read_col_i64 / direct double load; nullable + * input columns would poison the sum with NULL_I* or NULL_F64 + * sentinels. Fall back to slower paths that mask nulls + * properly. 
Scope note: this gate covers the scalar + * dispatcher and this single-key sparse path only; the + * multi-key radix HT (accum_from_entry, ~line 2155) inherits + * Phase 2's pre-existing nullable-agg gap and is out of scope + * for this commit. */ + if (agg_vecs[a] && (agg_vecs[a]->attrs & RAY_ATTR_HAS_NULLS)) + sp_eligible = false; + else + sp_need_sum = true; + } } if (sp_eligible) { @@ -5870,10 +6163,14 @@ da_path:; int64_t name_id = key_ext ? key_ext->sym : 0; result = ray_table_add_col(result, name_id, key_col); ray_release(key_col); + /* nn_counts == NULL: this fast path rejected HAS_NULLS + * inputs at the sp_eligible gate (~line 5737), so every + * row is non-null and the legacy count-based divisor is + * correct. */ emit_agg_columns(&result, g, ext, agg_vecs, grp_count, n_aggs, (double*)dense_sum, (int64_t*)dense_sum, NULL, NULL, NULL, NULL, - dense_count, agg_affine, NULL); + dense_count, agg_affine, NULL, NULL); scratch_free(_h_sum); scratch_free(_h_cnt); scratch_free(range_sum_hdr); scratch_free(cnt_hdr); @@ -6067,10 +6364,13 @@ da_path:; result = ray_table_add_col(result, name_id, key_col); ray_release(key_col); + /* nn_counts == NULL: same null-free guard as above; the + * emit-filter range path only runs when sp_eligible was + * true. */ emit_agg_columns(&result, g, ext, agg_vecs, grp_count, n_aggs, (double*)dense_sum, (int64_t*)dense_sum, NULL, NULL, NULL, NULL, - dense_count, agg_affine, NULL); + dense_count, agg_affine, NULL, NULL); scratch_free(_h_sum); scratch_free(_h_cnt); @@ -6285,10 +6585,13 @@ da_path:; result = ray_table_add_col(result, name_id, key_col); ray_release(key_col); + /* nn_counts == NULL: sparse HT path only handles SUM/AVG/COUNT + * and is gated to null-free agg columns (sp_eligible guard at + * ~line 5737), so counts[gi] is the correct divisor. 
*/ emit_agg_columns(&result, g, ext, agg_vecs, grp_count, n_aggs, (double*)dense_sum, (int64_t*)dense_sum, NULL, NULL, NULL, NULL, - dense_count, agg_affine, NULL); + dense_count, agg_affine, NULL, NULL); scratch_free(_h_sum); scratch_free(_h_cnt); @@ -7248,7 +7551,10 @@ ht_path:; const char* row = ph->rows + (size_t)gi * rs; int64_t cnt = *(const int64_t*)(const void*)row; bool insuf = (op == OP_VAR || op == OP_STDDEV) ? cnt <= 1 : cnt <= 0; - if (insuf) ray_vec_set_null(agg_outs[a].vec, off + gi, true); + if (insuf) { + ray_vec_set_null(agg_outs[a].vec, off + gi, true); + ((double*)ray_data(agg_outs[a].vec))[off + gi] = NULL_F64; + } } } } @@ -7377,6 +7683,18 @@ sequential_fallback:; int64_t null_mask = rkeys[n_keys]; if (null_mask & (int64_t)(1u << k)) { ray_vec_set_null(new_col, (int64_t)gi, true); + /* Phase 2/3a dual encoding: fill correct-width sentinel. */ + switch (kt) { + case RAY_F64: + ((double*)ray_data(new_col))[gi] = NULL_F64; break; + case RAY_I64: case RAY_TIMESTAMP: + ((int64_t*)ray_data(new_col))[gi] = NULL_I64; break; + case RAY_I32: case RAY_DATE: case RAY_TIME: + ((int32_t*)ray_data(new_col))[gi] = NULL_I32; break; + case RAY_I16: + ((int16_t*)ray_data(new_col))[gi] = NULL_I16; break; + default: break; + } continue; } if (is_wide) { @@ -7587,7 +7905,7 @@ sequential_fallback:; case OP_VAR: case OP_VAR_POP: case OP_STDDEV: case OP_STDDEV_POP: { bool insuf = (agg_op == OP_VAR || agg_op == OP_STDDEV) ? cnt <= 1 : cnt <= 0; - if (insuf) { v = 0.0; ray_vec_set_null(new_col, gi, true); break; } + if (insuf) { v = NULL_F64; ray_vec_set_null(new_col, gi, true); break; } double sum_val = is_f64 ? ROW_RD_F64(row, ly->off_sum, s) : (double)ROW_RD_I64(row, ly->off_sum, s); double sq_val = ly->off_sumsq ? 
ROW_RD_F64(row, ly->off_sumsq, s) : 0.0; @@ -8244,12 +8562,12 @@ exec_group_per_partition(ray_t* parted_tbl, ray_op_ext_t* ext, const double* sv = (const double*)ray_data(sum_col); for (int64_t r = 0; r < nrows; r++) { double n = (double)cv[r]; - if (n <= 0) { out[r] = 0.0; ray_vec_set_null(out_col, r, true); continue; } + if (n <= 0) { out[r] = NULL_F64; ray_vec_set_null(out_col, r, true); continue; } double mean = sv[r] / n; double var_pop = sq[r] / n - mean * mean; if (var_pop < 0) var_pop = 0; bool insuf = (orig_op == OP_VAR || orig_op == OP_STDDEV) && n <= 1; - if (insuf) { out[r] = 0.0; ray_vec_set_null(out_col, r, true); continue; } + if (insuf) { out[r] = NULL_F64; ray_vec_set_null(out_col, r, true); continue; } if (orig_op == OP_VAR_POP) out[r] = var_pop; else if (orig_op == OP_VAR) out[r] = var_pop * n / (n - 1); else if (orig_op == OP_STDDEV_POP) out[r] = sqrt(var_pop); @@ -8259,12 +8577,12 @@ exec_group_per_partition(ray_t* parted_tbl, ray_op_ext_t* ext, const int64_t* sv = (const int64_t*)ray_data(sum_col); for (int64_t r = 0; r < nrows; r++) { double n = (double)cv[r]; - if (n <= 0) { out[r] = 0.0; ray_vec_set_null(out_col, r, true); continue; } + if (n <= 0) { out[r] = NULL_F64; ray_vec_set_null(out_col, r, true); continue; } double mean = (double)sv[r] / n; double var_pop = sq[r] / n - mean * mean; if (var_pop < 0) var_pop = 0; bool insuf = (orig_op == OP_VAR || orig_op == OP_STDDEV) && n <= 1; - if (insuf) { out[r] = 0.0; ray_vec_set_null(out_col, r, true); continue; } + if (insuf) { out[r] = NULL_F64; ray_vec_set_null(out_col, r, true); continue; } if (orig_op == OP_VAR_POP) out[r] = var_pop; else if (orig_op == OP_VAR) out[r] = var_pop * n / (n - 1); else if (orig_op == OP_STDDEV_POP) out[r] = sqrt(var_pop); @@ -9010,18 +9328,37 @@ static void grpt_phase3_fn(void* ctx_v, uint32_t worker_id, for (int64_t j = 0; j < kept; j++) { /* Key write — replicate same key across kept rows. 
*/ if (e->has_null_key) { - /* Write 0 placeholder then mark null on the output - * column. ray_vec_set_null is not threadsafe across - * workers for the same word; but each partition - * writes a contiguous row range so two partitions - * never touch the same nullmap word — unless a row - * range straddles an 8-row boundary that another + /* Write width-correct sentinel then mark null on the + * output column. Phase 2/3a dual encoding: payload + * must hold INT_MIN/NaN per type, not 0. + * ray_vec_set_null is not threadsafe across workers + * for the same word; but each partition writes a + * contiguous row range so two partitions never touch + * the same nullmap word — unless a row range + * straddles an 8-row boundary that another * partition's range also touches. In practice the * null-key case at most produces K rows and * partitions are large; we serialise null-key * writes by routing the null-key entry into the * sequential final-pass below. */ - grpt_write_key(c->key_out, row + j, 0, kesz); + int64_t null_bits = 0; + switch (c->key_type) { + case RAY_F64: { + double v = NULL_F64; + memcpy(&null_bits, &v, 8); + break; + } + case RAY_I64: case RAY_TIMESTAMP: + null_bits = NULL_I64; break; + case RAY_I32: case RAY_DATE: case RAY_TIME: + null_bits = (int64_t)NULL_I32; break; + case RAY_I16: + null_bits = (int64_t)NULL_I16; break; + default: + /* BOOL/U8 — non-nullable per Phase 1, keep 0. */ + null_bits = 0; break; + } + grpt_write_key(c->key_out, row + j, null_bits, kesz); if (c->key_vec) ray_vec_set_null(c->key_vec, row + j, true); } else { diff --git a/src/ops/linkop.c b/src/ops/linkop.c index 0d0aa11b..895e8853 100644 --- a/src/ops/linkop.c +++ b/src/ops/linkop.c @@ -266,6 +266,36 @@ ray_t* ray_link_deref(ray_t* v, int64_t sym_id) { } } + /* Phase 2/3a dual encoding: fill correct-width sentinel into null + * payload slots so consumers reading raw payload honor the contract. 
*/ + switch (out_type) { + case RAY_F64: { + double* d = (double*)ray_data(result); + for (int64_t i = 0; i < n; i++) + if (ray_vec_is_null(result, i)) d[i] = NULL_F64; + break; + } + case RAY_I64: case RAY_TIMESTAMP: { + int64_t* d = (int64_t*)ray_data(result); + for (int64_t i = 0; i < n; i++) + if (ray_vec_is_null(result, i)) d[i] = NULL_I64; + break; + } + case RAY_I32: case RAY_DATE: case RAY_TIME: { + int32_t* d = (int32_t*)ray_data(result); + for (int64_t i = 0; i < n; i++) + if (ray_vec_is_null(result, i)) d[i] = NULL_I32; + break; + } + case RAY_I16: { + int16_t* d = (int16_t*)ray_data(result); + for (int64_t i = 0; i < n; i++) + if (ray_vec_is_null(result, i)) d[i] = NULL_I16; + break; + } + default: break; + } + /* Type-specific metadata propagation. * RAY_STR: share the source pool so ray_str_t pool_offs are valid. * RAY_SYM: if the source column carries a local sym_dict, share it. diff --git a/src/ops/pivot.c b/src/ops/pivot.c index 778123ca..ac5745a9 100644 --- a/src/ops/pivot.c +++ b/src/ops/pivot.c @@ -522,6 +522,18 @@ ray_t* exec_pivot(ray_graph_t* g, ray_op_t* op, ray_t* tbl) { memcpy(&ent_nmask, ix_entry_p + 8 + (size_t)n_idx * 8, 8); if (ent_nmask & (int64_t)(1u << k)) { ray_vec_set_null(new_col, (int64_t)r, true); + /* Phase 2/3a dual encoding: fill correct-width sentinel. */ + switch (kt) { + case RAY_F64: + ((double*)ray_data(new_col))[r] = NULL_F64; break; + case RAY_I64: case RAY_TIMESTAMP: + ((int64_t*)ray_data(new_col))[r] = NULL_I64; break; + case RAY_I32: case RAY_DATE: case RAY_TIME: + ((int32_t*)ray_data(new_col))[r] = NULL_I32; break; + case RAY_I16: + ((int16_t*)ray_data(new_col))[r] = NULL_I16; break; + default: break; + } continue; } if (idx_wide[k]) { diff --git a/src/ops/query.c b/src/ops/query.c index 8492523d..deb347ea 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -8458,12 +8458,22 @@ ray_t* ray_update(ray_t** args, int64_t n) { /* Null-bit propagation: memcpy above only copies values, * not the nullmap. 
Carry over orig_col's nulls for the * untouched rows, and pull expr_vec's nulls in for the - * masked rows. Without this, casting a null F64 expr - * back to an I64 column silently produces 0. */ + * masked rows. Phase 3a dual encoding: also overwrite the + * destination payload with the dest-width sentinel — casting + * a NaN/INT_MIN sentinel produces implementation-defined + * garbage that wouldn't match the dual-encoding contract. */ for (int64_t r = 0; r < nrows; r++) { ray_t* src = mask[r] ? expr_vec : orig_col; - if (ray_vec_is_null(src, r)) + if (ray_vec_is_null(src, r)) { ray_vec_set_null(new_col, r, true); + switch (ct) { + case RAY_F64: ((double*)ray_data(new_col))[r] = NULL_F64; break; + case RAY_I64: case RAY_TIMESTAMP: ((int64_t*)ray_data(new_col))[r] = NULL_I64; break; + case RAY_I32: case RAY_DATE: case RAY_TIME:((int32_t*)ray_data(new_col))[r] = NULL_I32; break; + case RAY_I16: ((int16_t*)ray_data(new_col))[r] = NULL_I16; break; + default: break; + } + } } ray_release(expr_vec); result = ray_table_add_col(result, col_name, new_col); @@ -8533,6 +8543,31 @@ ray_t* ray_update(ray_t** args, int64_t n) { if (RAY_ATOM_IS_NULL(expr_vec)) { for (int64_t r = 0; r < nrows; r++) ray_vec_set_null(bcast, r, true); + /* Phase 2/3a dual encoding: fill correct-width + * sentinel into payload. 
*/ + switch (ct) { + case RAY_F64: { + double* d = (double*)ray_data(bcast); + for (int64_t r = 0; r < nrows; r++) d[r] = NULL_F64; + break; + } + case RAY_I64: case RAY_TIMESTAMP: { + int64_t* d = (int64_t*)ray_data(bcast); + for (int64_t r = 0; r < nrows; r++) d[r] = NULL_I64; + break; + } + case RAY_I32: case RAY_DATE: case RAY_TIME: { + int32_t* d = (int32_t*)ray_data(bcast); + for (int64_t r = 0; r < nrows; r++) d[r] = NULL_I32; + break; + } + case RAY_I16: { + int16_t* d = (int16_t*)ray_data(bcast); + for (int64_t r = 0; r < nrows; r++) d[r] = NULL_I16; + break; + } + default: break; + } } ray_release(expr_vec); expr_vec = bcast; @@ -8549,10 +8584,15 @@ ray_t* ray_update(ray_t** args, int64_t n) { promoted = ray_vec_append(promoted, &v); if (RAY_IS_ERR(promoted)) { ray_release(expr_vec); ray_release(new_col); ray_release(result); ray_release(mask_vec); ray_release(tbl); return promoted; } } - /* Carry the nullmap across the I64→F64 promotion. */ - for (int64_t r = 0; r < nr; r++) - if (ray_vec_is_null(expr_vec, r)) + /* Carry the nullmap across the I64→F64 promotion; + * Phase 2 dual encoding: also overwrite the slot with NaN. */ + double* dst = (double*)ray_data(promoted); + for (int64_t r = 0; r < nr; r++) { + if (ray_vec_is_null(expr_vec, r)) { ray_vec_set_null(promoted, r, true); + dst[r] = NULL_F64; + } + } ray_release(expr_vec); expr_vec = promoted; } @@ -8731,6 +8771,31 @@ ray_t* ray_update(ray_t** args, int64_t n) { if (RAY_ATOM_IS_NULL(expr_vec)) { for (int64_t r = 0; r < nrows; r++) ray_vec_set_null(bcast, r, true); + /* Phase 2/3a dual encoding: fill correct-width + * sentinel into payload. 
*/ + switch (ct) { + case RAY_F64: { + double* d = (double*)ray_data(bcast); + for (int64_t r = 0; r < nrows; r++) d[r] = NULL_F64; + break; + } + case RAY_I64: case RAY_TIMESTAMP: { + int64_t* d = (int64_t*)ray_data(bcast); + for (int64_t r = 0; r < nrows; r++) d[r] = NULL_I64; + break; + } + case RAY_I32: case RAY_DATE: case RAY_TIME: { + int32_t* d = (int32_t*)ray_data(bcast); + for (int64_t r = 0; r < nrows; r++) d[r] = NULL_I32; + break; + } + case RAY_I16: { + int16_t* d = (int16_t*)ray_data(bcast); + for (int64_t r = 0; r < nrows; r++) d[r] = NULL_I16; + break; + } + default: break; + } } ray_release(expr_vec); expr_vec = bcast; @@ -8747,10 +8812,15 @@ ray_t* ray_update(ray_t** args, int64_t n) { promoted = ray_vec_append(promoted, &v); if (RAY_IS_ERR(promoted)) { ray_release(expr_vec); ray_release(result); ray_release(tbl); return promoted; } } - /* Carry the nullmap across the I64→F64 promotion. */ - for (int64_t r = 0; r < nr; r++) - if (ray_vec_is_null(expr_vec, r)) + /* Carry the nullmap across the I64→F64 promotion; + * Phase 2 dual encoding: also overwrite the slot with NaN. */ + double* dst = (double*)ray_data(promoted); + for (int64_t r = 0; r < nr; r++) { + if (ray_vec_is_null(expr_vec, r)) { ray_vec_set_null(promoted, r, true); + dst[r] = NULL_F64; + } + } ray_release(expr_vec); expr_vec = promoted; } @@ -8823,6 +8893,31 @@ ray_t* ray_update(ray_t** args, int64_t n) { if (RAY_ATOM_IS_NULL(expr_vec)) { for (int64_t r = 0; r < nrows; r++) ray_vec_set_null(bcast, r, true); + /* Phase 2/3a dual encoding: fill correct-width + * sentinel into payload. 
*/ + switch (ct) { + case RAY_F64: { + double* d = (double*)ray_data(bcast); + for (int64_t r = 0; r < nrows; r++) d[r] = NULL_F64; + break; + } + case RAY_I64: case RAY_TIMESTAMP: { + int64_t* d = (int64_t*)ray_data(bcast); + for (int64_t r = 0; r < nrows; r++) d[r] = NULL_I64; + break; + } + case RAY_I32: case RAY_DATE: case RAY_TIME: { + int32_t* d = (int32_t*)ray_data(bcast); + for (int64_t r = 0; r < nrows; r++) d[r] = NULL_I32; + break; + } + case RAY_I16: { + int16_t* d = (int16_t*)ray_data(bcast); + for (int64_t r = 0; r < nrows; r++) d[r] = NULL_I16; + break; + } + default: break; + } } ray_release(expr_vec); expr_vec = bcast; diff --git a/src/ops/string.c b/src/ops/string.c index 84e72a35..7c9512a4 100644 --- a/src/ops/string.c +++ b/src/ops/string.c @@ -889,7 +889,7 @@ ray_t* exec_strlen(ray_graph_t* g, ray_op_t* op) { str_resolve(input, &elems, &pool); for (int64_t i = 0; i < len; i++) { if (ray_vec_is_null((ray_t*)input, i)) { - dst[i] = 0; + dst[i] = NULL_I64; ray_vec_set_null(result, i, true); continue; } @@ -898,7 +898,7 @@ ray_t* exec_strlen(ray_graph_t* g, ray_op_t* op) { } else { for (int64_t i = 0; i < len; i++) { if (ray_vec_is_null((ray_t*)input, i)) { - dst[i] = 0; + dst[i] = NULL_I64; ray_vec_set_null(result, i, true); continue; } diff --git a/src/ops/strop.c b/src/ops/strop.c index 4f68ba18..e22a8ec7 100644 --- a/src/ops/strop.c +++ b/src/ops/strop.c @@ -73,7 +73,7 @@ static ray_t* strlen_vec(ray_t* x) { for (int64_t i = 0; i < n; i++) { if (has_nulls && ray_vec_is_null(x, i)) { - dst[i] = 0; + dst[i] = NULL_I64; ray_vec_set_null(out, i, true); continue; } @@ -102,7 +102,8 @@ static ray_t* strlen_mapcommon(ray_t* x) { for (int64_t p = 0; p < counts->len; p++) { int64_t v = 0; bool is_null = (keys->attrs & RAY_ATTR_HAS_NULLS) && ray_vec_is_null(keys, p); - if (!is_null) strlen_vec_value(keys, p, &v); + if (is_null) v = NULL_I64; + else strlen_vec_value(keys, p, &v); for (int64_t r = 0; r < cnt[p]; r++) { dst[off] = v; if (is_null) 
ray_vec_set_null(out, off, true); @@ -131,7 +132,7 @@ static ray_t* strlen_parted(ray_t* x) { bool has_nulls = (seg->attrs & RAY_ATTR_HAS_NULLS) != 0; for (int64_t i = 0; i < seg->len; i++) { if (has_nulls && ray_vec_is_null(seg, i)) { - dst[off] = 0; + dst[off] = NULL_I64; ray_vec_set_null(out, off, true); } else { strlen_vec_value(seg, i, &dst[off]); diff --git a/src/ops/temporal.c b/src/ops/temporal.c index d12d7dae..1abbdaf4 100644 --- a/src/ops/temporal.c +++ b/src/ops/temporal.c @@ -158,7 +158,7 @@ ray_t* ray_temporal_extract(ray_t* input, int field) { if (src_has_nulls) { for (int64_t i = 0; i < len; i++) { if (ray_vec_is_null(input, i)) { - out[i] = 0; + out[i] = NULL_I64; ray_vec_set_null(result, i, true); continue; } @@ -173,7 +173,7 @@ ray_t* ray_temporal_extract(ray_t* input, int field) { if (src_has_nulls) { for (int64_t i = 0; i < len; i++) { if (ray_vec_is_null(input, i)) { - out[i] = 0; + out[i] = NULL_I64; ray_vec_set_null(result, i, true); continue; } @@ -280,7 +280,7 @@ ray_t* ray_temporal_truncate(ray_t* input, int kind) { if (src_has_nulls) { for (int64_t i = 0; i < len; i++) { if (ray_vec_is_null(input, i)) { - out[i] = 0; ray_vec_set_null(result, i, true); continue; + out[i] = NULL_I64; ray_vec_set_null(result, i, true); continue; } int64_t us = rte_to_us(t, (int64_t)d32[i]); int64_t r = us % bucket; @@ -298,7 +298,7 @@ ray_t* ray_temporal_truncate(ray_t* input, int kind) { if (src_has_nulls) { for (int64_t i = 0; i < len; i++) { if (ray_vec_is_null(input, i)) { - out[i] = 0; ray_vec_set_null(result, i, true); continue; + out[i] = NULL_I64; ray_vec_set_null(result, i, true); continue; } int64_t us = rte_to_us(t, d64[i]); int64_t r = us % bucket; @@ -366,7 +366,7 @@ ray_t* exec_extract(ray_graph_t* g, ray_op_t* op) { int64_t n = m.morsel_len; \ for (int64_t i = 0; i < n; i++) { \ if (HAS_NULLS && ray_vec_is_null(input, off + i)) { \ - out[off + i] = 0; \ + out[off + i] = NULL_I64; \ ray_vec_set_null(result, off + i, true); \ continue; \ } \ @@ 
-518,7 +518,7 @@ ray_t* exec_date_trunc(ray_graph_t* g, ray_op_t* op) { int64_t n = m.morsel_len; \ for (int64_t i = 0; i < n; i++) { \ if (HAS_NULLS && ray_vec_is_null(input, off + i)) { \ - out[off + i] = 0; \ + out[off + i] = NULL_I64; \ ray_vec_set_null(result, off + i, true); \ continue; \ } \ diff --git a/src/ops/window.c b/src/ops/window.c index b118267f..d0619de8 100644 --- a/src/ops/window.c +++ b/src/ops/window.c @@ -1127,7 +1127,19 @@ ray_t* exec_window(ray_graph_t* g, ray_op_t* op, ray_t* tbl) { goto oom; } result_vecs[f]->len = nrows; - memset(ray_data(result_vecs[f]), 0, (size_t)nrows * 8); + /* Pre-stamp every slot with the width-correct null sentinel. The + * per-partition compute loops below write valid values into + * "active" slots and call win_set_null on null-producing slots + * without re-writing the payload — so the only way to honor the + * dual-encoding contract for those bitmap-only nulls is to make + * the payload already match the sentinel up front. */ + if (is_f64[f]) { + double* d = (double*)ray_data(result_vecs[f]); + for (int64_t i = 0; i < nrows; i++) d[i] = NULL_F64; + } else { + int64_t* d = (int64_t*)ray_data(result_vecs[f]); + for (int64_t i = 0; i < nrows; i++) d[i] = NULL_I64; + } } /* Order key vectors start at sort_vecs[n_part] */ diff --git a/src/vec/atom.c b/src/vec/atom.c index 2d4b487b..20eaeaf1 100644 --- a/src/vec/atom.c +++ b/src/vec/atom.c @@ -180,7 +180,18 @@ ray_t* ray_typed_null(int8_t type) { ray_t* v = ray_alloc(0); if (RAY_IS_ERR(v)) return v; v->type = type; v->i64 = 0; + /* The full 8-byte payload is zeroed above on purpose: the I32 / I16 + * arms store a narrower field, and the remaining bytes would + * otherwise keep whatever ray_alloc handed back (assumes the + * payload fields overlay i64 — TODO confirm against ray_t). */ + switch (type) { + case -RAY_F64: v->f64 = NULL_F64; break; + case -RAY_I64: case -RAY_TIMESTAMP: v->i64 = NULL_I64; break; + case -RAY_I32: case -RAY_DATE: case -RAY_TIME: v->i32 = NULL_I32; break; + case -RAY_I16: v->i16 = NULL_I16; break; + default: break; + } v->nullmap[0] |= 1; return v; } diff --git a/test/rfl/null/bool_u8_lockdown.rfl b/test/rfl/null/bool_u8_lockdown.rfl new file mode 100644 index 00000000..01431e80 ---
/dev/null +++ b/test/rfl/null/bool_u8_lockdown.rfl @@ -0,0 +1,22 @@ +;; Phase 1: BOOL and U8 are non-nullable. +;; +;; Empty cells in CSV ingest must materialize as false / 0 with no null mark. +;; All other nullable types still produce typed nulls as before. + +;; Sanity: typed nulls for the nullable types still parse and report null. +(nil? 0Nh) -- true +(nil? 0Ni) -- true +(nil? 0Nl) -- true +(nil? 0Nf) -- true + +;; CSV ingest: empty BOOL / U8 cells coerce to false / 0, not null. +(.sys.exec "rm -f /tmp/rfl_phase1_bool_u8_unique_path.csv") +(.sys.exec "printf 'b,u\\ntrue,1\\n,\\nfalse,3\\n' > /tmp/rfl_phase1_bool_u8_unique_path.csv") +(set P1Lockdown (.csv.read [B8 U8] "/tmp/rfl_phase1_bool_u8_unique_path.csv")) +(count P1Lockdown) -- 3 +(at P1Lockdown 'b) -- [true false false] +(at P1Lockdown 'u) -- [0x01 0x00 0x03] +(map nil? (at P1Lockdown 'b)) -- [false false false] +(map nil? (at P1Lockdown 'u)) -- [false false false] +(sum (map nil? (at P1Lockdown 'b))) -- 0 +(sum (map nil? (at P1Lockdown 'u))) -- 0 diff --git a/test/rfl/null/f64_dual_encoding.rfl b/test/rfl/null/f64_dual_encoding.rfl new file mode 100644 index 00000000..1ccdd0da --- /dev/null +++ b/test/rfl/null/f64_dual_encoding.rfl @@ -0,0 +1,52 @@ +;; Phase 2 dual-encoding contract: F64 nulls are NaN in the payload AND +;; have the nullmap bit set. Every consumer must agree on null-ness. + +;; ----- 1. Atom construction ----- + +(nil? 0Nf) -- true + +;; ----- 2. CSV ingest ----- + +(.sys.exec "rm -f /tmp/rfl_phase2_f64_dual.csv") +(.sys.exec "printf 'x\\n1.5\\n\\n3.5\\n' > /tmp/rfl_phase2_f64_dual.csv") +(set P2F (.csv.read [F64] "/tmp/rfl_phase2_f64_dual.csv")) +(count P2F) -- 3 +(nil? (at (at P2F 'x) 1)) -- true +(at (at P2F 'x) 0) -- 1.5 +(at (at P2F 'x) 2) -- 3.5 + +;; ----- 3. Aggregations exclude nulls ----- + +(sum (at P2F 'x)) -- 5.0 +(avg (at P2F 'x)) -- 2.5 +(min (at P2F 'x)) -- 1.5 +(max (at P2F 'x)) -- 3.5 + +;; ----- 4. 
Sort places nulls per policy ----- + +(at (asc (as 'F64 [3.0 0N 1.0 2.0])) 0) -- 0Nf +(at (desc (as 'F64 [3.0 0N 1.0 2.0])) 3) -- 0Nf + +;; ----- 5. Distinct collapses null rows ----- + +(count (distinct (as 'F64 [1.0 0N 2.0 0N 1.0]))) -- 3 +(nil? (at (distinct (as 'F64 [0N 0N 0N])) 0)) -- true +(count (distinct (as 'F64 [0N 0N 0N]))) -- 1 + +;; ----- 6. Group-by buckets non-null F64 rows correctly ----- + +(set Tg (table [v g] (list (as 'F64 [1.0 0N 2.0 0N 3.0]) [10 20 10 20 30]))) +(set Rg (select {c: (count v) from: Tg by: g})) +(count Rg) -- 3 + +;; ----- 7. UPDATE cast preserves null encoding ----- + +(set Tc (table [v] (list (as 'F64 [10.0 20.0 30.0])))) +(set Tu (update {from: Tc v: [1 0N 3]})) +(nil? (at (at Tu 'v) 1)) -- true +(sum (at Tu 'v)) -- 4.0 + +;; ----- 8. Fused-group parity: nullable F64 stays out of fused path ----- + +(set Tn (table [v g] (list (as 'F64 [1.0 2.0 0N 4.0 5.0]) [0 0 1 1 1]))) +(sum (at (select {s: (sum v) from: Tn where: (>= g 0) by: g}) 's)) -- 12.0 diff --git a/test/rfl/null/grouped_agg_null_correctness.rfl b/test/rfl/null/grouped_agg_null_correctness.rfl new file mode 100644 index 00000000..a35f839c --- /dev/null +++ b/test/rfl/null/grouped_agg_null_correctness.rfl @@ -0,0 +1,75 @@ +;; Phase 3 follow-up: per-(group, agg) non-null counts drive AVG/VAR/ +;; STDDEV divisors, and result-side null finalization replaces +;; accumulator seeds (DBL_MAX / -DBL_MAX / 0 / NaN product) for +;; MIN/MAX/PROD/FIRST/LAST on all-null groups. See +;; include/rayforce.h NULL_* paragraph. + +;; ----- AVG divisor excludes nulls ----- +;; Group g=0 has v in [1, 2, 0N, 4] — non-null sum = 7, non-null count = 3. +(set T (table [v g] (list [1 2 0N 4] [0 0 0 0]))) +(at (at (select {a: (avg v) from: T by: g}) 'a) 0) -- 2.3333333333333335 + +;; ----- All-null group MIN returns typed null, not DBL_MAX / 0 ----- +(set Tn (table [v g] (list [0N 0N 5 6] [0 0 1 1]))) +(set Rn (select {m: (min v) from: Tn by: g})) +(nil? 
(at (at Rn 'm) 0)) -- true +(at (at Rn 'm) 1) -- 5 + +;; ----- All-null group MAX returns typed null ----- +(nil? (at (at (select {m: (max v) from: Tn by: g}) 'm) 0)) -- true + +;; ----- All-null group FIRST/LAST return typed null ----- +(nil? (at (at (select {f: (first v) from: Tn by: g}) 'f) 0)) -- true +(nil? (at (at (select {l: (last v) from: Tn by: g}) 'l) 0)) -- true + +;; ----- FIRST/LAST skip null prefix/suffix (first/last non-null semantics) ----- +(set Tp (table [v g] (list [0N 0N 7 8] [0 0 0 0]))) +(at (at (select {f: (first v) from: Tp by: g}) 'f) 0) -- 7 +(at (at (select {l: (last v) from: Tp by: g}) 'l) 0) -- 8 + +;; ----- F64 equivalents — same accumulator paths, NaN-skip variant ----- +(set Tf (table [v g] (list (as 'F64 [0N 0N 3.0 4.0]) [0 0 1 1]))) +(nil? (at (at (select {m: (min v) from: Tf by: g}) 'm) 0)) -- true +(at (at (select {m: (min v) from: Tf by: g}) 'm) 1) -- 3.0 +(nil? (at (at (select {m: (max v) from: Tf by: g}) 'm) 0)) -- true +(nil? (at (at (select {f: (first v) from: Tf by: g}) 'f) 0)) -- true +(nil? (at (at (select {l: (last v) from: Tf by: g}) 'l) 0)) -- true + +;; ----- F64 AVG divisor excludes NaN-tagged null rows ----- +(set Tfa (table [v g] (list (as 'F64 [1.0 2.0 0N 4.0]) [0 0 0 0]))) +(at (at (select {a: (avg v) from: Tfa by: g}) 'a) 0) -- 2.3333333333333335 + +;; ----- PROD on all-null group returns typed null (not 0 or initial seed) ----- +(set Tprod (table [v g] (list [0N 0N 2 3] [0 0 1 1]))) +(set Rprod (select {p: (prod v) from: Tprod by: g})) +(nil? (at (at Rprod 'p) 0)) -- true +(at (at Rprod 'p) 1) -- 6 + +;; ----- PROD on F64 all-null group returns typed null (no NaN bleed) ----- +(set Tprf (table [v g] (list (as 'F64 [0N 0N 2.0 3.0]) [0 0 1 1]))) +(set Rprf (select {p: (prod v) from: Tprf by: g})) +(nil? 
(at (at Rprf 'p) 0)) -- true +(at (at Rprf 'p) 1) -- 6.0 + +;; ----- Mixed null and non-null rows: PROD multiplies only non-null values ----- +(set Tprx (table [v g] (list [2 0N 3 0N 5] [0 0 0 0 0]))) +(at (at (select {p: (prod v) from: Tprx by: g}) 'p) 0) -- 30 + +;; ----- FIRST/LAST in scalar (no by:) path with null prefix/suffix ----- +;; scalar_accum_row is a distinct code path from da_accum_row. +(at (at (select {f: (first v) from: (table [v] (list [0N 0N 9 10]))}) 'f) 0) -- 9 +(at (at (select {l: (last v) from: (table [v] (list [9 10 0N 0N]))}) 'l) 0) -- 10 + +;; ----- Scalar AVG over all-null returns typed null ----- +(nil? (at (at (select {a: (avg v) from: (table [v] (list [0N 0N 0N]))}) 'a) 0)) -- true + +;; ----- Scalar MIN/MAX/PROD on all-null return typed null ----- +(nil? (at (at (select {m: (min v) from: (table [v] (list [0N 0N]))}) 'm) 0)) -- true +(nil? (at (at (select {m: (max v) from: (table [v] (list [0N 0N]))}) 'm) 0)) -- true +(nil? (at (at (select {p: (prod v) from: (table [v] (list [0N 0N]))}) 'p) 0)) -- true + +;; ----- STDDEV/VAR on a group with insufficient non-null rows is null ----- +;; Population variance needs ≥1 non-null; sample variance needs ≥2. +(set Tv (table [v g] (list [0N 0N 1 2 3] [0 0 1 1 1]))) +(nil? (at (at (select {s: (stddev v) from: Tv by: g}) 's) 0)) -- true +(nil? (at (at (select {s: (var v) from: Tv by: g}) 's) 0)) -- true diff --git a/test/rfl/null/integer_dual_encoding.rfl b/test/rfl/null/integer_dual_encoding.rfl new file mode 100644 index 00000000..7cc330f3 --- /dev/null +++ b/test/rfl/null/integer_dual_encoding.rfl @@ -0,0 +1,59 @@ +;; Phase 3a dual-encoding contract: integer/temporal nulls hold the +;; INT_MIN sentinel in the payload AND have the nullmap bit set. + +;; ----- 1. Atom construction ----- + +(nil? 0Nh) -- true +(nil? 0Ni) -- true +(nil? 0Nl) -- true + +;; ----- 2. 
CSV ingest (I64) ----- + +(.sys.exec "rm -f /tmp/rfl_phase3a_int_dual.csv") +(.sys.exec "printf 'x\\n10\\n\\n30\\n' > /tmp/rfl_phase3a_int_dual.csv") +(set P3I (.csv.read [I64] "/tmp/rfl_phase3a_int_dual.csv")) +(count P3I) -- 3 +(nil? (at (at P3I 'x) 1)) -- true +(at (at P3I 'x) 0) -- 10 +(at (at P3I 'x) 2) -- 30 + +;; ----- 3. Aggregations exclude nulls ----- + +(sum (at P3I 'x)) -- 40 +(min (at P3I 'x)) -- 10 +(max (at P3I 'x)) -- 30 + +;; ----- 4. Sort places nulls per policy ----- + +(at (asc [3 0N 1 2]) 0) -- 0Nl +(at (desc [3 0N 1 2]) 3) -- 0Nl + +;; ----- 5. Distinct collapses null rows ----- + +(count (distinct [1 0N 2 0N 1])) -- 3 +(nil? (at (distinct [0N 0N 0N]) 0)) -- true +(count (distinct [0N 0N 0N])) -- 1 + +;; ----- 6. Group-by SUM on nullable I64 (consumer NaN/sentinel-skip) ----- + +(set Tn (table [v g] (list [1 2 0N 4 5] [0 0 1 1 1]))) +(sum (at (select {s: (sum v) from: Tn where: (>= g 0) by: g}) 's)) -- 12 + +;; ----- 7. UPDATE with typed-null broadcast preserves sentinel encoding ----- + +(set Tc (table [a] (list [10 20 30]))) +(set Tu (update {a: 0Nl from: Tc})) +(nil? (at (at Tu 'a) 0)) -- true +(nil? (at (at Tu 'a) 1)) -- true +(nil? (at (at Tu 'a) 2)) -- true + +;; ----- 8. Group-by with null I64 keys buckets nulls together ----- + +(set Tk (table [k v] (list [1 0N 1 0N 2] [10 20 30 40 50]))) +(count (select {c: (count v) from: Tk where: (>= v 0) by: k})) -- 3 + +;; ----- 9. Cast preserves null encoding across widths ----- + +(nil? (at (as 'I32 [1 0N 3]) 1)) -- true +(nil? (at (as 'I16 [1 0N 3]) 1)) -- true +(nil? (at (as 'I64 (as 'I16 [1 0N 3])) 1)) -- true diff --git a/test/test_atom.c b/test/test_atom.c index 00fb1b6e..34b382c7 100644 --- a/test/test_atom.c +++ b/test/test_atom.c @@ -456,6 +456,88 @@ static test_result_t test_atom_eq_list_sym_atoms(void) { PASS(); } +static test_result_t test_atom_typed_null_f64(void) { + /* Phase 2 dual-encoding: ray_typed_null(-RAY_F64) must store NaN in + * the f64 payload AND set nullmap[0]&1. 
Downstream kernels that + * read the slot raw (without consulting the bitmap) then see NaN. */ + ray_t* v = ray_typed_null(-RAY_F64); + TEST_ASSERT_NOT_NULL(v); + TEST_ASSERT_FALSE(RAY_IS_ERR(v)); + TEST_ASSERT_TRUE(ray_is_atom(v)); + TEST_ASSERT_EQ_I(v->type, -RAY_F64); + TEST_ASSERT_TRUE(v->f64 != v->f64); /* NaN by IEEE-754 */ + TEST_ASSERT_TRUE((v->nullmap[0] & 1) != 0); /* bitmap bit also set */ + ray_release(v); + PASS(); +} + +static test_result_t test_atom_typed_null_i64(void) { + /* Phase 3a: integer typed nulls now use INT_MIN sentinel + bitmap bit. */ + ray_t* v = ray_typed_null(-RAY_I64); + TEST_ASSERT_NOT_NULL(v); + TEST_ASSERT_FALSE(RAY_IS_ERR(v)); + TEST_ASSERT_EQ_I(v->type, -RAY_I64); + TEST_ASSERT_EQ_I(v->i64, NULL_I64); + TEST_ASSERT_TRUE((v->nullmap[0] & 1) != 0); + ray_release(v); + PASS(); +} + +static test_result_t test_atom_typed_null_i16(void) { + ray_t* v = ray_typed_null(-RAY_I16); + TEST_ASSERT_NOT_NULL(v); + TEST_ASSERT_FALSE(RAY_IS_ERR(v)); + TEST_ASSERT_EQ_I(v->type, -RAY_I16); + TEST_ASSERT_EQ_I(v->i16, NULL_I16); + TEST_ASSERT_TRUE((v->nullmap[0] & 1) != 0); + ray_release(v); + PASS(); +} + +static test_result_t test_atom_typed_null_i32(void) { + ray_t* v = ray_typed_null(-RAY_I32); + TEST_ASSERT_NOT_NULL(v); + TEST_ASSERT_FALSE(RAY_IS_ERR(v)); + TEST_ASSERT_EQ_I(v->type, -RAY_I32); + TEST_ASSERT_EQ_I(v->i32, NULL_I32); + TEST_ASSERT_TRUE((v->nullmap[0] & 1) != 0); + ray_release(v); + PASS(); +} + +static test_result_t test_atom_typed_null_date(void) { + ray_t* v = ray_typed_null(-RAY_DATE); + TEST_ASSERT_NOT_NULL(v); + TEST_ASSERT_FALSE(RAY_IS_ERR(v)); + TEST_ASSERT_EQ_I(v->type, -RAY_DATE); + TEST_ASSERT_EQ_I(v->i32, NULL_I32); + TEST_ASSERT_TRUE((v->nullmap[0] & 1) != 0); + ray_release(v); + PASS(); +} + +static test_result_t test_atom_typed_null_time(void) { + ray_t* v = ray_typed_null(-RAY_TIME); + TEST_ASSERT_NOT_NULL(v); + TEST_ASSERT_FALSE(RAY_IS_ERR(v)); + TEST_ASSERT_EQ_I(v->type, -RAY_TIME); + TEST_ASSERT_EQ_I(v->i32, 
NULL_I32); + TEST_ASSERT_TRUE((v->nullmap[0] & 1) != 0); + ray_release(v); + PASS(); +} + +static test_result_t test_atom_typed_null_timestamp(void) { + ray_t* v = ray_typed_null(-RAY_TIMESTAMP); + TEST_ASSERT_NOT_NULL(v); + TEST_ASSERT_FALSE(RAY_IS_ERR(v)); + TEST_ASSERT_EQ_I(v->type, -RAY_TIMESTAMP); + TEST_ASSERT_EQ_I(v->i64, NULL_I64); + TEST_ASSERT_TRUE((v->nullmap[0] & 1) != 0); + ray_release(v); + PASS(); +} + /* ---- Suite definition -------------------------------------------------- */ const test_entry_t atom_entries[] = { @@ -480,6 +562,13 @@ const test_entry_t atom_entries[] = { { "atom/eq_list_with_nulls", test_atom_eq_list_with_nulls, atom_setup, atom_teardown }, { "atom/eq_list_empty", test_atom_eq_list_empty, atom_setup, atom_teardown }, { "atom/eq_list_sym_atoms", test_atom_eq_list_sym_atoms, atom_setup, atom_teardown }, + { "atom/typed_null_f64", test_atom_typed_null_f64, atom_setup, atom_teardown }, + { "atom/typed_null_i64", test_atom_typed_null_i64, atom_setup, atom_teardown }, + { "atom/typed_null_i16", test_atom_typed_null_i16, atom_setup, atom_teardown }, + { "atom/typed_null_i32", test_atom_typed_null_i32, atom_setup, atom_teardown }, + { "atom/typed_null_date", test_atom_typed_null_date, atom_setup, atom_teardown }, + { "atom/typed_null_time", test_atom_typed_null_time, atom_setup, atom_teardown }, + { "atom/typed_null_timestamp", test_atom_typed_null_timestamp, atom_setup, atom_teardown }, { NULL, NULL, NULL, NULL }, }; diff --git a/test/test_compile.c b/test/test_compile.c index 73e45ef1..fc61f051 100644 --- a/test/test_compile.c +++ b/test/test_compile.c @@ -257,6 +257,366 @@ static test_result_t test_compile_vector_literal(void) { PASS(); } +/* ════════════════════════════════════════════════════════════════════ + * Phase 2e: F64 dual-encoding regression tests. 
+ * + * Each consumer of an F64 vector with a null bit MUST see NULL_F64 + * (= NaN) in the raw `double` payload as well — kernels are allowed to + * read the slot without consulting the bitmap. These tests assert the + * payload, not the bitmap, by reading `((double*)ray_data(v))[idx]` and + * checking `x != x` (NaN's defining property). + * ════════════════════════════════════════════════════════════════════ */ + +static test_result_t test_compile_f64_mixed_literal_null_slot_is_nan(void) { + /* Mixed numeric literal [1.0 0N 3.0] promotes to F64 in parse.c. + * The integer null 0N (typed I64 null with i64=0) used to write 0.0 + * into the f64 slot, breaking the dual-encoding contract. */ + ray_t* r = ray_eval_str("[1.0 0N 3.0]"); + TEST_ASSERT_NOT_NULL(r); + if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on mixed F64 literal"); } + TEST_ASSERT(ray_is_vec(r), "expected vector"); + TEST_ASSERT(r->type == RAY_F64, "expected F64 vector"); + TEST_ASSERT(r->len == 3, "expected len 3"); + double* d = (double*)ray_data(r); + TEST_ASSERT(d[0] == 1.0, "slot 0 should be 1.0"); + TEST_ASSERT(d[1] != d[1], "slot 1 (null) must be NaN"); + TEST_ASSERT(d[2] == 3.0, "slot 2 should be 3.0"); + ray_release(r); + PASS(); +} + +static test_result_t test_compile_f64_cast_i64_null_slot_is_nan(void) { + /* (as 'F64 [1 0N 3]) — cast an I64 vector with a null slot to F64. + * The cast loop writes (double)src[i] regardless of null status, + * which used to leave 0.0 in the null F64 slot. Phase 2e routes + * the post-cast nullmap copy through a per-slot NULL_F64 fill. 
*/ + ray_t* r = ray_eval_str("(as 'F64 [1 0N 3])"); + TEST_ASSERT_NOT_NULL(r); + if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on cast"); } + TEST_ASSERT(ray_is_vec(r), "expected vector"); + TEST_ASSERT(r->type == RAY_F64, "expected F64 vector"); + TEST_ASSERT(r->len == 3, "expected len 3"); + double* d = (double*)ray_data(r); + TEST_ASSERT(d[0] == 1.0, "slot 0 should be 1.0"); + TEST_ASSERT(d[1] != d[1], "slot 1 (null) must be NaN"); + TEST_ASSERT(d[2] == 3.0, "slot 2 should be 3.0"); + ray_release(r); + PASS(); +} + +static test_result_t test_compile_i32_cast_i64_null_slot_is_sentinel(void) { + /* Phase 3a: (as 'I32 [1 0N 3]) — narrowing I64→I32 cast over a vector + * with a null slot must leave NULL_I32 (INT32_MIN) in the payload, not + * the cast result (int32_t)NULL_I64 = 0. Mirror of the Phase 2e F64 + * post-cast NaN fill for integer destinations. */ + ray_t* r = ray_eval_str("(as 'I32 [1 0N 3])"); + TEST_ASSERT_NOT_NULL(r); + if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on cast"); } + TEST_ASSERT(ray_is_vec(r), "expected vector"); + TEST_ASSERT(r->type == RAY_I32, "expected I32 vector"); + TEST_ASSERT(r->len == 3, "expected len 3"); + int32_t* d = (int32_t*)ray_data(r); + TEST_ASSERT_EQ_I(d[0], 1); + TEST_ASSERT_EQ_I(d[1], NULL_I32); + TEST_ASSERT_EQ_I(d[2], 3); + TEST_ASSERT_TRUE(ray_vec_is_null(r, 1)); + ray_release(r); + PASS(); +} + +static test_result_t test_compile_i16_cast_i32_null_slot_is_sentinel(void) { + /* Phase 3a Hazard 3: chained narrowing I64→I32→I16 cast over a vector + * with a null slot must leave NULL_I16 (INT16_MIN) in the I16 payload, + * NOT (int16_t)NULL_I32 = 0. The destination-width sentinel must be + * written post-cast directly — propagating through the cast macro + * truncates the sentinel. 
*/ + ray_t* r = ray_eval_str("(as 'I16 (as 'I32 [1 0N 3]))"); + TEST_ASSERT_NOT_NULL(r); + if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on cast"); } + TEST_ASSERT(ray_is_vec(r), "expected vector"); + TEST_ASSERT(r->type == RAY_I16, "expected I16 vector"); + TEST_ASSERT(r->len == 3, "expected len 3"); + int16_t* d = (int16_t*)ray_data(r); + TEST_ASSERT_EQ_I(d[0], 1); + TEST_ASSERT_EQ_I(d[1], NULL_I16); + TEST_ASSERT_EQ_I(d[2], 3); + TEST_ASSERT_TRUE(ray_vec_is_null(r, 1)); + ray_release(r); + PASS(); +} + +static test_result_t test_compile_i64_cast_i32_null_slot_is_sentinel(void) { + /* Phase 3a: widening I32→I64 cast must still fill NULL_I64 in the + * null payload slot — the cast macro would write (int64_t)NULL_I32 + * = -2147483648, which collides with a legitimate I64 value. */ + ray_t* r = ray_eval_str("(as 'I64 (as 'I32 [1 0N 3]))"); + TEST_ASSERT_NOT_NULL(r); + if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on cast"); } + TEST_ASSERT(ray_is_vec(r), "expected vector"); + TEST_ASSERT(r->type == RAY_I64, "expected I64 vector"); + TEST_ASSERT(r->len == 3, "expected len 3"); + int64_t* d = (int64_t*)ray_data(r); + TEST_ASSERT_EQ_I(d[0], 1); + TEST_ASSERT_EQ_I(d[1], NULL_I64); + TEST_ASSERT_EQ_I(d[2], 3); + TEST_ASSERT_TRUE(ray_vec_is_null(r, 1)); + ray_release(r); + PASS(); +} + +static test_result_t test_compile_i64_scalar_null_propagation_slot_is_sentinel(void) { + /* Phase 3a-4: a binary op with a scalar-null I64 operand should fill the + * I64 result payload with NULL_I64, not leave it as the kernel's output. + * `(+ 0Nl [1 2 3])` — scalar-null left operand triggers set_all_null + * with an I64 result vector. Mirror of the Phase 2e F64 NaN-fill. 
*/ + ray_t* r = ray_eval_str("(+ 0Nl [1 2 3])"); + TEST_ASSERT_NOT_NULL(r); + if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on scalar-null add"); } + TEST_ASSERT(ray_is_vec(r), "expected vector"); + TEST_ASSERT(r->type == RAY_I64, "expected I64 vector"); + TEST_ASSERT(r->len == 3, "expected len 3"); + int64_t* d = (int64_t*)ray_data(r); + TEST_ASSERT_EQ_I(d[0], NULL_I64); + TEST_ASSERT_EQ_I(d[1], NULL_I64); + TEST_ASSERT_EQ_I(d[2], NULL_I64); + TEST_ASSERT_TRUE(ray_vec_is_null(r, 0)); + TEST_ASSERT_TRUE(ray_vec_is_null(r, 1)); + TEST_ASSERT_TRUE(ray_vec_is_null(r, 2)); + ray_release(r); + PASS(); +} + +static test_result_t test_compile_update_promo_f64_to_i64_null_slot_is_sentinel(void) { + /* Phase 3a-5: UPDATE-WHERE that promotes an F64 expression with nulls into + * an I64 column must fill NULL_I64 in the destination payload, not the + * result of (int64_t)NaN, which is undefined behavior in C. */ + ray_t* r = ray_eval_str( + "(do " + "(set t (table [a b] (list [10 20 30] [1 2 3]))) " + "(set u (update {a: [1.5 0Nf 3.5] where: (> b 0) from: t})) " + "(at u 'a))"); + TEST_ASSERT_NOT_NULL(r); + if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on update promo f64->i64"); } + TEST_ASSERT(ray_is_vec(r), "expected vector"); + TEST_ASSERT(r->type == RAY_I64, "expected I64 vector"); + TEST_ASSERT(r->len == 3, "expected len 3"); + int64_t* d = (int64_t*)ray_data(r); + TEST_ASSERT_EQ_I(d[1], NULL_I64); /* not (int64_t)NaN */ + TEST_ASSERT_TRUE(ray_vec_is_null(r, 1)); + ray_release(r); + PASS(); +} + +static test_result_t test_compile_update_promo_i64_to_f64_null_slot_is_sentinel(void) { + /* Phase 3a-5: UPDATE-WHERE that promotes an I64 expression with nulls into + * an F64 column must fill NULL_F64 in the destination payload, not + * (double)NULL_I64 (a large finite value).
*/ + ray_t* r = ray_eval_str( + "(do " + "(set t (table [a b] (list [1.0 2.0 3.0] [1 2 3]))) " + "(set u (update {a: [10 0Nl 30] where: (> b 0) from: t})) " + "(at u 'a))"); + TEST_ASSERT_NOT_NULL(r); + if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on update promo i64->f64"); } + TEST_ASSERT(ray_is_vec(r), "expected vector"); + TEST_ASSERT(r->type == RAY_F64, "expected F64 vector"); + TEST_ASSERT(r->len == 3, "expected len 3"); + double* d = (double*)ray_data(r); + TEST_ASSERT(d[1] != d[1], "slot 1 (null) must be NaN"); + TEST_ASSERT_TRUE(ray_vec_is_null(r, 1)); + ray_release(r); + PASS(); +} + +static test_result_t test_compile_update_atom_broadcast_i64_null_slot_is_sentinel(void) { + /* Phase 3a-6: UPDATE that broadcasts an I64 typed-null atom into an + * I64 column should fill NULL_I64 into the destination payload, not 0. */ + ray_t* r = ray_eval_str( + "(do (set t (table [a] (list [10 20 30])))" + " (set u (update {a: 0Nl from: t}))" + " (at u 'a))"); + TEST_ASSERT_NOT_NULL(r); + if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on update atom broadcast i64"); } + TEST_ASSERT(ray_is_vec(r), "expected vector"); + TEST_ASSERT(r->type == RAY_I64, "expected I64 vector"); + TEST_ASSERT(r->len == 3, "expected len 3"); + int64_t* d = (int64_t*)ray_data(r); + TEST_ASSERT_EQ_I(d[0], NULL_I64); + TEST_ASSERT_EQ_I(d[1], NULL_I64); + TEST_ASSERT_EQ_I(d[2], NULL_I64); + TEST_ASSERT_TRUE(ray_vec_is_null(r, 0)); + ray_release(r); + PASS(); +} + +static test_result_t test_compile_update_atom_broadcast_where_i64_null_slot_is_sentinel(void) { + /* Phase 3a-6: UPDATE-WHERE that broadcasts an I64 typed-null atom into + * an I64 column should fill NULL_I64 into masked slots only. 
*/ + ray_t* r = ray_eval_str( + "(do (set t (table [a b] (list [10 20 30] [1 2 3])))" + " (set u (update {a: 0Nl where: (> b 1) from: t}))" + " (at u 'a))"); + TEST_ASSERT_NOT_NULL(r); + if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on update-where atom broadcast i64"); } + TEST_ASSERT(ray_is_vec(r), "expected vector"); + TEST_ASSERT(r->type == RAY_I64, "expected I64 vector"); + TEST_ASSERT(r->len == 3, "expected len 3"); + int64_t* d = (int64_t*)ray_data(r); + TEST_ASSERT_EQ_I(d[0], 10); /* unmasked — unchanged */ + TEST_ASSERT_EQ_I(d[1], NULL_I64); /* masked + null broadcast */ + TEST_ASSERT_EQ_I(d[2], NULL_I64); + TEST_ASSERT_TRUE(ray_vec_is_null(r, 1)); + TEST_ASSERT_TRUE(ray_vec_is_null(r, 2)); + ray_release(r); + PASS(); +} + +static test_result_t test_compile_group_by_i64_null_key_slot_is_sentinel(void) { + /* Phase 3a-7: group-by on a nullable I64 column with a null row must + * write NULL_I64 into the result column's null slot, not 0. */ + ray_t* r = ray_eval_str( + "(do (set t (table [k v] (list [1 0Nl 2 0Nl 3] [10 20 30 40 50])))" + " (set r (select {c: (count v) from: t by: k}))" + " (at r 'k))"); + TEST_ASSERT_NOT_NULL(r); + if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on group-by i64 null key"); } + TEST_ASSERT(ray_is_vec(r), "expected vector"); + TEST_ASSERT(r->type == RAY_I64, "expected I64 vector"); + int64_t n = r->len; + bool found_null_slot = false; + int64_t* d = (int64_t*)ray_data(r); + for (int64_t i = 0; i < n; i++) { + if (ray_vec_is_null(r, i)) { + TEST_ASSERT_EQ_I(d[i], NULL_I64); /* slot also holds sentinel */ + found_null_slot = true; + } + } + TEST_ASSERT_TRUE(found_null_slot); + ray_release(r); + PASS(); +} + +static test_result_t test_compile_pivot_i64_null_key_slot_is_sentinel(void) { + /* Phase 3a-8: pivot on a nullable I64 key column with null rows must + * fill NULL_I64 into the result index-column's null slot, not 0. 
*/
+    ray_t* r = ray_eval_str(
+        "(do (set t (table [k v c] (list [1 0Nl 2 0Nl 3] [10 20 30 40 50] ['a 'b 'a 'b 'c])))"  /* key k is null at rows 1 and 3 */
+        " (set p (pivot t 'k 'c 'v sum))"
+        " (at p 'k))");
+    TEST_ASSERT_NOT_NULL(r);
+    if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on pivot i64 null key"); }  /* free the error object before failing */
+    TEST_ASSERT(ray_is_vec(r), "expected vector");
+    TEST_ASSERT(r->type == RAY_I64, "expected I64 vector");
+    int64_t n = r->len;  /* number of index rows; its value is not asserted */
+    bool found_null_slot = false;  /* keeps the loop below from passing vacuously */
+    int64_t* d = (int64_t*)ray_data(r);
+    for (int64_t i = 0; i < n; i++) {
+        if (ray_vec_is_null(r, i)) {  /* only bitmap-null keys are payload-checked */
+            TEST_ASSERT_EQ_I(d[i], NULL_I64); /* slot also holds sentinel */
+            found_null_slot = true;
+        }
+    }
+    TEST_ASSERT_TRUE(found_null_slot);  /* at least one index row must carry the null bit */
+    ray_release(r);
+    PASS();
+}
+
+/* ════════════════════════════════════════════════════════════════════
+ * Phase 3a-13 regressions — producer-side dual-encoding gaps that
+ * surfaced from the cross-cut integration review (temporal extract,
+ * strlen, mark_i64_overflow_as_null, median_per_group).
+ * Each previously wrote 0 / 0.0 to the payload while flipping the null
+ * bitmap bit — bitmap-only nulls that violate the dual-encoding
+ * contract. After the fix the slot must carry the width-correct
+ * sentinel (NULL_I64 / NULL_F64) in addition to the bitmap bit.
+ * ════════════════════════════════════════════════════════════════════ */
+static test_result_t test_compile_temporal_extract_null_slot_is_sentinel(void) {
+    /* Phase 3a-13 (C1): extract over a nullable TIMESTAMP column must
+     * fill NULL_I64 in the result I64 payload — the kernel previously
+     * wrote 0 with a bitmap bit, which sentinel-aware readers see as a
+     * legitimate zero.
*/
+    ray_t* r = ray_eval_str(
+        "(do (set __t13 (as 'TIMESTAMP (list 1000000000 0Np 2000000000)))"
+        " (yyyy __t13))");
+    TEST_ASSERT_NOT_NULL(r);
+    if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on temporal extract null"); }
+    TEST_ASSERT(ray_is_vec(r), "expected vector");
+    TEST_ASSERT(r->type == RAY_I64, "expected I64 vector");
+    TEST_ASSERT(r->len == 3, "expected len 3");
+    int64_t* d = (int64_t*)ray_data(r);
+    TEST_ASSERT_EQ_I(d[1], NULL_I64);  /* payload under the null input element (0Np) */
+    TEST_ASSERT_TRUE(ray_vec_is_null(r, 1));
+    TEST_ASSERT_FALSE(ray_vec_is_null(r, 0));  /* neighbours stay non-null; their extracted values are not asserted */
+    TEST_ASSERT_FALSE(ray_vec_is_null(r, 2));
+    ray_release(r);
+    PASS();
+}
+
+static test_result_t test_compile_strlen_null_slot_is_sentinel(void) {
+    /* Phase 3a-13 (C2): strlen over a nullable STR vector must fill
+     * NULL_I64 in the I64 payload, not 0. A mixed string-vec literal
+     * `["hello" 0N "x"]` parses as a LIST, so the input is built with
+     * concat and cast to STR to get a proper typed nullable STR vector. */
+    ray_t* r = ray_eval_str("(strlen (as 'STR (concat \"hello\" (concat 0N \"x\"))))");
+    TEST_ASSERT_NOT_NULL(r);
+    if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on strlen null"); }  /* free the error object before failing */
+    TEST_ASSERT(ray_is_vec(r), "expected vector");
+    TEST_ASSERT(r->type == RAY_I64, "expected I64 vector");
+    TEST_ASSERT(r->len == 3, "expected len 3");
+    int64_t* d = (int64_t*)ray_data(r);  /* raw payload, inspected separately from the null bitmap */
+    TEST_ASSERT_EQ_I(d[0], 5);  /* length of "hello" */
+    TEST_ASSERT_EQ_I(d[1], NULL_I64);  /* null element: sentinel in the payload ... */
+    TEST_ASSERT_EQ_I(d[2], 1);  /* length of "x" */
+    TEST_ASSERT_TRUE(ray_vec_is_null(r, 1));  /* ... and the bitmap bit alongside it */
+    ray_release(r);
+    PASS();
+}
+
+static test_result_t test_compile_overflow_neg_int64_min_slot_is_null_i64(void) {
+    /* Phase 3a-13 (C3): negating INT64_MIN over an i64 column produces
+     * INT64_MIN (k/q convention surfaces this as typed null). After
+     * Phase 3a-1 INT64_MIN IS NULL_I64 — mark_i64_overflow_as_null must
+     * leave the sentinel in place, not overwrite with 0.
*/
+    ray_t* r = ray_eval_str(
+        "(do (set Vneg (concat -9223372036854775808 (concat -5 (concat 5 0))))"
+        " (set Tneg (table [v] (list Vneg)))"
+        " (at (select {x: (neg v) from: Tneg}) 'x))");
+    TEST_ASSERT_NOT_NULL(r);
+    if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on neg-overflow"); }
+    TEST_ASSERT(ray_is_vec(r), "expected vector");
+    TEST_ASSERT(r->type == RAY_I64, "expected I64 vector");
+    TEST_ASSERT(r->len == 4, "expected len 4");
+    int64_t* d = (int64_t*)ray_data(r);
+    TEST_ASSERT_EQ_I(d[0], NULL_I64); /* overflow row holds sentinel */
+    TEST_ASSERT_TRUE(ray_vec_is_null(r, 0));  /* NOTE(review): rows 1-3 (inputs -5, 5, 0) are not asserted */
+    ray_release(r);
+    PASS();
+}
+
+static test_result_t test_compile_median_per_group_all_null_slot_is_nan(void) {
+    /* Phase 3a-13 (C4 — closes Phase 2 gap): a grouped median whose
+     * group holds only null F64 inputs must fill NULL_F64 (NaN) in the
+     * result slot, not leave it as 0.0. */
+    ray_t* r = ray_eval_str(
+        "(do (set __tm13 (table [k v] (list [1 1 2 2] [0Nf 0Nf 1.0 2.0])))"  /* k=1 rows are both null, k=2 rows are 1.0 and 2.0 */
+        " (set __rm13 (select {m: (med v) by: k from: __tm13}))"
+        " (at __rm13 'm))");
+    TEST_ASSERT_NOT_NULL(r);
+    if (RAY_IS_ERR(r)) { ray_error_free(r); FAIL("eval error on median per-group"); }  /* free the error object before failing */
+    TEST_ASSERT(ray_is_vec(r), "expected vector");
+    TEST_ASSERT(r->type == RAY_F64, "expected F64 vector");
+    TEST_ASSERT(r->len == 2, "expected len 2 (two groups)");
+    double* d = (double*)ray_data(r);  /* raw payload, inspected separately from the null bitmap */
+    /* Group k=1 is all-null → slot must be NaN with bitmap bit set. NOTE(review): assumes k=1 lands at index 0 — confirm group order. */
+    TEST_ASSERT(d[0] != d[0], "all-null group slot must be NaN");  /* x != x holds only for NaN */
+    TEST_ASSERT_TRUE(ray_vec_is_null(r, 0));
+    /* Group k=2 has 1.0 and 2.0 → median 1.5. */
+    TEST_ASSERT(d[1] == 1.5, "k=2 group median should be 1.5");
+    TEST_ASSERT_FALSE(ray_vec_is_null(r, 1));
+    ray_release(r);
+    PASS();
+}
+
 /* ════════════════════════════════════════════════════════════════════
  * 9. let with invalid (non-symbol) name — compile error path (line 244)
  * Triggers c->error = true in the let handler.
@@ -612,6 +972,54 @@ const test_entry_t compile_entries[] = { { "compile/and_special_form", test_compile_and_special_form, compile_setup, compile_teardown }, { "compile/or_special_form", test_compile_or_special_form, compile_setup, compile_teardown }, { "compile/vector_literal", test_compile_vector_literal, compile_setup, compile_teardown }, + { "compile/f64_mixed_literal_null_slot_is_nan", + test_compile_f64_mixed_literal_null_slot_is_nan, + compile_setup, compile_teardown }, + { "compile/f64_cast_i64_null_slot_is_nan", + test_compile_f64_cast_i64_null_slot_is_nan, + compile_setup, compile_teardown }, + { "compile/i32_cast_i64_null_slot_is_sentinel", + test_compile_i32_cast_i64_null_slot_is_sentinel, + compile_setup, compile_teardown }, + { "compile/i16_cast_i32_null_slot_is_sentinel", + test_compile_i16_cast_i32_null_slot_is_sentinel, + compile_setup, compile_teardown }, + { "compile/i64_cast_i32_null_slot_is_sentinel", + test_compile_i64_cast_i32_null_slot_is_sentinel, + compile_setup, compile_teardown }, + { "compile/i64_scalar_null_propagation_slot_is_sentinel", + test_compile_i64_scalar_null_propagation_slot_is_sentinel, + compile_setup, compile_teardown }, + { "compile/update_promo_f64_to_i64_null_slot_is_sentinel", + test_compile_update_promo_f64_to_i64_null_slot_is_sentinel, + compile_setup, compile_teardown }, + { "compile/update_promo_i64_to_f64_null_slot_is_sentinel", + test_compile_update_promo_i64_to_f64_null_slot_is_sentinel, + compile_setup, compile_teardown }, + { "compile/update_atom_broadcast_i64_null_slot_is_sentinel", + test_compile_update_atom_broadcast_i64_null_slot_is_sentinel, + compile_setup, compile_teardown }, + { "compile/update_atom_broadcast_where_i64_null_slot_is_sentinel", + test_compile_update_atom_broadcast_where_i64_null_slot_is_sentinel, + compile_setup, compile_teardown }, + { "compile/group_by_i64_null_key_slot_is_sentinel", + test_compile_group_by_i64_null_key_slot_is_sentinel, + compile_setup, compile_teardown }, + { 
"compile/pivot_i64_null_key_slot_is_sentinel", + test_compile_pivot_i64_null_key_slot_is_sentinel, + compile_setup, compile_teardown }, + { "compile/temporal_extract_null_slot_is_sentinel", + test_compile_temporal_extract_null_slot_is_sentinel, + compile_setup, compile_teardown }, + { "compile/strlen_null_slot_is_sentinel", + test_compile_strlen_null_slot_is_sentinel, + compile_setup, compile_teardown }, + { "compile/overflow_neg_int64_min_slot_is_null_i64", + test_compile_overflow_neg_int64_min_slot_is_null_i64, + compile_setup, compile_teardown }, + { "compile/median_per_group_all_null_slot_is_nan", + test_compile_median_per_group_all_null_slot_is_nan, + compile_setup, compile_teardown }, { "compile/let_reserved_name", test_compile_let_reserved_name, compile_setup, compile_teardown }, { "compile/unary_wrong_arity", test_compile_unary_wrong_arity, compile_setup, compile_teardown }, { "compile/binary_wrong_arity", test_compile_binary_wrong_arity, compile_setup, compile_teardown }, diff --git a/test/test_csv.c b/test/test_csv.c index 59d24873..511b4f2f 100644 --- a/test/test_csv.c +++ b/test/test_csv.c @@ -190,7 +190,11 @@ static test_result_t test_csv_null_i64(void) { TEST_ASSERT_FALSE(ray_vec_is_null(col, 0)); TEST_ASSERT_EQ_I(((int64_t*)ray_data(col))[0], 10); + + /* Phase 3a: empty I64 cell must be both bitmap-null AND NULL_I64-in-slot. */ TEST_ASSERT_TRUE(ray_vec_is_null(col, 1)); + TEST_ASSERT_EQ_I(((int64_t*)ray_data(col))[1], NULL_I64); + TEST_ASSERT_FALSE(ray_vec_is_null(col, 2)); TEST_ASSERT_EQ_I(((int64_t*)ray_data(col))[2], 30); @@ -215,7 +219,11 @@ static test_result_t test_csv_null_i64_unparseable(void) { ray_t* col = ray_table_get_col_idx(loaded, 0); TEST_ASSERT_FALSE(ray_vec_is_null(col, 0)); TEST_ASSERT_EQ_I(((int64_t*)ray_data(col))[0], 10); + + /* Phase 3a: unparseable I64 cell must be both bitmap-null AND NULL_I64-in-slot. 
*/
     TEST_ASSERT_TRUE(ray_vec_is_null(col, 1));
+    TEST_ASSERT_EQ_I(((int64_t*)ray_data(col))[1], NULL_I64);
+
     TEST_ASSERT_FALSE(ray_vec_is_null(col, 2));
     TEST_ASSERT_EQ_I(((int64_t*)ray_data(col))[2], 30);
 
@@ -240,7 +248,12 @@ static test_result_t test_csv_null_f64(void) {
     ray_t* col = ray_table_get_col_idx(loaded, 0);
     TEST_ASSERT_FALSE(ray_vec_is_null(col, 0));
     TEST_ASSERT_EQ_F(((double*)ray_data(col))[0], 1.5, 1e-6);
+
+    /* Phase 2: empty F64 cell must be both bitmap-null AND NaN-in-slot. */
     TEST_ASSERT_TRUE(ray_vec_is_null(col, 1));
+    double slot1 = ((double*)ray_data(col))[1];
+    TEST_ASSERT_TRUE(slot1 != slot1); /* NaN check */
+
     TEST_ASSERT_FALSE(ray_vec_is_null(col, 2));
     TEST_ASSERT_EQ_F(((double*)ray_data(col))[2], 3.5, 1e-6);
 
@@ -251,7 +264,159 @@ static test_result_t test_csv_null_f64(void) {
     PASS();
 }
 
+/* Phase 3a: an empty cell read into an I16 column must be null in the bitmap AND hold NULL_I16 in its payload slot. */
+static test_result_t test_csv_null_i16(void) {
+    ray_heap_init();  /* runtime is brought up here and torn down at the end of the test */
+    (void)ray_sym_init();
+
+    FILE* f = fopen(TMP_CSV, "w");  /* NOTE(review): the fopen result is not checked before fprintf/fclose */
+    fprintf(f, "x\n10\n\n30\n");  /* header x, then rows: 10, an empty cell, 30 */
+    fclose(f);
+
+    int8_t schema[1] = { RAY_I16 };  /* explicit schema: the single column is read as I16 */
+    ray_t* loaded = ray_read_csv_opts(TMP_CSV, 0, true, schema, 1);
+    TEST_ASSERT_FALSE(RAY_IS_ERR(loaded));
+
+    ray_t* col = ray_table_get_col_idx(loaded, 0);
+    TEST_ASSERT_EQ_I(col->type, RAY_I16);
+    TEST_ASSERT_FALSE(ray_vec_is_null(col, 0));
+    TEST_ASSERT_EQ_I(((int16_t*)ray_data(col))[0], 10);
+
+    TEST_ASSERT_TRUE(ray_vec_is_null(col, 1));  /* bitmap half of the dual encoding */
+    TEST_ASSERT_EQ_I(((int16_t*)ray_data(col))[1], NULL_I16);  /* payload half: width-correct sentinel */
+
+    TEST_ASSERT_FALSE(ray_vec_is_null(col, 2));
+    TEST_ASSERT_EQ_I(((int16_t*)ray_data(col))[2], 30);
+
+    ray_release(loaded);  /* NOTE(review): presumably skipped when an assertion above fails (TEST_ASSERT_* look like early returns) — confirm */
+    unlink(TMP_CSV);
+    ray_sym_destroy();
+    ray_heap_destroy();
+    PASS();
+}
+
+/* Phase 3a: empty I32 cell must be both bitmap-null AND NULL_I32-in-slot.
*/
+static test_result_t test_csv_null_i32(void) {
+    ray_heap_init();  /* runtime is brought up here and torn down at the end of the test */
+    (void)ray_sym_init();
+
+    FILE* f = fopen(TMP_CSV, "w");  /* NOTE(review): the fopen result is not checked before fprintf/fclose */
+    fprintf(f, "x\n10\n\n30\n");  /* header x, then rows: 10, an empty cell, 30 */
+    fclose(f);
+
+    int8_t schema[1] = { RAY_I32 };  /* explicit schema: the single column is read as I32 */
+    ray_t* loaded = ray_read_csv_opts(TMP_CSV, 0, true, schema, 1);
+    TEST_ASSERT_FALSE(RAY_IS_ERR(loaded));
+
+    ray_t* col = ray_table_get_col_idx(loaded, 0);
+    TEST_ASSERT_EQ_I(col->type, RAY_I32);
+    TEST_ASSERT_FALSE(ray_vec_is_null(col, 0));
+    TEST_ASSERT_EQ_I(((int32_t*)ray_data(col))[0], 10);
+
+    TEST_ASSERT_TRUE(ray_vec_is_null(col, 1));  /* bitmap half of the dual encoding */
+    TEST_ASSERT_EQ_I(((int32_t*)ray_data(col))[1], NULL_I32);  /* payload half: width-correct sentinel */
+
+    TEST_ASSERT_FALSE(ray_vec_is_null(col, 2));
+    TEST_ASSERT_EQ_I(((int32_t*)ray_data(col))[2], 30);
+
+    ray_release(loaded);
+    unlink(TMP_CSV);
+    ray_sym_destroy();
+    ray_heap_destroy();
+    PASS();
+}
+
+/* Phase 3a: an empty cell read into a DATE column must be null in the bitmap AND hold NULL_I32 in its payload slot. */
+static test_result_t test_csv_null_date(void) {
+    ray_heap_init();  /* runtime is brought up here and torn down at the end of the test */
+    (void)ray_sym_init();
+
+    FILE* f = fopen(TMP_CSV, "w");  /* NOTE(review): the fopen result is not checked before fprintf/fclose */
+    fprintf(f, "d\n2025-01-02\n\n2026-12-31\n");  /* header d, then rows: a date, an empty cell, a date */
+    fclose(f);
+
+    int8_t schema[1] = { RAY_DATE };  /* explicit schema: the single column is read as DATE */
+    ray_t* loaded = ray_read_csv_opts(TMP_CSV, 0, true, schema, 1);
+    TEST_ASSERT_FALSE(RAY_IS_ERR(loaded));
+
+    ray_t* col = ray_table_get_col_idx(loaded, 0);
+    TEST_ASSERT_EQ_I(col->type, RAY_DATE);
+    TEST_ASSERT_FALSE(ray_vec_is_null(col, 0));  /* parsed date values of rows 0 and 2 are not asserted, only their null-ness */
+
+    TEST_ASSERT_TRUE(ray_vec_is_null(col, 1));  /* bitmap half of the dual encoding */
+    TEST_ASSERT_EQ_I(((int32_t*)ray_data(col))[1], NULL_I32);  /* payload is read as int32_t, so the sentinel is NULL_I32 */
+
+    TEST_ASSERT_FALSE(ray_vec_is_null(col, 2));
+
+    ray_release(loaded);
+    unlink(TMP_CSV);
+    ray_sym_destroy();
+    ray_heap_destroy();
+    PASS();
+}
+
+/* Phase 3a: empty TIME cell must be both bitmap-null AND NULL_I32-in-slot.
*/
+static test_result_t test_csv_null_time(void) {
+    ray_heap_init();  /* runtime is brought up here and torn down at the end of the test */
+    (void)ray_sym_init();
+
+    FILE* f = fopen(TMP_CSV, "w");  /* NOTE(review): the fopen result is not checked before fprintf/fclose */
+    fprintf(f, "t\n12:34:56\n\n23:59:59\n");  /* header t, then rows: a time, an empty cell, a time */
+    fclose(f);
+
+    int8_t schema[1] = { RAY_TIME };  /* explicit schema: the single column is read as TIME */
+    ray_t* loaded = ray_read_csv_opts(TMP_CSV, 0, true, schema, 1);
+    TEST_ASSERT_FALSE(RAY_IS_ERR(loaded));
+
+    ray_t* col = ray_table_get_col_idx(loaded, 0);
+    TEST_ASSERT_EQ_I(col->type, RAY_TIME);
+    TEST_ASSERT_FALSE(ray_vec_is_null(col, 0));  /* parsed time values of rows 0 and 2 are not asserted, only their null-ness */
+
+    TEST_ASSERT_TRUE(ray_vec_is_null(col, 1));  /* bitmap half of the dual encoding */
+    TEST_ASSERT_EQ_I(((int32_t*)ray_data(col))[1], NULL_I32);  /* payload is read as int32_t, so the sentinel is NULL_I32 */
+
+    TEST_ASSERT_FALSE(ray_vec_is_null(col, 2));
+
+    ray_release(loaded);
+    unlink(TMP_CSV);
+    ray_sym_destroy();
+    ray_heap_destroy();
+    PASS();
+}
+
+/* Phase 3a: an empty cell read into a TIMESTAMP column must be null in the bitmap AND hold NULL_I64 in its payload slot. */
+static test_result_t test_csv_null_timestamp(void) {
+    ray_heap_init();  /* runtime is brought up here and torn down at the end of the test */
+    (void)ray_sym_init();
+
+    FILE* f = fopen(TMP_CSV, "w");  /* NOTE(review): the fopen result is not checked before fprintf/fclose */
+    fprintf(f, "ts\n2025-01-02T03:04:05\n\n2026-12-31T23:59:59\n");  /* header ts, then rows: a timestamp, an empty cell, a timestamp */
+    fclose(f);
+
+    int8_t schema[1] = { RAY_TIMESTAMP };  /* explicit schema: the single column is read as TIMESTAMP */
+    ray_t* loaded = ray_read_csv_opts(TMP_CSV, 0, true, schema, 1);
+    TEST_ASSERT_FALSE(RAY_IS_ERR(loaded));
+
+    ray_t* col = ray_table_get_col_idx(loaded, 0);
+    TEST_ASSERT_EQ_I(col->type, RAY_TIMESTAMP);
+    TEST_ASSERT_FALSE(ray_vec_is_null(col, 0));  /* parsed timestamp values of rows 0 and 2 are not asserted, only their null-ness */
+
+    TEST_ASSERT_TRUE(ray_vec_is_null(col, 1));  /* bitmap half of the dual encoding */
+    TEST_ASSERT_EQ_I(((int64_t*)ray_data(col))[1], NULL_I64);  /* payload is read as int64_t, so the sentinel is NULL_I64 */
+
+    TEST_ASSERT_FALSE(ray_vec_is_null(col, 2));
+
+    ray_release(loaded);
+    unlink(TMP_CSV);
+    ray_sym_destroy();
+    ray_heap_destroy();
+    PASS();
+}
+
 static test_result_t test_csv_null_bool(void) {
+    /* v4 contract (Phase 1 lockdown): BOOL is non-nullable. Empty cells
+     * materialize as `false`, not as a null bit — the BOOL column has
+     * neither HAS_NULLS nor any set bitmap bits.
*/ ray_heap_init(); (void)ray_sym_init(); @@ -265,9 +430,11 @@ static test_result_t test_csv_null_bool(void) { ray_t* col = ray_table_get_col_idx(loaded, 0); TEST_ASSERT_FALSE(ray_vec_is_null(col, 0)); TEST_ASSERT_EQ_I((int)((uint8_t*)ray_data(col))[0], 1); - TEST_ASSERT_TRUE(ray_vec_is_null(col, 1)); /* empty */ + TEST_ASSERT_FALSE(ray_vec_is_null(col, 1)); /* empty → false */ + TEST_ASSERT_EQ_I((int)((uint8_t*)ray_data(col))[1], 0); TEST_ASSERT_FALSE(ray_vec_is_null(col, 2)); TEST_ASSERT_EQ_I((int)((uint8_t*)ray_data(col))[2], 0); + TEST_ASSERT_FALSE(col->attrs & RAY_ATTR_HAS_NULLS); ray_release(loaded); unlink(TMP_CSV); @@ -1251,8 +1418,10 @@ static test_result_t test_csv_explicit_i32_schema(void) { } static test_result_t test_csv_explicit_u8_schema_serial(void) { - /* Force the serial parse path (n_rows ≤ 8192) and exercise truncated-row - * fill defaults plus null-column path. */ + /* v4 contract (Phase 1 lockdown): U8 is non-nullable. Truncated rows + * still fill defaults (0), but no null bit is set and HAS_NULLS is + * stripped post-parse. Exercises the serial parse path + * (n_rows ≤ 8192) plus the past-row-boundary fill branch. */ ray_heap_init(); (void)ray_sym_init(); @@ -1275,17 +1444,15 @@ static test_result_t test_csv_explicit_u8_schema_serial(void) { ray_t* b = ray_table_get_col_idx(loaded, 1); TEST_ASSERT_EQ_I(a->type, RAY_U8); TEST_ASSERT_EQ_I(b->type, RAY_U8); - TEST_ASSERT_TRUE(b->attrs & RAY_ATTR_HAS_NULLS); + TEST_ASSERT_FALSE(b->attrs & RAY_ATTR_HAS_NULLS); const uint8_t* ad = (const uint8_t*)ray_data(a); const uint8_t* bd = (const uint8_t*)ray_data(b); for (int i = 0; i < 200; i++) { TEST_ASSERT_EQ_I((int)ad[i], i % 256); - if (i % 50 == 0) TEST_ASSERT_TRUE(ray_vec_is_null(b, i)); - else { - TEST_ASSERT_FALSE(ray_vec_is_null(b, i)); - TEST_ASSERT_EQ_I((int)bd[i], (i + 1) % 256); - } + TEST_ASSERT_FALSE(ray_vec_is_null(b, i)); + int expected_b = (i % 50 == 0) ? 
0 : ((i + 1) % 256); + TEST_ASSERT_EQ_I((int)bd[i], expected_b); } ray_release(loaded); @@ -1328,6 +1495,11 @@ const test_entry_t csv_entries[] = { { "csv/null_i64", test_csv_null_i64, NULL, NULL }, { "csv/null_i64_unparseable", test_csv_null_i64_unparseable, NULL, NULL }, { "csv/null_f64", test_csv_null_f64, NULL, NULL }, + { "csv/null_i16", test_csv_null_i16, NULL, NULL }, + { "csv/null_i32", test_csv_null_i32, NULL, NULL }, + { "csv/null_date", test_csv_null_date, NULL, NULL }, + { "csv/null_time", test_csv_null_time, NULL, NULL }, + { "csv/null_timestamp", test_csv_null_timestamp, NULL, NULL }, { "csv/null_bool", test_csv_null_bool, NULL, NULL }, { "csv/null_sym", test_csv_null_sym, NULL, NULL }, { "csv/no_nulls_no_nullmap", test_csv_no_nulls_no_nullmap, NULL, NULL },