Skip to content

Commit d1b853b

Browse files
ggivotishun
authored andcommitted
OpsForGeo producing "READONLY You can't write against a read only replica " on READS... (#3032)
* OpsForGeo producing "READONLY You can't write against a read only replica" on READS... only if master & replica configured #1813 Divert pure read intentions of georadius and georadiusbymember commands (variants that do not use STORE/STOREDIST) to GEORADIUS_RO/GEORADIUSBYMEMBER_RO This will unify the behaviour between Cluster and Redis Standalone/Replica arrangements Relates to issues #1481 #2568 #2871 Closes #1813 * Fix tests * Remove unused methods * Fix tests and add tests withArgs
1 parent 1cd0ecf commit d1b853b

File tree

6 files changed

+153
-84
lines changed

6 files changed

+153
-84
lines changed

src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@
5858
import java.util.Set;
5959

6060
import static io.lettuce.core.protocol.CommandType.EXEC;
61-
import static io.lettuce.core.protocol.CommandType.GEORADIUS;
62-
import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER;
6361
import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER_RO;
6462
import static io.lettuce.core.protocol.CommandType.GEORADIUS_RO;
6563

@@ -1140,13 +1138,13 @@ public RedisFuture<List<GeoCoordinates>> geopos(K key, V... members) {
11401138

11411139
@Override
11421140
public RedisFuture<Set<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
1143-
return dispatch(commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name()));
1141+
return georadius_ro(key, longitude, latitude, distance, unit);
11441142
}
11451143

11461144
@Override
11471145
public RedisFuture<List<GeoWithin<V>>> georadius(K key, double longitude, double latitude, double distance,
11481146
GeoArgs.Unit unit, GeoArgs geoArgs) {
1149-
return dispatch(commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name(), geoArgs));
1147+
return georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
11501148
}
11511149

11521150
@Override
@@ -1166,13 +1164,13 @@ protected RedisFuture<List<GeoWithin<V>>> georadius_ro(K key, double longitude,
11661164

11671165
@Override
11681166
public RedisFuture<Set<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
1169-
return dispatch(commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name()));
1167+
return georadiusbymember_ro(key, member, distance, unit);
11701168
}
11711169

11721170
@Override
11731171
public RedisFuture<List<GeoWithin<V>>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit,
11741172
GeoArgs geoArgs) {
1175-
return dispatch(commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name(), geoArgs));
1173+
return georadiusbymember_ro(key, member, distance, unit, geoArgs);
11761174
}
11771175

11781176
@Override

src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@
6666
import java.util.function.Supplier;
6767

6868
import static io.lettuce.core.protocol.CommandType.EXEC;
69-
import static io.lettuce.core.protocol.CommandType.GEORADIUS;
70-
import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER;
7169
import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER_RO;
7270
import static io.lettuce.core.protocol.CommandType.GEORADIUS_RO;
7371

@@ -1203,14 +1201,14 @@ public Flux<Value<GeoCoordinates>> geopos(K key, V... members) {
12031201
}
12041202

12051203
@Override
1206-
public Flux<V> georadius(K key, double longitude, double latitude, double distance, Unit unit) {
1207-
return createDissolvingFlux(() -> commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name()));
1204+
public Flux<V> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
1205+
return georadius_ro(key, longitude, latitude, distance, unit);
12081206
}
12091207

12101208
@Override
1211-
public Flux<GeoWithin<V>> georadius(K key, double longitude, double latitude, double distance, Unit unit, GeoArgs geoArgs) {
1212-
return createDissolvingFlux(
1213-
() -> commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name(), geoArgs));
1209+
public Flux<GeoWithin<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit,
1210+
GeoArgs geoArgs) {
1211+
return georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
12141212
}
12151213

12161214
@Override
@@ -1231,15 +1229,13 @@ protected Flux<GeoWithin<V>> georadius_ro(K key, double longitude, double latitu
12311229
}
12321230

12331231
@Override
1234-
public Flux<V> georadiusbymember(K key, V member, double distance, Unit unit) {
1235-
return createDissolvingFlux(
1236-
() -> commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name()));
1232+
public Flux<V> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
1233+
return georadiusbymember_ro(key, member, distance, unit);
12371234
}
12381235

12391236
@Override
1240-
public Flux<GeoWithin<V>> georadiusbymember(K key, V member, double distance, Unit unit, GeoArgs geoArgs) {
1241-
return createDissolvingFlux(
1242-
() -> commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name(), geoArgs));
1237+
public Flux<GeoWithin<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) {
1238+
return georadiusbymember_ro(key, member, distance, unit, geoArgs);
12431239
}
12441240

12451241
@Override

src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -249,28 +249,6 @@ public RedisFuture<String> flushdb(FlushMode flushMode) {
249249
.firstOfAsync(executeOnUpstream(kvRedisClusterAsyncCommands -> kvRedisClusterAsyncCommands.flushdb(flushMode)));
250250
}
251251

252-
@Override
253-
public RedisFuture<Set<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
254-
return super.georadius_ro(key, longitude, latitude, distance, unit);
255-
}
256-
257-
@Override
258-
public RedisFuture<List<GeoWithin<V>>> georadius(K key, double longitude, double latitude, double distance,
259-
GeoArgs.Unit unit, GeoArgs geoArgs) {
260-
return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
261-
}
262-
263-
@Override
264-
public RedisFuture<Set<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
265-
return super.georadiusbymember_ro(key, member, distance, unit);
266-
}
267-
268-
@Override
269-
public RedisFuture<List<GeoWithin<V>>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit,
270-
GeoArgs geoArgs) {
271-
return super.georadiusbymember_ro(key, member, distance, unit, geoArgs);
272-
}
273-
274252
@Override
275253
public RedisFuture<List<K>> keys(K pattern) {
276254

src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -235,27 +235,6 @@ public Mono<String> flushdb(FlushMode flushMode) {
235235
return Flux.merge(publishers.values()).last();
236236
}
237237

238-
@Override
239-
public Flux<V> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
240-
return super.georadius_ro(key, longitude, latitude, distance, unit);
241-
}
242-
243-
@Override
244-
public Flux<GeoWithin<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit,
245-
GeoArgs geoArgs) {
246-
return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
247-
}
248-
249-
@Override
250-
public Flux<V> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
251-
return super.georadiusbymember_ro(key, member, distance, unit);
252-
}
253-
254-
@Override
255-
public Flux<GeoWithin<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) {
256-
return super.georadiusbymember_ro(key, member, distance, unit, geoArgs);
257-
}
258-
259238
@Override
260239
public Flux<K> keys(K pattern) {
261240

src/main/java/io/lettuce/core/cluster/RedisClusterPubSubAsyncCommandsImpl.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -65,28 +65,6 @@ public RedisClusterPubSubAsyncCommandsImpl(StatefulRedisPubSubConnection<K, V> c
6565
super(connection, codec);
6666
}
6767

68-
@Override
69-
public RedisFuture<Set<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
70-
return super.georadius_ro(key, longitude, latitude, distance, unit);
71-
}
72-
73-
@Override
74-
public RedisFuture<List<GeoWithin<V>>> georadius(K key, double longitude, double latitude, double distance,
75-
GeoArgs.Unit unit, GeoArgs geoArgs) {
76-
return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
77-
}
78-
79-
@Override
80-
public RedisFuture<Set<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
81-
return super.georadiusbymember_ro(key, member, distance, unit);
82-
}
83-
84-
@Override
85-
public RedisFuture<List<GeoWithin<V>>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit,
86-
GeoArgs geoArgs) {
87-
return super.georadiusbymember_ro(key, member, distance, unit, geoArgs);
88-
}
89-
9068
@Override
9169
public StatefulRedisClusterPubSubConnectionImpl<K, V> getStatefulConnection() {
9270
return (StatefulRedisClusterPubSubConnectionImpl<K, V>) super.getStatefulConnection();
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package io.lettuce.core.commands;
2+
3+
import io.lettuce.core.*;
4+
import io.lettuce.core.api.sync.RedisCommands;
5+
import io.lettuce.core.codec.StringCodec;
6+
import io.lettuce.core.masterreplica.MasterReplica;
7+
import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection;
8+
import io.lettuce.core.models.role.RedisInstance;
9+
import io.lettuce.core.models.role.RoleParser;
10+
import io.lettuce.test.LettuceExtension;
11+
import io.lettuce.test.condition.EnabledOnCommand;
12+
import io.lettuce.test.settings.TestSettings;
13+
import org.junit.jupiter.api.*;
14+
import org.junit.jupiter.api.extension.ExtendWith;
15+
16+
import java.util.Arrays;
17+
import java.util.List;
18+
import java.util.Set;
19+
20+
import static io.lettuce.TestTags.INTEGRATION_TEST;
21+
import static org.assertj.core.api.Assertions.*;
22+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
23+
24+
/**
25+
* @author Mark Paluch
26+
*/
27+
@Tag(INTEGRATION_TEST)
28+
@ExtendWith(LettuceExtension.class)
29+
@EnabledOnCommand("GEOADD")
30+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
31+
public class GeoMasterReplicaIntegrationTests extends AbstractRedisClientTest {
32+
33+
private StatefulRedisMasterReplicaConnection<String, String> masterReplica;
34+
35+
private RedisCommands<String, String> upstream;
36+
37+
private RedisCommands<String, String> connection1;
38+
39+
private RedisCommands<String, String> connection2;
40+
41+
@BeforeEach
42+
void before() {
43+
44+
RedisURI node1 = RedisURI.Builder.redis(host, TestSettings.port(3)).withDatabase(2).build();
45+
RedisURI node2 = RedisURI.Builder.redis(host, TestSettings.port(4)).withDatabase(2).build();
46+
47+
connection1 = client.connect(node1).sync();
48+
connection2 = client.connect(node2).sync();
49+
50+
RedisInstance node1Instance = RoleParser.parse(this.connection1.role());
51+
RedisInstance node2Instance = RoleParser.parse(this.connection2.role());
52+
53+
if (node1Instance.getRole().isUpstream() && node2Instance.getRole().isReplica()) {
54+
upstream = connection1;
55+
} else if (node2Instance.getRole().isUpstream() && node1Instance.getRole().isReplica()) {
56+
upstream = connection2;
57+
} else {
58+
assumeTrue(false,
59+
String.format("Cannot run the test because I don't have a distinct master and replica but %s and %s",
60+
node1Instance, node2Instance));
61+
}
62+
upstream.flushall();
63+
64+
masterReplica = MasterReplica.connect(client, StringCodec.UTF8, Arrays.asList(node1, node2));
65+
masterReplica.setReadFrom(ReadFrom.REPLICA);
66+
67+
}
68+
69+
@AfterEach
70+
void after() {
71+
72+
if (connection1 != null) {
73+
connection1.getStatefulConnection().close();
74+
}
75+
76+
if (connection2 != null) {
77+
connection2.getStatefulConnection().close();
78+
}
79+
80+
if (masterReplica != null) {
81+
masterReplica.close();
82+
}
83+
}
84+
85+
@BeforeEach
86+
void setUp() {
87+
this.redis.flushall();
88+
}
89+
90+
@Test
91+
void georadiusReadFromReplica() {
92+
93+
prepareGeo(upstream);
94+
95+
upstream.waitForReplication(1, 1000);
96+
97+
Set<String> georadius = masterReplica.sync().georadius(key, 8.6582861, 49.5285695, 1, GeoArgs.Unit.km);
98+
assertThat(georadius).hasSize(1).contains("Weinheim");
99+
}
100+
101+
@Test
102+
void georadiusWithArgsReadFromReplica() {
103+
104+
prepareGeo(upstream);
105+
106+
upstream.waitForReplication(1, 1000);
107+
108+
GeoArgs geoArgs = new GeoArgs().withHash().withCoordinates().withDistance().withCount(1).desc();
109+
110+
List<GeoWithin<String>> result = masterReplica.sync().georadius(key, 8.665351, 49.553302, 5, GeoArgs.Unit.km, geoArgs);
111+
assertThat(result).hasSize(1);
112+
}
113+
114+
@Test
115+
void georadiusbymemberReadFromReplica() {
116+
117+
prepareGeo(upstream);
118+
upstream.waitForReplication(1, 100);
119+
120+
Set<String> empty = masterReplica.sync().georadiusbymember(key, "Bahn", 1, GeoArgs.Unit.km);
121+
assertThat(empty).hasSize(1).contains("Bahn");
122+
}
123+
124+
@Test
125+
void georadiusbymemberWithArgsReadFromReplica() {
126+
127+
prepareGeo(upstream);
128+
upstream.waitForReplication(1, 100);
129+
130+
List<GeoWithin<String>> empty = masterReplica.sync().georadiusbymember(key, "Bahn", 1, GeoArgs.Unit.km,
131+
new GeoArgs().withHash().withCoordinates().withDistance().desc());
132+
assertThat(empty).isNotEmpty();
133+
}
134+
135+
protected void prepareGeo(RedisCommands<String, String> redis) {
136+
redis.geoadd(key, 8.6638775, 49.5282537, "Weinheim");
137+
redis.geoadd(key, 8.3796281, 48.9978127, "EFS9", 8.665351, 49.553302, "Bahn");
138+
}
139+
140+
}

0 commit comments

Comments
 (0)