Skip to content

Relax connection termination policy in routing driver #424

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 13, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public class ClusterRoutingTable implements RoutingTable
public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
{
this( clock );
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet<BoltServerAddress>() );
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet<BoltServerAddress>(),
new HashSet<BoltServerAddress>() );
}

private ClusterRoutingTable( Clock clock )
Expand All @@ -66,14 +67,16 @@ public boolean isStaleFor( AccessMode mode )
}

@Override
public synchronized Set<BoltServerAddress> update( ClusterComposition cluster )
public synchronized RoutingTableChange update( ClusterComposition cluster )
{
expirationTimeout = cluster.expirationTimestamp();
// todo: what if server is added as reader and removed as writer? we should not treat it as removed
Copy link
Contributor

@zhenlineo zhenlineo Nov 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To answer your todo, there is no need to know if the status of an address in RoutingTable, we only care about the whole distinct addresses in the RoutingTable. The ConnectionPools only need to know if an address is active (can reuse connections) or passive (can be terminated once no inUse connections in the pool)

See more explanation in lutovich#1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good!

Set<BoltServerAddress> added = new HashSet<>();
Set<BoltServerAddress> removed = new HashSet<>();
readers.update( cluster.readers(), removed );
writers.update( cluster.writers(), removed );
routers.update( cluster.routers(), removed );
return removed;
readers.update( cluster.readers(), added, removed );
writers.update( cluster.writers(), added, removed );
routers.update( cluster.routers(), added, removed );
return new RoutingTableChange( added, removed );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.neo4j.driver.internal.cluster;

import java.util.Set;

import org.neo4j.driver.internal.RoutingErrorHandler;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.ConnectionPool;
Expand All @@ -35,6 +33,7 @@
public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, AutoCloseable
{
private static final String LOAD_BALANCER_LOG_NAME = "LoadBalancer";
private static final boolean PURGE_ON_ERROR = Boolean.getBoolean( "purgeOnError" );

private final ConnectionPool connections;
private final RoutingTable routingTable;
Expand Down Expand Up @@ -113,8 +112,14 @@ private synchronized void forget( BoltServerAddress address )
{
// First remove from the load balancer, to prevent concurrent threads from making connections to them.
routingTable.forget( address );
// drop all current connections to the address
connections.purge( address );
if ( PURGE_ON_ERROR )
{
connections.purge( address );
}
else
{
connections.passivate( address );
}
}

synchronized void ensureRouting( AccessMode mode )
Expand All @@ -131,16 +136,35 @@ synchronized void refreshRoutingTable()

// get a new routing table
ClusterComposition cluster = rediscovery.lookupClusterComposition( routingTable, connections );
Set<BoltServerAddress> removed = routingTable.update( cluster );
// purge connections to removed addresses
for ( BoltServerAddress address : removed )
{
connections.purge( address );
}
RoutingTableChange routingTableChange = routingTable.update( cluster );
updateConnectionPool( routingTableChange );

log.info( "Refreshed routing information. %s", routingTable );
}

private void updateConnectionPool( RoutingTableChange routingTableChange )
{
if ( PURGE_ON_ERROR )
{
for ( BoltServerAddress removedAddress : routingTableChange.removed() )
{
connections.purge( removedAddress );
}
}
else
{
for ( BoltServerAddress addedAddress : routingTableChange.added() )
{
connections.activate( addedAddress );
}
for ( BoltServerAddress removedAddress : routingTableChange.removed() )
{
connections.passivate( removedAddress );
}
connections.compact();
}
}

private RoundRobinAddressSet addressSetFor( AccessMode mode )
{
switch ( mode )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.neo4j.driver.internal.cluster;

import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -55,17 +56,20 @@ int next( int divisor )
return index % divisor;
}

public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> added,
Set<BoltServerAddress> removed )
{
BoltServerAddress[] prev = this.addresses;
if ( addresses.isEmpty() )
{
this.addresses = NONE;
Collections.addAll( removed, prev );
return;
}
if ( prev.length == 0 )
{
this.addresses = addresses.toArray( NONE );
Collections.addAll( added, this.addresses );
return;
}
BoltServerAddress[] copy = null;
Expand Down Expand Up @@ -101,6 +105,7 @@ public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServe
for ( BoltServerAddress address : addresses )
{
copy[j++] = address;
added.add( address );
}
this.addresses = copy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
*/
package org.neo4j.driver.internal.cluster;

import java.util.Set;

import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.v1.AccessMode;

public interface RoutingTable
{
boolean isStaleFor( AccessMode mode );

Set<BoltServerAddress> update( ClusterComposition cluster );
RoutingTableChange update( ClusterComposition cluster );

void forget( BoltServerAddress address );

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

import java.util.Collections;
import java.util.Set;

import org.neo4j.driver.internal.net.BoltServerAddress;

import static java.util.Collections.unmodifiableSet;

public class RoutingTableChange
{
public static final RoutingTableChange EMPTY = new RoutingTableChange(
Collections.<BoltServerAddress>emptySet(), Collections.<BoltServerAddress>emptySet() );

private final Set<BoltServerAddress> added;
private final Set<BoltServerAddress> removed;

public RoutingTableChange( Set<BoltServerAddress> added, Set<BoltServerAddress> removed )
{
this.added = added;
this.removed = removed;
}

public Set<BoltServerAddress> added()
{
return unmodifiableSet( added );
}

public Set<BoltServerAddress> removed()
{
return unmodifiableSet( removed );
}

@Override
public String toString()
{
return "RoutingTableChange{" +
"added=" + added +
", removed=" + removed +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
*/
package org.neo4j.driver.internal.net.pooling;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -46,6 +44,7 @@ public class BlockingPooledConnectionQueue
private final BlockingQueue<PooledConnection> queue;
private final Logger logger;

private final AtomicBoolean isPassive = new AtomicBoolean( false );
private final AtomicBoolean isTerminating = new AtomicBoolean( false );

/** Keeps track of acquired connections */
Expand All @@ -69,15 +68,13 @@ public boolean offer( PooledConnection pooledConnection )
acquiredConnections.remove( pooledConnection );
boolean offer = queue.offer( pooledConnection );
// not added back to the queue, dispose of the connection
if (!offer) {
pooledConnection.dispose();
if ( !offer )
{
disposeSafely( pooledConnection );
}
if (isTerminating.get()) {
PooledConnection connection = queue.poll();
if (connection != null)
{
connection.dispose();
}
if ( isPassive.get() || isTerminating.get() )
{
terminateIdleConnections();
}
return offer;
}
Expand All @@ -89,20 +86,29 @@ public boolean offer( PooledConnection pooledConnection )
*/
public PooledConnection acquire( Supplier<PooledConnection> supplier )
{

PooledConnection connection = queue.poll();
if ( connection == null )
{
connection = supplier.get();
}
acquiredConnections.add( connection );

if (isTerminating.get()) {
if ( isPassive.get() || isTerminating.get() )
{
acquiredConnections.remove( connection );
connection.dispose();
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
disposeSafely( connection );
throw new IllegalStateException( "Pool is " + (isPassive.get() ? "passivated" : "terminated") + ", " +
"new connections can't be acquired" );
}
else
{
return connection;
}
return connection;
}

public int idleConnections()
{
return queue.size();
}

public int activeConnections()
Expand All @@ -116,19 +122,27 @@ void disposeBroken( PooledConnection connection )
disposeSafely( connection );
}

public boolean isEmpty()
public boolean contains( PooledConnection pooledConnection )
{
return queue.isEmpty();
return queue.contains( pooledConnection );
}

public int size()
public void activate()
{
return queue.size();
isPassive.compareAndSet( true, false );
}

public boolean contains( PooledConnection pooledConnection )
public void passivate()
{
return queue.contains( pooledConnection );
if ( isPassive.compareAndSet( false, true ) )
{
terminateIdleConnections();
}
}

public boolean isActive()
{
return !isPassive.get();
}

/**
Expand All @@ -141,15 +155,25 @@ public void terminate()
{
if ( isTerminating.compareAndSet( false, true ) )
{
while ( !queue.isEmpty() )
{
PooledConnection idleConnection = queue.poll();
disposeSafely( idleConnection );
}
for ( PooledConnection acquiredConnection : acquiredConnections )
{
disposeSafely( acquiredConnection );
}
terminateIdleConnections();
terminateAcquiredConnections();
}
}

private void terminateIdleConnections()
{
while ( !queue.isEmpty() )
{
PooledConnection idleConnection = queue.poll();
disposeSafely( idleConnection );
}
}

private void terminateAcquiredConnections()
{
for ( PooledConnection acquiredConnection : acquiredConnections )
{
disposeSafely( acquiredConnection );
}
}

Expand Down
Loading