Skip to content

TOOLS-4148 Add context to StreamDocument and replace channelQuorumError with errgroups#982

Merged
autarch merged 1 commit intomasterfrom
04-03-tools-4148_add_context_to_streamdocument_replace_channelquorumerror_with_errgroup
Apr 8, 2026
Merged

TOOLS-4148 Add context to StreamDocument and replace channelQuorumError with errgroups#982
autarch merged 1 commit intomasterfrom
04-03-tools-4148_add_context_to_streamdocument_replace_channelquorumerror_with_errgroup

Conversation

@autarch
Copy link
Copy Markdown
Collaborator

@autarch autarch commented Apr 3, 2026

Each StreamDocument implementation (JSON, CSV, TSV) now accepts a context.Context. These implementations now use errgroups internally for their two goroutines and propagate cancellation via ctx.Done(). This allows us to eliminate the use of things like csvErrChan for managing errors. Instead, the goroutines are func() error and return errors to the error group.

The MongoImport.importDocuments func also uses an error group to manage the inputReader.StreamDocument and mongoImport.ingestDocuments goroutines. Again, this allows us to eliminate a channel for passing errors. It also eliminates the use of channelQuorumError, which was a sort of "error group via an error channel" thing in this code base. This func and its tests have been removed as part of this PR.

There will be follow-up PRs to make proper use of the error group context when reading from channels. This is needed to ensure that we always abort early if a goroutine exits with an error. Right now, the streamDocuments func doesn't accept a context, so it will hang if the readDocs channel is full because ingestDocuments stops reading from it.

Because of this, we temporarily skip the TestImportMIOSOE test, which hangs in this PR because of this.

Copy link
Copy Markdown
Collaborator Author

autarch commented Apr 6, 2026

This PR isn't really correct as-is. It needs to be combined with the next PR in the stack, #983, in order to work fully. I split them up because it seemed like this was already quite large. But if you prefer, I can combine them.

Copy link
Copy Markdown
Contributor

@mmcclimon mmcclimon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM % some small things; this code was so weird before!

Comment thread mongoimport/common.go Outdated
Comment on lines +457 to +445
readDocs chan Converter,
outputChan chan bson.D,
recordChan chan Converter,
readDocs chan bson.D,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This renaming is confusing to me (and recordChan is sort of a terrible name, because "record" can be either a noun or a verb).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went through all the code that deals with these channels and renamed them everywhere to docsInChan and streamOutChan.

Comment thread mongoimport/csv_test.go Outdated
)
docChan := make(chan bson.D, 1)
So(r.StreamDocument(true, docChan), ShouldNotBeNil)
So(r.StreamDocument(context.Background(), true, docChan), ShouldNotBeNil)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might as well use t.Context() in all these tests.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@autarch autarch force-pushed the 04-03-tools-4148_add_context_to_streamdocument_replace_channelquorumerror_with_errgroup branch 5 times, most recently from 1ee1492 to 0e31559 Compare April 7, 2026 22:08
Copy link
Copy Markdown
Collaborator Author

autarch commented Apr 8, 2026

Merge activity

  • Apr 8, 3:28 PM UTC: A user started a stack merge that includes this pull request via Graphite.
  • Apr 8, 3:28 PM UTC: Graphite rebased this pull request as part of a merge.
  • Apr 8, 3:29 PM UTC: @autarch merged this pull request with Graphite.

…Error` with errgroups

Each `StreamDocument` implementation (JSON, CSV, TSV) now accepts a `context.Context`. These implementations now use errgroups internally for their two goroutines and propagate cancellation via ctx.Done(). This allows us to eliminate the use of things like `csvErrChan` for managing errors. Instead, the goroutines are `func() error` and return errors to the error group.

The `MongoImport.importDocuments` func also uses an error group to manage the `inputReader.StreamDocument` and  `mongoImport.ingestDocuments` goroutines. Again, this allows us to eliminate a channel for passing errors. It also eliminates the use of `channelQuorumError`, which was a sort of "error group via an error channel" thing in this code base. This func and its tests have been removed as part of this PR.

This also renames some variables in the `streamDocuments` func. Previously, the two channels it was passed were named `readDocs` and `outputChan`. But this was _very_ confusing, because the callers were passing a channel named `readDocs` as the _latter_ of those two. In other words, `streamDocuments` renamed the `readDocs` channel to `outputChan` but then used `readDocs` for a different channel.

There will be follow-up PRs to make proper use of the error group context when reading from channels. This is needed to ensure that we always abort early if a goroutine exits with an error.
@autarch autarch force-pushed the 04-03-tools-4148_add_context_to_streamdocument_replace_channelquorumerror_with_errgroup branch from 0e31559 to 8d84af8 Compare April 8, 2026 15:28
@autarch autarch merged commit 91d5e3e into master Apr 8, 2026
2 of 3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants