Skip to content

Commit c7e0fba

Browse files
committed
Merge pull request #186 from pontusmelke/1.0-block-on-negotiate
Block when negotiating versions
2 parents e5ced14 + ee9dd57 commit c7e0fba

File tree

3 files changed

+169
-42
lines changed

3 files changed

+169
-42
lines changed

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketClient.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.neo4j.driver.v1.exceptions.ClientException;
3636

3737
import static java.nio.ByteOrder.BIG_ENDIAN;
38+
import static org.neo4j.driver.internal.connector.socket.SocketUtils.blockingRead;
39+
import static org.neo4j.driver.internal.connector.socket.SocketUtils.blockingWrite;
3840

3941
public class SocketClient
4042
{
@@ -70,7 +72,6 @@ public void start()
7072
{
7173
logger.debug( "~~ [CONNECT] %s:%d.", host, port );
7274
channel = ChannelFactory.create( host, port, config, logger );
73-
7475
protocol = negotiateProtocol();
7576
reader = protocol.reader();
7677
writer = protocol.writer();
@@ -170,21 +171,21 @@ private SocketProtocol negotiateProtocol() throws IOException
170171
{
171172
logger.debug( "~~ [HANDSHAKE] [0x6060B017, 1, 0, 0, 0]." );
172173
//Propose protocol versions
173-
ByteBuffer buf = ByteBuffer.allocate( 5 * 4 ).order( BIG_ENDIAN );
174+
ByteBuffer buf = ByteBuffer.allocateDirect( 5 * 4 ).order( BIG_ENDIAN );
174175
buf.putInt( MAGIC_PREAMBLE );
175176
for ( int version : SUPPORTED_VERSIONS )
176177
{
177178
buf.putInt( version );
178179
}
179180
buf.flip();
180181

181-
channel.write( buf );
182+
//Do a blocking write
183+
blockingWrite(channel, buf);
182184

183-
// Read back the servers choice
185+
// Read (blocking) back the servers choice
184186
buf.clear();
185187
buf.limit( 4 );
186-
187-
channel.read( buf );
188+
blockingRead(channel, buf);
188189

189190
// Choose protocol, or fail
190191
buf.flip();
@@ -223,7 +224,6 @@ public static ByteChannel create( String host, int port, Config config, Logger l
223224
SocketChannel soChannel = SocketChannel.open();
224225
soChannel.setOption( StandardSocketOptions.SO_REUSEADDR, true );
225226
soChannel.setOption( StandardSocketOptions.SO_KEEPALIVE, true );
226-
227227
soChannel.connect( new InetSocketAddress( host, port ) );
228228

229229
ByteChannel channel;

driver/src/main/java/org/neo4j/driver/internal/connector/socket/AllOrNothingChannel.java renamed to driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketUtils.java

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,70 +21,45 @@
2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;
2323
import java.nio.channels.ByteChannel;
24-
import java.nio.channels.SocketChannel;
2524

2625
import org.neo4j.driver.internal.util.BytePrinter;
2726
import org.neo4j.driver.v1.exceptions.ClientException;
2827

2928
/**
30-
* Wraps a regular socket channel such that read and write will not return until the full buffers given have been sent
31-
* or received, respectively.
29+
* Utility class for common operations.
3230
*/
33-
public class AllOrNothingChannel implements ByteChannel
31+
public final class SocketUtils
3432
{
35-
private final SocketChannel channel;
36-
37-
public AllOrNothingChannel( SocketChannel channel ) throws IOException
33+
private SocketUtils()
3834
{
39-
this.channel = channel;
40-
this.channel.configureBlocking( true );
35+
throw new UnsupportedOperationException( "Do not instantiate" );
4136
}
4237

43-
@Override
44-
public int read( ByteBuffer buf ) throws IOException
38+
public static void blockingRead(ByteChannel channel, ByteBuffer buf) throws IOException
4539
{
46-
int toRead = buf.remaining();
47-
while ( buf.remaining() > 0 )
40+
while(buf.hasRemaining())
4841
{
49-
int read = channel.read( buf );
50-
if ( read == -1 )
42+
if (channel.read( buf ) < 0)
5143
{
5244
throw new ClientException( String.format(
5345
"Connection terminated while receiving data. This can happen due to network " +
5446
"instabilities, or due to restarts of the database. Expected %s bytes, received %s.",
5547
buf.limit(), BytePrinter.hex( buf ) ) );
5648
}
5749
}
58-
return toRead;
5950
}
6051

61-
@Override
62-
public int write( ByteBuffer buf ) throws IOException
52+
public static void blockingWrite(ByteChannel channel, ByteBuffer buf) throws IOException
6353
{
64-
int toWrite = buf.remaining();
65-
while( buf.remaining() > 0 )
54+
while(buf.hasRemaining())
6655
{
67-
int write = channel.write( buf );
68-
if( write == -1 )
56+
if (channel.write( buf ) < 0)
6957
{
7058
throw new ClientException( String.format(
7159
"Connection terminated while sending data. This can happen due to network " +
7260
"instabilities, or due to restarts of the database. Expected %s bytes, wrote %s.",
7361
buf.limit(), BytePrinter.hex( buf ) ) );
7462
}
7563
}
76-
return toWrite;
77-
}
78-
79-
@Override
80-
public boolean isOpen()
81-
{
82-
return channel.isOpen();
83-
}
84-
85-
@Override
86-
public void close() throws IOException
87-
{
88-
channel.close();
8964
}
9065
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.connector.socket;
20+
21+
import org.junit.Rule;
22+
import org.junit.Test;
23+
import org.junit.rules.ExpectedException;
24+
25+
import java.io.IOException;
26+
import java.nio.ByteBuffer;
27+
import java.nio.channels.ByteChannel;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
import org.neo4j.driver.v1.exceptions.ClientException;
32+
33+
import static org.hamcrest.CoreMatchers.equalTo;
34+
import static org.hamcrest.MatcherAssert.assertThat;
35+
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.when;
37+
38+
public class SocketUtilsTest
39+
{
40+
@Rule
41+
public ExpectedException exception = ExpectedException.none();
42+
43+
@Test
44+
public void shouldReadAllBytes() throws IOException
45+
{
46+
// Given
47+
ByteBuffer buffer = ByteBuffer.allocate( 4 );
48+
ByteAtATimeChannel channel = new ByteAtATimeChannel( new byte[]{0, 1, 2, 3} );
49+
50+
// When
51+
SocketUtils.blockingRead(channel, buffer );
52+
buffer.flip();
53+
54+
// Then
55+
assertThat(buffer.get(), equalTo((byte) 0));
56+
assertThat(buffer.get(), equalTo((byte) 1));
57+
assertThat(buffer.get(), equalTo((byte) 2));
58+
assertThat(buffer.get(), equalTo((byte) 3));
59+
}
60+
61+
@Test
62+
public void shouldFailIfConnectionFailsWhileReading() throws IOException
63+
{
64+
// Given
65+
ByteBuffer buffer = ByteBuffer.allocate( 4 );
66+
ByteChannel channel = mock( ByteChannel.class );
67+
when(channel.read( buffer )).thenReturn( -1 );
68+
69+
//Expect
70+
exception.expect( ClientException.class );
71+
72+
// When
73+
SocketUtils.blockingRead(channel, buffer );
74+
}
75+
76+
@Test
77+
public void shouldWriteAllBytes() throws IOException
78+
{
79+
// Given
80+
ByteBuffer buffer = ByteBuffer.wrap( new byte[]{0, 1, 2, 3});
81+
ByteAtATimeChannel channel = new ByteAtATimeChannel( new byte[0] );
82+
83+
// When
84+
SocketUtils.blockingWrite(channel, buffer );
85+
86+
// Then
87+
assertThat(channel.writtenBytes.get(0), equalTo((byte) 0));
88+
assertThat(channel.writtenBytes.get(1), equalTo((byte) 1));
89+
assertThat(channel.writtenBytes.get(2), equalTo((byte) 2));
90+
assertThat(channel.writtenBytes.get(3), equalTo((byte) 3));
91+
}
92+
93+
@Test
94+
public void shouldFailIfConnectionFailsWhileWriting() throws IOException
95+
{
96+
// Given
97+
ByteBuffer buffer = ByteBuffer.allocate( 4 );
98+
ByteChannel channel = mock( ByteChannel.class );
99+
when(channel.write( buffer )).thenReturn( -1 );
100+
101+
//Expect
102+
exception.expect( ClientException.class );
103+
104+
// When
105+
SocketUtils.blockingWrite(channel, buffer );
106+
}
107+
108+
private static class ByteAtATimeChannel implements ByteChannel
109+
{
110+
111+
private final byte[] bytes;
112+
private int index = 0;
113+
private List<Byte> writtenBytes = new ArrayList<>( );
114+
115+
private ByteAtATimeChannel( byte[] bytes )
116+
{
117+
this.bytes = bytes;
118+
}
119+
120+
@Override
121+
public int read( ByteBuffer dst ) throws IOException
122+
{
123+
if (index >= bytes.length)
124+
{
125+
return -1;
126+
}
127+
128+
dst.put( bytes[index++]);
129+
return 1;
130+
}
131+
132+
@Override
133+
public int write( ByteBuffer src ) throws IOException
134+
{
135+
writtenBytes.add( src.get() );
136+
return 1;
137+
}
138+
139+
@Override
140+
public boolean isOpen()
141+
{
142+
return true;
143+
}
144+
145+
@Override
146+
public void close() throws IOException
147+
{
148+
149+
}
150+
}
151+
152+
}

0 commit comments

Comments
 (0)