Skip to content

Commit b786a50

Browse files
authored
XHTTP server: Fix stream-up "single POST problem", Use united httpServerConn instead of recover()
#4373 (comment) #4406 (comment)
1 parent b38a53e commit b786a50

File tree

2 files changed

+63
-81
lines changed

2 files changed

+63
-81
lines changed

transport/internet/splithttp/hub.go

Lines changed: 44 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,6 @@ type httpSession struct {
4747
isFullyConnected *done.Instance
4848
}
4949

50-
func (h *requestHandler) maybeReapSession(isFullyConnected *done.Instance, sessionId string) {
51-
shouldReap := done.New()
52-
go func() {
53-
time.Sleep(30 * time.Second)
54-
shouldReap.Close()
55-
}()
56-
57-
select {
58-
case <-isFullyConnected.Wait():
59-
return
60-
case <-shouldReap.Wait():
61-
h.sessions.Delete(sessionId)
62-
}
63-
}
64-
6550
func (h *requestHandler) upsertSession(sessionId string) *httpSession {
6651
// fast path
6752
currentSessionAny, ok := h.sessions.Load(sessionId)
@@ -84,7 +69,21 @@ func (h *requestHandler) upsertSession(sessionId string) *httpSession {
8469
}
8570

8671
h.sessions.Store(sessionId, s)
87-
go h.maybeReapSession(s.isFullyConnected, sessionId)
72+
73+
shouldReap := done.New()
74+
go func() {
75+
time.Sleep(30 * time.Second)
76+
shouldReap.Close()
77+
}()
78+
go func() {
79+
select {
80+
case <-shouldReap.Wait():
81+
h.sessions.Delete(sessionId)
82+
s.uploadQueue.Close()
83+
case <-s.isFullyConnected.Wait():
84+
}
85+
}()
86+
8887
return s
8988
}
9089

@@ -183,12 +182,13 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
183182
writer.WriteHeader(http.StatusBadRequest)
184183
return
185184
}
186-
uploadDone := done.New()
185+
httpSC := &httpServerConn{
186+
Instance: done.New(),
187+
Reader: request.Body,
188+
ResponseWriter: writer,
189+
}
187190
err = currentSession.uploadQueue.Push(Packet{
188-
Reader: &httpRequestBodyReader{
189-
requestReader: request.Body,
190-
uploadDone: uploadDone,
191-
},
191+
Reader: httpSC,
192192
})
193193
if err != nil {
194194
errors.LogInfoInner(context.Background(), err, "failed to upload (PushReader)")
@@ -200,25 +200,21 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
200200
scStreamUpServerSecs := h.config.GetNormalizedScStreamUpServerSecs()
201201
if referrer != "" && scStreamUpServerSecs.To > 0 {
202202
go func() {
203-
defer func() {
204-
recover()
205-
}()
206203
for {
207-
_, err := writer.Write(bytes.Repeat([]byte{'X'}, int(h.config.GetNormalizedXPaddingBytes().rand())))
204+
_, err := httpSC.Write(bytes.Repeat([]byte{'X'}, int(h.config.GetNormalizedXPaddingBytes().rand())))
208205
if err != nil {
209206
break
210207
}
211-
writer.(http.Flusher).Flush()
212208
time.Sleep(time.Duration(scStreamUpServerSecs.rand()) * time.Second)
213209
}
214210
}()
215211
}
216212
select {
217213
case <-request.Context().Done():
218-
case <-uploadDone.Wait():
214+
case <-httpSC.Wait():
219215
}
220216
}
221-
uploadDone.Close()
217+
httpSC.Close()
222218
return
223219
}
224220

@@ -262,11 +258,6 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
262258

263259
writer.WriteHeader(http.StatusOK)
264260
} else if request.Method == "GET" || sessionId == "" { // stream-down, stream-one
265-
responseFlusher, ok := writer.(http.Flusher)
266-
if !ok {
267-
panic("expected http.ResponseWriter to be an http.Flusher")
268-
}
269-
270261
if sessionId != "" {
271262
// after GET is done, the connection is finished. disable automatic
272263
// session reaping, and handle it in defer
@@ -287,20 +278,18 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
287278
}
288279

289280
writer.WriteHeader(http.StatusOK)
281+
writer.(http.Flusher).Flush()
290282

291-
responseFlusher.Flush()
292-
293-
downloadDone := done.New()
294-
283+
httpSC := &httpServerConn{
284+
Instance: done.New(),
285+
Reader: request.Body,
286+
ResponseWriter: writer,
287+
}
295288
conn := splitConn{
296-
writer: &httpResponseBodyWriter{
297-
responseWriter: writer,
298-
downloadDone: downloadDone,
299-
responseFlusher: responseFlusher,
300-
},
301-
reader: request.Body,
302-
localAddr: h.localAddr,
289+
writer: httpSC,
290+
reader: httpSC,
303291
remoteAddr: remoteAddr,
292+
localAddr: h.localAddr,
304293
}
305294
if sessionId != "" { // if not stream-one
306295
conn.reader = currentSession.uploadQueue
@@ -311,7 +300,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
311300
// "A ResponseWriter may not be used after [Handler.ServeHTTP] has returned."
312301
select {
313302
case <-request.Context().Done():
314-
case <-downloadDone.Wait():
303+
case <-httpSC.Wait():
315304
}
316305

317306
conn.Close()
@@ -321,45 +310,30 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
321310
}
322311
}
323312

324-
type httpRequestBodyReader struct {
325-
requestReader io.ReadCloser
326-
uploadDone *done.Instance
327-
}
328-
329-
func (c *httpRequestBodyReader) Read(b []byte) (int, error) {
330-
return c.requestReader.Read(b)
331-
}
332-
333-
func (c *httpRequestBodyReader) Close() error {
334-
defer c.uploadDone.Close()
335-
return c.requestReader.Close()
336-
}
337-
338-
type httpResponseBodyWriter struct {
313+
type httpServerConn struct {
339314
sync.Mutex
340-
responseWriter http.ResponseWriter
341-
responseFlusher http.Flusher
342-
downloadDone *done.Instance
315+
*done.Instance
316+
io.Reader // no need to Close request.Body
317+
http.ResponseWriter
343318
}
344319

345-
func (c *httpResponseBodyWriter) Write(b []byte) (int, error) {
320+
func (c *httpServerConn) Write(b []byte) (int, error) {
346321
c.Lock()
347322
defer c.Unlock()
348-
if c.downloadDone.Done() {
323+
if c.Done() {
349324
return 0, io.ErrClosedPipe
350325
}
351-
n, err := c.responseWriter.Write(b)
326+
n, err := c.ResponseWriter.Write(b)
352327
if err == nil {
353-
c.responseFlusher.Flush()
328+
c.ResponseWriter.(http.Flusher).Flush()
354329
}
355330
return n, err
356331
}
357332

358-
func (c *httpResponseBodyWriter) Close() error {
333+
func (c *httpServerConn) Close() error {
359334
c.Lock()
360335
defer c.Unlock()
361-
c.downloadDone.Close()
362-
return nil
336+
return c.Instance.Close()
363337
}
364338

365339
type Listener struct {

transport/internet/splithttp/upload_queue.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type Packet struct {
2020

2121
type uploadQueue struct {
2222
reader io.ReadCloser
23+
nomore bool
2324
pushedPackets chan Packet
2425
writeCloseMutex sync.Mutex
2526
heap uploadHeap
@@ -42,19 +43,15 @@ func (h *uploadQueue) Push(p Packet) error {
4243
h.writeCloseMutex.Lock()
4344
defer h.writeCloseMutex.Unlock()
4445

45-
runtime.Gosched()
46-
if h.reader != nil && p.Reader != nil {
47-
p.Reader.Close()
48-
return errors.New("h.reader already exists")
49-
}
50-
5146
if h.closed {
52-
if p.Reader != nil {
53-
p.Reader.Close()
54-
}
5547
return errors.New("packet queue closed")
5648
}
57-
49+
if h.nomore {
50+
return errors.New("h.reader already exists")
51+
}
52+
if p.Reader != nil {
53+
h.nomore = true
54+
}
5855
h.pushedPackets <- p
5956
return nil
6057
}
@@ -65,9 +62,20 @@ func (h *uploadQueue) Close() error {
6562

6663
if !h.closed {
6764
h.closed = true
65+
runtime.Gosched() // hope Read() gets the packet
66+
f:
67+
for {
68+
select {
69+
case p := <-h.pushedPackets:
70+
if p.Reader != nil {
71+
h.reader = p.Reader
72+
}
73+
default:
74+
break f
75+
}
76+
}
6877
close(h.pushedPackets)
6978
}
70-
runtime.Gosched()
7179
if h.reader != nil {
7280
return h.reader.Close()
7381
}

0 commit comments

Comments
 (0)