User Tools

Site Tools


sdk_subscriber_example

Subscriber Code Example

The following is a simple example of a Subscriber Handler. We'll use this example as a means to outline how the framework is used to create a subscribing handler.

import com.perspectium.api.Message;
import com.perspectium.logging.PerspectiumLogger;
import com.perspectium.replicator.ASubscriber;
 
/**
 * @author davidloo
 * 
 * The EchoSubscriber demonstrates how to use the Subscriber interface to access
 * Perspectium's Cloud ESB for messages targeted for this agent.
 *  
 * This subscriber is loaded into a Replicator Agent runtime by configuring it
 * in the agents config.xml
 * 
 * @see http://wiki.perspectium.com/doku.php?id=replicator_config
 *
 */
public class EchoSubscriber extends ASubscriber {
    final static PerspectiumLogger Log = PerspectiumLogger.create(EchoSubscriber.class);
 
    private String fMessageText;
 
    /* 
     * This is the main call back API you will need to override in the subscriber
     * side of the agent.
     * 
     * When the agent retrieves a message, it will decode, decrypt and marshall it 
     * into a Message object before calling processMessage and passing it. 
     * 
     */
    @Override
    public boolean processMessage(Message message) {
        fMessageText = message.getValue();
 
        if (fMessageText != null)
            Log.info(String.format("%s - Received echoed message: %s", fTaskName, fMessageText));
        else
            Log.info("Message body was empty");
 
        return true;
    }
 
    /* (non-Javadoc)
     * @see com.perspectium.replicator.AgentTask#initialize(java.lang.String)
     */
    @Override
    public void initialize(String taskName) {
        super.initialize(taskName);
    }
 
    /* (non-Javadoc)
     * @see com.perspectium.replicator.ISubscribe#postProcessMessage()
     */
    public void postProcessMessage() {
        Log.info("in postProcessMessage of EchoSubscriber");
    }
}

intialize

First let's turn our attention to the initialize(String taskName) method. Your initialize method will be called with the taskName string set to the value of your <task_name> directive you defined within the conf/agent.xml configuration file. This is where you'll want to perform any configuration validation to ensure you have everything you need setup prior to your processMessage() method being called.

As you can see in the example, all we care about is making sure that the parent method gets called.

processMessage

The framework takes care of consuming the message from the message bus, performing the decryption and then calling your processMessage method with the Messsage. As you can see, our processMessage method makes sure the message isn't empty and then logs the message text to the console. The processMessage method returns true in both cases implying that the message was processed without an error. When the method returns true the message is further processed for reporting. If the method returns false then this step is skipped.

postProcessMessage

This method is called by the framework in order for your handler to perform any final processing. For example, if you had been buffering messages in order to perform an optimized write to the local file system then that worked would be finished within this method.

That's it! You've successfully created a Subscriber handler which when executed by the Replicator Agent will pull messages from your queue and provide them to your handler via the processMessage method.

sdk_subscriber_example.txt · Last modified: 2016/02/11 11:00 by mark.english