Skip to content

Commit c27f322

Browse files
committed
Update reader, writer and router addresses handling and change type
1 parent 628acae commit c27f322

File tree

10 files changed

+120
-88
lines changed

10 files changed

+120
-88
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import java.util.ArrayList;
2122
import java.util.Collections;
2223
import java.util.HashSet;
23-
import java.util.LinkedHashSet;
24+
import java.util.List;
2425
import java.util.Set;
2526
import java.util.concurrent.locks.ReadWriteLock;
2627
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -43,18 +44,18 @@ public class ClusterRoutingTable implements RoutingTable
4344
private final ReadWriteLock tableLock = new ReentrantReadWriteLock();
4445
private final DatabaseName databaseName;
4546
private final Clock clock;
46-
private final Set<BoltServerAddress> disused = new LinkedHashSet<>();
47+
private final Set<BoltServerAddress> disused = new HashSet<>();
4748

4849
private long expirationTimestamp;
4950
private boolean preferInitialRouter = true;
50-
private Set<BoltServerAddress> readers = Collections.emptySet();
51-
private Set<BoltServerAddress> writers = Collections.emptySet();
52-
private Set<BoltServerAddress> routers = Collections.emptySet();
51+
private List<BoltServerAddress> readers = Collections.emptyList();
52+
private List<BoltServerAddress> writers = Collections.emptyList();
53+
private List<BoltServerAddress> routers = Collections.emptyList();
5354

5455
public ClusterRoutingTable( DatabaseName ofDatabase, Clock clock, BoltServerAddress... routingAddresses )
5556
{
5657
this( ofDatabase, clock );
57-
routers = Collections.unmodifiableSet( new LinkedHashSet<>( asList( routingAddresses ) ) );
58+
routers = Collections.unmodifiableList( asList( routingAddresses ) );
5859
}
5960

6061
private ClusterRoutingTable( DatabaseName ofDatabase, Clock clock )
@@ -112,19 +113,19 @@ public void forget( BoltServerAddress address )
112113
}
113114

114115
@Override
115-
public Set<BoltServerAddress> readers()
116+
public List<BoltServerAddress> readers()
116117
{
117118
return executeWithLock( tableLock.readLock(), () -> readers );
118119
}
119120

120121
@Override
121-
public Set<BoltServerAddress> writers()
122+
public List<BoltServerAddress> writers()
122123
{
123124
return executeWithLock( tableLock.readLock(), () -> writers );
124125
}
125126

126127
@Override
127-
public Set<BoltServerAddress> routers()
128+
public List<BoltServerAddress> routers()
128129
{
129130
return executeWithLock( tableLock.readLock(), () -> routers );
130131
}
@@ -185,33 +186,38 @@ public String toString()
185186
expirationTimestamp, clock.millis(), routers, writers, readers, databaseName.description() ) );
186187
}
187188

188-
private Set<BoltServerAddress> newWithoutAddressIfPresent( Set<BoltServerAddress> addresses, BoltServerAddress addressToSkip )
189+
private List<BoltServerAddress> newWithoutAddressIfPresent( List<BoltServerAddress> addresses, BoltServerAddress addressToSkip )
189190
{
190-
return newWithAddressReplacedIfPresent( addresses, addressToSkip, null );
191+
List<BoltServerAddress> newList = new ArrayList<>( addresses.size() );
192+
for ( BoltServerAddress address : addresses )
193+
{
194+
if ( !address.equals( addressToSkip ) )
195+
{
196+
newList.add( address );
197+
}
198+
}
199+
return Collections.unmodifiableList( newList );
191200
}
192201

193-
private Set<BoltServerAddress> newWithAddressReplacedIfPresent( Set<BoltServerAddress> addresses, BoltServerAddress oldAddress,
194-
BoltServerAddress newAddress )
202+
private List<BoltServerAddress> newWithAddressReplacedIfPresent( List<BoltServerAddress> addresses, BoltServerAddress oldAddress,
203+
BoltServerAddress newAddress )
195204
{
196-
if ( !addresses.contains( oldAddress ) )
205+
List<BoltServerAddress> newList = new ArrayList<>( addresses.size() );
206+
for ( BoltServerAddress address : addresses )
197207
{
198-
return addresses;
208+
newList.add( address.equals( oldAddress ) ? newAddress : address );
199209
}
200-
Stream<BoltServerAddress> addressStream = addresses.stream();
201-
addressStream = newAddress != null
202-
? addressStream.map( address -> address.equals( oldAddress ) ? newAddress : address )
203-
: addressStream.filter( address -> !address.equals( oldAddress ) );
204-
return Collections.unmodifiableSet( (Set<? extends BoltServerAddress>) addressStream.collect( Collectors.toCollection( LinkedHashSet::new ) ) );
210+
return Collections.unmodifiableList( newList );
205211
}
206212

207-
private Set<BoltServerAddress> newWithReusedAddresses( Set<BoltServerAddress> currentAddresses, Set<BoltServerAddress> disusedAddresses,
208-
Set<BoltServerAddress> newAddresses )
213+
private List<BoltServerAddress> newWithReusedAddresses( List<BoltServerAddress> currentAddresses, Set<BoltServerAddress> disusedAddresses,
214+
Set<BoltServerAddress> newAddresses )
209215
{
210-
Set<BoltServerAddress> result = Stream.concat( currentAddresses.stream(), disusedAddresses.stream() )
211-
.filter( address -> newAddresses.remove( toBoltServerAddress( address ) ) )
212-
.collect( Collectors.toCollection( LinkedHashSet::new ) );
213-
result.addAll( newAddresses );
214-
return Collections.unmodifiableSet( result );
216+
List<BoltServerAddress> newList = Stream.concat( currentAddresses.stream(), disusedAddresses.stream() )
217+
.filter( address -> newAddresses.remove( toBoltServerAddress( address ) ) )
218+
.collect( Collectors.toCollection( () -> new ArrayList<>( newAddresses.size() ) ) );
219+
newList.addAll( newAddresses );
220+
return Collections.unmodifiableList( newList );
215221
}
216222

217223
private BoltServerAddress toBoltServerAddress( BoltServerAddress address )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import java.util.List;
2122
import java.util.Set;
2223

2324
import org.neo4j.driver.AccessMode;
@@ -34,12 +35,34 @@ public interface RoutingTable
3435

3536
void forget( BoltServerAddress address );
3637

37-
Set<BoltServerAddress> readers();
38+
/**
39+
* Returns an immutable list of reader addresses.
40+
*
41+
* @return the immutable list of reader addresses.
42+
*/
43+
List<BoltServerAddress> readers();
3844

39-
Set<BoltServerAddress> writers();
45+
/**
46+
* Returns an immutable list of writer addresses.
47+
*
48+
* @return the immutable list of write addresses.
49+
*/
4050

41-
Set<BoltServerAddress> routers();
51+
List<BoltServerAddress> writers();
4252

53+
/**
54+
* Returns an immutable list of router addresses.
55+
*
56+
* @return the immutable list of router addresses.
57+
*/
58+
59+
List<BoltServerAddress> routers();
60+
61+
/**
62+
* Returns an immutable unordered set of all addresses known by this routing table. This includes all router, reader, writer and disused addresses.
63+
*
64+
* @return the immutable set of all addresses.
65+
*/
4366
Set<BoltServerAddress> servers();
4467

4568
DatabaseName database();

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster.loadbalancing;
2020

21+
import java.util.List;
22+
2123
import org.neo4j.driver.Logger;
2224
import org.neo4j.driver.Logging;
2325
import org.neo4j.driver.internal.BoltServerAddress;
2426
import org.neo4j.driver.internal.spi.ConnectionPool;
2527

2628
/**
27-
* Load balancing strategy that finds server with least amount of active (checked out of the pool) connections from
28-
* given readers or writers. It finds a start index for iteration in a round-robin fashion. This is done to prevent
29-
* choosing same first address over and over when all addresses have same amount of active connections.
29+
* Load balancing strategy that finds server with the least amount of active (checked out of the pool) connections from given readers or writers. It finds a
30+
* start index for iteration in a round-robin fashion. This is done to prevent choosing same first address over and over when all addresses have the same amount
31+
* of active connections.
3032
*/
3133
public class LeastConnectedLoadBalancingStrategy implements LoadBalancingStrategy
3234
{
@@ -43,21 +45,21 @@ public LeastConnectedLoadBalancingStrategy( ConnectionPool connectionPool, Loggi
4345
}
4446

4547
@Override
46-
public BoltServerAddress selectReader( BoltServerAddress[] knownReaders )
48+
public BoltServerAddress selectReader( List<BoltServerAddress> knownReaders )
4749
{
4850
return select( knownReaders, readersIndex, "reader" );
4951
}
5052

5153
@Override
52-
public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters )
54+
public BoltServerAddress selectWriter( List<BoltServerAddress> knownWriters )
5355
{
5456
return select( knownWriters, writersIndex, "writer" );
5557
}
5658

57-
private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex addressesIndex,
58-
String addressType )
59+
private BoltServerAddress select( List<BoltServerAddress> addresses, RoundRobinArrayIndex addressesIndex,
60+
String addressType )
5961
{
60-
int size = addresses.length;
62+
int size = addresses.size();
6163
if ( size == 0 )
6264
{
6365
log.trace( "Unable to select %s, no known addresses given", addressType );
@@ -71,10 +73,10 @@ private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArray
7173
BoltServerAddress leastConnectedAddress = null;
7274
int leastActiveConnections = Integer.MAX_VALUE;
7375

74-
// iterate over the array to find least connected address
76+
// iterate over the array to find the least connected address
7577
do
7678
{
77-
BoltServerAddress address = addresses[index];
79+
BoltServerAddress address = addresses.get( index );
7880
int activeConnections = connectionPool.inUseConnections( address );
7981

8082
if ( activeConnections < leastActiveConnections )

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import java.util.ArrayList;
2424
import java.util.List;
25-
import java.util.Set;
2625
import java.util.concurrent.CompletableFuture;
2726
import java.util.concurrent.CompletionStage;
2827

@@ -202,7 +201,7 @@ private CompletionStage<Connection> acquire( AccessMode mode, RoutingTable routi
202201

203202
private void acquire( AccessMode mode, RoutingTable routingTable, CompletableFuture<Connection> result, List<Throwable> attemptErrors )
204203
{
205-
Set<BoltServerAddress> addresses = addressSet( mode, routingTable );
204+
List<BoltServerAddress> addresses = getAddressesByMode( mode, routingTable );
206205
BoltServerAddress address = selectAddress( mode, addresses );
207206

208207
if ( address == null )
@@ -241,7 +240,7 @@ private void acquire( AccessMode mode, RoutingTable routingTable, CompletableFut
241240
} );
242241
}
243242

244-
private static Set<BoltServerAddress> addressSet( AccessMode mode, RoutingTable routingTable )
243+
private static List<BoltServerAddress> getAddressesByMode( AccessMode mode, RoutingTable routingTable )
245244
{
246245
switch ( mode )
247246
{
@@ -254,10 +253,8 @@ private static Set<BoltServerAddress> addressSet( AccessMode mode, RoutingTable
254253
}
255254
}
256255

257-
private BoltServerAddress selectAddress( AccessMode mode, Set<BoltServerAddress> servers )
256+
private BoltServerAddress selectAddress( AccessMode mode, List<BoltServerAddress> addresses )
258257
{
259-
BoltServerAddress[] addresses = servers.toArray( BOLT_SERVER_ADDRESSES_EMPTY_ARRAY );
260-
261258
switch ( mode )
262259
{
263260
case READ:

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster.loadbalancing;
2020

21+
import java.util.List;
22+
2123
import org.neo4j.driver.internal.BoltServerAddress;
2224

2325
/**
@@ -31,13 +33,13 @@ public interface LoadBalancingStrategy
3133
* @param knownReaders array of all known readers.
3234
* @return most appropriate reader or {@code null} if it can't be selected.
3335
*/
34-
BoltServerAddress selectReader( BoltServerAddress[] knownReaders );
36+
BoltServerAddress selectReader( List<BoltServerAddress> knownReaders );
3537

3638
/**
3739
* Select most appropriate write address from the given array of addresses.
3840
*
3941
* @param knownWriters array of all known writers.
4042
* @return most appropriate writer or {@code null} if it can't be selected.
4143
*/
42-
BoltServerAddress selectWriter( BoltServerAddress[] knownWriters );
44+
BoltServerAddress selectWriter( List<BoltServerAddress> knownWriters );
4345
}

driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,10 @@
2525
import java.io.IOException;
2626
import java.net.InetAddress;
2727
import java.net.UnknownHostException;
28+
import java.util.Arrays;
2829
import java.util.HashMap;
29-
import java.util.LinkedHashSet;
3030
import java.util.List;
3131
import java.util.Map;
32-
import java.util.Set;
3332

3433
import org.neo4j.driver.Logger;
3534
import org.neo4j.driver.Logging;
@@ -519,8 +518,7 @@ private static RoutingTable routingTableMock( BoltServerAddress... routers )
519518
private static RoutingTable routingTableMock( boolean preferInitialRouter, BoltServerAddress... routers )
520519
{
521520
RoutingTable routingTable = mock( RoutingTable.class );
522-
Set<BoltServerAddress> addressSet = new LinkedHashSet<>( asOrderedSet( routers ) );
523-
when( routingTable.routers() ).thenReturn( addressSet );
521+
when( routingTable.routers() ).thenReturn( Arrays.asList( routers ) );
524522
when( routingTable.database() ).thenReturn( defaultDatabase() );
525523
when( routingTable.preferInitialRouter() ).thenReturn( preferInitialRouter );
526524
return routingTable;

driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Collections;
2424
import java.util.HashSet;
2525
import java.util.LinkedHashSet;
26+
import java.util.List;
2627
import java.util.Optional;
2728
import java.util.Set;
2829
import java.util.concurrent.CompletionStage;
@@ -259,7 +260,7 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode )
259260
RoutingTable routingTable = mock( RoutingTable.class );
260261
when( routingTable.isStaleFor( mode ) ).thenReturn( true );
261262

262-
Set<BoltServerAddress> addresses = new LinkedHashSet<>( singletonList( LOCAL_DEFAULT ) );
263+
List<BoltServerAddress> addresses = singletonList( LOCAL_DEFAULT );
263264
when( routingTable.readers() ).thenReturn( addresses );
264265
when( routingTable.writers() ).thenReturn( addresses );
265266
when( routingTable.database() ).thenReturn( defaultDatabase() );

0 commit comments

Comments
 (0)