-
Notifications
You must be signed in to change notification settings - Fork 50
Async streams for HTTP #547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -865,6 +865,13 @@ int aws_http_message_set_response_status(struct aws_http_message *response_messa | |
| AWS_HTTP_API | ||
| struct aws_input_stream *aws_http_message_get_body_stream(const struct aws_http_message *message); | ||
|
|
||
| /** | ||
| * Get the async body stream. | ||
| * Returns NULL if no async body stream is set. | ||
| */ | ||
| AWS_HTTP_API | ||
| struct aws_async_input_stream *aws_http_message_get_async_body_stream(const struct aws_http_message *message); | ||
|
|
||
| /** | ||
| * Set the body stream. | ||
| * NULL is an acceptable value for messages with no body. | ||
|
|
@@ -874,6 +881,17 @@ struct aws_input_stream *aws_http_message_get_body_stream(const struct aws_http_ | |
| AWS_HTTP_API | ||
| void aws_http_message_set_body_stream(struct aws_http_message *message, struct aws_input_stream *body_stream); | ||
|
|
||
| /** | ||
| * Set the async body stream. | ||
| * NULL is an acceptable value for messages with no body. | ||
| * Note: The message does NOT take ownership of the body stream. | ||
| * The stream must not be destroyed until the message is complete. | ||
|
Comment on lines
+887
to
+888
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should, and the same comments for So, the input streams are refcounted, so that we should keep a refcount on it with the http message.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to make the design easier to use, let's say it's an error to set both. you can only set either one of the regular body stream or the async body stream |
||
| */ | ||
| AWS_HTTP_API | ||
| void aws_http_message_set_async_body_stream( | ||
| struct aws_http_message *message, | ||
| struct aws_async_input_stream *async_body_stream); | ||
|
|
||
| /** | ||
| * aws_future<aws_http_message*> | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1030,6 +1030,17 @@ static void s_on_channel_write_complete( | |
| * to run again instead of calling the function directly. | ||
| * This way, if the message completes synchronously, | ||
| * we're not hogging the network by writing message after message in a tight loop */ | ||
|
|
||
| /* If encoder is waiting for async body read, don't reschedule - the async callback will do it */ | ||
| if (connection->thread_data.encoder.state == AWS_H1_ENCODER_STATE_ASYNC_WAITING) { | ||
|
Comment on lines
+1034
to
+1035
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the connection is a user of the encoder, so it is better to not know the implementation detail of the encoder. It's better to have a helper function to check if encoder is waiting for data async, instead of just querying the state of the encoder. |
||
| AWS_LOGF_TRACE( | ||
| AWS_LS_HTTP_CONNECTION, | ||
| "id=%p: Encoder waiting for async body, not rescheduling task.", | ||
| (void *)&connection->base); | ||
| connection->thread_data.is_outgoing_stream_task_active = false; | ||
| return; | ||
| } | ||
|
|
||
| aws_channel_schedule_task_now(channel, &connection->outgoing_stream_task); | ||
| } | ||
|
|
||
|
|
@@ -1089,15 +1100,21 @@ static void s_write_outgoing_stream(struct aws_h1_connection *connection, bool f | |
| AWS_LOGF_TRACE(AWS_LS_HTTP_CONNECTION, "id=%p: Outgoing stream task has begun.", (void *)&connection->base); | ||
| } | ||
|
|
||
| struct aws_io_message *msg = aws_channel_slot_acquire_max_message_for_write(connection->base.channel_slot); | ||
| if (!msg) { | ||
| AWS_LOGF_ERROR( | ||
| AWS_LS_HTTP_CONNECTION, | ||
| "id=%p: Failed to acquire message from pool, error %d (%s). Closing connection.", | ||
| (void *)&connection->base, | ||
| aws_last_error(), | ||
| aws_error_name(aws_last_error())); | ||
| goto error; | ||
| struct aws_io_message *msg = NULL; | ||
| if (connection->thread_data.pending_async_message) { | ||
| msg = connection->thread_data.pending_async_message; | ||
| connection->thread_data.pending_async_message = NULL; | ||
| } else { | ||
| msg = aws_channel_slot_acquire_max_message_for_write(connection->base.channel_slot); | ||
| if (!msg) { | ||
| AWS_LOGF_ERROR( | ||
| AWS_LS_HTTP_CONNECTION, | ||
| "id=%p: Failed to acquire message from pool, error %d (%s). Closing connection.", | ||
| (void *)&connection->base, | ||
| aws_last_error(), | ||
| aws_error_name(aws_last_error())); | ||
| goto error; | ||
| } | ||
| } | ||
|
|
||
| /* Set up callback so we can send another message when this one completes */ | ||
|
|
@@ -1113,7 +1130,16 @@ static void s_write_outgoing_stream(struct aws_h1_connection *connection, bool f | |
| goto error; | ||
| } | ||
|
|
||
| if (msg->message_data.len > 0) { | ||
| if (connection->thread_data.encoder.async_error) { | ||
| /* Error receiving data asynchronously. Need to note when this is happening, but for now, abandon ship */ | ||
| if (msg) { | ||
| aws_mem_release(msg->allocator, msg); | ||
| } | ||
| s_shutdown_due_to_error(connection, connection->thread_data.encoder.async_error); | ||
| return; | ||
|
Comment on lines
+1135
to
+1139
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. trivial: seems like we can just goto error, with a |
||
| } | ||
|
|
||
| if (msg->message_data.len > 0 && connection->thread_data.encoder.state != AWS_H1_ENCODER_STATE_ASYNC_WAITING) { | ||
| AWS_LOGF_TRACE( | ||
| AWS_LS_HTTP_CONNECTION, | ||
| "id=%p: Outgoing stream task is sending message of size %zu.", | ||
|
|
@@ -1130,7 +1156,22 @@ static void s_write_outgoing_stream(struct aws_h1_connection *connection, bool f | |
|
|
||
| goto error; | ||
| } | ||
| } else if (connection->thread_data.encoder.message && connection->thread_data.encoder.message->async_body) { | ||
| AWS_LOGF_TRACE( | ||
| AWS_LS_HTTP_CONNECTION, | ||
| "id=%p: Outgoing async stream task is either complete or waiting on future. Never reschedule task.", | ||
| (void *)&connection->base); | ||
|
|
||
| if (connection->thread_data.encoder.state == AWS_H1_ENCODER_STATE_ASYNC_WAITING) { | ||
| connection->thread_data.pending_async_message = msg; | ||
| } else { | ||
| if (msg->message_data.len > 0) { | ||
| aws_channel_slot_send_message(connection->base.channel_slot, msg, AWS_CHANNEL_DIR_WRITE); | ||
| } else { | ||
| aws_mem_release(msg->allocator, msg); | ||
| } | ||
| } | ||
| connection->thread_data.is_outgoing_stream_task_active = false; | ||
| } else { | ||
| /* If message is empty, warn that no work is being done | ||
| * and reschedule the task to try again next tick. | ||
|
|
@@ -1552,6 +1593,9 @@ static struct aws_h1_connection *s_connection_new( | |
|
|
||
| aws_h1_encoder_init(&connection->thread_data.encoder, alloc); | ||
|
|
||
| /* hacking around with adding connection to encoder. there should be a better way to do it */ | ||
| connection->thread_data.encoder.connection = connection; | ||
|
Comment on lines
+1596
to
+1597
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the better way to decouple them is probably expose something like So that we keep the logic of handling the stream read done within the connection instead of the encoder |
||
|
|
||
| aws_channel_task_init( | ||
| &connection->outgoing_stream_task, s_outgoing_stream_task, connection, "http1_connection_outgoing_stream"); | ||
| aws_channel_task_init( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment on what does this do?