@@ -168,7 +168,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
168
168
private final BrokerTopicStats brokerTopicStats ;
169
169
private final Metrics metrics ;
170
170
171
- private final RemoteStorageManager remoteLogStorageManager ;
171
+ private final RemoteStorageManager remoteStorageManager ;
172
172
173
173
private final RemoteLogMetadataManager remoteLogMetadataManager ;
174
174
@@ -238,7 +238,7 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
238
238
this .brokerTopicStats = brokerTopicStats ;
239
239
this .metrics = metrics ;
240
240
241
- remoteLogStorageManager = createRemoteStorageManager ();
241
+ remoteStorageManager = createRemoteStorageManager ();
242
242
remoteLogMetadataManager = createRemoteLogMetadataManager ();
243
243
rlmCopyQuotaManager = createRLMCopyQuotaManager ();
244
244
rlmFetchQuotaManager = createRLMFetchQuotaManager ();
@@ -248,7 +248,7 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
248
248
copyThrottleTimeSensor = new RLMQuotaMetrics (metrics , "remote-copy-throttle-time" , RemoteLogManager .class .getSimpleName (),
249
249
"The %s time in millis remote copies was throttled by a broker" , INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS ).sensor ();
250
250
251
- indexCache = new RemoteIndexCache (rlmConfig .remoteLogIndexFileCacheTotalSizeBytes (), remoteLogStorageManager , logDir );
251
+ indexCache = new RemoteIndexCache (rlmConfig .remoteLogIndexFileCacheTotalSizeBytes (), remoteStorageManager , logDir );
252
252
delayInMs = rlmConfig .remoteLogManagerTaskIntervalMs ();
253
253
rlmCopyThreadPool = new RLMScheduledThreadPool (rlmConfig .remoteLogManagerCopierThreadPoolSize (),
254
254
"RLMCopyThreadPool" , "kafka-rlm-copy-thread-pool-%d" );
@@ -374,7 +374,7 @@ RemoteStorageManager createRemoteStorageManager() {
374
374
private void configureRSM () {
375
375
final Map <String , Object > rsmProps = new HashMap <>(rlmConfig .remoteStorageManagerProps ());
376
376
rsmProps .put (ServerConfigs .BROKER_ID_CONFIG , brokerId );
377
- remoteLogStorageManager .configure (rsmProps );
377
+ remoteStorageManager .configure (rsmProps );
378
378
}
379
379
380
380
RemoteLogMetadataManager createRemoteLogMetadataManager () {
@@ -423,7 +423,7 @@ private boolean isRemoteLogManagerConfigured() {
423
423
}
424
424
425
425
public RemoteStorageManager storageManager () {
426
- return remoteLogStorageManager ;
426
+ return remoteStorageManager ;
427
427
}
428
428
429
429
private Stream <Partition > filterPartitions (Set <Partition > partitions ) {
@@ -575,7 +575,7 @@ private void deleteRemoteLogPartition(TopicIdPartition partition) throws RemoteS
575
575
Collection <Uuid > deletedSegmentIds = new ArrayList <>();
576
576
for (RemoteLogSegmentMetadata metadata : metadataList ) {
577
577
deletedSegmentIds .add (metadata .remoteLogSegmentId ().id ());
578
- remoteLogStorageManager .deleteLogSegmentData (metadata );
578
+ remoteStorageManager .deleteLogSegmentData (metadata );
579
579
}
580
580
indexCache .removeAll (deletedSegmentIds );
581
581
@@ -632,7 +632,7 @@ Optional<FileRecords.TimestampAndOffset> lookupTimestamp(RemoteLogSegmentMetadat
632
632
InputStream remoteSegInputStream = null ;
633
633
try {
634
634
// Search forward for the position of the last offset that is greater than or equal to the startingOffset
635
- remoteSegInputStream = remoteLogStorageManager .fetchLogSegment (rlsMetadata , startPos );
635
+ remoteSegInputStream = remoteStorageManager .fetchLogSegment (rlsMetadata , startPos );
636
636
RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream (remoteSegInputStream );
637
637
638
638
while (true ) {
@@ -1027,7 +1027,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment
1027
1027
Optional <CustomMetadata > customMetadata ;
1028
1028
1029
1029
try {
1030
- customMetadata = remoteLogStorageManager .copyLogSegmentData (copySegmentStartedRlsm , segmentData );
1030
+ customMetadata = remoteStorageManager .copyLogSegmentData (copySegmentStartedRlsm , segmentData );
1031
1031
} catch (RemoteStorageException e ) {
1032
1032
logger .info ("Copy failed, cleaning segment {}" , copySegmentStartedRlsm .remoteLogSegmentId ());
1033
1033
try {
@@ -1492,7 +1492,7 @@ private boolean deleteRemoteLogSegment(
1492
1492
1493
1493
// Delete the segment in remote storage.
1494
1494
try {
1495
- remoteLogStorageManager .deleteLogSegmentData (segmentMetadata );
1495
+ remoteStorageManager .deleteLogSegmentData (segmentMetadata );
1496
1496
} catch (RemoteStorageException e ) {
1497
1497
brokerTopicStats .topicStats (topic ).failedRemoteDeleteRequestRate ().mark ();
1498
1498
brokerTopicStats .allTopicsStats ().failedRemoteDeleteRequestRate ().mark ();
@@ -1696,7 +1696,7 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
1696
1696
remoteLogSegmentMetadata = rlsMetadataOptional .get ();
1697
1697
// Search forward for the position of the last offset that is greater than or equal to the target offset
1698
1698
startPos = lookupPositionForOffset (remoteLogSegmentMetadata , offset );
1699
- remoteSegInputStream = remoteLogStorageManager .fetchLogSegment (remoteLogSegmentMetadata , startPos );
1699
+ remoteSegInputStream = remoteStorageManager .fetchLogSegment (remoteLogSegmentMetadata , startPos );
1700
1700
RemoteLogInputStream remoteLogInputStream = getRemoteLogInputStream (remoteSegInputStream );
1701
1701
enrichedRecordBatch = findFirstBatch (remoteLogInputStream , offset );
1702
1702
if (enrichedRecordBatch .batch == null ) {
@@ -2045,7 +2045,7 @@ public void close() {
2045
2045
leaderCopyRLMTasks .values ().forEach (RLMTaskWithFuture ::cancel );
2046
2046
leaderExpirationRLMTasks .values ().forEach (RLMTaskWithFuture ::cancel );
2047
2047
followerRLMTasks .values ().forEach (RLMTaskWithFuture ::cancel );
2048
- Utils .closeQuietly (remoteLogStorageManager , "RemoteLogStorageManager " );
2048
+ Utils .closeQuietly (remoteStorageManager , "RemoteStorageManager " );
2049
2049
Utils .closeQuietly (remoteLogMetadataManager , "RemoteLogMetadataManager" );
2050
2050
Utils .closeQuietly (indexCache , "RemoteIndexCache" );
2051
2051
0 commit comments