Skip to content

ECOPROJECT-4665 | fix: stream RVTools uploads directly to PostgreSQL via Large Objects#1189

Open
Jos-Jerus wants to merge 1 commit into
kubev2v:mainfrom
Jos-Jerus:stream-rvtools-upload
Open

ECOPROJECT-4665 | fix: stream RVTools uploads directly to PostgreSQL via Large Objects#1189
Jos-Jerus wants to merge 1 commit into
kubev2v:mainfrom
Jos-Jerus:stream-rvtools-upload

Conversation

@Jos-Jerus
Copy link
Copy Markdown
Collaborator

@Jos-Jerus Jos-Jerus commented May 20, 2026

Summary

  • Replace in-memory file buffering with PostgreSQL Large Objects for the RVTools upload path
  • Multipart body streams directly from HTTP request into a Large Object — no temp file, no []byte allocation for the full file
  • Both write (upload) and read (worker) paths now stream in ~32 KB chunks
  • Migration: rvtools_files.data BYTEArvtools_files.file_oid OID

Problem

The upload handler buffered the entire multipart file into a bytes.Buffer (~100 MB peak for a 50 MB file), then the service passed the full []byte to a PostgreSQL INSERT. The read path (WriteToFile) also loaded the entire bytea into []byte. For large files near the 50 MB limit, this caused significant heap pressure and GC overhead.

Benchmark (store-level, 50 MB file)

Read path (WriteToFile) — the primary win

Metric Before (bytea) After (LO) Change
Memory allocated 100 MB 2.7 MB -97%
Heap delta 50 MB 168 KB -99.7%

Write path (CreateFromReader)

The store-level benchmark shows similar allocation because both before/after use bytes.NewReader(data) as the source. The real-world improvement is at the handler level: the multipart body streams directly to the Large Object without ever materializing the full file in Go heap.

Changes

  • Migration: rvtools_files.data BYTEArvtools_files.file_oid OID
  • Store: Create(ctx, id, data []byte)CreateFromReader(ctx, tx, id, io.Reader) using pgx Large Objects API. WriteToFile and Delete rewritten for LO. CreateTmpFile unchanged (delegates to WriteToFile)
  • Service: CreateRVToolsJob(ctx, args, fileContent []byte)CreateRVToolsJob(ctx, args, fileReader io.Reader). Manual transaction management (needed for LO streaming). File store injected as dependency
  • Handler: Removed bytes.Buffer file buffering. XLSX magic bytes validated from first 4 bytes of the stream. io.MultiReader reconstructs the full stream. Multipart part kept open until service finishes reading
  • Wiring: NewJobService now takes RVToolsFile store as 4th parameter

Test plan

  • go vet ./... passes
  • Handler tests pass (228/228)
  • Store tests pass
  • Migration tests pass
  • Memory benchmark shows 97% reduction in read path allocations
  • E2E upload test with real RVTools file
  • Verify worker processes uploaded file correctly end-to-end

Trade-offs

  • Read latency: LO read is slower than bytea scan (135ms → 896ms for 50 MB) due to chunked protocol. Acceptable since the worker processes files asynchronously
  • Connection hold: Transaction stays open during the entire upload stream. With max 10 connections in the pool, this limits concurrent upload throughput. Not a regression since the handler was already occupied for the upload duration

Summary by CodeRabbit

  • New Features

    • Added XLSX file format validation on file uploads to ensure data integrity and prevent invalid files from processing.
  • Bug Fixes

    • Optimized file upload mechanism with streaming capabilities to significantly reduce memory consumption and improve performance when handling larger files.
  • Chores

    • Modernized file storage infrastructure to enhance overall performance and system scalability.

Review Change Stack

@Jos-Jerus Jos-Jerus requested a review from a team as a code owner May 20, 2026 12:26
@Jos-Jerus Jos-Jerus requested review from AvielSegev and ronenav and removed request for a team May 20, 2026 12:26
@openshift-ci
Copy link
Copy Markdown

openshift-ci Bot commented May 20, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign jos-jerus for approval. For more information see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 20, 2026

📝 Walkthrough

Walkthrough

This PR refactors RVTools file storage to use PostgreSQL Large Objects with streaming I/O instead of inline BYTEA columns. The handler now streams multipart uploads with header validation, the service uses explicit transactions to atomically persist files and create jobs, and the database schema is migrated accordingly.

Changes

RVTools File Storage Migration to PostgreSQL Large Objects

Layer / File(s) Summary
Database Schema Migration and File Store Interface
pkg/migrations/sql/20260520120000_rvtools_files_to_large_objects.sql, internal/store/rvtools_file.go
SQL migration replaces the data BYTEA column with a file_oid OID column and includes reversible backfill. The RVToolsFile interface removes the byte-slice Create method and adds CreateFromReader that operates within a caller-supplied pgx.Tx.
Large Object Create, Read, and Delete Operations
internal/store/rvtools_file.go
CreateFromReader streams data into a PostgreSQL Large Object within the provided transaction and records the OID. WriteToFile retrieves the OID and streams LO content into an io.Writer. Delete unlinks the LO and deletes the metadata row.
Service Dependency Injection and Transactional Job Creation
internal/service/job.go
JobService adds an rvtoolsFiles field and NewJobService initializes it. CreateRVToolsJob changes signature to accept io.Reader instead of []byte, explicitly begins a transaction, stores the file via CreateFromReader, inserts the River job within the same transaction, and commits atomically.
Streaming Multipart Upload Handler with Magic Byte Validation
internal/handlers/v1alpha1/job.go, internal/handlers/v1alpha1/job_test.go
The CreateRVToolsAssessment handler parses multipart parts, reads only the first 4 bytes of the uploaded file to validate XLSX/ZIP magic bytes, reconstructs a full reader using io.MultiReader, and passes the streaming reader to the service. Test helpers prepend XLSX magic bytes to test payloads.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • kubev2v/migration-planner#1158: Both PRs modify the RVTools upload→CreateRVToolsJobrvtools_files persistence flow, with the earlier PR switching from embedding bytes in job args to storing them in rvtools_files (bytea), while this PR further refactors that path to stream into Large Object storage.

Suggested labels

lgtm, approved

Poem

🐰 Hops through the code with glee,
Large Objects flow so free,
Streaming bytes where they belong,
Transactions keep us strong!
From BYTEA's chains, we're finally free! 🎉

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 12.50% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main change: streaming RVTools uploads to PostgreSQL via Large Objects instead of in-memory buffering, which is the primary objective of the PR.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/handlers/v1alpha1/job_test.go (1)

93-106: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

Consider extracting magic bytes to a shared constant.

The XLSX/ZIP magic bytes (0x50 0x4B 0x03 0x04) are correctly prepended to satisfy the handler's new validation logic. The updated comments clearly explain this behavior.

For maintainability, consider extracting these magic bytes to a package-level constant or test helper constant:

♻️ Optional refactor to use a named constant
+// xlsxMagicBytes represents the ZIP/XLSX file signature (PK\x03\x04)
+var xlsxMagicBytes = []byte{0x50, 0x4B, 0x03, 0x04}
+
 // Helper function to create a multipart reader for testing.
 // File content is prefixed with XLSX/ZIP magic bytes for validation.
 createMultipartReader := func(name string, fileContent string) *multipart.Reader {
 	var b bytes.Buffer
 	w := multipart.NewWriter(&b)

 	// Add name field
 	namePart, _ := w.CreateFormField("name")
 	_, _ = io.WriteString(namePart, name)

 	// Add file field with XLSX magic bytes prefix
 	filePart, _ := w.CreateFormFile("file", "test.xlsx")
-	_, _ = filePart.Write([]byte{0x50, 0x4B, 0x03, 0x04})
+	_, _ = filePart.Write(xlsxMagicBytes)
 	_, _ = io.WriteString(filePart, fileContent)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/handlers/v1alpha1/job_test.go` around lines 93 - 106, Extract the
XLSX/ZIP magic bytes (0x50,0x4B,0x03,0x04) into a named test-level constant
(e.g., xlsxMagicBytes or zipMagicBytes) and replace the inline byte slice in the
createMultipartReader helper with that constant; update any other tests that
prepend these bytes to reference the constant so the magic value is centralized
for maintainability and clarity.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/handlers/v1alpha1/job_test.go`:
- Line 66: The test passes nil for the rvtoolsFiles dependency into
handlers.NewServiceHandler which causes a nil pointer panic when
CreateRVToolsJob calls s.rvtoolsFiles.CreateFromReader; fix by either
(preferred) providing a test double: create and pass a mock/fake implementing
the RVToolsFile interface into the NewServiceHandler call in job_test.go so the
success path can be exercised, or (alternative) make CreateRVToolsJob
defensively check s.rvtoolsFiles for nil before calling CreateFromReader and
return a clear error; reference NewServiceHandler, CreateRVToolsJob, and
s.rvtoolsFiles.CreateFromReader when implementing the change.

In `@internal/handlers/v1alpha1/job.go`:
- Around line 38-79: The multipart loop in the handler reads parts with
request.Body.NextPart() and currently keeps iterating after capturing the "file"
part, which causes mime/multipart to discard the rest of the file stream; modify
the switch so that after you set fileReader and filePart in the "file" case you
immediately break out of the for loop (so the remaining bytes are preserved for
CreateRVToolsJob), and replace the EOF string comparison in the NextPart() error
handling with an idiomatic check using io.EOF (err == io.EOF). Ensure the
existing defer that closes filePart remains in place.

In `@internal/service/job.go`:
- Around line 55-75: Wrap the Begin-to-Commit sequence (from s.pool.Begin
through tx.Commit) with a Prometheus histogram timer (e.g.,
upload_tx_duration_seconds) to record transaction/upload duration, and also
measure pool acquisition time by timing the call that waits for a connection (or
instrument s.pool.Begin) into a separate histogram (e.g.,
db_pool_acquire_seconds) and increment a counter (e.g.,
db_pool_acquire_failures_total) on Begin errors; additionally record a
histogram/summary for CreateFromReader duration if file upload is significant.
Use the existing tracer/error handling flows (s.pool.Begin,
s.rvtoolsFiles.CreateFromReader, s.riverClient.InsertTx, tx.Commit) to observe
and label metrics (status=success/failure, maybe tenant/job type) and ensure
metrics are recorded in defer/err branches so failures and commits are both
captured.

In `@internal/store/rvtools_file_bench_test.go`:
- Line 86: In the benchmark loop replace the ignored call to store.Delete(ctx,
id) with error handling: capture the returned error from store.Delete(ctx, id)
and, if non-nil, log it instead of discarding it (use the benchmark logger like
b.Logf or b.Errorf so the failure is recorded but does not abort the whole
benchmark); reference the existing identifiers store.Delete, ctx, id and the
benchmark variable (b) when adding the err check and log.
- Line 181: The Delete call in the benchmark loop ignores errors; update the
benchmark to check the error returned by store.Delete(ctx, id) and handle it
(e.g., call b.Fatalf or b.Errorf with context and the id on failure) so deletion
failures are surfaced and won't silently leave orphaned data; ensure you
reference the existing store.Delete(ctx, id) call and use the benchmark's b (or
testing.T) and ctx/id variables when logging the error.
- Around line 127-134: The benchmark is measuring GC/timing because
resetMemStats() is called inside the timed loop; to fix, stop the timer
(b.StopTimer()) before calling resetMemStats() and reading runtime.ReadMemStats,
then call b.StartTimer() immediately before invoking store.WriteToFile(ctx, id,
io.Discard) so memory measurement (resetMemStats() and runtime.ReadMemStats) is
excluded from timing; keep the same metric reporting (ReportMetric of TotalAlloc
and HeapInuse deltas) but ensure both resetMemStats() and the
runtime.ReadMemStats(&after) run outside the timed section around
store.WriteToFile.
- Around line 161-186: The benchmark currently calls resetMemStats() (and later
runtime.ReadMemStats/ b.ReportMetric using before/after) while the benchmark
timer is running, skewing timing results; to fix, move memory-measurement out of
the timed section by calling resetMemStats() before starting the timed work and
surround the post-work memory sampling and b.ReportMetric calls with
b.StopTimer() ... b.StartTimer() (or simply stop before reading
runtime.ReadMemStats(&after) and reporting) so that the code paths using
resetMemStats(), runtime.ReadMemStats, and
b.ReportMetric("bytes-alloc"/"bytes-heap-delta") don't contribute to benchmark
timing while keeping the existing
id/tx/store.CreateFromReader/store.WriteToFile/store.Delete sequence intact.
- Line 18: The DSN string assigned to the variable dsn contains hardcoded
credentials; replace it by building the DSN from environment variables (e.g.,
DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD, DB_SSLMODE) with sensible
defaults for local development and fallback values if env vars are absent;
import os and use os.Getenv (or a small helper) to read each variable and
assemble the DSN rather than embedding "password=adminpass" directly so
tests/benchmarks no longer contain hardcoded secrets.
- Around line 66-84: The benchmark currently calls resetMemStats() and
runtime.ReadMemStats() inside the timed section which makes GC and memory reads
part of the measured time; update the benchmark around the memory measurement
(around resetMemStats(), runtime.ReadMemStats(&after) and b.ReportMetric(...)
calls) to call b.StopTimer() before resetting/reading memory and b.StartTimer()
after reporting metrics so GC and memstat collection are excluded from the
measured operation time.

In `@internal/store/rvtools_file.go`:
- Around line 66-76: The file_oid lookup is done before starting the
transaction, creating a TOCTOU window; move the SELECT into the transaction so
the metadata and subsequent LO operations are serialized. Change the sequence in
the relevant methods in internal/store/rvtools_file.go to Begin the transaction
first (s.pool.Begin(ctx)), then run the metadata query via the transaction
(tx.QueryRow(...).Scan(&oid)) before calling lo.Open/lo.Unlink, keep the
existing defer tx.Rollback(ctx) and commit after LO ops; also mirror this
pattern used by CreateFromReader to ensure all LO operations occur under the
same transaction.

In `@pkg/migrations/sql/20260520120000_rvtools_files_to_large_objects.sql`:
- Around line 2-7: The migration must be made safe and reversible: do not drop
data or add NOT NULL columns upfront. Instead, in the upgrade use ALTER TABLE
rvtools_files ADD COLUMN file_oid OID NULL; backfill existing rows with UPDATE
rvtools_files SET file_oid = lo_from_bytea(0, data) WHERE data IS NOT NULL; then
ALTER TABLE rvtools_files ALTER COLUMN file_oid SET NOT NULL; and only then DROP
COLUMN data. For the Down path reverse that sequence: ADD COLUMN data BYTEA
NULL; UPDATE rvtools_files SET data = lo_get(file_oid) WHERE file_oid IS NOT
NULL; ALTER TABLE rvtools_files ALTER COLUMN data SET NOT NULL; then DROP COLUMN
file_oid. Ensure you reference the rvtools_files table and functions
lo_from_bytea() and lo_get() exactly as shown and run these steps inside a
transaction or with safe batching for large tables.

---

Outside diff comments:
In `@internal/handlers/v1alpha1/job_test.go`:
- Around line 93-106: Extract the XLSX/ZIP magic bytes (0x50,0x4B,0x03,0x04)
into a named test-level constant (e.g., xlsxMagicBytes or zipMagicBytes) and
replace the inline byte slice in the createMultipartReader helper with that
constant; update any other tests that prepend these bytes to reference the
constant so the magic value is centralized for maintainability and clarity.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 4878a526-1c77-4a31-b4d6-9d3de93d6ca5

📥 Commits

Reviewing files that changed from the base of the PR and between 391ade4 and f9dca4a.

📒 Files selected for processing (7)
  • internal/api_server/server.go
  • internal/handlers/v1alpha1/job.go
  • internal/handlers/v1alpha1/job_test.go
  • internal/service/job.go
  • internal/store/rvtools_file.go
  • internal/store/rvtools_file_bench_test.go
  • pkg/migrations/sql/20260520120000_rvtools_files_to_large_objects.sql

Comment thread internal/handlers/v1alpha1/job_test.go Outdated
Comment thread internal/handlers/v1alpha1/job.go
Comment thread internal/service/job.go
Comment on lines +55 to +75
tx, err := s.pool.Begin(ctx)
if err != nil {
tracer.Error(err).Log()
return nil, fmt.Errorf("beginning transaction: %w", err)
}
defer func() { _ = tx.Rollback(ctx) }()

var err error
insertedJob, err = s.riverClient.InsertTx(ctx, tx, args, nil)
if err != nil {
return fmt.Errorf("inserting job: %w", err)
}
return nil
})
if err := s.rvtoolsFiles.CreateFromReader(ctx, tx, fileID, fileReader); err != nil {
tracer.Error(err).Log()
return nil, fmt.Errorf("storing rvtools file: %w", err)
}

insertedJob, err := s.riverClient.InsertTx(ctx, tx, args, nil)
if err != nil {
tracer.Error(err).Log()
return nil, err
return nil, fmt.Errorf("inserting job: %w", err)
}

if err := tx.Commit(ctx); err != nil {
tracer.Error(err).Log()
return nil, fmt.Errorf("committing transaction: %w", err)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Add metrics around upload transaction duration and pool pressure.

This path now holds a DB transaction open for the full upload, so slow clients directly translate into longer-held pool connections. A histogram for transaction/upload duration plus pool wait/acquire failures would make saturation visible before uploads start timing out.

As per coding guidelines, “Instrument code with structured logging, metrics (e.g., Prometheus), and tracing.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/service/job.go` around lines 55 - 75, Wrap the Begin-to-Commit
sequence (from s.pool.Begin through tx.Commit) with a Prometheus histogram timer
(e.g., upload_tx_duration_seconds) to record transaction/upload duration, and
also measure pool acquisition time by timing the call that waits for a
connection (or instrument s.pool.Begin) into a separate histogram (e.g.,
db_pool_acquire_seconds) and increment a counter (e.g.,
db_pool_acquire_failures_total) on Begin errors; additionally record a
histogram/summary for CreateFromReader duration if file upload is significant.
Use the existing tracer/error handling flows (s.pool.Begin,
s.rvtoolsFiles.CreateFromReader, s.riverClient.InsertTx, tx.Commit) to observe
and label metrics (status=success/failure, maybe tenant/job type) and ensure
metrics are recorded in defer/err branches so failures and commits are both
captured.

Comment thread internal/store/rvtools_file_bench_test.go Outdated
Comment thread internal/store/rvtools_file_bench_test.go Outdated
Comment thread internal/store/rvtools_file_bench_test.go Outdated
Comment thread internal/store/rvtools_file_bench_test.go Outdated
Comment thread internal/store/rvtools_file_bench_test.go Outdated
Comment thread internal/store/rvtools_file.go Outdated
Comment thread pkg/migrations/sql/20260520120000_rvtools_files_to_large_objects.sql Outdated
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@pkg/migrations/sql/20260520120000_rvtools_files_to_large_objects.sql`:
- Around line 10-15: The down migration currently extracts LO data with
lo_get(file_oid) but never removes the underlying large objects, leaving
orphaned entries in pg_largeobject; modify the rollback SQL to call
lo_unlink(file_oid) for rows where file_oid IS NOT NULL (after or as part of
transferring data back) before executing ALTER TABLE rvtools_files DROP COLUMN
file_oid so that the large objects are unlinked and do not remain orphaned;
reference the rvtools_files table and the file_oid, lo_get and lo_unlink
functions when making this change.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: d6bfc666-6686-416d-9123-57bdc7d37e9b

📥 Commits

Reviewing files that changed from the base of the PR and between f9dca4a and 1b3a93e.

📒 Files selected for processing (4)
  • internal/handlers/v1alpha1/job.go
  • internal/service/job.go
  • internal/store/rvtools_file.go
  • pkg/migrations/sql/20260520120000_rvtools_files_to_large_objects.sql

Comment on lines +10 to +15
ALTER TABLE rvtools_files ADD COLUMN data BYTEA;
UPDATE rvtools_files
SET data = lo_get(file_oid)
WHERE file_oid IS NOT NULL;
ALTER TABLE rvtools_files ALTER COLUMN data SET NOT NULL;
ALTER TABLE rvtools_files DROP COLUMN file_oid;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | 💤 Low value

Down migration leaves orphaned Large Objects.

When reverting, lo_get(file_oid) extracts the data but does not unlink the LO from pg_largeobject. After dropping file_oid, those LOs become orphaned.

If a clean rollback matters, add lo_unlink() before dropping the column:

Suggested fix
 -- +goose Down
 ALTER TABLE rvtools_files ADD COLUMN data BYTEA;
 UPDATE rvtools_files
 SET data = lo_get(file_oid)
 WHERE file_oid IS NOT NULL;
 ALTER TABLE rvtools_files ALTER COLUMN data SET NOT NULL;
+SELECT lo_unlink(file_oid) FROM rvtools_files WHERE file_oid IS NOT NULL;
 ALTER TABLE rvtools_files DROP COLUMN file_oid;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ALTER TABLE rvtools_files ADD COLUMN data BYTEA;
UPDATE rvtools_files
SET data = lo_get(file_oid)
WHERE file_oid IS NOT NULL;
ALTER TABLE rvtools_files ALTER COLUMN data SET NOT NULL;
ALTER TABLE rvtools_files DROP COLUMN file_oid;
ALTER TABLE rvtools_files ADD COLUMN data BYTEA;
UPDATE rvtools_files
SET data = lo_get(file_oid)
WHERE file_oid IS NOT NULL;
ALTER TABLE rvtools_files ALTER COLUMN data SET NOT NULL;
SELECT lo_unlink(file_oid) FROM rvtools_files WHERE file_oid IS NOT NULL;
ALTER TABLE rvtools_files DROP COLUMN file_oid;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/migrations/sql/20260520120000_rvtools_files_to_large_objects.sql` around
lines 10 - 15, The down migration currently extracts LO data with
lo_get(file_oid) but never removes the underlying large objects, leaving
orphaned entries in pg_largeobject; modify the rollback SQL to call
lo_unlink(file_oid) for rows where file_oid IS NOT NULL (after or as part of
transferring data back) before executing ALTER TABLE rvtools_files DROP COLUMN
file_oid so that the large objects are unlinked and do not remain orphaned;
reference the rvtools_files table and the file_oid, lo_get and lo_unlink
functions when making this change.

@Jos-Jerus Jos-Jerus force-pushed the stream-rvtools-upload branch 3 times, most recently from bfa99bf to 619c6da Compare May 26, 2026 08:36
… via Large Objects

- Replace in-memory file buffering with PostgreSQL Large Objects
- Handler streams multipart body directly to DB, no temp file or []byte
- Store interface: Create([]byte) -> CreateFromReader(tx, io.Reader)
- Service accepts io.Reader, streams to LO within atomic transaction
- Migration: rvtools_files.data BYTEA -> file_oid OID with lo_from_bytea backfill
- Break multipart loop after capturing file part to preserve stream
- OID lookups moved inside transactions to eliminate TOCTOU
- 256 KB LO buffer to reduce protocol round-trips
- Read path memory for 50 MB file: 100 MB -> 2.7 MB (97% reduction)

Signed-off-by: yjacobi <yjacobi@redhat.com>
@Jos-Jerus Jos-Jerus force-pushed the stream-rvtools-upload branch from 619c6da to 80c6095 Compare May 28, 2026 08:59
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/handlers/v1alpha1/job_test.go (1)

93-110: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add a regression case for multipart field order.

All current helpers/tests send name before file, so this suite never exercises the new streaming path when the file arrives first. Please add at least one case with file before name—ideally on a 202 success path as well—so this contract is covered.

As per coding guidelines, “Coverage: Strive for high test coverage on critical logic paths.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/handlers/v1alpha1/job_test.go` around lines 93 - 110, The tests
never exercise the streaming path where the file field precedes the name field;
update tests to add a regression case that builds a multipart body with the file
part written before the name part (e.g., add a new helper or overload
createMultipartReader to accept an order flag, or add
createMultipartReaderFileFirst) and add a 202 success test that uses this
file-first multipart body so the handler's streaming branch is executed and
covered; ensure you still prefix the file part with XLSX/ZIP magic bytes (the
existing filePart write logic) and reuse multipart.NewWriter/W.Boundary()
semantics so the new case mirrors existing validation logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/handlers/v1alpha1/job.go`:
- Around line 38-76: The multipart loop (partsLoop) currently breaks as soon as
it finds the "file" part, which preserves the file stream but ignores any
subsequent form fields (e.g., "name"), making the handler depend on form-order;
update the logic in the handler inside partsLoop so that you either (A) fully
parse and collect all non-file fields (like reading and assigning "name") by
continuing to iterate over NextPart() until EOF before you start streaming the
file, then reconstruct the file reader from the validated header and remaining
stream, or (B) explicitly detect and return a clear 400 when "file" is
encountered before required fields (checking if "name" is already set) instead
of silently breaking; refer to the partsLoop, request.Body.NextPart(), the
"name" case and the "file" case / fileReader/filePart variables to locate where
to implement the chosen fix.
- Around line 50-57: Replace the unbounded io.ReadAll(part) when reading the
"name" multipart part with a size-limited read: define a constant like
maxNameSize, wrap the part with io.LimitReader(part, maxNameSize) (or
io.LimitedReader) and read from that into nameBytes; if the read hits the limit
(or an extra-byte probe returns data) return a 400 error indicating the name is
too large. Update the code paths around nameBytes, part.Close(), and the error
return so the handler (the code branch handling case "name") enforces the size
limit and fails fast on oversized input.

---

Outside diff comments:
In `@internal/handlers/v1alpha1/job_test.go`:
- Around line 93-110: The tests never exercise the streaming path where the file
field precedes the name field; update tests to add a regression case that builds
a multipart body with the file part written before the name part (e.g., add a
new helper or overload createMultipartReader to accept an order flag, or add
createMultipartReaderFileFirst) and add a 202 success test that uses this
file-first multipart body so the handler's streaming branch is executed and
covered; ensure you still prefix the file part with XLSX/ZIP magic bytes (the
existing filePart write logic) and reuse multipart.NewWriter/W.Boundary()
semantics so the new case mirrors existing validation logic.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: f6074ae1-4b12-4197-bcec-b82a762cf5ba

📥 Commits

Reviewing files that changed from the base of the PR and between 1b3a93e and 80c6095.

📒 Files selected for processing (5)
  • internal/handlers/v1alpha1/job.go
  • internal/handlers/v1alpha1/job_test.go
  • internal/service/job.go
  • internal/store/rvtools_file.go
  • pkg/migrations/sql/20260520120000_rvtools_files_to_large_objects.sql

Comment on lines +38 to +76
partsLoop:
for {
part, err := request.Body.NextPart()
if err != nil {
if err == io.EOF {
break
}
logger.Error(err).WithString("step", "parse_multipart").Log()
return server.CreateRVToolsAssessment400JSONResponse{Message: fmt.Sprintf("failed to parse form: %v", err)}, nil
}

switch part.FormName() {
case "name":
nameBytes, err := io.ReadAll(part)
_ = part.Close()
if err != nil {
return fmt.Errorf("failed to read name: %w", err)
logger.Error(err).WithString("step", "read_name").Log()
return server.CreateRVToolsAssessment400JSONResponse{Message: fmt.Sprintf("failed to read name: %v", err)}, nil
}
name = string(nameBytes)
case "file":
buff := bytes.NewBuffer([]byte{})
n, err := io.Copy(buff, part)
if err != nil {
return fmt.Errorf("failed to read file: %w", err)
// Validate XLSX magic bytes from the first 4 bytes of the stream
header := make([]byte, 4)
n, err := io.ReadFull(part, header)
if err != nil || n < 4 {
_ = part.Close()
logger.Error(fmt.Errorf("file too short or unreadable")).WithString("step", "validation").Log()
return server.CreateRVToolsAssessment400JSONResponse{Message: "file is not a valid Excel file"}, nil
}
if n == 0 {
return fmt.Errorf("rvtools file is empty")
if err := validator.ValidateXLSXMagicBytes(header); err != nil {
_ = part.Close()
logger.Error(err).WithString("step", "validation").Log()
return server.CreateRVToolsAssessment400JSONResponse{Message: err.Error()}, nil
}
fileContent = buff.Bytes()
// Reconstruct full reader: validated header + remaining stream.
// Break immediately — NextPart() would discard unread file bytes.
fileReader = io.MultiReader(bytes.NewReader(header), part)
filePart = part
break partsLoop
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Multipart parsing now depends on name arriving before file.

Breaking out as soon as the file part is found preserves the stream, but it also makes every later form field unreachable. A request that sends file before name now falls through with name == "" and gets a misleading validation error, even though multipart field order is not a safe API contract to assume. Either reject that case explicitly with a clear 400 or restructure the flow so required non-file fields are fully parsed before streaming starts.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/handlers/v1alpha1/job.go` around lines 38 - 76, The multipart loop
(partsLoop) currently breaks as soon as it finds the "file" part, which
preserves the file stream but ignores any subsequent form fields (e.g., "name"),
making the handler depend on form-order; update the logic in the handler inside
partsLoop so that you either (A) fully parse and collect all non-file fields
(like reading and assigning "name") by continuing to iterate over NextPart()
until EOF before you start streaming the file, then reconstruct the file reader
from the validated header and remaining stream, or (B) explicitly detect and
return a clear 400 when "file" is encountered before required fields (checking
if "name" is already set) instead of silently breaking; refer to the partsLoop,
request.Body.NextPart(), the "name" case and the "file" case /
fileReader/filePart variables to locate where to implement the chosen fix.

Comment on lines 50 to 57
case "name":
nameBytes, err := io.ReadAll(part)
_ = part.Close()
if err != nil {
return fmt.Errorf("failed to read name: %w", err)
logger.Error(err).WithString("step", "read_name").Log()
return server.CreateRVToolsAssessment400JSONResponse{Message: fmt.Sprintf("failed to read name: %v", err)}, nil
}
name = string(nameBytes)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Bound the name part before calling io.ReadAll.

io.ReadAll(part) reintroduces unbounded buffering on attacker-controlled input. A single oversized name field can consume arbitrary memory before any of the streaming logic helps. Read it through an io.LimitedReader and fail fast once the limit is exceeded.

Possible fix
+const maxAssessmentNameBytes = 1024
+
 		switch part.FormName() {
 		case "name":
-			nameBytes, err := io.ReadAll(part)
+			lr := &io.LimitedReader{R: part, N: maxAssessmentNameBytes + 1}
+			nameBytes, err := io.ReadAll(lr)
 			_ = part.Close()
 			if err != nil {
 				logger.Error(err).WithString("step", "read_name").Log()
 				return server.CreateRVToolsAssessment400JSONResponse{Message: fmt.Sprintf("failed to read name: %v", err)}, nil
 			}
+			if len(nameBytes) > maxAssessmentNameBytes {
+				logger.Error(fmt.Errorf("name field too large")).WithString("step", "validation").Log()
+				return server.CreateRVToolsAssessment400JSONResponse{Message: "name is too long"}, nil
+			}
 			name = string(nameBytes)

As per coding guidelines, “API Design: APIs (REST, gRPC) must be well-defined, versioned, and backward-compatible. Input validation is mandatory.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/handlers/v1alpha1/job.go` around lines 50 - 57, Replace the
unbounded io.ReadAll(part) when reading the "name" multipart part with a
size-limited read: define a constant like maxNameSize, wrap the part with
io.LimitReader(part, maxNameSize) (or io.LimitedReader) and read from that into
nameBytes; if the read hits the limit (or an extra-byte probe returns data)
return a 400 error indicating the name is too large. Update the code paths
around nameBytes, part.Close(), and the error return so the handler (the code
branch handling case "name") enforces the size limit and fails fast on oversized
input.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant