NodeLatencyVerifier

- Using an HttpUrl instance as a key to the map of FullNode instances, we were using simple String here before, but turned out not to be too reliable

NetworkService
- Introducing the NodeLatencyVerifier instance into the NetworkService, and making its use optional
- In case the user opts to use the node-latency verification, the initial connection is delayed until we obtain a measurement of all nodes
- Exposing a PublishSubject, this allows users of the library to get notified of the node latency measurements

NetworkServiceManager
- A boolean flag is now used to decide whether or not to start a NetworkService instance with the node-latency verification feature ON
develop
Nelson R. Perez 2018-09-20 21:51:14 -05:00
parent 776630dd57
commit ede7265989
5 changed files with 179 additions and 24 deletions

View File

@ -50,11 +50,16 @@ import cy.agorise.graphenej.models.JsonRpcResponse;
import cy.agorise.graphenej.models.OperationHistory;
import cy.agorise.graphenej.network.FullNode;
import cy.agorise.graphenej.network.LatencyNodeProvider;
import cy.agorise.graphenej.network.NodeLatencyVerifier;
import cy.agorise.graphenej.network.NodeProvider;
import cy.agorise.graphenej.operations.CustomOperation;
import cy.agorise.graphenej.operations.LimitOrderCreateOperation;
import cy.agorise.graphenej.operations.TransferOperation;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.Nullable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
@ -76,6 +81,8 @@ public class NetworkService extends Service {
public static final String KEY_REQUESTED_APIS = "key_requested_apis";
public static final String KEY_ENABLE_LATENCY_VERIFIER = "key_enable_latency_verifier";
/**
* Shared preference
*/
@ -112,7 +119,17 @@ public class NetworkService extends Service {
// Variable used to keep track of the currently obtained API accesses
private HashMap<Integer, Integer> mApiIds = new HashMap<Integer, Integer>();
NodeProvider nodeProvider = new LatencyNodeProvider();
// Variable used as a source of node information
private NodeProvider nodeProvider = new LatencyNodeProvider();
// Class used to obtain frequent node latency updates
private NodeLatencyVerifier nodeLatencyVerifier;
// PublishSubject used to announce full node latencies updates
private PublishSubject<FullNode> fullNodePublishSubject;
// Counter used to trigger the connection only after we've received enough node latency updates
private long latencyUpdateCounter;
private Gson gson = new GsonBuilder()
.registerTypeAdapter(Transaction.class, new Transaction.TransactionDeserializer())
@ -144,7 +161,7 @@ public class NetworkService extends Service {
public void connect(){
OkHttpClient client = new OkHttpClient();
FullNode fullNode = nodeProvider.getBestNode();
Log.d(TAG,"connect.url: "+fullNode.getUrl());
Log.v(TAG,"connect.url: "+fullNode.getUrl());
Request request = new Request.Builder().url(fullNode.getUrl()).build();
mWebSocket = client.newWebSocket(request, mWebSocketListener);
}
@ -197,18 +214,19 @@ public class NetworkService extends Service {
public void onDestroy() {
if(mWebSocket != null)
mWebSocket.close(NORMAL_CLOSURE_STATUS, null);
nodeLatencyVerifier.stop();
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
Log.d(TAG,"onBind.intent: "+intent);
// Retrieving credentials and requested API data from the shared preferences
mUsername = intent.getStringExtra(NetworkService.KEY_USERNAME);
mPassword = intent.getStringExtra(NetworkService.KEY_PASSWORD);
mRequestedApis = intent.getIntExtra(NetworkService.KEY_REQUESTED_APIS, 0);
mAutoConnect = intent.getBooleanExtra(NetworkService.KEY_AUTO_CONNECT, true);
boolean verifyNodeLatency = intent.getBooleanExtra(NetworkService.KEY_ENABLE_LATENCY_VERIFIER, false);
ArrayList<String> nodeUrls = new ArrayList<>();
// If the user of the library desires, a custom list of node URLs can
@ -225,15 +243,61 @@ public class NetworkService extends Service {
// Adding the library-provided list of nodes second
nodeUrls.addAll(Arrays.asList(Nodes.NODE_URLS));
// Feeding all node information to the NodeProvider instance
for(String nodeUrl : nodeUrls){
nodeProvider.addNode(new FullNode(nodeUrl));
}
if(mAutoConnect) connect();
// We only connect automatically if the auto-connect flag is true AND
// we are not going to care about node latency ordering.
if(mAutoConnect && !verifyNodeLatency) {
connect();
}else{
// In case we care about node latency ordering, we must first obtain
// a first round of measurements in order to be sure to select the
// best node.
if(verifyNodeLatency){
ArrayList<FullNode> fullNodes = new ArrayList<>();
for(String url : nodeUrls){
fullNodes.add(new FullNode(url));
}
nodeLatencyVerifier = new NodeLatencyVerifier(fullNodes);
fullNodePublishSubject = nodeLatencyVerifier.start();
fullNodePublishSubject.observeOn(AndroidSchedulers.mainThread()).subscribe(nodeLatencyObserver);
}
}
return mBinder;
}
/**
* Observer used to be notified about node latency measurement updates.
*/
private Observer<FullNode> nodeLatencyObserver = new Observer<FullNode>() {
@Override
public void onSubscribe(Disposable d) { }
@Override
public void onNext(FullNode fullNode) {
latencyUpdateCounter++;
// Updating the node with the new latency measurement
nodeProvider.updateNode(fullNode);
// Once we have the latency value of all available nodes,
// we can safely proceed to start the connection
if(latencyUpdateCounter == nodeProvider.getSortedNodes().size()){
connect();
}
}
@Override
public void onError(Throwable e) {
Log.e(TAG,"nodeLatencyObserver.onError.Msg: "+e.getMessage());
}
@Override
public void onComplete() { }
};
/**
* Class used for the client Binder. Because we know this service always
* runs in the same process as its clients, we don't need to deal with IPC.
@ -489,7 +553,7 @@ public class NetworkService extends Service {
Log.e(TAG,"onFailure. Exception: "+t.getClass().getName()+", Msg: "+t.getMessage());
// Logging error stack trace
for(StackTraceElement element : t.getStackTrace()){
Log.e(TAG,String.format("%s#%s:%s", element.getClassName(), element.getMethodName(), element.getLineNumber()));
Log.v(TAG,String.format("%s#%s:%s", element.getClassName(), element.getMethodName(), element.getLineNumber()));
}
// Registering current status
isLoggedIn = false;
@ -538,4 +602,12 @@ public class NetworkService extends Service {
public List<FullNode> getNodes(){
return nodeProvider.getSortedNodes();
}
/**
* Returns an observable that will notify its observers about node latency updates.
* @return Observer of {@link FullNode} instances.
*/
public PublishSubject<FullNode> getNodeLatencyObservable(){
return fullNodePublishSubject;
}
}

View File

@ -51,6 +51,7 @@ public class NetworkServiceManager implements Application.ActivityLifecycleCallb
private int mRequestedApis;
private List<String> mCustomNodeUrls = new ArrayList<>();
private boolean mAutoConnect;
private boolean mVerifyLatency;
/**
* Runnable used to schedule a service disconnection once the app is not visible to the user for
@ -104,7 +105,8 @@ public class NetworkServiceManager implements Application.ActivityLifecycleCallb
.putExtra(NetworkService.KEY_PASSWORD, mPassword)
.putExtra(NetworkService.KEY_REQUESTED_APIS, mRequestedApis)
.putExtra(NetworkService.KEY_CUSTOM_NODE_URLS, customNodes)
.putExtra(NetworkService.KEY_AUTO_CONNECT, mAutoConnect);
.putExtra(NetworkService.KEY_AUTO_CONNECT, mAutoConnect)
.putExtra(NetworkService.KEY_ENABLE_LATENCY_VERIFIER, mVerifyLatency);
context.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
}
@ -182,6 +184,14 @@ public class NetworkServiceManager implements Application.ActivityLifecycleCallb
this.mAutoConnect = mAutoConnect;
}
public boolean isVerifyLatency() {
return mVerifyLatency;
}
public void setVerifyLatency(boolean mVerifyLatency) {
this.mVerifyLatency = mVerifyLatency;
}
/**
* Class used to create a {@link NetworkServiceManager} with specific attributes.
*/
@ -191,27 +201,53 @@ public class NetworkServiceManager implements Application.ActivityLifecycleCallb
private int requestedApis;
private List<String> customNodeUrls;
private boolean autoconnect = true;
private boolean verifyNodeLatency;
/**
* Sets the user name, if required to connect to a node.
* @param name User name
* @return The Builder instance
*/
public Builder setUserName(String name){
this.username = name;
return this;
}
/**
* Sets the password, if required to connect to a node.
* @param password Password
* @return The Builder instance
*/
public Builder setPassword(String password){
this.password = password;
return this;
}
/**
* Sets an integer with the requested APIs encoded as binary flags.
* @param apis Integer representing the different APIs we require from the node.
* @return The Builder instance
*/
public Builder setRequestedApis(int apis){
this.requestedApis = apis;
return this;
}
/**
* Adds a list of custom node URLs.
* @param nodeUrls List of custom full node URLs.
* @return The Builder instance
*/
public Builder setCustomNodeUrls(List<String> nodeUrls){
this.customNodeUrls = nodeUrls;
return this;
}
/**
* Adds a list of custom node URLs.
* @param nodeUrls List of custom full node URLs.
* @return The Builder instance
*/
public Builder setCustomNodeUrls(String nodeUrls){
String[] urls = nodeUrls.split(",");
for(String url : urls){
@ -221,11 +257,33 @@ public class NetworkServiceManager implements Application.ActivityLifecycleCallb
return this;
}
/**
* Sets the autoconnect flag. This is true by default.
* @param autoConnect True if we want the service to connect automatically, false otherwise.
* @return The Builder instance
*/
public Builder setAutoConnect(boolean autoConnect){
this.autoconnect = autoConnect;
return this;
}
/**
* Sets the node-verification flag. This is false by default.
* @param verifyLatency True if we want the service to perform a latency analysis before connecting.
* @return The Builder instance.
*/
public Builder setNodeLatencyVerification(boolean verifyLatency){
this.verifyNodeLatency = verifyLatency;
return this;
}
/**
* Method used to build a {@link NetworkServiceManager} instance with all of the characteristics
* passed as parameters.
* @param context A Context of the application package implementing
* this class.
* @return Instance of the NetworkServiceManager class.
*/
public NetworkServiceManager build(Context context){
NetworkServiceManager manager = new NetworkServiceManager(context);
if(username != null) manager.setUserName(username);
@ -233,6 +291,7 @@ public class NetworkServiceManager implements Application.ActivityLifecycleCallb
if(customNodeUrls != null) manager.setCustomNodeUrls(customNodeUrls);
manager.setRequestedApis(requestedApis);
manager.setAutoConnect(autoconnect);
manager.setVerifyLatency(verifyNodeLatency);
return manager;
}
}

View File

@ -1,10 +1,12 @@
package cy.agorise.graphenej.network;
import cy.agorise.graphenej.stats.ExponentialMovingAverage;
/**
* Class that represents a full node and is used to keep track of its round-trip time measured in milliseconds.
*/
public class FullNode implements Comparable {
private String mUrl;
private ExponentialMovingAverage latency;

View File

@ -1,7 +1,5 @@
package cy.agorise.graphenej.network;
import android.util.Log;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -9,7 +7,7 @@ import java.util.List;
import java.util.PriorityQueue;
public class LatencyNodeProvider implements NodeProvider {
private final String TAG = this.getClass().getName();
private final String TAG = "LatencyNodeProvider";
private PriorityQueue<FullNode> mFullNodeHeap;
public LatencyNodeProvider(){
@ -55,8 +53,6 @@ public class LatencyNodeProvider implements NodeProvider {
for(FullNode fullNode : nodeList){
if(fullNode != null){
nodeList.add(fullNode);
}else{
Log.d(TAG,"Found a null node in getSortedNodes");
}
}
Collections.sort(nodeList);

View File

@ -2,12 +2,14 @@ package cy.agorise.graphenej.network;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import java.util.HashMap;
import java.util.List;
import cy.agorise.graphenej.api.android.NetworkService;
import io.reactivex.subjects.PublishSubject;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
@ -18,6 +20,7 @@ import okhttp3.WebSocketListener;
* Class that encapsulates the node latency verification task
*/
public class NodeLatencyVerifier {
private final String TAG = this.getClass().getName();
public static final int DEFAULT_LATENCY_VERIFICATION_PERIOD = 5 * 1000;
@ -30,7 +33,7 @@ public class NodeLatencyVerifier {
// Subject used to publish the result to interested parties
private PublishSubject<FullNode> subject = PublishSubject.create();
private HashMap<String, FullNode> nodeURLMap = new HashMap<>();
private HashMap<HttpUrl, FullNode> nodeURLMap = new HashMap<>();
// private WebSocket webSocket;
@ -100,10 +103,15 @@ public class NodeLatencyVerifier {
requestMap.put(fullNode.getUrl(), request);
}
client.newWebSocket(request, mWebSocketListener);
if(!nodeURLMap.containsKey(fullNode.getUrl())){
nodeURLMap.put(fullNode.getUrl(), fullNode);
String normalURL = fullNode.getUrl().replace("wss://", "https://");
Log.d(TAG,"normal URL : "+normalURL);
if(!nodeURLMap.containsKey(fullNode.getUrl().replace("wss://", "https://"))){
HttpUrl key = HttpUrl.parse(normalURL);
Log.i(TAG, "Inserting key: "+key.toString());
nodeURLMap.put(key, fullNode);
}
client.newWebSocket(request, mWebSocketListener);
}
mHandler.postDelayed(this, verificationPeriod);
}
@ -125,14 +133,32 @@ public class NodeLatencyVerifier {
handleResponse(webSocket, response);
}
/**
* Method used to handle the node's first response. The idea here is to obtain
* the RTT (Round Trip Time) measurement and publish it using the PublishSubject.
*
* @param webSocket Websocket instance
* @param response Response instance
*/
private void handleResponse(WebSocket webSocket, Response response){
String url = "wss://" + webSocket.request().url().host() + webSocket.request().url().encodedPath();
FullNode fullNode = nodeURLMap.get(url);
long after = System.currentTimeMillis();
long before = timestamps.get(fullNode);
long delay = after - before;
fullNode.addLatencyValue(delay);
subject.onNext(fullNode);
// Obtaining the HttpUrl instance that was previously used as a key
HttpUrl url = webSocket.request().url();
if(nodeURLMap.containsKey(url)){
FullNode fullNode = nodeURLMap.get(url);
long after = System.currentTimeMillis();
long before = timestamps.get(fullNode);
long delay = after - before;
fullNode.addLatencyValue(delay);
subject.onNext(fullNode);
}else{
// We cannot properly handle a response to a request whose
// URL was not registered at the nodeURLMap. This is because without this,
// we cannot know to which node this response corresponds. This should not happen.
Log.e(TAG,"nodeURLMap does not contain url: "+url);
for(HttpUrl key : nodeURLMap.keySet()){
Log.e(TAG,"> "+key);
}
}
webSocket.close(NetworkService.NORMAL_CLOSURE_STATUS, null);
}
};