18
18
import reactor .core .publisher .Flux ;
19
19
import reactor .core .publisher .Mono ;
20
20
21
- import java .util .List ;
22
-
23
21
import org .reactivestreams .Publisher ;
24
22
25
- import org .springframework .dao .TransientDataAccessResourceException ;
26
23
import org .springframework .data .r2dbc .convert .R2dbcConverter ;
27
24
import org .springframework .data .r2dbc .core .DatabaseClient ;
28
- import org .springframework .data .r2dbc .core .PreparedOperation ;
25
+ import org .springframework .data .r2dbc .core .R2dbcEntityOperations ;
26
+ import org .springframework .data .r2dbc .core .R2dbcEntityTemplate ;
29
27
import org .springframework .data .r2dbc .core .ReactiveDataAccessStrategy ;
30
- import org .springframework .data .r2dbc .core .StatementMapper ;
31
28
import org .springframework .data .r2dbc .query .Criteria ;
29
+ import org .springframework .data .r2dbc .query .Query ;
32
30
import org .springframework .data .relational .core .mapping .RelationalPersistentProperty ;
33
31
import org .springframework .data .relational .core .sql .Functions ;
34
32
import org .springframework .data .relational .core .sql .Select ;
38
36
import org .springframework .data .relational .core .sql .render .SqlRenderer ;
39
37
import org .springframework .data .relational .repository .query .RelationalEntityInformation ;
40
38
import org .springframework .data .repository .reactive .ReactiveCrudRepository ;
39
+ import org .springframework .data .util .Lazy ;
41
40
import org .springframework .transaction .annotation .Transactional ;
42
41
import org .springframework .util .Assert ;
43
42
51
50
public class SimpleR2dbcRepository <T , ID > implements ReactiveCrudRepository <T , ID > {
52
51
53
52
private final RelationalEntityInformation <T , ID > entity ;
54
- private final DatabaseClient databaseClient ;
55
- private final R2dbcConverter converter ;
56
- private final ReactiveDataAccessStrategy accessStrategy ;
53
+ private final R2dbcEntityOperations entityOperations ;
54
+ private final Lazy <RelationalPersistentProperty > idProperty ;
55
+
56
+ /**
57
+ * Create a new {@link SimpleR2dbcRepository}.
58
+ *
59
+ * @param entity
60
+ * @param entityOperations
61
+ * @param converter
62
+ * @since 1.1
63
+ */
64
+ SimpleR2dbcRepository (RelationalEntityInformation <T , ID > entity , R2dbcEntityOperations entityOperations ,
65
+ R2dbcConverter converter ) {
66
+
67
+ this .entity = entity ;
68
+ this .entityOperations = entityOperations ;
69
+ this .idProperty = Lazy .of (() -> converter //
70
+ .getMappingContext () //
71
+ .getRequiredPersistentEntity (this .entity .getJavaType ()) //
72
+ .getRequiredIdProperty ());
73
+ }
57
74
75
+ /**
76
+ * Create a new {@link SimpleR2dbcRepository}.
77
+ *
78
+ * @param entity
79
+ * @param databaseClient
80
+ * @param converter
81
+ * @param accessStrategy
82
+ */
58
83
public SimpleR2dbcRepository (RelationalEntityInformation <T , ID > entity , DatabaseClient databaseClient ,
59
84
R2dbcConverter converter , ReactiveDataAccessStrategy accessStrategy ) {
85
+
60
86
this .entity = entity ;
61
- this .databaseClient = databaseClient ;
62
- this .converter = converter ;
63
- this .accessStrategy = accessStrategy ;
87
+ this .entityOperations = new R2dbcEntityTemplate (databaseClient );
88
+ this .idProperty = Lazy .of (() -> converter //
89
+ .getMappingContext () //
90
+ .getRequiredPersistentEntity (this .entity .getJavaType ()) //
91
+ .getRequiredIdProperty ());
64
92
}
65
93
66
94
/* (non-Javadoc)
@@ -73,28 +101,10 @@ public <S extends T> Mono<S> save(S objectToSave) {
73
101
Assert .notNull (objectToSave , "Object to save must not be null!" );
74
102
75
103
if (this .entity .isNew (objectToSave )) {
76
-
77
- return this .databaseClient .insert () //
78
- .into (this .entity .getJavaType ()) //
79
- .table (this .entity .getTableName ()).using (objectToSave ) //
80
- .map (this .converter .populateIdIfNecessary (objectToSave )) //
81
- .first () //
82
- .defaultIfEmpty (objectToSave );
104
+ return this .entityOperations .insert (objectToSave );
83
105
}
84
106
85
- return this .databaseClient .update () //
86
- .table (this .entity .getJavaType ()) //
87
- .table (this .entity .getTableName ()).using (objectToSave ) //
88
- .fetch ().rowsUpdated ().handle ((rowsUpdated , sink ) -> {
89
-
90
- if (rowsUpdated == 0 ) {
91
- sink .error (new TransientDataAccessResourceException (
92
- String .format ("Failed to update table [%s]. Row with Id [%s] does not exist." ,
93
- this .entity .getTableName (), this .entity .getId (objectToSave ))));
94
- } else {
95
- sink .next (objectToSave );
96
- }
97
- });
107
+ return this .entityOperations .update (objectToSave );
98
108
}
99
109
100
110
/* (non-Javadoc)
@@ -129,20 +139,7 @@ public Mono<T> findById(ID id) {
129
139
130
140
Assert .notNull (id , "Id must not be null!" );
131
141
132
- List <SqlIdentifier > columns = this .accessStrategy .getAllColumns (this .entity .getJavaType ());
133
- String idProperty = getIdProperty ().getName ();
134
-
135
- StatementMapper mapper = this .accessStrategy .getStatementMapper ().forType (this .entity .getJavaType ());
136
- StatementMapper .SelectSpec selectSpec = mapper .createSelect (this .entity .getTableName ()) //
137
- .withProjection (columns ) //
138
- .withCriteria (Criteria .where (idProperty ).is (id ));
139
-
140
- PreparedOperation <?> operation = mapper .getMappedObject (selectSpec );
141
-
142
- return this .databaseClient .execute (operation ) //
143
- .as (this .entity .getJavaType ()) //
144
- .fetch () //
145
- .one ();
142
+ return this .entityOperations .selectOne (getIdQuery (id ), this .entity .getJavaType ());
146
143
}
147
144
148
145
/* (non-Javadoc)
@@ -161,18 +158,7 @@ public Mono<Boolean> existsById(ID id) {
161
158
162
159
Assert .notNull (id , "Id must not be null!" );
163
160
164
- String idProperty = getIdProperty ().getName ();
165
-
166
- StatementMapper mapper = this .accessStrategy .getStatementMapper ().forType (this .entity .getJavaType ());
167
- StatementMapper .SelectSpec selectSpec = mapper .createSelect (this .entity .getTableName ()).withProjection (idProperty ) //
168
- .withCriteria (Criteria .where (idProperty ).is (id ));
169
-
170
- PreparedOperation <?> operation = mapper .getMappedObject (selectSpec );
171
-
172
- return this .databaseClient .execute (operation ) //
173
- .map ((r , md ) -> r ) //
174
- .first () //
175
- .hasElement ();
161
+ return this .entityOperations .exists (getIdQuery (id ), this .entity .getJavaType ());
176
162
}
177
163
178
164
/* (non-Javadoc)
@@ -188,7 +174,7 @@ public Mono<Boolean> existsById(Publisher<ID> publisher) {
188
174
*/
189
175
@ Override
190
176
public Flux <T > findAll () {
191
- return this .databaseClient .select (). from ( this .entity .getJavaType ()). fetch (). all ( );
177
+ return this .entityOperations .select (Query . empty (), this .entity .getJavaType ());
192
178
}
193
179
194
180
/* (non-Javadoc)
@@ -216,17 +202,9 @@ public Flux<T> findAllById(Publisher<ID> idPublisher) {
216
202
return Flux .empty ();
217
203
}
218
204
219
- List <SqlIdentifier > columns = this .accessStrategy .getAllColumns (this .entity .getJavaType ());
220
205
String idProperty = getIdProperty ().getName ();
221
206
222
- StatementMapper mapper = this .accessStrategy .getStatementMapper ().forType (this .entity .getJavaType ());
223
- StatementMapper .SelectSpec selectSpec = mapper .createSelect (this .entity .getTableName ()) //
224
- .withProjection (columns ) //
225
- .withCriteria (Criteria .where (idProperty ).in (ids ));
226
-
227
- PreparedOperation <?> operation = mapper .getMappedObject (selectSpec );
228
-
229
- return this .databaseClient .execute (operation ).as (this .entity .getJavaType ()).fetch ().all ();
207
+ return this .entityOperations .select (Query .query (Criteria .where (idProperty ).in (ids )), this .entity .getJavaType ());
230
208
});
231
209
}
232
210
@@ -235,17 +213,7 @@ public Flux<T> findAllById(Publisher<ID> idPublisher) {
235
213
*/
236
214
@ Override
237
215
public Mono <Long > count () {
238
-
239
- Table table = Table .create (this .accessStrategy .toSql (this .entity .getTableName ()));
240
- Select select = StatementBuilder //
241
- .select (Functions .count (table .column (this .accessStrategy .toSql (getIdProperty ().getColumnName ())))) //
242
- .from (table ) //
243
- .build ();
244
-
245
- return this .databaseClient .execute (SqlRenderer .toString (select )) //
246
- .map ((r , md ) -> r .get (0 , Long .class )) //
247
- .first () //
248
- .defaultIfEmpty (0L );
216
+ return this .entityOperations .count (Query .empty (), this .entity .getJavaType ());
249
217
}
250
218
251
219
/* (non-Javadoc)
@@ -257,13 +225,7 @@ public Mono<Void> deleteById(ID id) {
257
225
258
226
Assert .notNull (id , "Id must not be null!" );
259
227
260
- return this .databaseClient .delete () //
261
- .from (this .entity .getJavaType ()) //
262
- .table (this .entity .getTableName ()) //
263
- .matching (Criteria .where (getIdProperty ().getName ()).is (id )) //
264
- .fetch () //
265
- .rowsUpdated () //
266
- .then ();
228
+ return this .entityOperations .delete (getIdQuery (id ), this .entity .getJavaType ()).then ();
267
229
}
268
230
269
231
/* (non-Javadoc)
@@ -274,20 +236,16 @@ public Mono<Void> deleteById(ID id) {
274
236
public Mono <Void > deleteById (Publisher <ID > idPublisher ) {
275
237
276
238
Assert .notNull (idPublisher , "The Id Publisher must not be null!" );
277
- StatementMapper statementMapper = this .accessStrategy .getStatementMapper ().forType (this .entity .getJavaType ());
278
239
279
240
return Flux .from (idPublisher ).buffer ().filter (ids -> !ids .isEmpty ()).concatMap (ids -> {
280
241
281
242
if (ids .isEmpty ()) {
282
243
return Flux .empty ();
283
244
}
284
245
285
- return this .databaseClient .delete () //
286
- .from (this .entity .getJavaType ()) //
287
- .table (this .entity .getTableName ()) //
288
- .matching (Criteria .where (getIdProperty ().getName ()).in (ids )) //
289
- .fetch () //
290
- .rowsUpdated ();
246
+ String idProperty = getIdProperty ().getName ();
247
+
248
+ return this .entityOperations .delete (Query .query (Criteria .where (idProperty ).in (ids )), this .entity .getJavaType ());
291
249
}).then ();
292
250
}
293
251
@@ -336,14 +294,14 @@ public Mono<Void> deleteAll(Publisher<? extends T> objectPublisher) {
336
294
@ Override
337
295
@ Transactional
338
296
public Mono <Void > deleteAll () {
339
- return this .databaseClient .delete (). from ( this .entity .getTableName ()).then ();
297
+ return this .entityOperations .delete (Query . empty (), this .entity .getJavaType ()).then ();
340
298
}
341
299
342
300
private RelationalPersistentProperty getIdProperty () {
301
+ return this .idProperty .get ();
302
+ }
343
303
344
- return this .converter //
345
- .getMappingContext () //
346
- .getRequiredPersistentEntity (this .entity .getJavaType ()) //
347
- .getRequiredIdProperty ();
304
+ private Query getIdQuery (Object id ) {
305
+ return Query .query (Criteria .where (getIdProperty ().getName ()).is (id ));
348
306
}
349
307
}
0 commit comments