Skip to content

Commit 025dd39

Browse files
committed
feat(websocket): support websocket in app-service;
1 parent 1c12af5 commit 025dd39

File tree

10 files changed

+360
-13
lines changed

10 files changed

+360
-13
lines changed

packages/app-service/package-lock.json

+47-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/app-service/package.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
"multer": "^1.4.2",
4040
"node-modules-utils": "^0.6.2",
4141
"nodemailer": "^6.6.3",
42-
"validator": "^12.2.0"
42+
"validator": "^12.2.0",
43+
"ws": "^8.2.3"
4344
},
4445
"devDependencies": {
4546
"@types/dotenv": "^8.2.0",
@@ -53,6 +54,7 @@
5354
"@types/node": "^16.7.10",
5455
"@types/nodemailer": "^6.4.4",
5556
"@types/validator": "^13.1.3",
57+
"@types/ws": "^8.2.0",
5658
"typescript": "^4.2.3"
5759
},
5860
"nodemonConfig": {

packages/app-service/src/cloud-sdk/index.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import { getToken, parseToken } from "../lib/utils/token"
1010
import { invokeInFunction } from "./invoke"
1111
import { createFileStorage } from "../lib/storage"
1212
import { CloudFunction } from "../lib/function"
13+
import { WebSocket } from "ws"
14+
import { WebSocketAgent } from "../lib/ws"
1315

1416

1517
export type InvokeFunctionType = (name: string, param: FunctionContext) => Promise<any>
@@ -90,6 +92,11 @@ export interface CloudSdkInterface {
9092
* 3. 聚合操作
9193
*/
9294
mongo: MongoDriverObject
95+
96+
/**
97+
* WebSocket 连接例表
98+
*/
99+
sockets: Set<WebSocket>
93100
}
94101

95102

@@ -123,7 +130,8 @@ export function create() {
123130
mongo: {
124131
client: DatabaseAgent.accessor.conn,
125132
db: DatabaseAgent.accessor.db
126-
}
133+
},
134+
sockets: WebSocketAgent.clients
127135
}
128136
return cloud
129137
}

packages/app-service/src/index.ts

+19-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* @Author: Maslow<[email protected]>
33
* @Date: 2021-07-30 10:30:29
4-
* @LastEditTime: 2021-11-03 16:28:03
4+
* @LastEditTime: 2021-11-09 19:09:30
55
* @Description:
66
*/
77

@@ -12,6 +12,7 @@ import { router } from './router/index'
1212
import { logger } from './lib/logger'
1313
import { generateUUID } from './lib/utils/rand'
1414
import { initCloudSdkPackage } from './lib/utils/init'
15+
import { WebSocketAgent } from './lib/ws'
1516

1617
initCloudSdkPackage()
1718

@@ -20,9 +21,9 @@ initCloudSdkPackage()
2021
*/
2122
export * from './cloud-sdk'
2223

23-
const server = express()
24-
server.use(express.json() as any)
25-
server.use(express.urlencoded({
24+
const app = express()
25+
app.use(express.json() as any)
26+
app.use(express.urlencoded({
2627
extended: true
2728
}) as any)
2829

@@ -37,7 +38,7 @@ process.on('uncaughtException', err => {
3738
/**
3839
* Allow CORS by default
3940
*/
40-
server.all('*', function (_req, res, next) {
41+
app.all('*', function (_req, res, next) {
4142
res.header('Access-Control-Allow-Origin', '*')
4243
res.header('Access-Control-Allow-Headers', 'Authorization, Content-Type')
4344
res.header('Access-Control-Allow-Methods', '*')
@@ -48,7 +49,7 @@ server.all('*', function (_req, res, next) {
4849
/**
4950
* Parsing bearer token
5051
*/
51-
server.use(function (req, res, next) {
52+
app.use(function (req, res, next) {
5253
const token = splitBearerToken(req.headers['authorization'] ?? '')
5354
const auth = parseToken(token) || null
5455
req['auth'] = auth
@@ -60,6 +61,16 @@ server.use(function (req, res, next) {
6061
next()
6162
})
6263

63-
server.use(router)
64+
app.use(router)
65+
66+
const server = app.listen(Config.PORT, () => logger.info(`server ${process.pid} listened on ${Config.PORT}`))
67+
68+
/**
69+
* WebSocket upgrade & connect
70+
*/
71+
server.on('upgrade', (req, socket, head) => {
72+
WebSocketAgent.server.handleUpgrade(req, socket as any, head, (client) => {
73+
WebSocketAgent.server.emit('connection', client, req)
74+
})
75+
})
6476

65-
server.listen(Config.PORT, () => logger.info(`server ${process.pid} listened on ${Config.PORT}`))

packages/app-service/src/lib/scheduler/scheduler.ts

+30-2
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
/*
22
* @Author: Maslow<[email protected]>
33
* @Date: 2021-07-30 10:30:29
4-
* @LastEditTime: 2021-11-05 14:46:49
4+
* @LastEditTime: 2021-11-09 20:02:51
55
* @Description:
66
*/
77

88
import { getFunctionById } from "../../api/function"
99
import { addFunctionLog, CloudFunctionLogStruct } from "../../api/function-log"
10-
import { CloudFunction, TriggerScheduler } from "cloud-function-engine"
10+
import { TriggerScheduler } from "cloud-function-engine"
1111
import { createLogger } from "../logger"
1212
import assert = require("assert")
1313
import { ObjectId } from "bson"
14+
import { WebSocket } from "ws"
15+
import { IncomingMessage } from "http"
16+
import { CloudFunction } from "../function"
1417

1518

1619
const logger = createLogger('scheduler')
@@ -37,6 +40,31 @@ export class FrameworkScheduler extends TriggerScheduler {
3740
return func
3841
}
3942

43+
44+
/**
45+
* Trigger an websocket event
46+
* @param event the event name
47+
* @param data the params for function
48+
*/
49+
public websocketEmit(event: string, data: any, socket: WebSocket, request?: IncomingMessage) {
50+
51+
// filter triggers by given eventName
52+
const triggers = this.getEventTriggers()
53+
.filter(tri => tri.event === event)
54+
55+
// trigger the functions' execution
56+
for (const tri of triggers) {
57+
const param: any = {
58+
params: data,
59+
method: event,
60+
requestId: `trigger_${tri.id}`,
61+
socket,
62+
headers: request?.headers
63+
}
64+
this.executeFunction(tri.func_id, param, tri)
65+
}
66+
}
67+
4068
/**
4169
* Will be called by TriggerScheduler
4270
* @override

packages/app-service/src/lib/ws.ts

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import { IncomingMessage } from 'http'
2+
import { RawData, WebSocket, WebSocketServer } from 'ws'
3+
import { logger } from './logger'
4+
import { SchedulerInstance } from './scheduler'
5+
6+
7+
export class WebSocketAgent {
8+
private static _server = null
9+
10+
static get server(): WebSocketServer {
11+
if (!this._server) {
12+
this._server = new WebSocketServer({ noServer: true })
13+
this.server.on('connection', handleSocketConnection)
14+
this.server.on('error', error => logger.error('websocket server got error:', error))
15+
}
16+
17+
return this._server
18+
}
19+
20+
static get clients() {
21+
return this.server.clients
22+
}
23+
}
24+
25+
/**
26+
* Handle socket connection
27+
* @param socket
28+
* @param request
29+
*/
30+
function handleSocketConnection(socket: WebSocket, request: IncomingMessage) {
31+
logger.debug(`socket connected`, request.headers)
32+
33+
socket.on('message', (data, isBinary) => {
34+
handleSocketMessage(socket, data, isBinary)
35+
})
36+
37+
socket.on('error', err => handleSocketError(socket, err))
38+
socket.on('close', (code, reason) => handleSocketClose(socket, code, reason))
39+
40+
SchedulerInstance.websocketEmit('WebSocket:connection', null, socket, request)
41+
}
42+
43+
/**
44+
* Handle socket message
45+
* @param _socket
46+
* @param _data
47+
* @param _isBinary
48+
*/
49+
async function handleSocketMessage(socket: WebSocket, data: RawData, isBinary: boolean) {
50+
const param = { data, isBinary }
51+
SchedulerInstance.websocketEmit('WebSocket:message', param, socket)
52+
}
53+
54+
/**
55+
* Handle socket close
56+
* @param _socket
57+
* @param _code
58+
* @param _reason
59+
*/
60+
function handleSocketClose(socket: WebSocket, code: number, reason: Buffer) {
61+
const param = { code, reason }
62+
SchedulerInstance.websocketEmit('WebSocket:close', param, socket)
63+
}
64+
65+
/**
66+
* Handle socket error
67+
* @param _socket
68+
* @param error
69+
*/
70+
function handleSocketError(socket: WebSocket, error: Error) {
71+
logger.error('websocket got err', error)
72+
73+
const param = error
74+
SchedulerInstance.websocketEmit('WebSocket:error', param, socket)
75+
}

packages/system-client/nginx.conf

+21
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,30 @@ server {
4242
set $service_id app_$appid;
4343
proxy_pass http://$service_id:8000;
4444
add_header appid $appid;
45+
# proxy_set_header Host $host:$server_port;
46+
# proxy_set_header Host $host;
47+
# proxy_http_version 1.1;
48+
# proxy_set_header Upgrade $http_upgrade;
49+
# proxy_set_header Connection "upgrade";
50+
# proxy_read_timeout 6000s;
4551
}
4652
}
4753

54+
location /socket {
55+
resolver 127.0.0.11;
56+
if ($host ~* "(\w{8}(-\w{4}){3}-\w{12})\.(.+)$") {
57+
set $appid $1;
58+
set $service_id app_$appid;
59+
proxy_pass http://$service_id:8000/;
60+
# add_header appid $appid;
61+
}
62+
proxy_set_header Host $host;
63+
proxy_http_version 1.1;
64+
proxy_set_header Upgrade $http_upgrade;
65+
proxy_set_header Connection "upgrade";
66+
proxy_read_timeout 6000s;
67+
}
68+
4869
location /deploy/incoming {
4970
if ($host ~* "(\w{8}(-\w{4}){3}-\w{12})\.(.+)$") {
5071
set $appid $1;

packages/system-client/src/components/FunctionEditor/types/globals.js

+7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import { response_type } from './response_type'
2+
import { websocket_type } from './websocket_type'
23

34
export const global_declare = `
45
${response_type}
6+
${websocket_type}
57
68
declare class FunctionConsole {
79
private _logs;
@@ -71,6 +73,11 @@ interface FunctionContext {
7173
* Express Response 对象
7274
*/
7375
response: HttpResponse
76+
77+
/**
78+
* WebSocket 对象
79+
*/
80+
socket?: WebSocket
7481
}
7582
7683
interface IModule {

packages/system-client/src/components/FunctionEditor/types/index.js

+1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ export class AutoImportTypings {
6262
if (!this.isLoaded('axios')) { this.loadDeclaration('axios') }
6363
if (!this.isLoaded('cloud-function-engine')) { this.loadDeclaration('cloud-function-engine') }
6464
if (!this.isLoaded('mongodb')) { this.loadDeclaration('mongodb') }
65+
if (!this.isLoaded('ws')) { this.loadDeclaration('ws') }
6566
if (!this.isLoaded('@types/node')) { this.loadDeclaration('@types/node') }
6667
}
6768

0 commit comments

Comments
 (0)