Skip to content

Commit 22ba550

Browse files
sandeepnRESpetermetz
authored andcommitted
feat(relay): configurable db_open retry mechanism added and in driver
To manage concurrent accesses to a LevelDB instance so that protocols don't fail because of temporary resource contention, added two env variables in driver and config variables in relay to configure retry mechanism while opening database: - Max number of retries - Backoff time in milli seconds. Additionally upgraded go versions to v1.20 in publish workflows. Signed-off-by: Sandeep Nishad <[email protected]>
1 parent 89d5102 commit 22ba550

35 files changed

+206
-42
lines changed

weaver/.github/workflows/deploy_go-pkgs.yml

+5-5
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ jobs:
7575
- name: Set up Go
7676
uses: actions/setup-go@v2
7777
with:
78-
go-version: 1.16
78+
go-version: '1.20.2'
7979

8080
- name: Set current date as env
8181
run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV
@@ -129,7 +129,7 @@ jobs:
129129
- name: Set up Go
130130
uses: actions/setup-go@v2
131131
with:
132-
go-version: 1.16
132+
go-version: '1.20.2'
133133

134134
- name: Set current date as env
135135
run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV
@@ -183,7 +183,7 @@ jobs:
183183
- name: Set up Go
184184
uses: actions/setup-go@v2
185185
with:
186-
go-version: 1.16
186+
go-version: '1.20.2'
187187

188188
- name: Set current date as env
189189
run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV
@@ -237,7 +237,7 @@ jobs:
237237
- name: Set up Go
238238
uses: actions/setup-go@v2
239239
with:
240-
go-version: 1.16
240+
go-version: '1.20.2'
241241

242242
- name: Set current date as env
243243
run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV
@@ -291,7 +291,7 @@ jobs:
291291
- name: Set up Go
292292
uses: actions/setup-go@v2
293293
with:
294-
go-version: 1.16
294+
go-version: '1.20.2'
295295

296296
- name: Set current date as env
297297
run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV

weaver/core/drivers/fabric-driver/.env.docker.template

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ INTEROP_CHAINCODE=<interop-chaincode-name>
1212
DB_PATH=driverdbs
1313
WALLET_PATH=
1414
TLS_CREDENTIALS_DIR=<dir-with-tls-cert-and-key>
15+
LEVELDB_LOCKED_MAX_RETRIES=<max-attempts-in-retry>
16+
LEVELDB_LOCKED_RETRY_BACKOFF_MSEC=<retry-back-off-time-in-ms>
1517
DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-fabric-driver
1618
DOCKER_TAG=1.4.0
1719
EXTERNAL_NETWORK=<docker-bridge-network>

weaver/core/drivers/fabric-driver/.env.template

+2
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,5 @@ MOCK=false
1313
DB_PATH=driverdbs
1414
WALLET_PATH=
1515
DEBUG=true
16+
LEVELDB_LOCKED_MAX_RETRIES=
17+
LEVELDB_LOCKED_RETRY_BACKOFF_MSEC=

weaver/core/drivers/fabric-driver/docker-compose.yml

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ services:
2424
- DRIVER_TLS_KEY_PATH=${DRIVER_TLS_KEY_PATH}
2525
- WALLET_PATH=${WALLET_PATH}
2626
- DEBUG=false
27+
- LEVELDB_LOCKED_MAX_RETRIES=${LEVELDB_LOCKED_MAX_RETRIES}
28+
- LEVELDB_LOCKED_RETRY_BACKOFF_MSEC=${LEVELDB_LOCKED_RETRY_BACKOFF_MSEC}
2729
volumes:
2830
- ${CONNECTION_PROFILE}:/fabric-driver/ccp.json
2931
- ${DRIVER_CONFIG}:/fabric-driver/config.json

weaver/core/drivers/fabric-driver/server/dbConnector.ts

+10-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ class LevelDBConnector implements DBConnector {
4444
DB_TYPE: string = "Level";
4545
DB_NAME: string;
4646
dbHandle: any;
47+
dbOpenMaxRetries: number;
48+
dbRetryBackoffTime: number;
4749

4850
constructor(
4951
dbName: string
@@ -53,15 +55,20 @@ class LevelDBConnector implements DBConnector {
5355
}
5456
this.DB_NAME = dbName;
5557
this.dbHandle = new Level(path.join(process.env.DB_PATH ? process.env.DB_PATH : "./driverdbs", dbName), { valueEncoding: 'json' });
58+
// Retry max attempts, default 250, making it 5 seconds for retries
59+
this.dbOpenMaxRetries = process.env.LEVELDB_LOCKED_MAX_RETRIES ? parseInt(process.env.LEVELDB_LOCKED_MAX_RETRIES) : 250;
60+
// Retry back off time in ms, default 20ms
61+
this.dbRetryBackoffTime = process.env.LEVELDB_LOCKED_RETRY_BACKOFF_MSEC ? parseInt(process.env.LEVELDB_LOCKED_RETRY_BACKOFF_MSEC) : 20;
5662
}
5763

5864
async open(
5965
i: number = 0
6066
): Promise<boolean> {
67+
logger.debug(`attempt #${i} to open database ${this.DB_NAME}`)
6168
try {
6269
await this.dbHandle.open();
6370
} catch (error: any) {
64-
if (i>=10) {
71+
if (i >= this.dbOpenMaxRetries) {
6572
logger.error(`failed to open database connection with error: ${error.toString()}`);
6673
if (error.code == 'LEVEL_DATABASE_NOT_OPEN' && error.cause && error.cause.code == 'LEVEL_LOCKED') {
6774
throw new DBLockedError(error.toString());
@@ -70,7 +77,8 @@ class LevelDBConnector implements DBConnector {
7077
}
7178
}
7279
else {
73-
await delay(1000);
80+
logger.debug(`failed to open database connection with error: ${error.toString()}`);
81+
await delay(this.dbRetryBackoffTime);
7482
await this.open(i+1);
7583
}
7684
}

weaver/core/drivers/fabric-driver/server/events.ts

+14-11
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ async function addEventSubscription(
164164
var subscriptionsSerialized: string = await db.read(key) as string;
165165
subscriptions = JSON.parse(subscriptionsSerialized);
166166

167-
logger.debug(`subscriptions.length: ${subscriptions.length}`);
167+
logger.debug(`existing subscriptions.length: ${subscriptions.length}`);
168168
// check if the event to be subscribed is already present in the DB
169169
for (const subscriptionSerialized of subscriptions) {
170170
var subscription: queryPb.Query = queryPb.Query.deserializeBinary(Buffer.from(subscriptionSerialized, 'base64'));
@@ -200,14 +200,14 @@ async function addEventSubscription(
200200
}
201201
}
202202

203-
logger.debug(`subscriptions.length: ${subscriptions.length}`);
203+
logger.debug(`new subscriptions.length: ${subscriptions.length}`);
204204
subscriptionsSerialized = JSON.stringify(subscriptions);
205205
// insert the value against key in the DB (it can be the scenario of a new key addition, or update to the value of an existing key)
206206
await db.insert(key, subscriptionsSerialized);
207207
await db.close();
208208

209209
// TODO: register the event with fabric sdk
210-
logger.info(`end addEventSubscription() .. requestId: ${query.getRequestId()}`);
210+
logger.debug(`end addEventSubscription() .. requestId: ${query.getRequestId()}`);
211211
return query.getRequestId();
212212

213213
} catch(error: any) {
@@ -269,7 +269,7 @@ const deleteEventSubscription = async (
269269
}
270270

271271
await db.close();
272-
logger.info(`end deleteEventSubscription() .. retVal: ${JSON.stringify(retVal.toObject())}`);
272+
logger.debug(`end deleteEventSubscription() .. retVal: ${JSON.stringify(retVal.toObject())}`);
273273
return retVal;
274274
} catch(error: any) {
275275
logger.error(`Error during delete: ${error.toString()}`);
@@ -280,10 +280,11 @@ const deleteEventSubscription = async (
280280

281281
function filterEventMatcher(keySerialized: string, eventMatcher: eventsPb.EventMatcher) : boolean {
282282
var item: eventsPb.EventMatcher = eventsPb.EventMatcher.deserializeBinary(Buffer.from(keySerialized, 'base64'));
283+
logger.debug(`eventMatcher from db: ${JSON.stringify(item.toObject())}`)
283284
if ((eventMatcher.getEventClassId() == '*' || eventMatcher.getEventClassId() == item.getEventClassId()) &&
284285
(eventMatcher.getTransactionContractId() == '*' || eventMatcher.getTransactionContractId() == item.getTransactionContractId()) &&
285286
(eventMatcher.getTransactionLedgerId() == '*' || eventMatcher.getTransactionLedgerId() == item.getTransactionLedgerId()) &&
286-
(eventMatcher.getTransactionFunc() == '*' || eventMatcher.getTransactionFunc() == item.getTransactionFunc().toLowerCase())) {
287+
(eventMatcher.getTransactionFunc() == '*' || eventMatcher.getTransactionFunc().toLowerCase() == item.getTransactionFunc().toLowerCase())) {
287288

288289
return true;
289290
} else {
@@ -294,7 +295,7 @@ function filterEventMatcher(keySerialized: string, eventMatcher: eventsPb.EventM
294295
async function lookupEventSubscriptions(
295296
eventMatcher: eventsPb.EventMatcher
296297
): Promise<Array<queryPb.Query>> {
297-
logger.info(`finding the subscriptions with eventMatcher: ${JSON.stringify(eventMatcher.toObject())}`);
298+
logger.debug(`finding the subscriptions with eventMatcher: ${JSON.stringify(eventMatcher.toObject())}`);
298299
var subscriptions: Array<string>;
299300
var returnSubscriptions: Array<queryPb.Query> = new Array<queryPb.Query>();
300301
let db: DBConnector;
@@ -313,8 +314,8 @@ async function lookupEventSubscriptions(
313314
}
314315
}
315316

316-
logger.debug(`returnSubscriptions.length: ${returnSubscriptions.length}`);
317-
logger.info(`end lookupEventSubscriptions()`);
317+
logger.info(`found ${returnSubscriptions.length} matching subscriptions`);
318+
logger.debug(`end lookupEventSubscriptions()`);
318319
await db.close();
319320
return returnSubscriptions;
320321

@@ -324,7 +325,7 @@ async function lookupEventSubscriptions(
324325
if (error instanceof DBKeyNotFoundError) {
325326
// case of read failing due to key not found
326327
returnSubscriptions = new Array<queryPb.Query>();
327-
logger.debug(`returnSubscriptions.length: ${returnSubscriptions.length}`);
328+
logger.info(`found ${returnSubscriptions.length} matching subscriptions`);
328329
return returnSubscriptions;
329330
} else {
330331
// case of read failing due to some other issue
@@ -347,6 +348,7 @@ async function readAllEventMatchers(): Promise<Array<eventsPb.EventMatcher>> {
347348
const eventMatcher = eventsPb.EventMatcher.deserializeBinary(Uint8Array.from(Buffer.from(key, 'base64')))
348349
returnMatchers.push(eventMatcher)
349350
}
351+
logger.info(`found ${returnMatchers.length} eventMatchers`);
350352
logger.debug(`end readAllEventMatchers()`);
351353
await db.close();
352354
return returnMatchers;
@@ -434,7 +436,7 @@ async function writeExternalStateHelper(
434436
ccArgs: ccArgsStr,
435437
contractName: ctx.getContractId()
436438
}
437-
logger.info(`invokeObject.ccArgs: ${invokeObject.ccArgs}`)
439+
logger.info(`writing external state to contract: ${ctx.getContractId()} with function: ${ctx.getFunc()}, and args: ${invokeObject.ccArgs} on channel: ${ctx.getLedgerId()}`);
438440

439441
const [ response, responseError ] = await handlePromise(InteroperableHelper.submitTransactionWithRemoteViews(
440442
interopContract,
@@ -450,7 +452,8 @@ async function writeExternalStateHelper(
450452
gateway.disconnect();
451453
throw responseError;
452454
}
453-
gateway.disconnect();
455+
logger.debug(`write successful`);
456+
gateway.disconnect();
454457
} else {
455458
const errorString: string = `erroneous viewPayload identified in WriteExternalState processing`;
456459
logger.error(`error viewPayload.getError(): ${JSON.stringify(viewPayload.getError())}`);

weaver/core/drivers/fabric-driver/server/listener.ts

+8-6
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ const initBlockEventListenerForChannel = async (
5252
// const responsePayload = transaction.payload.action.proposal_response_payload.extension.events.payload;
5353
// Get transaction function name: first argument according to convention
5454
const chaincodeFunc = transaction.payload.chaincode_proposal_payload.input.chaincode_spec.input.args[0].toString();
55-
logger.info(`Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}`);
55+
logger.info(`Event Listener: Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}`);
5656
// Find all matching event subscriptions stored in the database
5757
let eventMatcher = new events_pb.EventMatcher();
5858
eventMatcher.setEventType(events_pb.EventType.LEDGER_STATE);
@@ -68,14 +68,15 @@ const initBlockEventListenerForChannel = async (
6868
} catch(error) {
6969
let errorString: string = error.toString();
7070
if (!(error instanceof DBLockedError)) { // Check if contention was preventing DB access
71+
console.error(`Event Listener: ${error}`)
7172
throw(error);
7273
}
7374
await new Promise(f => setTimeout(f, 2000)); // Sleep 2 seconds
7475
}
7576
}
7677
// Iterate through the view requests in the matching event subscriptions
7778
eventSubscriptionQueries.forEach(async (eventSubscriptionQuery: query_pb.Query) => {
78-
logger.info(`Generating view and collecting proof for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}, responsePayload: ${responsePayload.toString()}`);
79+
logger.info(`Event Listener: Generating view and collecting proof for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}, responsePayload: ${responsePayload.toString()}`);
7980
// Trigger proof collection
8081
const [result, invokeError] = await handlePromise(
8182
invoke(
@@ -90,7 +91,7 @@ const initBlockEventListenerForChannel = async (
9091
const client = getRelayClientForEventPublish();
9192
const viewPayload = packageFabricView(eventSubscriptionQuery, result);
9293

93-
logger.info('Sending block event');
94+
logger.info('Event Listener: Sending block event');
9495
// Sending the Fabric event to the relay.
9596
client.sendDriverState(viewPayload, relayCallback);
9697
}
@@ -115,7 +116,7 @@ const initContractEventListener = (
115116
chaincodeId: string,
116117
): any => {
117118
const listener: ContractListener = async (event: ContractEvent) => {
118-
logger.info(`Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, event class: ${event.eventName}`);
119+
logger.info(`Event Listener: Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, event class: ${event.eventName}`);
119120
// Find all matching event subscriptions stored in the database
120121
let eventMatcher = new events_pb.EventMatcher();
121122
eventMatcher.setEventType(events_pb.EventType.LEDGER_STATE);
@@ -131,14 +132,15 @@ const initContractEventListener = (
131132
} catch(error) {
132133
let errorString: string = error.toString();
133134
if (!(error instanceof DBLockedError)) { // Check if contention was preventing DB access
135+
logger.error(`Event Listener: ${error}`);
134136
throw(error);
135137
}
136138
await new Promise(f => setTimeout(f, 2000)); // Sleep 2 seconds
137139
}
138140
}
139141
// Iterate through the view requests in the matching event subscriptions
140142
eventSubscriptionQueries.forEach(async (eventSubscriptionQuery: query_pb.Query) => {
141-
logger.info(`Generating view and collecting proof for event class: ${event.eventName}, channel: ${channelId}, chaincode: ${chaincodeId}, event.payload: ${event.payload.toString()}`);
143+
logger.info(`Event Listener: Generating view and collecting proof for event class: ${event.eventName}, channel: ${channelId}, chaincode: ${chaincodeId}, event.payload: ${event.payload.toString()}`);
142144
// Trigger proof collection
143145
const [result, invokeError] = await handlePromise(
144146
invoke(
@@ -153,7 +155,7 @@ const initContractEventListener = (
153155
const client = getRelayClientForEventPublish();
154156
const viewPayload = packageFabricView(eventSubscriptionQuery, result);
155157

156-
logger.info('Sending contract event');
158+
logger.info('Event Listener: Sending contract event');
157159
// Sending the Fabric event to the relay.
158160
client.sendDriverState(viewPayload, relayCallback);
159161
}

weaver/core/network/fabric-interop-cc/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM golang:1.16 AS build
1+
FROM golang:1.20 AS build
22

33
COPY . /fabric-interop-cc
44
WORKDIR /fabric-interop-cc

weaver/core/relay/.env.template.2

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ DRIVER_PORT=<driver-server-port>
66
DRIVER_NAME=<driver-name>
77
RELAY_NAME=<docker-relay-name>
88
RELAY_PORT=<relay-grpc-server-port>
9+
DB_OPEN_MAX_RETRIES=<max-retries-for-opening-db-when-locked>
10+
DB_OPEN_RETRY_BACKOFF_MSEC=<retry-backoff-time-for-opening-db-when-locked>
911
DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-relay-server
1012
DOCKER_TAG=1.4.2
1113
EXTERNAL_NETWORK=<docker-bridge-network>

weaver/core/relay/config/Corda_Relay.toml

+4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ hostname="localhost"
55
db_path="db/Corda_Relay/requests"
66
# This will be replaced by the task queue.
77
remote_db_path="db/Corda_Relay/remote_request"
8+
# max retries opening sled db if it is locked
9+
db_open_max_retries=500
10+
# retry back off time in ms if sled db is locked
11+
db_open_retry_backoff_msec=10
812

913
# FOR TLS
1014
cert_path="credentials/fabric_cert.pem"

weaver/core/relay/config/Corda_Relay2.toml

+4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ hostname="localhost"
55
db_path="db/Corda_Relay2/requests"
66
# This will be replaced by the task queue.
77
remote_db_path="db/Corda_Relay2/remote_request"
8+
# max retries opening sled db if it is locked
9+
db_open_max_retries=500
10+
# retry back off time in ms if sled db is locked
11+
db_open_retry_backoff_msec=10
812

913
# FOR TLS
1014
cert_path="credentials/fabric_cert.pem"

weaver/core/relay/config/Dummy_Relay.toml

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ hostname="localhost"
44
db_path="db/Dummy_Relay/requests"
55
# This will be replaced by the task queue.
66
remote_db_path="db/Dummy_Relay/remote_request"
7+
# max retries opening sled db if it is locked
8+
db_open_max_retries=500
9+
# retry back off time in ms if sled db is locked
10+
db_open_retry_backoff_msec=10
711

812
# FOR TLS
913
cert_path="credentials/fabric_cert.pem"

weaver/core/relay/config/Dummy_Relay_tls.toml

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ hostname="localhost"
44
db_path="db/Dummy_Relay_tls/requests"
55
# This will be replaced by the task queue.
66
remote_db_path="db/Dummy_Relay_tls/remote_request"
7+
# max retries opening sled db if it is locked
8+
db_open_max_retries=500
9+
# retry back off time in ms if sled db is locked
10+
db_open_retry_backoff_msec=10
711

812
# FOR TLS
913
cert_path="credentials/fabric_cert.pem"

weaver/core/relay/config/Fabric_Relay.toml

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ hostname="localhost"
44
db_path="db/Fabric_Relay/requests"
55
# This will be replaced by the task queue.
66
remote_db_path="db/Fabric_Relay/remote_request"
7+
# max retries opening sled db if it is locked
8+
db_open_max_retries=500
9+
# retry back off time in ms if sled db is locked
10+
db_open_retry_backoff_msec=10
711

812

913
# FOR TLS

weaver/core/relay/config/Fabric_Relay2.toml

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ hostname="localhost"
44
db_path="db/Fabric_Relay2/requests"
55
# This will be replaced by the task queue.
66
remote_db_path="db/Fabric_Relay2/remote_request"
7+
# max retries opening sled db if it is locked
8+
db_open_max_retries=500
9+
# retry back off time in ms if sled db is locked
10+
db_open_retry_backoff_msec=10
711

812

913
# FOR TLS

weaver/core/relay/config/Settings.toml

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ hostname="localhost"
44
db_path="db/requests"
55
# This will be replaced by the task queue.
66
remote_db_path="db/remote_requests"
7+
# max retries opening sled db if it is locked
8+
db_open_max_retries=500
9+
# retry back off time in ms if sled db is locked
10+
db_open_retry_backoff_msec=10
711

812
[networks]
913
[networks.Fabric_Network]

weaver/core/relay/docker-compose.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ services:
9393
# working ok, or if you want to add additional assets such as
9494
# custom configurations and additional remote relay definitions
9595
#
96+
# - DB_OPEN_MAX_RETRIES=${DB_OPEN_MAX_RETRIES}
97+
# max retries opening sled db if it is locked
98+
# - DB_OPEN_RETRY_BACKOFF_MSEC=${DB_OPEN_RETRY_BACKOFF_MSEC}
99+
# retries back off time for opening sled db if it is locked
100+
#
96101
volumes:
97102
#
98103
# Uncomment these two files if you want to mount your specialised

weaver/core/relay/docker/server.template.toml

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ port="${RELAY_PORT}"
99
hostname="0.0.0.0"
1010
db_path="db/${RELAY_NAME}/requests"
1111
remote_db_path="db/${RELAY_NAME}/remote_request"
12+
db_open_max_retries="${DB_OPEN_MAX_RETRIES}"
13+
db_open_retry_backoff_msec="${DB_OPEN_RETRY_BACKOFF_MSEC}"
1214
cert_path="${RELAY_TLS_CERT_PATH}"
1315
key_path="${RELAY_TLS_KEY_PATH}"
1416
tls="${RELAY_TLS}"

0 commit comments

Comments
 (0)