Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/aws/http/private/h1_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ struct aws_h1_connection {
bool is_outgoing_stream_task_active : 1;

bool is_processing_read_messages : 1;

struct aws_io_message *pending_async_message;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on what does this do?

} thread_data;

/* Any thread may touch this data, but the lock must be held */
Expand Down
9 changes: 9 additions & 0 deletions include/aws/http/private/h1_encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct aws_h1_encoder_message {
struct aws_byte_buf outgoing_head_buf;
/* Single stream used for unchunked body */
struct aws_input_stream *body;
/* Single async stream used for unchunked body */
struct aws_async_input_stream *async_body;

/* Pointer to list of `struct aws_h1_chunk`, used for chunked encoding.
* List is owned by aws_h1_stream.
Expand All @@ -55,6 +57,7 @@ enum aws_h1_encoder_state {
AWS_H1_ENCODER_STATE_HEAD,
/* Write streaming body, without chunked encoding, because Content-Length is known */
AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM,
AWS_H1_ENCODER_STATE_ASYNC_WAITING,
/* Write streaming body, with chunked encoding, because Content-Length is unknown */
AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM,
AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK,
Expand All @@ -81,6 +84,12 @@ struct aws_h1_encoder {
uint64_t chunk_count;
/* Encoder logs with this stream ptr as the ID, and passes this ptr to the chunk_complete callback */
struct aws_http_stream *current_stream;
/* Future to record pending future of async body read */
struct aws_future_bool *pending_async_future;
/* Connection to schedule again while async reading. */
struct aws_h1_connection *connection;
/* async error recorded */
int async_error;
};

struct aws_h1_chunk *aws_h1_chunk_new(struct aws_allocator *allocator, const struct aws_http1_chunk_options *options);
Expand Down
18 changes: 18 additions & 0 deletions include/aws/http/request_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,13 @@ int aws_http_message_set_response_status(struct aws_http_message *response_messa
AWS_HTTP_API
struct aws_input_stream *aws_http_message_get_body_stream(const struct aws_http_message *message);

/**
* Get the async body stream.
* Returns NULL if no async body stream is set.
*/
AWS_HTTP_API
struct aws_async_input_stream *aws_http_message_get_async_body_stream(const struct aws_http_message *message);

/**
* Set the body stream.
* NULL is an acceptable value for messages with no body.
Expand All @@ -874,6 +881,17 @@ struct aws_input_stream *aws_http_message_get_body_stream(const struct aws_http_
AWS_HTTP_API
void aws_http_message_set_body_stream(struct aws_http_message *message, struct aws_input_stream *body_stream);

/**
* Set the async body stream.
* NULL is an acceptable value for messages with no body.
* Note: The message does NOT take ownership of the body stream.
* The stream must not be destroyed until the message is complete.
Comment on lines +887 to +888
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should, and the same comments for aws_http_message_set_body_stream is actually out dated, that I forgot to remove.

So, the input streams are refcounted, so that we should keep a refcount on it with the http message.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to make the design easier to use, let's say it's an error to set both. you can only set either one of the regular body stream or the async body stream

*/
AWS_HTTP_API
void aws_http_message_set_async_body_stream(
struct aws_http_message *message,
struct aws_async_input_stream *async_body_stream);

/**
* aws_future<aws_http_message*>
*/
Expand Down
64 changes: 54 additions & 10 deletions source/h1_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,17 @@ static void s_on_channel_write_complete(
* to run again instead of calling the function directly.
* This way, if the message completes synchronously,
* we're not hogging the network by writing message after message in a tight loop */

/* If encoder is waiting for async body read, don't reschedule - the async callback will do it */
if (connection->thread_data.encoder.state == AWS_H1_ENCODER_STATE_ASYNC_WAITING) {
Comment on lines +1034 to +1035
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the connection is a user of the encoder, so it is better to not know the implementation detail of the encoder.

It's better to have a helper function to check if encoder is waiting for data async, instead of just querying the state of the encoder.

AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION,
"id=%p: Encoder waiting for async body, not rescheduling task.",
(void *)&connection->base);
connection->thread_data.is_outgoing_stream_task_active = false;
return;
}

aws_channel_schedule_task_now(channel, &connection->outgoing_stream_task);
}

Expand Down Expand Up @@ -1089,15 +1100,21 @@ static void s_write_outgoing_stream(struct aws_h1_connection *connection, bool f
AWS_LOGF_TRACE(AWS_LS_HTTP_CONNECTION, "id=%p: Outgoing stream task has begun.", (void *)&connection->base);
}

struct aws_io_message *msg = aws_channel_slot_acquire_max_message_for_write(connection->base.channel_slot);
if (!msg) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_CONNECTION,
"id=%p: Failed to acquire message from pool, error %d (%s). Closing connection.",
(void *)&connection->base,
aws_last_error(),
aws_error_name(aws_last_error()));
goto error;
struct aws_io_message *msg = NULL;
if (connection->thread_data.pending_async_message) {
msg = connection->thread_data.pending_async_message;
connection->thread_data.pending_async_message = NULL;
} else {
msg = aws_channel_slot_acquire_max_message_for_write(connection->base.channel_slot);
if (!msg) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_CONNECTION,
"id=%p: Failed to acquire message from pool, error %d (%s). Closing connection.",
(void *)&connection->base,
aws_last_error(),
aws_error_name(aws_last_error()));
goto error;
}
}

/* Set up callback so we can send another message when this one completes */
Expand All @@ -1113,7 +1130,16 @@ static void s_write_outgoing_stream(struct aws_h1_connection *connection, bool f
goto error;
}

if (msg->message_data.len > 0) {
if (connection->thread_data.encoder.async_error) {
/* Error receiving data asynchronously. Need to note when this is happening, but for now, abandon ship */
if (msg) {
aws_mem_release(msg->allocator, msg);
}
s_shutdown_due_to_error(connection, connection->thread_data.encoder.async_error);
return;
Comment on lines +1135 to +1139
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial: seems like we can just goto error, with a aws_raise_error(connection->thread_data.encoder.async_error)

}

if (msg->message_data.len > 0 && connection->thread_data.encoder.state != AWS_H1_ENCODER_STATE_ASYNC_WAITING) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION,
"id=%p: Outgoing stream task is sending message of size %zu.",
Expand All @@ -1130,7 +1156,22 @@ static void s_write_outgoing_stream(struct aws_h1_connection *connection, bool f

goto error;
}
} else if (connection->thread_data.encoder.message && connection->thread_data.encoder.message->async_body) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION,
"id=%p: Outgoing async stream task is either complete or waiting on future. Never reschedule task.",
(void *)&connection->base);

if (connection->thread_data.encoder.state == AWS_H1_ENCODER_STATE_ASYNC_WAITING) {
connection->thread_data.pending_async_message = msg;
} else {
if (msg->message_data.len > 0) {
aws_channel_slot_send_message(connection->base.channel_slot, msg, AWS_CHANNEL_DIR_WRITE);
} else {
aws_mem_release(msg->allocator, msg);
}
}
connection->thread_data.is_outgoing_stream_task_active = false;
} else {
/* If message is empty, warn that no work is being done
* and reschedule the task to try again next tick.
Expand Down Expand Up @@ -1552,6 +1593,9 @@ static struct aws_h1_connection *s_connection_new(

aws_h1_encoder_init(&connection->thread_data.encoder, alloc);

/* hacking around with adding connection to encoder. there should be a better way to do it */
connection->thread_data.encoder.connection = connection;
Comment on lines +1596 to +1597
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the better way to decouple them is probably expose something like

aws_encoder_set_async_stream_read_done(callback, user_data);

So that we keep the logic of handling the stream read done within the connection instead of the encoder


aws_channel_task_init(
&connection->outgoing_stream_task, s_outgoing_stream_task, connection, "http1_connection_outgoing_stream");
aws_channel_task_init(
Expand Down
95 changes: 94 additions & 1 deletion source/h1_encoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/common/error.h>
#include <aws/http/private/h1_connection.h>
#include <aws/http/private/h1_encoder.h>
#include <aws/http/private/strutil.h>
#include <aws/http/status_code.h>
#include <aws/io/async_stream.h>
#include <aws/io/future.h>
#include <aws/io/logging.h>
#include <aws/io/stream.h>

Expand All @@ -29,6 +33,9 @@ static int s_scan_outgoing_headers(

size_t total = 0;
bool has_body_stream = aws_http_message_get_body_stream(message);
if (!has_body_stream) {
has_body_stream = aws_http_message_get_async_body_stream(message);
}
bool has_content_length_header = false;
bool has_transfer_encoding_header = false;

Expand Down Expand Up @@ -249,6 +256,7 @@ int aws_h1_encoder_message_init_from_request(
AWS_ZERO_STRUCT(*message);

message->body = aws_input_stream_acquire(aws_http_message_get_body_stream(request));
message->async_body = aws_async_input_stream_acquire(aws_http_message_get_async_body_stream(request));
message->pending_chunk_list = pending_chunk_list;

struct aws_byte_cursor method;
Expand Down Expand Up @@ -422,6 +430,7 @@ int aws_h1_encoder_message_init_from_response(

void aws_h1_encoder_message_clean_up(struct aws_h1_encoder_message *message) {
aws_input_stream_release(message->body);
aws_async_input_stream_release(message->async_body);
aws_byte_buf_clean_up(&message->outgoing_head_buf);
aws_h1_trailer_destroy(message->trailer);
AWS_ZERO_STRUCT(*message);
Expand Down Expand Up @@ -693,6 +702,68 @@ static int s_switch_state(struct aws_h1_encoder *encoder, enum aws_h1_encoder_st
return AWS_OP_SUCCESS;
}

static void s_on_async_body_read_complete(void *user_data) {
struct aws_h1_encoder *encoder = user_data;

struct aws_h1_connection *connection = encoder->connection;

int error = aws_future_bool_get_error(encoder->pending_async_future);
if (error) {
ENCODER_LOG(
ERROR,
encoder,
"Encountered error after future was complete. Setting async_error, should be caught in the connection "
"event loop.");
encoder->async_error = error;
}

bool eof = !error && aws_future_bool_get_result(encoder->pending_async_future);

ENCODER_LOGF(DEBUG, encoder, "Async body read complete. error=%d eof=%d", error, eof);

aws_future_bool_release(encoder->pending_async_future);
encoder->pending_async_future = NULL;

if (eof) {
ENCODER_LOG(DEBUG, encoder, "EOF reached, switching to DONE state");
s_switch_state(encoder, AWS_H1_ENCODER_STATE_DONE);
} else {
ENCODER_LOG(
DEBUG,
encoder,
"Error occurred or buffer was full but eof not reached. We have to initiate a new encode request with a "
"new buffer.");
s_switch_state(encoder, AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM);
}

aws_h1_connection_try_write_outgoing_stream(connection);
}

static int s_encode_stream_async(
struct aws_h1_encoder *encoder,
struct aws_byte_buf *dst,
struct aws_async_input_stream *stream) {

if (dst->capacity == dst->len) {
return AWS_OP_ERR;
}

ENCODER_LOG(TRACE, encoder, "Reading from async body stream.");

encoder->pending_async_future = aws_async_input_stream_read_to_fill(stream, dst);

if (aws_future_bool_is_done(encoder->pending_async_future)) {
s_on_async_body_read_complete(encoder);
return encoder->async_error;
}

aws_future_bool_register_callback(encoder->pending_async_future, s_on_async_body_read_complete, encoder);

s_switch_state(encoder, AWS_H1_ENCODER_STATE_ASYNC_WAITING);

return AWS_OP_SUCCESS;
}

/* Initial state. Waits until a new message is set */
static int s_state_fn_init(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
(void)dst;
Expand Down Expand Up @@ -720,7 +791,8 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf *
aws_byte_buf_clean_up(&encoder->message->outgoing_head_buf);

/* Pick next state */
if (encoder->message->body && encoder->message->content_length) {
/* Experimentally supporting async streams for unchunked requests.*/
if ((encoder->message->body || encoder->message->async_body) && encoder->message->content_length) {
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM);

} else if (encoder->message->body && encoder->message->has_chunked_encoding_header) {
Expand All @@ -736,6 +808,9 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf *

/* Write out body with known Content-Length (not using chunked encoding). */
static int s_state_fn_unchunked_body_stream(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
if (encoder->message->async_body) {
return s_encode_stream_async(encoder, dst, encoder->message->async_body);
}
bool done;
if (s_encode_stream(encoder, dst, encoder->message->body, encoder->message->content_length, &done)) {
return AWS_OP_ERR;
Expand All @@ -750,6 +825,17 @@ static int s_state_fn_unchunked_body_stream(struct aws_h1_encoder *encoder, stru
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_DONE);
}

static int s_state_fn_async_waiting(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
(void)dst;
ENCODER_LOG(
ERROR,
encoder,
"This point should never be reached. We should come back to the encoder only after the state has changed from "
"ASYNC WAITING");

return AWS_OP_ERR;
}

/* Write out body (of unknown Content-Length) using chunked encoding.
* Each pass through this state writes out 1 chunk of body data (or nothing at all). */
static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
Expand Down Expand Up @@ -994,6 +1080,7 @@ static struct encoder_state_def s_encoder_states[] = {
[AWS_H1_ENCODER_STATE_INIT] = {.fn = s_state_fn_init, .name = "INIT"},
[AWS_H1_ENCODER_STATE_HEAD] = {.fn = s_state_fn_head, .name = "HEAD"},
[AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM] = {.fn = s_state_fn_unchunked_body_stream, .name = "BODY"},
[AWS_H1_ENCODER_STATE_ASYNC_WAITING] = {.fn = s_state_fn_async_waiting, .name = "WAITING"},
[AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM] = {.fn = s_state_fn_chunked_body_stream, .name = "CHUNKED_BODY_STREAM"},
[AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK] =
{.fn = s_state_fn_chunked_body_stream_last_chunk, .name = "LAST_CHUNK"},
Expand All @@ -1019,6 +1106,12 @@ int aws_h1_encoder_process(struct aws_h1_encoder *encoder, struct aws_byte_buf *
enum aws_h1_encoder_state prev_state;
do {
prev_state = encoder->state;

/* Exit if waiting for async read - callback will resume encoding */
if (encoder->state == AWS_H1_ENCODER_STATE_ASYNC_WAITING) {
return AWS_OP_SUCCESS;
}

if (s_encoder_states[encoder->state].fn(encoder, out_buf)) {
return AWS_OP_ERR;
}
Expand Down
1 change: 0 additions & 1 deletion source/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,6 @@ void aws_http_library_clean_up(void) {
return;
}
s_library_initialized = false;

aws_thread_join_all_managed();
aws_unregister_error_info(&s_error_list);
aws_unregister_log_subject_info_list(&s_log_subject_list);
Expand Down
Loading