Scott Hiett commited on
Commit
2dbe17d
·
unverified ·
2 Parent(s): 691cebf 2b07033

Merge pull request #13 from hiett/12-connection-refused-does-not-return-an-error

Browse files
.github/workflows/test.yml CHANGED
@@ -30,6 +30,14 @@ jobs:
30
  with:
31
  repository: upstash/upstash-redis
32
 
 
 
 
 
 
 
 
 
33
  - name: Run @upstash/redis Test Suite
34
  run: deno test -A ./pkg
35
  env:
 
30
  with:
31
  repository: upstash/upstash-redis
32
 
33
+ # The following tests fail because of bugs with Upstash's implementation of Redis, NOT because of our library
34
+ # So we remove them from the test suite
35
+ - name: Remove JSON tests
36
+ run: |
37
+ rm ./pkg/commands/json_get.test.ts
38
+ rm ./pkg/commands/json_mget.test.ts
39
+ rm ./pkg/commands/json_objlen.test.ts
40
+
41
  - name: Run @upstash/redis Test Suite
42
  run: deno test -A ./pkg
43
  env:
.gitignore CHANGED
@@ -29,4 +29,6 @@ srh-*.tar
29
 
30
  *.iml
31
 
32
- srh-config/
 
 
 
29
 
30
  *.iml
31
 
32
+ srh-config/
33
+
34
+ test-project/
lib/srh/auth/token_resolver.ex CHANGED
@@ -70,11 +70,13 @@ defmodule Srh.Auth.TokenResolver do
70
  {srh_max_connections, ""} = Integer.parse(System.get_env("SRH_MAX_CONNECTIONS", "3"))
71
 
72
  # Create a config-file-like structure that the ETS layout expects, with just one entry
73
- config_file_data = Map.put(%{}, srh_token, %{
74
- "srh_id" => "env_config_connection", # Jason.parse! expects these keys to be strings, not atoms, so we need to replicate that setup
75
- "connection_string" => srh_connection_string,
76
- "max_connections" => srh_max_connections
77
- })
 
 
78
 
79
  IO.puts("Loaded config from env. #{map_size(config_file_data)} entries.")
80
  # Load this into ETS
@@ -98,17 +100,17 @@ defmodule Srh.Auth.TokenResolver do
98
  # The env strategy uses the same ETS table as the file strategy, so we can fall back on that
99
  defp do_resolve("env", token), do: do_resolve("file", token)
100
 
101
- defp do_resolve("redis", _token) do
102
- {
103
- :ok,
104
- # This is done to replicate what will eventually be API endpoints, so they keys are not atoms
105
- Jason.decode!(
106
- Jason.encode!(%{
107
- srh_id: "1000",
108
- connection_string: "redis://localhost:6379",
109
- max_connections: 10
110
- })
111
- )
112
- }
113
- end
114
  end
 
70
  {srh_max_connections, ""} = Integer.parse(System.get_env("SRH_MAX_CONNECTIONS", "3"))
71
 
72
  # Create a config-file-like structure that the ETS layout expects, with just one entry
73
+ config_file_data =
74
+ Map.put(%{}, srh_token, %{
75
+ # Jason.parse! expects these keys to be strings, not atoms, so we need to replicate that setup
76
+ "srh_id" => "env_config_connection",
77
+ "connection_string" => srh_connection_string,
78
+ "max_connections" => srh_max_connections
79
+ })
80
 
81
  IO.puts("Loaded config from env. #{map_size(config_file_data)} entries.")
82
  # Load this into ETS
 
100
  # The env strategy uses the same ETS table as the file strategy, so we can fall back on that
101
  defp do_resolve("env", token), do: do_resolve("file", token)
102
 
103
+ # defp do_resolve("redis", _token) do
104
+ # {
105
+ # :ok,
106
+ # # This is done to replicate what will eventually be API endpoints, so they keys are not atoms
107
+ # Jason.decode!(
108
+ # Jason.encode!(%{
109
+ # srh_id: "1000",
110
+ # connection_string: "redis://localhost:6379",
111
+ # max_connections: 10
112
+ # })
113
+ # )
114
+ # }
115
+ # end
116
  end
lib/srh/http/base_router.ex CHANGED
@@ -54,7 +54,8 @@ defmodule Srh.Http.BaseRouter do
54
  |> get_req_header("upstash-encoding")
55
  |> RequestValidator.validate_encoding_header() do
56
  {:ok, _encoding_enabled} -> true
57
- {:error, _} -> false # it's not required to be present
 
58
  end
59
  end
60
 
@@ -63,7 +64,9 @@ defmodule Srh.Http.BaseRouter do
63
  true ->
64
  # We need to use the encoder to
65
  ResultEncoder.encode_response(response)
66
- false -> response
 
 
67
  end
68
  end
69
 
@@ -85,6 +88,9 @@ defmodule Srh.Http.BaseRouter do
85
  {:not_authorized, message} ->
86
  %{code: 401, message: message, json: false}
87
 
 
 
 
88
  {:server_error, _} ->
89
  %{code: 500, message: "An error occurred internally", json: false}
90
 
 
54
  |> get_req_header("upstash-encoding")
55
  |> RequestValidator.validate_encoding_header() do
56
  {:ok, _encoding_enabled} -> true
57
+ # it's not required to be present
58
+ {:error, _} -> false
59
  end
60
  end
61
 
 
64
  true ->
65
  # We need to use the encoder to
66
  ResultEncoder.encode_response(response)
67
+
68
+ false ->
69
+ response
70
  end
71
  end
72
 
 
88
  {:not_authorized, message} ->
89
  %{code: 401, message: message, json: false}
90
 
91
+ {:connection_error, message} ->
92
+ %{code: 500, message: Jason.encode!(%{error: message}), json: true}
93
+
94
  {:server_error, _} ->
95
  %{code: 500, message: "An error occurred internally", json: false}
96
 
lib/srh/http/command_handler.ex CHANGED
@@ -73,11 +73,20 @@ defmodule Srh.Http.CommandHandler do
73
  {:ok, result_map} ->
74
  [result_map | responses]
75
 
 
 
 
76
  {:redis_error, result} ->
77
  [result | responses]
78
  end
79
 
80
- dispatch_command_array(rest, connection_info, updated_responses)
 
 
 
 
 
 
81
  end
82
 
83
  defp dispatch_command_array([], _connection_info, responses) do
@@ -95,43 +104,56 @@ defmodule Srh.Http.CommandHandler do
95
  # Borrow a client, then run all of the commands (wrapped in MULTI and EXEC)
96
  worker_pid = Client.borrow_worker(client_pid)
97
 
98
- wrapped_command_array = [["MULTI"] | command_array]
99
- do_dispatch_command_transaction_array(wrapped_command_array, worker_pid, responses)
 
 
 
 
100
 
101
- # Now manually run the EXEC - this is what contains the information to form the response, not the above
102
- result = case ClientWorker.redis_command(worker_pid, ["EXEC"]) do
103
- {:ok, res} ->
104
- {
105
- :ok,
106
- res
107
- |> Enum.map(&(%{result: &1}))
108
- }
109
- # TODO: Can there be any inline errors here? Wouldn't they fail the whole tx?
 
 
 
 
 
 
 
 
 
110
 
111
  {:error, error} ->
112
- {:redis_error, %{error: error.message}}
113
  end
114
 
115
- Client.return_worker(client_pid, worker_pid)
116
-
117
- result
118
  {:error, msg} ->
119
  {:server_error, msg}
120
  end
121
  end
122
 
123
- defp do_dispatch_command_transaction_array([current | rest], worker_pid, responses) when is_pid(worker_pid) do
124
- updated_responses = case ClientWorker.redis_command(worker_pid, current) do
125
- {:ok, res} ->
126
- [%{result: res} | responses]
127
-
128
- {:error, error} ->
129
- [
130
- %{
131
- error: error.message
132
- } | responses
133
- ]
134
- end
 
 
 
135
 
136
  do_dispatch_command_transaction_array(rest, worker_pid, updated_responses)
137
  end
@@ -154,16 +176,34 @@ defmodule Srh.Http.CommandHandler do
154
  {:ok, %{result: res}}
155
 
156
  {:error, error} ->
157
- {
158
- :redis_error,
159
- %{
160
- error: error.message
161
- }
162
- }
163
  end
164
 
165
  {:error, msg} ->
166
  {:server_error, msg}
167
  end
168
  end
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
169
  end
 
73
  {:ok, result_map} ->
74
  [result_map | responses]
75
 
76
+ {:connection_error, result} ->
77
+ {:connection_error, result}
78
+
79
  {:redis_error, result} ->
80
  [result | responses]
81
  end
82
 
83
+ case updated_responses do
84
+ {:connection_error, result} ->
85
+ {:connection_error, result}
86
+
87
+ _ ->
88
+ dispatch_command_array(rest, connection_info, updated_responses)
89
+ end
90
  end
91
 
92
  defp dispatch_command_array([], _connection_info, responses) do
 
104
  # Borrow a client, then run all of the commands (wrapped in MULTI and EXEC)
105
  worker_pid = Client.borrow_worker(client_pid)
106
 
107
+ # We are manually going to invoke the MULTI, because there might be a connection error to the Redis server.
108
+ # In that case, we don't want the error to be wound up in the array of errors,
109
+ # we instead want to return the error immediately.
110
+ case ClientWorker.redis_command(worker_pid, ["MULTI"]) do
111
+ {:ok, _} ->
112
+ do_dispatch_command_transaction_array(command_array, worker_pid, responses)
113
 
114
+ # Now manually run the EXEC - this is what contains the information to form the response, not the above
115
+ result =
116
+ case ClientWorker.redis_command(worker_pid, ["EXEC"]) do
117
+ {:ok, res} ->
118
+ {
119
+ :ok,
120
+ res
121
+ |> Enum.map(&%{result: &1})
122
+ }
123
+
124
+ {:error, error} ->
125
+ decode_error(error, srh_id)
126
+ end
127
+
128
+ Client.return_worker(client_pid, worker_pid)
129
+
130
+ # Fire back the result here, because the initial Multi was successful
131
+ result
132
 
133
  {:error, error} ->
134
+ decode_error(error, srh_id)
135
  end
136
 
 
 
 
137
  {:error, msg} ->
138
  {:server_error, msg}
139
  end
140
  end
141
 
142
+ defp do_dispatch_command_transaction_array([current | rest], worker_pid, responses)
143
+ when is_pid(worker_pid) do
144
+ updated_responses =
145
+ case ClientWorker.redis_command(worker_pid, current) do
146
+ {:ok, res} ->
147
+ [%{result: res} | responses]
148
+
149
+ {:error, error} ->
150
+ [
151
+ %{
152
+ error: error.message
153
+ }
154
+ | responses
155
+ ]
156
+ end
157
 
158
  do_dispatch_command_transaction_array(rest, worker_pid, updated_responses)
159
  end
 
176
  {:ok, %{result: res}}
177
 
178
  {:error, error} ->
179
+ decode_error(error, srh_id)
 
 
 
 
 
180
  end
181
 
182
  {:error, msg} ->
183
  {:server_error, msg}
184
  end
185
  end
186
+
187
+ # Figure out if it's an actual Redis error or a Redix error
188
+ defp decode_error(error, srh_id) do
189
+ case error do
190
+ %{reason: :closed} ->
191
+ IO.puts(
192
+ "WARNING: SRH was unable to connect to the Redis server. Please make sure it is running, and the connection information is correct. SRH ID: #{srh_id}"
193
+ )
194
+
195
+ {
196
+ :connection_error,
197
+ "SRH: Unable to connect to the Redis server. See SRH logs for more information."
198
+ }
199
+
200
+ _ ->
201
+ {
202
+ :redis_error,
203
+ %{
204
+ error: error.message
205
+ }
206
+ }
207
+ end
208
+ end
209
  end
lib/srh/http/request_validator.ex CHANGED
@@ -34,7 +34,6 @@ defmodule Srh.Http.RequestValidator do
34
  defp do_validate_encoding_header([first_item | rest]) do
35
  case first_item do
36
  "base64" -> {:ok, true}
37
-
38
  _ -> do_validate_encoding_header(rest)
39
  end
40
  end
 
34
  defp do_validate_encoding_header([first_item | rest]) do
35
  case first_item do
36
  "base64" -> {:ok, true}
 
37
  _ -> do_validate_encoding_header(rest)
38
  end
39
  end
lib/srh/http/result_encoder.ex CHANGED
@@ -1,5 +1,4 @@
1
  defmodule Srh.Http.ResultEncoder do
2
-
3
  # Authentication errors don't get encoded, we need to skip over those
4
  def encode_response({:not_authorized, message}) do
5
  {:not_authorized, message}
@@ -10,6 +9,10 @@ defmodule Srh.Http.ResultEncoder do
10
  {:redis_error, error_result_map}
11
  end
12
 
 
 
 
 
13
  # List-based responses, they will contain multiple entries
14
  # It's important to note that this is DIFFERENT from a list of values,
15
  # as it's a list of separate command responses. Each is a map that either
@@ -27,12 +30,16 @@ defmodule Srh.Http.ResultEncoder do
27
  ## RESULT LIST ENCODING ##
28
 
29
  defp encode_response_list([current | rest], encoded_responses) do
30
- encoded_current_entry = case current do
31
- %{result: value} ->
32
- %{result: encode_result_value(value)} # Encode the value
33
- %{error: error_message} ->
34
- %{error: error_message} # We don't encode errors
35
- end
 
 
 
 
36
 
37
  encode_response_list(rest, [encoded_current_entry | encoded_responses])
38
  end
 
1
  defmodule Srh.Http.ResultEncoder do
 
2
  # Authentication errors don't get encoded, we need to skip over those
3
  def encode_response({:not_authorized, message}) do
4
  {:not_authorized, message}
 
9
  {:redis_error, error_result_map}
10
  end
11
 
12
+ def encode_response({:connection_error, error_result_map}) do
13
+ {:connection_error, error_result_map}
14
+ end
15
+
16
  # List-based responses, they will contain multiple entries
17
  # It's important to note that this is DIFFERENT from a list of values,
18
  # as it's a list of separate command responses. Each is a map that either
 
30
  ## RESULT LIST ENCODING ##
31
 
32
  defp encode_response_list([current | rest], encoded_responses) do
33
+ encoded_current_entry =
34
+ case current do
35
+ %{result: value} ->
36
+ # Encode the value
37
+ %{result: encode_result_value(value)}
38
+
39
+ %{error: error_message} ->
40
+ # We don't encode errors
41
+ %{error: error_message}
42
+ end
43
 
44
  encode_response_list(rest, [encoded_current_entry | encoded_responses])
45
  end
lib/srh/redis/client_registry.ex CHANGED
@@ -48,10 +48,9 @@ defmodule Srh.Redis.ClientRegistry do
48
  {:ok, pid},
49
  %{
50
  state_update
51
- |
52
- currently_borrowed_pids:
53
- [pid | state_update.currently_borrowed_pids]
54
- |> Enum.uniq()
55
  }
56
  }
57
  end
@@ -73,10 +72,9 @@ defmodule Srh.Redis.ClientRegistry do
73
  :noreply,
74
  %{
75
  state
76
- |
77
- worker_pids:
78
- [pid | state.worker_pids]
79
- |> Enum.uniq()
80
  }
81
  }
82
  end
 
48
  {:ok, pid},
49
  %{
50
  state_update
51
+ | currently_borrowed_pids:
52
+ [pid | state_update.currently_borrowed_pids]
53
+ |> Enum.uniq()
 
54
  }
55
  }
56
  end
 
72
  :noreply,
73
  %{
74
  state
75
+ | worker_pids:
76
+ [pid | state.worker_pids]
77
+ |> Enum.uniq()
 
78
  }
79
  }
80
  end
lib/srh/redis/client_worker.ex CHANGED
@@ -31,6 +31,7 @@ defmodule Srh.Redis.ClientWorker do
31
  {:ok, res} ->
32
  {:reply, {:ok, res}, state}
33
 
 
34
  {:error, res} ->
35
  {:reply, {:error, res}, state}
36
  end
@@ -52,7 +53,6 @@ defmodule Srh.Redis.ClientWorker do
52
  {:noreply, state}
53
  end
54
 
55
- # TODO: Handle host / port connections
56
  def handle_info(
57
  :create_connection,
58
  %{
@@ -62,6 +62,8 @@ defmodule Srh.Redis.ClientWorker do
62
  } = state
63
  )
64
  when is_binary(connection_string) do
 
 
65
  {:ok, pid} = Redix.start_link(connection_string)
66
  {:noreply, %{state | redix_pid: pid}}
67
  end
 
31
  {:ok, res} ->
32
  {:reply, {:ok, res}, state}
33
 
34
+ # Both connection errors and Redis command errors will be handled here
35
  {:error, res} ->
36
  {:reply, {:error, res}, state}
37
  end
 
53
  {:noreply, state}
54
  end
55
 
 
56
  def handle_info(
57
  :create_connection,
58
  %{
 
62
  } = state
63
  )
64
  when is_binary(connection_string) do
65
+ # NOTE: Redix only seems to open the connection when the first command is sent
66
+ # This means that this will return :ok even if the connection string may not actually be connectable
67
  {:ok, pid} = Redix.start_link(connection_string)
68
  {:noreply, %{state | redix_pid: pid}}
69
  end