A Framework for Constructing Peer-to-Peer Overlay Networks in Java

Introduction

Peer-to-peer computing has emerged as an effective way to build Internet applications that require high scalability and availability. Peer-to-peer systems are usually organized into structured overlay networks, which provide key-based routing and thereby avoid the flooding used in unstructured networks. Many overlay network protocols have been proposed to organize peers into various topologies, each emphasizing different networking properties. However, applications are often tied to a specific overlay network implementation, because different implementations usually provide very different interfaces and messaging mechanisms.

We present a framework for constructing peer-to-peer overlay networks in Java. First, networking is abstracted by interfaces that use URIs to uniformly address peers on different underlying or overlay networks. Then, asynchronous and synchronous messaging support is built upon these interfaces. Finally, overlay networking interfaces are defined to handle issues specific to overlay networks. We have constructed several overlay networks in this framework, and built peer-to-peer applications that are independent of the overlay implementation.

Examples

The framework includes a simple implementation of a DHT application. To run the example, configure the classpath to include overlay_1.1.0.jar and the packages in the lib directory. Launch the first node by specifying the IP address of the machine and the TCP port to bind, e.g. tcp://10.0.0.9:3000:
$ java pdl.transport.overlay.util.DHT tcp://10.0.0.9:3000


Note that a Groovy shell opens for issuing put/get commands on the dht object.

INFO  [main] TCPTransport#<init>:114> address = tcp://10.0.0.9:3000, bind = tcp://10.0.0.9:3000, maxThread = 8, serializer = pdl.transport.util.JavaSerializer
INFO  [main] TCPTransport#open:171> opening server socket /10.0.0.9:3000
INFO  [main] FissioneTransport#<init>:96> may seed on []
INFO  [main] FissioneTransport#<init>:101> link address = tcp://10.0.0.9:3000/fissione
INFO  [main] FissioneTransport#<init>:104> generated id = fissione://nucleus
INFO  [main] FissioneTransport#open:215> []->()->[]
tcp://10.0.0.9:3000/fissione
Let's get Groovy!
================
Version: 1.0 JVM: 14.0-b16
Type 'exit' to terminate the shell
Type 'help' for command help
Type 'go' to execute the statements

groovy>
groovy> dht.put 1, "hello, world"
groovy> go

===> null

groovy>


Then, to launch more nodes (possibly on other machines) into the overlay network, specify an extra parameter giving the address of a seed node (e.g. the first node, as printed above, tcp://10.0.0.9:3000/fissione):
$ java pdl.transport.overlay.util.DHT tcp://10.0.0.9:3001 tcp://10.0.0.9:3000/fissione


As before, a Groovy shell opens for issuing put/get commands.

INFO  [main] TCPTransport#<init>:114> address = tcp://10.0.0.9:3001, bind = tcp://10.0.0.9:3001, maxThread = 8, serializer = pdl.transport.util.JavaSerializer
INFO  [main] TCPTransport#open:171> opening server socket /10.0.0.9:3001
INFO  [main] FissioneTransport#<init>:96> may seed on [tcp://10.0.0.9:3000/fissione]
INFO  [main] FissioneTransport#<init>:101> link address = tcp://10.0.0.9:3001/fissione
INFO  [main] FissioneTransport#<init>:104> generated id = fissione://nucleus
INFO  [main] FissioneTransport#join:224> seed on tcp://10.0.0.9:3000/fissione timeout = 20000
INFO  [main] FissioneTransport#join:230> get ack JoinAck [0, 2]->1->[0, 2] with[]
INFO  [main] FissioneTransport#open:215> [0, 2]->(1)->[0, 2]
tcp://10.0.0.9:3001/fissione
Let's get Groovy!
================
Version: 1.0 JVM: 14.0-b16
Type 'exit' to terminate the shell
Type 'help' for command help
Type 'go' to execute the statements

groovy> dht.get 1
groovy> go

===> hello, world

groovy> 


Features

This framework facilitates the construction of peer-to-peer overlay networks in Java. First, Uniform Resource Identifiers (URIs) are used to uniformly address peers and resources on various underlying networks and overlay networks, as shown in the figure below. The framework then consists of a transport layer, a messaging module, and an overlay transport layer.
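As a small illustration of URI-based addressing, the sketch below uses only the standard java.net.URI class to split some of the addresses that appear on this page into their components. The class name AddressParts and the describe() helper are made up for this example and are not part of the framework.

```java
import java.net.URI;

public class AddressParts {
    /** Formats the components of a peer address as scheme|host|port|path. */
    static String describe(URI address) {
        return address.getScheme() + "|" + address.getHost() + "|"
                + address.getPort() + "|" + address.getPath();
    }

    public static void main(String[] args) {
        // Addresses taken from the examples on this page.
        System.out.println(describe(URI.create("tcp://10.0.0.9:3000/fissione")));
        System.out.println(describe(URI.create("map://node1")));
        System.out.println(describe(URI.create("fissione://nucleus")));
    }
}
```

The scheme selects the network (tcp, map, fissione), the authority names the peer, and the path can name a resource on that peer. java.net.URI reports a missing port as -1 and a missing path as an empty string.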

Transport Layer

The transport layer abstracts end-to-end communication in a network through the Transport and TransportListener interfaces, which can be used to send and receive data in the network. The Transport interface defines common operations for various network protocols: opening or closing the network, querying the address of a peer, sending data to a target address, managing callback listeners, etc.

The TransportFactory utility class creates the concrete transport instance specified by a given URI. For example, in the following code snippet, a TCPTransport or a MapTransport is created by binding to the specified address.


Transport transport;

transport = TransportFactory.createTransport(URI.create("tcp://10.0.0.9:2000"));
// or
transport = TransportFactory.createTransport(URI.create("map://node1"));


Then, the open() method must be called to allocate JVM resources (e.g. buffers and sockets), while the close() method releases the allocated resources. The getAddress() method returns the address of the instance in the network.

The send() method is used to asynchronously send data to target addresses. The data can be any serializable object. For example, the following code snippet sends "Hello" to a node in a TCP network.

transport.send(URI.create("tcp://10.0.0.9:2000"), "Hello");


To receive data, implementations of the TransportListener interface should be registered with the Transport instance. For example, the following code snippet registers a TransportListener implementation (as an anonymous class).

transport.addTransportListener(new TransportListener() {
    public void receive(URI source, Object payload) {
        // an object has been received from the source node
    }

    public void reject(URI target, Object payload) {
        // a send to the target node has failed
    }
});
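To make the send/receive/reject flow concrete, here is a minimal in-process sketch. It does not use the framework: the Listener interface and LocalTransport class are hypothetical stand-ins that only mirror the method names described above, and delivery is done synchronously for brevity, whereas the framework's send() is asynchronous.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class InMemoryTransportSketch {
    /** Stand-in for the framework's TransportListener; not the real interface. */
    interface Listener {
        void receive(URI source, Object payload);
        void reject(URI target, Object payload);
    }

    /** Hypothetical in-process transport: open nodes are looked up in a shared map. */
    static class LocalTransport {
        private static final Map<URI, LocalTransport> OPEN = new ConcurrentHashMap<>();
        private final URI address;
        private final List<Listener> listeners = new CopyOnWriteArrayList<>();

        LocalTransport(URI address) { this.address = address; }
        void open() { OPEN.put(address, this); }
        void close() { OPEN.remove(address); }
        URI getAddress() { return address; }
        void addListener(Listener listener) { listeners.add(listener); }

        /** Delivers to the target's listeners, or reports a rejection to our own. */
        void send(URI target, Object payload) {
            LocalTransport peer = OPEN.get(target);
            if (peer == null) {
                for (Listener l : listeners) l.reject(target, payload);
            } else {
                for (Listener l : peer.listeners) l.receive(address, payload);
            }
        }
    }

    /** Runs a small scenario and returns the observed events in order. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        LocalTransport a = new LocalTransport(URI.create("map://node1"));
        LocalTransport b = new LocalTransport(URI.create("map://node2"));
        Listener log = new Listener() {
            public void receive(URI source, Object payload) {
                events.add("receive " + payload + " from " + source);
            }
            public void reject(URI target, Object payload) {
                events.add("reject " + payload + " to " + target);
            }
        };
        a.addListener(log);
        b.addListener(log);
        a.open();
        b.open();
        a.send(b.getAddress(), "Hello");   // node2 is open, so its listener receives
        b.close();
        a.send(b.getAddress(), "Again");   // node2 is closed, so node1 is told of the failure
        a.close();
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The point of the sketch is the division of labour: receive() fires on the destination node, while reject() fires on the sender when a payload cannot be delivered.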


To reach edge computers behind NATs or firewalls, the framework provides a RelayTransport that relays TCP traffic for them. First, a Relayer must be started outside the NAT/firewall, e.g.

java pdl.transport.tcp.Relayer 7000 tcp://1.2.3.4:3000

Here, 7000 is the TCP port on which the Relayer waits for relay clients, and the second parameter is the initial address to allocate to relay clients.

Then, a relay client can create a RelayTransport using the TransportFactory, and other nodes can communicate with this node regardless of its NAT/firewall situation.

Transport transport = TransportFactory.createTransport(URI.create("relay://1.2.3.4:7000"));



Messenger Module

The Messenger class provides methods for making asynchronous and synchronous remote calls on top of the transport layer. Objects with annotated methods are registered with the Messenger and become CallHandlers. Remote calls are extracted, scheduled, and processed concurrently by a pool of threads, and they can immediately return Future objects that serve as placeholders for the potential return values.

A Messenger object is associated with each Transport instance. The following code snippet acquires the associated object for later use.

Messenger messenger = transport.getMessenger();


The addHandler() method registers objects under paths. A path uniquely identifies an object registered in a messenger. For instance, two new objects are registered under "test" and "calc", respectively, in the following code snippet.

messenger.addHandler("/test", new TestHandler());

public class TestHandler {
    @MessengerCall
    public void sayHello(String name) {
        System.out.println("Hello, " + name);
    }
}


Asynchronous remote calls can be invoked by the call() methods with the address of the remote object, together with the name of the method and the required parameters. The following code snippet calls the above sayHello() method asynchronously with the parameter "World!":

messenger.call(URI.create("tcp://10.0.0.9:2000/test"), "sayHello", "World!");


The callFuture() methods are similar to the call() methods, except that they immediately return Future objects as placeholders for any potential return values. The Future object's get() method can then be invoked to wait for the remote call to return, i.e. making the call synchronous. For example,

Future<Void> future = messenger.callFuture(URI.create("tcp://10.0.0.9:2000/test"), "sayHello", "World!");
// ...
future.get();
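The following sketch shows, in a single JVM and without any networking, one way that annotated handler methods could be looked up by path and method name, run on a thread pool, and exposed through a Future. It is an assumption-laden illustration rather than the framework's implementation: the @RemoteCallable annotation, the Greeter handler, and the LocalDispatchSketch class are all invented for this example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LocalDispatchSketch {
    /** Stand-in marker annotation; the framework's @MessengerCall may differ. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface RemoteCallable {}

    /** Hypothetical handler with one callable method. */
    public static class Greeter {
        @RemoteCallable
        public String greet(String name) { return "Hello, " + name; }
    }

    private final Map<String, Object> handlers = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    void addHandler(String path, Object handler) { handlers.put(path, handler); }

    /** Schedules the call on the pool and returns a placeholder immediately. */
    Future<Object> callFuture(String path, String method, Object... args) {
        return pool.submit(() -> {
            Object handler = handlers.get(path);
            if (handler == null) throw new IllegalArgumentException("no handler at " + path);
            // Only public methods carrying the marker annotation may be invoked.
            for (Method m : handler.getClass().getMethods()) {
                if (m.getName().equals(method)
                        && m.isAnnotationPresent(RemoteCallable.class)
                        && m.getParameterCount() == args.length) {
                    return m.invoke(handler, args);
                }
            }
            throw new NoSuchMethodException(method);
        });
    }

    void shutdown() { pool.shutdown(); }

    /** Registers a handler, issues one call, and blocks for its result. */
    static Object demo() {
        LocalDispatchSketch messenger = new LocalDispatchSketch();
        messenger.addHandler("/test", new Greeter());
        try {
            Future<Object> future = messenger.callFuture("/test", "greet", "World!");
            return future.get();   // waiting here makes the call synchronous
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            messenger.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the method is selected by a string name at run time, a misspelled name or a wrong argument type only surfaces when get() is called, as an ExecutionException.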


Note that the above remote calls are untyped, i.e. the Java compiler cannot check the name or parameter types of the remote method. To ensure the type safety of remote calls, the Messenger module also provides synchronous and asynchronous proxies, e.g.

messenger.sync(TestHandler.class, URI.create("tcp://10.0.0.9:2000/test")).sayHello("World!");
// or 
messenger.async(TestHandler.class, URI.create("tcp://10.0.0.9:2000/test")).sayHello("World!");
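One common way to put a typed facade over an untyped call is a JDK dynamic proxy. The sketch below is only an illustration of that general technique and makes no claim about how the framework builds its proxies: Greeting, UntypedCaller, and typed() are hypothetical names, and an interface is used because java.lang.reflect.Proxy cannot proxy a concrete class such as TestHandler.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TypedProxySketch {
    /** Hypothetical remote interface; JDK dynamic proxies require an interface. */
    public interface Greeting {
        void sayHello(String name);
    }

    /** Stand-in for an untyped call(address, method, args...) operation. */
    interface UntypedCaller {
        void call(String address, String method, Object... args);
    }

    /** Wraps the untyped caller so the compiler checks method names and types. */
    static <T> T typed(Class<T> type, String address, UntypedCaller caller) {
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (self, method, args) -> {
                    // args is null for zero-argument methods.
                    caller.call(address, method.getName(), args == null ? new Object[0] : args);
                    return null;   // adequate for void methods only
                });
        return type.cast(proxy);
    }

    /** Records what a typed call is translated into. */
    static List<String> demo() {
        List<String> sent = new ArrayList<>();
        UntypedCaller recorder = (address, method, args) ->
                sent.add(address + " " + method + " " + Arrays.toString(args));
        Greeting remote = typed(Greeting.class, "tcp://10.0.0.9:2000/test", recorder);
        remote.sayHello("World!");   // compiles only if the signature matches
        return sent;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

With such a facade, a call like remote.sayHello(42) is rejected at compile time instead of failing on the remote node.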


Overlay Transport Layer

The overlay transport layer abstracts overlay networks through the OverlayTransport and OverlayListener interfaces. It extends the transport layer with additional support for mapping objects into the ID space, key-based routing, and replication and migration of application data. Figure 4 depicts the main classes of the overlay transport layer.

The OverlayTransport interface extends the Transport interface, so it can be used in the same way. Moreover, the OverlayTransport interface defines additional overlay-specific operations, e.g. getting the address of a key object and managing overlay-related callback listeners.

To support the replication and migration of application data, the OverlayListener and OverlayFilter interfaces are designed to handle overlay-specific callback notifications. The following code snippet registers an OverlayListener implementation (as an anonymous class):

OverlayTransport overlay = ...;
overlay.addOverlayListener(new OverlayListener() {
    public Object forward(URI target, Object payload, URI next) {
        // process the payload on the routing nodes
        return payload;
    }

    public List<?> populate(OverlayFilter filter, boolean remove) {
        // use the filter to get a list of objects to be
        // replicated (remove == false) or
        // migrated (remove == true)
        return ...;
    }
});


The forward() method is used to notify nodes along the path of a key-based routing process, right before the data is sent to the next node. It gives applications the opportunity to participate in the routing process and accomplish application-specific tasks, e.g. adjusting the content of the data.
The populate() method is used to collect application data that needs to be replicated or migrated when nodes arrive or depart. Since the data lives in the application and only the overlay network knows which data should be replicated or migrated, the two must work together to accomplish the task. So when invoking the populate() method, the overlay network provides an OverlayFilter instance that knows which data should be collected. It also tells the application whether the collected data needs to be removed, i.e. whether the operation is a migration or a replication.
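The replication/migration contract described above can be sketched from the application's side as follows. This is a simplified, framework-free illustration: KeyFilter stands in for OverlayFilter (reduced to a single accept() method), keys are plain integers, and populate() returns a map instead of the List<?> used by the real interface.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class PopulateSketch {
    /** Stand-in for OverlayFilter, reduced to the key-based accept(). */
    interface KeyFilter {
        boolean accept(Object key);
    }

    private final Map<Integer, String> store = new TreeMap<>();

    void put(int key, String value) { store.put(key, value); }
    int size() { return store.size(); }

    /** Collects the entries the filter accepts; removes them when migrating. */
    Map<Integer, String> populate(KeyFilter filter, boolean remove) {
        Map<Integer, String> selected = new TreeMap<>();
        Iterator<Map.Entry<Integer, String>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, String> entry = it.next();
            if (filter.accept(entry.getKey())) {
                selected.put(entry.getKey(), entry.getValue());
                if (remove) it.remove();   // migration hands the data over
            }
        }
        return selected;
    }

    /** Replicates and then migrates the keys in (4, 9]. */
    static String demo() {
        PopulateSketch app = new PopulateSketch();
        app.put(1, "one");
        app.put(5, "five");
        app.put(9, "nine");
        KeyFilter range = key -> {
            int k = (Integer) key;
            return k > 4 && k <= 9;
        };
        Map<Integer, String> replicated = app.populate(range, false);
        int afterReplication = app.size();
        Map<Integer, String> migrated = app.populate(range, true);
        int afterMigration = app.size();
        return "replicated=" + replicated + " size=" + afterReplication
                + "; migrated=" + migrated + " size=" + afterMigration;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The application never needs to know why a range was chosen; it only applies the filter it is given and honours the remove flag.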

The OverlayFilter interface defines accept() methods that tell whether a key is in some desired range of the ID space. For instance, the following code is the Chord implementation of OverlayFilter, which tells whether a key is in a certain interval (start, end] on the ring.


public class ChordFilter implements OverlayFilter {
    private final ChordID start, end;

    public ChordFilter(ChordID start, ChordID end) {
        this.start = start;
        this.end = end;
    }

    public boolean accept(Object key) {
        ChordID id = start.hash(key);
        return id.between(start, end) || id.equals(end);
    }

    public boolean accept(URI address) {
        ChordID id = ChordID.parse(address);
        return id.between(start, end) || id.equals(end);
    }
}
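The interval test (start, end] has to cope with intervals that wrap past zero on the ring. The sketch below works through that arithmetic on plain long identifiers modulo a ring size; it does not use ChordID, and the class and method names are invented for illustration. Treating start == end as covering the whole ring is an assumption of this sketch.

```java
public class RingIntervalSketch {
    /**
     * True when id lies in the ring interval (start, end], walking clockwise.
     * Distances are shifted into 1..ringSize so that start itself is the
     * farthest point from start and is therefore excluded.
     */
    static boolean inInterval(long id, long start, long end, long ringSize) {
        long offset = Math.floorMod(id - start - 1, ringSize) + 1;  // steps from start to id
        long span = Math.floorMod(end - start - 1, ringSize) + 1;   // steps from start to end
        return offset <= span;
    }

    public static void main(String[] args) {
        long ring = 16;  // identifiers 0..15
        System.out.println(inInterval(3, 14, 5, ring));   // inside a wrapping interval
        System.out.println(inInterval(14, 14, 5, ring));  // start is excluded
        System.out.println(inInterval(5, 14, 5, ring));   // end is included
        System.out.println(inInterval(6, 14, 5, ring));   // just past end
    }
}
```

For the ring of 16 identifiers, (14, 5] contains 15, 0, 1, 2, 3, 4 and 5, so the four checks evaluate to true, false, true and false.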


The framework provides an AbstractOverlayTransport class that implements some common tasks, such as listener management and callback notification. Several overlay networking protocols are implemented, e.g. ChordTransport and FissioneTransport, which can run on the various underlying networks provided by the framework.

Download

http://sourceforge.net/projects/overlay/files/

Papers

[1] Rui Shen, Ji Wang, Shengdong Zhang, Siqi Shen, Pei Fan. A Framework for Constructing Peer-to-Peer Overlay Networks in Java. In Proceedings of the 7th International Conference on the Principles and Practice of Programming in Java (PPPJ'09), ACM Press, pp. 40-48, Calgary, Canada, Aug 27-28, 2009.
[2] Yiming Zhang, Ling Liu, Dongsheng Li, Xicheng Lu. Distributed Line Graphs: A Universal Framework for Building DHTs Based on Arbitrary Constant-Degree Graphs. In Proceedings of the 28th IEEE International Conference on Distributed Computing Systems (ICDCS'08), IEEE CS Press, pp. 152-159, Beijing, China, Jun 17-20, 2008.
[3] Dongsheng Li, Xicheng Lu, Jie Wu. FISSIONE: A Scalable Constant Degree and Low Congestion DHT Scheme Based on Kautz Graphs. In Proceedings of the 24th Annual Joint Conference of the IEEE Computer and Communications Societies (INFOCOM'05), pp. 1677-1688, Miami, Florida, Mar 13-17, 2005.
