Skip to content

Commit cd66fa3

Browse files
authored
feat: new event-based decisions log buffer implementation (#7446)
This new event-based buffer provides a performance improvement over the existing buffer by reducing locks and allowing concurrent writes and uploads. The buffer size is managed by number of individual events opposed to total bytes. Signed-off-by: sspaink <[email protected]>
1 parent c8febc8 commit cd66fa3

File tree

7 files changed

+1743
-680
lines changed

7 files changed

+1743
-680
lines changed

docs/content/configuration.md

+18-16
Original file line numberDiff line numberDiff line change
@@ -779,22 +779,24 @@ included in the actual bundle gzipped tarball.
779779

780780
## Decision Logs
781781

782-
| Field | Type | Required | Description |
783-
|----------------------------------------------------| -- | --- | --- |
784-
| `decision_logs.service` | `string` | No | Name of the service to use to contact remote server. If no `plugin` is specified, and `console` logging is disabled, this will default to the first `service` name defined in the Services configuration. |
785-
| `decision_logs.partition_name` | `string` | No | Deprecated: Use `resource` instead. Path segment to include in status updates. |
786-
| `decision_logs.resource` | `string` | No (default: `/logs`) | Full path to use for sending decision logs to a remote server. |
787-
| `decision_logs.reporting.buffer_size_limit_bytes` | `int64` | No | Decision log buffer size limit in bytes. OPA will drop old events from the log if this limit is exceeded. By default, no limit is set. Only one of `buffer_size_limit_bytes`, `max_decisions_per_second` may be set. |
788-
| `decision_logs.reporting.max_decisions_per_second` | `float64` | No | Maximum number of decision log events to buffer per second. OPA will drop events if the rate limit is exceeded. Only one of `buffer_size_limit_bytes`, `max_decisions_per_second` may be set. |
789-
| `decision_logs.reporting.upload_size_limit_bytes` | `int64` | No (default: `32768`) | Decision log upload size limit in bytes. OPA will chunk uploads to cap message body to this limit. |
790-
| `decision_logs.reporting.min_delay_seconds` | `int64` | No (default: `300`) | Minimum amount of time to wait between uploads. |
791-
| `decision_logs.reporting.max_delay_seconds` | `int64` | No (default: `600`) | Maximum amount of time to wait between uploads. |
792-
| `decision_logs.reporting.trigger` | `string` | No (default: `periodic`) | Controls how decision logs are reported to the remote server. Allowed values are `periodic` and `manual` (`manual` triggers are only possible when using OPA as a Go package). |
793-
| `decision_logs.mask_decision` | `string` | No (default: `/system/log/mask`) | Set path of masking decision. |
794-
| `decision_logs.drop_decision` | `string` | No (default: `/system/log/drop`) | Set path of drop decision. |
795-
| `decision_logs.plugin` | `string` | No | Use the named plugin for decision logging. If this field exists, the other configuration fields are not required. |
796-
| `decision_logs.console` | `boolean` | No (default: `false`) | Log the decisions locally to the console. When enabled alongside a remote decision logging API the `service` must be configured, the default `service` selection will be disabled. |
797-
| `decision_logs.request_context.http.headers` | `array` | No | List of HTTP headers to include in the decision log. OPA will include the values for these headers in the decision log if they exist in the incoming HTTP request. |
782+
| Field | Type | Required | Description |
783+
|----------------------------------------------------|-----------|-----------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
784+
| `decision_logs.service` | `string` | No | Name of the service to use to contact remote server. If no `plugin` is specified, and `console` logging is disabled, this will default to the first `service` name defined in the Services configuration. |
785+
| `decision_logs.partition_name` | `string` | No | Deprecated: Use `resource` instead. Path segment to include in status updates. |
786+
| `decision_logs.resource` | `string` | No (default: `/logs`) | Full path to use for sending decision logs to a remote server. |
787+
| `decision_logs.reporting.buffer_type` | `string` | No (default: `size`) | Toggles the type of buffer to use. The two available options are "size" or "event". Refer to the [Decision Log Plugin README](https://github.com/open-policy-agent/opa/tree/main/v1/plugins/logs/README.md) for for a detailed comparison. |
788+
| `decision_logs.reporting.buffer_size_limit_events` | `int64` | No (default: `10000`) | Decision log buffer size limit by events. OPA will drop old events from the log if this limit is exceeded. By default, 100 events are held. This number has to be greater than zero. Only works with "event" buffer type. |
789+
| `decision_logs.reporting.buffer_size_limit_bytes` | `int64` | No (default: `unlimited`) | Decision log buffer size limit in bytes. OPA will drop old events from the log if this limit is exceeded. By default, no limit is set. Only one of `buffer_size_limit_bytes`, `max_decisions_per_second` may be set. Only works with "size" buffer type. |
790+
| `decision_logs.reporting.max_decisions_per_second` | `float64` | No | Maximum number of decision log events to buffer per second. OPA will drop events if the rate limit is exceeded. Only one of `buffer_size_limit_bytes`, `max_decisions_per_second` may be set. |
791+
| `decision_logs.reporting.upload_size_limit_bytes` | `int64` | No (default: `32768`) | Decision log upload size limit in bytes. OPA will chunk uploads to cap message body to this limit. |
792+
| `decision_logs.reporting.min_delay_seconds` | `int64` | No (default: `300`) | Minimum amount of time to wait between uploads. |
793+
| `decision_logs.reporting.max_delay_seconds` | `int64` | No (default: `600`) | Maximum amount of time to wait between uploads. |
794+
| `decision_logs.reporting.trigger` | `string` | No (default: `periodic`) | Controls how decision logs are reported to the remote server. Allowed values are `periodic` and `manual` (`manual` triggers are only possible when using OPA as a Go package). |
795+
| `decision_logs.mask_decision` | `string` | No (default: `/system/log/mask`) | Set path of masking decision. |
796+
| `decision_logs.drop_decision` | `string` | No (default: `/system/log/drop`) | Set path of drop decision. |
797+
| `decision_logs.plugin` | `string` | No | Use the named plugin for decision logging. If this field exists, the other configuration fields are not required. |
798+
| `decision_logs.console` | `boolean` | No (default: `false`) | Log the decisions locally to the console. When enabled alongside a remote decision logging API the `service` must be configured, the default `service` selection will be disabled. |
799+
| `decision_logs.request_context.http.headers` | `array` | No | List of HTTP headers to include in the decision log. OPA will include the values for these headers in the decision log if they exist in the incoming HTTP request. |
798800

799801
## Discovery
800802

v1/plugins/logs/README.md

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# Decision Log Plugin
2+
3+
The decision log plugin is responsible for gathering decision events from multiple sources and upload them to a service.
4+
[This plugin is highly configurable](https://www.openpolicyagent.org/docs/latest/configuration/#decision-logs), allowing
5+
the user to decide when to upload, drop or proxy a logged event. Each configuration can be dynamically updated while OPA is running.
6+
7+
Events are uploaded in gzip compressed JSON array's at a user defined interval. This can either be triggered periodically
8+
or manually through the SDK. The size of the gzip compressed JSON array is limited by `upload_size_limit_bytes`.
9+
10+
There are two buffer implementations that can be selected by setting `decision_logs.reporting.buffer_type`, defaults to `size`
11+
12+
## Event Buffer
13+
14+
* `decision_logs.reporting.buffer_type=event`
15+
16+
As events are logged each event is encoded and saved in a buffer. When an upload is triggered, all the events currently
17+
in the buffer are uploaded in chunks (limited by `upload_size_limit_bytes`). The oldest events will drop if the buffer
18+
is full, the limit can be configured by changing `buffer_size_limit_events`.
19+
20+
Pros:
21+
* Compressing only on upload keeps the event and JSON array buffers separate so they don't have to be in sync.
22+
* Individual events can be dropped quicker without having to decompress the events.
23+
* Using a channel as a buffer allows events to be written to the buffer concurrently.
24+
25+
Cons:
26+
* Upload will be slower as the events need to be compressed.
27+
* A buffer limit has to be set, unlimited size isn't allowed.
28+
29+
```mermaid
30+
---
31+
title: Event Upload Flow
32+
---
33+
flowchart LR
34+
1["Producer 1"] -. event .-> Buffer
35+
2["Producer 2"] -. event .-> Buffer
36+
3["Producer 3"] -. event .-> Buffer
37+
subgraph log [Log Plugin]
38+
Buffer --> package
39+
subgraph package [JSON Array]
40+
A["[event, event, event, event ....]"]
41+
end
42+
end
43+
package -. POST .-> service
44+
classDef large font-size:20pt;
45+
46+
```
47+
48+
## Size Buffer
49+
50+
* `decision_logs.reporting.buffer_type=size`
51+
52+
As events are logged they are encoded and compressed into a JSON Array before being added to the buffer. When an upload
53+
is triggered the current buffer is emptied, uploading each JSON Array of events as chunks. By default, the buffer is an
54+
unlimited size but if `buffer_size_limit_bytes` is configured the oldest events will be dropped.
55+
56+
Pros:
57+
* Uploads are quicker because each event is already encoded and compressed.
58+
* The local memory in bytes of the buffer can be limited.
59+
60+
Cons:
61+
* Events can flow between the encoder and buffer requiring a heavy use of locks.
62+
* Dropping an individual event requires decompressing an entire array of events.
63+
* Adding events to the buffer is slower as compression happens on write.
64+
65+
```mermaid
66+
---
67+
title: Event Upload Flow
68+
---
69+
flowchart LR
70+
1["Producer 1"] -. event .-> Encoder
71+
2["Producer 2"] -. event .-> Encoder
72+
3["Producer 3"] -. event .-> Encoder
73+
subgraph log [Log Plugin]
74+
Encoder --> package
75+
subgraph package [JSON Array]
76+
A["[event, event, ...]"]
77+
end
78+
subgraph Buffer [Buffer]
79+
B["[[event, event, ...], [event, event, ...]]"]
80+
end
81+
package --> Buffer
82+
Buffer --> Encoder
83+
84+
end
85+
Buffer -. POST .-> service
86+
classDef large font-size:20pt;
87+
88+
```

v1/plugins/logs/encoder.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ func (enc *chunkEncoder) Write(event EventV1) (result [][]byte, err error) {
6464
return enc.WriteBytes(buf.Bytes())
6565
}
6666

67+
// WriteBytes attempts to write a serialized event to the current chunk.
68+
// If the upload limit is reached the chunk is closed and a result is returned.
69+
// The incoming event that didn't fit is added to the next chunk.
6770
func (enc *chunkEncoder) WriteBytes(bs []byte) (result [][]byte, err error) {
6871
if len(bs) == 0 {
6972
return nil, nil
@@ -232,7 +235,6 @@ func (dec *chunkDecoder) decode() ([]EventV1, error) {
232235
if err := json.NewDecoder(gr).Decode(&events); err != nil {
233236
return nil, err
234237
}
235-
gr.Close()
236238

237-
return events, nil
239+
return events, gr.Close()
238240
}

0 commit comments

Comments
 (0)