Skip to content

Commit 7a52666

Browse files
committed
Remove totally static/dynamic distinction.
1 parent 2d92c53 commit 7a52666

File tree

2 files changed

+86
-276
lines changed

2 files changed

+86
-276
lines changed

beacon-chain/sync/subscriber.go

+84-172
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ func sliceFromCount(count uint64) []uint64 {
6464
}
6565

6666
func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) []uint64 {
67+
if flags.Get().SubscribeToAllSubnets {
68+
return sliceFromCount(params.BeaconConfig().SyncCommitteeSubnetCount)
69+
}
70+
6771
// Get the current epoch.
6872
currentEpoch := slots.ToEpoch(currentSlot)
6973

@@ -105,27 +109,14 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
105109
s.attesterSlashingSubscriber,
106110
digest,
107111
)
108-
if flags.Get().SubscribeToAllSubnets {
109-
s.subscribeStaticWithSubnets(
110-
"Attestation",
111-
p2p.AttestationSubnetTopicFormat,
112-
s.validateCommitteeIndexBeaconAttestation, /* validator */
113-
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
114-
digest,
115-
sliceFromCount(params.BeaconConfig().AttestationSubnetCount),
116-
)
117-
} else {
118-
s.subscribeDynamicWithSubnets(
119-
"Attestation",
120-
p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})],
121-
p2p.AttestationSubnetTopicFormat,
122-
s.validateCommitteeIndexBeaconAttestation, /* validator */
123-
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
124-
digest,
125-
s.persistentAndAggregatorSubnetIndices,
126-
s.attesterSubnetIndices,
127-
)
128-
}
112+
s.subscribeSubnets(
113+
p2p.AttestationSubnetTopicFormat,
114+
s.validateCommitteeIndexBeaconAttestation, /* validator */
115+
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
116+
digest,
117+
s.persistentAndAggregatorSubnetIndices,
118+
s.attesterSubnetIndices,
119+
)
129120
// Altair Fork Version
130121
if epoch >= params.BeaconConfig().AltairForkEpoch {
131122
s.subscribe(
@@ -134,28 +125,14 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
134125
s.syncContributionAndProofSubscriber,
135126
digest,
136127
)
137-
138-
if flags.Get().SubscribeToAllSubnets {
139-
s.subscribeStaticWithSubnets(
140-
"Sync committee",
141-
p2p.SyncCommitteeSubnetTopicFormat,
142-
s.validateSyncCommitteeMessage, /* validator */
143-
s.syncCommitteeMessageSubscriber, /* message handler */
144-
digest,
145-
sliceFromCount(params.BeaconConfig().SyncCommitteeSubnetCount),
146-
)
147-
} else {
148-
s.subscribeDynamicWithSubnets(
149-
"Sync committee",
150-
p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SyncCommitteeMessage{})],
151-
p2p.SyncCommitteeSubnetTopicFormat,
152-
s.validateSyncCommitteeMessage, /* validator */
153-
s.syncCommitteeMessageSubscriber, /* message handler */
154-
digest,
155-
s.activeSyncSubnetIndices,
156-
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
157-
)
158-
}
128+
s.subscribeSubnets(
129+
p2p.SyncCommitteeSubnetTopicFormat,
130+
s.validateSyncCommitteeMessage, /* validator */
131+
s.syncCommitteeMessageSubscriber, /* message handler */
132+
digest,
133+
s.activeSyncSubnetIndices,
134+
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
135+
)
159136
}
160137

161138
// New Gossip Topic in Capella
@@ -170,13 +147,13 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
170147

171148
// New Gossip Topic in Deneb
172149
if epoch >= params.BeaconConfig().DenebForkEpoch {
173-
s.subscribeStaticWithSubnets(
174-
"Blob",
150+
s.subscribeSubnets(
175151
p2p.BlobSubnetTopicFormat,
176152
s.validateBlob, /* validator */
177153
s.blobSubscriber, /* message handler */
178154
digest,
179-
sliceFromCount(params.BeaconConfig().BlobsidecarSubnetCount),
155+
func(primitives.Slot) []uint64 { return sliceFromCount(params.BeaconConfig().BlobsidecarSubnetCount) },
156+
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
180157
)
181158
}
182159
}
@@ -357,99 +334,6 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
357334
}
358335
}
359336

360-
// subscribeStaticWithSubnets with the given topic and index. A given validator and subscription handler is
361-
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
362-
func (s *Service) subscribeStaticWithSubnets(
363-
description string,
364-
topic string,
365-
validator wrappedVal,
366-
handle subHandler,
367-
digest [4]byte,
368-
subnetIndices []uint64,
369-
) {
370-
// Retrieve the genesis validators root.
371-
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
372-
373-
// Retrieve the fork data
374-
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
375-
if err != nil {
376-
// Impossible condition as it would mean digest does not exist.
377-
panic(err)
378-
}
379-
380-
// Retrieve the base protobuf message.
381-
base := p2p.GossipTopicMappings(topic, epoch)
382-
if base == nil {
383-
// Impossible condition as it would mean topic does not exist.
384-
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
385-
}
386-
387-
// Subscribe to the subnets.
388-
for _, subnetIndex := range subnetIndices {
389-
fullTopic := s.addDigestAndIndexToTopic(topic, digest, subnetIndex)
390-
s.subscribeWithBase(fullTopic, validator, handle)
391-
}
392-
393-
// Retrieve the genesis time.
394-
genesis := s.cfg.clock.GenesisTime()
395-
396-
// Create a ticker ticking every slot.
397-
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
398-
399-
go func() {
400-
for {
401-
select {
402-
case <-s.ctx.Done():
403-
ticker.Done()
404-
return
405-
406-
case <-ticker.C():
407-
// Do not subscribe if not synced.
408-
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
409-
continue
410-
}
411-
412-
valid, err := isDigestValid(digest, genesis, genesisValidatorsRoot)
413-
if err != nil {
414-
log.Error(err)
415-
continue
416-
}
417-
418-
if !valid {
419-
// Unsubscribes from all our current subnets.
420-
log.WithField("digest", digest).Infof("%s subnets with are no longer valid, unsubscribing from all of them.", description)
421-
for _, subnetIndex := range subnetIndices {
422-
fullTopic := fmt.Sprintf(topic, digest, subnetIndex) + s.cfg.p2p.Encoding().ProtocolSuffix()
423-
s.unSubscribeFromTopic(fullTopic)
424-
}
425-
426-
ticker.Done()
427-
return
428-
}
429-
430-
for _, subnetIndex := range subnetIndices {
431-
// Check that there are enough peers.
432-
fullTopic := s.addDigestAndIndexToTopic(topic, digest, subnetIndex)
433-
if s.enoughPeersAreConnected(fullTopic) {
434-
continue
435-
}
436-
437-
// If not enough peers, search for more.
438-
if _, err := s.cfg.p2p.FindPeersWithSubnet(
439-
s.ctx,
440-
s.addDigestAndIndexToTopic(topic, digest, subnetIndex),
441-
subnetIndex,
442-
flags.Get().MinimumPeersPerSubnet,
443-
); err != nil {
444-
log.WithError(err).Debug("Could not search for peers")
445-
return
446-
}
447-
}
448-
}
449-
}
450-
}()
451-
}
452-
453337
// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are
454338
// not in the list of wanted subnets.
455339
// TODO: Rename this functions as it does not only revalidate subscriptions.
@@ -477,11 +361,44 @@ func (s *Service) reValidateSubscriptions(
477361
}
478362
}
479363

364+
// searchForPeers searches for peers in the given subnets.
365+
func (s *Service) searchForPeers(
366+
ctx context.Context,
367+
topicFormat string,
368+
digest [4]byte,
369+
currentSlot primitives.Slot,
370+
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
371+
getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64,
372+
) {
373+
// Retrieve the subnets we want to subscribe to.
374+
subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot)
375+
376+
// Retrieve the subnets we want to find peers for.
377+
subnetsToFindPeersOnlyIndex := getSubnetsToFindPeersOnly(currentSlot)
378+
379+
// Combine the subnets to subscribe and the subnets to find peers for.
380+
subnetsToFindPeersIndex := slice.SetUint64(append(subnetsToSubscribeIndex, subnetsToFindPeersOnlyIndex...))
381+
382+
// Find new peers for wanted subnets if needed.
383+
for _, subnetIndex := range subnetsToFindPeersIndex {
384+
topic := fmt.Sprintf(topicFormat, digest, subnetIndex)
385+
386+
// Check if we have enough peers in the subnet. Skip if we do.
387+
if s.enoughPeersAreConnected(topic) {
388+
continue
389+
}
390+
391+
// Not enough peers in the subnet, we need to search for more.
392+
_, err := s.cfg.p2p.FindPeersWithSubnet(ctx, topic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
393+
if err != nil {
394+
log.WithError(err).Debug("Could not search for peers")
395+
}
396+
}
397+
}
398+
480399
// subscribeToSubnets subscribes to needed subnets, unsubscribe from unneeded ones and search for more peers if needed.
481400
// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise.
482401
func (s *Service) subscribeToSubnets(
483-
description string,
484-
topic string,
485402
topicFormat string,
486403
digest [4]byte,
487404
genesisValidatorsRoot [fieldparams.RootLength]byte,
@@ -498,7 +415,7 @@ func (s *Service) subscribeToSubnets(
498415
return true
499416
}
500417

501-
// Do not subscribe is the digest is not valid.
418+
// Check the validity of the digest.
502419
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
503420
if err != nil {
504421
log.Error(err)
@@ -507,6 +424,11 @@ func (s *Service) subscribeToSubnets(
507424

508425
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
509426
if !valid {
427+
description := topicFormat
428+
if pos := strings.LastIndex(topicFormat, "/"); pos != -1 {
429+
description = topicFormat[pos+1:]
430+
}
431+
510432
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warningf("%s subnets with this digest are no longer valid, unsubscribing from all of them.", description)
511433
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
512434
return false
@@ -515,18 +437,12 @@ func (s *Service) subscribeToSubnets(
515437
// Retrieve the subnets we want to subscribe to.
516438
subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot)
517439

518-
// Retrieve the subnets we want to find peers for.
519-
subnetsToFindPeersOnlyIndex := getSubnetsToFindPeersOnly(currentSlot)
520-
521-
// Combine the subnets to subscribe and the subnets to find peers for.
522-
subnetsToFindPeersIndex := slice.SetUint64(append(subnetsToSubscribeIndex, subnetsToFindPeersOnlyIndex...))
523-
524440
// Remove subscriptions that are no longer wanted.
525441
s.reValidateSubscriptions(subscriptions, subnetsToSubscribeIndex, topicFormat, digest)
526442

527443
// Subscribe to wanted subnets.
528444
for _, subnetIndex := range subnetsToSubscribeIndex {
529-
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
445+
subnetTopic := fmt.Sprintf(topicFormat, digest, subnetIndex)
530446

531447
// Check if subscription exists.
532448
if _, exists := subscriptions[subnetIndex]; exists {
@@ -537,30 +453,11 @@ func (s *Service) subscribeToSubnets(
537453
subscription := s.subscribeWithBase(subnetTopic, validate, handle)
538454
subscriptions[subnetIndex] = subscription
539455
}
540-
541-
// Find new peers for wanted subnets if needed.
542-
for _, subnetIndex := range subnetsToFindPeersIndex {
543-
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
544-
545-
// Check if we have enough peers in the subnet. Skip if we do.
546-
if s.enoughPeersAreConnected(subnetTopic) {
547-
continue
548-
}
549-
550-
// Not enough peers in the subnet, we need to search for more.
551-
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
552-
if err != nil {
553-
log.WithError(err).Debug("Could not search for peers")
554-
}
555-
}
556-
557456
return true
558457
}
559458

560-
// subscribeDynamicWithSubnets subscribes to a dynamically changing list of subnets.
561-
func (s *Service) subscribeDynamicWithSubnets(
562-
description string,
563-
topic string,
459+
// subscribeSubnets subscribes to a list of subnets.
460+
func (s *Service) subscribeSubnets(
564461
topicFormat string,
565462
validate wrappedVal,
566463
handle subHandler,
@@ -596,21 +493,32 @@ func (s *Service) subscribeDynamicWithSubnets(
596493
// Retrieve the current slot.
597494
currentSlot := s.cfg.clock.CurrentSlot()
598495

496+
// Subscribe to subnets.
497+
s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
498+
499+
// Derive a new context and cancel function.
500+
ctx, cancel := context.WithCancel(s.ctx)
501+
599502
go func() {
600-
s.subscribeToSubnets(description, topic, topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
503+
// Search for peers.
504+
s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
601505

602506
for {
603507
select {
604508
case currentSlot := <-ticker.C():
605-
isDigestValid := s.subscribeToSubnets(description, topic, topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
509+
isDigestValid := s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
606510

607511
// Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
608512
if !isDigestValid {
609513
ticker.Done()
610514
return
611515
}
612516

517+
// Search for peers.
518+
s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
519+
613520
case <-s.ctx.Done():
521+
cancel()
614522
ticker.Done()
615523
return
616524
}
@@ -645,6 +553,10 @@ func (s *Service) enoughPeersAreConnected(subnetTopic string) bool {
645553
}
646554

647555
func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
556+
if flags.Get().SubscribeToAllSubnets {
557+
return sliceFromCount(params.BeaconConfig().AttestationSubnetCount)
558+
}
559+
648560
persistentSubnetIndices := s.persistentSubnetIndices()
649561
aggregatorSubnetIndices := s.aggregatorSubnetIndices(currentSlot)
650562

0 commit comments

Comments
 (0)