Adding support for selective subscription with the SubscriptionMessagesHub and related

This commit is contained in:
Nelson R. Perez 2017-05-09 11:21:53 -05:00
parent df5271ef05
commit 5ff6f29ebd
6 changed files with 272 additions and 18 deletions

View file

@ -1,7 +1,7 @@
package de.bitsharesmunich.graphenej;
/**
* Created by nelson on 1/12/17.
* Enum type used to list all possible object types and obtain their space + type id
*/
public enum ObjectType {
@ -33,5 +33,150 @@ public enum ObjectType {
CHAIN_PROPERTY_OBJECT,
WITNESS_SCHEDULE_OBJECT,
BUDGET_RECORD_OBJECT,
SPECIAL_AUTHORITY_OBJECT
SPECIAL_AUTHORITY_OBJECT;
private int getSpace(){
int space = 1;
switch(this){
case BASE_OBJECT:
case ACCOUNT_OBJECT:
case ASSET_OBJECT:
case FORCE_SETTLEMENT_OBJECT:
case COMMITTEE_MEMBER_OBJECT:
case WITNESS_OBJECT:
case LIMIT_ORDER_OBJECT:
case CALL_ORDER_OBJECT:
case CUSTOM_OBJECT:
case PROPOSAL_OBJECT:
case OPERATION_HISTORY_OBJECT:
case WITHDRAW_PERMISSION_OBJECT:
case VESTING_BALANCE_OBJECT:
case WORKER_OBJECT:
case BALANCE_OBJECT:
space = 1;
break;
case GLOBAL_PROPERTY_OBJECT:
case DYNAMIC_GLOBAL_PROPERTY_OBJECT:
case ASSET_DYNAMIC_DATA:
case ASSET_BITASSET_DATA:
case ACCOUNT_BALANCE_OBJECT:
case ACCOUNT_STATISTICS_OBJECT:
case TRANSACTION_OBJECT:
case BLOCK_SUMMARY_OBJECT:
case ACCOUNT_TRANSACTION_HISTORY_OBJECT:
case BLINDED_BALANCE_OBJECT:
case CHAIN_PROPERTY_OBJECT:
case WITNESS_SCHEDULE_OBJECT:
case BUDGET_RECORD_OBJECT:
case SPECIAL_AUTHORITY_OBJECT:
space = 2;
break;
}
return space;
}
private int getType(){
int type = 0;
switch(this){
case BASE_OBJECT:
type = 1;
break;
case ACCOUNT_OBJECT:
type = 2;
break;
case ASSET_OBJECT:
type = 3;
break;
case FORCE_SETTLEMENT_OBJECT:
type = 4;
break;
case COMMITTEE_MEMBER_OBJECT:
type = 5;
break;
case WITNESS_OBJECT:
type = 6;
break;
case LIMIT_ORDER_OBJECT:
type = 7;
break;
case CALL_ORDER_OBJECT:
type = 8;
break;
case CUSTOM_OBJECT:
type = 9;
break;
case PROPOSAL_OBJECT:
type = 10;
break;
case OPERATION_HISTORY_OBJECT:
type = 11;
break;
case WITHDRAW_PERMISSION_OBJECT:
type = 12;
break;
case VESTING_BALANCE_OBJECT:
type = 13;
break;
case WORKER_OBJECT:
type = 14;
break;
case BALANCE_OBJECT:
type = 15;
break;
case GLOBAL_PROPERTY_OBJECT:
type = 0;
break;
case DYNAMIC_GLOBAL_PROPERTY_OBJECT:
type = 1;
break;
case ASSET_DYNAMIC_DATA:
type = 3;
break;
case ASSET_BITASSET_DATA:
type = 4;
break;
case ACCOUNT_BALANCE_OBJECT:
type = 5;
break;
case ACCOUNT_STATISTICS_OBJECT:
type = 6;
break;
case TRANSACTION_OBJECT:
type = 7;
break;
case BLOCK_SUMMARY_OBJECT:
type = 8;
break;
case ACCOUNT_TRANSACTION_HISTORY_OBJECT:
type = 9;
break;
case BLINDED_BALANCE_OBJECT:
type = 10;
break;
case CHAIN_PROPERTY_OBJECT:
type = 11;
break;
case WITNESS_SCHEDULE_OBJECT:
type = 12;
break;
case BUDGET_RECORD_OBJECT:
type = 13;
break;
case SPECIAL_AUTHORITY_OBJECT:
type = 14;
}
return type;
}
/**
* This method is used to return the generic object type in the form space.type.0.
*
* Not to be confused with {@link GrapheneObject#getObjectId()}, which will return
* the full object id in the form space.type.id.
*
* @return: The generic object type
*/
public String getGenericObjectId(){
return String.format("%d.%d.0", getSpace(), getType());
}
}

View file

@ -13,6 +13,7 @@ import java.util.List;
import java.util.Map;
import de.bitsharesmunich.graphenej.AssetAmount;
import de.bitsharesmunich.graphenej.ObjectType;
import de.bitsharesmunich.graphenej.RPC;
import de.bitsharesmunich.graphenej.Transaction;
import de.bitsharesmunich.graphenej.UserAccount;
@ -44,13 +45,35 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs
private Gson gson;
private String user;
private String password;
private int currentId = LOGIN_ID;
private boolean clearFilter;
private List<ObjectType> objectTypes;
private int currentId;
private int databaseApiId = -1;
private int subscriptionCounter = 0;
public SubscriptionMessagesHub(String user, String password, WitnessResponseListener errorListener){
/**
* Id used to separate requests regarding the subscriptions
*/
private final int SUBSCRIPTION_ID = 10;
/**
* Constructor used to create a subscription message hub that will call the set_subscribe_callback
* API with the clear_filter parameter set to false, meaning that it will only receive automatic updates
* from objects we register.
*
* A list of ObjectTypes must be provided, otherwise we won't get any update.
*
* @param user: User name, in case the node to which we're going to connect to requires authentication
* @param password: Password, same as above
* @param objectTypes: List of objects of interest
* @param errorListener: Callback that will be fired in case there is an error.
*/
public SubscriptionMessagesHub(String user, String password, List<ObjectType> objectTypes, WitnessResponseListener errorListener){
super(errorListener);
this.objectTypes = objectTypes;
this.user = user;
this.password = password;
this.clearFilter = true;
this.mSubscriptionDeserializer = new SubscriptionResponse.SubscriptionResponseDeserializer();
GsonBuilder builder = new GsonBuilder();
builder.registerTypeAdapter(SubscriptionResponse.class, mSubscriptionDeserializer);
@ -63,6 +86,19 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs
this.gson = builder.create();
}
/**
* Constructor used to create a subscription message hub that will call the set_subscribe_callback
* API with the clear_filter parameter set to true, meaning that it will receive automatic updates
* on all network events.
*
* @param user: User name, in case the node to which we're going to connect to requires authentication
* @param password: Password, same as above
* @param errorListener: Callback that will be fired in case there is an error.
*/
public SubscriptionMessagesHub(String user, String password, WitnessResponseListener errorListener){
this(user, password, new ArrayList<ObjectType>(), errorListener);
}
@Override
public void addSubscriptionListener(SubscriptionListener listener){
this.mSubscriptionDeserializer.addSubscriptionListener(listener);
@ -81,6 +117,7 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs
@Override
public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception {
ArrayList<Serializable> loginParams = new ArrayList<>();
currentId = LOGIN_ID;
loginParams.add(user);
loginParams.add(password);
ApiCall loginCall = new ApiCall(1, RPC.CALL_LOGIN, loginParams, RPC.VERSION, currentId);
@ -95,6 +132,7 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs
ArrayList<Serializable> emptyParams = new ArrayList<>();
ApiCall getDatabaseId = new ApiCall(1, RPC.CALL_DATABASE, emptyParams, RPC.VERSION, currentId);
websocket.sendText(getDatabaseId.toJsonString());
currentId++;
}else if(currentId == GET_DATABASE_ID){
Type ApiIdResponse = new TypeToken<WitnessResponse<Integer>>() {}.getType();
WitnessResponse<Integer> witnessResponse = gson.fromJson(message, ApiIdResponse);
@ -102,19 +140,33 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs
ArrayList<Serializable> subscriptionParams = new ArrayList<>();
subscriptionParams.add(String.format("%d", SUBCRIPTION_NOTIFICATION));
subscriptionParams.add(false);
ApiCall getDatabaseId = new ApiCall(databaseApiId, RPC.CALL_SET_SUBSCRIBE_CALLBACK, subscriptionParams, RPC.VERSION, currentId);
subscriptionParams.add(clearFilter);
ApiCall getDatabaseId = new ApiCall(databaseApiId, RPC.CALL_SET_SUBSCRIBE_CALLBACK, subscriptionParams, RPC.VERSION, SUBCRIPTION_REQUEST);
websocket.sendText(getDatabaseId.toJsonString());
currentId++;
} else if(currentId == SUBCRIPTION_REQUEST){
// There's nothing to handle here.
if(objectTypes != null && objectTypes.size() > 0 && subscriptionCounter < objectTypes.size()){
ArrayList<Serializable> objectOfInterest = new ArrayList<>();
objectOfInterest.add(objectTypes.get(subscriptionCounter).getGenericObjectId());
ArrayList<Serializable> payload = new ArrayList<>();
payload.add(objectOfInterest);
ApiCall subscribe = new ApiCall(databaseApiId, RPC.GET_OBJECTS, payload, RPC.VERSION, SUBSCRIPTION_ID);
websocket.sendText(subscribe.toJsonString());
subscriptionCounter++;
}else{
gson.fromJson(message, SubscriptionResponse.class);
}
currentId++;
}
}
@Override
public void onFrameSent(WebSocket websocket, WebSocketFrame frame) throws Exception {
System.out.println(">> "+frame.getPayloadText());
}
public void reset(){
currentId = 0;
databaseApiId = -1;
subscriptionCounter = 0;
}
}

View file

@ -109,6 +109,9 @@ public class SubscriptionResponse {
public SubscriptionResponse deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
SubscriptionResponse response = new SubscriptionResponse();
JsonObject responseObject = json.getAsJsonObject();
if(!responseObject.has(KEY_METHOD)){
return response;
}
response.method = responseObject.get(KEY_METHOD).getAsString();
JsonArray paramsArray = responseObject.get(KEY_PARAMS).getAsJsonArray();
@ -117,6 +120,10 @@ public class SubscriptionResponse {
ArrayList<Serializable> secondArgument = new ArrayList<>();
response.params.add(secondArgument);
// Hash map used to record the type of objects present in this subscription message
// and only alert listeners that might be interested
HashMap<ObjectType, Boolean> objectMap = new HashMap<>();
JsonArray subArray = paramsArray.get(1).getAsJsonArray().get(0).getAsJsonArray();
for(JsonElement object : subArray){
if(object.isJsonObject()){
@ -138,14 +145,17 @@ public class SubscriptionResponse {
balanceObject.owner = jsonObject.get(AccountBalanceUpdate.KEY_OWNER).getAsString();
balanceObject.asset_type = jsonObject.get(AccountBalanceUpdate.KEY_ASSET_TYPE).getAsString();
balanceObject.balance = jsonObject.get(AccountBalanceUpdate.KEY_BALANCE).getAsLong();
objectMap.put(ObjectType.ACCOUNT_BALANCE_OBJECT, true);
secondArgument.add(balanceObject);
}else if(grapheneObject.getObjectType() == ObjectType.DYNAMIC_GLOBAL_PROPERTY_OBJECT){
DynamicGlobalProperties dynamicGlobalProperties = context.deserialize(object, DynamicGlobalProperties.class);
objectMap.put(ObjectType.DYNAMIC_GLOBAL_PROPERTY_OBJECT, true);
secondArgument.add(dynamicGlobalProperties);
}else if(grapheneObject.getObjectType() == ObjectType.TRANSACTION_OBJECT){
BroadcastedTransaction broadcastedTransaction = new BroadcastedTransaction(grapheneObject.getObjectId());
broadcastedTransaction.setTransaction((Transaction) context.deserialize(jsonObject.get(BroadcastedTransaction.KEY_TRX), Transaction.class));
broadcastedTransaction.setTransactionId(jsonObject.get(BroadcastedTransaction.KEY_TRX_ID).getAsString());
objectMap.put(ObjectType.TRANSACTION_OBJECT, true);
secondArgument.add(broadcastedTransaction);
}else{
//TODO: Add support for other types of objects
@ -156,8 +166,12 @@ public class SubscriptionResponse {
}
}
for(SubscriptionListener listener : mListeners){
// Only notify the listener if there is an object of interest in
// this notification
if(objectMap.containsKey(listener.getInterestObjectType())){
listener.onSubscriptionUpdate(response);
}
}
return response;
}
}

View file

@ -0,0 +1,26 @@
package de.bitsharesmunich.graphenej;
import junit.framework.Assert;
import org.junit.Test;
/**
* Created by nelson on 5/5/17.
*/
public class ObjectTypeTest {
@Test
public void getGenericObjectId() throws Exception {
ObjectType baseObject = ObjectType.BASE_OBJECT;
ObjectType accountObject = ObjectType.ACCOUNT_OBJECT;
ObjectType forceSettlementObject = ObjectType.FORCE_SETTLEMENT_OBJECT;
ObjectType globalPropertiesObject = ObjectType.GLOBAL_PROPERTY_OBJECT;
ObjectType specialAuthorityObject = ObjectType.SPECIAL_AUTHORITY_OBJECT;
Assert.assertEquals("1.1.0", baseObject.getGenericObjectId());
Assert.assertEquals("1.2.0", accountObject.getGenericObjectId());
Assert.assertEquals("1.4.0", forceSettlementObject.getGenericObjectId());
Assert.assertEquals("2.0.0", globalPropertiesObject.getGenericObjectId());
Assert.assertEquals("2.14.0", specialAuthorityObject.getGenericObjectId());
}
}

View file

@ -14,7 +14,7 @@ import de.bitsharesmunich.graphenej.test.NaiveSSLContext;
*/
public class BaseApiTest {
protected String BLOCK_PAY_DE = System.getenv("BLOCKPAY_DE");
protected String BLOCK_PAY_DE = System.getenv("OPENLEDGER_EU");
protected SSLContext context;
protected WebSocket mWebSocket;

View file

@ -5,6 +5,7 @@ import com.neovisionaries.ws.client.WebSocketException;
import org.junit.Test;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import de.bitsharesmunich.graphenej.ObjectType;
@ -37,10 +38,13 @@ public class SubscriptionMessagesHubTest extends BaseApiTest {
@Test
public void testGlobalPropertiesDeserializer(){
ArrayList<ObjectType> interestingObjects = new ArrayList();
interestingObjects.add(ObjectType.TRANSACTION_OBJECT);
interestingObjects.add(ObjectType.DYNAMIC_GLOBAL_PROPERTY_OBJECT);
try{
mMessagesHub = new SubscriptionMessagesHub("", "", mErrorListener);
mMessagesHub = new SubscriptionMessagesHub("", "", interestingObjects, mErrorListener);
mMessagesHub.addSubscriptionListener(new SubscriptionListener() {
private int MAX_MESSAGES = 5;
private int MAX_MESSAGES = 10;
private int messageCounter = 0;
@Override
@ -50,14 +54,15 @@ public class SubscriptionMessagesHubTest extends BaseApiTest {
@Override
public void onSubscriptionUpdate(SubscriptionResponse response) {
System.out.println("On block");
if(response.params.size() == 2){
try{
List<Object> payload = (List) response.params.get(1);
if(payload.size() > 0 && payload.get(0) instanceof DynamicGlobalProperties){
DynamicGlobalProperties globalProperties = (DynamicGlobalProperties) payload.get(0);
System.out.println("time.....................: "+globalProperties.time);
System.out.println("next_maintenance_time....: "+globalProperties.next_maintenance_time);
System.out.println("recent_slots_filled......: "+globalProperties.recent_slots_filled);
// System.out.println("time.....................: "+globalProperties.time);
// System.out.println("next_maintenance_time....: "+globalProperties.next_maintenance_time);
// System.out.println("recent_slots_filled......: "+globalProperties.recent_slots_filled);
}
}catch(Exception e){
System.out.println("Exception");
@ -74,6 +79,18 @@ public class SubscriptionMessagesHubTest extends BaseApiTest {
}
}
});
mMessagesHub.addSubscriptionListener(new SubscriptionListener() {
@Override
public ObjectType getInterestObjectType() {
return ObjectType.TRANSACTION_OBJECT;
}
@Override
public void onSubscriptionUpdate(SubscriptionResponse response) {
System.out.println("onTx");
}
});
mWebSocket.addListener(mMessagesHub);
mWebSocket.connect();