Exposing the 'resubscribe' method, which allows to re-establish a subscription after it was cancelled

master
Nelson R. Perez 2017-07-03 13:23:32 -05:00
parent d08afaa7c0
commit 105d4e6d10
3 changed files with 79 additions and 14 deletions

View File

@ -142,12 +142,7 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs
WitnessResponse<Integer> witnessResponse = gson.fromJson(message, ApiIdResponse);
databaseApiId = witnessResponse.result;
ArrayList<Serializable> subscriptionParams = new ArrayList<>();
subscriptionParams.add(String.format("%d", SUBCRIPTION_NOTIFICATION));
subscriptionParams.add(clearFilter);
ApiCall getDatabaseId = new ApiCall(databaseApiId, RPC.CALL_SET_SUBSCRIBE_CALLBACK, subscriptionParams, RPC.VERSION, SUBCRIPTION_REQUEST);
websocket.sendText(getDatabaseId.toJsonString());
currentId++;
subscribe();
} else if(currentId == SUBCRIPTION_REQUEST){
List<SubscriptionListener> subscriptionListeners = mSubscriptionDeserializer.getSubscriptionListeners();
@ -192,26 +187,65 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs
System.out.println(">> "+frame.getPayloadText());
}
/**
* Private method that sends a subscription request to the full node
*/
private void subscribe(){
ArrayList<Serializable> subscriptionParams = new ArrayList<>();
subscriptionParams.add(String.format("%d", SUBCRIPTION_NOTIFICATION));
subscriptionParams.add(clearFilter);
ApiCall getDatabaseId = new ApiCall(databaseApiId, RPC.CALL_SET_SUBSCRIBE_CALLBACK, subscriptionParams, RPC.VERSION, SUBCRIPTION_REQUEST);
mWebsocket.sendText(getDatabaseId.toJsonString());
currentId = SUBCRIPTION_REQUEST;
}
/**
* Public method used to re-establish a subscription after it was cancelled by a previous
* call to the {@see #cancelSubscriptions()} method call.
*/
public void resubscribe(){
if(mWebsocket.isOpen()){
subscribe();
}else{
throw new IllegalStateException("Websocket is not open, can't resubscribe");
}
}
/**
* Method that send a subscription cancellation request to the full node, and also
* deregisters all subscription and request listeners.
*/
public void cancelSubscriptions(){
ApiCall unsubscribe = new ApiCall(databaseApiId, RPC.CALL_CANCEL_ALL_SUBSCRIPTIONS, new ArrayList<Serializable>(), RPC.VERSION, SUBCRIPTION_REQUEST);
mWebsocket.sendText(unsubscribe.toJsonString());
// Clearing all subscription listeners
mSubscriptionDeserializer.clearAllSubscriptionListeners();
// Clearing all request handler listners
mHandlerMap.clear();
}
/**
* Method used to reset all internal variables.
*/
public void reset(){
currentId = 0;
databaseApiId = -1;
subscriptionCounter = 0;
}
public void cancelSubscriptions(){
ApiCall unsubscribe = new ApiCall(databaseApiId, RPC.CALL_CANCEL_ALL_SUBSCRIPTIONS, new ArrayList<Serializable>(), RPC.VERSION, SUBCRIPTION_REQUEST);
mWebsocket.sendText(unsubscribe.toJsonString());
}
public void addRequestHandler(BaseGrapheneHandler handler) throws RepeatedRequestIdException {
if(mHandlerMap.get(handler.getRequestId()) != null){
throw new RepeatedRequestIdException("Already registered handler with id: "+handler.getRequestId());
}
System.out.println("Registering handler with id: "+handler.getRequestId());
mHandlerMap.put(handler.getRequestId(), handler);
try {
// Artificially calling the 'onConnected' method of the handler.
// The underlying websocket was already connected, but from the WebSocketAdapter
// point of view it doesn't make a difference.
handler.onConnected(mWebsocket, null);
} catch (Exception e) {
System.out.println("Exception. Msg: "+e.getMessage());

View File

@ -56,7 +56,7 @@ public class SubscriptionResponse {
public List<Serializable> params;
/**
* Deserializer class that is used to parse and deserialize subscription responses in a partial way,
* Inner static class used to parse and deserialize subscription responses in a partial way,
* depending on the amount of SubscriptionListeners we might have registered.
*
* The rationale behind these architecture is to avoid wasting computational resources parsing unneeded
@ -83,6 +83,11 @@ public class SubscriptionResponse {
listenerTypeCount = new HashMap<>();
}
/**
* Adds a subscription listener to the list.
* @param subscriptionListener: Class implementing the {@see SubscriptionListener} interface
* to be added to the list.
*/
public void addSubscriptionListener(SubscriptionListener subscriptionListener){
int currentCount = 0;
if(listenerTypeCount.containsKey(subscriptionListener.getInterestObjectType())){
@ -92,10 +97,19 @@ public class SubscriptionResponse {
this.mListeners.add(subscriptionListener);
}
/**
* Retrieves the full list of SubscriptionListeners registered.
* @return
*/
public List<SubscriptionListener> getSubscriptionListeners(){
return this.mListeners;
}
/**
* Removes a subscription listener to the list.
* @param subscriptionListener: Class implementing the {@see SubscriptionListener} interface
* to be removed from the list.
*/
public void removeSubscriptionListener(SubscriptionListener subscriptionListener){
int currentCount = listenerTypeCount.get(subscriptionListener.getInterestObjectType());
if(currentCount != 0){
@ -106,6 +120,14 @@ public class SubscriptionResponse {
this.mListeners.remove(subscriptionListener);
}
/**
* Removes all registered subscription listeners
*/
public void clearAllSubscriptionListeners(){
this.mListeners.clear();
this.listenerTypeCount.clear();
}
@Override
public SubscriptionResponse deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
SubscriptionResponse response = new SubscriptionResponse();

View File

@ -56,6 +56,14 @@ public class SubscriptionMessagesHubTest extends BaseApiTest {
}
};
TimerTask resubscribeTask = new TimerTask() {
@Override
public void run() {
System.out.println("Resubscribing..");
mMessagesHub.resubscribe();
}
};
/**
* Task that will just finish the test.
*/
@ -77,7 +85,8 @@ public class SubscriptionMessagesHubTest extends BaseApiTest {
Timer timer = new Timer();
timer.schedule(unsubscribeTask, 5000);
timer.schedule(shutdownTask, 15000);
timer.schedule(resubscribeTask, 10000);
timer.schedule(shutdownTask, 20000);
// Holding this thread while we get update notifications
synchronized (this){