Collaborator

@Rumpelshtinskiy Rumpelshtinskiy commented Oct 19, 2025

Closes #

Please check if the PR fulfills these requirements

  • Tests for the changes have been added (for bug fixes / features)
  • Docs have been added / updated (for bug fixes / features)
  • CHANGELOG.md has been updated (for bug fixes / features / docs)

What kind of change does this PR introduce?

feature

What was changed?

Extended the CopyToBucketVisitor trait: added a batch copy method and switched the copy path to use it.

Member

@atimin atimin left a comment

Thank you for the PR. I haven't tested it yet; I think we need to change the implementation first. Please read my comments.

PS: please install pre-commit. The project has a configuration for it, so it can lint the project automatically.

.send()
.await
}
async fn visit_batch(&self, entry_name: &str, records: Vec<Record>) -> Result<BTreeMap<u64, ReductError>, ReductError> {
Member

I think you don't need this. Change the signature of the visit method instead:

   async fn visit(&self, entry_name: &str, mut records: Vec<Record>) -> Result<(), ReductError> {
       if records.len() == 1 {
           // Single record: write it directly, as before.
           let record = records.pop().expect("len() == 1 was checked above");
           self.dst_bucket
               .write_record(entry_name)
               .timestamp_us(record.timestamp_us())
               .labels(record.labels().clone())
               .content_type(record.content_type())
               .content_length(record.content_length() as u64)
               .stream(record.stream_bytes())
               .send()
               .await
       } else {
           // Several records: send them in one batched request here
           // instead of repeating the single-record write.
           todo!("batched write of `records`")
       }
   }

Collaborator Author

Done

src/cmd/cp.rs Outdated
.arg(
make_ext_arg()
)
.arg(
Member

No parameter is needed; we can do batching automatically.
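
One way to picture automatic batching is a helper that groups records by count and total payload size, so the user never passes a batch option. The sketch below is only an illustration under assumptions: `Rec`, `split_into_batches`, and both limits are hypothetical names and not part of this PR or of the client library.

```rust
/// Hypothetical stand-in for a record; only its payload size matters here.
#[derive(Debug, Clone, PartialEq)]
struct Rec {
    timestamp_us: u64,
    len: usize,
}

/// Groups records into batches. A batch is closed when adding the next
/// record would exceed `max_records` or `max_bytes`. A record larger than
/// `max_bytes` ends up alone in its own batch.
fn split_into_batches(records: Vec<Rec>, max_records: usize, max_bytes: usize) -> Vec<Vec<Rec>> {
    let mut batches = Vec::new();
    let mut current: Vec<Rec> = Vec::new();
    let mut current_bytes = 0usize;

    for rec in records {
        let would_overflow =
            current.len() + 1 > max_records || current_bytes + rec.len > max_bytes;
        if !current.is_empty() && would_overflow {
            // Close the current batch and start a new one.
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += rec.len;
        current.push(rec);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
```

With limits chosen inside the copy command, each resulting batch could be handed to the visitor in one call, and a batch of length 1 would fall back to the single-record write.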

src_bucket: Bucket,
query_params: QueryParams,
visitor: V,
dst_bucket_v: V,
Member

The name visitor wasn't nice, but bucket is misleading: the destination can be a folder on a file system. Let's call it dst or something like that.

Collaborator Author

Done

while let Some(record) = record_stream.next().await {
if let Err(err) = record {
if let Err(e) = make_attempt!(err) {
if let Err(e) = make_attempt(&mut attempts, &mut transfer_progress, &mut params, record_count, timestamp, err) {
Member

You don't need to remove the macros or change the method this much. Just add batching before the visit call, as you do in your method.

Collaborator Author

I left only one macro, print_error_progress. I converted make_attempt to a method because it is needed in a couple of other methods.
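
For context on the trade-off: a macro can use local variables of the enclosing function implicitly, while a function or method must receive that state explicitly, which is why the converted call takes several `&mut` arguments. A minimal sketch of the idea follows, assuming a simple attempt counter; `RetryState` and `make_attempt_sketch` are hypothetical and do not reproduce the actual logic in src/cmd/cp.rs.

```rust
/// Hypothetical retry state that a macro would have captured implicitly
/// from the surrounding function.
struct RetryState {
    attempts_left: u32,
}

/// Returns Ok(()) if another attempt is allowed, or hands the error back
/// once the attempt budget is used up. Because the state is passed in
/// explicitly, the function can be reused from several callers.
fn make_attempt_sketch(state: &mut RetryState, err: String) -> Result<(), String> {
    if state.attempts_left == 0 {
        return Err(err);
    }
    state.attempts_left -= 1;
    Ok(())
}
```

Grouping the mutable pieces into one struct keeps call sites short if the argument list grows.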

@Rumpelshtinskiy Rumpelshtinskiy force-pushed the enable_batching branch 2 times, most recently from bec8b04 to 44e99bd Compare October 29, 2025 20:53