Skip to content

Commit e4470a0

Browse files
committed
perf(clustering) no concurrent inflating and json decoding on dps
### Summary Previously it could be that `read_thread` caused concurrent blocking when inflating / json decoding incoming data. This commit moves it beyond the semaphore to make it non-concurrent.
1 parent 9e7af0a commit e4470a0

File tree

2 files changed

+67
-75
lines changed

2 files changed

+67
-75
lines changed

kong/clustering/data_plane.lua

+38-43
Original file line numberDiff line numberDiff line change
@@ -148,32 +148,46 @@ function _M:communicate(premature)
148148

149149
local ping_immediately
150150
local config_exit
151+
local next_data
151152

152153
local config_thread = ngx.thread.spawn(function()
153154
while not exiting() and not config_exit do
154155
local ok, err = config_semaphore:wait(1)
155156
if ok then
156-
local config_table = self.next_config
157-
if config_table then
158-
local config_hash = self.next_hash
159-
local hashes = self.next_hashes
160-
161-
local pok, res
162-
pok, res, err = pcall(config_helper.update,
163-
self.declarative_config, config_table, config_hash, hashes)
164-
if pok then
165-
if not res then
166-
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
157+
local data = next_data
158+
if data then
159+
local msg = assert(inflate_gzip(data))
160+
yield()
161+
msg = assert(cjson_decode(msg))
162+
yield()
163+
164+
if msg.type == "reconfigure" then
165+
if msg.timestamp then
166+
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane with timestamp: ",
167+
msg.timestamp, log_suffix)
168+
169+
else
170+
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane", log_suffix)
167171
end
168172

169-
ping_immediately = true
173+
local config_table = assert(msg.config_table)
174+
local pok, res
175+
pok, res, err = pcall(config_helper.update, self.declarative_config,
176+
config_table, msg.config_hash, msg.hashes)
177+
if pok then
178+
if not res then
179+
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
180+
end
170181

171-
else
172-
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
173-
end
182+
ping_immediately = true
183+
184+
else
185+
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
186+
end
174187

175-
if self.next_config == config_table then
176-
self.next_config = nil
188+
if next_data == data then
189+
next_data = nil
190+
end
177191
end
178192
end
179193

@@ -223,31 +237,12 @@ function _M:communicate(premature)
223237
last_seen = ngx_time()
224238

225239
if typ == "binary" then
226-
data = assert(inflate_gzip(data))
227-
yield()
228-
229-
local msg = assert(cjson_decode(data))
230-
yield()
231-
232-
if msg.type == "reconfigure" then
233-
if msg.timestamp then
234-
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane with timestamp: ",
235-
msg.timestamp, log_suffix)
236-
237-
else
238-
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane", log_suffix)
239-
end
240-
241-
self.next_config = assert(msg.config_table)
242-
self.next_hash = msg.config_hash
243-
self.next_hashes = msg.hashes
244-
245-
if config_semaphore:count() <= 0 then
246-
-- the following line always executes immediately after the `if` check
247-
-- because `:count` will never yield, end result is that the semaphore
248-
-- count is guaranteed to not exceed 1
249-
config_semaphore:post()
250-
end
240+
next_data = data
241+
if config_semaphore:count() <= 0 then
242+
-- the following line always executes immediately after the `if` check
243+
-- because `:count` will never yield, end result is that the semaphore
244+
-- count is guaranteed to not exceed 1
245+
config_semaphore:post()
251246
end
252247

253248
elseif typ == "pong" then
@@ -277,8 +272,8 @@ function _M:communicate(premature)
277272
-- the config thread might be holding a lock if it's in the middle of an
278273
-- update, so we need to give it a chance to terminate gracefully
279274
config_exit = true
280-
ok, err, perr = ngx.thread.wait(config_thread)
281275

276+
ok, err, perr = ngx.thread.wait(config_thread)
282277
if not ok then
283278
ngx_log(ngx_ERR, _log_prefix, err, log_suffix)
284279

kong/clustering/wrpc_data_plane.lua

+29-32
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
local semaphore = require("ngx.semaphore")
32
local declarative = require("kong.db.declarative")
43
local wrpc = require("kong.tools.wrpc")
@@ -10,7 +9,9 @@ local cjson = require("cjson.safe")
109
local utils = require("kong.tools.utils")
1110
local assert = assert
1211
local setmetatable = setmetatable
12+
local tonumber = tonumber
1313
local math = math
14+
local traceback = debug.traceback
1415
local xpcall = xpcall
1516
local ngx = ngx
1617
local ngx_log = ngx.log
@@ -65,14 +66,7 @@ local function get_config_service()
6566
wrpc_config_service:set_handler("ConfigService.SyncConfig", function(peer, data)
6667
-- yield between steps to prevent long delay
6768
if peer.config_semaphore then
68-
local json_config = assert(inflate_gzip(data.config))
69-
yield()
70-
peer.config_obj.next_config = assert(cjson_decode(json_config))
71-
yield()
72-
73-
peer.config_obj.next_hash = data.config_hash
74-
peer.config_obj.next_hashes = data.hashes
75-
peer.config_obj.next_config_version = tonumber(data.version)
69+
peer.config_obj.next_data = data
7670
if peer.config_semaphore:count() <= 0 then
7771
-- the following line always executes immediately after the `if` check
7872
-- because `:count` will never yield, end result is that the semaphore
@@ -148,30 +142,33 @@ function _M:communicate(premature)
148142
peer.semaphore = nil
149143
config_semaphore = nil
150144
end
151-
local config_table = self.next_config
152-
local config_hash = self.next_hash
153-
local config_version = self.next_config_version
154-
local hashes = self.next_hashes
155-
if config_table and config_version > last_config_version then
156-
ngx_log(ngx_INFO, _log_prefix, "received config #", config_version, log_suffix)
157-
158-
local pok, res
159-
pok, res, err = xpcall(config_helper.update, debug.traceback,
160-
self.declarative_config, config_table, config_hash, hashes)
161-
if pok then
162-
last_config_version = config_version
163-
if not res then
164-
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
165-
end
166145

167-
else
168-
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
169-
end
146+
local data = self.next_data
147+
if data then
148+
local config_version = tonumber(data.version)
149+
if config_version > last_config_version then
150+
local config_table = assert(inflate_gzip(data.config))
151+
yield()
152+
config_table = assert(cjson_decode(config_table))
153+
yield()
154+
ngx_log(ngx_INFO, _log_prefix, "received config #", config_version, log_suffix)
155+
156+
local pok, res
157+
pok, res, err = xpcall(config_helper.update, traceback, self.declarative_config,
158+
config_table, data.config_hash, data.hashes)
159+
if pok then
160+
last_config_version = config_version
161+
if not res then
162+
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
163+
end
164+
165+
else
166+
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
167+
end
170168

171-
if self.next_config == config_table then
172-
self.next_config = nil
173-
self.next_hash = nil
174-
self.next_hashes = nil
169+
if self.next_data == data then
170+
self.next_data = nil
171+
end
175172
end
176173
end
177174

@@ -215,8 +212,8 @@ function _M:communicate(premature)
215212
-- the config thread might be holding a lock if it's in the middle of an
216213
-- update, so we need to give it a chance to terminate gracefully
217214
config_exit = true
218-
ok, err, perr = ngx.thread.wait(config_thread)
219215

216+
ok, err, perr = ngx.thread.wait(config_thread)
220217
if not ok then
221218
ngx_log(ngx_ERR, _log_prefix, err, log_suffix)
222219

0 commit comments

Comments
 (0)