Skip to content
This repository was archived by the owner on Sep 9, 2022. It is now read-only.

Relay Discovery and unspecific address dialing #4

Merged
merged 7 commits into from
Jul 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package relay
import (
"context"
"fmt"
"math/rand"

pb "github.com/libp2p/go-libp2p-circuit/pb"

peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
tpt "github.com/libp2p/go-libp2p-transport"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -42,19 +46,57 @@ func (d *RelayDialer) DialContext(ctx context.Context, a ma.Multiaddr) (tpt.Conn
}
}

rinfo, err := pstore.InfoFromP2pAddr(relayaddr)
dinfo, err := pstore.InfoFromP2pAddr(destaddr)
if err != nil {
return nil, err
}

dinfo, err := pstore.InfoFromP2pAddr(destaddr)
if len(relayaddr.Bytes()) == 0 {
// unspecific relay address, try dialing using known hop relays
return d.tryDialRelays(ctx, *dinfo)
}

rinfo, err := pstore.InfoFromP2pAddr(relayaddr)
if err != nil {
return nil, err
}

return d.Relay().DialPeer(ctx, *rinfo, *dinfo)
}

func (d *RelayDialer) tryDialRelays(ctx context.Context, dinfo pstore.PeerInfo) (tpt.Conn, error) {
var relays []peer.ID
d.mx.Lock()
for p := range d.relays {
relays = append(relays, p)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, the fast and idiomatic way to do this is:

relays := append([]peer.ID{}, d.relays...)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, relays isn't a slice.

d.mx.Unlock()

// shuffle list of relays, avoid overloading a specific relay
for i := range relays {
j := rand.Intn(i + 1)
relays[i], relays[j] = relays[j], relays[i]
}

for _, relay := range relays {
if len(d.host.Network().ConnsToPeer(relay)) == 0 {
continue
}

rctx, cancel := context.WithTimeout(ctx, HopConnectTimeout)
c, err := d.Relay().DialPeer(rctx, pstore.PeerInfo{ID: relay}, dinfo)
cancel()

if err == nil {
return c, nil
}

log.Debugf("Error opening relay connection to %s: %s", dinfo.ID, err.Error())
}

return nil, RelayError{pb.CircuitRelay_HOP_NO_CONN_TO_DST}
}

func (d *RelayDialer) Matches(a ma.Multiaddr) bool {
_, err := a.ValueForProtocol(P_CIRCUIT)
return err == nil
Expand Down
54 changes: 54 additions & 0 deletions notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package relay

import (
"context"
"time"

inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)

var _ inet.Notifiee = (*RelayNotifiee)(nil)

type RelayNotifiee Relay

func (r *Relay) Notifiee() inet.Notifiee {
return (*RelayNotifiee)(r)
}

func (n *RelayNotifiee) Relay() *Relay {
return (*Relay)(n)
}

func (n *RelayNotifiee) Listen(net inet.Network, a ma.Multiaddr) {}
func (n *RelayNotifiee) ListenClose(net inet.Network, a ma.Multiaddr) {}
func (n *RelayNotifiee) OpenedStream(net inet.Network, s inet.Stream) {}
func (n *RelayNotifiee) ClosedStream(net inet.Network, s inet.Stream) {}

func (n *RelayNotifiee) Connected(s inet.Network, c inet.Conn) {
if n.Relay().Transport().Matches(c.RemoteMultiaddr()) {
return
}

go func(id peer.ID) {
ctx, cancel := context.WithTimeout(n.ctx, time.Second)
defer cancel()

canhop, err := n.Relay().CanHop(ctx, id)

if err != nil {
log.Debugf("Error testing relay hop: %s", err.Error())
return
}

if canhop {
log.Debugf("Discovered hop relay %s", id.Pretty())
n.mx.Lock()
n.relays[id] = struct{}{}
n.mx.Unlock()
}
}(c.RemotePeer())
}

func (n *RelayNotifiee) Disconnected(s inet.Network, c inet.Conn) {}
40 changes: 40 additions & 0 deletions relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"sync"
"time"

pb "github.com/libp2p/go-libp2p-circuit/pb"
Expand Down Expand Up @@ -33,6 +34,9 @@ type Relay struct {
hop bool

incoming chan *Conn

relays map[peer.ID]struct{}
mx sync.Mutex
}

type RelayOpt int
Expand All @@ -56,6 +60,7 @@ func NewRelay(ctx context.Context, h host.Host, opts ...RelayOpt) (*Relay, error
ctx: ctx,
self: h.ID(),
incoming: make(chan *Conn),
relays: make(map[peer.ID]struct{}),
}

for _, opt := range opts {
Expand All @@ -70,6 +75,7 @@ func NewRelay(ctx context.Context, h host.Host, opts ...RelayOpt) (*Relay, error
}

h.SetStreamHandler(ProtoID, r.handleNewStream)
h.Network().Notify(r.Notifiee())

return r, nil
}
Expand Down Expand Up @@ -121,6 +127,40 @@ func (r *Relay) DialPeer(ctx context.Context, relay pstore.PeerInfo, dest pstore
return &Conn{Stream: s, remote: dest, transport: r.Transport()}, nil
}

func (r *Relay) CanHop(ctx context.Context, id peer.ID) (bool, error) {
s, err := r.host.NewStream(ctx, id, ProtoID)
if err != nil {
return false, err
}

defer s.Close()

rd := newDelimitedReader(s, maxMessageSize)
wr := newDelimitedWriter(s)

var msg pb.CircuitRelay

msg.Type = pb.CircuitRelay_CAN_HOP.Enum()

err = wr.WriteMsg(&msg)
if err != nil {
return false, err
}

msg.Reset()

err = rd.ReadMsg(&msg)
if err != nil {
return false, err
}

if msg.GetType() != pb.CircuitRelay_STATUS {
return false, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType())
}

return msg.GetCode() == pb.CircuitRelay_SUCCESS, nil
}

func (r *Relay) handleNewStream(s inet.Stream) {
log.Infof("new relay stream from: %s", s.Conn().RemotePeer())

Expand Down
98 changes: 98 additions & 0 deletions relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,74 @@ func TestBasicRelayDial(t *testing.T) {
}
}

func TestUnspecificRelayDial(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := getNetHosts(t, ctx, 3)

r1, err := NewRelay(ctx, hosts[0])
if err != nil {
t.Fatal(err)
}

_, err = NewRelay(ctx, hosts[1], OptHop)
if err != nil {
t.Fatal(err)
}

r3, err := NewRelay(ctx, hosts[2])
if err != nil {
t.Fatal(err)
}

connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])

time.Sleep(100 * time.Millisecond)

msg := []byte("relay works!")
go func() {
list := r3.Listener()

con, err := list.Accept()
if err != nil {
t.Error(err)
return
}

_, err = con.Write(msg)
if err != nil {
t.Error(err)
return
}
con.Close()
}()

addr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p-circuit/ipfs/%s", hosts[2].ID().Pretty()))
if err != nil {
t.Fatal(err)
}

rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()

d := r1.Dialer()
con, err := d.DialContext(rctx, addr)
if err != nil {
t.Fatal(err)
}

data, err := ioutil.ReadAll(con)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(data, msg) {
t.Fatal("message was incorrect:", string(data))
}
}

func TestRelayThroughNonHop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -330,3 +398,33 @@ func TestActiveRelay(t *testing.T) {
t.Fatal("message was incorrect:", string(data))
}
}

func TestRelayCanHop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := getNetHosts(t, ctx, 2)

connect(t, hosts[0], hosts[1])

time.Sleep(10 * time.Millisecond)

r1, err := NewRelay(ctx, hosts[0])
if err != nil {
t.Fatal(err)
}

_, err = NewRelay(ctx, hosts[1], OptHop)
if err != nil {
t.Fatal(err)
}

canhop, err := r1.CanHop(ctx, hosts[1].ID())
if err != nil {
t.Fatal(err)
}

if !canhop {
t.Fatal("Relay can't hop")
}
}
9 changes: 8 additions & 1 deletion transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,17 @@ func init() {
ma.AddProtocol(Protocol)

// Add dialer transport
const unspecific = "/p2p-circuit/ipfs"
const proto = "/ipfs/p2p-circuit/ipfs"

tps := addrutil.SupportedTransportStrings

err := addrutil.AddTransport(proto)
err := addrutil.AddTransport(unspecific)
if err != nil {
panic(err)
}

err = addrutil.AddTransport(proto)
if err != nil {
panic(err)
}
Expand Down
Loading