@@ -987,30 +987,48 @@ void ZooKeeper::receiveEvent()
987
987
if (it == operations.end ())
988
988
throw Exception (" Received response for unknown xid" , ZRUNTIMEINCONSISTENCY);
989
989
990
+ // / After this point, we must invoke callback, that we've grabbed from 'operations'.
991
+ // / Invariant: all callbacks are invoked either in case of success or in case of error.
992
+ // / (all callbacks in 'operations' are guaranteed to be invoked)
993
+
990
994
request_info = std::move (it->second );
991
995
operations.erase (it);
992
996
CurrentMetrics::sub (CurrentMetrics::ZooKeeperRequest);
993
997
}
994
998
995
- response = request_info.request ->makeResponse ();
996
-
997
999
auto elapsed_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(clock ::now () - request_info.time ).count ();
998
1000
ProfileEvents::increment (ProfileEvents::ZooKeeperWaitMicroseconds, elapsed_microseconds);
999
1001
}
1000
1002
1001
- if (err)
1002
- response->error = err;
1003
- else
1003
+ try
1004
1004
{
1005
- response->readImpl (*in);
1006
- response->removeRootPath (root_path);
1005
+ if (!response)
1006
+ response = request_info.request ->makeResponse ();
1007
+
1008
+ if (err)
1009
+ response->error = err;
1010
+ else
1011
+ {
1012
+ response->readImpl (*in);
1013
+ response->removeRootPath (root_path);
1014
+ }
1015
+
1016
+ int32_t actual_length = in->count () - count_before_event;
1017
+ if (length != actual_length)
1018
+ throw Exception (" Response length doesn't match. Expected: " + toString (length) + " , actual: " + toString (actual_length), ZMARSHALLINGERROR);
1007
1019
}
1020
+ catch (...)
1021
+ {
1022
+ tryLogCurrentException (__PRETTY_FUNCTION__);
1008
1023
1009
- int32_t actual_length = in->count () - count_before_event;
1010
- if (length != actual_length)
1011
- throw Exception (" Response length doesn't match. Expected: " + toString (length) + " , actual: " + toString (actual_length), ZMARSHALLINGERROR);
1024
+ response->error = ZMARSHALLINGERROR;
1025
+ if (request_info.callback )
1026
+ request_info.callback (*response);
1027
+
1028
+ throw ;
1029
+ }
1012
1030
1013
- // / NOTE: Exception in callback will propagate to receiveThread and will lead to session expiration. This is Ok.
1031
+ // / Exception in callback will propagate to receiveThread and will lead to session expiration. This is Ok.
1014
1032
1015
1033
if (request_info.callback )
1016
1034
request_info.callback (*response);
0 commit comments