Skip to content

Add some networking functionality to Maru #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open

Add some networking functionality to Maru #89

wants to merge 15 commits into from

Conversation

pinges
Copy link

@pinges pinges commented Apr 30, 2025

This PR adds functionality based on libp2p using some internal classes from Teku.

  • static peers can be added
  • gossiping
  • sending of requests to individual peers and receiving of responses

@pinges pinges requested a review from Copilot April 30, 2025 08:41
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds peer-to-peer networking functionality to Maru using libp2p by implementing gossip, static peer management, and RPC communication.

  • Introduces several new classes for handling topics, building a P2P network, and processing RPC requests and responses.
  • Updates configuration and test files to support the new networking features.
  • Adds basic tests to verify static peer connection, disconnection, gossiping, and request/response behaviors.

Reviewed Changes

Copilot reviewed 15 out of 21 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
p2p/src/main/kotlin/maru/p2p/TestTopicHandler.kt Introduces a topic handler for gossip messages with basic validation.
p2p/src/main/kotlin/maru/p2p/P2PNetworkFactory.kt Sets up the P2P network interface parsing (IPv4/IPv6) for network creation.
p2p/src/main/kotlin/maru/p2p/P2PNetworkBuilder.kt Implements the construction of the full libp2p network including gossip and RPC configurations.
p2p/src/main/kotlin/maru/p2p/P2PManager.kt Manages starting, stopping, and maintaining persistent static peer connections.
p2p/src/main/kotlin/maru/p2p/MaruRpcResponseHandler.kt Handles RPC responses, completing a future on response.
p2p/src/main/kotlin/maru/p2p/MaruRpcMethod.kt Configures creation of incoming/outgoing RPC handlers.
p2p/src/main/kotlin/maru/p2p/MaruPreparedGossipMessage.kt Wraps original messages to be used in gossip transmission.
p2p/src/main/kotlin/maru/p2p/MaruPeerHandler.kt Skeleton for handling peer connect/disconnect events.
p2p/src/main/kotlin/maru/p2p/MaruOutgoingRpcRequestHandler.kt Processes outgoing RPC requests and responses.
p2p/src/main/kotlin/maru/p2p/MaruIncomingRpcRequestHandler.kt Processes incoming RPC requests with simple echo logic.
config/src/main/kotlin/maru.config/Config.kt Updates P2P configuration, including networks and static peers settings.
app/src/test/kotlin/maru/testutils/MaruFactory.kt Adjusts test configurations to match the new networking port.
app/src/test/kotlin/maru/app/p2p/P2PTest.kt Adds tests to validate static peer addition/removal, gossiping, and request handling.
app/src/main/kotlin/maru/app/MaruApp.kt Integrates P2PManager into the application startup to enable networking.
Files not reviewed (6)
  • app/build.gradle: Language not supported
  • gradle/versions.gradle: Language not supported
  • key1: Language not supported
  • key2: Language not supported
  • p2p/build.gradle: Language not supported
  • settings.gradle: Language not supported

p0: NodeId?,
p1: RpcStream?,
) {
println("active $this")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can a logger be used for these print statements instead

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't seem to be removed

@@ -73,7 +96,7 @@ data class MaruConfig(
val persistence: Persistence,
val sotNode: ApiEndpointConfig,
val qbftOptions: QbftOptions,
val p2pConfig: P2P?,
val p2pConfig: P2P,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we agreed that it should be nullable after all. So we don't have to bother with P2P when we don't need it in some tests

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raising this again since this seems to be non-nullable still in the latest version of the code

@@ -66,7 +66,7 @@ class HopliteFriendlinessTest {
endpoint = URI.create("http://localhost:8545").toURL(),
),
qbftOptions = QbftOptions(100.milliseconds),
p2pConfig = P2P(port = 3322u),
p2pConfig = P2P(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the port should be 3322, not 9000. Something is wrong with this test. Also, we need to test that the rest of the P2P configs are configurable

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I can actually see the test is failing because of that


private fun buildP2PNetwork(): P2PNetwork<Peer> {
val filePrivateKeySource = GeneratingFilePrivateKeySource(privateKeyFile.toString())
// TODO: reading/generating this key should be done early on, as it is needed for the validator as well
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO generation should be done here, as a part of P2P network, but if it's set it can be read outside of this class

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But moving the generation out of here also works

)
}

fun addStaticPeer(peerAddress: MultiaddrPeerAddress) {
Copy link
Collaborator

@Filter94 Filter94 May 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this method? I don't see how would it be useful

I guess we could expose an RPC method to add static peers, but we won't do it in the near future

Copy link
Author

@pinges pinges May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was what I was thinking, but we could just make it private for now?

Copy link
Collaborator

@Filter94 Filter94 May 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The linter will probably complain about the dead code. Since it's already written, I guess we can leave it as is, but there's a change we'll never need it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I see it's used in the tests so it's fine to leave it

return SafeFuture.completedFuture(null)
}
return p2pNetwork
.connect(peerAddress)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to keep the registry of open connections and check if any of them are broken instead of unconditionally connecting every time?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually checked in the implementation in libp2p

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's the case, I don't really see why we do this reconnect logic here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain why we have this logic at all if the connection is maintained inside libp2p?

)
}

private fun getMessageFactory(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain what this message factory is supposed to do?

Copy link
Author

@pinges pinges May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To prepare messages for topics that you are not subscribed to.
When you are sending a message on a topic that you are subscribed to the prepareMessage() of the handler is called. If you are not subscribed a default class can be configured here.
Not sure if we need that ...

Copy link
Collaborator

@Filter94 Filter94 May 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be useful for some sniffing, but I don't really see how it's useful on a regular Maru node. It's safer if we don't react on the topics we're not subscribed to in any way


class P2PManager(
private val privateKeyBytes: ByteArray,
private val p2pConfig: P2P?,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If p2pConfig is null, why would we create a P2PManager?

log.info(config.toString())
}

val p2pManager =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can create a P2PManager if config.p2pConfig is null

privateKeyBytes =
privateKeyBytes
.slice(
privateKeyBytes.size - 32..privateKeyBytes.size - 1,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain, why do we take the last 32 bytes of the private key here? It should also be reflected in the name, because we have privateKeyBytes above that we read from a file and we're passing modified privateKeyBytes here.

appConfig
.domainFriendly()
.persistence.dataPath
.resolve("private-key")
Copy link
Collaborator

@Filter94 Filter94 May 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like it's a convention over configuration approach, which is arguable. We can use ./$dataPath/private-key as the default private key location in case private key's location isn't specified, but I feel like the software would be much more usable if we'd also allow an arbitrary private key path

)

data class ApiEndpointConfig(
val endpoint: URL,
val jwtSecretPath: String? = null,
)
) {
override fun equals(other: Any?): Boolean {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overriden equals and hashCode shouldn't be necessary without ByteArray amongst the fields. Default implementation should be exactly the same

@@ -73,7 +96,7 @@ data class MaruConfig(
val persistence: Persistence,
val sotNode: ApiEndpointConfig,
val qbftOptions: QbftOptions,
val p2pConfig: P2P?,
val p2pConfig: P2P,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raising this again since this seems to be non-nullable still in the latest version of the code


private const val LINEA = "linea"

class MaruRpcMethod : RpcMethod<MaruOutgoingRpcRequestHandler, Bytes, MaruRpcResponseHandler> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class isn't ready to be in main sourceset yet

privateKey: PrivKey,
ipv4Address: Multiaddr,
): P2PNetwork<Peer> {
val rpcMethod = MaruRpcMethod()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this has to be moved away from here and RpcHandler probably has to be injected here

private const val PEER_ADDRESS_NODE_2: String = "/ip4/$IPV4/tcp/$PORT2/p2p/$PEER_ID_NODE_2"
private const val PEER_ADDRESS_NODE_3: String = "/ip4/$IPV4/tcp/$PORT3/p2p/$PEER_ID_NODE_3"

class P2PTest {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As is, this test was useful to learn how to use LibP2PNetwork, but right now the useful part that it's testing is the creation of LibP2PNetwork by P2PNetworkFactory. I think we should change the test to test Maru's wrapper around LibP2PNetwork. The class that implements P2PNetwork

}
assertThat(p2pNetwork1.peerCount).isEqualTo(1)
assertThat(p2pNetwork2.peerCount).isEqualTo(2)
assertThat(p2pNetwork3.peerCount).isEqualTo(1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the peer counts, it follows that p2pNetwork1 and 3 are probably not connected directly, but we could use a direct check

val testTopicHandler3 = TestTopicHandler()
p2pNetwork3.subscribe("topic", testTopicHandler3)

Awaitility
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider factoring this assertion out and use it here with a concise and descriptive name

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants