Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 6 additions & 26 deletions lib/req_athena.ex
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ defmodule ReqAthena do
body =
Map.merge(output_config, %{
QueryExecutionContext: %{Database: Request.fetch_option!(request, :database)},
QueryString: ReqAthena.Query.to_query_string(query)
QueryString: ReqAthena.Query.to_query_string(query),
ExecutionParameters: ReqAthena.Query.execution_params(query)
})

client_request_token = generate_client_request_token(body, cache_query)
Expand All @@ -291,19 +292,15 @@ defmodule ReqAthena do

defp handle_athena_result({request, %{status: 200} = response}) do
action = Request.get_private(request, :athena_action)
query = Request.get_private(request, :athena_query)

case {action, ReqAthena.Query.to_prepare?(query)} do
{"StartQueryExecution", _} ->
case action do
"StartQueryExecution" ->
get_query_state(request, response)

{"GetQueryExecution", _} ->
"GetQueryExecution" ->
wait_query_execution(request, response)

{"GetQueryResults", true} ->
execute_prepared_query(request)

{"GetQueryResults", _} ->
"GetQueryResults" ->
output_format = Request.get_option(request, :format, :none)

case output_format do
Expand Down Expand Up @@ -490,23 +487,6 @@ defmodule ReqAthena do
end
end

@athena_keys ~w(athena_action athena_query athena_wait_count)a

defp execute_prepared_query(request) do
{ours_private, theirs_private} = Map.split(request.private, @athena_keys)

%ReqAthena.Query{prepared: false} = query = ours_private.athena_query
prepared_query = %ReqAthena.Query{query | prepared: true}

request = %{
request
| private: theirs_private,
current_request_steps: Keyword.keys(request.request_steps)
}

Request.halt(request, Req.post!(put_request_body(request, prepared_query)))
end

defp prepare_action(request, action) when is_binary(action) do
request = Request.put_private(request, :athena_action, action)

Expand Down
87 changes: 35 additions & 52 deletions lib/req_athena/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,61 +9,11 @@ defmodule ReqAthena.Query do
"""
def parameterized?(%__MODULE__{} = query), do: List.wrap(query.params) != []

@doc """
Returns if this query is using params and if it was not prepared.

This is useful to determine if the query is going to perform an "EXECUTE" or
a "PREPARE" command.
"""
def to_prepare?(%__MODULE__{} = query), do: parameterized?(query) and query.prepared == false

@doc """
Builds the final query to send to the Athena service.
"""
def to_query_string(%__MODULE__{} = query) do
cond do
query.prepared ->
"EXECUTE #{query.statement_name} USING " <>
Enum.map_join(query.params, ", ", &encode_value/1)

parameterized?(query) ->
if is_nil(query.statement_name),
do: raise(":statement_name is required for a parameterized query")

"PREPARE #{query.statement_name} FROM #{maybe_around_unload(query)}"

true ->
maybe_around_unload(query)
end
end

defp encode_value(value) when is_binary(value), do: "'#{value}'"
defp encode_value(%Date{} = value), do: to_string(value) |> encode_value()

defp encode_value(%DateTime{} = value) do
value
|> DateTime.to_naive()
|> encode_value()
end

defp encode_value(%NaiveDateTime{} = value) do
value
|> NaiveDateTime.truncate(:millisecond)
|> to_string()
|> encode_value()
end

defp encode_value(value), do: value

def is_select(%{query: query_string})
def to_query_string(%__MODULE__{query: query_string, unload: [_ | _] = opts} = query)
when is_binary(query_string) do
query_string =~ ~r/^[\s]*select/i
end

def can_use_unload?(_), do: false

defp maybe_around_unload(%{query: query_string, unload: [_ | _] = opts} = query)
when is_binary(query_string) do
# UNLOAD works only with SELECT
if is_select(query) do
{to, props} = Keyword.pop!(opts, :to)
Expand Down Expand Up @@ -95,7 +45,40 @@ defmodule ReqAthena.Query do
end
end

defp maybe_around_unload(%{query: query_string}), do: query_string
def to_query_string(%__MODULE__{query: query_string}), do: query_string

defp encode_value(value) when is_binary(value), do: "'#{value}'"
defp encode_value(%Date{} = value), do: to_string(value) |> encode_value()

defp encode_value(%DateTime{} = value) do
value
|> DateTime.to_naive()
|> encode_value()
end

defp encode_value(%NaiveDateTime{} = value) do
value
|> NaiveDateTime.truncate(:millisecond)
|> to_string()
|> encode_value()
end

defp encode_value(value), do: value

def execution_params(%__MODULE__{params: params} = query) do
if parameterized?(query) do
Enum.map(params, &encode_value/1)
else
nil
end
end

def is_select(%{query: query_string})
when is_binary(query_string) do
query_string =~ ~r/^[\s]*select/i
end

def can_use_unload?(_), do: false

@doc """
Add attributes required by the "UNLOAD" command.
Expand Down