diff --git a/graphenej/src/main/java/de/bitsharesmunich/graphenej/api/SubscriptionMessagesHub.java b/graphenej/src/main/java/de/bitsharesmunich/graphenej/api/SubscriptionMessagesHub.java index 1250002..7ee88e1 100644 --- a/graphenej/src/main/java/de/bitsharesmunich/graphenej/api/SubscriptionMessagesHub.java +++ b/graphenej/src/main/java/de/bitsharesmunich/graphenej/api/SubscriptionMessagesHub.java @@ -142,12 +142,7 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs WitnessResponse witnessResponse = gson.fromJson(message, ApiIdResponse); databaseApiId = witnessResponse.result; - ArrayList subscriptionParams = new ArrayList<>(); - subscriptionParams.add(String.format("%d", SUBCRIPTION_NOTIFICATION)); - subscriptionParams.add(clearFilter); - ApiCall getDatabaseId = new ApiCall(databaseApiId, RPC.CALL_SET_SUBSCRIBE_CALLBACK, subscriptionParams, RPC.VERSION, SUBCRIPTION_REQUEST); - websocket.sendText(getDatabaseId.toJsonString()); - currentId++; + subscribe(); } else if(currentId == SUBCRIPTION_REQUEST){ List subscriptionListeners = mSubscriptionDeserializer.getSubscriptionListeners(); @@ -192,26 +187,65 @@ public class SubscriptionMessagesHub extends BaseGrapheneHandler implements Subs System.out.println(">> "+frame.getPayloadText()); } + /** + * Private method that sends a subscription request to the full node + */ + private void subscribe(){ + ArrayList subscriptionParams = new ArrayList<>(); + subscriptionParams.add(String.format("%d", SUBCRIPTION_NOTIFICATION)); + subscriptionParams.add(clearFilter); + ApiCall getDatabaseId = new ApiCall(databaseApiId, RPC.CALL_SET_SUBSCRIBE_CALLBACK, subscriptionParams, RPC.VERSION, SUBCRIPTION_REQUEST); + mWebsocket.sendText(getDatabaseId.toJsonString()); + currentId = SUBCRIPTION_REQUEST; + } + + /** + * Public method used to re-establish a subscription after it was cancelled by a previous + * call to the {@see #cancelSubscriptions()} method call. + */ + public void resubscribe(){ + if(mWebsocket.isOpen()){ + subscribe(); + }else{ + throw new IllegalStateException("Websocket is not open, can't resubscribe"); + } + } + + /** + * Method that send a subscription cancellation request to the full node, and also + * deregisters all subscription and request listeners. + */ + public void cancelSubscriptions(){ + ApiCall unsubscribe = new ApiCall(databaseApiId, RPC.CALL_CANCEL_ALL_SUBSCRIPTIONS, new ArrayList(), RPC.VERSION, SUBCRIPTION_REQUEST); + mWebsocket.sendText(unsubscribe.toJsonString()); + + // Clearing all subscription listeners + mSubscriptionDeserializer.clearAllSubscriptionListeners(); + + // Clearing all request handler listners + mHandlerMap.clear(); + } + + /** + * Method used to reset all internal variables. + */ public void reset(){ currentId = 0; databaseApiId = -1; subscriptionCounter = 0; } - public void cancelSubscriptions(){ - ApiCall unsubscribe = new ApiCall(databaseApiId, RPC.CALL_CANCEL_ALL_SUBSCRIPTIONS, new ArrayList(), RPC.VERSION, SUBCRIPTION_REQUEST); - mWebsocket.sendText(unsubscribe.toJsonString()); - } - public void addRequestHandler(BaseGrapheneHandler handler) throws RepeatedRequestIdException { if(mHandlerMap.get(handler.getRequestId()) != null){ throw new RepeatedRequestIdException("Already registered handler with id: "+handler.getRequestId()); } - System.out.println("Registering handler with id: "+handler.getRequestId()); mHandlerMap.put(handler.getRequestId(), handler); try { + // Artificially calling the 'onConnected' method of the handler. + // The underlying websocket was already connected, but from the WebSocketAdapter + // point of view it doesn't make a difference. handler.onConnected(mWebsocket, null); } catch (Exception e) { System.out.println("Exception. Msg: "+e.getMessage()); diff --git a/graphenej/src/main/java/de/bitsharesmunich/graphenej/models/SubscriptionResponse.java b/graphenej/src/main/java/de/bitsharesmunich/graphenej/models/SubscriptionResponse.java index 70032f9..3cb5f78 100644 --- a/graphenej/src/main/java/de/bitsharesmunich/graphenej/models/SubscriptionResponse.java +++ b/graphenej/src/main/java/de/bitsharesmunich/graphenej/models/SubscriptionResponse.java @@ -56,7 +56,7 @@ public class SubscriptionResponse { public List params; /** - * Deserializer class that is used to parse and deserialize subscription responses in a partial way, + * Inner static class used to parse and deserialize subscription responses in a partial way, * depending on the amount of SubscriptionListeners we might have registered. * * The rationale behind these architecture is to avoid wasting computational resources parsing unneeded @@ -83,6 +83,11 @@ public class SubscriptionResponse { listenerTypeCount = new HashMap<>(); } + /** + * Adds a subscription listener to the list. + * @param subscriptionListener: Class implementing the {@see SubscriptionListener} interface + * to be added to the list. + */ public void addSubscriptionListener(SubscriptionListener subscriptionListener){ int currentCount = 0; if(listenerTypeCount.containsKey(subscriptionListener.getInterestObjectType())){ @@ -92,10 +97,19 @@ public class SubscriptionResponse { this.mListeners.add(subscriptionListener); } + /** + * Retrieves the full list of SubscriptionListeners registered. + * @return + */ public List getSubscriptionListeners(){ return this.mListeners; } + /** + * Removes a subscription listener to the list. + * @param subscriptionListener: Class implementing the {@see SubscriptionListener} interface + * to be removed from the list. + */ public void removeSubscriptionListener(SubscriptionListener subscriptionListener){ int currentCount = listenerTypeCount.get(subscriptionListener.getInterestObjectType()); if(currentCount != 0){ @@ -106,6 +120,14 @@ public class SubscriptionResponse { this.mListeners.remove(subscriptionListener); } + /** + * Removes all registered subscription listeners + */ + public void clearAllSubscriptionListeners(){ + this.mListeners.clear(); + this.listenerTypeCount.clear(); + } + @Override public SubscriptionResponse deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { SubscriptionResponse response = new SubscriptionResponse(); diff --git a/graphenej/src/test/java/de/bitsharesmunich/graphenej/api/SubscriptionMessagesHubTest.java b/graphenej/src/test/java/de/bitsharesmunich/graphenej/api/SubscriptionMessagesHubTest.java index ff543c3..2bbcd4b 100644 --- a/graphenej/src/test/java/de/bitsharesmunich/graphenej/api/SubscriptionMessagesHubTest.java +++ b/graphenej/src/test/java/de/bitsharesmunich/graphenej/api/SubscriptionMessagesHubTest.java @@ -56,6 +56,14 @@ public class SubscriptionMessagesHubTest extends BaseApiTest { } }; + TimerTask resubscribeTask = new TimerTask() { + @Override + public void run() { + System.out.println("Resubscribing.."); + mMessagesHub.resubscribe(); + } + }; + /** * Task that will just finish the test. */ @@ -77,7 +85,8 @@ public class SubscriptionMessagesHubTest extends BaseApiTest { Timer timer = new Timer(); timer.schedule(unsubscribeTask, 5000); - timer.schedule(shutdownTask, 15000); + timer.schedule(resubscribeTask, 10000); + timer.schedule(shutdownTask, 20000); // Holding this thread while we get update notifications synchronized (this){