@@ -2,9 +2,9 @@ package dht
2
2
3
3
import (
4
4
"context"
5
+ "errors"
5
6
"sync"
6
7
7
- u "github.com/ipfs/go-ipfs-util"
8
8
logging "github.com/ipfs/go-log"
9
9
todoctr "github.com/ipfs/go-todocounter"
10
10
process "github.com/jbenet/goprocess"
@@ -18,6 +18,9 @@ import (
18
18
notif "github.com/libp2p/go-libp2p-routing/notifications"
19
19
)
20
20
21
+ // ErrNoPeersQueried is returned when we failed to connect to any peers.
22
+ var ErrNoPeersQueried = errors .New ("failed to query any peers" )
23
+
21
24
var maxQueryConcurrency = AlphaValue
22
25
23
26
type dhtQuery struct {
@@ -77,7 +80,6 @@ type dhtQueryRunner struct {
77
80
peersRemaining todoctr.Counter // peersToQuery + currently processing
78
81
79
82
result * dhtQueryResult // query result
80
- errs u.MultiErr // result errors. maybe should be a map[peer.ID]error
81
83
82
84
rateLimit chan struct {} // processing semaphore
83
85
log logging.EventLogger
@@ -155,23 +157,19 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
155
157
select {
156
158
case <- r .peersRemaining .Done ():
157
159
r .proc .Close ()
158
- r .RLock ()
159
- defer r .RUnlock ()
160
-
161
- err = routing .ErrNotFound
162
-
163
- // if every query to every peer failed, something must be very wrong.
164
- if len (r .errs ) > 0 && len (r .errs ) == r .peersSeen .Size () {
165
- logger .Debugf ("query errs: %s" , r .errs )
166
- err = r .errs [0 ]
160
+ if r .peersQueried .Size () == 0 {
161
+ err = ErrNoPeersQueried
162
+ } else {
163
+ err = routing .ErrNotFound
167
164
}
168
165
169
166
case <- r .proc .Closed ():
170
- r .RLock ()
171
- defer r .RUnlock ()
172
167
err = r .runCtx .Err ()
173
168
}
174
169
170
+ r .RLock ()
171
+ defer r .RUnlock ()
172
+
175
173
if r .result != nil && r .result .success {
176
174
return r .result , nil
177
175
}
@@ -257,10 +255,6 @@ func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error {
257
255
ID : p ,
258
256
})
259
257
260
- r .Lock ()
261
- r .errs = append (r .errs , err )
262
- r .Unlock ()
263
-
264
258
// This peer is dropping out of the race.
265
259
r .peersRemaining .Decrement (1 )
266
260
return err
@@ -289,10 +283,6 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
289
283
290
284
if err != nil {
291
285
logger .Debugf ("ERROR worker for: %v %v" , p , err )
292
- r .Lock ()
293
- r .errs = append (r .errs , err )
294
- r .Unlock ()
295
-
296
286
} else if res .success {
297
287
logger .Debugf ("SUCCESS worker for: %v %s" , p , res )
298
288
r .Lock ()
0 commit comments