Skip to content

Commit c8cce79

Browse files
committed
fix: reduce memory used by buffered writers
Allocate them as-needed and use a pool. Work towards #322.
1 parent fb62272 commit c8cce79

File tree

1 file changed

+21
-21
lines changed

1 file changed

+21
-21
lines changed

dht_net.go

+21-21
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,6 @@ import (
2121
var dhtReadMessageTimeout = time.Minute
2222
var ErrReadTimeout = fmt.Errorf("timed out reading response")
2323

24-
type bufferedWriteCloser interface {
25-
ggio.WriteCloser
26-
Flush() error
27-
}
28-
2924
// The Protobuf writer performs multiple small writes when writing a message.
3025
// We need to buffer those writes, to make sure that we're not sending a new
3126
// packet for every single write.
@@ -34,12 +29,26 @@ type bufferedDelimitedWriter struct {
3429
ggio.WriteCloser
3530
}
3631

37-
func newBufferedDelimitedWriter(str io.Writer) bufferedWriteCloser {
38-
w := bufio.NewWriter(str)
39-
return &bufferedDelimitedWriter{
40-
Writer: w,
41-
WriteCloser: ggio.NewDelimitedWriter(w),
32+
var writerPool = sync.Pool{
33+
New: func() interface{} {
34+
w := bufio.NewWriter(nil)
35+
return &bufferedDelimitedWriter{
36+
Writer: w,
37+
WriteCloser: ggio.NewDelimitedWriter(w),
38+
}
39+
},
40+
}
41+
42+
func writeMsg(w io.Writer, mes *pb.Message) error {
43+
bw := writerPool.Get().(*bufferedDelimitedWriter)
44+
bw.Reset(w)
45+
err := bw.WriteMsg(mes)
46+
if err == nil {
47+
err = bw.Flush()
4248
}
49+
bw.Reset(nil)
50+
writerPool.Put(bw)
51+
return err
4352
}
4453

4554
func (w *bufferedDelimitedWriter) Flush() error {
@@ -62,7 +71,6 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
6271
cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
6372
cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
6473
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
65-
w := newBufferedDelimitedWriter(cw)
6674
mPeer := s.Conn().RemotePeer()
6775

6876
for {
@@ -118,10 +126,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
118126
}
119127

120128
// send out response msg
121-
err = w.WriteMsg(resp)
122-
if err == nil {
123-
err = w.Flush()
124-
}
129+
err = writeMsg(cw, resp)
125130
if err != nil {
126131
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
127132
logger.Debugf("error writing response: %v", err)
@@ -237,7 +242,6 @@ func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa
237242
type messageSender struct {
238243
s inet.Stream
239244
r ggio.ReadCloser
240-
w bufferedWriteCloser
241245
lk sync.Mutex
242246
p peer.ID
243247
dht *IpfsDHT
@@ -281,7 +285,6 @@ func (ms *messageSender) prep(ctx context.Context) error {
281285
}
282286

283287
ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
284-
ms.w = newBufferedDelimitedWriter(nstr)
285288
ms.s = nstr
286289

287290
return nil
@@ -377,10 +380,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
377380
}
378381

379382
func (ms *messageSender) writeMsg(pmes *pb.Message) error {
380-
if err := ms.w.WriteMsg(pmes); err != nil {
381-
return err
382-
}
383-
return ms.w.Flush()
383+
return writeMsg(ms.s, pmes)
384384
}
385385

386386
func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {

0 commit comments

Comments
 (0)