Skip to content

Commit 62a1149

Browse files
authored
fix websocket reconnect error (#1064)
1 parent 455d45f commit 62a1149

File tree

2 files changed

+39
-11
lines changed

2 files changed

+39
-11
lines changed

pkg/web/ws.go

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ const wsInitialPingTime = 1 * time.Second
3030

3131
const DefaultCommandTimeout = 2 * time.Second
3232

33+
var GlobalLock = &sync.Mutex{}
34+
var RouteToConnMap = map[string]string{} // routeid => connid
35+
3336
func RunWebSocketServer(listener net.Listener) {
3437
gr := mux.NewRouter()
3538
gr.HandleFunc("/ws", HandleWs)
@@ -240,6 +243,31 @@ func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, routeI
240243
}
241244
}
242245

246+
func registerConn(wsConnId string, routeId string, wproxy *wshutil.WshRpcProxy) {
247+
GlobalLock.Lock()
248+
defer GlobalLock.Unlock()
249+
curConnId := RouteToConnMap[routeId]
250+
if curConnId != "" {
251+
log.Printf("[websocket] warning: replacing existing connection for route %q\n", routeId)
252+
wshutil.DefaultRouter.UnregisterRoute(routeId)
253+
}
254+
RouteToConnMap[routeId] = wsConnId
255+
wshutil.DefaultRouter.RegisterRoute(routeId, wproxy)
256+
}
257+
258+
func unregisterConn(wsConnId string, routeId string) {
259+
GlobalLock.Lock()
260+
defer GlobalLock.Unlock()
261+
curConnId := RouteToConnMap[routeId]
262+
if curConnId != wsConnId {
263+
// only unregister if we are the current connection (otherwise we were already removed)
264+
log.Printf("[websocket] warning: trying to unregister connection %q for route %q but it is not the current connection (ignoring)\n", wsConnId, routeId)
265+
return
266+
}
267+
delete(RouteToConnMap, routeId)
268+
wshutil.DefaultRouter.UnregisterRoute(routeId)
269+
}
270+
243271
func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
244272
windowId := r.URL.Query().Get("windowid")
245273
if windowId == "" {
@@ -261,23 +289,19 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
261289
wsConnId := uuid.New().String()
262290
outputCh := make(chan any, 100)
263291
closeCh := make(chan any)
264-
eventbus.RegisterWSChannel(wsConnId, windowId, outputCh)
265292
var routeId string
266293
if windowId == wshutil.ElectronRoute {
267294
routeId = wshutil.ElectronRoute
268295
} else {
269296
routeId = wshutil.MakeWindowRouteId(windowId)
270297
}
271-
defer eventbus.UnregisterWSChannel(wsConnId)
272298
log.Printf("[websocket] new connection: windowid:%s connid:%s routeid:%s\n", windowId, wsConnId, routeId)
273-
// we create a wshproxy to handle rpc messages to/from the window
274-
wproxy := wshutil.MakeRpcProxy()
275-
wshutil.DefaultRouter.RegisterRoute(routeId, wproxy)
276-
defer func() {
277-
wshutil.DefaultRouter.UnregisterRoute(routeId)
278-
close(wproxy.ToRemoteCh)
279-
}()
280-
// WshServerFactoryFn(rpcInputCh, rpcOutputCh, wshrpc.RpcContext{})
299+
eventbus.RegisterWSChannel(wsConnId, windowId, outputCh)
300+
defer eventbus.UnregisterWSChannel(wsConnId)
301+
wproxy := wshutil.MakeRpcProxy() // we create a wshproxy to handle rpc messages to/from the window
302+
defer close(wproxy.ToRemoteCh)
303+
registerConn(wsConnId, routeId, wproxy)
304+
defer unregisterConn(wsConnId, routeId)
281305
wg := &sync.WaitGroup{}
282306
wg.Add(2)
283307
go func() {

pkg/wshutil/wshrouter.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,14 @@ func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient) {
274274
log.Printf("[router] registering wsh route %q\n", routeId)
275275
router.Lock.Lock()
276276
defer router.Lock.Unlock()
277+
alreadyExists := router.RouteMap[routeId] != nil
278+
if alreadyExists {
279+
log.Printf("[router] warning: route %q already exists (replacing)\n", routeId)
280+
}
277281
router.RouteMap[routeId] = rpc
278282
go func() {
279283
// announce
280-
if router.GetUpstreamClient() != nil {
284+
if !alreadyExists && router.GetUpstreamClient() != nil {
281285
announceMsg := RpcMessage{Command: wshrpc.Command_RouteAnnounce, Source: routeId}
282286
announceBytes, _ := json.Marshal(announceMsg)
283287
router.GetUpstreamClient().SendRpcMessage(announceBytes)

0 commit comments

Comments
 (0)