Amazon (AWS) SQS with Spring Cloud

Amazon Simple Queue Service (SQS) is a highly available cloud messaging service that can be used to decouple parts of a single system or to integrate multiple disparate systems. An SQS queue acts as a buffer between a message producer and a consumer when the former’s message generation rate exceeds the latter’s processing throughput. It also provides durable point-to-point messaging: messages are stored in a fail-safe queue and can be processed once a consumer application comes back up after scheduled maintenance or unexpected downtime. For the publish-subscribe design pattern, Amazon (AWS) provides another cloud service called Simple Notification Service (SNS).

[Figure: Integration with Amazon SQS]

The diagram above shows a subset of possible producer-consumer scenarios that can utilize SQS.

Developers who use JMS to connect to messaging providers/brokers like ActiveMQ, WebSphere MQ, JBoss Messaging etc. can use the Amazon SQS Java Messaging Library to work with SQS. Used along with the AWS SDK, it allows writing JMS 1.1 compliant applications. But for real-world applications, writing directly against the AWS-provided library can be cumbersome. That’s where Spring Cloud AWS comes to the rescue. In this post, I’ll show how one can use Spring Cloud with Spring Boot to write an SQS message producer and a consumer. Before that, a few important points to note when working with SQS:

  • The maximum payload of an SQS message is 256 KB. Applications that need to send more data in a single message can use the SQS Extended Client Library, which uses AWS S3 as the message store (with a reference being sent in the actual SQS message). Another option is to compress the message before sending it over SQS.
  • Messages can be sent synchronously, but received either synchronously or asynchronously (using AmazonSQSAsyncClient). If required, one can send messages in a separate child thread so as not to block the main thread on SQS I/O.
  • Messages can be sent or received in batches to increase throughput (at most 10 messages per batch). Note that the maximum payload size of a whole batch is also 256 KB, so batching only suits lighter payloads.
  • There can be multiple producer and consumer threads interacting with the same queue, just like with any other messaging provider/broker. Multiple consumer threads provide load balancing for message processing.
  • Consumers can poll for messages using long polling or short (standard) polling. Long polling is recommended to avoid too many empty responses (the wait timeout is configurable).
  • Each message producer or consumer needs AWS credentials to work with SQS. AWS provides different implementations of AWSCredentialsProvider for this purpose. DefaultAWSCredentialsProviderChain is used by default if no provider is configured explicitly.
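
To make the batch limits above concrete, here is a minimal plain-Java sketch that groups message bodies so that no batch holds more than 10 entries or more than 256 KB in total. The class and method names (SqsBatchPlanner, plan) are made up for illustration and nothing here calls SQS. It also assumes that only the UTF-8 size of the body counts towards the limit, which ignores message attributes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: groups message bodies into batches that respect the limits noted above. */
final class SqsBatchPlanner {

    // Limits taken from the notes above: 10 entries and 256 KB per batch.
    static final int MAX_ENTRIES = 10;
    static final int MAX_BATCH_BYTES = 256 * 1024;

    /**
     * Splits bodies into batches, preserving order. Only the UTF-8 size of
     * each body is counted (assumption: message attributes are ignored).
     */
    static List<List<String>> plan(List<String> bodies) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;

        for (String body : bodies) {
            int size = body.getBytes(StandardCharsets.UTF_8).length;
            if (size > MAX_BATCH_BYTES) {
                throw new IllegalArgumentException("Body too large for SQS: " + size + " bytes");
            }
            // Close the current batch when adding this body would break either limit.
            if (current.size() == MAX_ENTRIES || currentBytes + size > MAX_BATCH_BYTES) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(body);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each inner list could then be handed to whatever batch send call the application uses. Bodies that are too large even on their own would need the Extended Client Library or compression mentioned above.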

Now, on to using Spring Cloud with SQS. First of all, the dependencies below should be added to the Maven project’s pom.xml or the Gradle build file. The AWS SDK and other Spring dependencies are then added to the classpath automatically (unless already specified explicitly).

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-aws-autoconfigure</artifactId>
	<version>latest.version</version>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-aws-messaging</artifactId>
	<version>latest.version</version>
</dependency>

From a configuration perspective, we first need to wire the appropriate AWS SQS client. Assuming asynchronous interaction with SQS, we will configure an AmazonSQSAsyncClient. There are a couple of things to note. First, if you’re behind a proxy, you’ll need to specify the proxy host and port using ClientConfiguration. Second, DefaultAWSCredentialsProviderChain will look for the AWS access key, secret access key and default region in this order: environment variables, Java system properties, the credentials profile file and the instance profile (only on EC2). We’ve just overridden the default region below (which is also us-east-1 though).

@Lazy
@Bean(name = "amazonSQS", destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQSClient() {
    AmazonSQSAsyncClient awsSQSAsyncClient;
    if (useProxy.equalsIgnoreCase("Y")) {
        // Route SDK traffic through the proxy
        ClientConfiguration clientConfig = new ClientConfiguration();
        clientConfig.setProxyHost("proxy.xyz.com");
        clientConfig.setProxyPort(8085);

        awsSQSAsyncClient = new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain(), clientConfig);
    } else {
        awsSQSAsyncClient = new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain());
    }

    // Override the default region
    awsSQSAsyncClient.setRegion(Region.getRegion(Regions.fromName("us-east-1")));
    return awsSQSAsyncClient;
}

Next we look at configuring a QueueMessagingTemplate, which will be used to send messages to an SQS queue.

@Configuration
public class ProducerAWSSQSConfig {
   ...

   @Bean
   public QueueMessagingTemplate queueMessagingTemplate() {
	return new QueueMessagingTemplate(amazonSQSClient());
   }
   
   @Lazy
   @Bean(name = "amazonSQS", destroyMethod = "shutdown")
   public AmazonSQSAsync amazonSQSClient() {
   ...
}

@Component
public class MessageProducer {

   private QueueMessagingTemplate sqsMsgTemplate;
   ...

   public void sendMessage(String payload) {
      ...
      this.sqsMsgTemplate.convertAndSend("randomQName", payload);
      ...
   }
}

We’re sending a String message, which will be converted using a StringMessageConverter internally. One can also send a complete data object as JSON if the Jackson library is available on the classpath (using MappingJackson2MessageConverter).
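
As noted earlier, a blocking send can be moved off the calling thread. Below is a minimal sketch of that idea using only the JDK. QueueSender is a hypothetical stand-in for a blocking call such as convertAndSend(queueName, payload), and AsyncMessageProducer is an illustrative name, not part of Spring Cloud.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a blocking send such as convertAndSend(queueName, payload). */
interface QueueSender {
    void send(String queueName, String payload);
}

/** Hands each send to a small worker pool so the caller is not blocked on SQS I/O. */
final class AsyncMessageProducer implements AutoCloseable {

    private final QueueSender sender;
    private final ExecutorService pool;

    AsyncMessageProducer(QueueSender sender, int threads) {
        this.sender = sender;
        this.pool = Executors.newFixedThreadPool(threads);
    }

    /** Returns immediately; the future completes (or fails) once the send has finished. */
    CompletableFuture<Void> sendAsync(String queueName, String payload) {
        return CompletableFuture.runAsync(() -> sender.send(queueName, payload), pool);
    }

    /** Stops accepting new sends; sends already submitted are still executed. */
    @Override
    public void close() {
        pool.shutdown();
    }
}
```

In the producer above, the sender could be wired as (queue, payload) -> sqsMsgTemplate.convertAndSend(queue, payload). The caller should still inspect the returned future, otherwise a failed send goes unnoticed.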

Finally, we look at configuring a SimpleMessageListenerContainer and a QueueMessageHandler, which will be used to create a message listener for the SQS queue. Note that internally Spring Cloud creates a SynchronousQueue-based thread pool to listen to the different queues and to process received messages using the corresponding handlers. The number of core threads in the pool is 2 * the number of queues (or configured listeners), and the property “maxNumMsgs” determines the maximum number of threads. The property “waitTimeout” determines the timeout period for long polling (which Spring Cloud uses by default).

@Configuration
public class ConsumerAWSSQSConfig {
   ...

   @Bean
   public SimpleMessageListenerContainer simpleMessageListenerContainer() {
      SimpleMessageListenerContainer msgListenerContainer = simpleMessageListenerContainerFactory().createSimpleMessageListenerContainer();
      msgListenerContainer.setMessageHandler(queueMessageHandler());

      return msgListenerContainer;
   }

   @Bean
   public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {
      SimpleMessageListenerContainerFactory msgListenerContainerFactory = new SimpleMessageListenerContainerFactory();
      msgListenerContainerFactory.setAmazonSqs(amazonSQSClient());
      // Keep the message on the queue if the handler throws, so it can be retried
      msgListenerContainerFactory.setDeleteMessageOnException(false);
      msgListenerContainerFactory.setMaxNumberOfMessages(maxNumMsgs);
      msgListenerContainerFactory.setWaitTimeOut(waitTimeout);

      return msgListenerContainerFactory;
   }

   @Bean
   public QueueMessageHandler queueMessageHandler() {
      QueueMessageHandlerFactory queueMsgHandlerFactory = new QueueMessageHandlerFactory();
      queueMsgHandlerFactory.setAmazonSqs(amazonSQSClient());

      QueueMessageHandler queueMessageHandler = queueMsgHandlerFactory.createQueueMessageHandler();

      return queueMessageHandler;
   }

   @Lazy
   @Bean(name = "amazonSQS", destroyMethod = "shutdown")
   public AmazonSQSAsync amazonSQSClient() {
   ...
}

@Component
public class MessageConsumer {
   ...

   @MessageMapping("randomQName")
   public void onRandomQMessage(String payload) {
	doSomething(payload);
   }
}
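
The thread pool described before the consumer configuration can be pictured with a plain JDK executor. This is a rough sketch and not Spring Cloud’s actual code: the core size follows the text (2 * number of queues), while the maximum size formula (one polling thread per queue plus one worker per received message) is my assumption about how “maxNumMsgs” feeds into it. ListenerPoolSketch is an illustrative name.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Illustrative only: mimics the sizing of a SynchronousQueue-based listener pool. */
final class ListenerPoolSketch {

    /**
     * Builds a pool without a task buffer: a SynchronousQueue hands every task
     * straight to a thread, so new work is rejected once all threads are busy.
     */
    static ThreadPoolExecutor create(int queueCount, int maxNumMsgs) {
        if (queueCount < 1 || maxNumMsgs < 0) {
            throw new IllegalArgumentException("need at least one queue and a non-negative batch size");
        }
        int coreThreads = 2 * queueCount;                // as described in the text
        int maxThreads = queueCount * (maxNumMsgs + 1);  // ASSUMPTION: poller + one worker per message
        return new ThreadPoolExecutor(
                coreThreads,
                Math.max(coreThreads, maxThreads),       // max may never be below core
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }
}
```

With one queue and maxNumMsgs set to 10, this gives 2 core threads and room to grow to 11. Because nothing is buffered, received messages are not held in memory waiting for a free thread.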

One can also send custom message attributes (headers) with an SQS message to provide contextual information to the consumer. In addition, there are the important concepts of visibility timeout and delay queues, which should be explored before getting your hands dirty with SQS.

Hope you have fun with Spring Cloud and AWS SQS.

Data analysis with Spark Streaming


Apache Spark has more than taken off by now. It’s being used to solve myriad types of data analysis problems, whether it’s a recommendation system for a modern retail store or a business intelligence platform for a conservative financial asset manager, and many more. Apart from its features and performance, a big part of Spark’s success has been its support for Python, which led to its widespread adoption. And now there is SparkR, an R package that has been an attention grabber for all those R-enamored data scientists out there. No doubt Spark has taken a fair share of the limelight from Hadoop MapReduce.

Apart from the core Spark engine, there are four add-on modules that make this framework a powerhouse to reckon with:

  • Spark Streaming – to process real-time data streams
  • Spark SQL and DataFrames – to support relational data analysis, and data structures common to R and Python Pandas.
  • Spark MLlib – to create machine learning applications
  • Spark GraphX – to solve graph problems, such as those involving hierarchies, networks etc.

In this post, we’ll look at a simple data problem that needs some computation over a continuous stream of US consumer complaints data (Source: Data.gov). We’ll be using Spark Streaming (Java API) with Apache Flume (a data collector and aggregator) to exemplify real-time data analysis. Flume will be used as a generator of real-time data events (for the sake of simplicity, we’re using a finite dataset), whereas a standalone Spark Streaming program will consume events as they are generated, maintain a rolling count of consumer complaints by type and US state, and store the results to a file at a regular interval.

[Figure: Spark Streaming with Flume]

It’s a hypothetical problem, hence you’ll see events being generated from a finite dataset (a CSV file of consumer complaint records, with many columns removed to keep it simple). Our sample data looks like this:

"Debt collection","Credit card","MA"
"Bank account or service","Checking account","TN"
"Consumer loan","Vehicle loan","FL"
"Bank account or service","Savings account","PA"
"Debt collection","","GA"
"Debt collection","Credit card","FL"
"Debt collection","Medical","DE"
"Credit reporting","","TX"
"Mortgage","Other mortgage","CA"

First, let’s look at a basic Flume event generator (with an Avro source). It needs a running Flume agent (directions are mentioned below). The full source for the event generator is available at Flume Client.

// Initialize connection to Flume agent, and the event source file
public void init(String hostname, int port) {
	// Setup the RPC connection
	this.hostname = hostname;
	this.port = port;
	this.client = RpcClientFactory.getDefaultInstance(hostname, port);

	try {
		InputStream inStream = this.getClass().getClassLoader()
				.getResourceAsStream("cons_comp_data.csv");
		Reader in = new InputStreamReader(inStream);
		csvRecords = CSVFormat.DEFAULT.
                             withSkipHeaderRecord(true).parse(in);
	} 
        .......
        .......
}

// Send records from source file as events to flume agent
public void sendDataToFlume() {
	CSVRecord csvRecord = null;
	Iterator<CSVRecord> csvRecordItr = csvRecords.iterator();
	while (csvRecordItr.hasNext()) {
		try {
			csvRecord = csvRecordItr.next();
			String eventStr = csvRecord.toString();
			System.out.println(eventStr);
			Event event = EventBuilder.withBody(eventStr,
					Charset.forName("UTF-8"));
			client.append(event);
		} 
                .......
                .......
	}
}

Now we take a look at the Spark Streaming program, which processes each event to maintain a rolling count by complaint type and state. At the start, we create a JavaStreamingContext, which is the entry point to a Spark Streaming program, and create an input stream of events using the Spark-Flume integration.

SparkConf sparkConf = new SparkConf().setAppName("ConsCompEventStream");

// Create the context and set the checkpoint directory
JavaStreamingContext context = new JavaStreamingContext(sparkConf,
				new Duration(2000));
context.checkpoint(checkpointDir);

// Create the input stream from flume sink
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(context, host, port);

We are setting a checkpoint folder in the JavaStreamingContext, which is used to create fault-tolerant Spark Streaming applications. It allows the application to recover from failures. Please read more at Spark Checkpointing.

The rest of the program processes each Flume event in logical steps:

  • Use the map() function to convert the incoming stream of events to a JavaDStream. This allows us to convert each Flume event into a processable format.
  • Use the mapToPair() function to convert each processable event to a JavaPairDStream keyed by complaint type and US state.
  • Use the updateStateByKey() function to maintain a rolling count by complaint type and US state.
  • Use the foreachRDD() function to store the rolling count to a file periodically.

// Transform each flume avro event to a processable format
JavaDStream<String> transformedEvents = flumeStream
		.map(new Function<SparkFlumeEvent, String>() {
                  .........
                });

// Map key-value pairs by product and state to count of 1
JavaPairDStream<Tuple2<String, String>, Integer> countOnes = transformedEvents
		.mapToPair(new PairFunction<String, Tuple2<String, String>, Integer>() {
                  .........
                });

// Maintain an updated list of counts per product and state
JavaPairDStream<Tuple2<String, String>, List> updatedCounts = countOnes
		.updateStateByKey(new Function2<List, Optional<List>, Optional<List>>() {
                  .........
                });

// Store the updated list of counts in a text file per product and state
updatedCounts.foreachRDD(new Function<JavaPairRDD<Tuple2<String, String>, List>, Void>() {
                  .........
                });
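
To show what the rolling count amounts to without a Spark cluster, here is a small plain-Java sketch (Java 8) that mimics the steps above on lines shaped like the sample data. It is not the code from the linked repository. RollingComplaintCounts and its methods are illustrative names, the map plays the role of the state that updateStateByKey() keeps between batches, and the parsing assumes fields never contain embedded quotes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative stand-in for the rolling state kept across micro-batches. */
final class RollingComplaintCounts {

    // Running totals keyed by (product, state).
    private final Map<List<String>, Integer> totals = new HashMap<>();

    /** Parses one line shaped like the sample data: "product","sub-product","state". */
    static List<String> toKey(String csvLine) {
        String trimmed = csvLine.trim();
        // Strip the outer quotes, then split on the "," separators.
        // The -1 limit keeps empty fields such as a missing sub-product.
        String[] fields = trimmed.substring(1, trimmed.length() - 1).split("\",\"", -1);
        return Arrays.asList(fields[0], fields[2]);
    }

    /** Folds one micro-batch of lines into the running totals. */
    void update(List<String> batch) {
        for (String line : batch) {
            totals.merge(toKey(line), 1, Integer::sum);
        }
    }

    /** Returns the count seen so far for a product and state, or 0 if none. */
    int count(String product, String state) {
        return totals.getOrDefault(Arrays.asList(product, state), 0);
    }
}
```

Calling update() once per batch interval and reading count() afterwards gives the same kind of running total that the streaming job writes out, minus the distribution and checkpointing that Spark adds.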

Full source is available at Spark Flume Stream. It also includes complete information on how to configure and run the Flume agent, as well as how to run this program using a local Spark cluster. The implementation is in Java 1.7, hence the use of anonymous inner classes. It can look a lot more concise and prettier if converted to use lambdas with Java 8. I won’t mind a merge request for the same ;-).