File size: 2,816 Bytes
2e92879
 
 
 
c2de526
2e92879
 
 
 
 
b2ff4b5
2e92879
 
 
 
 
0b53aa4
 
 
 
b2ff4b5
0b53aa4
 
 
 
 
2e92879
 
 
 
b2ff4b5
 
 
2e92879
 
 
0b53aa4
 
 
 
b2ff4b5
 
 
0b53aa4
 
 
76486d3
b2ff4b5
76486d3
b2ff4b5
 
 
 
 
 
 
 
 
0b53aa4
 
 
 
76486d3
0b53aa4
 
 
 
b2ff4b5
 
 
 
2e92879
 
 
 
c2de526
 
db4111e
 
b2ff4b5
db4111e
 
 
b2ff4b5
 
 
db4111e
 
b2ff4b5
2e92879
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
defmodule Srh.Http.CommandHandler do
  alias Srh.Http.RequestValidator
  alias Srh.Auth.TokenResolver
  alias Srh.Redis.Client
  alias Srh.Redis.ClientWorker

  def handle_command(conn, token) do
    case RequestValidator.validate_redis_body(conn.body_params) do
      {:ok, command_array} ->
        do_handle_command(command_array, token)

      {:error, error_message} ->
        {:malformed_data, error_message}
    end
  end

  def handle_command_array(conn, token) do
    case RequestValidator.validate_pipeline_redis_body(conn.body_params) do
      {:ok, array_of_command_arrays} ->
        do_handle_command_array(array_of_command_arrays, token)

      {:error, error_message} ->
        {:malformed_data, error_message}
    end
  end

  defp do_handle_command(command_array, token) do
    case TokenResolver.resolve(token) do
      {:ok, connection_info} ->
        dispatch_command(command_array, connection_info)

      {:error, msg} ->
        {:not_authorized, msg}
    end
  end

  defp do_handle_command_array(array_of_command_arrays, token) do
    case TokenResolver.resolve(token) do
      {:ok, connection_info} ->
        dispatch_command_array(array_of_command_arrays, connection_info)

      {:error, msg} ->
        {:not_authorized, msg}
    end
  end

  defp dispatch_command_array(_arr, _connection_info, responses \\ [])

  defp dispatch_command_array([current | rest], connection_info, responses) do
    updated_responses =
      case dispatch_command(current, connection_info) do
        {:ok, result_map} ->
          [result_map | responses]

        {:malformed_data, result_json} ->
          # TODO: change up the chain to json this at the last moment, so this isn't here
          [Jason.decode!(result_json) | responses]
      end

    dispatch_command_array(rest, connection_info, updated_responses)
  end

  defp dispatch_command_array([], _connection_info, responses) do
    # The responses will be in reverse order, as we're adding them to the list with the faster method of putting them at head.
    {:ok, Enum.reverse(responses)}
  end

  defp dispatch_command(
         command_array,
         %{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info
       )
       when is_number(max_connections) do
    case GenRegistry.lookup_or_start(Client, srh_id, [max_connections, connection_info]) do
      {:ok, pid} ->
        # Run the command
        case Client.find_worker(pid)
             |> ClientWorker.redis_command(command_array) do
          {:ok, res} ->
            {:ok, %{result: res}}

          {:error, error} ->
            {
              :malformed_data,
              Jason.encode!(%{
                error: error.message
              })
            }
        end

      {:error, msg} ->
        {:server_error, msg}
    end
  end
end