|
| 1 | +# Subscriptions with OpenAPI-to-GraphQL |
| 2 | +Since version x.x.x, OpenAPI-to-GraphQL supports [GraphQL _subscription_ operations](http://spec.graphql.org/draft/#sec-Subscription). In GraphQL, using a subscription query, clients subscribe to updates on the data defined in the query. In this scneario, when data changes, the server publishes these changes to all clients that have active subscriptions for that data. |
| 3 | + |
| 4 | +The OpenAPI specification can define similar behavior using [callbacks](https://swagger.io/specification/#callbackObject): a callback defines a request that the server may initiate in response to receiving another request. Callbacks can thus be used to model publish/subscribe behavior. I.e., when the server receives a request to update some data, it can then itself issue callback requests (outside of the first request/response cycle) to any number of subscribed clients to inform about the new data. |
| 5 | + |
| 6 | +When enabling the `createSubscriptionsFromCallbacks` option, OpenAPI-to-GraphQL creates a subscription field for any operation in the given OpenAPI document that defines a `callback` object. In such cases, OpenAPI-to-GraphQL creates a [subscribe](http://spec.graphql.org/draft/#Subscribe()) function responsible to subscribing clients to receive results of callbacks being executed, and a special form of a [resolve](http://spec.graphql.org/draft/#ResolveFieldEventStream()) function, which pushes data updates to subscribed clients. |
| 7 | + |
| 8 | +To create these two functions, OpenAPI-to-GraphQL relies on the popular [graphql-subscriptions](https://github.com/apollographql/graphql-subscriptions) package, which provides a unified API to support different network transports (like WebSockets, MQTT, Redis etc. - see this [list of supported transports](https://github.com/apollographql/graphql-subscriptions#pubsub-implementations)). |
| 9 | + |
| 10 | +A typical example of using OpenAPI-to-GraphQL to create a GraphQL server supporting subscriptions may look like this: |
| 11 | + |
| 12 | +### Creating PubSub instance |
| 13 | + |
| 14 | +First, initialize a PubSub instance to spread events between your API and the GraphQL Server, in a `pubsub.js` file. |
| 15 | + |
| 16 | +```javascript |
| 17 | +import { EventEmitter2 } from 'eventEmitter2'; |
| 18 | +import { PubSub } = from 'graphql-subscriptions' |
| 19 | + |
| 20 | +const eventEmitter = new EventEmitter2({ |
| 21 | + wildcard: true, |
| 22 | + delimiter: '/' |
| 23 | +}); |
| 24 | + |
| 25 | +// Create the PubSub instance ( here by wrapping an EventEmitter client ) |
| 26 | +const pubsub = new PubSub() |
| 27 | + |
| 28 | +export default pubsub |
| 29 | +``` |
| 30 | + |
| 31 | +PubSub could also wrap an MQTT client connected to a broker, like in this [example API](../test/example_api5_server.js). |
| 32 | + |
| 33 | +```javascript |
| 34 | +import { connect } = from 'mqtt' |
| 35 | +import { MQTTPubSub } = from 'graphql-mqtt-subscriptions' |
| 36 | + |
| 37 | +const MQTT_PORT = 1883 |
| 38 | + |
| 39 | +// Create a PubSub instance ( here by wrapping a MQTT client ) |
| 40 | +const client = connect(`mqtt://localhost:${MQTT_PORT}`) |
| 41 | + |
| 42 | +const pubsub = new MQTTPubSub({ |
| 43 | + client |
| 44 | +}) |
| 45 | + |
| 46 | +export default pubsub |
| 47 | +``` |
| 48 | + |
| 49 | +## GraphQL server |
| 50 | + |
| 51 | +Create GraphQL schema, resolvers and endpoints. |
| 52 | + |
| 53 | +```javascript |
| 54 | +import { createGraphQLSchema } from 'openapi-to-graphql' |
| 55 | +import express from 'express' |
| 56 | +import { graphqlExpress } from 'apollo-server-express' |
| 57 | +import { execute, printSchema, subscribe } from 'graphql' |
| 58 | +import { SubscriptionServer } from 'subscriptions-transport-ws' |
| 59 | +import { createServer } from 'http' |
| 60 | +import { pubsub } from './pubsub' |
| 61 | + |
| 62 | +const HTTP_PORT = 3000 |
| 63 | + |
| 64 | +const init = async () => { |
| 65 | + // Let OpenAPI-to-GraphQL create the schema |
| 66 | + const schema = await createGraphQLSchema(oasWithCallbackObjects, { |
| 67 | + createSubscriptionsFromCallbacks: true |
| 68 | + }) |
| 69 | + |
| 70 | + // Log GraphQL schema... |
| 71 | + const myGraphQLSchema = printSchema(schema) |
| 72 | + console.log(myGraphQLSchema) |
| 73 | + |
| 74 | + // Set up GraphQL server using Express.js |
| 75 | + const app = express() |
| 76 | + app.use('/graphql', graphqlExpress({ schema })) |
| 77 | + |
| 78 | + // Wrap the Express server... |
| 79 | + const wsServer = createServer(app) |
| 80 | + |
| 81 | + // ...and set up the WebSocket for handling GraphQL subscriptions |
| 82 | + wsServer.listen(HTTP_PORT, () => { |
| 83 | + new SubscriptionServer( |
| 84 | + { |
| 85 | + execute, |
| 86 | + subscribe, |
| 87 | + schema, |
| 88 | + onConnect: (params, socket, ctx) => { |
| 89 | + // adding pubsub to context |
| 90 | + // to be used by graphQL subscribe field |
| 91 | + return { pubsub } |
| 92 | + } |
| 93 | + }, |
| 94 | + { |
| 95 | + server: wsServer, |
| 96 | + path: '/subscriptions' |
| 97 | + } |
| 98 | + ) |
| 99 | + }) |
| 100 | +} |
| 101 | + |
| 102 | +init() |
| 103 | +``` |
| 104 | + |
| 105 | +## API server |
| 106 | + |
| 107 | +A simple example could be the following, when an HTTP client tries to create a device ( via `post('/api/devices')` route ) an event is published by the PubSub instance. |
| 108 | +If a callback like [#/components/callbacks/DevicesEvent](../test/fixtures/example_oas5.json) is declared in your OpenAPI schema and used in path `/devices` for the `post` Operation, a subscription field will be generated by OpenAPI-to-GraphQL. |
| 109 | + |
| 110 | + |
| 111 | +```javascript |
| 112 | +import express from 'express' |
| 113 | +import bodyParser from 'body-parser' |
| 114 | +import pubsub from './pubsub' |
| 115 | + |
| 116 | +const HTTP_PORT = 4000 |
| 117 | + |
| 118 | +const Devices = { |
| 119 | + 'Audio-player': { |
| 120 | + name: 'Audio-player', |
| 121 | + userName: 'johnny' |
| 122 | + }, |
| 123 | + Drone: { |
| 124 | + name: 'Drone', |
| 125 | + userName: 'eric' |
| 126 | + } |
| 127 | +} |
| 128 | + |
| 129 | +const startServer = () => { |
| 130 | + const app = express() |
| 131 | + |
| 132 | + app.use(bodyParser.json()) |
| 133 | + |
| 134 | + const httpServer = app.listen(HTTP_PORT, () => { |
| 135 | + app.get('/api/devices', (req, res) => { |
| 136 | + res.status(200).send(Object.values(Devices)) |
| 137 | + }) |
| 138 | + |
| 139 | + app.post('/api/devices', (req, res) => { |
| 140 | + if (req.body.userName && req.body.name) { |
| 141 | + const device = req.body |
| 142 | + Devices[device.name] = device |
| 143 | + const packet = { |
| 144 | + topic: `/api/${device.userName}/devices/${req.method.toUpperCase()}/${ |
| 145 | + device.name |
| 146 | + }`, |
| 147 | + payload: Buffer.from(JSON.stringify(device)) |
| 148 | + } |
| 149 | + |
| 150 | + // use pubsub to publish the event |
| 151 | + pubsub.publish(packet) |
| 152 | + |
| 153 | + res.status(200).send(device) |
| 154 | + } else { |
| 155 | + res.status(404).send({ |
| 156 | + message: 'Wrong device schema' |
| 157 | + }) |
| 158 | + } |
| 159 | + }) |
| 160 | + |
| 161 | + app.get('/api/devices/:deviceName', (req, res) => { |
| 162 | + if (req.params.deviceName in Devices) { |
| 163 | + res.status(200).send(Devices[req.params.deviceName]) |
| 164 | + } else { |
| 165 | + res.status(404).send({ |
| 166 | + message: 'Wrong device ID.' |
| 167 | + }) |
| 168 | + } |
| 169 | + }) |
| 170 | + |
| 171 | + }) |
| 172 | +} |
| 173 | + |
| 174 | +startServer() |
| 175 | +``` |
| 176 | + |
| 177 | +## GrapQL client |
| 178 | + |
| 179 | +If any GraphQL (WS) client subscribed to the route defined by the callback (`#/components/callbacks/DevicesEvent`), it will get the content transfered by PubSub. |
| 180 | + |
| 181 | +```javascript |
| 182 | +import axios from 'axios' |
| 183 | +import { SubscriptionClient } from 'subscriptions-transport-ws' |
| 184 | +import pubsub from './pubsub' |
| 185 | + |
| 186 | +const GRAPHQL_HTTP_PORT = 3000 |
| 187 | +const REST_HTTP_PORT = 4000 |
| 188 | + |
| 189 | +const device = { |
| 190 | + userName: 'Carlos', |
| 191 | + name: 'Bot' |
| 192 | +} |
| 193 | + |
| 194 | +const startClient = () => { |
| 195 | + // generate subscription : |
| 196 | + // via GraphQL WS API |
| 197 | + const client = new SubscriptionClient( |
| 198 | + `ws://localhost:${GRAPHQL_HTTP_PORT}/subscriptions` |
| 199 | + ) |
| 200 | + |
| 201 | + client.request({ |
| 202 | + query: `subscription watchDevice($topicInput: TopicInput!) { |
| 203 | + devicesEventListener(topicInput: $topicInput) { |
| 204 | + name |
| 205 | + userName |
| 206 | + status |
| 207 | + } |
| 208 | + }`, |
| 209 | + operationName: 'watchDevice', |
| 210 | + variables: { |
| 211 | + topicInput: { |
| 212 | + method: 'POST', |
| 213 | + userName: `${device.userName}` |
| 214 | + } |
| 215 | + } |
| 216 | + }) |
| 217 | + .subscribe({ |
| 218 | + next: {data} => { |
| 219 | + console.log('Device created', data) |
| 220 | + }, |
| 221 | + }) |
| 222 | + |
| 223 | + // or directly via PubSub instance |
| 224 | + // like OpenAPI-to-GraphQL would do |
| 225 | + pubsub.subscribe(`/api/${device.userName}/devices/POST/*`, (...args) => { |
| 226 | + console.log('Device created', args) |
| 227 | + }) |
| 228 | + |
| 229 | + |
| 230 | + // trigger device creation: |
| 231 | + // via GraphQL HTTP API |
| 232 | + axios({ |
| 233 | + url: `http://localhost:${GRAPHQL_HTTP_PORT}/graphql`, |
| 234 | + method: 'POST', |
| 235 | + json: true, |
| 236 | + data: { |
| 237 | + query: `mutation($deviceInput: DeviceInput!) { |
| 238 | + createDevice(deviceInput: $deviceInput) { |
| 239 | + name |
| 240 | + userName |
| 241 | + } |
| 242 | + }`, |
| 243 | + variables: device, |
| 244 | + }, |
| 245 | + }) |
| 246 | + |
| 247 | + // or via REST API |
| 248 | + // like OpenAPI-to-GraphQL would do |
| 249 | + axios({ |
| 250 | + url: `http://localhost:${REST_HTTP_PORT}/api/devices`, |
| 251 | + method: 'POST', |
| 252 | + json: true, |
| 253 | + data: device, |
| 254 | + }) |
| 255 | +} |
| 256 | + |
| 257 | +startClient() |
| 258 | +``` |
| 259 | + |
| 260 | +In this example, we rely on the [`subscriptions-transport-ws` package](https://github.com/apollographql/subscriptions-transport-ws) to create a `SubscriptionServer` that manages WebSockets connections between the GraphQL clients and our server. We also rely on the `graphqlExpress` server provided by the [`apollo-server-express` package](https://github.com/apollographql/apollo-server/tree/master/packages/apollo-server-express) to serve GraphQL from Express.js. |
| 261 | + |
| 262 | + |
| 263 | +Sidenote concerning callback, as you can see in the example Callback, the path (runtime expression) `/api/{$request.body#/userName}/devices/{$request.body#/method}/+` is delimited by `/` and ends with `+`, these symbols are interpreted as delimiters and wildcard when using MQTT topics. |
| 264 | +It needs to be adapted accordingly to the client wrapped in your PubSub instance, for eventEmitter2 you can use `*` and define your own delimiter. |
| 265 | +An helper might be provided in the future, to simplify this process. |
| 266 | + |
| 267 | +## Examples |
| 268 | + |
| 269 | +You can also run the example provided in this project. |
| 270 | + |
| 271 | +Start REST API server (HTTP and MQTT) : |
| 272 | +```sh |
| 273 | +npm run api_sub |
| 274 | +``` |
| 275 | + |
| 276 | +Start GRAPHQL server (HTTP and WS) : |
| 277 | +```sh |
| 278 | +npm run start_dev_sub |
| 279 | +``` |
0 commit comments