Skip to content

feat: configurable max slot deep count #276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ Independent of `CL_API_MAX_RETRIES`.
* **Required:** false
* **Default:** 1 (means that request will be executed once)
---
`CL_API_MAX_SLOT_DEEP_COUNT` - Maximum number of slots that the application uses to find a not missed consensus-layer
slot. The application will use this value to find the next (or previous) not-missed slot next to (or behind) the
specific slot. If the processed slot and all next (or previous) `CL_API_MAX_SLOT_DEEP_COUNT` slots are missed, the app
will throw an error.
* **Required:** false
* **Default:** 32
---
`FETCH_INTERVAL_SLOTS` - Count of slots in Ethereum consensus layer epoch.
* **Required:** false
* **Default:** 32
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ services:
- CL_API_GET_RESPONSE_TIMEOUT=${CL_API_GET_RESPONSE_TIMEOUT:-15000}
- CL_API_MAX_RETRIES=${CL_API_MAX_RETRIES:-1}
- CL_API_GET_BLOCK_INFO_MAX_RETRIES=${CL_API_GET_BLOCK_INFO_MAX_RETRIES:-1}
- CL_API_MAX_SLOT_DEEP_COUNT=${CL_API_MAX_SLOT_DEEP_COUNT:-32}
- FETCH_INTERVAL_SLOTS=${FETCH_INTERVAL_SLOTS:-32}
- CHAIN_SLOT_TIME_SECONDS=${CHAIN_SLOT_TIME_SECONDS:-12}
- SYNC_PARTICIPATION_DISTANCE_DOWN_FROM_CHAIN_AVG=${SYNC_PARTICIPATION_DISTANCE_DOWN_FROM_CHAIN_AVG:-0}
Expand Down
5 changes: 5 additions & 0 deletions src/common/config/env.validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ export class EnvironmentVariables {
@Transform(({ value }) => parseInt(value, 10), { toClassOnly: true })
public CL_API_GET_BLOCK_INFO_MAX_RETRIES = 1;

@IsInt()
@Min(1)
@Transform(({ value }) => parseInt(value, 10), { toClassOnly: true })
public CL_API_MAX_SLOT_DEEP_COUNT = 32;

@IsNumber()
@Min(74240) // Altair
@Transform(({ value }) => parseInt(value, 10), { toClassOnly: true })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class BlockCacheService {
this.cache.set(String(blockId), { ...existing, ...data });
}

public get(blockId: BlockCacheId): BlockCache {
public get(blockId: BlockCacheId): BlockCache | undefined {
return this.cache.get(String(blockId));
}

Expand Down
133 changes: 88 additions & 45 deletions src/common/consensus-provider/consensus-provider.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { urljoin } from 'common/functions/urljoin';
import { PrometheusService, TrackCLRequest } from 'common/prometheus';
import { EpochProcessingState } from 'storage/clickhouse';

import { BlockCache, BlockCacheService } from './block-cache';
import { BlockCacheService } from './block-cache';
import { MaxDeepError, ResponseError, errCommon, errRequest } from './errors';
import {
BlockHeaderResponse,
Expand All @@ -33,7 +33,7 @@ let ForkName: typeof import('@lodestar/params').ForkName;
interface RequestRetryOptions {
maxRetries?: number;
dataOnly?: boolean;
useFallbackOnRejected?: (last_error: any, current_error: any) => boolean;
useFallbackOnRejected?: (lastError: any, currentError: any) => boolean;
useFallbackOnResolved?: (r: any) => boolean;
}

Expand All @@ -48,7 +48,7 @@ export class ConsensusProviderService {
protected workingMode: string;
protected version = '';
protected genesisTime = 0;
protected defaultMaxSlotDeepCount = 32;
protected defaultMaxSlotDeepCount: number;
protected latestSlot = { slot: 0, fetchTime: 0 };
protected forkEpochs: ForkEpochs;

Expand All @@ -60,7 +60,7 @@ export class ConsensusProviderService {
beaconHeaders: (blockId: BlockId): string => `eth/v1/beacon/headers/${blockId}`,
attestationCommittees: (stateId: StateId, epoch: Epoch): string => `eth/v1/beacon/states/${stateId}/committees?epoch=${epoch}`,
syncCommittee: (stateId: StateId, epoch: Epoch): string => `eth/v1/beacon/states/${stateId}/sync_committees?epoch=${epoch}`,
proposerDutes: (epoch: Epoch): string => `eth/v1/validator/duties/proposer/${epoch}`,
proposerDuties: (epoch: Epoch): string => `eth/v1/validator/duties/proposer/${epoch}`,
state: (stateId: StateId): string => `eth/v2/debug/beacon/states/${stateId}`,
};

Expand All @@ -72,6 +72,7 @@ export class ConsensusProviderService {
) {
this.apiUrls = config.get('CL_API_URLS') as NonEmptyArray<string>;
this.workingMode = config.get('WORKING_MODE');
this.defaultMaxSlotDeepCount = config.get('CL_API_MAX_SLOT_DEEP_COUNT');
}

public async getVersion(): Promise<string> {
Expand Down Expand Up @@ -148,16 +149,16 @@ export class ConsensusProviderService {
},
},
).catch((e) => {
if (404 != e.$httpCode) {
if (e.$httpCode !== 404) {
this.logger.error('Unexpected status code while fetching block header');
throw e;
}
});
}

public async getBlockHeader(blockId: BlockId, ignoreCache = false): Promise<BlockHeaderResponse | void> {
const cached: BlockCache = this.cache.get(String(blockId));
if (!ignoreCache && cached && (cached.missed || cached.header)) {
const cached = this.cache.get(String(blockId));
if (!ignoreCache && cached != null && (cached.missed || cached.header)) {
this.logger.debug(`Get ${blockId} header from blocks cache`);
return cached.missed ? undefined : cached.header;
}
Expand All @@ -166,16 +167,17 @@ export class ConsensusProviderService {
async (apiURL: string) => this.apiGet(apiURL, this.endpoints.beaconHeaders(blockId)),
{
maxRetries: this.config.get('CL_API_GET_BLOCK_INFO_MAX_RETRIES'),
useFallbackOnRejected: (last_fallback_err, curr_fallback_error) => {
if (last_fallback_err && last_fallback_err.$httpCode == 404 && curr_fallback_error.$httpCode != 404) {
useFallbackOnRejected: (lastFallbackError, currFallbackError) => {
if (lastFallbackError != null && lastFallbackError.$httpCode === 404 && currFallbackError.$httpCode !== 404) {
this.logger.debug('Request error from last fallback was 404, but current is not. Will be used previous error');
throw last_fallback_err;
throw lastFallbackError;
}

return true;
},
},
).catch((e) => {
if (404 != e.$httpCode) {
if (e.$httpCode !== 404) {
this.logger.error('Unexpected status code while fetching block header');
throw e;
}
Expand All @@ -195,7 +197,11 @@ export class ConsensusProviderService {
*/
public async getBeaconBlockHeaderOrPreviousIfMissed(slot: Slot): Promise<BlockHeaderResponse> {
const header = await this.getBlockHeader(slot);
if (header) return header;

if (header) {
return header;
}

// if block is missed, try to get next not missed block header
const nextNotMissedHeader = await this.getNextNotMissedBlockHeader(slot + 1);

Expand All @@ -211,31 +217,49 @@ export class ConsensusProviderService {
}

public async getNextNotMissedBlockHeader(slot: Slot, maxDeep = this.defaultMaxSlotDeepCount): Promise<BlockHeaderResponse> {
const header = await this.getBlockHeader(slot);
if (!header) {
if (maxDeep < 1) {
throw new MaxDeepError(`Error when trying to get next not missed block header. From ${slot} to ${slot + maxDeep}`);
const initialMaxDeep = maxDeep;

const get = async (slot: Slot, maxDeep: number) => {
const header = await this.getBlockHeader(slot);

if (header == null) {
if (maxDeep < 1) {
throw new MaxDeepError(`Error when trying to get next not missed block header. From ${slot - initialMaxDeep} to ${slot}`);
}

this.logger.log(`Try to get next header from ${slot + 1} slot because ${slot} is missing`);
return await get(slot + 1, maxDeep - 1);
}
this.logger.log(`Try to get next header from ${slot + 1} slot because ${slot} is missing`);
return await this.getNextNotMissedBlockHeader(slot + 1, maxDeep - 1);
}
return header;

return header;
};

return get(slot, maxDeep);
}

public async getPreviousNotMissedBlockHeader(
slot: Slot,
maxDeep = this.defaultMaxSlotDeepCount,
ignoreCache = false,
): Promise<BlockHeaderResponse> {
const header = await this.getBlockHeader(slot, ignoreCache);
if (!header) {
if (maxDeep < 1) {
throw new MaxDeepError(`Error when trying to get previous not missed block header. From ${slot} to ${slot - maxDeep}`);
const initialMaxDeep = maxDeep;

const get = async (slot: Slot, maxDeep: number, ignoreCache: boolean) => {
const header = await this.getBlockHeader(slot, ignoreCache);

if (header == null) {
if (maxDeep < 1) {
throw new MaxDeepError(`Error when trying to get previous not missed block header. From ${slot} to ${slot - initialMaxDeep}`);
}

this.logger.log(`Try to get previous header from ${slot - 1} slot because ${slot} is missing`);
return await get(slot - 1, maxDeep - 1, ignoreCache);
}
this.logger.log(`Try to get previous info from ${slot - 1} slot because ${slot} is missing`);
return await this.getPreviousNotMissedBlockHeader(slot - 1, maxDeep - 1);
}
return header;

return header;
};

return get(slot, maxDeep, ignoreCache);
}

/**
Expand All @@ -262,8 +286,8 @@ export class ConsensusProviderService {
}

public async getBlockInfo(blockId: BlockId): Promise<BlockInfoResponse | void> {
const cached: BlockCache = this.cache.get(String(blockId));
if (cached && (cached.missed || cached.info)) {
const cached = this.cache.get(String(blockId));
if (cached != null && (cached.missed || cached.info)) {
this.logger.debug(`Get ${blockId} info from blocks cache`);
return cached.missed ? undefined : cached.info;
}
Expand All @@ -277,18 +301,20 @@ export class ConsensusProviderService {
this.logger.error(`getBlockInfo: slot [${r.data.message.slot}] is not finalized`);
return true;
}

return false;
},
useFallbackOnRejected: (last_fallback_err, curr_fallback_error) => {
if (last_fallback_err && last_fallback_err.$httpCode == 404 && curr_fallback_error.$httpCode != 404) {
useFallbackOnRejected: (lastFallbackError, currFallbackError) => {
if (lastFallbackError != null && lastFallbackError.$httpCode === 404 && currFallbackError.$httpCode !== 404) {
this.logger.debug('Request error from last fallback was 404, but current is not. Will be used previous error');
throw last_fallback_err;
throw lastFallbackError;
}

return true;
},
},
).catch((e) => {
if (404 != e.$httpCode) {
if (e.$httpCode !== 404) {
this.logger.error('Unexpected status code while fetching block info');
throw e;
}
Expand Down Expand Up @@ -316,6 +342,7 @@ export class ConsensusProviderService {
this.logger.error(`getSyncCommitteeInfo: state ${stateId} for epoch ${epoch} is not finalized`);
return true;
}

return false;
},
});
Expand All @@ -327,7 +354,7 @@ export class ConsensusProviderService {
const dependentRoot = await this.getDutyDependentRoot(epoch, ignoreCache);
this.logger.log(`Proposer Duty root: ${dependentRoot}`);
const res = <{ dependent_root: string; data: ProposerDutyInfo[] }>await this.retryRequest(
async (apiURL: string) => this.apiGet(apiURL, this.endpoints.proposerDutes(epoch)),
async (apiURL: string) => this.apiGet(apiURL, this.endpoints.proposerDuties(epoch)),
{ dataOnly: false },
).catch((e) => {
this.logger.error('Unexpected status code while fetching proposer duties info');
Expand Down Expand Up @@ -361,7 +388,10 @@ export class ConsensusProviderService {
let res;
let err;
for (let i = 0; i < this.apiUrls.length; i++) {
if (res) break;
if (res != null) {
break;
}

res = await callback(this.apiUrls[i])
.catch(rejectDelay(this.config.get('CL_API_RETRY_DELAY_MS')))
.catch(() => retry(() => callback(this.apiUrls[i])))
Expand All @@ -370,26 +400,33 @@ export class ConsensusProviderService {
err = Error('Unresolved data on a successful CL API response');
return undefined;
}

return r;
})
.catch((current_error: any) => {
if (options.useFallbackOnRejected(err, current_error)) {
err = current_error;
.catch((currentError: any) => {
if (options.useFallbackOnRejected(err, currentError)) {
err = currentError;
return undefined;
}
throw current_error;

throw currentError;
});
if (i == this.apiUrls.length - 1 && !res) {

if (i === this.apiUrls.length - 1 && res == null) {
err.message = `Error while doing CL API request on all passed URLs. ${err.message}`;
throw err;
}
if (!res) {
this.logger.warn(`${err.message}. Error while doing CL API request. Will try to switch to another API URL`);

if (res == null) {
this.logger.warn(`Error while doing CL API request. Will try to switch to another API URL. ${err.message}`);
}
}

if (options.dataOnly) return res.data;
else return res;
if (options.dataOnly) {
return res.data;
}

return res;
}

@TrackCLRequest
Expand All @@ -401,12 +438,15 @@ export class ConsensusProviderService {
if (e.response) {
throw new ResponseError(errRequest(e.response.body, subUrl, apiURL), e.response.statusCode);
}

throw new ResponseError(errCommon(e.message, subUrl, apiURL));
});

if (statusCode !== 200) {
const errorText = await body.text();
throw new ResponseError(errRequest(errorText, subUrl, apiURL), statusCode);
}

return (await body.json()) as T;
}

Expand All @@ -424,12 +464,15 @@ export class ConsensusProviderService {
if (e.response) {
throw new ResponseError(errRequest(e.response.body, subUrl, apiURL), e.response.statusCode);
}

throw new ResponseError(errCommon(e.message, subUrl, apiURL));
});

if (statusCode !== 200) {
const errorText = await body.text();
throw new ResponseError(errRequest(errorText, subUrl, apiURL), statusCode);
}

return { body, headers };
}
}
Loading