Adding multiplexing capabilities to the SubscriptionMessagesHub class and adapted the GetAccounts to work with it

This commit is contained in:
Nelson R. Perez 2017-06-27 16:52:02 -05:00
parent 980d072c2b
commit 52f46da573
11 changed files with 327 additions and 29 deletions

View file

@ -17,8 +17,8 @@
# http://www.gradle.org/docs/current/userguide/multi_project_builds.html#sec:decoupled_projects
# org.gradle.parallel=true
VERSION_NAME=0.4.2-SNAPSHOT
VERSION_CODE=4
VERSION_NAME=0.4.3-SNAPSHOT
VERSION_CODE=5
GROUP=com.github.bilthon
POM_DESCRIPTION=A Java library for mobile app Developers; Graphene/Bitshares blockchain.

View file

@ -7,12 +7,26 @@ import de.bitsharesmunich.graphenej.interfaces.WitnessResponseListener;
import de.bitsharesmunich.graphenej.models.BaseResponse;
/**
* Base class that should be extended by any implementation of a specific request to the full node.
*
* Created by nelson on 1/5/17.
*/
public abstract class BaseGrapheneHandler extends WebSocketAdapter {
protected WitnessResponseListener mListener;
/**
* The 'id' field of a message to the node. This is used in order to multiplex different messages
* using the same websocket connection.
*
* For example:
*
* {"id":5,"method":"call","params":[0,"get_accounts",[["1.2.100"]]],"jsonrpc":"2.0"}
*
* The 'requestId' here is 5.
*/
protected long requestId;
public BaseGrapheneHandler(WitnessResponseListener listener){
this.mListener = listener;
}
@ -33,4 +47,12 @@ public abstract class BaseGrapheneHandler extends WebSocketAdapter {
mListener.onError(new BaseResponse.Error(cause.getMessage()));
websocket.disconnect();
}
public void setRequestId(long id){
this.requestId = id;
}
public long getRequestId(){
return this.requestId;
}
}

View file

@ -25,20 +25,22 @@ import de.bitsharesmunich.graphenej.models.WitnessResponse;
* @author henry
*/
public class GetAccounts extends BaseGrapheneHandler {
private String accountId;
private List<UserAccount> userAccounts;
private WitnessResponseListener mListener;
private boolean oneTime;
public GetAccounts(String accountId, WitnessResponseListener listener){
public GetAccounts(String accountId, boolean oneTime, WitnessResponseListener listener){
super(listener);
this.accountId = accountId;
this.oneTime = oneTime;
this.mListener = listener;
}
public GetAccounts(List<UserAccount> accounts, WitnessResponseListener listener){
public GetAccounts(List<UserAccount> accounts, boolean oneTime, WitnessResponseListener listener){
super(listener);
this.userAccounts = accounts;
this.oneTime = oneTime;
this.mListener = listener;
}
@ -54,7 +56,7 @@ public class GetAccounts extends BaseGrapheneHandler {
accountIds.add(accountId);
}
params.add(accountIds);
ApiCall getAccountByAddress = new ApiCall(0, RPC.CALL_GET_ACCOUNTS, params, RPC.VERSION, 1);
ApiCall getAccountByAddress = new ApiCall(0, RPC.CALL_GET_ACCOUNTS, params, RPC.VERSION, (int) requestId);
websocket.sendText(getAccountByAddress.toJsonString());
}
@ -74,8 +76,10 @@ public class GetAccounts extends BaseGrapheneHandler {
} else {
this.mListener.onSuccess(witnessResponse);
}
if(oneTime){
websocket.disconnect();
}
}
@Override
public void onFrameSent(WebSocket websocket, WebSocketFrame frame) throws Exception {

View file

@ -9,14 +9,15 @@ import com.neovisionaries.ws.client.WebSocketFrame;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import de.bitsharesmunich.graphenej.AssetAmount;
import de.bitsharesmunich.graphenej.ObjectType;
import de.bitsharesmunich.graphenej.RPC;
import de.bitsharesmunich.graphenej.Transaction;
import de.bitsharesmunich.graphenej.UserAccount;
import de.bitsharesmunich.graphenej.errors.RepeatedRequestIdException;
import de.bitsharesmunich.graphenej.interfaces.SubscriptionHub;
import de.bitsharesmunich.graphenej.interfaces.SubscriptionListener;
import de.bitsharesmunich.graphenej.interfaces.WitnessResponseListener;
@ -33,23 +34,26 @@ import de.bitsharesmunich.graphenej.operations.TransferOperation;
* Created by nelson on 1/26/17.
*/
public class SubscriptionMessagesHub extends BaseGrapheneHandler implements SubscriptionHub {
private WebSocket mWebsocket;
// Sequence of message ids
private final static int LOGIN_ID = 1;
private final static int GET_DATABASE_ID = 2;
private final static int SUBCRIPTION_REQUEST = 3;
public final static int LOGIN_ID = 1;
public final static int GET_DATABASE_ID = 2;
public final static int SUBCRIPTION_REQUEST = 3;
// ID of subscription notifications
private final static int SUBCRIPTION_NOTIFICATION = 4;
public final static int SUBCRIPTION_NOTIFICATION = 4;
private SubscriptionResponse.SubscriptionResponseDeserializer mSubscriptionDeserializer;
private Gson gson;
private String user;
private String password;
private boolean clearFilter;
private List<ObjectType> objectTypes;
private int currentId;
private int databaseApiId = -1;
private int subscriptionCounter = 0;
private HashMap<Long, BaseGrapheneHandler> mHandlerMap = new HashMap<>();
/**
* Id used to separate requests regarding the subscriptions
@ -65,15 +69,14 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs
*
* @param user: User name, in case the node to which we're going to connect to requires authentication
* @param password: Password, same as above
* @param objectTypes: List of objects of interest
* @param clearFilter: Whether to automatically subscribe of not to the notification feed.
* @param errorListener: Callback that will be fired in case there is an error.
*/
public SubscriptionMessagesHub(String user, String password, List<ObjectType> objectTypes, WitnessResponseListener errorListener){
public SubscriptionMessagesHub(String user, String password, boolean clearFilter, WitnessResponseListener errorListener){
super(errorListener);
this.objectTypes = objectTypes;
this.user = user;
this.password = password;
this.clearFilter = true;
this.clearFilter = clearFilter;
this.mSubscriptionDeserializer = new SubscriptionResponse.SubscriptionResponseDeserializer();
GsonBuilder builder = new GsonBuilder();
builder.registerTypeAdapter(SubscriptionResponse.class, mSubscriptionDeserializer);
@ -88,15 +91,20 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs
/**
* Constructor used to create a subscription message hub that will call the set_subscribe_callback
<<<<<<< Updated upstream
* API with the clear_filter parameter set to true, meaning that it will receive automatic updates
* on all network events.
=======
* API with the clear_filter parameter set to false, meaning that it will only receive automatic updates
* from objects we register.
>>>>>>> Stashed changes
*
* @param user: User name, in case the node to which we're going to connect to requires authentication
* @param password: Password, same as above
* @param errorListener: Callback that will be fired in case there is an error.
*/
public SubscriptionMessagesHub(String user, String password, WitnessResponseListener errorListener){
this(user, password, new ArrayList<ObjectType>(), errorListener);
this(user, password, false, errorListener);
}
@Override
@ -116,6 +124,7 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs
@Override
public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception {
this.mWebsocket = websocket;
ArrayList<Serializable> loginParams = new ArrayList<>();
currentId = LOGIN_ID;
loginParams.add(user);
@ -145,16 +154,39 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs
websocket.sendText(getDatabaseId.toJsonString());
currentId++;
} else if(currentId == SUBCRIPTION_REQUEST){
if(objectTypes != null && objectTypes.size() > 0 && subscriptionCounter < objectTypes.size()){
ArrayList<Serializable> objectOfInterest = new ArrayList<>();
objectOfInterest.add(objectTypes.get(subscriptionCounter).getGenericObjectId());
List<SubscriptionListener> subscriptionListeners = mSubscriptionDeserializer.getSubscriptionListeners();
// If we haven't subscribed to all requested subscription channels yet,
// just send one more subscription
if(subscriptionListeners != null &&
subscriptionListeners.size() > 0 &&
subscriptionCounter < subscriptionListeners.size()){
ArrayList<Serializable> objects = new ArrayList<>();
ArrayList<Serializable> payload = new ArrayList<>();
payload.add(objectOfInterest);
for(SubscriptionListener listener : subscriptionListeners){
objects.add(listener.getInterestObjectType().getGenericObjectId());
}
payload.add(objects);
ApiCall subscribe = new ApiCall(databaseApiId, RPC.GET_OBJECTS, payload, RPC.VERSION, SUBSCRIPTION_ID);
websocket.sendText(subscribe.toJsonString());
subscriptionCounter++;
}else{
gson.fromJson(message, SubscriptionResponse.class);
WitnessResponse witnessResponse = gson.fromJson(message, WitnessResponse.class);
if(witnessResponse.result != null){
// This is the response to a request that was submitted to the message hub
// and whose handler was stored in the "request id" -> "handler" map
BaseGrapheneHandler handler = mHandlerMap.get(witnessResponse.id);
handler.onTextFrame(websocket, frame);
mHandlerMap.remove(witnessResponse.id);
}else{
// If we've already subscribed to all requested subscription channels, we
// just proceed to deserialize content.
// The deserialization is handled by all those TypeAdapters registered in the class
// constructor while building the gson instance.
SubscriptionResponse response = gson.fromJson(message, SubscriptionResponse.class);
}
}
}
}
@ -169,4 +201,20 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs
databaseApiId = -1;
subscriptionCounter = 0;
}
public void addRequestHandler(BaseGrapheneHandler handler) throws RepeatedRequestIdException {
if(mHandlerMap.get(handler.getRequestId()) != null){
throw new RepeatedRequestIdException("Already registered handler with id: "+handler.getRequestId());
}
System.out.println("Registering handler with id: "+handler.getRequestId());
mHandlerMap.put(handler.getRequestId(), handler);
try {
handler.onConnected(mWebsocket, null);
} catch (Exception e) {
System.out.println("Exception. Msg: "+e.getMessage());
System.out.println("Exception type: "+e);
}
}
}

View file

@ -0,0 +1,68 @@
package de.bitsharesmunich.graphenej.api.android;
import java.util.ArrayList;
import java.util.List;
import de.bitsharesmunich.graphenej.api.BaseGrapheneHandler;
import de.bitsharesmunich.graphenej.api.SubscriptionMessagesHub;
import de.bitsharesmunich.graphenej.errors.RepeatedRequestIdException;
import de.bitsharesmunich.graphenej.interfaces.WitnessResponseListener;
/**
* Created by nelson on 6/26/17.
*/
public class NodeConnection {
private List<String> mUrlList;
private int mUrlIndex;
private WebsocketWorkerThread mThread;
private SubscriptionMessagesHub mMessagesHub;
private long requestCounter = SubscriptionMessagesHub.SUBCRIPTION_NOTIFICATION + 1;
private static NodeConnection instance;
public static NodeConnection getInstance(){
if(instance == null){
instance = new NodeConnection();
}
return instance;
}
public NodeConnection(){
this.mUrlList = new ArrayList<>();
}
public void addNodeUrl(String url){
this.mUrlList.add(url);
}
public List<String> getNodeUrls(){
return this.mUrlList;
}
public void clearNodeList(){
this.mUrlList.clear();
}
/**
* Method that will try to connect to one of the nodes. If the connection fails
* a subsequent call to this method will try to connect with the next node in the
* list if there is one.
*/
public void connect(String user, String password, boolean subscribe, WitnessResponseListener errorListener) {
if(this.mUrlList.size() > 0){
mThread = new WebsocketWorkerThread(this.mUrlList.get(mUrlIndex));
mUrlIndex = mUrlIndex + 1 % this.mUrlList.size();
mMessagesHub = new SubscriptionMessagesHub(user, password, subscribe, errorListener);
mThread.addListener(mMessagesHub);
mThread.start();
}
}
public void addRequestHandler(BaseGrapheneHandler handler) throws RepeatedRequestIdException {
handler.setRequestId(requestCounter);
requestCounter++;
mMessagesHub.addRequestHandler(handler);
}
}

View file

@ -0,0 +1,60 @@
package de.bitsharesmunich.graphenej.api.android;
import com.neovisionaries.ws.client.WebSocket;
import com.neovisionaries.ws.client.WebSocketException;
import com.neovisionaries.ws.client.WebSocketFactory;
import com.neovisionaries.ws.client.WebSocketListener;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import de.bitsharesmunich.graphenej.test.NaiveSSLContext;
/**
* Created by nelson on 11/17/16.
*/
public class WebsocketWorkerThread extends Thread {
private final String TAG = this.getClass().getName();
// When debugging we'll use a NaiveSSLContext
public static final boolean DEBUG = true;
private final int TIMEOUT = 5000;
private WebSocket mWebSocket;
public WebsocketWorkerThread(String url){
try {
WebSocketFactory factory = new WebSocketFactory().setConnectionTimeout(TIMEOUT);
if(DEBUG){
SSLContext context = NaiveSSLContext.getInstance("TLS");
// Set the custom SSL context.
factory.setSSLContext(context);
}
mWebSocket = factory.createSocket(url);
} catch (IOException e) {
System.out.println("IOException. Msg: "+e.getMessage());
} catch(NullPointerException e){
System.out.println("NullPointerException at WebsocketWorkerThreas. Msg: "+e.getMessage());
} catch (NoSuchAlgorithmException e) {
System.out.println("NoSuchAlgorithmException. Msg: "+e.getMessage());
}
}
@Override
public void run() {
try {
mWebSocket.connect();
} catch (WebSocketException e) {
System.out.println("WebSocketException. Msg: "+e.getMessage());
}
}
public void addListener(WebSocketListener listener){
mWebSocket.addListener(listener);
}
}

View file

@ -0,0 +1,12 @@
package de.bitsharesmunich.graphenej.errors;
/**
* Created by nelson on 6/27/17.
*/
public class RepeatedRequestIdException extends Exception {
public RepeatedRequestIdException(String message){
super(message);
}
}

View file

@ -4,7 +4,7 @@ package de.bitsharesmunich.graphenej.models;
* Created by nelson on 11/12/16.
*/
public class BaseResponse {
public int id;
public long id;
public Error error;
public static class Error {

View file

@ -51,6 +51,7 @@ public class SubscriptionResponse {
public static final String KEY_METHOD = "method";
public static final String KEY_PARAMS = "params";
public int id;
public String method;
public List<Serializable> params;
@ -110,6 +111,7 @@ public class SubscriptionResponse {
SubscriptionResponse response = new SubscriptionResponse();
JsonObject responseObject = json.getAsJsonObject();
if(!responseObject.has(KEY_METHOD)){
System.out.println("Missing method field");
return response;
}
response.method = responseObject.get(KEY_METHOD).getAsString();

View file

@ -5,7 +5,6 @@ import com.neovisionaries.ws.client.WebSocketException;
import org.junit.Test;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import de.bitsharesmunich.graphenej.ObjectType;
@ -38,11 +37,8 @@ public class SubscriptionMessagesHubTest extends BaseApiTest {
@Test
public void testGlobalPropertiesDeserializer(){
ArrayList<ObjectType> interestingObjects = new ArrayList();
interestingObjects.add(ObjectType.TRANSACTION_OBJECT);
interestingObjects.add(ObjectType.DYNAMIC_GLOBAL_PROPERTY_OBJECT);
try{
mMessagesHub = new SubscriptionMessagesHub("", "", interestingObjects, mErrorListener);
mMessagesHub = new SubscriptionMessagesHub("", "", true, mErrorListener);
mMessagesHub.addSubscriptionListener(new SubscriptionListener() {
private int MAX_MESSAGES = 10;
private int messageCounter = 0;

View file

@ -0,0 +1,86 @@
package de.bitsharesmunich.graphenej.api.android;
import org.junit.Test;
import java.util.Timer;
import java.util.TimerTask;
import de.bitsharesmunich.graphenej.api.GetAccounts;
import de.bitsharesmunich.graphenej.errors.RepeatedRequestIdException;
import de.bitsharesmunich.graphenej.interfaces.WitnessResponseListener;
import de.bitsharesmunich.graphenej.models.BaseResponse;
import de.bitsharesmunich.graphenej.models.WitnessResponse;
/**
* Created by nelson on 6/26/17.
*/
public class NodeConnectionTest {
private String BLOCK_PAY_DE = System.getenv("OPENLEDGER_EU");
private NodeConnection nodeConnection;
private TimerTask subscribeTask = new TimerTask() {
@Override
public void run() {
System.out.println("Adding request here");
try{
nodeConnection.addRequestHandler(new GetAccounts("1.2.100", false, new WitnessResponseListener(){
@Override
public void onSuccess(WitnessResponse response) {
System.out.println("getAccounts.onSuccess");
}
@Override
public void onError(BaseResponse.Error error) {
System.out.println("getAccounts.onError. Msg: "+ error.message);
}
}));
}catch(RepeatedRequestIdException e){
System.out.println("RepeatedRequestIdException. Msg: "+e.getMessage());
}
}
};
private TimerTask releaseTask = new TimerTask() {
@Override
public void run() {
System.out.println("Releasing lock!");
synchronized (NodeConnectionTest.this){
NodeConnectionTest.this.notifyAll();
}
}
};
@Test
public void testNodeConnection(){
nodeConnection = NodeConnection.getInstance();
nodeConnection.addNodeUrl(BLOCK_PAY_DE);
nodeConnection.connect("", "", true, mErrorListener);
Timer timer = new Timer();
timer.schedule(subscribeTask, 5000);
timer.schedule(releaseTask, 30000);
try{
// Holding this thread while we get update notifications
synchronized (this){
wait();
}
}catch(InterruptedException e){
System.out.println("InterruptedException. Msg: "+e.getMessage());
}
}
private WitnessResponseListener mErrorListener = new WitnessResponseListener() {
@Override
public void onSuccess(WitnessResponse response) {
System.out.println("onSuccess");
}
@Override
public void onError(BaseResponse.Error error) {
System.out.println("onError");
}
};
}