@@ -114,6 +114,8 @@ static int atomic_dec_return_safe(atomic_t *v)
 
 #define RBD_OBJ_PREFIX_LEN_MAX	64
 
+#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)
+
 /* Feature bits */
 
 #define RBD_FEATURE_LAYERING	(1<<0)
@@ -319,6 +321,12 @@ struct rbd_img_request {
 #define for_each_obj_request_safe(ireq, oreq, n) \
 	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
 
+enum rbd_watch_state {
+	RBD_WATCH_STATE_UNREGISTERED,
+	RBD_WATCH_STATE_REGISTERED,
+	RBD_WATCH_STATE_ERROR,
+};
+
 struct rbd_mapping {
 	u64			size;
 	u64			features;
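The enum above gives the header watch an explicit lifecycle that the rest of the patch drives: rbd_register_watch() moves UNREGISTERED to REGISTERED, rbd_watch_errcb() moves REGISTERED to ERROR and queues re-registration work, rbd_reregister_watch() moves ERROR back to REGISTERED on success, and rbd_unregister_watch() returns the device to UNREGISTERED. A minimal summary of those transitions, as a comment rather than kernel code:

/*
 * Watch state transitions introduced by this patch (summary only):
 *
 *   rbd_register_watch():   UNREGISTERED -> REGISTERED
 *   rbd_watch_errcb():      REGISTERED   -> ERROR   (queues watch_dwork)
 *   rbd_reregister_watch(): ERROR        -> REGISTERED on success;
 *                           re-queues itself on transient failure
 *   rbd_unregister_watch(): any state    -> UNREGISTERED
 */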
@@ -352,7 +360,11 @@ struct rbd_device {
 
 	struct ceph_file_layout	layout;		/* used for all rbd requests */
 
+	struct mutex		watch_mutex;
+	enum rbd_watch_state	watch_state;
 	struct ceph_osd_linger_request *watch_handle;
+	u64			watch_cookie;
+	struct delayed_work	watch_dwork;
 
 	struct workqueue_struct	*task_wq;
 
@@ -3083,9 +3095,6 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3083
3095
obj_request_done_set (obj_request );
3084
3096
}
3085
3097
3086
- static int rbd_dev_header_watch_sync (struct rbd_device * rbd_dev );
3087
- static void __rbd_dev_header_unwatch_sync (struct rbd_device * rbd_dev );
3088
-
3089
3098
static void rbd_watch_cb (void * arg , u64 notify_id , u64 cookie ,
3090
3099
u64 notifier_id , void * data , size_t data_len )
3091
3100
{
@@ -3113,35 +3122,34 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
 		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
 }
 
+static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
+
 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
 {
 	struct rbd_device *rbd_dev = arg;
-	int ret;
 
 	rbd_warn(rbd_dev, "encountered watch error: %d", err);
 
-	__rbd_dev_header_unwatch_sync(rbd_dev);
+	mutex_lock(&rbd_dev->watch_mutex);
+	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
+		__rbd_unregister_watch(rbd_dev);
+		rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
 
-	ret = rbd_dev_header_watch_sync(rbd_dev);
-	if (ret) {
-		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
-		return;
+		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
 	}
-
-	ret = rbd_dev_refresh(rbd_dev);
-	if (ret)
-		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
+	mutex_unlock(&rbd_dev->watch_mutex);
 }
 
 /*
- * Initiate a watch request, synchronously.
+ * watch_mutex must be locked
  */
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
+static int __rbd_register_watch(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct ceph_osd_linger_request *handle;
 
 	rbd_assert(!rbd_dev->watch_handle);
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
 	handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
				 &rbd_dev->header_oloc, rbd_watch_cb,
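Note the shape of the new error path: the old code re-registered the watch synchronously from inside the error callback and gave up after a single failed attempt, while the new code tears the watch down under watch_mutex, marks the device ERROR, and queues watch_dwork with zero delay so recovery runs in workqueue context. A minimal, self-contained sketch of this defer-to-workqueue pattern, with a hypothetical struct my_dev standing in for rbd_device:

/*
 * Illustrative sketch only -- my_dev, my_errcb() and my_recover_fn()
 * are hypothetical stand-ins, not part of the patch.
 */
#include <linux/mutex.h>
#include <linux/workqueue.h>

struct my_dev {
	struct mutex		lock;
	bool			failed;
	struct delayed_work	recover_dwork;	/* bound to my_recover_fn() */
};

/* called from an event/error callback: do no slow work here */
static void my_errcb(struct my_dev *d)
{
	mutex_lock(&d->lock);
	if (!d->failed) {
		d->failed = true;
		/* delay 0: run the recovery worker as soon as possible */
		queue_delayed_work(system_wq, &d->recover_dwork, 0);
	}
	mutex_unlock(&d->lock);
}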
@@ -3153,13 +3161,16 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 	return 0;
 }
 
-static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+/*
+ * watch_mutex must be locked
+ */
+static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	int ret;
 
-	if (!rbd_dev->watch_handle)
-		return;
+	rbd_assert(rbd_dev->watch_handle);
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
 	ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
 	if (ret)
@@ -3168,17 +3179,80 @@ static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 	rbd_dev->watch_handle = NULL;
 }
 
-/*
- * Tear down a watch request, synchronously.
- */
-static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+static int rbd_register_watch(struct rbd_device *rbd_dev)
+{
+	int ret;
+
+	mutex_lock(&rbd_dev->watch_mutex);
+	rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
+	ret = __rbd_register_watch(rbd_dev);
+	if (ret)
+		goto out;
+
+	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
+	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
+
+out:
+	mutex_unlock(&rbd_dev->watch_mutex);
+	return ret;
+}
+
+static void cancel_tasks_sync(struct rbd_device *rbd_dev)
 {
-	__rbd_dev_header_unwatch_sync(rbd_dev);
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
+}
+
+static void rbd_unregister_watch(struct rbd_device *rbd_dev)
+{
+	cancel_tasks_sync(rbd_dev);
+
+	mutex_lock(&rbd_dev->watch_mutex);
+	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
+		__rbd_unregister_watch(rbd_dev);
+	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
+	mutex_unlock(&rbd_dev->watch_mutex);
 
-	dout("%s flushing notifies\n", __func__);
 	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
 }
 
+static void rbd_reregister_watch(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
+						  struct rbd_device, watch_dwork);
+	int ret;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	mutex_lock(&rbd_dev->watch_mutex);
+	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
+		goto fail_unlock;
+
+	ret = __rbd_register_watch(rbd_dev);
+	if (ret) {
+		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
+		if (ret != -EBLACKLISTED)
+			queue_delayed_work(rbd_dev->task_wq,
+					   &rbd_dev->watch_dwork,
+					   RBD_RETRY_DELAY);
+		goto fail_unlock;
+	}
+
+	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
+	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
+	mutex_unlock(&rbd_dev->watch_mutex);
+
+	ret = rbd_dev_refresh(rbd_dev);
+	if (ret)
+		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
+
+	return;
+
+fail_unlock:
+	mutex_unlock(&rbd_dev->watch_mutex);
+}
+
 /*
  * Synchronous osd object method call.  Returns the number of bytes
  * returned in the outbound buffer, or a negative error code.
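rbd_reregister_watch() completes the retry loop: while the device stays in the ERROR state, each failed __rbd_register_watch() attempt re-queues the work after RBD_RETRY_DELAY (one second), except for -EBLACKLISTED, where the client has been blacklisted by the cluster and retrying can never succeed. Continuing the hypothetical sketch from above (try_register() and FATAL_ERR are stand-ins for __rbd_register_watch() and -EBLACKLISTED, not real names):

static void my_recover_fn(struct work_struct *work)
{
	struct my_dev *d = container_of(to_delayed_work(work),
					struct my_dev, recover_dwork);
	int ret;

	mutex_lock(&d->lock);
	ret = try_register(d);		/* hypothetical helper */
	if (ret) {
		if (ret != FATAL_ERR)	/* transient: back off and retry */
			queue_delayed_work(system_wq, &d->recover_dwork,
					   msecs_to_jiffies(1000));
	} else {
		d->failed = false;	/* recovered */
	}
	mutex_unlock(&d->lock);
}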
@@ -3945,6 +4019,8 @@ static void rbd_spec_free(struct kref *kref)
 
 static void rbd_dev_free(struct rbd_device *rbd_dev)
 {
+	WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
+
 	ceph_oid_destroy(&rbd_dev->header_oid);
 	ceph_oloc_destroy(&rbd_dev->header_oloc);
 
@@ -3991,6 +4067,10 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
 	ceph_oid_init(&rbd_dev->header_oid);
 	ceph_oloc_init(&rbd_dev->header_oloc);
 
+	mutex_init(&rbd_dev->watch_mutex);
+	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
+	INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
+
 	rbd_dev->dev.bus = &rbd_bus_type;
 	rbd_dev->dev.type = &rbd_device_type;
 	rbd_dev->dev.parent = &rbd_root_dev;
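The work item is bound to its handler exactly once, at device creation: INIT_DELAYED_WORK() ties watch_dwork to rbd_reregister_watch(), so the error callback only ever needs to queue it. The equivalent setup for the hypothetical sketch above would be:

static void my_dev_init(struct my_dev *d)	/* hypothetical, for illustration */
{
	mutex_init(&d->lock);
	d->failed = false;
	INIT_DELAYED_WORK(&d->recover_dwork, my_recover_fn);
}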
@@ -5222,7 +5302,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
 		goto err_out_format;
 
 	if (!depth) {
-		ret = rbd_dev_header_watch_sync(rbd_dev);
+		ret = rbd_register_watch(rbd_dev);
 		if (ret) {
 			if (ret == -ENOENT)
 				pr_info("image %s/%s does not exist\n",
@@ -5281,7 +5361,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
5281
5361
rbd_dev_unprobe (rbd_dev );
5282
5362
err_out_watch :
5283
5363
if (!depth )
5284
- rbd_dev_header_unwatch_sync (rbd_dev );
5364
+ rbd_unregister_watch (rbd_dev );
5285
5365
err_out_format :
5286
5366
rbd_dev -> image_format = 0 ;
5287
5367
kfree (rbd_dev -> spec -> image_id );
@@ -5348,11 +5428,11 @@ static ssize_t do_rbd_add(struct bus_type *bus,
 	rc = rbd_dev_device_setup(rbd_dev);
 	if (rc) {
 		/*
-		 * rbd_dev_header_unwatch_sync() can't be moved into
+		 * rbd_unregister_watch() can't be moved into
 		 * rbd_dev_image_release() without refactoring, see
 		 * commit 1f3ef78861ac.
 		 */
-		rbd_dev_header_unwatch_sync(rbd_dev);
+		rbd_unregister_watch(rbd_dev);
 		rbd_dev_image_release(rbd_dev);
 		goto out;
 	}
@@ -5473,7 +5553,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
 	if (ret < 0 || already)
 		return ret;
 
-	rbd_dev_header_unwatch_sync(rbd_dev);
+	rbd_unregister_watch(rbd_dev);
 
 	/*
 	 * Don't free anything from rbd_dev->disk until after all