Skip to content

Commit d77614a

Browse files
author
Matthieu Vachon
committed
Added possibility to customize subscription resolver timeout value
The previous value was hard-coded to 1 second. This is problematic for resolver that takes more time than this to return a result. When parsing the schema, it's not possible to pass a custom value for the subscription resolver timeout. Extracted from graph-gophers#317
1 parent 4c772c1 commit d77614a

File tree

5 files changed

+82
-16
lines changed

5 files changed

+82
-16
lines changed

graphql.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"reflect"
8+
"time"
89

910
"github.com/graph-gophers/graphql-go/errors"
1011
"github.com/graph-gophers/graphql-go/internal/common"
@@ -64,13 +65,14 @@ type Schema struct {
6465
schema *schema.Schema
6566
res *resolvable.Schema
6667

67-
maxDepth int
68-
maxParallelism int
69-
tracer trace.Tracer
70-
validationTracer trace.ValidationTracer
71-
logger log.Logger
72-
useStringDescriptions bool
73-
disableIntrospection bool
68+
maxDepth int
69+
maxParallelism int
70+
tracer trace.Tracer
71+
validationTracer trace.ValidationTracer
72+
logger log.Logger
73+
useStringDescriptions bool
74+
disableIntrospection bool
75+
subscribeResolverTimeout time.Duration
7476
}
7577

7678
// SchemaOpt is an option to pass to ParseSchema or MustParseSchema.
@@ -135,6 +137,15 @@ func DisableIntrospection() SchemaOpt {
135137
}
136138
}
137139

140+
// SubscribeResolverTimeout is an option to control the amount of time
141+
// we allow for a single subscribe message resolver to complete it's job
142+
// before it times out and returns an error to the subscriber.
143+
func SubscribeResolverTimeout(timeout time.Duration) SchemaOpt {
144+
return func(s *Schema) {
145+
s.subscribeResolverTimeout = timeout
146+
}
147+
}
148+
138149
// Response represents a typical response of a GraphQL server. It may be encoded to JSON directly or
139150
// it may be further processed to a custom response type, for example to include custom error data.
140151
// Errors are intentionally serialized first based on the advice in https://github.com/facebook/graphql/commit/7b40390d48680b15cb93e02d46ac5eb249689876#diff-757cea6edf0288677a9eea4cfc801d87R107
@@ -190,7 +201,7 @@ func (s *Schema) exec(ctx context.Context, queryString string, operationName str
190201

191202
// Subscriptions are not valid in Exec. Use schema.Subscribe() instead.
192203
if op.Type == query.Subscription {
193-
return &Response{Errors: []*errors.QueryError{&errors.QueryError{Message: "graphql-ws protocol header is missing"}}}
204+
return &Response{Errors: []*errors.QueryError{{Message: "graphql-ws protocol header is missing"}}}
194205
}
195206
if op.Type == query.Mutation {
196207
if _, ok := s.schema.EntryPoints["mutation"]; !ok {

internal/exec/exec.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"reflect"
99
"sync"
10+
"time"
1011

1112
"github.com/graph-gophers/graphql-go/errors"
1213
"github.com/graph-gophers/graphql-go/internal/common"
@@ -20,9 +21,10 @@ import (
2021

2122
type Request struct {
2223
selected.Request
23-
Limiter chan struct{}
24-
Tracer trace.Tracer
25-
Logger log.Logger
24+
Limiter chan struct{}
25+
Tracer trace.Tracer
26+
Logger log.Logger
27+
SubscribeResolverTimeout time.Duration
2628
}
2729

2830
func (r *Request) handlePanic(ctx context.Context) {

internal/exec/subscribe.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,12 @@ func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *query
115115
}
116116
var out bytes.Buffer
117117
func() {
118-
// TODO: configurable timeout
119-
subCtx, cancel := context.WithTimeout(ctx, time.Second)
118+
timeout := r.SubscribeResolverTimeout
119+
if timeout == 0 {
120+
timeout = time.Second
121+
}
122+
123+
subCtx, cancel := context.WithTimeout(ctx, timeout)
120124
defer cancel()
121125

122126
// resolve response

subscription_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"testing"
8+
"time"
89

910
graphql "github.com/graph-gophers/graphql-go"
1011
qerrors "github.com/graph-gophers/graphql-go/errors"
@@ -473,3 +474,50 @@ const schema = `
473474
hello: String!
474475
}
475476
`
477+
478+
type subscriptionsCustomTimeout struct{}
479+
480+
type messageResolver struct{}
481+
482+
func (r messageResolver) Msg() string {
483+
time.Sleep(5 * time.Millisecond)
484+
return "failed!"
485+
}
486+
487+
func (r *subscriptionsCustomTimeout) OnTimeout() <-chan *messageResolver {
488+
c := make(chan *messageResolver)
489+
go func() {
490+
c <- &messageResolver{}
491+
close(c)
492+
}()
493+
494+
return c
495+
}
496+
497+
func TestSchemaSubscribe_CustomResolverTimeout(t *testing.T) {
498+
r := &struct {
499+
*subscriptionsCustomTimeout
500+
}{
501+
subscriptionsCustomTimeout: &subscriptionsCustomTimeout{},
502+
}
503+
gqltesting.RunSubscribe(t, &gqltesting.TestSubscription{
504+
Schema: graphql.MustParseSchema(`
505+
type Query {}
506+
type Subscription {
507+
onTimeout : Message!
508+
}
509+
510+
type Message {
511+
msg: String!
512+
}
513+
`, r, graphql.SubscribeResolverTimeout(1*time.Millisecond)),
514+
Query: `
515+
subscription {
516+
onTimeout { msg }
517+
}
518+
`,
519+
ExpectedResults: []gqltesting.TestResponse{
520+
{Errors: []*qerrors.QueryError{{Message: "context deadline exceeded"}}},
521+
},
522+
})
523+
}

subscriptions.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ func (s *Schema) subscribe(ctx context.Context, queryString string, operationNam
5454
Vars: variables,
5555
Schema: s.schema,
5656
},
57-
Limiter: make(chan struct{}, s.maxParallelism),
58-
Tracer: s.tracer,
59-
Logger: s.logger,
57+
Limiter: make(chan struct{}, s.maxParallelism),
58+
Tracer: s.tracer,
59+
Logger: s.logger,
60+
SubscribeResolverTimeout: s.subscribeResolverTimeout,
6061
}
6162
varTypes := make(map[string]*introspection.Type)
6263
for _, v := range op.Vars {

0 commit comments

Comments
 (0)