@@ -133,14 +133,6 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
133
133
mq .out = fullwantlist
134
134
mq .work <- struct {}{}
135
135
136
- s , err := pm .network .NewMessageSender (pm .ctx , p )
137
- if err != nil {
138
- log .Error ("error opening stream to peer: " , err )
139
- return nil
140
- }
141
-
142
- mq .sender = s
143
-
144
136
pm .peers [p ] = mq
145
137
go mq .runQueue (pm .ctx )
146
138
return mq
@@ -163,7 +155,11 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) {
163
155
}
164
156
165
157
func (mq * msgQueue ) runQueue (ctx context.Context ) {
166
- defer mq .sender .Close ()
158
+ defer func () {
159
+ if mq .sender != nil {
160
+ mq .sender .Close ()
161
+ }
162
+ }()
167
163
for {
168
164
select {
169
165
case <- mq .work : // there is work to be done
@@ -180,14 +176,25 @@ func (mq *msgQueue) doWork(ctx context.Context) {
180
176
// allow ten minutes for connections
181
177
// this includes looking them up in the dht
182
178
// dialing them, and handshaking
183
- conctx , cancel := context .WithTimeout (ctx , time .Minute * 10 )
184
- defer cancel ()
179
+ if mq .sender == nil {
180
+ conctx , cancel := context .WithTimeout (ctx , time .Minute * 10 )
181
+ defer cancel ()
182
+
183
+ err := mq .network .ConnectTo (conctx , mq .p )
184
+ if err != nil {
185
+ log .Infof ("cant connect to peer %s: %s" , mq .p , err )
186
+ // TODO: cant connect, what now?
187
+ return
188
+ }
185
189
186
- err := mq .network .ConnectTo (conctx , mq .p )
187
- if err != nil {
188
- log .Infof ("cant connect to peer %s: %s" , mq .p , err )
189
- // TODO: cant connect, what now?
190
- return
190
+ nsender , err := mq .network .NewMessageSender (ctx , mq .p )
191
+ if err != nil {
192
+ log .Infof ("cant open new stream to peer %s: %s" , mq .p , err )
193
+ // TODO: cant open stream, what now?
194
+ return
195
+ }
196
+
197
+ mq .sender = nsender
191
198
}
192
199
193
200
// grab outgoing message
@@ -201,9 +208,11 @@ func (mq *msgQueue) doWork(ctx context.Context) {
201
208
mq .outlk .Unlock ()
202
209
203
210
// send wantlist updates
204
- err = mq .sender .SendMsg (wlm )
211
+ err : = mq .sender .SendMsg (wlm )
205
212
if err != nil {
206
213
log .Infof ("bitswap send error: %s" , err )
214
+ mq .sender .Close ()
215
+ mq .sender = nil
207
216
// TODO: what do we do if this fails?
208
217
return
209
218
}
0 commit comments