Skip to content

Commit d16c6ab

Browse files
author
Michael Kipper
committed
Implement the scale_factor for host-based sliding windows
1 parent efd72d0 commit d16c6ab

18 files changed

+184
-61
lines changed

ext/semian/resource.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,8 @@ static inline void
337337
semian_resource_free(void *ptr)
338338
{
339339
semian_resource_t *res = (semian_resource_t *) ptr;
340+
dprintf("Freeing resource sem_id:%d", res->sem_id);
341+
340342
if (res->name) {
341343
free(res->name);
342344
res->name = NULL;

ext/semian/sliding_window.c

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,18 @@ static int
4141
check_max_size_arg(VALUE max_size)
4242
{
4343
int retval = -1;
44-
switch (TYPE(max_size)) {
44+
switch (rb_type(max_size)) {
4545
case T_NIL:
46+
case T_UNDEF:
4647
retval = SLIDING_WINDOW_MAX_SIZE; break;
4748
case T_FLOAT:
4849
rb_warn("semian sliding window max_size is a float, converting to fixnum");
4950
retval = (int)(RFLOAT_VALUE(max_size)); break;
50-
default:
51+
case T_FIXNUM:
52+
case T_BIGNUM:
5153
retval = RB_NUM2INT(max_size); break;
54+
default:
55+
rb_raise(rb_eArgError, "unknown type for max_size: %d", TYPE(max_size));
5256
}
5357

5458
if (retval <= 0) {
@@ -60,6 +64,33 @@ check_max_size_arg(VALUE max_size)
6064
return retval;
6165
}
6266

67+
static float
68+
check_scale_factor_arg(VALUE scale_factor)
69+
{
70+
float retval = 1.0;
71+
switch(rb_type(scale_factor)) {
72+
case T_NIL:
73+
case T_UNDEF:
74+
retval = 1.0; break;
75+
case T_FLOAT:
76+
retval = rb_float_value(scale_factor); break;
77+
case T_FIXNUM:
78+
case T_BIGNUM:
79+
rb_warn("semian sliding window scale_factor is an int, converting to float");
80+
retval = (float)RB_NUM2INT(scale_factor); break;
81+
default:
82+
rb_raise(rb_eArgError, "unknown type for scale_factor: %d", TYPE(scale_factor));
83+
}
84+
85+
if (retval <= 0.0) {
86+
rb_raise(rb_eArgError, "scale_factor must be greater than zero");
87+
} else if (retval > 1.0) {
88+
rb_raise(rb_eArgError, "scale_factor cannot be greater than 1.0");
89+
}
90+
91+
return retval;
92+
}
93+
6394
static VALUE
6495
grow_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
6596
{
@@ -106,12 +137,12 @@ shrink_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
106137
window->end = 0;
107138
} else if (window->end > window->start) {
108139
// Easy case - the window doesn't wrap around
109-
window->end = window->start + new_length;
140+
window->start = window->start + new_length;
110141
} else {
111142
// Hard case - the window wraps, so re-index the data
112143
// Adapted from http://www.cplusplus.com/reference/algorithm/rotate/
113144
int first = 0;
114-
int middle = window->start;
145+
int middle = (window->end - new_max_size + window->max_size) % window->max_size;
115146
int last = window->max_size;
116147
int next = middle;
117148
while (first != next) {
@@ -169,10 +200,11 @@ Init_SlidingWindow()
169200
VALUE cSlidingWindow = rb_const_get(cSimple, rb_intern("SlidingWindow"));
170201

171202
rb_define_alloc_func(cSlidingWindow, semian_simple_sliding_window_alloc);
172-
rb_define_method(cSlidingWindow, "initialize_sliding_window", semian_simple_sliding_window_initialize, 2);
203+
rb_define_method(cSlidingWindow, "initialize_sliding_window", semian_simple_sliding_window_initialize, 3);
173204
rb_define_method(cSlidingWindow, "size", semian_simple_sliding_window_size, 0);
174205
rb_define_method(cSlidingWindow, "resize_to", semian_simple_sliding_window_resize_to, 1);
175-
rb_define_method(cSlidingWindow, "max_size", semian_simple_sliding_window_max_size, 0);
206+
rb_define_method(cSlidingWindow, "max_size", semian_simple_sliding_window_max_size_get, 0);
207+
rb_define_method(cSlidingWindow, "max_size=", semian_simple_sliding_window_max_size_set, 1);
176208
rb_define_method(cSlidingWindow, "values", semian_simple_sliding_window_values, 0);
177209
rb_define_method(cSlidingWindow, "last", semian_simple_sliding_window_last, 0);
178210
rb_define_method(cSlidingWindow, "<<", semian_simple_sliding_window_push, 1);
@@ -212,7 +244,7 @@ static int max(int a, int b) {
212244
}
213245

214246
VALUE
215-
semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size)
247+
semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size, VALUE scale_factor)
216248
{
217249
semian_simple_sliding_window_t *res = get_object(self);
218250

@@ -228,13 +260,15 @@ semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size)
228260
res->sem_id = initialize_single_semaphore(res->key, SEM_DEFAULT_PERMISSIONS);
229261
res->shmem = get_or_create_shared_memory(res->key, init_fn);
230262
res->error_threshold = check_max_size_arg(max_size);
263+
res->scale_factor = check_scale_factor_arg(scale_factor);
231264

232265
sem_meta_lock(res->sem_id);
233266
{
234267
int workers = get_number_of_registered_workers(res);
235-
float scale_factor = (workers > 1) ? 0.2 : 1.0; // TODO: Parameterize
236-
int error_threshold = max(res->error_threshold, (int) ceil(workers * scale_factor * res->error_threshold));
268+
float scale = (workers > 1) ? res->scale_factor : 1.0; // TODO: Parameterize
269+
int error_threshold = max(res->error_threshold, (int) ceil(workers * scale * res->error_threshold));
237270

271+
dprintf(" workers:%d scale:%0.2f error_threshold:%d", workers, scale, error_threshold);
238272
resize_window(res->shmem, error_threshold);
239273
}
240274
sem_meta_unlock(res->sem_id);
@@ -264,6 +298,10 @@ semian_simple_sliding_window_resize_to(VALUE self, VALUE new_size)
264298
VALUE retval = Qnil;
265299

266300
int new_max_size = RB_NUM2INT(new_size);
301+
if (new_max_size < 1) {
302+
rb_raise(rb_eArgError, "cannot resize to %d", new_max_size);
303+
}
304+
267305
sem_meta_lock(res->sem_id);
268306
{
269307
retval = resize_window(res->shmem, new_max_size);
@@ -274,7 +312,7 @@ semian_simple_sliding_window_resize_to(VALUE self, VALUE new_size)
274312
}
275313

276314
VALUE
277-
semian_simple_sliding_window_max_size(VALUE self)
315+
semian_simple_sliding_window_max_size_get(VALUE self)
278316
{
279317
semian_simple_sliding_window_t *res = get_object(self);
280318
VALUE retval;
@@ -288,6 +326,26 @@ semian_simple_sliding_window_max_size(VALUE self)
288326
return retval;
289327
}
290328

329+
VALUE
330+
semian_simple_sliding_window_max_size_set(VALUE self, VALUE new_size)
331+
{
332+
semian_simple_sliding_window_t *res = get_object(self);
333+
VALUE retval;
334+
335+
int new_max_size = RB_NUM2INT(new_size);
336+
if (new_max_size < 1) {
337+
rb_raise(rb_eArgError, "max_size must be positive");
338+
}
339+
340+
sem_meta_lock(res->sem_id);
341+
{
342+
retval = resize_window(res->shmem, new_max_size);
343+
}
344+
sem_meta_unlock(res->sem_id);
345+
346+
return retval;
347+
}
348+
291349
VALUE
292350
semian_simple_sliding_window_values(VALUE self)
293351
{

ext/semian/sliding_window.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77
void Init_SlidingWindow();
88

99
VALUE semian_simple_sliding_window_alloc(VALUE klass);
10-
VALUE semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size);
10+
VALUE semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size, VALUE scale_factor);
1111
VALUE semian_simple_sliding_window_size(VALUE self);
1212
VALUE semian_simple_sliding_window_resize_to(VALUE self, VALUE new_size);
13-
VALUE semian_simple_sliding_window_max_size(VALUE self);
13+
VALUE semian_simple_sliding_window_max_size_get(VALUE self);
14+
VALUE semian_simple_sliding_window_max_size_set(VALUE self, VALUE new_size);
1415
VALUE semian_simple_sliding_window_push(VALUE self, VALUE value);
1516
VALUE semian_simple_sliding_window_values(VALUE self);
1617
VALUE semian_simple_sliding_window_last(VALUE self);

ext/semian/sysv_semaphores.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ raise_semian_syscall_error(const char *syscall, int error_num)
2929
void
3030
initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, double quota)
3131
{
32-
3332
res->key = generate_key(id_str);
33+
dprintf("Initializing semaphore set for key:%lu", res->key);
34+
3435
res->strkey = (char*) malloc((2 /*for 0x*/+ sizeof(uint64_t) /*actual key*/+ 1 /*null*/) * sizeof(char));
3536
sprintf(res->strkey, "0x%08x", (unsigned int) res->key);
3637

@@ -60,6 +61,7 @@ initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permis
6061
Ensure that a worker for this process is registered.
6162
Note that from ruby we ensure that at most one worker may be registered per process.
6263
*/
64+
dprintf("Registering worker for sem_id:%d", res->sem_id);
6365
if (perform_semop(res->sem_id, SI_SEM_REGISTERED_WORKERS, 1, SEM_UNDO, NULL) == -1) {
6466
rb_raise(eInternal, "error incrementing registered workers, errno: %d (%s)", errno, strerror(errno));
6567
}
@@ -120,10 +122,12 @@ perform_semop(int sem_id, short index, short op, short flags, struct timespec *t
120122
int
121123
get_sem_val(int sem_id, int sem_index)
122124
{
125+
dprintf("get_sem_val(sem_id: %d, sem_index: %d)", sem_id, sem_index);
123126
int ret = semctl(sem_id, sem_index, GETVAL);
124127
if (ret == -1) {
125128
rb_raise(eInternal, "error getting value of %s for sem %d, errno: %d (%s)", SEMINDEX_STRING[sem_index], sem_id, errno, strerror(errno));
126129
}
130+
dprintf("get_sem_val(sem_id: %d, sem_index: %d) == %d", sem_id, sem_index, ret);
127131
return ret;
128132
}
129133

@@ -245,6 +249,7 @@ diff_timespec_ms(struct timespec *end, struct timespec *begin)
245249
int
246250
initialize_single_semaphore(uint64_t key, long permissions)
247251
{
252+
dprintf("Initializing single semaphore for key:%lu", key);
248253
int sem_id = semget(key, 1, IPC_CREAT | IPC_EXCL | permissions);
249254

250255
/*

ext/semian/sysv_semaphores.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ initialize_single_semaphore(uint64_t key, long permissions);
118118
static inline void
119119
dprint_sem_vals(int sem_id)
120120
{
121+
dprintf("dprintf(%d)", sem_id);
121122
dprintf("sem_id: %d, lock: %d, tickets: %d configured: %d, registered workers %d",
122123
sem_id,
123124
get_sem_val(sem_id, SI_SEM_LOCK),

ext/semian/types.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ typedef struct {
7878
int sem_id;
7979
uint64_t parent_key;
8080
int error_threshold;
81+
float scale_factor;
8182
semian_simple_sliding_window_shared_t* shmem;
8283
} semian_simple_sliding_window_t;
8384

lib/semian.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ def create_circuit_breaker(name, **options)
259259
exceptions: Array(exceptions) + [::Semian::BaseError],
260260
half_open_resource_timeout: options[:half_open_resource_timeout],
261261
implementation: implementation(**options),
262+
scale_factor: options[:scale_factor],
262263
)
263264
end
264265

lib/semian/circuit_breaker.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,18 @@ class CircuitBreaker #:nodoc:
77
attr_reader :name, :half_open_resource_timeout, :error_timeout, :state, :last_error
88

99
def initialize(name, exceptions:, success_threshold:, error_threshold:,
10-
error_timeout:, implementation:, half_open_resource_timeout: nil)
11-
@name = name.to_sym
12-
initialize_circuit_breaker(name, error_threshold)
10+
error_timeout:, implementation:, half_open_resource_timeout: nil, scale_factor: nil)
11+
initialize_circuit_breaker(name, error_threshold) if respond_to?(:initialize_circuit_breaker)
1312

13+
@name = name.to_sym
1414
@success_count_threshold = success_threshold
1515
@error_count_threshold = error_threshold
16+
@scale_factor = scale_factor
1617
@error_timeout = error_timeout
1718
@exceptions = exceptions
1819
@half_open_resource_timeout = half_open_resource_timeout
1920

20-
@errors = implementation::SlidingWindow.new(name, max_size: @error_count_threshold)
21+
@errors = implementation::SlidingWindow.new(name, max_size: @error_count_threshold, scale_factor: @scale_factor)
2122
@successes = implementation::Integer.new("#{name}_successes")
2223
state_val = implementation::Integer.new("#{name}_state")
2324
@state = implementation::State.new(state_val)

lib/semian/simple_sliding_window.rb

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ class SlidingWindow #:nodoc:
99
# like this: if @max_size = 4, current time is 10, @window =[5,7,9,10].
1010
# Another push of (11) at 11 sec would make @window [7,9,10,11], shifting off 5.
1111

12-
def initialize(name, max_size:)
13-
initialize_sliding_window(name, max_size)
12+
def initialize(name, max_size:, scale_factor: nil)
13+
initialize_sliding_window(name, max_size, scale_factor) if respond_to?(:initialize_sliding_window)
14+
15+
@name = name.to_sym
1416
@max_size = max_size
1517
@window = []
1618
end
@@ -44,6 +46,12 @@ def clear
4446
end
4547
alias_method :destroy, :clear
4648

49+
def max_size=(value)
50+
raise ArgumentError, "max_size must be positive" if value <= 0
51+
@max_size = value
52+
resize_to(value)
53+
end
54+
4755
private
4856

4957
def resize_to(size)

test/adapter_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def test_adapter_registers_consumer
2121
end
2222

2323
def test_unregister
24-
skip if ENV["SKIP_FLAKY_TESTS"]
24+
skip if flaky
2525
client = Semian::AdapterTestClient.new(quota: 0.5)
2626
assert_nil(Semian.resources[:testing])
2727
resource = Semian.register(:testing, tickets: 2, error_threshold: 1, error_timeout: 0, success_threshold: 0)

test/circuit_breaker_test.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ def test_acquire_yield_when_the_circuit_is_closed
2525
def test_acquire_raises_circuit_open_error_when_the_circuit_is_open
2626
open_circuit!
2727
assert_raises Semian::OpenCircuitError do
28-
puts "test: Acquiring resource #{@resource.circuit_breaker}"
2928
@resource.acquire { 1 + 1 }
3029
end
3130
assert_match(/State transition from closed to open/, @strio.string)
@@ -150,6 +149,10 @@ def test_semian_wide_env_var_disables_circuit_breaker
150149
end
151150

152151
class RawResource
152+
def initialize
153+
@timeout = 2
154+
end
155+
153156
def timeout
154157
@timeout || 2
155158
end

test/grpc_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def test_unavailable_server_opens_the_circuit
6464
end
6565

6666
def test_timeout_opens_the_circuit
67-
skip if ENV["SKIP_FLAKY_TESTS"]
67+
skip if flaky
6868
stub = build_insecure_stub(EchoStub, host: "#{SemianConfig['toxiproxy_upstream_host']}:#{SemianConfig['grpc_toxiproxy_port']}", opts: {timeout: 0.1})
6969
run_services_on_server(@server, services: [EchoService]) do
7070
Toxiproxy['semian_test_grpc'].downstream(:latency, latency: 1000).apply do
@@ -86,7 +86,7 @@ def test_timeout_opens_the_circuit
8686
def test_instrumentation
8787
notified = false
8888
subscriber = Semian.subscribe do |event, resource, scope, adapter|
89-
next if event != :success
89+
next unless event == :success
9090

9191
notified = true
9292
assert_equal Semian[@host], resource

test/helpers/circuit_breaker_helper.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ def half_open_cicuit!(resource = @resource, backwards_time_travel = 10)
1515

1616
def trigger_error!(resource = @resource, error = SomeError)
1717
resource.acquire do
18-
puts "Triggering error"
1918
raise error
2019
end
2120
rescue error

test/lru_hash_test.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,9 @@ def test_clean_instrumentation
113113

114114
notified = false
115115
subscriber = Semian.subscribe do |event, resource, scope, adapter, payload|
116+
next unless event == :lru_hash_gc
117+
116118
notified = true
117-
assert_equal :lru_hash_gc, event
118119
assert_equal @lru_hash, resource
119120
assert_nil scope
120121
assert_nil adapter
@@ -234,6 +235,7 @@ def create_circuit_breaker(name, exceptions = true, bulkhead = false, error_time
234235
exceptions: [::Semian::BaseError],
235236
half_open_resource_timeout: nil,
236237
implementation: implementation,
238+
scale_factor: 1.0,
237239
)
238240
circuit_breaker.mark_failed(nil) if exceptions
239241
Semian::ProtectedResource.new(name, create_bulkhead(name, bulkhead), circuit_breaker)

test/net_http_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ def test_5xxs_trip_circuit_when_fatal_server_flag_enabled
361361
end
362362

363363
def test_5xxs_dont_raise_exceptions_unless_fatal_server_flag_enabled
364-
skip if ENV["SKIP_FLAKY_TESTS"]
364+
skip if flaky
365365
with_semian_configuration do
366366
with_server do
367367
http = Net::HTTP.new(SemianConfig['http_host'], SemianConfig['http_port_service_a'])

0 commit comments

Comments
 (0)