File size: 2,717 Bytes
2e92879
 
 
 
c2de526
2e92879
 
 
 
 
 
 
 
 
 
0b53aa4
 
 
 
 
 
 
 
 
2e92879
 
 
 
c0e2bf5
2e92879
 
 
0b53aa4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2e92879
 
 
 
 
c2de526
 
db4111e
 
 
 
 
 
 
 
 
 
 
 
2e92879
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
defmodule Srh.Http.CommandHandler do
  alias Srh.Http.RequestValidator
  alias Srh.Auth.TokenResolver
  alias Srh.Redis.Client
  alias Srh.Redis.ClientWorker

  def handle_command(conn, token) do
    case RequestValidator.validate_redis_body(conn.body_params) do
      {:ok, command_array} ->
        do_handle_command(command_array, token)
      {:error, error_message} ->
        {:malformed_data, error_message}
    end
  end

  def handle_command_array(conn, token) do
    case RequestValidator.validate_pipeline_redis_body(conn.body_params) do
      {:ok, array_of_command_arrays} ->
        do_handle_command_array(array_of_command_arrays, token)
      {:error, error_message} ->
        {:malformed_data, error_message}
    end
  end

  defp do_handle_command(command_array, token) do
    case TokenResolver.resolve(token) do
      {:ok, connection_info} ->
        dispatch_command(command_array, connection_info)
      {:error, msg} -> {:not_authorized, msg}
    end
  end

  defp do_handle_command_array(array_of_command_arrays, token) do
    case TokenResolver.resolve(token) do
      {:ok, connection_info} ->
        dispatch_command_array(array_of_command_arrays, connection_info)
      {:error, msg} -> {:not_authorized, msg}
    end
  end

  defp dispatch_command_array([current | rest], connection_info, responses \\ []) do
    updated_responses = case dispatch_command(current, connection_info) do
      {:ok, result_map} ->
        [result_map | responses]
      {:malformed_data, result_json} ->
        # TODO: change up the chain to json this at the last moment, so this isn't here
        [Jason.decode!(result_json) | responses]
    end

    dispatch_command_array(rest, connection_info, updated_responses)
  end

  defp dispatch_command_array([], connection_info, responses) do
    # The responses will be in reverse order, as we're adding them to the list with the faster method of putting them at head.
    {:ok, Enum.reverse(responses)}
  end

  defp dispatch_command(command_array, %{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info)
       when is_number(max_connections) do
    case GenRegistry.lookup_or_start(Client, srh_id, [max_connections, connection_info]) do
      {:ok, pid} ->
        # Run the command
        case Client.find_worker(pid)
             |> ClientWorker.redis_command(command_array) do
          {:ok, res} ->
            {:ok, %{result: res}}
          {:error, error} ->
            {
              :malformed_data,
              Jason.encode!(
                %{
                  error: error.message
                }
              )
            }
        end
      {:error, msg} ->
        {:server_error, msg}
    end
  end
end