Search Platform using Elasticsearch (on AWS)

Apache Lucene is a high-performance, cross-platform search engine library that has gained immense support in enterprises looking to build firm-wide, customer-facing search platforms. It owes a good share of its popularity to Apache Solr and Elasticsearch, which are mature search products built on top of Lucene. We chose Elasticsearch 2.3.3 (5.0 is the current version at the time of writing) to build our multi-entity search platform for a financial services firm.

All our applications and data platforms are housed in AWS VPCs. We decided to implement our own cluster on EC2 rather than using AWS Elasticsearch, the main reasons being: AWS Elasticsearch doesn’t provide data encryption using AWS KMS yet (at least at the time of writing); AWS has generally been behind on adding the latest Elasticsearch versions; and we wanted full freedom in maintaining our own cluster with our choice of security groups, plugins, monitoring, etc. Without going into the details of the infrastructure, I’ll just mention the major plugins we are using:

  • EC2-Discovery for unicast discovery (AWS doesn’t support multicast out of box)
  • Kopf for web-based administration
  • Head for web-based administration
  • SQL for data analysis
  • Read-only to control maintenance actions (alternative to using Nginx as proxy)

[Figure: Elasticsearch cluster]

Our indexing and query services are implemented in Java (using Spring Boot), and we use Elasticsearch’s Java API (Transport Client) to connect to the cluster. We don’t use Spring Data Elasticsearch, as it too tends to lag behind on version upgrades. Now, moving to what we want to focus on in this blog: how to implement a search service for data stored in Elasticsearch.
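
For reference, wiring up a 2.x Transport Client from a Spring Boot service looks roughly like this (a minimal sketch; the cluster name and host below are placeholders, not our actual configuration):

import java.net.InetAddress;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Minimal sketch of a Transport Client bean for an Elasticsearch 2.x cluster
@Configuration
public class EsClientConfig {

    @Bean(destroyMethod = "close")
    public Client elasticsearchClient() throws Exception {
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "search-cluster")          // placeholder cluster name
                .build();
        return TransportClient.builder()
                .settings(settings)
                .build()
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("es-node-1.internal"), 9300)); // placeholder node
    }
}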

The first and most important component that impacts data indexing and search in Elasticsearch is the Analyzer. Analysis is the process of converting text, like a product description or a person name, into tokens or terms which are added to the inverted index for searching. Analysis is performed by an analyzer, which can be either built-in or custom. There are three sub-parts to an analyzer, and they process all incoming text in this order:

  • Character Filter – Used to preprocess text before indexing/searching, like stripping HTML markup
  • Tokenizer – Main component which produces tokens from incoming text
  • Token Filter – Can modify, delete or add tokens to stream of tokens from tokenizer

For our product and person search services (90% of our search functionality), we created custom edgeNGram-based token filters, and analyzers using those token filters. An edgeNGram token filter is similar to an nGram token filter, except that it only keeps nGrams that start at the beginning of tokens produced by a tokenizer. The analysis settings for such an index look like:

{
  "settings": {
    "index": {
      ...
    },
    "analysis": {
      "filter": {
        "autocomplete_filter": {
          "type": "edgeNGram",
          "min_gram": "1",
          "max_gram": "10"
        },
        "autocomplete_phrase_filter": {
          "type": "edgeNGram",
          "min_gram": "1",
          "max_gram": "20"
        }
      },
      "analyzer": {
        "keyword_analyzer": {
          "type": "custom",
          "filter": [
            "asciifolding", "lowercase"
          ],
          "tokenizer": "keyword"
        },
        "autocomplete_analyzer": {
          "type": "custom",
          "filter": [
            "standard", "lowercase", "stop", "autocomplete_filter"
          ],
          "tokenizer": "standard"
        },
        "autocomplete_phrase_analyzer": {
          "type": "custom",
          "filter": [
            "standard", "lowercase", "autocomplete_phrase_filter"
          ],
          "tokenizer": "keyword"
        }
      }
    }
  }
}
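
As a sanity check, it helps to see what these analyzers actually emit: with the autocomplete_analyzer above, for instance, the text “Intel Corp” is tokenized and edge-n-grammed into terms like i, in, int, inte, intel, c, co, cor, corp. One quick way to verify this from Java is the _analyze API — a sketch, assuming the index is called product as in the examples below:

import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
import org.elasticsearch.client.Client;

// Sketch: print the terms a custom analyzer produces for a sample text
public class AnalyzerCheck {

    public static void printTokens(Client client) {
        AnalyzeResponse response = client.admin().indices()
                .prepareAnalyze("product", "Intel Corp")   // index, sample text
                .setAnalyzer("autocomplete_analyzer")
                .get();
        for (AnalyzeResponse.AnalyzeToken token : response.getTokens()) {
            System.out.println(token.getTerm());           // i, in, int, inte, intel, c, co, ...
        }
    }
}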

Now we apply these analyzers to field mappings in the index. In an elaborate search platform, where one needs the ability to search a field in different ways, it’s fruitful to store multiple versions of such a field, often using different analyzers during indexing and searching. That way, one can use the most relevant version for a given scenario. For example:

{
  "settings": {
    "index": {
      ...
    },
    "analysis": {
      ...
    }
  },
  "mappings": {
    "product": {
      "dynamic": "strict",
      "_all": {
        "enabled": false
      },
      "properties": {
        ...
        ...
        "productTypeCode": {
          "type": "string",
          "norms": {
            "enabled": false
          },
          "analyzer": "keyword_analyzer"
        },
        "productName": {
          "type": "string",
          "term_vector": "with_positions_offsets",
          "analyzer": "english",
          "fields": {
            "autocomplete_exact": {
              "type": "string",
              "norms": {
                "enabled": false
              },
              "analyzer": "keyword_analyzer"
            },
            "autocomplete_phrase": {
              "type": "string",
              "norms": {
                "enabled": false
              },
              "analyzer": "autocomplete_phrase_analyzer",
              "search_analyzer": "keyword_analyzer"
            },
            "autocomplete_startswith": {
              "type": "string",
              "norms": {
                "enabled": false
              },
              "analyzer": "autocomplete_analyzer",
              "search_analyzer": "standard"
            },
            "autocomplete_token": {
              "type": "string",
              "norms": {
                "enabled": false
              },
              "index_options": "docs",
              "analyzer": "standard"
            },
            "raw": {
              "type": "string",
              "index": "not_analyzed"
            }
          }
        }
        ...
        ...
      }
    }
  }
}

What’s worth noting above is how custom analyzers have been used along with built-in analyzers to map different versions of productName (and other similar fields). Here is how the different versions can be used during searching:

  • productName.autocomplete_exact – to match stored documents that are exactly equal to the full queried text (case-insensitive) – A query for “intel corp” will match “Intel Corp”, but not “Intel Corporation”.
  • productName.autocomplete_phrase – to match stored documents that start with the full queried text (case-insensitive) – A query for “intel corp” will match both “Intel Corp” and “Intel Corporation”.
  • productName.autocomplete_token – to match stored documents whose tokens exactly match tokens in the queried text (case-insensitive) – A query for “intel corp” will match not just “intel” products, but also “Microsoft Corp” or any other product with “corp”; it won’t match “Microsoft Corporation” or the like.
  • productName.autocomplete_startswith – to match stored documents that have tokens starting with tokens in the queried text (case-insensitive) – A query for “intel corp” will match not just “intel” products, but also “Microsoft Corp”, “Microsoft Corporation”, “Intellisat”, etc.

We use combinations of multiple fields in Elasticsearch queries to order search results (most relevant to least relevant). The Elasticsearch Bool Query can be used to construct such compound queries, with different boost values set on sub-queries to get the desired result order (by default, results are ordered by descending _score).

GET product/_search?search_type=dfs_query_then_fetch
{
  "query": {
    "bool": {
      "should": [
        {
          "constant_score": {
            "query": {
              "match": {
                "productName.autocomplete_exact": "intel corp"
              }
            },
            "boost": 300
          }
        },
        {
          "constant_score": {
            "query": {
              "match": {
                "productName.autocomplete_phrase": "intel corp"
              }
            },
            "boost": 200
          }
        },
        {
          "more_like_this": {
            "fields": ["productName.autocomplete_token"],
            "like": "intel corp",
            "min_term_freq": 1,
            "min_doc_freq": 1,
            "max_query_terms": 10,
            "minimum_should_match": 1,
            "boost": 100
          }
        },
        {
          "constant_score": {
            "query": {
              "match": {
                "productName.autocomplete_startswith": "intel corp"
              }
            },
            "boost": 10
          }
        }
      ],
      "minimum_should_match": 1
    }
  }
}

The above Bool query is composed of Constant Score and More Like This queries, which work for us. One should replace these sub-queries with appropriate query types, depending on the desired search results.
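
Since our query service goes through the Java API (Transport Client), the same Bool query is assembled roughly as below — a sketch, with the boosts and field names taken from the JSON above:

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;

// Sketch: the Bool query above, built with the 2.x Java API
public class ProductSearch {

    public SearchResponse search(Client client, String text) {
        BoolQueryBuilder query = QueryBuilders.boolQuery()
                .should(QueryBuilders.constantScoreQuery(
                        QueryBuilders.matchQuery("productName.autocomplete_exact", text)).boost(300f))
                .should(QueryBuilders.constantScoreQuery(
                        QueryBuilders.matchQuery("productName.autocomplete_phrase", text)).boost(200f))
                .should(QueryBuilders.moreLikeThisQuery("productName.autocomplete_token")
                        .like(text)
                        .minTermFreq(1)
                        .minDocFreq(1)
                        .maxQueryTerms(10)
                        .minimumShouldMatch("1")
                        .boost(100f))
                .should(QueryBuilders.constantScoreQuery(
                        QueryBuilders.matchQuery("productName.autocomplete_startswith", text)).boost(10f))
                .minimumNumberShouldMatch(1);

        return client.prepareSearch("product")
                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .setQuery(query)
                .get();
    }
}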

Sometimes the requirement is to search text in more than one field, with each field stored as multi-fields. For example, in our case, users wanted to search in both the productName and productDesc fields. In such cases, one can use the Dis Max compound query to produce a union of Bool sub-queries (one Bool sub-query per multi-field field).

GET product/_search?search_type=dfs_query_then_fetch
{
  "query": {
    "dis_max": {
      "queries": [
        {
          "bool": {
            "should": [
              {
                "constant_score": {
                  "query": {
                    "match": {
                      "productName.autocomplete_exact": "intel corp"
                    }
                  },
                  "boost": 300
                }
              },
              {
                "constant_score": {
                  "query": {
                    "match": {
                      "productName.autocomplete_phrase": "intel corp"
                    }
                  },
                  "boost": 200
                }
              },
              {
                "more_like_this": {
                  "fields": ["productName.autocomplete_token"],
                  "like": "intel corp",
                  "min_term_freq": 1,
                  "min_doc_freq": 1,
                  "max_query_terms": 10,
                  "minimum_should_match": 1,
                  "boost": 100
                }
              },
              {
                "constant_score": {
                  "query": {
                    "match": {
                      "productName.autocomplete_startswith": "intel corp"
                    }
                  },
                  "boost": 10
                }
              }
            ],
            "minimum_should_match": 1
          }
        },
        {
          "bool": {
            "should": [
              {
                "constant_score": {
                  "query": {
                    "match": {
                      "productDesc.autocomplete_exact": "intel corp"
                    }
                  },
                  "boost": 300
                }
              },
              {
                "constant_score": {
                  "query": {
                    "match": {
                      "productDesc.autocomplete_phrase": "intel corp"
                    }
                  },
                  "boost": 200
                }
              },
              {
                "more_like_this": {
                  "fields": ["productDesc.autocomplete_token"],
                  "like": "intel corp",
                  "min_term_freq": 1,
                  "min_doc_freq": 1,
                  "max_query_terms": 10,
                  "minimum_should_match": 1,
                  "boost": 100
                }
              },
              {
                "constant_score": {
                  "query": {
                    "match": {
                      "productDesc.autocomplete_startswith": "intel corp"
                    }
                  },
                  "boost": 10
                }
              }
            ],
            "minimum_should_match": 1
          }
        }      
      ]
    }
  }
}

The Dis Max query generates the union of documents produced by its subqueries, and scores each document with the maximum score for that document as produced by any subquery, plus a tie-breaking increment for any additional matching subqueries.
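
A quick illustration of that scoring: if a document matches the productName sub-query with a score of 3.0 and the productDesc sub-query with a score of 1.0, the default tie_breaker of 0 (as in the query above) gives it the maximum, 3.0; with a hypothetical tie_breaker of 0.3 it would instead score 3.0 + 0.3 × 1.0 = 3.3, slightly rewarding documents that match in both fields.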

Here we have discussed just the basic search elements. Elasticsearch provides many other constructs with which one can filter, rescore (a second level of scoring after the first search), or sort documents (on fields other than _score), or even use scripting to customize most of these operations. All of these features are quite intuitive to use, as we’ve experienced building out our search platform.
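
For example, restricting results to a particular product type and breaking score ties alphabetically can be done by adding a post filter and a secondary sort to the same search request — a sketch, where the productTypeCode value is made up:

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;

// Sketch: post filter plus a tie-breaking sort on the not_analyzed raw field
public class FilteredSortedSearch {

    public SearchResponse search(Client client, QueryBuilder boolQuery) {
        return client.prepareSearch("product")
                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .setQuery(boolQuery)
                .setPostFilter(QueryBuilders.termQuery("productTypeCode", "etf"))        // made-up value
                .addSort(SortBuilders.scoreSort())                                        // primary: _score
                .addSort(SortBuilders.fieldSort("productName.raw").order(SortOrder.ASC))  // tie-break
                .get();
    }
}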

Hope you found this article a bit useful. All feedback is welcome.

Automated MongoDB Cluster Backup on AWS S3

If you want a true copy of your MongoDB cluster data for cases of data corruption, accidental deletion, or disaster recovery, you’ll want to back it up reliably. Automated backup of a MongoDB cluster (with multiple shards) gets a bit complex, because each replica set stores only one shard, and the relevant product utilities dump/export data from only one node at a time. Thus we need to build a custom mechanism to back up all shards simultaneously.

If your MongoDB cluster is set up on AWS, the best possible place to store regular backups is S3. And even if the cluster is on-premises, S3 is still a wonderful option (similar options exist for Azure, Rackspace, etc.). A few things to note:

  • You’ll need appropriate permissions on the relevant S3 bucket to store your regular backups.
  • If your MongoDB cluster is on AWS EC2 nodes, it’ll most probably assume an IAM role to interact with other AWS services. In that case, S3 bucket permissions should be granted to the role.
  • If your MongoDB cluster is not on AWS, S3 bucket permissions should be granted to a specific IAM user (better to create a dedicated backup user). You should have the access credentials for that user (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY).
  • The backup of each MongoDB shard should be taken from one of the secondary nodes of the related replica-set, to avoid impacting the primary node during the backup.
  • To avoid taking duplicate backups on multiple secondary nodes of a replica-set, the backup script should run on the primary node of that replica-set and connect to one of the secondaries.
  • Each replica-set’s backup node (one of the secondaries) should be locked against writes during the operation, to avoid reading in-flight writes. It should be unlocked after the backup is complete.
  • You could create a cron job to automate periodic execution of the backup script. If you’re using DevOps tooling to provision your cluster, the cron job setup can be done there. We’re using this approach (AWS CloudFormation) to avoid any manual setup.
  • If you have a self-healing cluster (like AWS Auto Scaling in our case), the backup script should be configured to run on all nodes in the cluster. In such a case, the script should be intelligent enough to identify the shard and the type of node (primary/secondary).

Now the intelligent script:

#!/usr/bin/env bash

export AWS_DEFAULT_REGION=us-east-1

## Provide AWS access credentials if not running on EC2,
## and relevant IAM role is not assumed
# export AWS_ACCESS_KEY_ID=xxx
# export AWS_SECRET_ACCESS_KEY=yyy

## AWS S3 bucket name for backups
bucket=mybucket

## Create this folder beforehand on all cluster nodes
cd /backup

## Check if the cluster node is a replica-set primary
mongo --eval "printjson(db.isMaster())" > ismaster.txt
ismaster=`cat ismaster.txt | grep ismaster | awk -F'[:]' '{print $2}' | cut -d "," -f-1 | sed 's/ //g'`
echo "master = $ismaster"

if [ "$ismaster" == "true" ]; then
   ## It's a replica-set primary, get the stored shard's name
   shardname=`cat ismaster.txt | grep "setName" | awk -F'[:]' '{print $2}' | grep shard | cut -d"-" -f-1 | sed 's/\"//g' | sed 's/ //g'`
   echo "Shard is $shardname"

   ## Create a time-stamped backup directory on current primary node
   NOW=$(TZ=":US/Eastern" date +"%m%d%Y_%H%M")
   snapshotdir=$shardname-$NOW
   echo "Creating folder $snapshotdir"
   mkdir $snapshotdir

   ## Get the IP address of this primary node
   primary=`cat ismaster.txt | grep primary | awk -F'[:]' '{print $2}' | sed 's/\"//g' | sed 's/ //g'`
   echo "Primary node is $primary"

   ## Connect to one of the secondaries to take the backup
   cat ismaster.txt \
       | sed -n '/hosts/{:start /]/!{N;b start};/:27017/p}' \
       | grep :27017 | awk '{print $1}' | cut -d "," -f-1 \
       | sed 's/\"//g' \
       | (while read hostipport; do
            hostip=`echo $hostipport | cut -d":" -f-1`
            echo "Current node is $hostip"

            ## Check if IP address belongs to a secondary node
            if [ "$hostip" != "$primary" ]; then
              ## Lock the secondary node against any writes
              echo "Locking the secondary $hostip"
              mongo --host $hostip --port 27017 --eval "printjson(db.fsyncLock())"

              ## Take the backup from secondary node,
              ## into the above created directory
              echo "Taking backup using mongodump connecting to $hostip"
              mongodump --host $hostip --port 27017 --out $snapshotdir

              ## Unlock the secondary node, so that it could
              ## resume replicating data from primary
              echo "Unlocking the secondary $hostip"
              mongo --host $hostip --port 27017 --eval "printjson(db.fsyncUnlock())"

              ## Sync/copy the backup to S3 bucket,
              ## in shard specific folder
              echo "Syncing snapshot data to S3"
              aws s3 sync $snapshotdir s3://$bucket/mongo-backup/$shardname/$NOW/

              ## Remove the backup from current node,
              ## as it's not required now
              echo "Removing snapshot dir and temp files"
              rm -rf $snapshotdir
              rm ismaster.txt

              ## Break from here, so that backup is taken only
              ## from one secondary at a time
              break
            fi
         done)
else
  ## It's not a primary node, exit
  echo "This node is not a primary, exiting the backup process"
  rm ismaster.txt
fi

echo "Backup script execution is complete"

It’s not perfect, but it works for our multi-shard/replica-set MongoDB cluster. We found this less complex than taking AWS EBS snapshots and re-attaching the relevant snapshot volumes in a self-healing cluster. We had explored a few other options, so I’m just listing those here:

  1. An oplog based backup – https://github.com/journeyapps/mongo-oplog-backup
  2. https://github.com/micahwedemeyer/automongobackup

Hope you found it a good read.

Overcoming Cloud Challenges – AWS as an Example

Technologies of the future (Big Data, IoT, microservices, etc.) are optimized to run on commodity hardware and demand lots of computing, storage, and memory resources, which don’t come cheap or fast to provision. That’s where cloud IaaS and PaaS services come to the rescue. These services enable faster time to market (prototyping, anyone?), easy scalability, innovation (AWS Lambda, Azure Cognitive Services, etc.) and, yes, there’s that cost saving too (if architected and used right).

But along with their advantages, public cloud technologies have their share of challenges, which has made adoption slow (particularly in sectors like finance, insurance, and auto). In this post, I’ll touch on key general challenges with public cloud technology, and some ways to deal with them if you’re planning to work, or are already working, with the Amazon Web Services (AWS) cloud.

[Figure: Sample architecture on AWS, indicating useful services to deal with typical challenges]
  • Security – Yep, that’s the first thing you’ll hear from a skeptical executive about public cloud adoption. And we can’t fault them for that, given the nature of some of their processes and operations. I’ll list a few mechanisms an organization can use to keep its applications secure in the AWS cloud:
    • Security Groups – In my opinion, this is one of the most basic and powerful features of AWS. It’s just like a private firewall around your instance, which you can configure to control what type of data goes in and out, and to/from specific ports/interfaces. And these are stateful, such that responses to inbound traffic can go out no matter what your outbound traffic restrictions are.
    • Access Control Lists (ACLs) – There are network ACLs, which work like Security Groups at the subnet layer. These are also a set of rules, and you can (read should) configure them to allow/deny specific traffic to/from your subnets. These are stateless though, i.e. response traffic must be explicitly configured in allow rules. Then there are S3 ACLs, which you can configure to control access to your S3 buckets and objects. Right combination of Security Groups and ACLs should be utilized when deploying your application(s) to AWS.
    • Identity and Access Management (IAM) – IAM is about authentication and authorization for your AWS resources. Whether it’s a developer accessing the AWS console from within your firm, or an EC2 instance accessing DynamoDB, IAM manages all of it. Think of it as a directory service composed of users & groups, and roles assigned to them. You can configure macro- to micro-level permissions and policies, allowing or denying particular action(s) on particular resource(s). The right policy configurations can protect your applications/resources from unauthorized access. If required, one can also add multi-factor authentication for account access, enabled via virtual (your phone) or hardware devices (key fob/display card).
    • Security Token Service (STS) – STS provides temporary, limited-privilege credentials for your IAM or federated users (like Active Directory users federated using ADFS). It basically generates a short-lived token (configurable duration), using which a user can access the required AWS resources during that time. The access is terminated once the token expires. STS is a powerful service that should be used to control access to the AWS console or resources (see the sketch after this list).
  • Data Privacy – In sensitive environments, like Financial Services, Healthcare, Insurance, etc., keeping data secure is a huge deal. And as we’ve seen in the recent past, even government services and large internet companies have been victims of data breaches, which leads to loss of trust and clamor for more regulations/laws to protect consumer data. But there are ways to keep data secure in a public cloud like AWS:
    • SSL/TLS for in-transit data – This is basic stuff, but worth reminding. If you have two applications talking to each other, and both are deployed in a private subnet in a VPC, maybe SSL/TLS can be skipped. But in enterprise world, since cloud infrastructure is mostly realized in a hybrid model, you’ll have on-premises applications talking to cloud-deployed applications, and also business users accessing those applications. In such cases, SSL/TLS becomes mandatory.
    • Key Management Service (KMS) – KMS is a service to generate and store encryption keys, which can be used to encrypt your data at rest, whether it’s stored in S3, RDS or EBS volumes. The keys can be rotated to ensure that your encryption/decryption process is safe in longer run (just like resetting passwords periodically). KMS also provides an audit trail of key usage.
    • Encrypted EBS Volumes – If your data is stored in EBS volumes (self-managed relational database/NoSQL cluster, or a file store etc.), the volumes should be encrypted to keep data secure. The data is encrypted at rest and in transit to/from EC2 instance. A snapshot created from an encrypted volume is also encrypted. Internally, EBS encryption uses KMS managed keys. Encryption is supported for all EBS types, but not all EC2 instance types support encrypted volumes.
    • Hardware Security Module (HSM) – This is a tamper-resistant hardware appliance, that can be used to securely store cryptographic key material. Some organizations have to meet strict regulatory and contractual requirements with respect to data security, and HSM devices can aid in such situations. You’ll need a private subnet in your VPC to house the HSM appliance.
    • Data Governance – This is a general concept which should be adopted by organizations looking to move to the cloud (if not done yet). Data Governance is not just about creating a fat committee; it’s about identifying different tiers of data, with different degrees of sensitivity and privacy, and doing that continuously. The exercise helps define guidelines around which data can reside in the cloud, and which cannot. If it’s decided to keep sensitive and confidential data on-premises, a hybrid cloud by way of a VPC (VPN-over-IPsec or Direct Connect) can still be realized to keep your application in the cloud while it accesses on-premises data securely (though the in-transit data should be encrypted).
  • Availability – Another area of concern with public clouds has been the availability of the overall infrastructure and services. Cloud outages were a regular feature in the news in the past, leading to downtime for customer applications. Over the last couple of years though, major public cloud providers have made considerable improvements to their infrastructure and can now flaunt availability figures of 99.95% or so. But if you do a rough calculation, even that figure can cause headaches for executives. Since cloud service outages are generally location-specific, impact to your applications can be avoided through sound architecture decisions:
    • Multi-Region Deployment – Regions in AWS are completely isolated geographic areas, providing high level of fault tolerance. For HA (hot-hot or hot-warm) multi-region deployment, one has to consider replication of different resources across regions. EC2 computing instances can be maintained in a hot/warm state, whereas out-of-box cross-region asynchronous replication can be done for data in S3 or RDS. If you manage your own data layer on EC2, you’ll have to implement your own replication mechanism. Network failover can be achieved using Route 53 routing policies (latency based or failover).
    • Multi-AZ Deployment – Availability Zones (AZs) in AWS are like isolated data centers within a region. And Amazon has been working to continuously increase the number of AZs in major regions. Application and data clusters should be deployed across different AZs in a region to provide redundancy and load balancing. By employing a Multi-Region or Multi-AZ architecture, you mitigate the risk of localized failures.
    • Utilize DevOps – Automated deployments using configuration management tools/services (AWS OpsWorks, Puppet, Saltstack etc.), along with the power of monitoring triggered actions can help reduce the downtime to a minimum. In addition, DevOps thinking can also help standardize development processes across teams. Thus, it’s imperative to consider DevOps while designing your deployment architecture.
  • Vendor lock-in – One other bottleneck regularly mentioned during cloud strategy discussions is locking in to the features and capabilities of a particular cloud provider. People fear that once they are deep into using AWS or Azure or some other public cloud, it’s hard to change down the line. The hesitation is not without reason, because major cloud providers have so far been poor at interoperability. But there’s light at the end of the tunnel. Some organizations are working to create such standards for the future, like DMTF Open Virtualization Format (OVF), OASIS TOSCA, etc. But until their adoption, there are other options to avoid lock-in. There are OpenStack-based cloud providers to choose from, the most popular being Rackspace. Or if you have your eyes set on one of the big guns, you can look at Pivotal Cloud Foundry as a service management platform to abstract the cloud provider. The same can be achieved to a good extent with any configuration management tool as well (listed above under DevOps).
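
To make the STS point above concrete, here is a rough sketch of acquiring short-lived credentials by assuming a role with the AWS SDK for Java; the role ARN, session name, and duration are placeholders:

import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.Credentials;

// Sketch: temporary, limited-privilege credentials via STS AssumeRole
public class StsExample {

    public static AmazonS3Client s3WithTemporaryCredentials() {
        AWSSecurityTokenServiceClient sts = new AWSSecurityTokenServiceClient();

        Credentials creds = sts.assumeRole(new AssumeRoleRequest()
                .withRoleArn("arn:aws:iam::111122223333:role/backup-reader")  // placeholder role ARN
                .withRoleSessionName("limited-session")
                .withDurationSeconds(900))                                    // 15-minute token
                .getCredentials();

        // Use the short-lived credentials; access stops working once the token expires
        return new AmazonS3Client(new BasicSessionCredentials(
                creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken()));
    }
}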

There are other challenges, like compliance with different industry regulations, skill upgrades for existing teams, etc. To address the first, public cloud providers like AWS have made long strides in conforming to various standards and regulations to increase adoption. For the second, the problem of stale skills can be managed by investing in brief-to-thorough training programs. And I’m not talking about break-the-bank programs; there are much cheaper online options available with good enough content – like cloudacademy.com, linuxacademy.com, etc.

Through this post, I just wanted to highlight a few ways to tackle key bottlenecks while moving to AWS. These are definitely not an end in themselves, nor are they foolproof solutions. But they can act as a basic checklist, and one should dig deeper to understand the nuances and grey areas before adoption.

Amazon (AWS) SQS with Spring Cloud

Amazon Simple Queue Service (SQS) is a highly available cloud messaging service that can be used to decouple parts of a single system, or to integrate multiple disparate systems. An SQS queue acts as a buffer between a message producer and a consumer when the former’s message generation rate exceeds the latter’s processing throughput. It also provides durable point-to-point messaging, such that messages are stored in a fail-safe queue and can be processed once a consumer application comes up after scheduled maintenance or unexpected downtime. For the publish-subscribe design pattern, Amazon (AWS) provides another cloud service called Simple Notification Service (SNS).

[Figure: Integration with Amazon SQS]

The above diagram shows a subset of possible producer-consumer scenarios that can utilize SQS.

Developers who use JMS to connect to messaging providers/brokers like ActiveMQ, WebSphere MQ, JBoss Messaging, etc. can use the Amazon SQS Java Messaging Library to work with SQS. One can use it along with the AWS SDK to write JMS 1.1 compliant applications. But for real-world applications, writing directly against the AWS-provided library can be cumbersome. That’s where Spring Cloud AWS comes to the rescue. In this post, I’ll show how one can use Spring Cloud with Spring Boot to write an SQS message producer and a consumer. Before that, a few important points to note when working with SQS:

  • The maximum payload of an SQS message is 256 KB. Applications that need to send more data in a single message can use the SQS Extended Client Library, which utilizes AWS S3 as the message store (with a reference being sent in the actual SQS message). The other option is to compress the message before sending it over SQS.
  • Messages can be sent synchronously, but received either synchronously or asynchronously (using AmazonSQSAsyncClient). If required, one can send messages in a separate child thread so as not to block the main thread on the SQS IO operation.
  • Messages can be sent or received in batches to increase throughput (10 messages at most in a batch). Please note that the maximum payload size limit for a batch is also 256 KB, so batching can only be used for lighter payloads.
  • There can be multiple producer and consumer threads interacting with the same queue, just like with any other messaging provider/broker. Multiple consumer threads provide load balancing for message processing.
  • Consumers can poll for messages using long polling or standard polling. It’s recommended to use long polling to avoid too many empty responses (one can configure the wait timeout); batch sending and long polling are sketched right after this list.
  • Each message producer or consumer needs AWS credentials to work with SQS. AWS provides different implementations of AWSCredentialsProvider for this. DefaultAWSCredentialsProviderChain is used by default, if no provider is configured specifically.
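
Before moving to Spring Cloud, here is roughly what batch sending and long polling look like with the plain AWS SDK — a sketch, where the queue URL is a placeholder:

import java.util.Arrays;
import java.util.List;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;

// Sketch: batch send (max 10 entries) and long-poll receive with the plain AWS SDK
public class PlainSqsExample {

    // Placeholder queue URL
    private static final String QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/111122223333/randomQName";

    public static void main(String[] args) {
        AmazonSQS sqs = new AmazonSQSClient();   // uses DefaultAWSCredentialsProviderChain

        // Batch send: at most 10 entries, combined payload still capped at 256 KB
        sqs.sendMessageBatch(new SendMessageBatchRequest(QUEUE_URL, Arrays.asList(
                new SendMessageBatchRequestEntry("m1", "payload-1"),
                new SendMessageBatchRequestEntry("m2", "payload-2"))));

        // Long polling: wait up to 20 seconds instead of returning empty responses immediately
        List<Message> messages = sqs.receiveMessage(new ReceiveMessageRequest(QUEUE_URL)
                .withWaitTimeSeconds(20)
                .withMaxNumberOfMessages(10))
                .getMessages();

        messages.forEach(m -> System.out.println(m.getBody()));
    }
}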

Now, how to use Spring Cloud with SQS. First of all, the dependencies below should be added to the Maven project’s pom.xml or the Gradle build file. The AWS SDK and other Spring dependencies will be pulled onto the classpath automatically (unless already specified explicitly).

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-aws-autoconfigure</artifactId>
	<version>latest.version</version>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-aws-messaging</artifactId>
	<version>latest.version</version>
</dependency>

From a configuration perspective, we first need to wire the appropriate AWS SQS client. Assuming asynchronous interaction with SQS, we will configure an AmazonSQSAsyncClient. There are a couple of things to note. One, if you’re behind a proxy, you’ll need to specify the proxy host and port using ClientConfiguration. Two, DefaultAWSCredentialsProviderChain will look for the AWS access key, secret access key, and default region in this order – environment variables, Java system properties, the credentials profile file, and the instance profile (only on EC2). We’ve just overridden the default region below (which is also us-east-1 anyway).

@Lazy
@Bean(name = "amazonSQS", destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQSClient() {
    AmazonSQSAsyncClient awsSQSAsyncClient;
    if (useProxy.equalsIgnoreCase("Y")) {
	ClientConfiguration clientConfig = new ClientConfiguration();
	clientConfig.setProxyHost("proxy.xyz.com");
	clientConfig.setProxyPort(8085);

	awsSQSAsyncClient = new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain(), clientConfig);
    } else {
	awsSQSAsyncClient = new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain());
    }
		
    awsSQSAsyncClient.setRegion(Region.getRegion(Regions.fromName("us-east-1")));
    return awsSQSAsyncClient;
}

Next we look at configuring a QueueMessagingTemplate, which will be used to send messages to a SQS queue.

@Configuration
public class ProducerAWSSQSConfig {
   ...

   @Bean
   public QueueMessagingTemplate queueMessagingTemplate() {
	return new QueueMessagingTemplate(amazonSQSClient());
   }
   
   @Lazy
   @Bean(name = "amazonSQS", destroyMethod = "shutdown")
   public AmazonSQSAsync amazonSQSClient() {
   ...
}

@Component
public class MessageProducer {

   private QueueMessagingTemplate sqsMsgTemplate;
   ...

   public void sendMessage(String payload) {
      ...
      this.sqsMsgTemplate.convertAndSend("randomQName", payload);
      ...
   }
}

We’re sending a String message, which will be converted using a StringMessageConverter internally. One can also send a complete data object as JSON, if the Jackson library is available on the classpath (using MappingJackson2MessageConverter).
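
For instance, with Jackson on the classpath, a small payload class can be sent as JSON without any manual serialization — a sketch, where ProductEvent is a made-up class:

// Sketch: a POJO payload sent as JSON through the same QueueMessagingTemplate
public class ProductEvent {

    private String productId;
    private String action;

    public ProductEvent() { }                        // default constructor for Jackson

    public ProductEvent(String productId, String action) {
        this.productId = productId;
        this.action = action;
    }

    public String getProductId() { return productId; }
    public String getAction() { return action; }
}

// In the producer:
// this.sqsMsgTemplate.convertAndSend("randomQName", new ProductEvent("P-123", "UPDATED"));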

Finally, we look at configuring a SimpleMessageListenerContainer and QueueMessageHandler, which will be used to create a message listener for the SQS queue. Please note that internally Spring Cloud creates a SynchronousQueue-based thread pool to listen to different queues and to process received messages using the corresponding handlers. The number of core threads for the thread pool is 2 * the number of queues (or configured listeners), and the property “maxNumMsgs” determines the maximum number of threads. The property “waitTimeout” determines the timeout period for long polling (the default with Spring Cloud).

@Configuration
public class ConsumerAWSSQSConfig {
   ...

   @Bean
   public SimpleMessageListenerContainer simpleMessageListenerContainer() {
	SimpleMessageListenerContainer msgListenerContainer = simpleMessageListenerContainerFactory().createSimpleMessageListenerContainer();
	msgListenerContainer.setMessageHandler(queueMessageHandler());
	
        return msgListenerContainer;
   }

   @Bean
   public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {
	SimpleMessageListenerContainerFactory msgListenerContainerFactory = new SimpleMessageListenerContainerFactory();
	msgListenerContainerFactory.setAmazonSqs(amazonSQSClient());
	msgListenerContainerFactory.setDeleteMessageOnException(false);
	msgListenerContainerFactory.setMaxNumberOfMessages(maxNumMsgs);
	msgListenerContainerFactory.setWaitTimeOut(waitTimeout);
	
        return msgListenerContainerFactory;
   }

   @Bean
   public QueueMessageHandler queueMessageHandler() {
	QueueMessageHandlerFactory queueMsgHandlerFactory = new QueueMessageHandlerFactory();
	queueMsgHandlerFactory.setAmazonSqs(amazonSQSClient());
	
        QueueMessageHandler queueMessageHandler = queueMsgHandlerFactory.createQueueMessageHandler();

	return queueMessageHandler;
   }
   
   @Lazy
   @Bean(name = "amazonSQS", destroyMethod = "shutdown")
   public AmazonSQSAsync amazonSQSClient() {
   ...
}
@Component
public class MessageConsumer {
   ...

   @MessageMapping("randomQName")
   public void onRandomQMessage(String payload) {
	doSomething(payload);
   }
}

One can also send custom message attributes (headers) with an SQS message, so as to provide contextual information to the consumer. In addition, there are the important concepts of visibility timeout and delay queues, which should be explored before getting your hands dirty with SQS.
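
As an example of message attributes, the producer can pass a headers map along with the payload, and the consumer can read it back with @Header — a sketch, where the traceId attribute is made up:

import java.util.Collections;

import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageMapping;

// Sketch: passing a custom message attribute as a header and reading it on the consumer
public class HeaderExample {

    // Producer side: attach a "traceId" attribute to the message
    public void sendWithHeader(QueueMessagingTemplate sqsMsgTemplate, String payload) {
        sqsMsgTemplate.convertAndSend("randomQName", payload,
                Collections.<String, Object>singletonMap("traceId", "abc-123"));
    }

    // Consumer side: the attribute is available as a message header
    @MessageMapping("randomQName")
    public void onRandomQMessage(String payload, @Header("traceId") String traceId) {
        // correlate processing using traceId
    }
}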

Hope you have fun with Spring Cloud and AWS SQS.