Skip to content

Commit 368bbc9

Browse files
joshua-kimStephenButtolphaaronbuchwald
authored andcommitted
Implement ACP-118 Aggregator (#3394)
Signed-off-by: Joshua Kim <[email protected]> Co-authored-by: Stephen Buttolph <[email protected]> Co-authored-by: aaronbuchwald <[email protected]>
1 parent 240fc7c commit 368bbc9

File tree

6 files changed

+1074
-99
lines changed

6 files changed

+1074
-99
lines changed

network/p2p/acp118/aggregator.go

+254
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package acp118
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"math/big"
11+
12+
"go.uber.org/zap"
13+
"google.golang.org/protobuf/proto"
14+
15+
"github.com/ava-labs/avalanchego/ids"
16+
"github.com/ava-labs/avalanchego/network/p2p"
17+
"github.com/ava-labs/avalanchego/proto/pb/sdk"
18+
"github.com/ava-labs/avalanchego/utils/crypto/bls"
19+
"github.com/ava-labs/avalanchego/utils/logging"
20+
"github.com/ava-labs/avalanchego/utils/set"
21+
"github.com/ava-labs/avalanchego/vms/platformvm/warp"
22+
)
23+
24+
var errFailedVerification = errors.New("failed verification")
25+
26+
type indexedValidator struct {
27+
*warp.Validator
28+
Index int
29+
}
30+
31+
type result struct {
32+
NodeID ids.NodeID
33+
Validator indexedValidator
34+
Signature *bls.Signature
35+
Err error
36+
}
37+
38+
// NewSignatureAggregator returns an instance of SignatureAggregator
39+
func NewSignatureAggregator(log logging.Logger, client *p2p.Client) *SignatureAggregator {
40+
return &SignatureAggregator{
41+
log: log,
42+
client: client,
43+
}
44+
}
45+
46+
// SignatureAggregator aggregates validator signatures for warp messages
47+
type SignatureAggregator struct {
48+
log logging.Logger
49+
client *p2p.Client
50+
}
51+
52+
// AggregateSignatures blocks until quorumNum/quorumDen signatures from
53+
// validators are requested to be aggregated into a warp message or the context
54+
// is canceled. Returns the signed message and the amount of stake that signed
55+
// the message. Caller is responsible for providing a well-formed canonical
56+
// validator set corresponding to the signer bitset in the message.
57+
func (s *SignatureAggregator) AggregateSignatures(
58+
ctx context.Context,
59+
message *warp.Message,
60+
justification []byte,
61+
validators []*warp.Validator,
62+
quorumNum uint64,
63+
quorumDen uint64,
64+
) (
65+
_ *warp.Message,
66+
aggregatedStake *big.Int,
67+
totalStake *big.Int,
68+
_ error,
69+
) {
70+
request := &sdk.SignatureRequest{
71+
Message: message.UnsignedMessage.Bytes(),
72+
Justification: justification,
73+
}
74+
75+
requestBytes, err := proto.Marshal(request)
76+
if err != nil {
77+
return nil, nil, nil, fmt.Errorf("failed to marshal signature request: %w", err)
78+
}
79+
80+
nodeIDsToValidator := make(map[ids.NodeID]indexedValidator)
81+
// TODO expose concrete type to avoid type casting
82+
bitSetSignature, ok := message.Signature.(*warp.BitSetSignature)
83+
if !ok {
84+
return nil, nil, nil, errors.New("invalid warp signature type")
85+
}
86+
87+
signerBitSet := set.BitsFromBytes(bitSetSignature.Signers)
88+
89+
nonSigners := make([]ids.NodeID, 0, len(validators))
90+
aggregatedStakeWeight := new(big.Int)
91+
totalStakeWeight := new(big.Int)
92+
for i, validator := range validators {
93+
totalStakeWeight.Add(totalStakeWeight, new(big.Int).SetUint64(validator.Weight))
94+
95+
// Only try to aggregate signatures from validators that are not already in
96+
// the signer bit set
97+
if signerBitSet.Contains(i) {
98+
aggregatedStakeWeight.Add(aggregatedStakeWeight, new(big.Int).SetUint64(validator.Weight))
99+
continue
100+
}
101+
102+
v := indexedValidator{
103+
Index: i,
104+
Validator: validator,
105+
}
106+
107+
for _, nodeID := range v.NodeIDs {
108+
nodeIDsToValidator[nodeID] = v
109+
}
110+
111+
nonSigners = append(nonSigners, v.NodeIDs...)
112+
}
113+
114+
// Account for requested signatures + the signature that was provided
115+
signatures := make([]*bls.Signature, 0, len(nonSigners)+1)
116+
if bitSetSignature.Signature != [bls.SignatureLen]byte{} {
117+
blsSignature, err := bls.SignatureFromBytes(bitSetSignature.Signature[:])
118+
if err != nil {
119+
return nil, nil, nil, fmt.Errorf("failed to parse bls signature: %w", err)
120+
}
121+
signatures = append(signatures, blsSignature)
122+
}
123+
124+
results := make(chan result)
125+
handler := responseHandler{
126+
message: message,
127+
nodeIDsToValidators: nodeIDsToValidator,
128+
results: results,
129+
}
130+
131+
if err := s.client.AppRequest(ctx, set.Of(nonSigners...), requestBytes, handler.HandleResponse); err != nil {
132+
return nil, nil, nil, fmt.Errorf("failed to send aggregation request: %w", err)
133+
}
134+
135+
minThreshold := new(big.Int).Mul(totalStakeWeight, new(big.Int).SetUint64(quorumNum))
136+
minThreshold.Div(minThreshold, new(big.Int).SetUint64(quorumDen))
137+
138+
// Block until:
139+
// 1. The context is cancelled
140+
// 2. We get responses from all validators
141+
// 3. The specified security threshold is reached
142+
for i := 0; i < len(nonSigners); i++ {
143+
select {
144+
case <-ctx.Done():
145+
// Try to return whatever progress we have if the context is cancelled
146+
msg, err := newWarpMessage(message, signerBitSet, signatures)
147+
if err != nil {
148+
return nil, nil, nil, err
149+
}
150+
151+
return msg, aggregatedStakeWeight, totalStakeWeight, nil
152+
case result := <-results:
153+
if result.Err != nil {
154+
s.log.Debug(
155+
"dropping response",
156+
zap.Stringer("nodeID", result.NodeID),
157+
zap.Error(err),
158+
)
159+
continue
160+
}
161+
162+
// Validators may share public keys so drop any duplicate signatures
163+
if signerBitSet.Contains(result.Validator.Index) {
164+
s.log.Debug(
165+
"dropping duplicate signature",
166+
zap.Stringer("nodeID", result.NodeID),
167+
zap.Error(err),
168+
)
169+
continue
170+
}
171+
172+
signatures = append(signatures, result.Signature)
173+
signerBitSet.Add(result.Validator.Index)
174+
aggregatedStakeWeight.Add(aggregatedStakeWeight, new(big.Int).SetUint64(result.Validator.Weight))
175+
176+
if aggregatedStakeWeight.Cmp(minThreshold) != -1 {
177+
msg, err := newWarpMessage(message, signerBitSet, signatures)
178+
if err != nil {
179+
return nil, nil, nil, err
180+
}
181+
182+
return msg, aggregatedStakeWeight, totalStakeWeight, nil
183+
}
184+
}
185+
}
186+
187+
msg, err := newWarpMessage(message, signerBitSet, signatures)
188+
if err != nil {
189+
return nil, nil, nil, err
190+
}
191+
192+
return msg, aggregatedStakeWeight, totalStakeWeight, nil
193+
}
194+
195+
func newWarpMessage(
196+
message *warp.Message,
197+
signerBitSet set.Bits,
198+
signatures []*bls.Signature,
199+
) (*warp.Message, error) {
200+
if len(signatures) == 0 {
201+
return message, nil
202+
}
203+
204+
aggregateSignature, err := bls.AggregateSignatures(signatures)
205+
if err != nil {
206+
return nil, err
207+
}
208+
209+
bitSetSignature := &warp.BitSetSignature{
210+
Signers: signerBitSet.Bytes(),
211+
Signature: [bls.SignatureLen]byte{},
212+
}
213+
copy(bitSetSignature.Signature[:], bls.SignatureToBytes(aggregateSignature))
214+
215+
return warp.NewMessage(&message.UnsignedMessage, bitSetSignature)
216+
}
217+
218+
type responseHandler struct {
219+
message *warp.Message
220+
nodeIDsToValidators map[ids.NodeID]indexedValidator
221+
results chan result
222+
}
223+
224+
func (r *responseHandler) HandleResponse(
225+
_ context.Context,
226+
nodeID ids.NodeID,
227+
responseBytes []byte,
228+
err error,
229+
) {
230+
validator := r.nodeIDsToValidators[nodeID]
231+
if err != nil {
232+
r.results <- result{NodeID: nodeID, Validator: validator, Err: err}
233+
return
234+
}
235+
236+
response := &sdk.SignatureResponse{}
237+
if err := proto.Unmarshal(responseBytes, response); err != nil {
238+
r.results <- result{NodeID: nodeID, Validator: validator, Err: err}
239+
return
240+
}
241+
242+
signature, err := bls.SignatureFromBytes(response.Signature)
243+
if err != nil {
244+
r.results <- result{NodeID: nodeID, Validator: validator, Err: err}
245+
return
246+
}
247+
248+
if !bls.Verify(validator.PublicKey, signature, r.message.UnsignedMessage.Bytes()) {
249+
r.results <- result{NodeID: nodeID, Validator: validator, Err: errFailedVerification}
250+
return
251+
}
252+
253+
r.results <- result{NodeID: nodeID, Validator: validator, Signature: signature}
254+
}

0 commit comments

Comments
 (0)