Spaces:
Sleeping
Sleeping
File size: 5,257 Bytes
2e92879 c2de526 2e92879 b2ff4b5 2e92879 0b53aa4 b2ff4b5 0b53aa4 abcde98 2e92879 b2ff4b5 2e92879 0b53aa4 b2ff4b5 0b53aa4 abcde98 76486d3 b2ff4b5 76486d3 b2ff4b5 abcde98 b2ff4b5 0b53aa4 76486d3 0b53aa4 abcde98 b2ff4b5 2e92879 c2de526 db4111e b2ff4b5 db4111e abcde98 b2ff4b5 abcde98 db4111e b2ff4b5 2e92879 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
defmodule Srh.Http.CommandHandler do
alias Srh.Http.RequestValidator
alias Srh.Auth.TokenResolver
alias Srh.Redis.Client
alias Srh.Redis.ClientWorker
@doc """
Validates the body of `conn` as a single Redis command and executes it for `token`.

Returns `{:malformed_data, error_message}` when
`RequestValidator.validate_redis_body/1` rejects `conn.body_params`. Otherwise
returns the outcome of resolving `token` and dispatching the command.
"""
def handle_command(conn, token) do
  conn.body_params
  |> RequestValidator.validate_redis_body()
  |> case do
    {:error, error_message} -> {:malformed_data, error_message}
    {:ok, command_array} -> do_handle_command(command_array, token)
  end
end
@doc """
Validates the body of `conn` as a pipeline (a list of Redis commands) and executes it for `token`.

Returns `{:malformed_data, error_message}` when
`RequestValidator.validate_pipeline_redis_body/1` rejects `conn.body_params`.
Otherwise returns the outcome of resolving `token` and dispatching the pipeline.
"""
def handle_command_array(conn, token) do
  conn.body_params
  |> RequestValidator.validate_pipeline_redis_body()
  |> case do
    {:error, error_message} -> {:malformed_data, error_message}
    {:ok, array_of_command_arrays} -> do_handle_command_array(array_of_command_arrays, token)
  end
end
@doc """
Validates the body of `conn` as a list of Redis commands and executes them as a transaction for `token`.

Returns `{:malformed_data, error_message}` when the body is rejected by the
validator. Otherwise returns the outcome of resolving `token` and dispatching
the transaction.
"""
def handle_command_transaction_array(conn, token) do
  # A transaction body has the same shape as a pipeline body, so the pipeline validator is reused.
  conn.body_params
  |> RequestValidator.validate_pipeline_redis_body()
  |> case do
    {:error, error_message} ->
      {:malformed_data, error_message}

    {:ok, array_of_command_arrays} ->
      do_handle_command_transaction_array(array_of_command_arrays, token)
  end
end
# Resolves `token` to its connection info and dispatches a single command with it.
# Yields `{:not_authorized, msg}` when `TokenResolver.resolve/1` returns an error.
defp do_handle_command(command_array, token) do
  resolution = TokenResolver.resolve(token)

  case resolution do
    {:error, msg} -> {:not_authorized, msg}
    {:ok, connection_info} -> dispatch_command(command_array, connection_info)
  end
end
# Resolves `token` to its connection info and dispatches a pipeline of commands with it.
# Yields `{:not_authorized, msg}` when `TokenResolver.resolve/1` returns an error.
defp do_handle_command_array(array_of_command_arrays, token) do
  resolution = TokenResolver.resolve(token)

  case resolution do
    {:error, msg} -> {:not_authorized, msg}
    {:ok, connection_info} -> dispatch_command_array(array_of_command_arrays, connection_info)
  end
end
# Resolves `token` to its connection info and dispatches the commands as a transaction with it.
# Yields `{:not_authorized, msg}` when `TokenResolver.resolve/1` returns an error.
defp do_handle_command_transaction_array(array_of_command_arrays, token) do
  resolution = TokenResolver.resolve(token)

  case resolution do
    {:error, msg} ->
      {:not_authorized, msg}

    {:ok, connection_info} ->
      dispatch_command_transaction_array(array_of_command_arrays, connection_info)
  end
end
# Runs every command of a pipeline in order and collects one response map per command.
#
# Returns `{:ok, responses}` with the responses in request order. A command that
# comes back as `{:redis_error, result}` contributes `result` to the list and the
# pipeline keeps going. When `dispatch_command/2` reports `{:server_error, msg}`
# (the client could not be looked up or started), the pipeline stops and that
# tuple is returned unchanged; previously this case was unmatched and raised a
# `CaseClauseError`.
defp dispatch_command_array(_arr, _connection_info, responses \\ [])

defp dispatch_command_array([current | rest], connection_info, responses) do
  case dispatch_command(current, connection_info) do
    {:ok, result_map} ->
      dispatch_command_array(rest, connection_info, [result_map | responses])

    {:redis_error, result} ->
      dispatch_command_array(rest, connection_info, [result | responses])

    {:server_error, _msg} = server_error ->
      # The same registry lookup backs every command, so continuing would not help.
      server_error
  end
end

defp dispatch_command_array([], _connection_info, responses) do
  # Responses were prepended (the cheap list operation), so restore request order once here.
  {:ok, Enum.reverse(responses)}
end
# Runs `command_array` as one Redis transaction on a single borrowed worker.
#
# The client for `srh_id` is looked up (or started) through `GenRegistry`, and a
# worker is borrowed from it so that MULTI, every queued command and EXEC all go
# through the same worker pid. `responses` seeds the accumulator used while
# queueing; the queueing replies are not part of the return value.
#
# Returns:
#   * `{:ok, [%{result: value}, ...]}` - one entry per element of the EXEC reply
#   * `{:redis_error, %{error: message}}` - EXEC itself returned an error
#   * `{:server_error, msg}` - `GenRegistry.lookup_or_start/3` returned an error
#
# The worker is handed back in an `after` clause so that a raise or exit while
# talking to Redis no longer leaves it borrowed.
defp dispatch_command_transaction_array(
       command_array,
       %{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info,
       responses \\ []
     ) do
  case GenRegistry.lookup_or_start(Client, srh_id, [max_connections, connection_info]) do
    {:ok, client_pid} ->
      # Borrow a worker, then run all of the commands (wrapped in MULTI and EXEC) on it.
      worker_pid = Client.borrow_worker(client_pid)

      try do
        do_dispatch_command_transaction_array([["MULTI"] | command_array], worker_pid, responses)

        # EXEC is run manually: its reply, not the queueing replies above, forms the response.
        case ClientWorker.redis_command(worker_pid, ["EXEC"]) do
          {:ok, res} ->
            {:ok, Enum.map(res, &%{result: &1})}

          # TODO: Can there be any inline errors here? Wouldn't they fail the whole tx?
          {:error, error} ->
            {:redis_error, %{error: error.message}}
        end
      after
        # NOTE(review): if the failure happened between MULTI and EXEC the connection may
        # still be inside a transaction - confirm the pool resets or discards such workers.
        Client.return_worker(client_pid, worker_pid)
      end

    {:error, msg} ->
      {:server_error, msg}
  end
end
# Sends each command, in order, to the worker identified by `worker_pid` and
# collects one map per command: `%{result: res}` for an `{:ok, res}` reply and
# `%{error: message}` for an `{:error, error}` reply. New entries are added in
# front of the incoming `responses`, and the whole list is reversed once at the
# end. Always returns `{:ok, list}`.
defp do_dispatch_command_transaction_array(commands, worker_pid, responses)
     when is_list(commands) and is_pid(worker_pid) do
  collected =
    Enum.reduce(commands, responses, fn command, acc ->
      case ClientWorker.redis_command(worker_pid, command) do
        {:error, error} -> [%{error: error.message} | acc]
        {:ok, res} -> [%{result: res} | acc]
      end
    end)

  {:ok, Enum.reverse(collected)}
end
# Looks up (or starts) the client registered under `srh_id` and runs one command
# on a worker obtained from `Client.find_worker/1`.
#
# Returns:
#   * `{:ok, %{result: res}}` - the worker replied `{:ok, res}`
#   * `{:redis_error, %{error: message}}` - the worker replied `{:error, error}`
#   * `{:server_error, msg}` - `GenRegistry.lookup_or_start/3` returned an error
#
# Only matches when `max_connections` is a number.
defp dispatch_command(
       command_array,
       %{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info
     )
     when is_number(max_connections) do
  lookup = GenRegistry.lookup_or_start(Client, srh_id, [max_connections, connection_info])

  case lookup do
    {:error, msg} ->
      {:server_error, msg}

    {:ok, pid} ->
      worker = Client.find_worker(pid)

      case ClientWorker.redis_command(worker, command_array) do
        {:error, error} -> {:redis_error, %{error: error.message}}
        {:ok, res} -> {:ok, %{result: res}}
      end
  end
end
end
|