#677 Add batch Id and the number of appended records to the bookkeeping #682
Conversation
…metastore reader interfaces.
…s in JDBC and MongoDB implementations.
Walkthrough

Adds batch tracking and appended-record counts across APIs, models, bookkeeper implementations, storage backends, journal I/O, tests, and migrations; threads a new batchId through these layers.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Job as Job / Task Runner
    participant Metastore as MetastoreReader / MetastoreImpl
    participant Bookkeeper as Bookkeeper (Mongo/JDBC/Delta/Text)
    participant Storage as Storage (Mongo / RDB / Delta / FS)
    Note over Job,Bookkeeper: New inputs: batchId, appendedRecordCount
    Job->>Bookkeeper: setRecordCount(table, infoDate, inputCount, outputCount, recordsAppended?, start, finish, isTransient)
    alt non-current batch deletion needed
        Bookkeeper->>Storage: deleteNonCurrentBatchRecords(table, infoDate) [uses batchId]
    end
    Bookkeeper->>Storage: saveRecordCountToStorage(DataChunk(..., batchId?, appendedRecordCount?))
    Job->>Metastore: getTableRunInfo(table, infoDate, batchId?)
    Metastore->>Bookkeeper: getDataChunks(table, infoDate, batchId?)
    Bookkeeper->>Storage: query(filter by table, infoDate range, optional batchId)
    Storage-->>Bookkeeper: Seq[DataChunk]
    Bookkeeper-->>Metastore: Seq[DataChunk]
    Metastore-->>Job: Seq[MetaTableRunInfo]
```
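A hedged sketch of what the updated bookkeeping calls could look like, inferred only from the message signatures in the diagram above (parameter names and types are assumptions, not the exact production signatures; see the actual `Bookkeeper` trait in the PR for the real API):

```scala
import java.time.{Instant, LocalDate}

// Illustrative call shapes only, based on the sequence diagram.
trait BookkeeperSketch {
  def setRecordCount(table: String,
                     infoDate: LocalDate,
                     inputRecordCount: Long,
                     outputRecordCount: Long,
                     appendedRecordCount: Option[Long], // new in this PR
                     jobStarted: Instant,
                     jobFinished: Instant,
                     isTransient: Boolean): Unit

  // New in this PR: remove records of other batches for the same table and info date.
  def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit
}
```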
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Pre-merge checks: ✅ Passed checks (3 passed)
📜 Recent review details

Configuration used: Organization UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (1)

🧰 Additional context used
🧬 Code graph analysis (1)

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (6)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)

🔇 Additional comments (10)
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala (1)
83-103: Inconsistent MongoDB availability checks.

Similar to JournalMongoDbSuite, the test gating condition uses `mongoDbExecutable.nonEmpty` (Line 83), but the `before` block still uses `if (db != null)` (Line 52). This inconsistency could cause test failures if `mongoDbExecutable` is available but `db` initialization fails.

🔎 Proposed fix to align availability checks

```diff
 before {
   pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;")
   pramenDb.setupDatabase()
-  if (db != null) {
+  if (mongoDbExecutable.nonEmpty) {
     if (db.doesCollectionExists(collectionName)) {
       db.dropCollection(collectionName)
     }
     if (db.doesCollectionExists(schemaCollectionName)) {
       db.dropCollection(schemaCollectionName)
     }
   }
 }
```

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (1)
272-272: Fix extra space in assertion.

There's an extraneous space between `chunk1.get` and `.inputRecordCount`.

🔎 Proposed fix

```diff
- assert(chunk1.get.inputRecordCount == 201 || chunk1.get .inputRecordCount == 102)
+ assert(chunk1.get.inputRecordCount == 201 || chunk1.get.inputRecordCount == 102)
```
🧹 Nitpick comments (9)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala (1)
33-33: Clarify the rationale for making this constant public.

The change from `private val` to `val` makes `WARN_IF_LONGER_MS` publicly accessible, but the connection to the PR's stated objective (adding batch ID and record counting) is unclear.

If the intent is to allow external configuration or testing, consider loading this threshold from application configuration rather than exposing a hardcoded constant. If external components need to reference this value, please clarify the use case.
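A minimal sketch of the config-driven alternative, assuming Typesafe Config is available; the key name `pramen.warn.if.query.longer.ms` and the fallback value are made up for illustration and are not from the PR:

```scala
import com.typesafe.config.Config

// Illustrative only: read the slow-query warning threshold from configuration
// instead of exposing a hardcoded public constant.
def warnIfLongerMs(conf: Config): Long =
  if (conf.hasPath("pramen.warn.if.query.longer.ms"))
    conf.getLong("pramen.warn.if.query.longer.ms")
  else
    3000L // hypothetical fallback; the actual default in SlickUtils may differ
```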
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala (2)
20-20: Remove unused import.

The import `za.co.absa.pramen.core.utils.SlickUtils.log` is not used anywhere in this file.

🔎 Proposed fix

```diff
-import za.co.absa.pramen.core.utils.SlickUtils.log
```
98-108: Consider passing actual duration to the onTimeout callback.

The `onTimeout` callback currently receives `timeoutMs` (the threshold), not the actual elapsed duration. Consider passing `duration` instead so the callback can log or act on the actual time taken.

🔎 Proposed refactor

```diff
-  final def resultWithTimeout[R](timeoutMs: Long)(action: => R)(onTimeout: Long => Unit): R = {
+  final def resultWithTimeout[R](timeoutMs: Long)(action: => R)(onTimeout: (Long, Long) => Unit): R = {
     val start = Instant.now
     val result = action
     val finish = Instant.now
     val duration = Duration.between(start, finish).toMillis
     if (duration > timeoutMs) {
-      onTimeout(timeoutMs)
+      onTimeout(timeoutMs, duration)
     }
     result
   }
```

This allows the callback to report both the threshold and actual duration, e.g., `log.warn(s"Action exceeded ${timeoutMs}ms, took ${duration}ms")`.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (1)
197-199: Consider adding a comment explaining the no-op.

The empty `deleteNonCurrentBatchRecords` is intentional since CSV-based storage doesn't support efficient in-place deletion. A brief comment would improve clarity for future maintainers.

🔎 Suggested documentation

```diff
 override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {
-
+  // No-op: CSV-based storage doesn't support efficient in-place deletion.
+  // Cross-batch replacement is not supported for BookkeeperText.
 }
```

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (1)
113-117: Consider sorting by jobFinished for consistency.

`getBkAllData` sorts by `infoDate`, while `getBkData` ultimately sorts by `infoDate` after grouping by `jobFinished`. For consistency with `getLatestDataChunk` semantics (which relies on `.lastOption` for the most recent job), consider sorting by `jobFinished` here as well.

🔎 Suggested change

```diff
 private[core] def getBkAllData(filter: Column): Seq[DataChunk] = {
   getBkDf(filter)
     .collect()
-    .sortBy(_.infoDate)
+    .sortBy(_.jobFinished)
 }
```

pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)
95-101: Consider adding format option for empty dataset.

The `writeEmptyDataset` method doesn't explicitly specify the delta format. While it may work, being explicit about the format ensures consistency.

🔎 Suggested change for explicit format

```diff
 override def writeEmptyDataset[T <: Product : universe.TypeTag : ClassTag](pathOrTable: String): Unit = {
   val df = Seq.empty[T].toDS
   df.write
+    .format("delta")
     .saveAsTable(pathOrTable)
 }
```

pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)
93-99: Consider adding explicit delta format.

Same as the Scala 2.13 version: explicitly specifying the delta format would improve clarity.

🔎 Suggested change

```diff
 override def writeEmptyDataset[T <: Product : universe.TypeTag : ClassTag](pathOrTable: String): Unit = {
   val df = Seq.empty[T].toDS
   df.write
+    .format("delta")
     .saveAsTable(pathOrTable)
 }
```

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (2)
96-101: Consider adjusting the log level.

The `log.info` statement at line 99 prints all chunks for every query. This could generate significant log volume in production. Consider:

- Changing to `log.debug` for normal operation
- Keeping `log.info` only for specific diagnostic scenarios

Otherwise, the implementation correctly supports batch-aware filtering and sorting.
117-127: Consider adding timeout monitoring for consistency.

The deletion logic is correct, but unlike the JDBC version (lines 143-149), this implementation doesn't include timeout monitoring and warning logs for long-running operations.

For consistency and operational visibility, consider adding similar monitoring (a sketch follows below):

- Wrap the `deleteMany` call with timing logic
- Log warnings if deletion exceeds a threshold

This is optional but would improve operational consistency across bookkeeper implementations.
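A minimal sketch of such a wrapper, assuming an SLF4J `log`, a `collection` handle from the MongoDB Scala driver, and an illustrative warning threshold; none of these names or values are taken from the actual `BookkeeperMongoDb` code:

```scala
import java.time.{Duration, Instant}
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import org.mongodb.scala._
import org.mongodb.scala.model.Filters

// Illustrative only: time the deletion and warn when it runs longer than a threshold,
// mirroring the warning behavior of the JDBC implementation.
def deleteNonCurrentBatchTimed(collection: MongoCollection[Document],
                               log: org.slf4j.Logger,
                               table: String,
                               infoDateStr: String,
                               batchId: Long): Unit = {
  val warnThresholdMs = 2000L // hypothetical threshold
  val start = Instant.now()

  // Delete every record for the table/date that belongs to a different batch.
  val filter = Filters.and(
    Filters.equal("tableName", table),
    Filters.equal("infoDate", infoDateStr),
    Filters.notEqual("batchId", batchId)
  )
  Await.result(collection.deleteMany(filter).toFuture(), 10.minutes)

  val elapsedMs = Duration.between(start, Instant.now()).toMillis
  if (elapsedMs > warnThresholdMs) {
    log.warn(s"Deleting non-current batch records for '$table' took $elapsedMs ms (threshold $warnThresholdMs ms).")
  }
}
```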
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (71)
pramen/api/pom.xml
pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala
pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala
pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala
pramen/build.sbt
pramen/core/pom.xml
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperHadoop.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecordMongoDb.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTask.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
pramen/core/src/test/resources/log4j.properties
pramen/core/src/test/resources/log4j2.properties
pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/MongoDbFixture.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DataChunkFactory.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskCompletedFactory.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/dao/MongoDbSingleton.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/source/SourceValidationSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMemSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMongoDbLongSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperNullSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTransientSuiteEager.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TestCases.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockMongoDbSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala
pramen/extras/pom.xml
pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala
pramen/pom.xml
pramen/project/Versions.scala
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T08:27:21.504Z
Learnt from: yruslan
Repo: AbsaOSS/pramen PR: 611
File: pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala:29-29
Timestamp: 2025-06-18T08:27:21.504Z
Learning: In pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala, the constant variable name was changed from DATE_UNTIL_EXPR_KEY to DATE_TO_EXPR_KEY, but both constants hold the same string value "date.to". This is a variable name refactoring, not a configuration key change, so it doesn't affect backward compatibility.
Applied to files:
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala
pramen/project/Versions.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/source/SourceValidationSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMemSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTransientSuiteEager.scala
pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala
pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala
pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
🧬 Code graph analysis (41)
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
infoDate(24-24)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (6)
batchId(32-32)pramenTableName(23-23)pramenTableName(34-36)inputRecordCount(27-27)outputRecordCount(28-28)appendedRecordCount(29-29)pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTask.scala (1)
JournalTask(19-42)
pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/MongoDbFixture.scala (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/dao/MongoDbSingleton.scala (1)
MongoDbSingleton(28-79)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TestCases.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala (2)
TaskCompleted(26-49)TaskCompleted(51-94)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (4)
appendedRecordCount(29-29)jobStarted(30-30)jobFinished(31-31)batchId(32-32)pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
appendedRecordCount(32-32)batchId(44-44)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (1)
batchId(76-76)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
batchId(185-185)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala (1)
getTableRunInfo(73-87)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (2)
getTableRunInfo(68-68)batchId(76-76)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (2)
infoDate(24-24)batchId(32-32)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
country(43-43)batchId(44-44)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
batchId(185-185)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/MutableStack.scala (1)
nonEmpty(46-48)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (5)
appendedRecordCount(32-32)batchId(44-44)pramenTableName(24-24)inputRecordCount(28-28)outputRecordCount(30-30)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1)
BookkeepingRecord(19-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala (1)
SlickUtils(27-161)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
country(43-43)batchId(44-44)pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala (1)
pipelineInfo(45-51)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMemSuite.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
getDataChunks(94-104)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
getDataChunks(59-76)
pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
country(43-43)batchId(44-44)
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
getDataChunks(94-104)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
getDataChunks(59-76)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
infoDate(24-24)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala (1)
pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala (1)
PipelineInfo(23-38)
pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
country(43-43)batchId(44-44)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (1)
batchId(76-76)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
batchId(185-185)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (8)
country(43-43)batchId(44-44)pipelineName(40-40)startedAt(34-34)finishedAt(35-35)sparkApplicationId(38-38)pipelineId(39-39)tenant(42-42)pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala (1)
PipelineInfo(23-38)
pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala (1)
getTableRunInfo(73-87)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (2)
getTableRunInfo(68-68)batchId(76-76)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (2)
getTableRunInfo(167-167)batchId(185-185)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (2)
infoDate(24-24)batchId(32-32)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala (5)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala (1)
getBookkeeper(40-42)pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMongoDbLongSuite.scala (1)
getBookkeeper(39-41)pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (1)
getBookkeeper(44-46)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
batchId(32-32)pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (1)
testBookKeeper(27-278)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
country(43-43)batchId(44-44)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
batchId(32-32)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
batchId(185-185)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskCompletedFactory.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
country(43-43)batchId(44-44)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
batchId(185-185)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperHadoop.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (3)
batchId(32-32)infoDateBegin(25-25)infoDateEnd(26-26)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
core(141-155)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (1)
getFilter(159-191)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (1)
getFilter(129-166)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
getDataChunks(94-104)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
getDataChunks(59-76)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (3)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala (1)
getBookkeeper(41-43)pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (1)
getBookkeeper(44-46)pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (1)
testBookKeeper(27-278)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
pramen(45-67)pramen(118-121)core(141-155)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (4)
pramen(94-113)pramen(184-195)BookkeeperText(32-39)BookkeeperText(41-200)pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
DataChunk(23-32)DataChunk(35-49)pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (1)
getBookkeeper(44-46)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTask.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2)
country(43-43)batchId(44-44)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
batchId(32-32)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (3)
getDataChunksFromStorage(68-72)pramen(78-90)pramen(107-111)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
pramen(45-67)pramen(118-121)getDateStr(157-157)pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
DataChunk(23-32)DataChunk(35-49)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/MutableStack.scala (1)
nonEmpty(46-48)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
getDataChunks(94-104)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
getDataChunks(59-76)pramen/core/src/main/scala/za/co/absa/pramen/core/utils/MutableStack.scala (1)
isEmpty(42-44)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMongoDbLongSuite.scala (3)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala (1)
getBookkeeper(40-42)pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (1)
getBookkeeper(44-46)pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (1)
testBookKeeper(27-278)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecordMongoDb.scala (1)
BookkeepingRecordMongoDb(19-28)pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (3)
DataChunk(23-32)DataChunk(35-49)fromRecordMongo(45-48)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DataChunkFactory.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (9)
jobStarted(30-30)jobFinished(31-31)batchId(32-32)infoDate(24-24)infoDateBegin(25-25)infoDateEnd(26-26)inputRecordCount(27-27)outputRecordCount(28-28)appendedRecordCount(29-29)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (1)
batchId(76-76)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1)
batchId(185-185)pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
DataChunk(23-32)DataChunk(35-49)
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala (5)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (2)
getTableRunInfo(68-68)batchId(76-76)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (2)
getTableRunInfo(167-167)batchId(185-185)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (7)
infoDate(24-24)batchId(32-32)inputRecordCount(27-27)outputRecordCount(28-28)appendedRecordCount(29-29)jobStarted(30-30)jobFinished(31-31)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
getDataChunks(94-104)pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala (1)
MetaTableRunInfo(21-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1)
BookkeepingRecord(19-28)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecordMongoDb.scala (1)
BookkeepingRecordMongoDb(19-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecordMongoDb.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (9)
infoDate(24-24)infoDateBegin(25-25)infoDateEnd(26-26)inputRecordCount(27-27)outputRecordCount(28-28)appendedRecordCount(29-29)jobStarted(30-30)jobFinished(31-31)batchId(32-32)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (5)
getLatestProcessedDateFromStorage(43-60)getDataChunksFromStorage(68-72)getDataChunksCountFromStorage(74-76)pramen(78-90)pramen(107-111)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (2)
pramen(45-67)pramen(118-121)
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1)
infoDate(24-24)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
getDataChunks(94-104)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
getDataChunks(59-76)
pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
pramen(45-67)pramen(118-121)core(141-155)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (4)
pramen(78-90)pramen(107-111)core(113-117)core(119-128)pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
DataChunk(23-32)DataChunk(35-49)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (4)
pramen(94-113)pramen(184-195)BookkeeperText(32-39)BookkeeperText(41-200)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperHadoop.scala (1)
core(26-46)pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (1)
testBookKeeper(27-278)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (7)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
pramen(45-67)pramen(118-121)core(141-155)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (6)
pramen(217-224)getDataChunksFromStorage(96-101)getFilter(129-166)getLatestDataChunkFromStorage(81-90)saveRecordCountToStorage(103-115)deleteNonCurrentBatchRecords(117-127)pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala (2)
SlickUtils(27-161)executeQuery(45-65)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (10)
batchId(32-32)infoDate(24-24)jobFinished(31-31)inputRecordCount(27-27)outputRecordCount(28-28)jobStarted(30-30)BookkeepingRecords(22-38)BookkeepingRecords(40-42)pramenTableName(23-23)pramenTableName(34-36)pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (3)
DataChunk(23-32)DataChunk(35-49)fromRecord(40-43)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1)
BookkeepingRecord(19-28)pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala (1)
run(79-125)
pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
DataChunk(23-32)DataChunk(35-49)pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)
deleteNonCurrentBatchRecords(71-77)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
- GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
- GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
- GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
- GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
- GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
- GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
- GitHub Check: Test Coverage on Scala 2.12.18
```scala
case class MetaTableRunInfo(tableName: String,
                            infoDate: LocalDate,
                            batchId: Long,
                            inputRecordCount: Long,
                            outputRecordCount: Long,
                            appendedRecordCount: Option[Long],
                            jobStarted: Instant,
                            jobFinished: Instant)
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
echo "=== Searching for MetaTableRunInfo construction sites ==="
rg -n 'MetaTableRunInfo\s*\(' --type=scala -A3
echo -e "\n=== Searching for pattern matches on MetaTableRunInfo ==="
rg -n 'MetaTableRunInfo\s*\(' --type=scala | grep -E '(case|match|extract)'
echo -e "\n=== Searching for MetaTableRunInfo in the codebase ==="
rg -n 'MetaTableRunInfo' --type=scala
```

Repository: AbsaOSS/pramen
Length of output: 2571

🏁 Script executed:

```bash
#!/bin/bash
echo "=== Full MetaTableRunInfo construction in MetastoreReaderBase.scala ==="
sed -n '76,90p' pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala
echo -e "\n=== Case class definition in MetaTableRunInfo.scala ==="
sed -n '21,28p' pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala
echo -e "\n=== Search for other MetaTableRunInfo creations (broader search) ==="
rg -n 'new MetaTableRunInfo|MetaTableRunInfo\(' --type=scala -B2 -A8 | grep -v test | head -100
```

Repository: AbsaOSS/pramen
Length of output: 3638

🏁 Script executed:

```bash
#!/bin/bash
echo "=== Checking test files for MetaTableRunInfo construction ==="
find . -path ./target -prune -o -name "*Test.scala" -o -name "*Spec.scala" -type f | xargs grep -l "MetaTableRunInfo" 2>/dev/null
echo -e "\n=== Looking for MetaTableRunInfo construction in test files ==="
rg -n 'MetaTableRunInfo\(' --type=scala pramen/core/src/test | head -50
echo -e "\n=== Checking for deprecation or version info ==="
cat README.md 2>/dev/null | head -50
echo -e "\n=== Checking pom.xml or build.sbt for version ==="
find . -name "pom.xml" -o -name "build.sbt" | xargs head -30 2>/dev/null | head -80
```

Repository: AbsaOSS/pramen
Length of output: 6668
Breaking API change: case class constructor signature modified.

Adding `batchId` and `appendedRecordCount` fields to the `MetaTableRunInfo` case class is a breaking change for external consumers of this public API. While the internal construction site in `MetastoreReaderBase.scala` has been properly updated to include all parameters, external code constructing this type will fail.

For a public API release, ensure:

- A changelog entry documents this breaking change
- Migration guidance is provided for users (e.g., "if upgrading from X.Y, update `MetaTableRunInfo` construction calls to include the two new parameters"; see the sketch after this list)
- Consider a major version bump if not already planned
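A hedged before/after sketch of what downstream construction code might need to change to; the field values are illustrative, and only the two new parameters (`batchId` and `appendedRecordCount`) are taken from this PR:

```scala
import java.time.{Instant, LocalDate}
import za.co.absa.pramen.api.MetaTableRunInfo

// Before this PR (hypothetical caller code):
// MetaTableRunInfo("my_table", LocalDate.of(2024, 1, 1), 100L, 100L, Instant.now(), Instant.now())

// After this PR, callers must also supply the batch id and the optional appended record count:
val runInfo = MetaTableRunInfo(
  tableName = "my_table",              // illustrative value
  infoDate = LocalDate.of(2024, 1, 1), // illustrative value
  batchId = 20240101123456L,           // new required field
  inputRecordCount = 100L,
  outputRecordCount = 100L,
  appendedRecordCount = Some(100L),    // new optional field; None when not applicable
  jobStarted = Instant.now(),
  jobFinished = Instant.now()
)
```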
🤖 Prompt for AI Agents
In pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala around
lines 21 to 28, the case class constructor was changed by adding batchId and
appendedRecordCount which is a breaking API change; update the release to
document this and provide migration guidance: add a changelog entry that
describes the new fields and their types, include a short migration note showing
example construction before and after (e.g., indicate default or derived values
callers should supply), and if this is part of a public release coordinate a
major version bump (or clearly state compatibility implications) so downstream
users are informed.
```scala
override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {
  val infoDateStr = DataChunk.dateFormatter.format(infoDate)
  val filter = (col("tableName") === lit(table)) && (col("infoDate") === lit(infoDateStr)) && (col("batchId") =!= lit(batchId))

  spark.sql(s"DROM FROM $schemasFullTableName WHERE tableName='$table' AND infoDate='$infoDateStr' AND batchId != $batchId").count()
}
```
Critical bugs in `deleteNonCurrentBatchRecords`.

This method has multiple issues:

- Typo: `DROM` should be `DELETE`
- Wrong table: uses `schemasFullTableName` instead of `recordsFullTableName`
- Unused filter: the `filter` Column variable (lines 72-73) is constructed but never used
- Useless `.count()`: DELETE statements don't return a result set to count
🔎 Proposed fix

```diff
 override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {
   val infoDateStr = DataChunk.dateFormatter.format(infoDate)
-  val filter = (col("tableName") === lit(table)) && (col("infoDate") === lit(infoDateStr)) && (col("batchId") =!= lit(batchId))
-
-  spark.sql(s"DROM FROM $schemasFullTableName WHERE tableName='$table' AND infoDate='$infoDateStr' AND batchId != $batchId").count()
+
+  spark.sql(s"DELETE FROM $recordsFullTableName WHERE tableName='$table' AND infoDate='$infoDateStr' AND batchId != $batchId")
 }
```

🤖 Prompt for AI Agents
In
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
around lines 70 to 75, fix deleteNonCurrentBatchRecords by: replace the typo
"DROM" with "DELETE", target recordsFullTableName instead of
schemasFullTableName, remove the unused filter Column (or use it if you prefer
DataFrame API), and stop calling .count() on the SQL execution — just run
spark.sql(...) to execute the DELETE statement (no result aggregation). Ensure
the SQL WHERE clause uses table, formatted infoDateStr and batchId correctly and
keep the method returning Unit.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (outdated review comment, resolved)
```scala
  country: Option[String],
  batchId: Long
```
🧩 Analysis chain
🏁 Script executed:

```bash
# First, find the TaskCompletedCsv file and examine its structure
find . -name "TaskCompletedCsv.scala" -type f
```

Repository: AbsaOSS/pramen
Length of output: 146

🏁 Script executed:

```bash
# Search for CSV parsing/deserialization logic for TaskCompletedCsv
rg -n "TaskCompletedCsv" --type=scala -A 5 -B 5
```

Repository: AbsaOSS/pramen
Length of output: 5989

🏁 Script executed:

```bash
# Check for JournalTask definition to compare batchId types
rg -n "class JournalTask|case class JournalTask" --type=scala -A 10
```

Repository: AbsaOSS/pramen
Length of output: 3242

🏁 Script executed:

```bash
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala
```

Repository: AbsaOSS/pramen
Length of output: 2266

🏁 Script executed:

```bash
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTask.scala | head -60
```

Repository: AbsaOSS/pramen
Length of output: 2190

🏁 Script executed:

```bash
# Check the mapping from TaskCompletedCsv to TaskCompleted in JournalHadoopCsv.scala
sed -n '65,100p' pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala
```

Repository: AbsaOSS/pramen
Length of output: 1606

🏁 Script executed:

```bash
rg -n "case class TaskCompleted" --type=scala -A 25
```

Repository: AbsaOSS/pramen
Length of output: 10398

🏁 Script executed:

```bash
# Check if there's any CSV column mapping or schema handling that might handle optional columns
rg -n "batchId" --type=scala -B 2 -A 2
```

Repository: AbsaOSS/pramen
Length of output: 50371
Make `batchId` optional in `TaskCompletedCsv` for backwards compatibility.

The field is declared as `Long` (required), but:

- `JournalTask.batchId` is `Option[Long]`
- `TaskCompleted.batchId` is `Long`

Since `TaskCompletedCsv` is deserialized from CSV with Spark's schema enforcement, existing CSV files without a `batchId` column will fail to deserialize. Change `batchId: Long` to `batchId: Option[Long]` in `TaskCompletedCsv` to support old journal data while maintaining the required `Long` type in the downstream `TaskCompleted` model (apply `.getOrElse()` in the mapping logic if a default is needed). A sketch of this mapping follows after the agent prompt below.
🤖 Prompt for AI Agents
In
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala
around lines 40 to 41, change the batchId field from a required Long to an
Option[Long] to allow deserialization of legacy CSVs that lack the batchId
column; update the case class signature accordingly and ensure downstream
mapping to TaskCompleted preserves required Long semantics by calling
.getOrElse(...) or providing a suitable default where TaskCompleted expects a
Long.
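For illustration only, a rough sketch of the reviewer's suggested shape; the maintainer's reply below explains why this change was ultimately not needed, the field list is abbreviated, and the default value is hypothetical:

```scala
// Sketch of the reviewer's suggestion, not the merged code.
case class TaskCompletedCsvSketch(
  // ...other journal columns elided...
  country: Option[String],
  batchId: Option[Long] // optional so legacy CSV files without this column still deserialize
)

// Mapping back to the runtime model that requires a Long:
def toRequiredBatchId(csvBatchId: Option[Long]): Long =
  csvBatchId.getOrElse(0L) // 0L is a hypothetical default
```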
TaskCompleted is created at runtime, so batchId is always defined. JournalTask is stored and loaded from a database, so it is optional in order to keep backwards compatibility with existing data.
@yruslan, thank you for the clarification! I understand now:
- `TaskCompleted` is created at runtime during job execution, so `batchId` is always present and can be `Long`
- `JournalTask` is persisted to and loaded from the database, so `batchId` is `Option[Long]` for backwards compatibility with existing DB records
- `TaskCompletedCsv` is written to journal.csv files at runtime (from `TaskCompleted`), so it will always have `batchId` defined

This makes sense. The only edge case would be if there are existing journal.csv files (written before this change) that need to be read by the new code. If those files don't have the `batchId` column, Spark's CSV reader would fail during deserialization in `getEntries`. However, if:
- Journal CSV files are ephemeral/cleaned regularly, or
- This is a new deployment, or
- You have a separate migration path for old journal files
...then the current approach is fine. Since you've considered this distinction between runtime and persisted data, I'll defer to your design decision here.
```diff
 private def startEmbeddedMongoDb(): (Option[RunningMongodProcess], Int) = {
   try {
+    val version: Version = Version.V8_2_2
+    val mongod = Mongod.builder()
+      .processOutput(
+        Start.to(classOf[ProcessOutput]).initializedWith(
+          ProcessOutput.builder()
+            .output(new Slf4jProcessor(log, "[mongod-out] "))
+            .error(new Slf4jProcessor(log, "[mongod-err] "))
+            .commands(new Slf4jProcessor(log, "[mongod-cmd] "))
+            .build()
+        )
+      )
+      .build()
+
-    val starter = MongodStarter.getInstance(runtimeConfig)
+    val executable: TransitionWalker.ReachedState[RunningMongodProcess] =
+      mongod.transitions(version)
+        .walker()
+        .initState(StateID.of(classOf[RunningMongodProcess]))
+
-    val mongodConfig = new MongodConfigBuilder()
-      .version(Version.Main.V4_0)
-      .net(new Net("localhost", mongoPort, Network.localhostIsIPv6()))
-      .build()
+    val addr: ServerAddress = executable.current().getServerAddress
+    val mongoPort: Int = addr.getPort
+
-    val executable = try {
-      val exec = starter.prepare(mongodConfig)
-      exec.start()
-      Some(exec)
+    (Option(executable.current()), mongoPort)
   } catch {
-    case _: Throwable => None
+    case ex: Throwable =>
+      log.warn("Couldn't start embedded Mongodb. MongoDB tests will be skipped", ex)
+      (None, 0)
   }
```
🧩 Analysis chain
🌐 Web query:
MongoDB 8.2.2 stable release flapdoodle embedded
💡 Result:
Short answer: MongoDB 8.2.2 is a released stable patch (released Nov 19, 2025). Flapdoodle's embedded-mongo project exists but does not yet advertise built-in support for MongoDB 8.2.x binaries — you should verify whether the Flapdoodle release you use includes an 8.2.x artifact or supply the server binary yourself (or run MongoDB 8.2.2 in Docker) for tests. [1][2][3]
Sources
- MongoDB 8.2.2 release notes (8.2.2 — Nov 19, 2025). [1]
- Flapdoodle embedded-mongo project (repo/home). [2]
- Flapdoodle issues tracking “support latest version” / version compatibility discussion. [3]
🌐 Web query:
flapdoodle-de 4.22.0 MongoDB supported versions
💡 Result:
de.flapdoodle.embed.mongo 4.22.0 includes embedded MongoDB binaries for the following MongoDB server series (as enumerated in the Version.Main javadoc): 4.4, 4.3, 4.2, 4.1, 4.0 (4.0.0→4.0.9 beta entries), 3.5, 3.4, 3.3, 3.2, 3.1, 3.0, 2.2, 2.1, 2.0 and earlier 1.x legacy builds. [1][2]
Sources:
- Version.Main (supported versions listing) — de.flapdoodle.embed.mongo 4.22.0 javadoc. [1]
- de.flapdoodle.embed.mongo 4.22.0 package information (Maven). [2]
MongoDB version 8.2.2 is not supported by flapdoodle 4.22.0.

Flapdoodle-de 4.22.0 only includes embedded MongoDB binaries for versions up to 4.4. Attempting to use `Version.V8_2_2` will fail at runtime. Change to a supported MongoDB version (e.g., `Version.V4_4_14`) or upgrade to a newer flapdoodle release that supports MongoDB 8.2.x.
🤖 Prompt for AI Agents
In
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/dao/MongoDbSingleton.scala
around lines 48 to 76, the code uses Version.V8_2_2 which is unsupported by
flapdoodle 4.22.0 and will fail at runtime; replace the version with a supported
one (for example Version.V4_4_14) or alternatively upgrade the flapdoodle
dependency to a release that includes MongoDB 8.2.x, then run tests to ensure
the embedded MongoDB starts correctly.
...e/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (outdated review comment, resolved)
```scala
}

    if (db != null) {

  if (mongoDbExecutable.nonEmpty) {
```
Inconsistent MongoDB availability checks.

The test gating condition uses `mongoDbExecutable.nonEmpty` (Line 40), but the `before` block still uses `if (db != null)` (Line 32). This inconsistency could cause test failures if `mongoDbExecutable` is available but `db` initialization fails.

🔎 Proposed fix to align availability checks

```diff
 before {
-  if (db != null) {
+  if (mongoDbExecutable.nonEmpty) {
     if (db.doesCollectionExists(collectionName)) {
       db.dropCollection(collectionName)
     }
     journal = new JournalMongoDb(connection)
   }
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```scala
before {
  if (mongoDbExecutable.nonEmpty) {
    if (db.doesCollectionExists(collectionName)) {
      db.dropCollection(collectionName)
    }
    journal = new JournalMongoDb(connection)
  }
}
```
🤖 Prompt for AI Agents
In
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala
around lines 32 to 40, the test setup uses `if (db != null)` while the gating
condition uses `if (mongoDbExecutable.nonEmpty)`, causing inconsistent
availability checks; change the `before` block check to use `if
(mongoDbExecutable.nonEmpty)` (or consistently use the same
`mongoDbExecutable.nonEmpty` check wherever availability is tested) so both
setup and gating use the same condition and avoid running tests when the
embedded MongoDB isn't actually available.
…relational storages.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
59-76: Type mismatch in batchId comparison will cause incorrect filtering.

At line 65, `chunk.batchId` is `Option[Long]` but `id` is `Long` (unwrapped from `Some(id)` at line 64). Comparing `Option[Long] == Long` will always be `false`, so batch filtering will never match any chunks.

🔎 Proposed fix

```diff
 batchId match {
   case Some(id) =>
-    if (chunk.batchId == id) {
+    if (chunk.batchId.contains(id)) {
       Some(chunk)
     } else {
       None
     }
   case None =>
     Some(chunk)
 }
```
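For context, a tiny illustration of why the original comparison always fails while `Option.contains` works; this is plain Scala standard library behavior, not code from the PR:

```scala
val chunkBatchId: Option[Long] = Some(42L)
val id: Long = 42L

// Comparing an Option to an unwrapped value is never true
// (and is usually flagged by the compiler as a suspicious comparison):
chunkBatchId == id              // false

// Option.contains compares against the wrapped value:
chunkBatchId.contains(id)       // true
Option.empty[Long].contains(id) // false
```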
🧹 Nitpick comments (2)
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)
19-19: Consider removing unused `lit` import.

The `lit` function is imported but no longer used after `deleteNonCurrentBatchRecords` was changed to a no-op implementation.

🔎 Proposed fix

```diff
-import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.functions.col
```

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (1)
197-199: Consider adding a comment explaining why this is a no-op.

For text/CSV-based storage, deletion of records from an append-only file isn't straightforward. A brief comment would help future maintainers understand this design decision.

🔎 Suggested documentation

```diff
 override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {
-
+  // No-op: CSV-based bookkeeping is append-only; batch cleanup not supported
 }
```
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (19)
pramen/build.sbt
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/CsvUtils.scala
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DataChunkFactory.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala
✅ Files skipped from review due to trivial changes (1)
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala
🚧 Files skipped from review as they are similar to previous changes (3)
- pramen/build.sbt
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala
- pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T08:27:21.504Z
Learnt from: yruslan
Repo: AbsaOSS/pramen PR: 611
File: pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala:29-29
Timestamp: 2025-06-18T08:27:21.504Z
Learning: In pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala, the constant variable name was changed from DATE_UNTIL_EXPR_KEY to DATE_TO_EXPR_KEY, but both constants hold the same string value "date.to". This is a variable name refactoring, not a configuration key change, so it doesn't affect backward compatibility.
Applied to files:
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala
pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
🧬 Code graph analysis (8)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (5)
pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)
deleteNonCurrentBatchRecords(71-77)pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1)
deleteNonCurrentBatchRecords(71-77)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (2)
pramen(93-115)pramen(141-141)pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2)
DataChunk(28-37)DataChunk(40-48)pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala (6)
core(156-175)core(177-189)core(191-207)core(209-281)core(283-292)core(294-308)
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala (5)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (2)
getTableRunInfo(68-68)batchId(76-76)pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (2)
getTableRunInfo(167-167)batchId(185-185)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (7)
infoDate(24-24)batchId(32-32)inputRecordCount(27-27)outputRecordCount(28-28)appendedRecordCount(29-29)jobStarted(30-30)jobFinished(31-31)pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
getDataChunks(94-104)pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala (1)
MetaTableRunInfo(21-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (5)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3): pramen (45-67), pramen (118-121), core (141-155)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (2): pramen (94-113), pramen (184-195)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1): BookkeepingRecord (19-28)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (11): infoDate (24-24), infoDateBegin (25-25), infoDateEnd (26-26), inputRecordCount (27-27), outputRecordCount (28-28), jobStarted (30-30), jobFinished (31-31), batchId (32-32), appendedRecordCount (29-29), pramenTableName (23-23), pramenTableName (33-35)
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1): batchId (185-185)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (1)
- pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2): DataChunk (28-37), DataChunk (40-48)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala (3)
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala (2): country (43-43), batchId (44-44)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (1): batchId (32-32)
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1): batchId (185-185)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (6)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3): pramen (45-67), pramen (118-121), core (141-155)
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (1): getBookkeeper (54-56)
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala (1): getBookkeeper (41-43)
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala (1): getBookkeeper (40-42)
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMongoDbLongSuite.scala (1): getBookkeeper (39-41)
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (1): getBookkeeper (44-46)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (3)
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (1): batchId (76-76)
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1): batchId (185-185)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1): getDataChunks (94-104)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (3)
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (1): batchId (76-76)
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1): batchId (185-185)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1): BookkeepingRecord (19-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
- GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
- GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
- GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
- GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
- GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
- GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
- GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (42)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (3)
23-27: Good documentation of CSV compatibility constraint. The scaladoc clearly explains why field order must be preserved, which is critical for maintaining backward compatibility with existing CSV files. This is a best practice for data serialization formats.
36-37: Option[Long] for batch fields preserves semantic clarity. Using Option[Long] for batchId and appendedRecordCount avoids the ambiguity of sentinel values like 0L and explicitly represents the absence of these fields. This aligns well with the BookkeepingRecord model and maintains clear semantics throughout the data flow.
45-47: LGTM: Factory method correctly preserves Option types. The fromRecord factory method properly maps from BookkeepingRecord to DataChunk, preserving the Option[Long] types for batchId and appendedRecordCount without using sentinel values.
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/CsvUtils.scala (1)
41-48: Proper Option unwrapping for CSV serialization. The updated logic correctly handles Option[_] fields by unwrapping Some(x) to x.toString and rendering None as an empty string, preventing "Some(...)" or "None" literals from appearing in CSV output. The separator sanitization is correctly applied to the unwrapped value.
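To make the unwrapping rule concrete, here is a minimal, self-contained sketch (illustrative only — the helper name getCsvValue and its signature are assumptions, not the actual CsvUtils code):

object CsvValueSketch {
  // Unwrap Some(x) to its inner value, render None as an empty cell,
  // then sanitize the separator character in the resulting string.
  def getCsvValue(value: Any, separator: Char): String = {
    val unwrapped = value match {
      case Some(x) => x.toString
      case None    => ""
      case other   => other.toString
    }
    unwrapped.replace(separator, ' ')
  }

  def main(args: Array[String]): Unit = {
    println(getCsvValue(Some(10L), ','))  // prints: 10
    println(getCsvValue(None, ','))       // prints an empty line
    println(getCsvValue("a,b", ','))      // prints: a b
  }
}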
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (4)
28-28: LGTM: Test signature updated for batch-aware bookkeeping. The testBookKeeper method signature now accepts a factory function Long => Bookkeeper, enabling tests to create bookkeeper instances with specific batch IDs. This aligns with the batch-tracking functionality introduced in the PR.
146-160: Good test coverage for same-batch behavior. The test correctly verifies that multiple setRecordCount calls with the same batch ID result in additive entries (2 chunks), with getLatestDataChunk returning the most recent entry based on the jobFinished timestamp.
162-180: Test verifies different-batch replace semantics. This test confirms that when different batch IDs write to the same table/date (with recordsAppended = None), the new batch replaces the old entry, resulting in only 1 chunk. The assume guard correctly skips BookkeeperText, which doesn't implement this feature.
182-200: Test verifies different-batch non-overwrite semantics. This test validates that when recordsAppended = Some(1), entries from different batches are additive rather than replacing, resulting in 2 chunks. The conditional skip for Spark 2.x + BookkeeperDeltaTable is appropriate for implementation-specific limitations.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (3)
29-29: LGTM: appendedRecordCount column added. The appendedRecordCount column is correctly defined as Option[Long] and aligns with the BookkeepingRecord case class definition shown in the relevant snippets.
32-32: LGTM: batchId column added. The batchId column is correctly defined as Option[Long] and aligns with the BookkeepingRecord case class definition.
33-35: LGTM: Projection mapping updated correctly. The default projection (*) now includes both appendedRecordCount and batchId in the correct positions within the tuple, ensuring proper mapping to/from the BookkeepingRecord case class via tupled and unapply.
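For readers less familiar with Slick, the following sketch shows how optional columns line up between a default projection and a case class; the table and column names here are simplified stand-ins, not the real BookkeepingRecords definition:

import slick.jdbc.H2Profile.api._

final case class RecordSketch(pramenTableName: String,
                              inputRecordCount: Long,
                              appendedRecordCount: Option[Long],
                              batchId: Option[Long])

class RecordsSketch(tag: Tag) extends Table[RecordSketch](tag, "bookkeeping_sketch") {
  def pramenTableName     = column[String]("table_name")
  def inputRecordCount    = column[Long]("input_record_count")
  def appendedRecordCount = column[Option[Long]]("appended_record_count") // nullable column -> Option[Long]
  def batchId             = column[Option[Long]]("batch_id")              // nullable column -> Option[Long]

  // The tuple positions must match the case class fields for tupled/unapply to line up.
  def * = (pramenTableName, inputRecordCount, appendedRecordCount, batchId) <>
    (RecordSketch.tupled, RecordSketch.unapply)
}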
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala (1)
129-129: The CSV field ordering is correct. The batchId field is positioned as the final field in both the TaskCompletedCsv case class declaration and the serialization logic in JournalHadoopCsv.scala, matching the order generated by CsvUtils.getHeaders[TaskCompletedCsv]. No changes needed.
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DataChunkFactory.scala (1)
26-39: LGTM! The new recordsAppended and batchId parameters are properly added with sensible defaults, maintaining backward compatibility. The mapping to DataChunk fields (appendedRecordCount and batchId) is correct.
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala (1)
73-87: LGTM! The method correctly maps DataChunk fields to MetaTableRunInfo, including the new batchId and appendedRecordCount fields. The getOrElse(0L) handles the Option[Long] to Long conversion appropriately.
pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (3)
41-59: LGTM! The class properly extends BookkeeperDeltaBase with batchId, initializes Delta tables on construction, and correctly implements data retrieval with filtering and ordering.
61-77: LGTM! The saveRecordCountDelta and deleteNonCurrentBatchRecords methods correctly use Delta Lake APIs. The deletion filter properly excludes the current batch while matching table and infoDate.
103-118: LGTM! The initialization logic correctly checks for table existence before creating empty Delta tables, preventing duplicate table creation errors.
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (2)
27-27: LGTM! Constructor properly accepts batchId with a sensible default value for test purposes.
93-114: LGTM! The setRecordCount method correctly incorporates the new recordsAppended parameter and properly constructs DataChunk with Option(batchId) and recordsAppended.
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (3)
19-19: LGTM! The unused fullstacks import has been correctly removed.
54-65: LGTM! The getBookkeeper factory method properly accepts batchId, and the test correctly handles Spark 2.x vs 3.x version differences by using unique prefixes for Spark 2.x.
67-79: LGTM! The test for table creation and the getNewTablePrefix helper are well-implemented. Using Math.abs(Random.nextInt()) ensures valid table name prefixes.
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (2)
40-43: LGTM! Constructor properly accepts batchId and extends BookkeeperDeltaBase(batchId).
70-72: LGTM! The no-op implementation with a clear comment explaining Spark 2.x limitations is appropriate. This addresses the previous review comments about bugs in this method.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (3)
41-41: LGTM! Constructor properly accepts batchId and passes it to the BookkeeperHadoop parent class.
80-91: LGTM! The data retrieval methods correctly propagate the batchId parameter to getFilter, with appropriate defaults (None) for methods that don't require batch-specific filtering.
94-113: LGTM! The saveRecordCountToStorage method correctly constructs DataChunk with the new batchId (wrapped in Option) and recordsAppended fields.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (4)
29-29: LGTM: Constructor updated for batch awareness. The constructor correctly accepts and propagates batchId to the parent class.
113-117: LGTM: Helper for retrieving all batch chunks. This helper correctly retrieves all matching chunks without deduplication, enabling batch-aware queries. The sorting by infoDate provides consistent ordering.
78-90: LGTM: Signature updated for batch and append tracking. The method signature correctly includes recordsAppended, and the DataChunk construction at line 87 properly incorporates both Option(batchId) and recordsAppended.
68-72: No issues found. getDataChunksFromStorage correctly uses getBkAllData to return all matching chunks, which is the intended behavior when querying data across all batches for a given date. This behavior is explicitly tested and properly handled by all consumers.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (4)
24-24: LGTM: Base class constructor updated for batch awareness. The constructor correctly accepts batchId and makes it available throughout the class hierarchy.
31-35: LGTM: New abstract methods establish batch-aware contract. These methods define the required API for batch operations:
- getDataChunksFromStorage retrieves chunks with optional batch filtering
- deleteNonCurrentBatchRecords enables cleanup of stale batch data
94-104: LGTM: New public API for batch-aware chunk retrieval. This method correctly delegates to transient or storage-backed implementations based on the bookkeeping state, passing the batchId parameter through appropriately.
45-67: No changes needed. The isEmpty check on recordsAppended correctly distinguishes between full-write/overwrite operations (None) and append operations (Some(n)). This semantic separation is intentional: None triggers cleanup of non-current batch records (appropriate for overwrites), while Some(n) does not (appropriate for appends, regardless of whether n is 0).
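A minimal sketch of that decision point, with the deletion passed in as a function so the snippet stays self-contained (names are illustrative, not the actual BookkeeperBase code):

// recordsAppended = None    -> full write / overwrite: rows from other batches are now stale
// recordsAppended = Some(n) -> append (including Some(0)): rows from earlier batches are kept
def afterRecordCountSaved(recordsAppended: Option[Long],
                          deleteNonCurrentBatchRecords: () => Unit): Unit = {
  if (recordsAppended.isEmpty) {
    deleteNonCurrentBatchRecords()
  }
}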
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (7)
40-40: LGTM: Model version bumped for schema change. The version increment from 2 to 3 correctly reflects the index migration from unique to non-unique, supporting multiple batches per (table, date) combination.
43-43: LGTM: Constructor updated for batch awareness. The constructor correctly accepts and propagates batchId to the parent class.
94-99: LGTM: Batch-aware chunk retrieval with helpful logging. The implementation correctly filters by batchId and includes logging at line 97, which will aid in debugging batch operations.
115-125: LGTM: Batch cleanup implementation. The filter correctly identifies non-current batch records for the given table and date, using Filters.ne("batchId", batchId) at line 121.
127-164: LGTM: Filter extended for batch-aware queries. The getFilter method correctly incorporates optional batchId filtering at lines 160-163, combining the base filter with a batch equality check when needed.
190-195: LGTM: Index migration from unique to non-unique. The migration logic correctly handles existing databases by dropping the unique index and recreating it as non-unique, enabling multiple batches per (table, date). The condition dbVersion < 3 && dbVersion > 0 at line 190 appropriately excludes new installations (dbVersion = 0) that get the non-unique index directly at line 184.
101-113: Duplicate handling with insertOne needs clarification. Line 112 uses insertOne to support multiple batches writing to the same (table, date) combination. While deleteNonCurrentBatchRecords correctly cleans up old batches, it does not prevent duplicate records from the same batch if saveRecordCountToStorage is called multiple times in a single execution. Clarify whether:
- The method is guaranteed to be called only once per batch execution
- Duplicate records from the same batch are acceptable (they share the same batchId and will be filtered by existing queries)
- A unique index on (tableName, infoDate, batchId) should be added to prevent duplicates (see the sketch after this list)
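If the third option were chosen, the index could be created roughly as follows with the MongoDB Scala driver; the field names are taken from the discussion above and the helper is hypothetical, not part of the PR:

import org.mongodb.scala.MongoCollection
import org.mongodb.scala.bson.Document
import org.mongodb.scala.model.{IndexOptions, Indexes}

def ensureOneRecordPerBatch(collection: MongoCollection[Document]): Unit = {
  // One bookkeeping record per (table, info date, batch): a duplicate insert from the
  // same batch would then fail instead of silently accumulating.
  // The caller would normally await or otherwise handle the returned Future.
  collection.createIndex(
    Indexes.ascending("tableName", "infoDate", "batchId"),
    IndexOptions().unique(true)
  ).toFuture()
  ()
}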
country = v.country,
batchId = v.batchId.getOrElse(0L)
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
cd /root/repo 2>/dev/null || true
# First, find the TaskCompleted case class definition
echo "=== Searching for TaskCompleted case class definition ==="
rg -nP --type=scala 'case class TaskCompleted' -A 10
echo -e "\n=== Searching for TaskCompletedCsv case class definition ==="
rg -nP --type=scala 'case class TaskCompletedCsv' -A 10
echo -e "\n=== Searching for DataChunk case class definition ==="
rg -nP --type=scala 'case class DataChunk' -A 10
echo -e "\n=== Searching for batchId getOrElse patterns ==="
rg -nP --type=scala 'batchId.*getOrElse' -C 2
echo -e "\n=== Checking the specific file context ==="
sed -n '85,100p' pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala
Repository: AbsaOSS/pramen
Length of output: 12249
🏁 Script executed:
#!/bin/bash
# Get the complete TaskCompleted definition
echo "=== Full TaskCompleted.scala ==="
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala
echo -e "\n=== Full TaskCompletedCsv.scala ==="
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala
Repository: AbsaOSS/pramen
Length of output: 6632
Reconsider batchId design consistency: TaskCompleted should likely use Option[Long] like DataChunk.
The code converts TaskCompletedCsv.batchId from Option[Long] to TaskCompleted.batchId as Long using getOrElse(0L). This creates semantic ambiguity: 0L cannot be distinguished from a missing batch ID. More critically, this design is inconsistent with DataChunk, which preserves batchId as Option[Long]. The same problematic conversion pattern appears in JournalJdbc.scala:102 and MetastoreReaderBase.scala:79, suggesting a systematic design issue rather than an isolated case.
Consider changing TaskCompleted.batchId to Option[Long] to align with DataChunk and preserve the source semantics from TaskCompletedCsv, eliminating the sentinel value workaround.
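To illustrate the suggested direction with trimmed-down case classes (the real models carry many more fields, so this is only a sketch):

// Current shape: the Option from the CSV model is collapsed into a sentinel value.
final case class TaskCompletedCsvSketch(tableName: String, batchId: Option[Long])
final case class TaskCompletedSketch(tableName: String, batchId: Long)

def fromCsvCurrent(v: TaskCompletedCsvSketch): TaskCompletedSketch =
  TaskCompletedSketch(v.tableName, v.batchId.getOrElse(0L)) // 0L is indistinguishable from "no batch id"

// Suggested shape: keep the Option end to end.
final case class TaskCompletedOptionSketch(tableName: String, batchId: Option[Long])

def fromCsvSuggested(v: TaskCompletedCsvSketch): TaskCompletedOptionSketch =
  TaskCompletedOptionSketch(v.tableName, v.batchId) // "missing" stays distinguishable from 0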
🤖 Prompt for AI Agents
In
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala
around lines 91-92, TaskCompletedCsv.batchId is being converted with
getOrElse(0L) into a Long which loses the distinction between "missing" and a
real 0 value; to fix this, change TaskCompleted.batchId from Long to
Option[Long] so it preserves the Option from TaskCompletedCsv, update all
constructors/fields that instantiate TaskCompleted accordingly, and then
propagate the type change to the other files noted (JournalJdbc.scala around
line 102 and MetastoreReaderBase.scala around line 79) by replacing similar
getOrElse(0L) usages with direct Option passing and adjusting any downstream
code that expects a Long (handle None where appropriate or map to default at the
site that truly needs a concrete Long).
Actionable comments posted: 1
♻️ Duplicate comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1)
149-154: Correct fix for batchId filtering. The filter now properly uses chunk.batchId.contains(id) to compare Option[Long] with Long. This addresses the issue flagged in a previous review where chunk.batchId == id would always return false due to type mismatch.
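The underlying type mismatch is easy to reproduce in a Scala REPL (generic Scala, not project code):

val chunkBatchId: Option[Long] = Some(42L)
val requestedId: Long = 42L

chunkBatchId == requestedId            // false: an Option[Long] never equals a bare Long
chunkBatchId.contains(requestedId)     // true: compares the wrapped value
chunkBatchId.exists(_ == requestedId)  // true: equivalent alternative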
🧹 Nitpick comments (3)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (2)
27-27: Consider parameterizing batchId per chunk for more flexible testing. The fixed batchId constructor parameter means all chunks created by this mock will share the same batch ID. For simple test scenarios this is fine, but if tests need to verify behavior across multiple batches, they would need to instantiate multiple mocks.
🔎 Alternative: Accept batchId in setRecordCount
-class SyncBookkeeperMock(batchId: Long = 123L) extends Bookkeeper {
+class SyncBookkeeperMock(defaultBatchId: Long = 123L) extends Bookkeeper {
Then in setRecordCount, you could add an optional batchId parameter:
 private[pramen] override def setRecordCount(table: String, infoDate: LocalDate, inputRecordCount: Long, outputRecordCount: Long, recordsAppended: Option[Long], jobStarted: Long, jobFinished: Long,
-                                            isTableTransient: Boolean): Unit = {
+                                            isTableTransient: Boolean,
+                                            batchIdOverride: Option[Long] = None): Unit = {
+  val effectiveBatchId = batchIdOverride.getOrElse(defaultBatchId)
59-76: Correct implementation of batch-aware chunk filtering. The logic correctly:
- Filters by exact infoDate match
- Applies optional batchId filtering using chunk.batchId.contains(id)
- Sorts results by jobFinished
The match expression at lines 63-71 could be slightly simplified, but the current implementation is clear and correct for a test mock.
44-50: Remove unnecessary.count()calls on DELETE statements.
spark.sql(s"DELETE FROM ...")executes the deletion but doesn't return a result set to count. The.count()calls are unnecessary and could be misleading. Simply execute the SQL without the count.🔎 Proposed fix
if (spark.catalog.tableExists(fullRecordsTableName)) { - spark.sql(s"DELETE FROM $fullRecordsTableName WHERE true").count() + spark.sql(s"DELETE FROM $fullRecordsTableName WHERE true") } if (spark.catalog.tableExists(fullSchemasTableName)) { - spark.sql(s"DELETE FROM $fullSchemasTableName WHERE true").count() + spark.sql(s"DELETE FROM $fullSchemasTableName WHERE true") }
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T08:27:21.504Z
Learnt from: yruslan
Repo: AbsaOSS/pramen PR: 611
File: pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala:29-29
Timestamp: 2025-06-18T08:27:21.504Z
Learning: In pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala, the constant variable name was changed from DATE_UNTIL_EXPR_KEY to DATE_TO_EXPR_KEY, but both constants hold the same string value "date.to". This is a variable name refactoring, not a configuration key change, so it doesn't affect backward compatibility.
Applied to files:
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
🧬 Code graph analysis (6)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (1)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1): getDataChunks (94-104)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (1)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (2): getFilter (134-171), pramen (222-229)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (3)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (5): getDataChunksFromStorage (68-72), getLatestDataChunkFromStorage (62-66), getDataChunksCountFromStorage (74-76), pramen (78-90), pramen (107-111)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (2): pramen (45-67), pramen (118-121)
- pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (2): DataChunk (28-37), DataChunk (40-48)
pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (4)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (2): batchId (32-32), infoDate (24-24)
- pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (2): saveRecordCountDelta (61-69), deleteNonCurrentBatchRecords (71-77)
- pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (2): saveRecordCountDelta (61-69), deleteNonCurrentBatchRecords (71-77)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala (1): deleteNonCurrentBatchRecords (52-52)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (5): getDataChunksFromStorage (68-72), pramen (78-90), pramen (107-111), core (113-117), core (119-128)
- pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (1): deleteNonCurrentBatchRecords (71-77)
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (3): pramen (93-115), pramen (141-141), getDataChunks (59-76)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (2)
- pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala (3): DataChunk (28-37), DataChunk (40-48), fromRecord (45-47)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala (1): BookkeepingRecord (19-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
- GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
- GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
- GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
- GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
- GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
- GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
- GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (31)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (2)
55-57: LGTM! Clean delegation to the updated getDataChunks method.The simplified implementation correctly retrieves the latest chunk by delegating to
getDataChunkswithbatchId = Noneand taking the last element after the sort byjobFinished.
93-112: LGTM! Correctly updated signature and DataChunk construction.The changes properly integrate the new fields:
recordsAppended: Option[Long]parameter added to the signatureDataChunkconstruction includes bothbatchId(line 111) andrecordsAppended(line 112)Option(batchId)correctly wraps the constructor parameter asSome[Long]pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala (3)
40-43: LGTM! Constructor properly threads batchId to base class.The constructor signature correctly accepts
batchId: Longand forwards it toBookkeeperDeltaBase(batchId), aligning with the batch-aware bookkeeping design across the PR.
70-72: Appropriate no-op for Spark 2.x limitations.The comment clarifies that Delta table DELETE operations aren't supported in Spark 2.x. This is consistent with the 2.12/2.13 versions that implement the actual deletion using
DeltaTable.forName(...).delete(filter). The previous review comment flagged a different (buggy) implementation that has since been replaced with this intentional no-op.
24-24: Verify usage ofInstantimport.The import
java.time.Instantappears to be used insaveSchemaDeltaat line 80 (Instant.now().toEpochMilli), so this import is valid.pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala (4)
29-29: LGTM! Base class correctly propagates batchId.The abstract class signature properly accepts
batchId: Longand forwards it to the parentBookkeeperHadoop(batchId).
68-72: New batch-aware data retrieval method looks correct.
getDataChunksFromStoragecorrectly builds a filter including the optionalbatchIdand usesgetBkAllDatato return all matching chunks (not just the latest per date). This aligns with the intent to retrieve all records for a specific batch.
78-90: DataChunk construction correctly includes batch tracking fields.The
saveRecordCountToStoragemethod properly:
- Accepts the new
recordsAppended: Option[Long]parameter- Constructs
DataChunkwithOption(batchId)andrecordsAppendedThis aligns with the updated
DataChunkmodel that now carriesbatchIdandappendedRecordCount.
113-117: Helper methodgetBkAllDataprovides unsorted raw collection.Note that
getBkAllDatacollects all matching records and sorts byjobFinished, which is appropriate for returning all chunks in chronological order. This differs fromgetBkDatawhich groups by(tableName, infoDate)and returns only the latest per group.pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (3)
24-24: LGTM! Base class now carries batchId.The abstract class signature correctly includes
batchId: Longfor batch-aware bookkeeping operations.
61-66: Verify the intent of conditional batch record deletion.The logic calls
deleteNonCurrentBatchRecordsonly whenrecordsAppended.isEmpty. This means:
- When appending records (
recordsAppendedisSome(count)), old batch records are preserved- When not appending (full replacement), old batch records are deleted
This appears intentional for the append vs. replace semantics, but please confirm this matches the expected behavior for batch bookkeeping.
94-104: New batch-aware retrieval method correctly routes to appropriate storage.
getDataChunksproperly checks whether the table is transient and routes to eithergetTransientDataChunksorgetDataChunksFromStorage, passing thebatchIdparameter through both paths.pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (2)
19-19: Good fix: removed invalidfullstacksimport.The import now correctly includes only
BeforeAndAfterandBeforeAndAfterAll, addressing the previous review comment about the unused/invalidfullstackssymbol.
54-65: LGTM! Test setup correctly uses batch-aware API.The
getBookkeepermethod now acceptsbatchIdand thetestBookKeeperblock properly uses dynamic prefixes based on Spark version, with fresh prefixes for Spark 2.x (which doesn't support DELETE) and reused prefixes for newer versions (with cleanup in thebeforeblock).pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala (1)
97-107: LGTM! Clean utility for elapsed-time monitoring.The
runActionWithElapsedTimeEventmethod provides a reusable pattern for:
- Measuring execution time of an action
- Triggering a callback when execution exceeds a threshold
This is used appropriately in
BookkeeperJdbc.deleteNonCurrentBatchRecordsto log warnings for slow database operations.pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala (4)
41-41: LGTM! Constructor properly threads batchId.The class signature correctly accepts
batchId: Longand forwards it toBookkeeperHadoop(batchId).
80-84: New batch-aware retrieval correctly filters and returns all chunks.
getDataChunksFromStorageproperly builds a filter usinggetFilterwith the optionalbatchIdand returns all matching chunks viagetData, which sorts byjobFinished.
106-106: DataChunk construction correctly includes batch tracking fields.The
DataChunkis constructed withOption(batchId)andrecordsAppended, aligning with the updated model.
197-200: Appropriate no-op for CSV-based storage.The comment clearly explains why deletion isn't supported for CSV-based bookkeeping. This is consistent with the design decision that text/CSV storage doesn't support in-place modifications.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (4)
33-33: LGTM! Constructor properly threads batchId to base class.The class correctly extends
BookkeeperBase(true, batchId), enabling batch-aware bookkeeping for JDBC storage.
71-82: New batch-aware retrieval method looks correct.
getDataChunksFromStorageproperly builds a query usinggetFilterwith the optionalbatchId, maps results throughDataChunk.fromRecord, and sorts byjobFinished.
135-153: Good implementation of batch record deletion with timing.The
deleteNonCurrentBatchRecordsmethod:
- Correctly filters for records matching the table/date but with different batchId
- Uses
AlgorithmUtils.runActionWithElapsedTimeEventto warn on slow deletions- Logs the SQL statement when threshold is exceeded for debugging
This aligns well with the new utility added in
AlgorithmUtils.scala.
115-133: Record construction correctly includes batch tracking fields.
BookkeepingRecordis constructed withrecordsAppendedandOption(batchId), matching the updated schema definition inBookkeepingRecords.pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala (8)
37-42: LGTM!The version bump to 3 is appropriate given the schema changes (moving to non-unique indexes to support multiple batch records per table/date).
44-50: LGTM!Constructor changes properly thread
batchIdthrough to the base class. The 10-second query warning timeout is a reasonable default for alerting on slow MongoDB operations.
82-94: LGTM!Passing
NoneforbatchIdcorrectly retrieves data across all batches, which is the expected behavior for getting the latest chunk or counting total chunks.
96-101: LGTM!The batch-aware retrieval with optional filtering is well-implemented. Debug logging will aid troubleshooting.
103-115: LGTM!The change from upsert to
insertOneis intentional and aligns with the batch tracking design - allowing multiple records per table/infoDate combination. TheDataChunkis correctly constructed with the newbatchIdandrecordsAppendedfields.
117-132: LGTM!The deletion logic correctly filters by table, infoDate, and excludes the current batchId. The elapsed time monitoring with warning logging is a good observability practice for detecting slow queries.
134-171: LGTM!The filter composition correctly handles the optional
batchIdparameter, appending the batch filter only when provided.
197-202: The migration logic has a silent failure vulnerability due to inner exception handling.While the migration logic correctly drops and recreates the index for databases with
dbVersion < 3 && dbVersion > 0, bothdropIndexandcreateIndexhave internal try-catch blocks that swallow exceptions and only log warnings. IfdropIndexfails (e.g., index doesn't exist), it logs a warning and continues, thencreateIndexis called regardless. This can leave the database in an inconsistent state without surfacing any error to the outer try-catch block ininitCollection. Consider making at least one of these operations fail-fast so migration failures are explicitly surfaced rather than silently logged.Likely an incorrect or invalid review comment.
Actionable comments posted: 1
Fix all issues with AI Agents 🤖
In
@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala:
- Around line 135-153: The Slick filter compares a Rep[Option[Long]] column
incorrectly using r.batchId =!= batchId; update the comparison in
deleteNonCurrentBatchRecords so the right-hand side is an Option by using
Option(batchId) (or Some(batchId)) when comparing r.batchId, ensuring the filter
compiles and behaves correctly for optional batchId values; keep the rest of the
query and error handling intact.
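A compact sketch of the corrected comparison against a simplified stand-in table (the real BookkeepingRecords schema has more columns, so this is illustrative only):

import slick.jdbc.H2Profile.api._

class ChunksSketch(tag: Tag) extends Table[(String, Option[Long])](tag, "chunks_sketch") {
  def pramenTableName = column[String]("table_name")
  def batchId         = column[Option[Long]]("batch_id")
  def * = (pramenTableName, batchId)
}

val chunks = TableQuery[ChunksSketch]

// Delete rows written by other batches: the right-hand side is lifted to Option so it
// matches the Rep[Option[Long]] column. Note that rows where batch_id is NULL are not
// matched by =!= under SQL null semantics.
def deleteOtherBatches(table: String, batchId: Long) =
  chunks
    .filter(r => r.pramenTableName === table && r.batchId =!= Option(batchId))
    .delete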
🧹 Nitpick comments (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (1)
256-286: Clarify expected behavior for concurrent bookkeeper instances. The test validates that multiple bookkeeper instances share state, but the assertion at line 281 accepts two different outcomes (inputRecordCount == 201 || inputRecordCount == 102), with a comment stating "Both are correct, depending on the notion of 'latest' used." This ambiguity suggests the "latest" semantics may be implementation-dependent. Consider documenting which notion of "latest" each implementation uses (e.g., latest by jobFinished timestamp vs. latest by insertion order) to make the behavior more predictable.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T08:27:21.504Z
Learnt from: yruslan
Repo: AbsaOSS/pramen PR: 611
File: pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala:29-29
Timestamp: 2025-06-18T08:27:21.504Z
Learning: In pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala, the constant variable name was changed from DATE_UNTIL_EXPR_KEY to DATE_TO_EXPR_KEY, but both constants hold the same string value "date.to". This is a variable name refactoring, not a configuration key change, so it doesn't affect backward compatibility.
Applied to files:
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala
🧬 Code graph analysis (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (2)
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (1): getBookkeeper (54-56)
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala (1): getBookkeeper (44-46)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (4)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala (6): batchId (32-32), infoDate (24-24), inputRecordCount (27-27), outputRecordCount (28-28), jobStarted (30-30), jobFinished (31-31)
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala (1): batchId (76-76)
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala (1): batchId (185-185)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala (1): getDataChunks (94-104)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
- GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
- GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
- GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
- GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
- GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
- GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
- GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (17)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (5)
27-28: LGTM!The new imports support the performance tracking and logging functionality added in
deleteNonCurrentBatchRecords.
33-33: LGTM!The class constructor correctly accepts and passes
batchIdto the superclass.
71-82: LGTM!The new method correctly implements batchId-aware data chunk retrieval with proper error handling and sorting.
115-124: LGTM!The method signature correctly extends to include
recordsAppended, and theBookkeepingRecordconstruction properly includes both new fields.
187-190: LGTM! Past issue has been fixed.The Slick Option column comparison has been corrected. Line 188 now properly wraps
idinOption(id)when comparing with theRep[Option[Long]]column, resolving the issue flagged in the previous review.pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (7)
19-19: LGTM! Imports and mixins are correct.The addition of
BeforeAndAfterandBeforeAndAfterAlltraits properly enables the test setup and cleanup functionality. The previous issue with the invalidfullstacksimport has been resolved.Also applies to: 26-26
27-28: LGTM! Shared prefix field is well-designed.The
bookkeepingTablePrefixfield is appropriately initialized once per test suite and used in conjunction with thebeforeblock cleanup for Spark 3+ tests, enabling efficient table reuse.
39-52: LGTM! Cleanup logic is correct.The
beforeblock properly clears both tables for Spark 3+ tests. The previous copy-paste bug where line 49 was deleting from the wrong table has been resolved—each DELETE statement now correctly targets its intended table.
54-56: LGTM! Method signature updated for batch tracking.The addition of the
batchIdparameter correctly aligns with the PR objective to introduce batch tracking across the Bookkeeper API.
59-65: LGTM! Version-specific test strategies are well-implemented.The conditional logic appropriately handles different Spark versions: Spark 2.x uses unique prefixes per test (avoiding the need for cleanup), while Spark 3+ reuses a shared prefix with the
beforeblock cleanup. This design ensures proper test isolation for both environments.
67-73: LGTM! Table creation test is correct.The test properly verifies that both bookkeeping tables are created with the expected names when a
BookkeeperDeltaTableinstance is instantiated.
76-79: LGTM! Helper method generates unique test prefixes.The
getNewTablePrefixmethod provides a simple and effective way to generate unique table prefixes for test isolation.pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala (2)
85-104: LGTM! Parameter order matches the updated interface.The updated signature correctly includes
recordsAppended: Option[Long]and properly constructs the DataChunk withOption(batchId)to wrap the Long batchId parameter into an Option.
59-68: The batchId filtering logic correctly handles both filtered and unfiltered queries.The pattern
batchId.forall(chunk.batchId.contains)is correctly implemented across all Bookkeeper implementations. All implementations (Text, Hadoop, JDBC, MongoDB, Delta) consistently populate the batchId field asOption(batchId)from the constructor parameter, and filtering logic properly matches this with case patterns that handle bothSome(id)andNonecases.pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala (3)
147-161: Confirm same-batch multiple-entry behavior is intended.This test verifies that calling
setRecordCounttwice with the same batch creates 2 separate chunks. While this appears correct based on thedeleteNonCurrentBatchRecordslogic (which only deletes other batches), ensure this is the intended semantic for production use cases.
163-181: Document the feature gap for BookkeeperText.The test skips
BookkeeperTextwith a comment stating "The feature is not implemented in these implementations." This indicates that cross-batch replacement behavior is not supported by BookkeeperText.Is there a tracking issue for implementing batch replacement in BookkeeperText? If this implementation is used in production scenarios requiring multiple batches, the missing feature could lead to unexpected behavior.
183-205: LGTM! Test correctly validates non-overwrite behavior.The test properly verifies that when
recordsAppendedis set toSome(1), records from different batches are preserved (2 chunks expected) rather than being replaced. The Spark version check and Delta Table skip are appropriate given the known limitations.
Summary by CodeRabbit
New Features
Chores