Description
Not required, but reading #6736 and #6737 will help build strong context for the below proposal.
This issue originated from solving automatic red cluster recovery for remote store enabled clusters.
Overview
Remote store clusters
Remote-backed storage allows OpenSearch users to automatically create backups of all index transactions and send them to remote storage, providing protection against data loss. It works by indexing writes only to the primary shard, and then uploading the translog to a remote store. Segments are also uploaded to the remote store, allowing replica shards to source a copy from there instead of the primary. This helps optimize write performance by avoiding the need to replicate data to replicas. Remote-backed storage is configured at the cluster level and uses the segment replication model underneath. It integrates with various remote storage providers like Amazon S3, Google Cloud Storage, and Azure Blob Storage.
Documentation: https://opensearch.org/docs/latest/tuning-your-cluster/availability-and-recovery/remote-store/index/
Red cluster
An OpenSearch cluster turns red when one or more primary shards are not allocated or available in the cluster, typically due to hardware failures, network problems, configuration issues, data corruption, or disk space issues that prevent the cluster from serving all data completely. In most of these cases today, the OpenSearch user must act manually to fix the problem. For non-remote-store clusters, the user can recover the data only if they have snapshot backups taken earlier. For remote store backed clusters, the user can use the `_remotestore/restore` API to recover all the ingested data.
Multi-writer detection
With remote store enabled clusters, the remote store serves as the primary and single source of data. Since writers now write data directly to the remote store, multiple writers may end up trying to write and overwrite data on the remote store. In order to ensure that a write on a stale / isolated primary shard is not acknowledged, we use primary term validation to check if the shard is still the acting primary; if not, the request is not acknowledged. Similarly, to ensure that the data written to the remote store is not overwritten, we use a mix of the primary term (a monotonically increasing writer version assigned by the cluster manager), node id, and creation timestamp, amongst other things, to disambiguate the correct writer by adding this information to the metadata file name (a purely illustrative sketch follows the reading material below).
Reading material:
- [Remote Store] Primary term validation with replicas - New approach POC #5033
- [Discuss] Remote Storage File Format #8437
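As a purely illustrative sketch (the field order and separators are assumptions, not the actual OpenSearch metadata file name format), the following shows how embedding the primary term, node id, and creation timestamp in the metadata file name lets concurrent writers produce distinct, comparable files instead of silently overwriting each other:

```python
import time

def metadata_file_name(primary_term: int, generation: int, node_id: str) -> str:
    # Hypothetical format: a higher primary term identifies the newer writer;
    # node id and creation timestamp disambiguate writers at the same term.
    created_at_ms = int(time.time() * 1000)
    return f"metadata__{primary_term}__{generation}__{node_id}__{created_at_ms}"

# A stale primary (term 2) and the newly promoted primary (term 3) write
# different file names, so neither overwrites the other's metadata.
print(metadata_file_name(primary_term=2, generation=17, node_id="node-a"))
print(metadata_file_name(primary_term=3, generation=17, node_id="node-b"))
```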
Problem statement
The current architecture of multi writer detection has several areas of improvement:
- The existing mechanism for detecting multiple writers only works for shards present in the replication group observed by the primary shard. If the `_remotestore/restore` API is invoked while a stale shard is still writing, it can lead to multi-writer corruption and data loss.
- Recovery of a red shard needs to be managed by an external system to ensure that there is no more than one writer at any point; automated recovery cannot be implemented within OpenSearch core.
- Node to node communication is required for primary term validation, which means a bad node can slow down ingestion significantly. This unnecessarily couples the primary and replica shards to ensure correctness.
- Lacks a direct leasing mechanism for writers when using remote store providers like Amazon S3, Google Cloud Storage, and Azure Blob Storage, which provide conditional APIs.
Existing approach: Primary term validation
Before discussing the proposed solution, it is important to understand how multi-writer detection currently works in the write flow.
The current implementation handles write/bulk requests in a synchronous and blocking manner. Here's the step-by-step flow for a successful write operation:
- Client sends a write/bulk request to the primary shard
- Primary shard creates a translog entry locally
- Primary shard uploads the translog to remote storage
- Primary shard uploads the translog metadata file to remote storage
- Primary shard initiates parallel primary term validation calls to all replica shards
- System waits for responses from all replica shards
- Client receives a status code based on the validation responses from replica shards
The primary term validation process handles two distinct failure scenarios:
Scenario 1: Communication Failure with Replica Shard
- The primary shard attempts to communicate with the replica shard
- If communication fails, the primary shard notifies the cluster manager
- The cluster manager marks the replica shard as failed
Scenario 2: Primary Term Mismatch
- The primary shard receives a response from the replica shard
- If the replica's primary term is higher than the current one, the primary shard initiates self-failure
- The primary shard notifies the cluster manager about its failure
Below is the image representation of primary term validation (existing approach)
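In addition to the diagram, the flow above can be summarized with the following simplified sketch; the class and function names are hypothetical and do not correspond to actual OpenSearch internals.

```python
class ReplicaUnreachable(Exception):
    pass

class Replica:
    # Stand-in for a replica shard that answers primary term validation calls.
    def __init__(self, shard_id: str, primary_term: int, reachable: bool = True):
        self.shard_id, self.primary_term, self.reachable = shard_id, primary_term, reachable

    def validate_primary_term(self, term: int) -> int:
        if not self.reachable:
            raise ReplicaUnreachable(self.shard_id)
        return self.primary_term

def handle_write(primary_term, replicas, append_translog, upload_translog,
                 upload_metadata, fail_shard):
    append_translog()        # step 2: local translog entry
    upload_translog()        # step 3: translog to remote storage
    upload_metadata()        # step 4: translog metadata file to remote storage
    for replica in replicas:                      # step 5: fan-out validation calls
        try:
            seen_term = replica.validate_primary_term(primary_term)
        except ReplicaUnreachable:
            fail_shard(replica.shard_id)          # scenario 1: replica is failed
            continue
        if seen_term > primary_term:
            fail_shard("self")                    # scenario 2: stale primary fails itself
            return "rejected"
    return "acknowledged"                         # step 7: status back to the client

# Example: one replica already knows a higher primary term, so the write is rejected.
print(handle_write(
    primary_term=2,
    replicas=[Replica("r0", 2), Replica("r1", 3)],
    append_translog=lambda: None,
    upload_translog=lambda: None,
    upload_metadata=lambda: None,
    fail_shard=lambda shard: print("report failed shard:", shard),
))
```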
Proposed high level solution
The core idea is to leverage conditional writes, which allow a write/update/delete operation to be performed only if a specified condition is met, ensuring data integrity and consistency by preventing unintended data overwrites. In a system like OpenSearch, where multiple writers can each assume that they are the active one, the responsibility of determining whether a write should be accepted can be offloaded to the downstream remote store provider by virtue of the condition passed with such writes. For remote store enabled clusters, this means the primary shard no longer needs to call other nodes (holding replica shards) to confirm that it is still the active primary; the remote store provider performs that check instead. If a stale or isolated primary shard sends a write to the remote store, the request fails and the shard can fail itself in such scenarios. The existing mechanisms around fault detection, such as the leader checker, follower checker, FS health check, lagging cluster state detection, etc., continue to work as usual.
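As a minimal sketch of the idea (assuming an S3-compatible remote store whose SDK supports conditional writes via `IfMatch`/`IfNoneMatch` on `PutObject`; the bucket and key names are placeholders), a stale writer's conditional upload is rejected by the store itself:

```python
import boto3
from botocore.exceptions import ClientError

class StaleWriterError(Exception):
    """The remote store rejected the conditional write: this writer is no longer the active primary."""

s3 = boto3.client("s3")

def conditional_upload(bucket: str, key: str, body: bytes, last_known_etag: str) -> str:
    """Write only if the object still carries the ETag this writer last observed."""
    try:
        response = s3.put_object(Bucket=bucket, Key=key, Body=body, IfMatch=last_known_etag)
        return response["ETag"]   # cache for the next conditional write
    except ClientError as err:
        if err.response["Error"]["Code"] in ("PreconditionFailed", "ConditionalRequestConflict"):
            # A newer writer has modified the object; fail this shard instead
            # of acknowledging the write.
            raise StaleWriterError(key) from err
        raise
```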
Before discussing the proposed approaches for handling multi-writer detection, we summarize below the support for conditional writes and versioning provided by the major cloud storage providers.
Cloud Storage Conditional Operations & Versioning Support
All major cloud storage providers (AWS S3, Azure Blob Storage, and Google Cloud Storage) offer enterprise-grade object storage solutions with sophisticated data consistency mechanisms and version control capabilities. These services provide comprehensive support for conditional operations (PUT, GET, HEAD) using different version identifiers (ETags, Generation Numbers) and status codes (304, 412) to maintain data consistency and prevent race conditions. Versioning capabilities are implemented at the bucket/container level, with each service offering unique approaches to version management, deletion behaviors, and protection mechanisms. While sharing core functionalities, each service has distinctive features: Azure provides soft delete and snapshot capabilities, AWS offers MFA Delete and Object Lock, and GCS implements generation-based versioning with Object Holds. All services ensure atomic operations and support cache control, though they differ in implementation details, terminology, and additional features such as lease operations and retention policies.
Comprehensive Comparison Table
| Category | Feature | AWS S3 | Azure Blob Storage | Google Cloud Storage |
|---|---|---|---|---|
| **Conditional Operations - Write (PUT)** | | | | |
| | Version Identifier | ETag | ETag | Generation Number |
| | Existence Check | `IfNoneMatch='*'` | `if_none_match='*'` | `if_generation_match=0` |
| | Modified Check | `IfUnmodifiedSince` | `if_unmodified_since` | `if_metageneration_match` |
| | Version Match | `IfMatch` | `if_match` | `if_generation_match` |
| **Conditional Operations - Read (GET)** | | | | |
| | Modified Since | `IfModifiedSince` | `if_modified_since` | `if_generation_not_match` |
| | Not Modified | `IfNoneMatch` | `if_none_match` | `if_generation_match` |
| | Cache Control | Yes | Yes | Yes |
| | Response Codes | 200, 304 | 200, 304 | 200, 304 |
| **Conditional Operations - HEAD** | | | | |
| | Method Name | `HeadObject` | `get_blob_properties` | `Object.reload` |
| | Metadata Support | Yes | Yes | Yes |
| | Conditional Checks | Yes | Yes | Yes |
| | Response Codes | 200, 304, 412 | 200, 304, 412 | 200, 304, 412 |
| **Version Management** | | | | |
| Basic Configuration | Enable Level | Bucket | Container | Bucket |
| | Enable Command | `put_bucket_versioning` | `enable_versioning` | `versioning_enabled = True` |
| | Disable Support | Suspend only | Yes | Yes |
| Operations | List Versions | `list_object_versions` | `list_blob_versions` | `list_blobs(versions=True)` |
| | Get Version | `get_object(VersionId)` | `download_blob(version_id)` | `download_as_bytes(generation)` |
| | Copy Version | Yes | Yes | Yes |
| | Restore Version | Via Copy | Via Copy | Via Copy |
| **Deletion Behavior** | | | | |
| | Version-Specific Delete | Hard Delete | Configurable | Hard Delete |
| | Delete Command | `delete_object(VersionId)` | `delete_blob(version_id)` | `delete(generation)` |
| | Recovery Possible | No | Yes (if soft delete) | No |
| | Retention Period | N/A | 1-365 days | N/A |
| | Delete Markers | Yes | No | No |
| **Protection Features** | | | | |
| | Version Protection | MFA Delete | Soft Delete | Object Holds |
| | Compliance | Object Lock | Legal Hold | Bucket Locks |
| | Retention Policies | Yes | Yes | Yes |
| | Access Control | IAM, Bucket Policies | Azure AD, SAS | IAM, ACLs |
| **Additional Features** | | | | |
| | Snapshot Support | No | Yes | No |
| | Lease Operations | No | Yes | No |
| | Atomicity | Yes | Yes | Yes |
| | Consistency Model | Strong Read-After-Write | Strong Read-After-Write | Strong Read-After-Write |
| | Cross-Region Replication | Yes | Yes | Yes |
| | Lifecycle Management | Yes | Yes | Yes |
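To make the table concrete, here is a sketch of the same existence-check write ("create only if the object does not already exist") in each provider's Python SDK; the bucket/container names, connection string, and object key are placeholders, and exact parameter support depends on the SDK and service versions in use.

```python
# AWS S3 (boto3): fails with 412 PreconditionFailed if the key already exists.
import boto3
boto3.client("s3").put_object(
    Bucket="my-bucket", Key="shard-0/writer_version", Body=b"term=5", IfNoneMatch="*"
)

# Azure Blob Storage (azure-storage-blob): overwrite=False maps to If-None-Match: *
# and raises ResourceExistsError if the blob already exists.
from azure.storage.blob import BlobClient
BlobClient.from_connection_string(
    "<connection-string>", container_name="my-container", blob_name="shard-0/writer_version"
).upload_blob(b"term=5", overwrite=False)

# Google Cloud Storage (google-cloud-storage): generation 0 means "must not exist yet".
from google.cloud import storage
storage.Client().bucket("my-bucket").blob("shard-0/writer_version").upload_from_string(
    "term=5", if_generation_match=0
)
```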
Proposed approaches
Based on the available support from cloud storage providers, there are two major options that we can use to implement multi-writer detection strategies:
- Conditional READ or HEAD
- Conditional PUT
Approach 1: Writer version guard file with conditional HEAD/GET and immutable metadata files
In this approach, a writer version guard file is created per primary index shard, storing the active primary term. The acting primary shard is responsible for updating the primary term in the guard file whenever it bootstraps on account of primary promotion through a cluster state update. The correctness of the update is ensured by a conditional PUT request, which requires the last known ETag/generation for condition evaluation; the update succeeds only if the condition evaluates to true. The last known ETag is retrieved during bootstrap by reading the header attributes of the writer version guard file. If the primary shard is being bootstrapped on account of index creation, an existence check is made while creating the object on the remote store.
Note: We will use ETag for the rest of the document, but it can interchangeably mean generation as well.
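The bootstrap step described above could look roughly like the following sketch (assuming a boto3-style client; the bucket name, guard file key, and the choice to store the primary term as the file body are illustrative assumptions):

```python
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")
BUCKET = "remote-store-bucket"                                    # placeholder
GUARD_KEY = "<base_path>/<index_uuid>/<shard_id>/writer_version"  # placeholder

def bootstrap_writer_guard(new_primary_term: int) -> str:
    """Claim the writer version guard file for a newly promoted primary.

    Returns the new ETag, which the primary caches for conditional
    HEAD/GET checks on every subsequent write.
    """
    try:
        # Read the current guard file headers to learn its ETag.
        head = s3.head_object(Bucket=BUCKET, Key=GUARD_KEY)
        put = s3.put_object(
            Bucket=BUCKET, Key=GUARD_KEY,
            Body=str(new_primary_term).encode(),
            IfMatch=head["ETag"],     # a 412 here means another primary won the race
        )
    except ClientError as err:
        if err.response["Error"]["Code"] == "404":
            # Index creation: the guard file does not exist yet, so create it
            # with an existence check instead of an ETag match.
            put = s3.put_object(
                Bucket=BUCKET, Key=GUARD_KEY,
                Body=str(new_primary_term).encode(),
                IfNoneMatch="*",
            )
        else:
            raise
    return put["ETag"]
```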
A primary shard would keep the latest ETag (corresponding to the operating primary term) cached locally. During a normal write operation, the following sequence of events takes place:
- Client sends a write/bulk request to the primary shard
- Primary shard creates a translog entry locally
- Primary shard uploads the translog to remote storage
- Primary shard uploads the translog metadata file to remote storage
- Primary shard sends a conditional HEAD/GET request with cached ETag to remote storage
- Primary shard waits for conditional HEAD/GET response. There are 2 scenarios:
- Success (200)
- Precondition Failed (412)
- Client receives a status code based on the conditional HEAD response from remote store
- On a precondition failed response from the remote store, the primary shard also initiates shard failure and notifies the cluster manager (a sketch of this per-write check follows the diagram below).
Below is the diagram representing the same:
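The per-write check in steps 5-7 could be implemented roughly as below (a sketch assuming a boto3-style client; the guard file key and the return-value convention are assumptions):

```python
from botocore.exceptions import ClientError

def validate_writer_after_upload(s3, bucket: str, guard_key: str, cached_etag: str) -> bool:
    """Conditional HEAD on the writer version guard file after uploading translog + metadata.

    Returns True if this shard is still the active writer (200), or False if a
    newer primary has rewritten the guard file (412 Precondition Failed), in
    which case the shard fails itself and notifies the cluster manager.
    """
    try:
        s3.head_object(Bucket=bucket, Key=guard_key, IfMatch=cached_etag)
        return True                   # 200: still the active primary, ack the client
    except ClientError as err:
        if err.response["ResponseMetadata"]["HTTPStatusCode"] == 412:
            return False              # stale / isolated primary: initiate shard failure
        raise                         # transient error: surface to the caller
```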
Writer version guard file location
The proposed writer version file is situated at the shard id level and is shared for both the translog and metadata path.
<base_path>/<index_uuid>/<shard_id>/**writer_version**
Special Considerations
- The primary relocation process still needs sophisticated handling during primary handoff to ensure that the correct primary writer is writing. This handling already exists, but it remains prone to failure scenarios in the future.
- Additional handling is needed for GC of the writer guard file.
Pros
- Isolation between writer version file and metadata file
- The existing semantics around writing data, metadata, garbage cleaning, shallow snapshots v1/v2 etc remains the same
Cons
- Requires additional dedicated file per Index Shard
- Additional HEAD or GET call per write which means additional cost for writes
Approach 2 [Preferred]: Versioned & mutable metadata file with Conditional writes
In this approach, we employ conditional PUTs on the metadata file, which is used for writer validation of the active primary shard. The acting primary caches the ETag after each write and uses conditional PUT requests to update the metadata file, passing the cached ETag. The metadata file is now mutable, which means its name is fixed. The correctness of the update is ensured by the conditional PUT request, which requires the last known ETag/generation for condition evaluation; the update succeeds only if the condition evaluates to true. The last known ETag is retrieved during bootstrap by reading the header attributes of the metadata file. If the primary shard is being bootstrapped on account of index creation, an existence check is made while creating the object on the remote store. With versioning enabled, every update to the metadata file creates a new version with a unique version ID but the same name.
A primary shard would keep the latest ETag cached locally. During a normal write operation, the following sequence of events takes place:
- Client sends a write/bulk request to the primary shard
- Primary shard creates a translog entry locally
- Primary shard uploads the translog to remote storage
- Primary shard uploads the translog metadata file to remote storage using Conditional PUTs with cached ETag
- Primary shard waits for the conditional PUT response. There are 3 scenarios:
- Success (200)
- Precondition Failed (412)
- ConditionalRequestConflict (409)
- Client receives a status code based on the conditional PUT response from remote store
- On a precondition failed response from the remote store, the primary shard also initiates shard failure and notifies the cluster manager (a sketch of this conditional PUT follows the diagram below).
Below is the diagram representing the same:
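A rough sketch of the conditional PUT in steps 4-7, assuming a boto3-style client that supports `IfMatch` on `PutObject` (the bucket name and the error-handling convention are assumptions):

```python
from botocore.exceptions import ClientError

METADATA_KEY = "<base_path>/<index_uuid>/<shard_id>/translog/metadata/metadata_translog"

class StaleWriterError(Exception):
    """The conditional PUT was rejected: this shard is no longer the active primary."""

def upload_translog_metadata(s3, bucket: str, metadata_bytes: bytes, cached_etag: str) -> str:
    """Conditionally overwrite the fixed-name metadata file.

    On success (200) the new ETag is returned and cached for the next write;
    with versioning enabled the store also records a new version id. A 412
    (Precondition Failed) or 409 (ConditionalRequestConflict) means another
    writer has taken over, so the shard fails itself.
    """
    try:
        response = s3.put_object(
            Bucket=bucket,
            Key=METADATA_KEY,
            Body=metadata_bytes,
            IfMatch=cached_etag,      # condition on the last known ETag
        )
    except ClientError as err:
        if err.response["Error"]["Code"] in ("PreconditionFailed", "ConditionalRequestConflict"):
            raise StaleWriterError(METADATA_KEY) from err
        raise
    return response["ETag"]
```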
Metadata file name change
The name for the translog and segment metadata file is now fixed and does not change with each update.
Translog: <base_path>/<index_uuid>/<shard_id>/translog/metadata/**metadata_translog**
Segments: <base_path>/<index_uuid>/<shard_id>/segments/metadata/**metadata_segments**
Pros
- Eliminates need for writer version file by reusing the existing metadata file.
- Simplifies the constructs around multi-writer detection, metadata file management, snapshot and GC.
- No additional cost to remote store and hence a cheaper approach.
- By enabling versioning, the same functionality as present today can still be achieved
Cons
- Requires considerable changes: replacing list calls with list-versions calls, and get-by-name with get-by-version-id (see the sketch below). However, these changes are easy and doable.
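For illustration, the version-based access pattern mentioned above might look like this with boto3 (the bucket name and key are placeholders; which version to fetch is only an example):

```python
import boto3

s3 = boto3.client("s3")
BUCKET = "remote-store-bucket"                                                    # placeholder
KEY = "<base_path>/<index_uuid>/<shard_id>/segments/metadata/metadata_segments"  # placeholder

# List all versions of the fixed-name metadata file (replaces listing many
# immutable metadata files by name prefix).
versions = s3.list_object_versions(Bucket=BUCKET, Prefix=KEY).get("Versions", [])

# Fetch a specific historical version by its version id (replaces fetching an
# immutable metadata file by its unique name), e.g. for snapshots or GC decisions.
if versions:
    oldest = min(versions, key=lambda v: v["LastModified"])
    body = s3.get_object(Bucket=BUCKET, Key=KEY, VersionId=oldest["VersionId"])["Body"].read()
```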
Other benefits
This approach also paves the way for multiple other goals like:
- Reader writer separation: Since the writer does not need to rely on the reader, this can be a crucial step towards reader writer separation. In the future, it can also allow readers and writers to be scaled independently.
- Highly available zero replica shards: Because the responsibility for correctness is delegated to the remote store, we can have highly available zero replica shards. Conditional operations ensure that there are no divergent writes, which allows spinning up lightweight writers and accepting indexing even in the absence of active shards.
Next steps:
- Detailed review of the preferred approach considering different angles like snapshot v2, garbage cleaning for translog and segments, recoveries, relocation, failovers.
Credits
Thanks to https://github.com/x-INFiN1TY-x for helping explore and refine the proposed approaches.
Appendix
Documentation References
| Category | AWS S3 | Azure Blob Storage | Google Cloud Storage |
|---|---|---|---|
| **Conditional Operations** | | | |
| Write (PUT) | PUT Object | Put Blob | Upload Objects |
| Read (GET) | GET Object | Get Blob | Download Objects |
| HEAD Operations | HEAD Object | Get Blob Properties | Objects: get |
| **Version Management** | | | |
| Basic Operations | Versioning | Blob Versioning | Object Versioning |
| Delete Operations | Deleting Versions | Soft Delete | Managing Versions |
| **Protection Features** | | | |
| Access Control | IAM Overview | Azure RBAC | IAM Overview |
| Protection Mechanisms | Object Lock | Legal Hold | Bucket Lock |
| **Additional Features** | | | |
| Consistency | Consistency Model | Data Consistency | Consistency Model |
| Replication | Replication | Object Replication | Cross-Region Replication |
| Lifecycle | Lifecycle Management | Lifecycle Management | Object Lifecycle Management |
These documentation links provide detailed information about each feature and its implementation in the respective cloud storage service.