diff --git a/lib/req_athena.ex b/lib/req_athena.ex index 0878d6c..6f9ff9c 100644 --- a/lib/req_athena.ex +++ b/lib/req_athena.ex @@ -266,7 +266,8 @@ defmodule ReqAthena do body = Map.merge(output_config, %{ QueryExecutionContext: %{Database: Request.fetch_option!(request, :database)}, - QueryString: ReqAthena.Query.to_query_string(query) + QueryString: ReqAthena.Query.to_query_string(query), + ExecutionParameters: ReqAthena.Query.execution_params(query) }) client_request_token = generate_client_request_token(body, cache_query) @@ -291,19 +292,15 @@ defmodule ReqAthena do defp handle_athena_result({request, %{status: 200} = response}) do action = Request.get_private(request, :athena_action) - query = Request.get_private(request, :athena_query) - case {action, ReqAthena.Query.to_prepare?(query)} do - {"StartQueryExecution", _} -> + case action do + "StartQueryExecution" -> get_query_state(request, response) - {"GetQueryExecution", _} -> + "GetQueryExecution" -> wait_query_execution(request, response) - {"GetQueryResults", true} -> - execute_prepared_query(request) - - {"GetQueryResults", _} -> + "GetQueryResults" -> output_format = Request.get_option(request, :format, :none) case output_format do @@ -490,23 +487,6 @@ defmodule ReqAthena do end end - @athena_keys ~w(athena_action athena_query athena_wait_count)a - - defp execute_prepared_query(request) do - {ours_private, theirs_private} = Map.split(request.private, @athena_keys) - - %ReqAthena.Query{prepared: false} = query = ours_private.athena_query - prepared_query = %ReqAthena.Query{query | prepared: true} - - request = %{ - request - | private: theirs_private, - current_request_steps: Keyword.keys(request.request_steps) - } - - Request.halt(request, Req.post!(put_request_body(request, prepared_query))) - end - defp prepare_action(request, action) when is_binary(action) do request = Request.put_private(request, :athena_action, action) diff --git a/lib/req_athena/query.ex b/lib/req_athena/query.ex index 3ae8b0a..d8c6efc 100644 --- a/lib/req_athena/query.ex +++ b/lib/req_athena/query.ex @@ -9,61 +9,11 @@ defmodule ReqAthena.Query do """ def parameterized?(%__MODULE__{} = query), do: List.wrap(query.params) != [] - @doc """ - Returns if this query is using params and if it was not prepared. - - This is useful to determine if the query is going to perform an "EXECUTE" or - a "PREPARE" command. - """ - def to_prepare?(%__MODULE__{} = query), do: parameterized?(query) and query.prepared == false - @doc """ Builds the final query to send to the Athena service. """ - def to_query_string(%__MODULE__{} = query) do - cond do - query.prepared -> - "EXECUTE #{query.statement_name} USING " <> - Enum.map_join(query.params, ", ", &encode_value/1) - - parameterized?(query) -> - if is_nil(query.statement_name), - do: raise(":statement_name is required for a parameterized query") - - "PREPARE #{query.statement_name} FROM #{maybe_around_unload(query)}" - - true -> - maybe_around_unload(query) - end - end - - defp encode_value(value) when is_binary(value), do: "'#{value}'" - defp encode_value(%Date{} = value), do: to_string(value) |> encode_value() - - defp encode_value(%DateTime{} = value) do - value - |> DateTime.to_naive() - |> encode_value() - end - - defp encode_value(%NaiveDateTime{} = value) do - value - |> NaiveDateTime.truncate(:millisecond) - |> to_string() - |> encode_value() - end - - defp encode_value(value), do: value - - def is_select(%{query: query_string}) + def to_query_string(%__MODULE__{query: query_string, unload: [_ | _] = opts} = query) when is_binary(query_string) do - query_string =~ ~r/^[\s]*select/i - end - - def can_use_unload?(_), do: false - - defp maybe_around_unload(%{query: query_string, unload: [_ | _] = opts} = query) - when is_binary(query_string) do # UNLOAD works only with SELECT if is_select(query) do {to, props} = Keyword.pop!(opts, :to) @@ -95,7 +45,40 @@ defmodule ReqAthena.Query do end end - defp maybe_around_unload(%{query: query_string}), do: query_string + def to_query_string(%__MODULE__{query: query_string}), do: query_string + + defp encode_value(value) when is_binary(value), do: "'#{value}'" + defp encode_value(%Date{} = value), do: to_string(value) |> encode_value() + + defp encode_value(%DateTime{} = value) do + value + |> DateTime.to_naive() + |> encode_value() + end + + defp encode_value(%NaiveDateTime{} = value) do + value + |> NaiveDateTime.truncate(:millisecond) + |> to_string() + |> encode_value() + end + + defp encode_value(value), do: value + + def execution_params(%__MODULE__{params: params} = query) do + if parameterized?(query) do + Enum.map(params, &encode_value/1) + else + nil + end + end + + def is_select(%{query: query_string}) + when is_binary(query_string) do + query_string =~ ~r/^[\s]*select/i + end + + def can_use_unload?(_), do: false @doc """ Add attributes required by the "UNLOAD" command.