Skip to content
This repository was archived by the owner on Mar 28, 2023. It is now read-only.

Commit a7c1f56

Browse files
committed
feat: add latency & count metrics for content routing client
1 parent 0c84bf8 commit a7c1f56

File tree

6 files changed

+208
-46
lines changed

6 files changed

+208
-46
lines changed

client/client.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
8+
"github.com/ipfs/go-cid"
9+
proto "github.com/ipfs/go-delegated-routing/gen/proto"
10+
ipns "github.com/ipfs/go-ipns"
11+
logging "github.com/ipfs/go-log/v2"
12+
record "github.com/libp2p/go-libp2p-record"
13+
"github.com/libp2p/go-libp2p/core/crypto"
14+
"github.com/libp2p/go-libp2p/core/peer"
15+
)
16+
17+
var logger = logging.Logger("service/client/delegatedrouting")
18+
19+
type DelegatedRoutingClient interface {
20+
FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error)
21+
FindProvidersAsync(ctx context.Context, key cid.Cid) (<-chan FindProvidersAsyncResult, error)
22+
GetIPNS(ctx context.Context, id []byte) ([]byte, error)
23+
GetIPNSAsync(ctx context.Context, id []byte) (<-chan GetIPNSAsyncResult, error)
24+
PutIPNS(ctx context.Context, id []byte, record []byte) error
25+
PutIPNSAsync(ctx context.Context, id []byte, record []byte) (<-chan PutIPNSAsyncResult, error)
26+
Provide(ctx context.Context, key []cid.Cid, ttl time.Duration) (time.Duration, error)
27+
ProvideAsync(ctx context.Context, key []cid.Cid, ttl time.Duration) (<-chan time.Duration, error)
28+
}
29+
30+
type Client struct {
31+
client proto.DelegatedRouting_Client
32+
validator record.Validator
33+
34+
provider *Provider
35+
identity crypto.PrivKey
36+
}
37+
38+
var _ DelegatedRoutingClient = (*Client)(nil)
39+
40+
// NewClient creates a client.
41+
// The Provider and identity parameters are option. If they are nil, the `Provide` method will not function.
42+
func NewClient(c proto.DelegatedRouting_Client, p *Provider, identity crypto.PrivKey) (*Client, error) {
43+
if p != nil && !p.Peer.ID.MatchesPublicKey(identity.GetPublic()) {
44+
return nil, errors.New("identity does not match provider")
45+
}
46+
47+
return &Client{
48+
client: c,
49+
validator: ipns.Validator{},
50+
provider: p,
51+
identity: identity,
52+
}, nil
53+
}

client/contentrouting.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,31 @@ func NewContentRoutingClient(c DelegatedRoutingClient) *ContentRoutingClient {
2121
}
2222

2323
func (c *ContentRoutingClient) Provide(ctx context.Context, key cid.Cid, announce bool) error {
24+
var err error
25+
recordMetrics := startMetrics(ctx, "ContentRoutingClient.Provide")
26+
defer recordMetrics(err)
27+
2428
// If 'true' is
2529
// passed, it also announces it, otherwise it is just kept in the local
2630
// accounting of which objects are being provided.
2731
if !announce {
2832
return nil
2933
}
3034

31-
_, err := c.client.Provide(ctx, []cid.Cid{key}, 24*time.Hour)
35+
_, err = c.client.Provide(ctx, []cid.Cid{key}, 24*time.Hour)
3236
return err
3337
}
3438

3539
func (c *ContentRoutingClient) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
40+
var err error
41+
recordMetrics := startMetrics(ctx, "ContentRoutingClient.ProvideMany")
42+
defer recordMetrics(err)
43+
3644
keysAsCids := make([]cid.Cid, 0, len(keys))
3745
for _, m := range keys {
3846
keysAsCids = append(keysAsCids, cid.NewCidV1(cid.Raw, m))
3947
}
40-
_, err := c.client.Provide(ctx, keysAsCids, 24*time.Hour)
48+
_, err = c.client.Provide(ctx, keysAsCids, 24*time.Hour)
4149
return err
4250
}
4351

@@ -51,13 +59,18 @@ func (c *ContentRoutingClient) Ready() bool {
5159
}
5260

5361
func (c *ContentRoutingClient) FindProvidersAsync(ctx context.Context, key cid.Cid, numResults int) <-chan peer.AddrInfo {
62+
var err error
63+
recordMetrics := startMetrics(ctx, "ContentRoutingClient.FindProvidersAsync")
64+
5465
addrInfoCh := make(chan peer.AddrInfo)
5566
resultCh, err := c.client.FindProvidersAsync(ctx, key)
5667
if err != nil {
5768
close(addrInfoCh)
69+
recordMetrics(err)
5870
return addrInfoCh
5971
}
6072
go func() {
73+
defer recordMetrics(nil)
6174
numProcessed := 0
6275
closed := false
6376
for asyncResult := range resultCh {

client/findproviders.go

-44
Original file line numberDiff line numberDiff line change
@@ -2,58 +2,14 @@ package client
22

33
import (
44
"context"
5-
"errors"
6-
"time"
75

86
"github.com/ipfs/go-cid"
97
proto "github.com/ipfs/go-delegated-routing/gen/proto"
10-
ipns "github.com/ipfs/go-ipns"
11-
logging "github.com/ipfs/go-log/v2"
128
"github.com/ipld/edelweiss/values"
13-
record "github.com/libp2p/go-libp2p-record"
14-
"github.com/libp2p/go-libp2p/core/crypto"
159
"github.com/libp2p/go-libp2p/core/peer"
1610
"github.com/multiformats/go-multiaddr"
1711
)
1812

19-
var logger = logging.Logger("service/client/delegatedrouting")
20-
21-
type DelegatedRoutingClient interface {
22-
FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error)
23-
FindProvidersAsync(ctx context.Context, key cid.Cid) (<-chan FindProvidersAsyncResult, error)
24-
GetIPNS(ctx context.Context, id []byte) ([]byte, error)
25-
GetIPNSAsync(ctx context.Context, id []byte) (<-chan GetIPNSAsyncResult, error)
26-
PutIPNS(ctx context.Context, id []byte, record []byte) error
27-
PutIPNSAsync(ctx context.Context, id []byte, record []byte) (<-chan PutIPNSAsyncResult, error)
28-
Provide(ctx context.Context, key []cid.Cid, ttl time.Duration) (time.Duration, error)
29-
ProvideAsync(ctx context.Context, key []cid.Cid, ttl time.Duration) (<-chan time.Duration, error)
30-
}
31-
32-
type Client struct {
33-
client proto.DelegatedRouting_Client
34-
validator record.Validator
35-
36-
provider *Provider
37-
identity crypto.PrivKey
38-
}
39-
40-
var _ DelegatedRoutingClient = (*Client)(nil)
41-
42-
// NewClient creates a client.
43-
// The Provider and identity parameters are option. If they are nil, the `Provide` method will not function.
44-
func NewClient(c proto.DelegatedRouting_Client, p *Provider, identity crypto.PrivKey) (*Client, error) {
45-
if p != nil && !p.Peer.ID.MatchesPublicKey(identity.GetPublic()) {
46-
return nil, errors.New("identity does not match provider")
47-
}
48-
49-
return &Client{
50-
client: c,
51-
validator: ipns.Validator{},
52-
provider: p,
53-
identity: identity,
54-
}, nil
55-
}
56-
5713
func (fp *Client) FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error) {
5814
resps, err := fp.client.FindProviders(ctx, cidsToFindProvidersRequest(key))
5915
if err != nil {

client/metrics.go

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
8+
"go.opencensus.io/stats"
9+
"go.opencensus.io/stats/view"
10+
"go.opencensus.io/tag"
11+
)
12+
13+
var (
14+
defaultDurationDistribution = view.Distribution(0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000)
15+
16+
measureDuration = stats.Float64("delegated_routing/duration", "The time to complete an entire request", stats.UnitMilliseconds)
17+
measureRequests = stats.Float64("delegated_routing/requests", "The number of requests made", stats.UnitDimensionless)
18+
19+
keyName = tag.MustNewKey("name")
20+
keyError = tag.MustNewKey("error")
21+
22+
durationView = &view.View{
23+
Measure: measureDuration,
24+
TagKeys: []tag.Key{keyName, keyError},
25+
Aggregation: defaultDurationDistribution,
26+
}
27+
requestsView = &view.View{
28+
Measure: measureRequests,
29+
TagKeys: []tag.Key{keyName, keyError},
30+
Aggregation: view.Sum(),
31+
}
32+
33+
DefaultViews = []*view.View{
34+
durationView,
35+
requestsView,
36+
}
37+
)
38+
39+
// startMetrics begins recording metrics.
40+
// The returned function flushes the metrics when called, recording metrics about the passed error.
41+
func startMetrics(ctx context.Context, name string) (done func(err error)) {
42+
start := time.Now()
43+
44+
return func(err error) {
45+
latency := time.Since(start)
46+
47+
errStr := "None"
48+
if err != nil {
49+
logger.Warnw("received delegated routing error", "Error", err)
50+
if errors.Is(err, context.Canceled) {
51+
errStr = "Canceled"
52+
} else if errors.Is(err, context.DeadlineExceeded) {
53+
errStr = "DeadlineExceeded"
54+
} else {
55+
errStr = "Unknown"
56+
}
57+
}
58+
59+
stats.RecordWithTags(ctx,
60+
[]tag.Mutator{
61+
tag.Upsert(keyName, name),
62+
tag.Upsert(keyError, errStr),
63+
},
64+
[]stats.Measurement{
65+
measureDuration.M(float64(latency.Milliseconds())),
66+
measureRequests.M(1),
67+
}...,
68+
)
69+
}
70+
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/multiformats/go-multicodec v0.6.0
1515
github.com/multiformats/go-multihash v0.2.1
1616
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e
17+
go.opencensus.io v0.23.0
1718
)
1819

1920
require (

0 commit comments

Comments
 (0)