
Conversation

@yruslan
Collaborator

@yruslan yruslan commented Jan 2, 2026

Summary by CodeRabbit

  • New Features

    • Batch ID tracking added across pipelines, jobs, journals and bookkeeping; table run info can return multiple batch-specific entries.
    • Appended-record-count tracked for ingestions and bookkeeping; journal CSV now includes batch IDs.
  • Chores

    • Project version bumped to 1.13.0-SNAPSHOT.
    • Embedded MongoDB dependency upgraded to 4.22.0.
    • Database schema extended with batch and appended-record columns.


@coderabbitai

coderabbitai bot commented Jan 2, 2026

Walkthrough

Adds batch tracking and appended-record counts across APIs, models, bookkeeper implementations, storage backends, journal I/O, tests, and migrations; threads a new batchId: Long and appendedRecordCount: Option[Long] through bookkeeping and metastore flows and bumps project and embedded MongoDB versions.

Changes

Cohort / File(s) Summary
Version & Build
`pramen/pom.xml`, `pramen/api/pom.xml`, `pramen/core/pom.xml`, `pramen/extras/pom.xml`, `pramen/build.sbt`, `pramen/project/Versions.scala`
Bumped project/parent to 1.13.0-SNAPSHOT; set JVM target 1.8; upgraded embedded MongoDB to 4.22.0.
API Models & Reader
`pramen/api/src/main/scala/.../MetaTableRunInfo.scala`, `.../PipelineInfo.scala`, `.../MetastoreReader.scala`
Added batchId: Long and appendedRecordCount: Option[Long] to models; changed MetastoreReader.getTableRunInfo to accept batchId: Option[Long] and return Seq[MetaTableRunInfo].
Bookkeeper Core & Base
`pramen/core/src/main/scala/.../bookkeeper/Bookkeeper.scala`, `.../BookkeeperBase.scala`, `.../BookkeeperHadoop.scala`
Threaded batchId: Long into base constructors; added getDataChunks(..., batchId: Option[Long]); extended setRecordCount / save helpers to accept recordsAppended: Option[Long]; updated getFilter signatures to accept optional batchId.
Bookkeeper Implementations
`pramen/core/src/main/scala/.../BookkeeperMongoDb.scala`, `.../BookkeeperJdbc.scala`, `.../BookkeeperText.scala`, `.../BookkeeperDeltaBase.scala`, `.../BookkeeperDeltaPath.scala`, `.../BookkeeperNull.scala`
Constructors now accept batchId; added batch-aware getDataChunksFromStorage and deleteNonCurrentBatchRecords; persistence now records appendedRecordCount and batchId; Mongo model version and index adjusted for multi-batch.
Delta Table Backends
`pramen/core/src/main/scala_2.*.../BookkeeperDeltaTable.scala`, `pramen/core/src/main/scala/.../bookkeeper/BookkeeperDeltaBase.scala`, `.../BookkeeperDeltaPath.scala`
Added batchId param; switched writes to .format("delta") where applicable; added Delta init and deleteNonCurrentBatchRecords; added helpers to read all data matching filters including batchId.
Bookkeeping Schema & Models
`pramen/core/src/main/scala/.../bookkeeper/model/BookkeepingRecord.scala`, `.../BookkeepingRecords.scala`, `.../model/DataChunk.scala`
Added appendedRecordCount: Option[Long] and batchId: Option[Long] to BookkeepingRecord and DataChunk; updated DB projections and added DataChunk.fromRecord.
Journal Models & IO
`pramen/core/src/main/scala/.../journal/model/TaskCompleted.scala`, `TaskCompletedCsv.scala`, `JournalTask.scala`, `JournalTasks.scala`, `JournalHadoopCsv.scala`, `JournalJdbc.scala`
Propagated batchId into journal models, CSV serialization, and DB mapping (added batch_id column and CSV field).
Metastore & Pipeline Wiring
`pramen/core/src/main/scala/.../metastore/MetastoreImpl.scala`, `MetastoreReaderBase.scala`, `state/PipelineStateImpl.scala`
Metastore now queries bookkeeper.getDataChunks(..., batchId) and returns Seq[MetaTableRunInfo]; PipelineInfo now includes batchId.
DB Migrations
`pramen/core/src/main/scala/.../rdb/PramenDb.scala`
Bumped MODEL_VERSION (8 → 9) and added migration to add batch_id and appended_record_count columns to bookkeeping and journal tables for older DB versions.
Job Call Sites
`pramen/core/src/main/scala/.../pipeline/PythonTransformationJob.scala`, `SinkJob.scala`, `runner/task/TaskRunnerBase.scala`
Updated bookkeeper.setRecordCount invocations to pass recordsAppended: Option[Long] (None or Some(value)) in new arg position.
Utilities & Constants
`pramen/core/src/main/scala/.../utils/AlgorithmUtils.scala`, `SlickUtils.scala`, `CsvUtils.scala`
Added timing helper in AlgorithmUtils; made WARN_IF_LONGER_MS public; CSV serialization now unwraps Option values and sanitizes separators.
Tests — Mocks, Fixtures & Suites
many files under `pramen/core/src/test/...`
Tests, mocks and factories updated to include batchId and appended-record parameters; adapted embedded MongoDB mock to new flapdoodle API; updated many call sites to pass None/Some(...) and to construct per-batch bookkeeper instances; logging config adjusted for MongoDbSingleton.
Test Logging
`pramen/core/src/test/resources/log4j.properties`, `log4j2.properties`
Enabled WARN logging for MongoDbSingleton in test configs.

Sequence Diagram(s)

sequenceDiagram
  participant Job as Job / Task Runner
  participant Metastore as MetastoreReader / MetastoreImpl
  participant Bookkeeper as Bookkeeper (Mongo/JDBC/Delta/Text)
  participant Storage as Storage (Mongo / RDB / Delta / FS)

  Note over Job,Bookkeeper: New inputs: batchId, appendedRecordCount
  Job->>Bookkeeper: setRecordCount(table, infoDate, inputCount, outputCount, recordsAppended?, start, finish, isTransient)
  alt non-current batch deletion needed
    Bookkeeper->>Storage: deleteNonCurrentBatchRecords(table, infoDate) [uses batchId]
  end
  Bookkeeper->>Storage: saveRecordCountToStorage(DataChunk(..., batchId?, appendedRecordCount?))
  Job->>Metastore: getTableRunInfo(table, infoDate, batchId?)
  Metastore->>Bookkeeper: getDataChunks(table, infoDate, batchId?)
  Bookkeeper->>Storage: query(filter by table, infoDate range, optional batchId)
  Storage-->>Bookkeeper: Seq[DataChunk]
  Bookkeeper-->>Metastore: Seq[DataChunk]
  Metastore-->>Job: Seq[MetaTableRunInfo]
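
A minimal usage sketch of the updated reader API, assuming the parameter order shown in the diagram (table name, info date, optional batch ID); the table name and the way results are printed are placeholders, not code from the PR:

import java.time.LocalDate
import za.co.absa.pramen.api.{MetaTableRunInfo, MetastoreReader}

def logRunInfo(metastore: MetastoreReader, infoDate: LocalDate): Unit = {
  // None returns entries for all batches of the info date; Some(id) narrows the result to one batch.
  val runs: Seq[MetaTableRunInfo] = metastore.getTableRunInfo("my_table", infoDate, None)

  runs.foreach { run =>
    println(s"batch=${run.batchId}, output=${run.outputRecordCount}, " +
      s"appended=${run.appendedRecordCount.getOrElse(0L)}")
  }
}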

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Poem

🐰 I nibbled code and stitched a seam,

Batch IDs now hop into the stream.
Appended counts tucked in a row,
Bookkeepers track where data go.
Mongo leapt forward — ready, gleam!

Pre-merge checks

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title clearly and specifically describes the main change: adding batch ID and appended record count to bookkeeping, which is the core focus of the extensive changeset across multiple files.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

📜 Recent review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3791376 and acacfba.

📒 Files selected for processing (1)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala
🧰 Additional context used
🧬 Code graph analysis (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (6)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
  • pramen (45-67)
  • pramen (118-121)
  • core (141-155)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (3)
  • pramen (94-113)
  • pramen (184-195)
  • deleteNonCurrentBatchRecords (197-200)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala (2)
  • SlickUtils (27-161)
  • executeQuery (45-65)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (10)
  • batchId (32-32)
  • infoDate (24-24)
  • jobFinished (31-31)
  • inputRecordCount (27-27)
  • outputRecordCount (28-28)
  • jobStarted (30-30)
  • BookkeepingRecords (22-37)
  • BookkeepingRecords (39-41)
  • pramenTableName (23-23)
  • pramenTableName (33-35)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (3)
  • DataChunk (28-37)
  • DataChunk (40-48)
  • fromRecord (45-47)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1)
  • BookkeepingRecord (19-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Coverage on Scala 2.12.18
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
🔇 Additional comments (10)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (10)

27-28: LGTM! Imports are properly used.

The new imports (WARN_IF_LONGER_MS, log, AlgorithmUtils, TimeUtils) are all utilized in the deleteNonCurrentBatchRecords method for performance monitoring and logging.


33-33: LGTM! Class signature correctly threads batchId.

The batchId parameter is now properly passed to the BookkeeperBase superclass, consistent with the PR objectives to add batch tracking to bookkeeping operations.


41-69: LGTM! Consistent use of DataChunk.fromRecord.

The mapping has been updated to use DataChunk.fromRecord, which is consistent with the pattern applied throughout the file and properly handles the new batchId and appendedRecordCount fields.


71-82: LGTM! New batch-aware data retrieval method.

The new getDataChunksFromStorage method properly accepts and passes the batchId parameter to enable batch-specific queries. Error handling and result mapping are consistent with other query methods in the class.


84-100: LGTM! Correct batch filtering for "latest" semantics.

Passing None for batchId is appropriate here since this method retrieves the latest chunk across all batches. Batch-specific queries are handled by the new getDataChunksFromStorage method.


102-113: LGTM! Consistent batch filtering pattern.

Passing None for batchId is consistent with counting data chunks across all batches, which is the expected behavior for this method.


115-133: LGTM! Signature properly extended with recordsAppended.

The method signature and BookkeepingRecord construction correctly include the new recordsAppended field and properly wrap batchId in an Option to match the expected types.


135-153: LGTM! Previous issue resolved and good performance monitoring.

The Slick Option column comparison on line 139 has been correctly fixed to use Option(batchId). The addition of elapsed-time monitoring is a good practice for tracking potentially slow delete operations.


155-157: LGTM! Standard override.

The override correctly returns the offsetManagement instance.


159-191: LGTM! Previous issue resolved and clean conditional filtering.

The Slick Option column comparison on line 188 has been correctly fixed to use Option(id). The conditional filtering pattern cleanly supports both batch-specific and batch-agnostic queries, making this method flexible and reusable.
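
To make the Option-column comparison concrete, here is a small self-contained Slick sketch; the table shape, column names, and the H2 profile import are illustrative assumptions, not the actual Pramen schema. It shows why the plain Long is lifted with Option(...) before comparing against an Option[Long] column:

import slick.jdbc.H2Profile.api._   // profile chosen for illustration only

class BookkeepingSketch(tag: Tag) extends Table[(String, Option[Long])](tag, "bookkeeping") {
  def tableName = column[String]("pramen_table_name")
  def batchId   = column[Option[Long]]("batch_id")   // nullable for rows written before this change
  def * = (tableName, batchId)
}

val records = TableQuery[BookkeepingSketch]

// Rows of a table that belong to batches other than the current one.
// Option(currentBatchId) lifts the value so it matches the Rep[Option[Long]] column type.
// Note: rows where batch_id is NULL are excluded by SQL comparison semantics.
def nonCurrentBatch(table: String, currentBatchId: Long) =
  records
    .filter(_.tableName === table)
    .filter(_.batchId =!= Option(currentBatchId))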




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala (1)

83-103: Inconsistent MongoDB availability checks.

Similar to JournalMongoDbSuite, the test gating condition uses mongoDbExecutable.nonEmpty (Line 83), but the before block still uses if (db != null) (Line 52). This inconsistency could cause test failures if mongoDbExecutable is available but db initialization fails.

🔎 Proposed fix to align availability checks
  before {
    pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;")
    pramenDb.setupDatabase()

-    if (db != null) {
+    if (mongoDbExecutable.nonEmpty) {
      if (db.doesCollectionExists(collectionName)) {
        db.dropCollection(collectionName)
      }
      if (db.doesCollectionExists(schemaCollectionName)) {
        db.dropCollection(schemaCollectionName)
      }
    }
  }
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (1)

272-272: Fix extra space in assertion.

There's an extraneous space between chunk1.get and .inputRecordCount.

🔎 Proposed fix
-        assert(chunk1.get.inputRecordCount == 201 || chunk1.get .inputRecordCount == 102)
+        assert(chunk1.get.inputRecordCount == 201 || chunk1.get.inputRecordCount == 102)
🧹 Nitpick comments (9)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala (1)

33-33: Clarify the rationale for making this constant public.

The change from private val to val makes WARN_IF_LONGER_MS publicly accessible, but the connection to the PR's stated objective (adding batch ID and record counting) is unclear.

If the intent is to allow external configuration or testing, consider loading this threshold from application configuration rather than exposing a hardcoded constant. If external components need to reference this value, please clarify the use case.
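
If configurability is the goal, a hedged sketch along these lines could replace the public constant; the configuration key pramen.warn.if.longer.ms is purely hypothetical and not an existing Pramen option:

import com.typesafe.config.Config

object QueryTimingDefaults {
  val DefaultWarnIfLongerMs: Long = 3000L   // illustrative default only

  // Read the warning threshold from configuration, falling back to the default.
  def warnIfLongerMs(conf: Config): Long =
    if (conf.hasPath("pramen.warn.if.longer.ms")) conf.getLong("pramen.warn.if.longer.ms")
    else DefaultWarnIfLongerMs
}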

pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala (2)

20-20: Remove unused import.

The import za.co.absa.pramen.core.utils.SlickUtils.log is not used anywhere in this file.

🔎 Proposed fix
-import za.co.absa.pramen.core.utils.SlickUtils.log

98-108: Consider passing actual duration to the onTimeout callback.

The onTimeout callback currently receives timeoutMs (the threshold), not the actual elapsed duration. Consider passing duration instead so the callback can log or act on the actual time taken.

🔎 Proposed refactor
-  final def resultWithTimeout[R](timeoutMs: Long)(action: => R)(onTimeout: Long => Unit): R = {
+  final def resultWithTimeout[R](timeoutMs: Long)(action: => R)(onTimeout: (Long, Long) => Unit): R = {
     val start = Instant.now
     val result = action
     val finish = Instant.now
 
     val duration = Duration.between(start, finish).toMillis
     if (duration > timeoutMs) {
-      onTimeout(timeoutMs)
+      onTimeout(timeoutMs, duration)
     }
     result
   }

This allows the callback to report both the threshold and actual duration, e.g., log.warn(s"Action exceeded ${timeoutMs}ms, took ${duration}ms").

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (1)

197-199: Consider adding a comment explaining the no-op.

The empty deleteNonCurrentBatchRecords is intentional since CSV-based storage doesn't support efficient in-place deletion. A brief comment would improve clarity for future maintainers.

🔎 Suggested documentation
  override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {
-
+    // No-op: CSV-based storage doesn't support efficient in-place deletion.
+    // Cross-batch replacement is not supported for BookkeeperText.
  }
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (1)

113-117: Consider sorting by jobFinished for consistency.

getBkAllData sorts by infoDate, while getBkData ultimately sorts by infoDate after grouping by jobFinished. For consistency with getLatestDataChunk semantics (which relies on .lastOption for the most recent job), consider sorting by jobFinished here as well.

🔎 Suggested change
  private[core] def getBkAllData(filter: Column): Seq[DataChunk] = {
    getBkDf(filter)
      .collect()
-      .sortBy(_.infoDate)
+      .sortBy(_.jobFinished)
  }
pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)

95-101: Consider adding format option for empty dataset.

The writeEmptyDataset method doesn't explicitly specify the delta format. While it may work, being explicit about the format ensures consistency.

🔎 Suggested change for explicit format
  override def writeEmptyDataset[T <: Product : universe.TypeTag : ClassTag](pathOrTable: String): Unit = {
    val df = Seq.empty[T].toDS

    df.write
+      .format("delta")
      .saveAsTable(pathOrTable)
  }
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)

93-99: Consider adding explicit delta format.

Same as the Scala 2.13 version - explicitly specifying the delta format would improve clarity.

🔎 Suggested change
  override def writeEmptyDataset[T <: Product : universe.TypeTag : ClassTag](pathOrTable: String): Unit = {
    val df = Seq.empty[T].toDS

    df.write
+      .format("delta")
      .saveAsTable(pathOrTable)
  }
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (2)

96-101: Consider adjusting the log level.

The log.info statement at line 99 prints all chunks for every query. This could generate significant log volume in production. Consider:

  • Changing to log.debug for normal operation
  • Keeping log.info only for specific diagnostic scenarios

Otherwise, the implementation correctly supports batch-aware filtering and sorting.


117-127: Consider adding timeout monitoring for consistency.

The deletion logic is correct, but unlike the JDBC version (lines 143-149), this implementation doesn't include timeout monitoring and warning logs for long-running operations.

For consistency and operational visibility, consider adding similar monitoring:

  • Wrap the deleteMany call with timing logic
  • Log warnings if deletion exceeds a threshold

This is optional but would improve operational consistency across bookkeeper implementations.
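
As a rough sketch of what such monitoring could look like, the deletion could be wrapped with the resultWithTimeout helper discussed earlier in this review; the collection handle, field names, and the blocking Await-style execution are assumptions about the surrounding code, not the actual implementation:

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.mongodb.scala.model.Filters

AlgorithmUtils.resultWithTimeout(SlickUtils.WARN_IF_LONGER_MS) {
  // Delete bookkeeping records of other batches for the given table and info date.
  Await.result(
    collection.deleteMany(
      Filters.and(
        Filters.equal("tableName", table),
        Filters.equal("infoDate", infoDateStr),
        Filters.notEqual("batchId", batchId)
      )
    ).toFuture(),
    Duration.Inf
  )
} { thresholdMs =>
  log.warn(s"Deleting non-current batch records for '$table' took longer than $thresholdMs ms.")
}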

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 388197d and cc942c1.

📒 Files selected for processing (71)
  • pramen/api/pom.xml
  • pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala
  • pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala
  • pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala
  • pramen/build.sbt
  • pramen/core/pom.xml
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperHadoop.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecordMongoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTask.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala
  • pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/test/resources/log4j.properties
  • pramen/core/src/test/resources/log4j2.properties
  • pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/MongoDbFixture.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DataChunkFactory.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskCompletedFactory.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/dao/MongoDbSingleton.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/source/SourceValidationSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMemSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMongoDbLongSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperNullSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTransientSuiteEager.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TestCases.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockMongoDbSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala
  • pramen/extras/pom.xml
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala
  • pramen/pom.xml
  • pramen/project/Versions.scala
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T08:27:21.504Z
Learnt from: yruslan
Repo: AbsaOSS/pramen PR: 611
File: pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala:29-29
Timestamp: 2025-06-18T08:27:21.504Z
Learning: In pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala, the constant variable name was changed from DATE_UNTIL_EXPR_KEY to DATE_TO_EXPR_KEY, but both constants hold the same string value "date.to". This is a variable name refactoring, not a configuration key change, so it doesn't affect backward compatibility.

Applied to files:

  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala
  • pramen/project/Versions.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/source/SourceValidationSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMemSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTransientSuiteEager.scala
  • pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala
  • pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala
  • pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
🧬 Code graph analysis (41)
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
  • infoDate (24-24)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (6)
  • batchId (32-32)
  • pramenTableName (23-23)
  • pramenTableName (34-36)
  • inputRecordCount (27-27)
  • outputRecordCount (28-28)
  • appendedRecordCount (29-29)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTask.scala (1)
  • JournalTask (19-42)
pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/MongoDbFixture.scala (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/dao/MongoDbSingleton.scala (1)
  • MongoDbSingleton (28-79)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TestCases.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala (2)
  • TaskCompleted (26-49)
  • TaskCompleted (51-94)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (4)
  • appendedRecordCount (29-29)
  • jobStarted (30-30)
  • jobFinished (31-31)
  • batchId (32-32)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
  • appendedRecordCount (32-32)
  • batchId (44-44)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (1)
  • batchId (76-76)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
  • batchId (185-185)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala (1)
  • getTableRunInfo (73-87)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (2)
  • getTableRunInfo (68-68)
  • batchId (76-76)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (2)
  • infoDate (24-24)
  • batchId (32-32)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
  • country (43-43)
  • batchId (44-44)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
  • batchId (185-185)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/MutableStack.scala (1)
  • nonEmpty (46-48)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (5)
  • appendedRecordCount (32-32)
  • batchId (44-44)
  • pramenTableName (24-24)
  • inputRecordCount (28-28)
  • outputRecordCount (30-30)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1)
  • BookkeepingRecord (19-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala (1)
  • SlickUtils (27-161)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
  • country (43-43)
  • batchId (44-44)
pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala (1)
  • pipelineInfo (45-51)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMemSuite.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
  • getDataChunks (94-104)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
  • getDataChunks (59-76)
pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
  • country (43-43)
  • batchId (44-44)
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
  • getDataChunks (94-104)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
  • getDataChunks (59-76)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
  • infoDate (24-24)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala (1)
pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala (1)
  • PipelineInfo (23-38)
pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
  • country (43-43)
  • batchId (44-44)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (1)
  • batchId (76-76)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
  • batchId (185-185)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (8)
  • country (43-43)
  • batchId (44-44)
  • pipelineName (40-40)
  • startedAt (34-34)
  • finishedAt (35-35)
  • sparkApplicationId (38-38)
  • pipelineId (39-39)
  • tenant (42-42)
pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala (1)
  • PipelineInfo (23-38)
pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala (1)
  • getTableRunInfo (73-87)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (2)
  • getTableRunInfo (68-68)
  • batchId (76-76)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (2)
  • getTableRunInfo (167-167)
  • batchId (185-185)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (2)
  • infoDate (24-24)
  • batchId (32-32)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala (5)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala (1)
  • getBookkeeper (40-42)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMongoDbLongSuite.scala (1)
  • getBookkeeper (39-41)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (1)
  • getBookkeeper (44-46)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
  • batchId (32-32)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (1)
  • testBookKeeper (27-278)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
  • country (43-43)
  • batchId (44-44)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
  • batchId (32-32)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
  • batchId (185-185)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskCompletedFactory.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
  • country (43-43)
  • batchId (44-44)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
  • batchId (185-185)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperHadoop.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (3)
  • batchId (32-32)
  • infoDateBegin (25-25)
  • infoDateEnd (26-26)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
  • core (141-155)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (1)
  • getFilter (159-191)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (1)
  • getFilter (129-166)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
  • getDataChunks (94-104)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
  • getDataChunks (59-76)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (3)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala (1)
  • getBookkeeper (41-43)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (1)
  • getBookkeeper (44-46)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (1)
  • testBookKeeper (27-278)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
  • pramen (45-67)
  • pramen (118-121)
  • core (141-155)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (4)
  • pramen (94-113)
  • pramen (184-195)
  • BookkeeperText (32-39)
  • BookkeeperText (41-200)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
  • DataChunk (23-32)
  • DataChunk (35-49)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (1)
  • getBookkeeper (44-46)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTask.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
  • country (43-43)
  • batchId (44-44)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
  • batchId (32-32)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (3)
  • getDataChunksFromStorage (68-72)
  • pramen (78-90)
  • pramen (107-111)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
  • pramen (45-67)
  • pramen (118-121)
  • getDateStr (157-157)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
  • DataChunk (23-32)
  • DataChunk (35-49)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/MutableStack.scala (1)
  • nonEmpty (46-48)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
  • getDataChunks (94-104)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
  • getDataChunks (59-76)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/MutableStack.scala (1)
  • isEmpty (42-44)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMongoDbLongSuite.scala (3)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala (1)
  • getBookkeeper (40-42)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (1)
  • getBookkeeper (44-46)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (1)
  • testBookKeeper (27-278)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecordMongoDb.scala (1)
  • BookkeepingRecordMongoDb (19-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (3)
  • DataChunk (23-32)
  • DataChunk (35-49)
  • fromRecordMongo (45-48)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DataChunkFactory.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (9)
  • jobStarted (30-30)
  • jobFinished (31-31)
  • batchId (32-32)
  • infoDate (24-24)
  • infoDateBegin (25-25)
  • infoDateEnd (26-26)
  • inputRecordCount (27-27)
  • outputRecordCount (28-28)
  • appendedRecordCount (29-29)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (1)
  • batchId (76-76)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
  • batchId (185-185)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
  • DataChunk (23-32)
  • DataChunk (35-49)
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala (5)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (2)
  • getTableRunInfo (68-68)
  • batchId (76-76)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (2)
  • getTableRunInfo (167-167)
  • batchId (185-185)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (7)
  • infoDate (24-24)
  • batchId (32-32)
  • inputRecordCount (27-27)
  • outputRecordCount (28-28)
  • appendedRecordCount (29-29)
  • jobStarted (30-30)
  • jobFinished (31-31)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
  • getDataChunks (94-104)
pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala (1)
  • MetaTableRunInfo (21-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1)
  • BookkeepingRecord (19-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecordMongoDb.scala (1)
  • BookkeepingRecordMongoDb (19-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecordMongoDb.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (9)
  • infoDate (24-24)
  • infoDateBegin (25-25)
  • infoDateEnd (26-26)
  • inputRecordCount (27-27)
  • outputRecordCount (28-28)
  • appendedRecordCount (29-29)
  • jobStarted (30-30)
  • jobFinished (31-31)
  • batchId (32-32)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (5)
  • getLatestProcessedDateFromStorage (43-60)
  • getDataChunksFromStorage (68-72)
  • getDataChunksCountFromStorage (74-76)
  • pramen (78-90)
  • pramen (107-111)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (2)
  • pramen (45-67)
  • pramen (118-121)
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
  • infoDate (24-24)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
  • getDataChunks (94-104)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
  • getDataChunks (59-76)
pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
  • pramen (45-67)
  • pramen (118-121)
  • core (141-155)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (4)
  • pramen (78-90)
  • pramen (107-111)
  • core (113-117)
  • core (119-128)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
  • DataChunk (23-32)
  • DataChunk (35-49)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (4)
  • pramen (94-113)
  • pramen (184-195)
  • BookkeeperText (32-39)
  • BookkeeperText (41-200)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperHadoop.scala (1)
  • core (26-46)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (1)
  • testBookKeeper (27-278)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (7)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
  • pramen (45-67)
  • pramen (118-121)
  • core (141-155)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (6)
  • pramen (217-224)
  • getDataChunksFromStorage (96-101)
  • getFilter (129-166)
  • getLatestDataChunkFromStorage (81-90)
  • saveRecordCountToStorage (103-115)
  • deleteNonCurrentBatchRecords (117-127)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala (2)
  • SlickUtils (27-161)
  • executeQuery (45-65)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (10)
  • batchId (32-32)
  • infoDate (24-24)
  • jobFinished (31-31)
  • inputRecordCount (27-27)
  • outputRecordCount (28-28)
  • jobStarted (30-30)
  • BookkeepingRecords (22-38)
  • BookkeepingRecords (40-42)
  • pramenTableName (23-23)
  • pramenTableName (34-36)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (3)
  • DataChunk (23-32)
  • DataChunk (35-49)
  • fromRecord (40-43)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1)
  • BookkeepingRecord (19-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala (1)
  • run (79-125)
pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
  • DataChunk (23-32)
  • DataChunk (35-49)
pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)
  • deleteNonCurrentBatchRecords (71-77)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Coverage on Scala 2.12.18

Comment on lines 21 to 28
case class MetaTableRunInfo(tableName: String,
                            infoDate: LocalDate,
                            batchId: Long,
                            inputRecordCount: Long,
                            outputRecordCount: Long,
                            appendedRecordCount: Option[Long],
                            jobStarted: Instant,
                            jobFinished: Instant)

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
echo "=== Searching for MetaTableRunInfo construction sites ==="
rg -n 'MetaTableRunInfo\s*\(' --type=scala -A3

echo -e "\n=== Searching for pattern matches on MetaTableRunInfo ==="
rg -n 'MetaTableRunInfo\s*\(' --type=scala | grep -E '(case|match|extract)'

echo -e "\n=== Searching for MetaTableRunInfo in the codebase ==="
rg -n 'MetaTableRunInfo' --type=scala

Repository: AbsaOSS/pramen

Length of output: 2571


🏁 Script executed:

#!/bin/bash
echo "=== Full MetaTableRunInfo construction in MetastoreReaderBase.scala ==="
sed -n '76,90p' pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala

echo -e "\n=== Case class definition in MetaTableRunInfo.scala ==="
sed -n '21,28p' pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala

echo -e "\n=== Search for other MetaTableRunInfo creations (broader search) ==="
rg -n 'new MetaTableRunInfo|MetaTableRunInfo\(' --type=scala -B2 -A8 | grep -v test | head -100

Repository: AbsaOSS/pramen

Length of output: 3638


🏁 Script executed:

#!/bin/bash
echo "=== Checking test files for MetaTableRunInfo construction ==="
find . -path ./target -prune -o -name "*Test.scala" -o -name "*Spec.scala" -type f | xargs grep -l "MetaTableRunInfo" 2>/dev/null

echo -e "\n=== Looking for MetaTableRunInfo construction in test files ==="
rg -n 'MetaTableRunInfo\(' --type=scala pramen/core/src/test | head -50

echo -e "\n=== Checking for deprecation or version info ==="
cat README.md 2>/dev/null | head -50

echo -e "\n=== Checking pom.xml or build.sbt for version ==="
find . -name "pom.xml" -o -name "build.sbt" | xargs head -30 2>/dev/null | head -80

Repository: AbsaOSS/pramen

Length of output: 6668


Breaking API change: case class constructor signature modified.

Adding batchId and appendedRecordCount fields to the MetaTableRunInfo case class is a breaking change for external consumers of this public API. While the internal construction site in MetastoreReaderBase.scala has been properly updated to include all parameters, external code that constructs this type will fail to compile until it is updated.

For a public API release, ensure:

  1. A changelog entry documents this breaking change
  2. Migration guidance is provided for users (e.g., "if upgrading from X.Y, update MetaTableRunInfo construction calls to include the two new parameters")
  3. Consider a major version bump if not already planned
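
A hedged before/after sketch of the construction-site change could serve as the migration note; the values are placeholders and the pre-change field list is inferred from the diff:

// Before (pre-1.13.0), as inferred from the diff:
// MetaTableRunInfo("my_table", infoDate, inputCount, outputCount, jobStarted, jobFinished)

// After (1.13.0-SNAPSHOT): supply the batch ID and the optional appended count.
val runInfo = MetaTableRunInfo(
  tableName           = "my_table",
  infoDate            = infoDate,
  batchId             = 20260102123000L,   // placeholder batch ID
  inputRecordCount    = inputCount,
  outputRecordCount   = outputCount,
  appendedRecordCount = None,              // Some(n) for incremental / append runs
  jobStarted          = jobStarted,
  jobFinished         = jobFinished
)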
🤖 Prompt for AI Agents
In pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala around
lines 21 to 28, the case class constructor was changed by adding batchId and
appendedRecordCount which is a breaking API change; update the release to
document this and provide migration guidance: add a changelog entry that
describes the new fields and their types, include a short migration note showing
example construction before and after (e.g., indicate default or derived values
callers should supply), and if this is part of a public release coordinate a
major version bump (or clearly state compatibility implications) so downstream
users are informed.

Comment on lines 70 to 75
override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {
  val infoDateStr = DataChunk.dateFormatter.format(infoDate)
  val filter = (col("tableName") === lit(table)) && (col("infoDate") === lit(infoDateStr)) && (col("batchId") =!= lit(batchId))

  spark.sql(s"DROM FROM $schemasFullTableName WHERE tableName='$table' AND infoDate='$infoDateStr' AND batchId != $batchId").count()
}

⚠️ Potential issue | 🔴 Critical

Critical bugs in deleteNonCurrentBatchRecords.

This method has multiple issues:

  1. Typo: DROM should be DELETE
  2. Wrong table: Uses schemasFullTableName instead of recordsFullTableName
  3. Unused filter: The filter Column variable (lines 72-73) is constructed but never used
  4. Useless .count(): DELETE statements don't return a result set to count
🔎 Proposed fix
  override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {
    val infoDateStr = DataChunk.dateFormatter.format(infoDate)
-    val filter = (col("tableName") === lit(table)) && (col("infoDate") === lit(infoDateStr)) && (col("batchId") =!= lit(batchId))
-
-    spark.sql(s"DROM FROM $schemasFullTableName WHERE tableName='$table' AND infoDate='$infoDateStr' AND batchId != $batchId").count()
+
+    spark.sql(s"DELETE FROM $recordsFullTableName WHERE tableName='$table' AND infoDate='$infoDateStr' AND batchId != $batchId")
  }
🤖 Prompt for AI Agents
In
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
around lines 70 to 75, fix deleteNonCurrentBatchRecords by: replace the typo
"DROM" with "DELETE", target recordsFullTableName instead of
schemasFullTableName, remove the unused filter Column (or use it if you prefer
DataFrame API), and stop calling .count() on the SQL execution — just run
spark.sql(...) to execute the DELETE statement (no result aggregation). Ensure
the SQL WHERE clause uses table, formatted infoDateStr and batchId correctly and
keep the method returning Unit.

Comment on lines 40 to 41
country: Option[String],
batchId: Long

@coderabbitai coderabbitai bot Jan 2, 2026


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, find the TaskCompletedCsv file and examine its structure
find . -name "TaskCompletedCsv.scala" -type f

Repository: AbsaOSS/pramen

Length of output: 146


🏁 Script executed:

# Search for CSV parsing/deserialization logic for TaskCompletedCsv
rg -n "TaskCompletedCsv" --type=scala -A 5 -B 5

Repository: AbsaOSS/pramen

Length of output: 5989


🏁 Script executed:

# Check for JournalTask definition to compare batchId types
rg -n "class JournalTask|case class JournalTask" --type=scala -A 10

Repository: AbsaOSS/pramen

Length of output: 3242


🏁 Script executed:

cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala

Repository: AbsaOSS/pramen

Length of output: 2266


🏁 Script executed:

cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTask.scala | head -60

Repository: AbsaOSS/pramen

Length of output: 2190


🏁 Script executed:

# Check the mapping from TaskCompletedCsv to TaskCompleted in JournalHadoopCsv.scala
sed -n '65,100p' pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala

Repository: AbsaOSS/pramen

Length of output: 1606


🏁 Script executed:

rg -n "case class TaskCompleted" --type=scala -A 25

Repository: AbsaOSS/pramen

Length of output: 10398


🏁 Script executed:

# Check if there's any CSV column mapping or schema handling that might handle optional columns
rg -n "batchId" --type=scala -B 2 -A 2

Repository: AbsaOSS/pramen

Length of output: 50371


Make batchId optional in TaskCompletedCsv for backwards compatibility.

The field is declared as Long (required), but:

  • JournalTask.batchId is Option[Long]
  • TaskCompleted.batchId is Long

Since TaskCompletedCsv is deserialized from CSV with Spark's schema enforcement, existing CSV files without a batchId column will fail to deserialize. Change batchId: Long to batchId: Option[Long] in TaskCompletedCsv to support old journal data while maintaining the required Long type in the downstream TaskCompleted model (apply .getOrElse() in the mapping logic if a default is needed).
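
An abbreviated sketch of what that change could look like; the types use hypothetical "Sketch" names and a placeholder 0L default, and the author explains below why the current design keeps the field required:

// Optional in the CSV model so legacy journal files still deserialize.
case class TaskCompletedCsvSketch(tableName: String,
                                  country: Option[String],
                                  batchId: Option[Long])

// Required downstream, matching TaskCompleted in the PR.
case class TaskCompletedSketch(tableName: String,
                               country: Option[String],
                               batchId: Long)

def toTaskCompleted(csv: TaskCompletedCsvSketch): TaskCompletedSketch =
  TaskCompletedSketch(
    tableName = csv.tableName,
    country   = csv.country,
    batchId   = csv.batchId.getOrElse(0L)   // 0L is a placeholder default for legacy rows
  )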

🤖 Prompt for AI Agents
In
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala
around lines 40 to 41, change the batchId field from a required Long to an
Option[Long] to allow deserialization of legacy CSVs that lack the batchId
column; update the case class signature accordingly and ensure downstream
mapping to TaskCompleted preserves required Long semantics by calling
.getOrElse(...) or providing a suitable default where TaskCompleted expects a
Long.

Collaborator Author


TaskCompleted is created at runtime, so batchId is always defined. JournalTask is stored in and loaded from a database, so there it is optional in order to keep backwards compatibility with existing data.


@yruslan, thank you for the clarification! I understand now:

  • TaskCompleted is created at runtime during job execution, so batchId is always present and can be Long
  • JournalTask is persisted to and loaded from the database, so batchId is Option[Long] for backwards compatibility with existing DB records
  • TaskCompletedCsv is written to journal.csv files at runtime (from TaskCompleted), so it will always have batchId defined

This makes sense. The only edge case would be if there are existing journal.csv files (written before this change) that need to be read by the new code. If those files don't have the batchId column, Spark's CSV reader would fail during deserialization in getEntries. However, if:

  • Journal CSV files are ephemeral/cleaned regularly, or
  • This is a new deployment, or
  • You have a separate migration path for old journal files

...then the current approach is fine. Since you've considered this distinction between runtime and persisted data, I'll defer to your design decision here.

Comment on lines +48 to 76
private def startEmbeddedMongoDb(): (Option[RunningMongodProcess], Int) = {
try {
val version: Version = Version.V8_2_2
val mongod = Mongod.builder()
.processOutput(
Start.to(classOf[ProcessOutput]).initializedWith(
ProcessOutput.builder()
.output(new Slf4jProcessor(log, "[mongod-out] "))
.error(new Slf4jProcessor(log, "[mongod-err] "))
.commands(new Slf4jProcessor(log, "[mongod-cmd] "))
.build()
)
)
.build()

val starter = MongodStarter.getInstance(runtimeConfig)
val executable: TransitionWalker.ReachedState[RunningMongodProcess] =
mongod.transitions(version)
.walker()
.initState(StateID.of(classOf[RunningMongodProcess]))

val mongodConfig = new MongodConfigBuilder()
.version(Version.Main.V4_0)
.net(new Net("localhost", mongoPort, Network.localhostIsIPv6()))
.build()
val addr: ServerAddress = executable.current().getServerAddress
val mongoPort: Int = addr.getPort

val executable = try {
val exec = starter.prepare(mongodConfig)
exec.start()
Some(exec)
(Option(executable.current()), mongoPort)
} catch {
case _: Throwable => None
case ex: Throwable =>
log.warn("Couldn't start embedded Mongodb. MongoDB tests will be skipped", ex)
(None, 0)
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

MongoDB 8.2.2 stable release flapdoodle embedded

💡 Result:

Short answer: MongoDB 8.2.2 is a released stable patch (released Nov 19, 2025). Flapdoodle's embedded-mongo project exists but does not yet advertise built-in support for MongoDB 8.2.x binaries — you should verify whether the Flapdoodle release you use includes an 8.2.x artifact or supply the server binary yourself (or run MongoDB 8.2.2 in Docker) for tests. [1][2][3]

Sources

  • MongoDB 8.2.2 release notes (8.2.2 — Nov 19, 2025). [1]
  • Flapdoodle embedded-mongo project (repo/home). [2]
  • Flapdoodle issues tracking “support latest version” / version compatibility discussion. [3]

🌐 Web query:

flapdoodle-de 4.22.0 MongoDB supported versions

💡 Result:

de.flapdoodle.embed.mongo 4.22.0 includes embedded MongoDB binaries for the following MongoDB server series (as enumerated in the Version.Main javadoc): 4.4, 4.3, 4.2, 4.1, 4.0 (4.0.0→4.0.9 beta entries), 3.5, 3.4, 3.3, 3.2, 3.1, 3.0, 2.2, 2.1, 2.0 and earlier 1.x legacy builds. [1][2]

Sources:

  • Version.Main (supported versions listing) — de.flapdoodle.embed.mongo 4.22.0 javadoc. [1]
  • de.flapdoodle.embed.mongo 4.22.0 package information (Maven). [2]

MongoDB version 8.2.2 is not supported by flapdoodle 4.22.0.

Flapdoodle-de 4.22.0 only includes embedded MongoDB binaries for versions up to 4.4. Attempting to use Version.V8_2_2 will fail at runtime. Change to a supported MongoDB version (e.g., Version.V4_4_14) or upgrade to a newer flapdoodle release that supports MongoDB 8.2.x.
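As a sketch, the smaller of those two fixes is a one-line change; whether V4_4_14 is the exact constant name exposed by the flapdoodle release in use is an assumption that should be checked against its Version enum:

  // was: val version: Version = Version.V8_2_2
  val version: Version = Version.V4_4_14 // any 4.x-series constant bundled with flapdoodle 4.22.0 would do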

🤖 Prompt for AI Agents
In
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/dao/MongoDbSingleton.scala
around lines 48 to 76, the code uses Version.V8_2_2 which is unsupported by
flapdoodle 4.22.0 and will fail at runtime; replace the version with a supported
one (for example Version.V4_4_14) or alternatively upgrade the flapdoodle
dependency to a release that includes MongoDB 8.2.x, then run tests to ensure
the embedded MongoDB starts correctly.

    }

-   if (db != null) {
+   if (mongoDbExecutable.nonEmpty) {

⚠️ Potential issue | 🟠 Major

Inconsistent MongoDB availability checks.

The test gating condition uses mongoDbExecutable.nonEmpty (Line 40), but the before block still uses if (db != null) (Line 32). This inconsistency could cause test failures if mongoDbExecutable is available but db initialization fails.

🔎 Proposed fix to align availability checks
  before {
-    if (db != null) {
+    if (mongoDbExecutable.nonEmpty) {
      if (db.doesCollectionExists(collectionName)) {
        db.dropCollection(collectionName)
      }
      journal = new JournalMongoDb(connection)
    }
  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-  if (mongoDbExecutable.nonEmpty) {
+  before {
+    if (mongoDbExecutable.nonEmpty) {
+      if (db.doesCollectionExists(collectionName)) {
+        db.dropCollection(collectionName)
+      }
+      journal = new JournalMongoDb(connection)
+    }
+  }
🤖 Prompt for AI Agents
In
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala
around lines 32 to 40, the test setup uses `if (db != null)` while the gating
condition uses `if (mongoDbExecutable.nonEmpty)`, causing inconsistent
availability checks; change the `before` block check to use `if
(mongoDbExecutable.nonEmpty)` (or consistently use the same
`mongoDbExecutable.nonEmpty` check wherever availability is tested) so both
setup and gating use the same condition and avoid running tests when the
embedded MongoDB isn't actually available.

@github-actions
github-actions bot commented Jan 2, 2026

Unit Test Coverage

Overall Project    84.2%    -0.35% 🍏
Files changed      78.96%

Module coverage:
  pramen:core (Jacoco Report)    86.19%   -0.39%

File coverage (pramen:core, Jacoco Report):
  JournalHadoopCsv.scala            100%     🍏
  DataChunk.scala                   100%     -23.08%
  BookkeeperDeltaPath.scala         100%     🍏
  JournalTasks.scala                100%     🍏
  TaskCompletedCsv.scala            100%     -14.08%
  JournalTask.scala                 100%     -14.08%
  CsvUtils.scala                    100%     🍏
  BookkeepingRecord.scala           100%     -40%
  BookkeepingRecords.scala          100%     🍏
  TaskCompleted.scala               97.58%   -4.03%
  BookkeeperText.scala              97.19%   🍏
  BookkeeperDeltaBase.scala         96.56%   🍏
  BookkeeperDeltaTable.scala        94.79%   🍏
  BookkeeperHadoop.scala            92.38%   -6.73%
  Bookkeeper.scala                  91.89%   🍏
  PythonTransformationJob.scala     90.41%   🍏
  PipelineStateImpl.scala           90.28%   🍏
  AlgorithmUtils.scala              89.33%   -2.25% 🍏
  SinkJob.scala                     88.34%   🍏
  MetastoreImpl.scala               84.59%   🍏
  TaskRunnerBase.scala              82.71%   🍏
  JournalJdbc.scala                 82.32%   🍏
  MetastoreReaderBase.scala         77.27%   🍏
  BookkeeperBase.scala              76.62%   -11.94%
  BookkeeperMongoDb.scala           74.42%   -8.22%
  BookkeeperNull.scala              66.67%   -12.5%
  SlickUtils.scala                  63.64%   🍏
  BookkeeperJdbc.scala              55.06%   -7.69%
  PramenDb.scala                    48.32%   -3.61%

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)

59-76: Type mismatch in batchId comparison will cause incorrect filtering.

At line 65, chunk.batchId is Option[Long] but id is Long (unwrapped from Some(id) at line 64). Comparing Option[Long] == Long will always be false, so batch filtering will never match any chunks.

🔎 Proposed fix
       batchId match {
         case Some(id) =>
-          if (chunk.batchId == id) {
+          if (chunk.batchId.contains(id)) {
             Some(chunk)
           } else {
             None
           }
         case None => Some(chunk)
       }
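As a standalone illustration of why the original equality can never hold (plain Scala, not project code):

  val chunkBatchId: Option[Long] = Some(42L)
  chunkBatchId == 42L          // false: an Option[Long] is never equal to a bare Long
  chunkBatchId.contains(42L)   // true: compares the wrapped value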
🧹 Nitpick comments (2)
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)

19-19: Consider removing unused lit import.

The lit function is imported but no longer used after deleteNonCurrentBatchRecords was changed to a no-op implementation.

🔎 Proposed fix
-import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.functions.col
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (1)

197-199: Consider adding a comment explaining why this is a no-op.

For text/CSV-based storage, deletion of records from an append-only file isn't straightforward. A brief comment would help future maintainers understand this design decision.

🔎 Suggested documentation
  override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {
-
+    // No-op: CSV-based bookkeeping is append-only; batch cleanup not supported
  }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cc942c1 and e41464a.

📒 Files selected for processing (19)
  • pramen/build.sbt
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/CsvUtils.scala
  • pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DataChunkFactory.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala
✅ Files skipped from review due to trivial changes (1)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala
🚧 Files skipped from review as they are similar to previous changes (3)
  • pramen/build.sbt
  • pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala
  • pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T08:27:21.504Z
Learnt from: yruslan
Repo: AbsaOSS/pramen PR: 611
File: pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala:29-29
Timestamp: 2025-06-18T08:27:21.504Z
Learning: In pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala, the constant variable name was changed from DATE_UNTIL_EXPR_KEY to DATE_TO_EXPR_KEY, but both constants hold the same string value "date.to". This is a variable name refactoring, not a configuration key change, so it doesn't affect backward compatibility.

Applied to files:

  • pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala
  • pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala
  • pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
🧬 Code graph analysis (8)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (5)
pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)
  • deleteNonCurrentBatchRecords (71-77)
pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)
  • deleteNonCurrentBatchRecords (71-77)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (2)
  • pramen (93-115)
  • pramen (141-141)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
  • DataChunk (28-37)
  • DataChunk (40-48)
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala (6)
  • core (156-175)
  • core (177-189)
  • core (191-207)
  • core (209-281)
  • core (283-292)
  • core (294-308)
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala (5)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (2)
  • getTableRunInfo (68-68)
  • batchId (76-76)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (2)
  • getTableRunInfo (167-167)
  • batchId (185-185)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (7)
  • infoDate (24-24)
  • batchId (32-32)
  • inputRecordCount (27-27)
  • outputRecordCount (28-28)
  • appendedRecordCount (29-29)
  • jobStarted (30-30)
  • jobFinished (31-31)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
  • getDataChunks (94-104)
pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala (1)
  • MetaTableRunInfo (21-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (5)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
  • pramen (45-67)
  • pramen (118-121)
  • core (141-155)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (2)
  • pramen (94-113)
  • pramen (184-195)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1)
  • BookkeepingRecord (19-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (11)
  • infoDate (24-24)
  • infoDateBegin (25-25)
  • infoDateEnd (26-26)
  • inputRecordCount (27-27)
  • outputRecordCount (28-28)
  • jobStarted (30-30)
  • jobFinished (31-31)
  • batchId (32-32)
  • appendedRecordCount (29-29)
  • pramenTableName (23-23)
  • pramenTableName (33-35)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
  • batchId (185-185)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
  • DataChunk (28-37)
  • DataChunk (40-48)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
  • country (43-43)
  • batchId (44-44)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
  • batchId (32-32)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
  • batchId (185-185)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (6)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
  • pramen (45-67)
  • pramen (118-121)
  • core (141-155)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (1)
  • getBookkeeper (54-56)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala (1)
  • getBookkeeper (41-43)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala (1)
  • getBookkeeper (40-42)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMongoDbLongSuite.scala (1)
  • getBookkeeper (39-41)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (1)
  • getBookkeeper (44-46)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (3)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (1)
  • batchId (76-76)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
  • batchId (185-185)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
  • getDataChunks (94-104)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (3)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (1)
  • batchId (76-76)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
  • batchId (185-185)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1)
  • BookkeepingRecord (19-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (42)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (3)

23-27: Good documentation of CSV compatibility constraint.

The scaladoc clearly explains why field order must be preserved, which is critical for maintaining backward compatibility with existing CSV files. This is a best practice for data serialization formats.


36-37: Option[Long] for batch fields preserves semantic clarity.

Using Option[Long] for batchId and appendedRecordCount avoids the ambiguity of sentinel values like 0L and explicitly represents the absence of these fields. This aligns well with the BookkeepingRecord model and maintains clear semantics throughout the data flow.


45-47: LGTM: Factory method correctly preserves Option types.

The fromRecord factory method properly maps from BookkeepingRecord to DataChunk, preserving the Option[Long] types for batchId and appendedRecordCount without using sentinel values.

pramen/core/src/main/scala/za/co/absa/pramen/core/utils/CsvUtils.scala (1)

41-48: Proper Option unwrapping for CSV serialization.

The updated logic correctly handles Option[_] fields by unwrapping Some(x) to x.toString and rendering None as an empty string, preventing "Some(...)" or "None" literals from appearing in CSV output. The separator sanitization is correctly applied to the unwrapped value.
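A rough sketch of that behaviour, assuming a helper of the following shape (the real CsvUtils method and signature differ):

  def toCsvField(value: Any, separator: Char): String = {
    val unwrapped = value match {
      case Some(x) => x.toString // unwrap Some(x) instead of printing "Some(...)"
      case None    => ""         // render a missing value as an empty cell
      case other   => other.toString
    }
    unwrapped.replace(separator, ' ') // keep the separator character out of rendered values
  }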

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (4)

28-28: LGTM: Test signature updated for batch-aware bookkeeping.

The testBookKeeper method signature now accepts a factory function Long => Bookkeeper, enabling tests to create bookkeeper instances with specific batch IDs. This aligns with the batch-tracking functionality introduced in the PR.


146-160: Good test coverage for same-batch behavior.

The test correctly verifies that multiple setRecordCount calls with the same batch ID result in additive entries (2 chunks), with getLatestDataChunk returning the most recent entry based on jobFinished timestamp.


162-180: Test verifies different-batch replace semantics.

This test confirms that when different batch IDs write to the same table/date (with recordsAppended = None), the new batch replaces the old entry, resulting in only 1 chunk. The assume guard correctly skips BookkeeperText which doesn't implement this feature.


182-200: Test verifies different-batch non-overwrite semantics.

This test validates that when recordsAppended = Some(1), entries from different batches are additive rather than replacing, resulting in 2 chunks. The conditional skip for Spark 2.x + BookkeeperDeltaTable is appropriate for implementation-specific limitations.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (3)

29-29: LGTM: appendedRecordCount column added.

The appendedRecordCount column is correctly defined as Option[Long] and aligns with the BookkeepingRecord case class definition shown in the relevant snippets.


32-32: LGTM: batchId column added.

The batchId column is correctly defined as Option[Long] and aligns with the BookkeepingRecord case class definition.


33-35: LGTM: Projection mapping updated correctly.

The default projection (*) now includes both appendedRecordCount and batchId in the correct positions within the tuple, ensuring proper mapping to/from the BookkeepingRecord case class via tupled and unapply.

pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala (1)

129-129: The CSV field ordering is correct. The batchId field is positioned as the final field in both the TaskCompletedCsv case class declaration and the serialization logic in JournalHadoopCsv.scala, matching the order generated by CsvUtils.getHeaders[TaskCompletedCsv]. No changes needed.

pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DataChunkFactory.scala (1)

26-39: LGTM!

The new recordsAppended and batchId parameters are properly added with sensible defaults, maintaining backward compatibility. The mapping to DataChunk fields (appendedRecordCount and batchId) is correct.

pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala (1)

73-87: LGTM!

The method correctly maps DataChunk fields to MetaTableRunInfo, including the new batchId and appendedRecordCount fields. The getOrElse(0L) handles the Option[Long] to Long conversion appropriately.

pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (3)

41-59: LGTM!

The class properly extends BookkeeperDeltaBase with batchId, initializes Delta tables on construction, and correctly implements data retrieval with filtering and ordering.


61-77: LGTM!

The saveRecordCountDelta and deleteNonCurrentBatchRecords methods correctly use Delta Lake APIs. The deletion filter properly excludes the current batch while matching table and infoDate.


103-118: LGTM!

The initialization logic correctly checks for table existence before creating empty Delta tables, preventing duplicate table creation errors.

pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (2)

27-27: LGTM!

Constructor properly accepts batchId with a sensible default value for test purposes.


93-114: LGTM!

The setRecordCount method correctly incorporates the new recordsAppended parameter and properly constructs DataChunk with Option(batchId) and recordsAppended.

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (3)

19-19: LGTM!

The unused fullstacks import has been correctly removed.


54-65: LGTM!

The getBookkeeper factory method properly accepts batchId, and the test correctly handles Spark 2.x vs 3.x version differences by using unique prefixes for Spark 2.x.


67-79: LGTM!

The test for table creation and the getNewTablePrefix helper are well-implemented. Using Math.abs(Random.nextInt()) ensures valid table name prefixes.

pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (2)

40-43: LGTM!

Constructor properly accepts batchId and extends BookkeeperDeltaBase(batchId).


70-72: LGTM!

The no-op implementation with a clear comment explaining Spark 2.x limitations is appropriate. This addresses the previous review comments about bugs in this method.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (3)

41-41: LGTM!

Constructor properly accepts batchId and passes it to the BookkeeperHadoop parent class.


80-91: LGTM!

The data retrieval methods correctly propagate the batchId parameter to getFilter, with appropriate defaults (None) for methods that don't require batch-specific filtering.


94-113: LGTM!

The saveRecordCountToStorage method correctly constructs DataChunk with the new batchId (wrapped in Option) and recordsAppended fields.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (4)

29-29: LGTM: Constructor updated for batch awareness.

The constructor correctly accepts and propagates batchId to the parent class.


113-117: LGTM: Helper for retrieving all batch chunks.

This helper correctly retrieves all matching chunks without deduplication, enabling batch-aware queries. The sorting by infoDate provides consistent ordering.


78-90: LGTM: Signature updated for batch and append tracking.

The method signature correctly includes recordsAppended, and the DataChunk construction at line 87 properly incorporates both Option(batchId) and recordsAppended.


68-72: No issues found. getDataChunksFromStorage correctly uses getBkAllData to return all matching chunks, which is the intended behavior when querying data across all batches for a given date. This behavior is explicitly tested and properly handled by all consumers.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (4)

24-24: LGTM: Base class constructor updated for batch awareness.

The constructor correctly accepts batchId and makes it available throughout the class hierarchy.


31-35: LGTM: New abstract methods establish batch-aware contract.

These methods define the required API for batch operations:

  • getDataChunksFromStorage retrieves chunks with optional batch filtering
  • deleteNonCurrentBatchRecords enables cleanup of stale batch data

94-104: LGTM: New public API for batch-aware chunk retrieval.

This method correctly delegates to transient or storage-backed implementations based on the bookkeeping state, passing the batchId parameter through appropriately.


45-67: No changes needed. The isEmpty check on recordsAppended correctly distinguishes between full-write/overwrite operations (None) and append operations (Some(n)). This semantic separation is intentional: None triggers cleanup of non-current batch records (appropriate for overwrites), while Some(n) does not (appropriate for appends, regardless of whether n is 0).
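Schematically, the branch being described behaves like this sketch (names mirror the discussion; this is not the actual BookkeeperBase code):

  // None    => full write/overwrite: drop records written by other batches
  // Some(n) => append: keep earlier batches' records, even when n == 0
  def cleanupAfterSave(recordsAppended: Option[Long])(deleteOtherBatches: => Unit): Unit =
    if (recordsAppended.isEmpty) deleteOtherBatches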

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (7)

40-40: LGTM: Model version bumped for schema change.

The version increment from 2 to 3 correctly reflects the index migration from unique to non-unique, supporting multiple batches per (table, date) combination.


43-43: LGTM: Constructor updated for batch awareness.

The constructor correctly accepts and propagates batchId to the parent class.


94-99: LGTM: Batch-aware chunk retrieval with helpful logging.

The implementation correctly filters by batchId and includes logging at line 97, which will aid in debugging batch operations.


115-125: LGTM: Batch cleanup implementation.

The filter correctly identifies non-current batch records for the given table and date, using Filters.ne("batchId", batchId) at line 121.


127-164: LGTM: Filter extended for batch-aware queries.

The getFilter method correctly incorporates optional batchId filtering at lines 160-163, combining the base filter with a batch equality check when needed.


190-195: LGTM: Index migration from unique to non-unique.

The migration logic correctly handles existing databases by dropping the unique index and recreating it as non-unique, enabling multiple batches per (table, date). The condition dbVersion < 3 && dbVersion > 0 at line 190 appropriately excludes new installations (dbVersion = 0) that get the non-unique index directly at line 184.


101-113: Duplicate handling with insertOne needs clarification.

Line 112 uses insertOne to support multiple batches writing to the same (table, date) combination. While deleteNonCurrentBatchRecords correctly cleans up old batches, it does not prevent duplicate records from the same batch if saveRecordCountToStorage is called multiple times in a single execution.

Clarify whether:

  1. The method is guaranteed to be called only once per batch execution
  2. Duplicate records from the same batch are acceptable (they share the same batchId and will be filtered by existing queries)
  3. A unique index on (tableName, infoDate, batchId) should be added to prevent duplicates
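If option 3 were chosen, the compound index could look roughly like the sketch below, using the MongoDB Scala driver model classes; the collection handle and the exact field names are assumptions taken from this discussion, not the project's actual wiring:

  import org.mongodb.scala.{Document, MongoCollection}
  import org.mongodb.scala.model.{IndexOptions, Indexes}

  def addBatchUniqueIndex(collection: MongoCollection[Document]) =
    collection.createIndex(
      Indexes.ascending("tableName", "infoDate", "batchId"),
      IndexOptions().unique(true)
    ) // returns an Observable that must be subscribed to (or awaited) for the index to be created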

Comment on lines +91 to +92
country = v.country,
batchId = v.batchId.getOrElse(0L)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
cd /root/repo 2>/dev/null || true

# First, find the TaskCompleted case class definition
echo "=== Searching for TaskCompleted case class definition ==="
rg -nP --type=scala 'case class TaskCompleted' -A 10

echo -e "\n=== Searching for TaskCompletedCsv case class definition ==="
rg -nP --type=scala 'case class TaskCompletedCsv' -A 10

echo -e "\n=== Searching for DataChunk case class definition ==="
rg -nP --type=scala 'case class DataChunk' -A 10

echo -e "\n=== Searching for batchId getOrElse patterns ==="
rg -nP --type=scala 'batchId.*getOrElse' -C 2

echo -e "\n=== Checking the specific file context ==="
sed -n '85,100p' pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala

Repository: AbsaOSS/pramen

Length of output: 12249


🏁 Script executed:

#!/bin/bash
# Get the complete TaskCompleted definition
echo "=== Full TaskCompleted.scala ==="
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala

echo -e "\n=== Full TaskCompletedCsv.scala ==="
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala

Repository: AbsaOSS/pramen

Length of output: 6632


Reconsider batchId design consistency: TaskCompleted should likely use Option[Long] like DataChunk.

The code converts TaskCompletedCsv.batchId from Option[Long] to TaskCompleted.batchId as Long using getOrElse(0L). This creates semantic ambiguity: 0L cannot be distinguished from a missing batch ID. More critically, this design is inconsistent with DataChunk, which preserves batchId as Option[Long]. The same problematic conversion pattern appears in JournalJdbc.scala:102 and MetastoreReaderBase.scala:79, suggesting a systematic design issue rather than an isolated case.

Consider changing TaskCompleted.batchId to Option[Long] to align with DataChunk and preserve the source semantics from TaskCompletedCsv, eliminating the sentinel value workaround.

🤖 Prompt for AI Agents
In
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala
around lines 91-92, TaskCompletedCsv.batchId is being converted with
getOrElse(0L) into a Long which loses the distinction between "missing" and a
real 0 value; to fix this, change TaskCompleted.batchId from Long to
Option[Long] so it preserves the Option from TaskCompletedCsv, update all
constructors/fields that instantiate TaskCompleted accordingly, and then
propagate the type change to the other files noted (JournalJdbc.scala around
line 102 and MetastoreReaderBase.scala around line 79) by replacing similar
getOrElse(0L) usages with direct Option passing and adjusting any downstream
code that expects a Long (handle None where appropriate or map to default at the
site that truly needs a concrete Long).

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)

149-154: Correct fix for batchId filtering.

The filter now properly uses chunk.batchId.contains(id) to compare Option[Long] with Long. This addresses the issue flagged in a previous review where chunk.batchId == id would always return false due to type mismatch.

🧹 Nitpick comments (3)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (2)

27-27: Consider parameterizing batchId per chunk for more flexible testing.

The fixed batchId constructor parameter means all chunks created by this mock will share the same batch ID. For simple test scenarios this is fine, but if tests need to verify behavior across multiple batches, they would need to instantiate multiple mocks.

🔎 Alternative: Accept batchId in setRecordCount
-class SyncBookkeeperMock(batchId: Long = 123L) extends Bookkeeper {
+class SyncBookkeeperMock(defaultBatchId: Long = 123L) extends Bookkeeper {

Then in setRecordCount, you could add an optional batchId parameter:

  private[pramen] override def setRecordCount(table: String,
                                              infoDate: LocalDate,
                                              inputRecordCount: Long,
                                              outputRecordCount: Long,
                                              recordsAppended: Option[Long],
                                              jobStarted: Long,
                                              jobFinished: Long,
-                                              isTableTransient: Boolean): Unit = {
+                                              isTableTransient: Boolean,
+                                              batchIdOverride: Option[Long] = None): Unit = {
+    val effectiveBatchId = batchIdOverride.getOrElse(defaultBatchId)

59-76: Correct implementation of batch-aware chunk filtering.

The logic correctly:

  • Filters by exact infoDate match
  • Applies optional batchId filtering using chunk.batchId.contains(id)
  • Sorts results by jobFinished

The match expression at lines 63-71 could be slightly simplified, but the current implementation is clear and correct for a test mock.

🔎 Optional: More concise filtering
-        batchId match {
-          case Some(id) =>
-            if (chunk.batchId.contains(id)) {
-              Some(chunk)
-            } else {
-              None
-            }
-          case None => Some(chunk)
-        }
+        if (batchId.forall(chunk.batchId.contains)) Some(chunk) else None
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (1)

44-50: Remove unnecessary .count() calls on DELETE statements.

spark.sql(s"DELETE FROM ...") executes the deletion but doesn't return a result set to count. The .count() calls are unnecessary and could be misleading. Simply execute the SQL without the count.

🔎 Proposed fix
       if (spark.catalog.tableExists(fullRecordsTableName)) {
-        spark.sql(s"DELETE FROM $fullRecordsTableName WHERE true").count()
+        spark.sql(s"DELETE FROM $fullRecordsTableName WHERE true")
       }

       if (spark.catalog.tableExists(fullSchemasTableName)) {
-        spark.sql(s"DELETE FROM $fullSchemasTableName WHERE true").count()
+        spark.sql(s"DELETE FROM $fullSchemasTableName WHERE true")
       }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e41464a and b5a52c7.

📒 Files selected for processing (9)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala
  • pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T08:27:21.504Z
Learnt from: yruslan
Repo: AbsaOSS/pramen PR: 611
File: pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala:29-29
Timestamp: 2025-06-18T08:27:21.504Z
Learning: In pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala, the constant variable name was changed from DATE_UNTIL_EXPR_KEY to DATE_TO_EXPR_KEY, but both constants hold the same string value "date.to". This is a variable name refactoring, not a configuration key change, so it doesn't affect backward compatibility.

Applied to files:

  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala
  • pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
🧬 Code graph analysis (6)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
  • getDataChunks (94-104)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (2)
  • getFilter (134-171)
  • pramen (222-229)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (5)
  • getDataChunksFromStorage (68-72)
  • getLatestDataChunkFromStorage (62-66)
  • getDataChunksCountFromStorage (74-76)
  • pramen (78-90)
  • pramen (107-111)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (2)
  • pramen (45-67)
  • pramen (118-121)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
  • DataChunk (28-37)
  • DataChunk (40-48)
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (2)
  • batchId (32-32)
  • infoDate (24-24)
pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (2)
  • saveRecordCountDelta (61-69)
  • deleteNonCurrentBatchRecords (71-77)
pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (2)
  • saveRecordCountDelta (61-69)
  • deleteNonCurrentBatchRecords (71-77)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala (1)
  • deleteNonCurrentBatchRecords (52-52)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (5)
  • getDataChunksFromStorage (68-72)
  • pramen (78-90)
  • pramen (107-111)
  • core (113-117)
  • core (119-128)
pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)
  • deleteNonCurrentBatchRecords (71-77)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (3)
  • pramen (93-115)
  • pramen (141-141)
  • getDataChunks (59-76)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (3)
  • DataChunk (28-37)
  • DataChunk (40-48)
  • fromRecord (45-47)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1)
  • BookkeepingRecord (19-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (31)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (2)

55-57: LGTM! Clean delegation to the updated getDataChunks method.

The simplified implementation correctly retrieves the latest chunk by delegating to getDataChunks with batchId = None and taking the last element after the sort by jobFinished.


93-112: LGTM! Correctly updated signature and DataChunk construction.

The changes properly integrate the new fields:

  • recordsAppended: Option[Long] parameter added to the signature
  • DataChunk construction includes both batchId (line 111) and recordsAppended (line 112)
  • Option(batchId) correctly wraps the constructor parameter as Some[Long]
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (3)

40-43: LGTM! Constructor properly threads batchId to base class.

The constructor signature correctly accepts batchId: Long and forwards it to BookkeeperDeltaBase(batchId), aligning with the batch-aware bookkeeping design across the PR.


70-72: Appropriate no-op for Spark 2.x limitations.

The comment clarifies that Delta table DELETE operations aren't supported in Spark 2.x. This is consistent with the 2.12/2.13 versions that implement the actual deletion using DeltaTable.forName(...).delete(filter). The previous review comment flagged a different (buggy) implementation that has since been replaced with this intentional no-op.


24-24: Verify usage of Instant import.

The import java.time.Instant appears to be used in saveSchemaDelta at line 80 (Instant.now().toEpochMilli), so this import is valid.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (4)

29-29: LGTM! Base class correctly propagates batchId.

The abstract class signature properly accepts batchId: Long and forwards it to the parent BookkeeperHadoop(batchId).


68-72: New batch-aware data retrieval method looks correct.

getDataChunksFromStorage correctly builds a filter including the optional batchId and uses getBkAllData to return all matching chunks (not just the latest per date). This aligns with the intent to retrieve all records for a specific batch.


78-90: DataChunk construction correctly includes batch tracking fields.

The saveRecordCountToStorage method properly:

  1. Accepts the new recordsAppended: Option[Long] parameter
  2. Constructs DataChunk with Option(batchId) and recordsAppended

This aligns with the updated DataChunk model that now carries batchId and appendedRecordCount.


113-117: Helper method getBkAllData provides unsorted raw collection.

Note that getBkAllData collects all matching records and sorts by jobFinished, which is appropriate for returning all chunks in chronological order. This differs from getBkData which groups by (tableName, infoDate) and returns only the latest per group.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)

24-24: LGTM! Base class now carries batchId.

The abstract class signature correctly includes batchId: Long for batch-aware bookkeeping operations.


61-66: Verify the intent of conditional batch record deletion.

The logic calls deleteNonCurrentBatchRecords only when recordsAppended.isEmpty. This means:

  • When appending records (recordsAppended is Some(count)), old batch records are preserved
  • When not appending (full replacement), old batch records are deleted

This appears intentional for the append vs. replace semantics, but please confirm this matches the expected behavior for batch bookkeeping.


94-104: New batch-aware retrieval method correctly routes to appropriate storage.

getDataChunks properly checks whether the table is transient and routes to either getTransientDataChunks or getDataChunksFromStorage, passing the batchId parameter through both paths.

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (2)

19-19: Good fix: removed invalid fullstacks import.

The import now correctly includes only BeforeAndAfter and BeforeAndAfterAll, addressing the previous review comment about the unused/invalid fullstacks symbol.


54-65: LGTM! Test setup correctly uses batch-aware API.

The getBookkeeper method now accepts batchId and the testBookKeeper block properly uses dynamic prefixes based on Spark version, with fresh prefixes for Spark 2.x (which doesn't support DELETE) and reused prefixes for newer versions (with cleanup in the before block).

pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala (1)

97-107: LGTM! Clean utility for elapsed-time monitoring.

The runActionWithElapsedTimeEvent method provides a reusable pattern for:

  1. Measuring execution time of an action
  2. Triggering a callback when execution exceeds a threshold

This is used appropriately in BookkeeperJdbc.deleteNonCurrentBatchRecords to log warnings for slow database operations.
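For reference, a utility with these semantics can be as small as the sketch below; the actual signature and naming in AlgorithmUtils may differ:

  object ElapsedTimeSketch {
    def runActionWithElapsedTimeEvent[T](thresholdMs: Long)(action: => T)(onThresholdExceeded: Long => Unit): T = {
      val start = System.currentTimeMillis()
      val result = action                                         // run the measured action
      val elapsedMs = System.currentTimeMillis() - start
      if (elapsedMs > thresholdMs) onThresholdExceeded(elapsedMs) // fire the event only for slow actions
      result
    }
  }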

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (4)

41-41: LGTM! Constructor properly threads batchId.

The class signature correctly accepts batchId: Long and forwards it to BookkeeperHadoop(batchId).


80-84: New batch-aware retrieval correctly filters and returns all chunks.

getDataChunksFromStorage properly builds a filter using getFilter with the optional batchId and returns all matching chunks via getData, which sorts by jobFinished.


106-106: DataChunk construction correctly includes batch tracking fields.

The DataChunk is constructed with Option(batchId) and recordsAppended, aligning with the updated model.


197-200: Appropriate no-op for CSV-based storage.

The comment clearly explains why deletion isn't supported for CSV-based bookkeeping. This is consistent with the design decision that text/CSV storage doesn't support in-place modifications.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (4)

33-33: LGTM! Constructor properly threads batchId to base class.

The class correctly extends BookkeeperBase(true, batchId), enabling batch-aware bookkeeping for JDBC storage.


71-82: New batch-aware retrieval method looks correct.

getDataChunksFromStorage properly builds a query using getFilter with the optional batchId, maps results through DataChunk.fromRecord, and sorts by jobFinished.


135-153: Good implementation of batch record deletion with timing.

The deleteNonCurrentBatchRecords method:

  1. Correctly filters for records matching the table/date but with different batchId
  2. Uses AlgorithmUtils.runActionWithElapsedTimeEvent to warn on slow deletions
  3. Logs the SQL statement when threshold is exceeded for debugging

This aligns well with the new utility added in AlgorithmUtils.scala.


115-133: Record construction correctly includes batch tracking fields.

BookkeepingRecord is constructed with recordsAppended and Option(batchId), matching the updated schema definition in BookkeepingRecords.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (8)

37-42: LGTM!

The version bump to 3 is appropriate given the schema changes (moving to non-unique indexes to support multiple batch records per table/date).


44-50: LGTM!

Constructor changes properly thread batchId through to the base class. The 10-second query warning timeout is a reasonable default for alerting on slow MongoDB operations.


82-94: LGTM!

Passing None for batchId correctly retrieves data across all batches, which is the expected behavior for getting the latest chunk or counting total chunks.


96-101: LGTM!

The batch-aware retrieval with optional filtering is well-implemented. Debug logging will aid troubleshooting.


103-115: LGTM!

The change from upsert to insertOne is intentional and aligns with the batch tracking design - allowing multiple records per table/infoDate combination. The DataChunk is correctly constructed with the new batchId and recordsAppended fields.


117-132: LGTM!

The deletion logic correctly filters by table, infoDate, and excludes the current batchId. The elapsed time monitoring with warning logging is a good observability practice for detecting slow queries.


134-171: LGTM!

The filter composition correctly handles the optional batchId parameter, appending the batch filter only when provided.


197-202: The migration logic has a silent failure vulnerability due to inner exception handling.

While the migration logic correctly drops and recreates the index for databases with dbVersion < 3 && dbVersion > 0, both dropIndex and createIndex have internal try-catch blocks that swallow exceptions and only log warnings. If dropIndex fails (e.g., index doesn't exist), it logs a warning and continues, then createIndex is called regardless. This can leave the database in an inconsistent state without surfacing any error to the outer try-catch block in initCollection. Consider making at least one of these operations fail-fast so migration failures are explicitly surfaced rather than silently logged.

Likely an incorrect or invalid review comment.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Fix all issues with AI Agents 🤖
In
@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala:
- Around line 135-153: The Slick filter compares a Rep[Option[Long]] column
incorrectly using r.batchId =!= batchId; update the comparison in
deleteNonCurrentBatchRecords so the right-hand side is an Option by using
Option(batchId) (or Some(batchId)) when comparing r.batchId, ensuring the filter
compiles and behaves correctly for optional batchId values; keep the rest of the
query and error handling intact.
🧹 Nitpick comments (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (1)

256-286: Clarify expected behavior for concurrent bookkeeper instances.

The test validates that multiple bookkeeper instances share state, but the assertion at line 281 accepts two different outcomes (inputRecordCount == 201 || inputRecordCount == 102), with a comment stating "Both are correct, depending on the notion of 'latest' used." This ambiguity suggests the "latest" semantics may be implementation-dependent.

Consider documenting which notion of "latest" each implementation uses (e.g., latest by jobFinished timestamp vs. latest by insertion order) to make the behavior more predictable.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b5a52c7 and 3791376.

📒 Files selected for processing (4)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T08:27:21.504Z
Learnt from: yruslan
Repo: AbsaOSS/pramen PR: 611
File: pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala:29-29
Timestamp: 2025-06-18T08:27:21.504Z
Learning: In pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala, the constant variable name was changed from DATE_UNTIL_EXPR_KEY to DATE_TO_EXPR_KEY, but both constants hold the same string value "date.to". This is a variable name refactoring, not a configuration key change, so it doesn't affect backward compatibility.

Applied to files:

  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala
🧬 Code graph analysis (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (1)
  • getBookkeeper (54-56)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (1)
  • getBookkeeper (44-46)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (6)
  • batchId (32-32)
  • infoDate (24-24)
  • inputRecordCount (27-27)
  • outputRecordCount (28-28)
  • jobStarted (30-30)
  • jobFinished (31-31)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (1)
  • batchId (76-76)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
  • batchId (185-185)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
  • getDataChunks (94-104)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (17)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (5)

27-28: LGTM!

The new imports support the performance tracking and logging functionality added in deleteNonCurrentBatchRecords.


33-33: LGTM!

The class constructor correctly accepts and passes batchId to the superclass.


71-82: LGTM!

The new method correctly implements batchId-aware data chunk retrieval with proper error handling and sorting.


115-124: LGTM!

The method signature correctly extends to include recordsAppended, and the BookkeepingRecord construction properly includes both new fields.


187-190: LGTM! Past issue has been fixed.

The Slick Option column comparison has been corrected. Line 188 now properly wraps id in Option(id) when comparing with the Rep[Option[Long]] column, resolving the issue flagged in the previous review.

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (7)

19-19: LGTM! Imports and mixins are correct.

The addition of BeforeAndAfter and BeforeAndAfterAll traits properly enables the test setup and cleanup functionality. The previous issue with the invalid fullstacks import has been resolved.

Also applies to: 26-26


27-28: LGTM! Shared prefix field is well-designed.

The bookkeepingTablePrefix field is appropriately initialized once per test suite and used in conjunction with the before block cleanup for Spark 3+ tests, enabling efficient table reuse.


39-52: LGTM! Cleanup logic is correct.

The before block properly clears both tables for Spark 3+ tests. The previous copy-paste bug where line 49 was deleting from the wrong table has been resolved—each DELETE statement now correctly targets its intended table.


54-56: LGTM! Method signature updated for batch tracking.

The addition of the batchId parameter correctly aligns with the PR objective to introduce batch tracking across the Bookkeeper API.


59-65: LGTM! Version-specific test strategies are well-implemented.

The conditional logic appropriately handles different Spark versions: Spark 2.x uses unique prefixes per test (avoiding the need for cleanup), while Spark 3+ reuses a shared prefix with the before block cleanup. This design ensures proper test isolation for both environments.


67-73: LGTM! Table creation test is correct.

The test properly verifies that both bookkeeping tables are created with the expected names when a BookkeeperDeltaTable instance is instantiated.


76-79: LGTM! Helper method generates unique test prefixes.

The getNewTablePrefix method provides a simple and effective way to generate unique table prefixes for test isolation.

pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (2)

85-104: LGTM! Parameter order matches the updated interface.

The updated signature correctly includes recordsAppended: Option[Long] and properly constructs the DataChunk with Option(batchId) to wrap the Long batchId parameter into an Option.


59-68: The batchId filtering logic correctly handles both filtered and unfiltered queries.

The pattern batchId.forall(chunk.batchId.contains) is correctly implemented across all Bookkeeper implementations. All implementations (Text, Hadoop, JDBC, MongoDB, Delta) consistently populate the batchId field as Option(batchId) from the constructor parameter, and filtering logic properly matches this with case patterns that handle both Some(id) and None cases.

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (3)

147-161: Confirm same-batch multiple-entry behavior is intended.

This test verifies that calling setRecordCount twice with the same batch creates 2 separate chunks. While this appears correct based on the deleteNonCurrentBatchRecords logic (which only deletes other batches), ensure this is the intended semantic for production use cases.


163-181: Document the feature gap for BookkeeperText.

The test skips BookkeeperText with a comment stating "The feature is not implemented in these implementations." This indicates that cross-batch replacement behavior is not supported by BookkeeperText.

Is there a tracking issue for implementing batch replacement in BookkeeperText? If this implementation is used in production scenarios requiring multiple batches, the missing feature could lead to unexpected behavior.


183-205: LGTM! Test correctly validates non-overwrite behavior.

The test properly verifies that when recordsAppended is set to Some(1), records from different batches are preserved (2 chunks expected) rather than being replaced. The Spark version check and Delta Table skip are appropriate given the known limitations.

@yruslan yruslan merged commit 53323d0 into main Jan 5, 2026
9 checks passed
@yruslan yruslan deleted the feature/677-add-batchid-to-bookkeeping branch January 5, 2026 08:38