Conversation
Collaborator
Author
This was referenced Apr 3, 2026
536bf7d to
10981d5
Compare
Collaborator
Author
|
This PR isn't really correct as-is. It needs to be combined with the next PR in the stack, #983, in order to work fully. I split them up because it seemed like this was already quite large. But if you prefer, I can combine them. |
mmcclimon
approved these changes
Apr 7, 2026
Contributor
mmcclimon
left a comment
There was a problem hiding this comment.
LGTM % some small things; this code was so weird before!
Comment on lines
+457
to
+445
| readDocs chan Converter, | ||
| outputChan chan bson.D, | ||
| recordChan chan Converter, | ||
| readDocs chan bson.D, |
Contributor
There was a problem hiding this comment.
This renaming is confusing to me (and recordChan is sort of a terrible name, because "record" can be either a noun or a verb).
Collaborator
Author
There was a problem hiding this comment.
I went through all the code that deals with these channels and renamed them everywhere to docsInChan and streamOutChan.
| ) | ||
| docChan := make(chan bson.D, 1) | ||
| So(r.StreamDocument(true, docChan), ShouldNotBeNil) | ||
| So(r.StreamDocument(context.Background(), true, docChan), ShouldNotBeNil) |
Contributor
There was a problem hiding this comment.
We might as well use t.Context() in all these tests.
1ee1492 to
0e31559
Compare
Collaborator
Author
…Error` with errgroups Each `StreamDocument` implementation (JSON, CSV, TSV) now accepts a `context.Context`. These implementations now use errgroups internally for their two goroutines and propagate cancellation via ctx.Done(). This allows us to eliminate the use of things like `csvErrChan` for managing errors. Instead, the goroutines are `func() error` and return errors to the error group. The `MongoImport.importDocuments` func also uses an error group to manage the `inputReader.StreamDocument` and `mongoImport.ingestDocuments` goroutines. Again, this allows us to eliminate a channel for passing errors. It also eliminates the use of `channelQuorumError`, which was a sort of "error group via an error channel" thing in this code base. This func and its tests have been removed as part of this PR. This also renames some variables in the `streamDocuments` func. Previously, the two channels it was passed were named `readDocs` and `outputChan`. But this was _very_ confusing, because the callers were passing a channel named `readDocs` as the _latter_ of those two. In other words, `streamDocuments` renamed the `readDocs` channel to `outputChan` but then used `readDocs` for a different channel. There will be follow-up PRs to make proper use of the error group context when reading from channels. This is needed to ensure that we always abort early if a goroutine exits with an error.
0e31559 to
8d84af8
Compare
This was referenced Apr 8, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.

Each
StreamDocumentimplementation (JSON, CSV, TSV) now accepts acontext.Context. These implementations now use errgroups internally for their two goroutines and propagate cancellation via ctx.Done(). This allows us to eliminate the use of things likecsvErrChanfor managing errors. Instead, the goroutines arefunc() errorand return errors to the error group.The
MongoImport.importDocumentsfunc also uses an error group to manage theinputReader.StreamDocumentandmongoImport.ingestDocumentsgoroutines. Again, this allows us to eliminate a channel for passing errors. It also eliminates the use ofchannelQuorumError, which was a sort of "error group via an error channel" thing in this code base. This func and its tests have been removed as part of this PR.There will be follow-up PRs to make proper use of the error group context when reading from channels. This is needed to ensure that we always abort early if a goroutine exits with an error. Right now, the
streamDocumentsfunc doesn't accept a context, so it will hang if thereadDocschannel is full becauseingestDocumentsstops reading from it.Because of this, we temporarily skip the
TestImportMIOSOEtest, which hangs in this PR because of this.