Search Platform using Elasticsearch (on AWS)

Apache Lucene is a high-performance, cross-platform search engine library, which has gained immense traction in enterprises looking to build firm-wide, customer-facing search platforms. It owes a good share of its popularity to Apache Solr and Elasticsearch, mature search products built on top of Lucene. We chose Elasticsearch 2.3.3 (5.0 is the current version at the time of writing) to build our multi-entity search platform for a financial services firm.

All our applications and data platforms are housed in AWS VPCs. We decided to implement our own cluster on EC2 rather than using the AWS Elasticsearch service, for three main reasons: AWS Elasticsearch doesn’t yet provide data encryption using AWS KMS (at least at the time of writing); AWS has generally been behind on adding the latest Elasticsearch versions; and we wanted full freedom in maintaining our own cluster, with our choice of security groups, plugins, monitoring, etc. Without going into infrastructure details, I’ll just mention the major plugins we are using:

  • EC2-Discovery for unicast discovery (AWS doesn’t support multicast out of box)
  • Kopf for web-based administration
  • Head for web-based administration
  • SQL for data analysis
  • Read-only to control maintenance actions (alternative to using Nginx as proxy)

[Figure: Elasticsearch cluster]

Our indexing and query services are implemented in Java (using Spring Boot), and we use Elasticsearch’s Java API (the Transport Client) to connect to the cluster. We don’t use Spring Data Elasticsearch, as it too tends to lag behind on version upgrades. Now, on to what we want to focus on in this blog: how to implement a search service for data stored in Elasticsearch.

The first and most important component that impacts data indexing and search in Elasticsearch is the analyzer. Analysis is the process of converting text, like a product description or a person name, into tokens or terms which are added to the inverted index for searching. Analysis is performed by an analyzer, which can be either built-in or custom. There are three sub-parts to an analyzer, and they process all incoming text in this order:

  • Character Filter – Used to preprocess text before indexing/searching, like stripping HTML markup
  • Tokenizer – Main component which produces tokens from incoming text
  • Token Filter – Can modify, delete or add tokens to the stream of tokens coming from the tokenizer
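As a rough illustration of this pipeline (not Lucene’s actual implementation; the functions below are simplified stand-ins for the html_strip character filter, the standard tokenizer, and the lowercase token filter):

```python
import re

def char_filter(text):
    # Simplified stand-in for the html_strip character filter
    return re.sub(r"<[^>]+>", " ", text)

def tokenizer(text):
    # Simplified stand-in for the standard tokenizer
    return re.findall(r"\w+", text)

def lowercase_token_filter(tokens):
    # A token filter transforms the token stream
    return [t.lower() for t in tokens]

def analyze(text):
    # Stages always run in order: char filter -> tokenizer -> token filters
    return lowercase_token_filter(tokenizer(char_filter(text)))

print(analyze("<b>Intel</b> Corp"))  # ['intel', 'corp']
```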

For our product and person search services (90% of our search functionality), we created custom edgeNGram-based token filters, and analyzers using those token filters. An edgeNGram token filter is similar to an nGram token filter, except that it only keeps n-grams that start at the beginning of tokens produced by a tokenizer. The analysis settings for such an index look like:

{
  "settings": {
    "index": {
      ...
    },
    "analysis": {
      "filter": {
        "autocomplete_filter": {
          "type": "edgeNGram",
          "min_gram": "1",
          "max_gram": "10"
        },
        "autocomplete_phrase_filter": {
          "type": "edgeNGram",
          "min_gram": "1",
          "max_gram": "20"
        }
      },
      "analyzer": {
        "keyword_analyzer": {
          "type": "custom",
          "filter": [
            "asciifolding", "lowercase"
          ],
          "tokenizer": "keyword"
        },
        "autocomplete_analyzer": {
          "type": "custom",
          "filter": [
            "standard", "lowercase", "stop", "autocomplete_filter"
          ],
          "tokenizer": "standard"
        },
        "autocomplete_phrase_analyzer": {
          "type": "custom",
          "filter": [
            "standard", "lowercase", "autocomplete_phrase_filter"
          ],
          "tokenizer": "keyword"
        }
      }
    }
  }
}
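To make the two edgeNGram filters concrete, here is a rough Python simulation of what the analyzers above emit (it ignores the stop filter and the finer points of the standard tokenizer, so it’s an approximation, not Lucene’s output):

```python
def edge_ngrams(token, min_gram=1, max_gram=10):
    # Keep only the n-grams anchored at the start of the token
    return [token[:n] for n in range(min_gram, min(max_gram, len(token)) + 1)]

def autocomplete_analyzer(text):
    # standard tokenizer + lowercase, then edgeNGram per token (1..10)
    tokens = [t.lower() for t in text.split()]
    return [g for t in tokens for g in edge_ngrams(t, 1, 10)]

def autocomplete_phrase_analyzer(text):
    # keyword tokenizer: whole input is one token, then edgeNGram (1..20)
    return edge_ngrams(text.lower(), 1, 20)

print(autocomplete_analyzer("Intel Corp"))
# ['i', 'in', 'int', 'inte', 'intel', 'c', 'co', 'cor', 'corp']
print(autocomplete_phrase_analyzer("Intel Corp")[-1])
# intel corp
```

Note how the phrase analyzer’s last gram is the entire lowercased phrase, which is why it can power “starts with the full queried text” matching.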

Now we apply these analyzers to field mappings in the index. In an elaborate search platform, where one needs the ability to search a field in different ways, it’s fruitful to store multiple versions of such a field, often using different analyzers during indexing and searching. That way, one can use the most relevant version for a given scenario. For example:

{
  "settings": {
    "index": {
      ...
    },
    "analysis": {
      ...
    }
  },
  "mappings": {
    "product": {
      "dynamic": "strict",
      "_all": {
        "enabled": false
      },
      "properties": {
        ...
        "productTypeCode": {
          "type": "string",
          "norms": {
            "enabled": false
          },
          "analyzer": "keyword_analyzer"
        },
        "productName": {
          "type": "string",
          "term_vector": "with_positions_offsets",
          "analyzer": "english",
          "fields": {
            "autocomplete_exact": {
              "type": "string",
              "norms": {
                "enabled": false
              },
              "analyzer": "keyword_analyzer"
            },
            "autocomplete_phrase": {
              "type": "string",
              "norms": {
                "enabled": false
              },
              "analyzer": "autocomplete_phrase_analyzer",
              "search_analyzer": "keyword_analyzer"
            },
            "autocomplete_startswith": {
              "type": "string",
              "norms": {
                "enabled": false
              },
              "analyzer": "autocomplete_analyzer",
              "search_analyzer": "standard"
            },
            "autocomplete_token": {
              "type": "string",
              "norms": {
                "enabled": false
              },
              "index_options": "docs",
              "analyzer": "standard"
            },
            "raw": {
              "type": "string",
              "index": "not_analyzed"
            }
          }
        },
        ...
      }
    }
  }
}

What’s worth noting above is how custom analyzers have been combined with built-in analyzers to map different versions of productName (and other fields like it). Here is how the different versions can be used during searching:

  • productName.autocomplete_exact – matches stored documents that are exactly equal to the full queried text (case-insensitive) – a query for “intel corp” will match “Intel Corp”, but not “Intel Corporation”.
  • productName.autocomplete_phrase – matches stored documents that start with the full queried text (case-insensitive) – a query for “intel corp” will match both “Intel Corp” and “Intel Corporation”.
  • productName.autocomplete_token – matches stored documents that contain whole tokens from the queried text (case-insensitive) – a query for “intel corp” will match not just “intel” products, but also “Microsoft Corp” or any other product containing the token “corp”; it won’t match “Microsoft Corporation” or the like.
  • productName.autocomplete_startswith – matches stored documents containing tokens that start with tokens in the queried text (case-insensitive) – a query for “intel corp” will match not just “intel” products, but also “Microsoft Corp”, “Microsoft Corporation”, “Intellisat”, etc.
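These behaviors can be sanity-checked with a toy matcher. This is only a crude approximation of the analyzers above, not how Lucene evaluates queries:

```python
def keyword(text):
    # keyword_analyzer: whole text as one lowercased token
    return text.lower()

def tokens(text):
    # standard-ish analysis: lowercased whole tokens
    return text.lower().split()

def match_exact(doc, query):
    return keyword(doc) == keyword(query)

def match_phrase(doc, query):
    # doc indexed as edge n-grams of the whole phrase, query keyword-analyzed
    return keyword(doc).startswith(keyword(query))

def match_token(doc, query):
    # any shared whole token
    return bool(set(tokens(doc)) & set(tokens(query)))

def match_startswith(doc, query):
    # any doc token starting with any query token
    return any(d.startswith(q) for d in tokens(doc) for q in tokens(query))

docs = ["Intel Corp", "Intel Corporation", "Microsoft Corp", "Intellisat"]
q = "intel corp"
print([d for d in docs if match_exact(d, q)])       # ['Intel Corp']
print([d for d in docs if match_phrase(d, q)])      # ['Intel Corp', 'Intel Corporation']
print([d for d in docs if match_token(d, q)])       # ['Intel Corp', 'Intel Corporation', 'Microsoft Corp']
print([d for d in docs if match_startswith(d, q)])  # all four
```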

We use combinations of multiple fields in Elasticsearch queries to order search results from most relevant to least relevant. The Elasticsearch Bool query can be used to construct such compound queries, with different boost values set for the sub-queries to get the desired result order (by default, results are ordered by descending _score).

GET product/_search?search_type=dfs_query_then_fetch
{
  "query": {
    "bool": {
      "should": [
        {
          "constant_score": {
            "query": {
              "match": {
                "productName.autocomplete_exact": "intel corp"
              }
            },
            "boost": 300
          }
        },
        {
          "constant_score": {
            "query": {
              "match": {
                "productName.autocomplete_phrase": "intel corp"
              }
            },
            "boost": 200
          }
        },
        {
          "more_like_this": {
            "fields": ["productName.autocomplete_token"],
            "like": "intel corp",
            "min_term_freq": 1,
            "min_doc_freq": 1,
            "max_query_terms": 10,
            "minimum_should_match": 1,
            "boost": 100
          }
        },
        {
          "constant_score": {
            "query": {
              "match": {
                "productName.autocomplete_startswith": "intel corp"
              }
            },
            "boost": 10
          }
        }
      ],
      "minimum_should_match": 1
    }
  }
}

The above Bool query is composed of Constant Score and More Like This sub-queries, which work well for us. One should look to replace these sub-queries with appropriate query types, depending on the desired search results.
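Since the same four sub-queries repeat for every searchable field, our query service assembles them programmatically (we do this with the Java Transport Client’s query builders; the Python sketch below mirrors the idea, and the helper names are ours, not an Elasticsearch API):

```python
def bool_autocomplete_query(field, text):
    # Builds the Bool query shown above for one multi-field mapped field
    def cs(subfield, boost):
        return {"constant_score": {
            "query": {"match": {f"{field}.{subfield}": text}},
            "boost": boost}}
    mlt = {"more_like_this": {
        "fields": [f"{field}.autocomplete_token"],
        "like": text, "min_term_freq": 1, "min_doc_freq": 1,
        "max_query_terms": 10, "minimum_should_match": 1, "boost": 100}}
    return {"bool": {
        "should": [cs("autocomplete_exact", 300),
                   cs("autocomplete_phrase", 200),
                   mlt,
                   cs("autocomplete_startswith", 10)],
        "minimum_should_match": 1}}

body = {"query": bool_autocomplete_query("productName", "intel corp")}
```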

Sometimes the requirement is to search text in more than one field, with each field stored as multi-fields. For example, in our case, users wanted to search in both the productName and productDesc fields. In such cases, one can use the Dis Max compound query to produce a union of Bool sub-queries (one Bool sub-query per multi-field field).

GET product/_search?search_type=dfs_query_then_fetch
{
  "query": {
    "dis_max": {
      "queries": [
        {
          "bool": {
            "should": [
              {
                "constant_score": {
                  "query": {
                    "match": {
                      "productName.autocomplete_exact": "intel corp"
                    }
                  },
                  "boost": 300
                }
              },
              {
                "constant_score": {
                  "query": {
                    "match": {
                      "productName.autocomplete_phrase": "intel corp"
                    }
                  },
                  "boost": 200
                }
              },
              {
                "more_like_this": {
                  "fields": ["productName.autocomplete_token"],
                  "like": "intel corp",
                  "min_term_freq": 1,
                  "min_doc_freq": 1,
                  "max_query_terms": 10,
                  "minimum_should_match": 1,
                  "boost": 100
                }
              },
              {
                "constant_score": {
                  "query": {
                    "match": {
                      "productName.autocomplete_startswith": "intel corp"
                    }
                  },
                  "boost": 10
                }
              }
            ],
            "minimum_should_match": 1
          }
        },
        {
          "bool": {
            "should": [
              {
                "constant_score": {
                  "query": {
                    "match": {
                      "productDesc.autocomplete_exact": "intel corp"
                    }
                  },
                  "boost": 300
                }
              },
              {
                "constant_score": {
                  "query": {
                    "match": {
                      "productDesc.autocomplete_phrase": "intel corp"
                    }
                  },
                  "boost": 200
                }
              },
              {
                "more_like_this": {
                  "fields": ["productDesc.autocomplete_token"],
                  "like": "intel corp",
                  "min_term_freq": 1,
                  "min_doc_freq": 1,
                  "max_query_terms": 10,
                  "minimum_should_match": 1,
                  "boost": 100
                }
              },
              {
                "constant_score": {
                  "query": {
                    "match": {
                      "productDesc.autocomplete_startswith": "intel corp"
                    }
                  },
                  "boost": 10
                }
              }
            ],
            "minimum_should_match": 1
          }
        }      
      ]
    }
  }
}

The Dis Max query generates the union of documents produced by its sub-queries, and scores each document with the maximum score for that document as produced by any sub-query, plus a tie-breaking increment for any additional matching sub-queries.
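Numerically, that scoring rule is simple. Our query above uses the default tie_breaker of 0.0; the 0.5 value below is only for illustration:

```python
def dis_max_score(subquery_scores, tie_breaker=0.0):
    # max of the sub-query scores, plus tie_breaker times each remaining score
    best = max(subquery_scores)
    rest = sum(subquery_scores) - best
    return best + tie_breaker * rest

# A document matching the productName bool query at 200
# and the productDesc bool query at 100:
print(dis_max_score([200, 100]))                   # 200
print(dis_max_score([200, 100], tie_breaker=0.5))  # 250.0
```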

Here we have discussed just the basic search elements. Elasticsearch provides many other constructs with which one can filter, rescore (a second level of scoring after the first search pass), and sort documents (on fields other than _score), or even use scripting to customize most of these operations. All of these features are quite intuitive to use, as we’ve experienced while building out our search platform.

Hope you found this article a bit useful. All feedback is welcome.

Automated MongoDB Cluster Backup on AWS S3

If you want a true copy of your MongoDB cluster data in case of data corruption, accidental deletion or disaster recovery, you’ll want to back it up reliably. Automated backup of a MongoDB cluster (with multiple shards) gets a bit complex, because each replica set stores only one shard, and the standard dump/export utilities work against only one node at a time. Thus we need a custom mechanism to back up all shards simultaneously.

If your MongoDB cluster is set up on AWS, the best place to store regular backups is S3. And even if the cluster is on-premises, S3 is still a wonderful option (similar options exist for Azure, Rackspace, etc.). A few things to note:

  • You’ll need appropriate permissions on the relevant S3 bucket to store your regular backups.
  • If your MongoDB cluster is on AWS EC2 nodes, it’ll most probably assume an IAM role to interact with other AWS services. In that case, S3 bucket permissions should be granted to that role.
  • If your MongoDB cluster is not on AWS, S3 bucket permissions should be granted to a specific IAM user (better to create a dedicated backup user). You should have the access credentials for that user (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY).
  • The backup of each MongoDB shard should be taken from one of the secondary nodes of the related replica set, to avoid impacting the primary node during the backup.
  • To avoid taking duplicate backups on multiple secondary nodes of a replica set, the backup script should run on the primary node of that replica set and connect to one of the secondaries.
  • Each replica set’s backup node (one of the secondaries) should be locked against writes during the operation, to avoid reading in-flight writes. It should be unlocked after the backup is complete.
  • You can create a cron job to automate periodic execution of the backup script. If you’re provisioning your cluster through DevOps tooling (AWS CloudFormation in our case), the cron job setup can be done there, avoiding any manual setup.
  • If you have a self-healing cluster (AWS Auto Scaling in our case), the backup script should be configured to run on all nodes in the cluster. In that case, the script should be intelligent enough to identify the shard and the type of node (primary/secondary).
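The node/shard detection described above amounts to parsing the isMaster() response. A Python sketch of that selection logic (the shell script below does the same with grep/awk; the "shardN-rs" setName format is just an example — adjust the parsing to your naming convention):

```python
def pick_backup_target(ismaster):
    # ismaster: dict as returned by db.isMaster() on a replica-set member
    if not ismaster.get("ismaster"):
        return None  # only the primary drives the backup (see bullets above)
    # Derive the shard name from the replica-set name, e.g. "shard1-rs"
    shard = ismaster["setName"].split("-")[0]
    primary_host = ismaster["primary"].split(":")[0]
    # Choose the first host that isn't the primary, i.e. a secondary
    for hostport in ismaster["hosts"]:
        host = hostport.split(":")[0]
        if host != primary_host:
            return shard, host
    return None

resp = {"ismaster": True, "setName": "shard1-rs",
        "primary": "10.0.1.5:27017",
        "hosts": ["10.0.1.5:27017", "10.0.1.6:27017", "10.0.1.7:27017"]}
print(pick_backup_target(resp))  # ('shard1', '10.0.1.6')
```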

Now the intelligent script:

#!/usr/bin/env bash

export AWS_DEFAULT_REGION=us-east-1

## Provide AWS access credentials if not running on EC2,
## and a relevant IAM role is not assumed
# export AWS_ACCESS_KEY_ID=xxx
# export AWS_SECRET_ACCESS_KEY=yyy

## AWS S3 bucket name for backups
bucket=mybucket

## Create this folder beforehand on all cluster nodes
cd /backup

## Check if the cluster node is a replica-set primary
mongo --eval "printjson(db.isMaster())" > ismaster.txt
ismaster=$(grep ismaster ismaster.txt | awk -F'[:]' '{print $2}' \
           | cut -d "," -f-1 | sed 's/ //g')
echo "master = $ismaster"

if [ "$ismaster" == "true" ]; then
  ## It's a replica-set primary, get the stored shard's name
  shardname=$(grep "setName" ismaster.txt | awk -F'[:]' '{print $2}' \
              | grep shard | cut -d"-" -f-1 \
              | sed 's/\"//g' | sed 's/ //g')
  echo "Shard is $shardname"

  ## Create a time-stamped backup directory on the current primary node
  NOW=$(TZ=":US/Eastern" date +"%m%d%Y_%H%M")
  snapshotdir=$shardname-$NOW
  echo "Creating folder $snapshotdir"
  mkdir $snapshotdir

  ## Get the IP address of this primary node
  primary=$(grep primary ismaster.txt | awk -F'[:]' '{print $2}' \
            | sed 's/\"//g' | sed 's/ //g')
  echo "Primary node is $primary"

  ## Connect to one of the secondaries to take the backup
  sed -n '/hosts/{:start /]/!{N;b start};/:27017/p}' ismaster.txt \
      | grep :27017 | awk '{print $1}' | cut -d "," -f-1 \
      | sed 's/\"//g' \
      | (while read hostipport; do
           hostip=$(echo $hostipport | cut -d":" -f-1)
           echo "Current node is $hostip"

           ## Check if the IP address belongs to a secondary node
           if [ "$hostip" != "$primary" ]; then
             ## Lock the secondary node against any writes
             echo "Locking the secondary $hostip"
             mongo --host $hostip --port 27017 \
                   --eval "printjson(db.fsyncLock())"

             ## Take the backup from the secondary node,
             ## into the directory created above
             echo "Taking backup using mongodump connecting to $hostip"
             mongodump --host $hostip --port 27017 --out $snapshotdir

             ## Unlock the secondary node, so that it can
             ## resume replicating data from the primary
             echo "Unlocking the secondary $hostip"
             mongo --host $hostip --port 27017 \
                   --eval "printjson(db.fsyncUnlock())"

             ## Sync/copy the backup to the S3 bucket,
             ## in a shard-specific folder
             echo "Syncing snapshot data to S3"
             aws s3 sync $snapshotdir \
                 s3://$bucket/mongo-backup/$shardname/$NOW/

             ## Remove the backup from the current node,
             ## as it's no longer required
             echo "Removing snapshot dir and temp files"
             rm -rf $snapshotdir
             rm ismaster.txt

             ## Break here, so that the backup is taken only
             ## from one secondary at a time
             break
           fi
         done)
else
  ## It's not a primary node, exit
  echo "This node is not a primary, exiting the backup process"
  rm ismaster.txt
fi

echo "Backup script execution is complete"

It’s not perfect, but it works for our multi-shard, multi-replica-set MongoDB cluster. We found this less complex than taking AWS EBS snapshots and re-attaching the relevant snapshot volumes in a self-healing cluster. We’d explored a few other options, so I’m listing those here:

  1. An oplog based backup – https://github.com/journeyapps/mongo-oplog-backup
  2. https://github.com/micahwedemeyer/automongobackup

Hope you found it a good read.

Performance Best Practices for MongoDB

MongoDB is a document-oriented NoSQL database, used as the data backbone or a polyglot-persistence member in many enterprise and internet-facing systems. It has extensive querying capability (one of the most thorough in the NoSQL realm), and most popular application development frameworks provide integration with it. There are challenges in terms of flexible scaling, but it’s a great product to work with once you come to terms with some of its nuances (you need patience, though).

We’re using MongoDB as an event store in an AWS-deployed financial data management application. The application is optimized for write operations, as we have to maintain structures from data feeds almost throughout the day. There are downstream data consumers, but given the defined query patterns, read performance is well within the defined SLA. In this blog, I’ll share some of the key aspects that can help improve the write performance of a MongoDB cluster. Not all of these will make sense for everyone, but a combination of a few could definitely help. The MongoDB version we’re working with is 3.0.

  • Sharding – Sharding is a way to scale out your data layer. Many in-memory and NoSQL data products support sharded clusters on commodity hardware, and MongoDB is one of them. A shard is a single MongoDB instance or a replica set that holds a subset of the total data in the cluster. Data is divided on the basis of a defined shard key, which is used for routing requests to the different shards (by the mongos router). A production-grade sharded cluster takes some time to set up and tune, but yields good performance benefits once done right. Since data is stored in parts (shards), multiple parts can be written to simultaneously without much contention. Read operations also get a kick if queries contain the shard key (otherwise they can be slower than on non-sharded clusters). Thus, choosing the right shard key becomes the most important decision.

[Figure: MongoDB application cluster]
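To see why the shard key matters for write throughput, here is a toy hash-based router loosely mimicking how a hashed shard key spreads writes across shards (real mongos routing uses chunk metadata from the config servers; this is purely illustrative):

```python
import hashlib

def route_to_shard(shard_key_value, num_shards):
    # Hash the shard key value and map it to one of the shards,
    # loosely mimicking a hashed shard key
    digest = hashlib.md5(str(shard_key_value).encode()).hexdigest()
    return int(digest, 16) % num_shards

# Writes with well-distributed keys land on different shards,
# so they can proceed in parallel with little contention
by_shard = {}
for key in ["evt-1001", "evt-1002", "evt-1003", "evt-1004"]:
    by_shard.setdefault(route_to_shard(key, 3), []).append(key)
print(by_shard)
```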

  • Storage Engine – The storage engine determines how data is stored in memory and on disk. There are two main storage engines in MongoDB – WiredTiger and MMAPv1. MMAPv1 was the default until version 3.0; WiredTiger is the default in 3.2. The biggest difference between the two is that MMAPv1 offers at best collection-level locking for writes, whereas WiredTiger has document-level locking (mostly optimistic). Thus, WiredTiger allows for more fine-grained concurrency control, and potentially more parallel writes. WiredTiger also comes with on-disk data compression, plus a checkpointing feature to maintain consistency (in addition to default journaling). If you’re using version 3.0, MMAPv1 could still be good enough, given that it’s a tried and tested component and WiredTiger was still fairly new at that point.
  • Write Concern – Write concern allows one to define the degree of acknowledgement indicating the success of a write operation. If a system can easily recover from write losses without observable impact to users, it’s possible to do away with acknowledgements altogether, making the write round-trip as fast as possible (not allowed if journaling is on). By default, acknowledgement is required from a standalone instance or the primary of a replica set. If one wants stronger consistency from a replica set, the write concern can be set to a number greater than 1, or to “majority”, to make sure writes have propagated to some replica nodes as well (though this comes at the cost of degraded write performance).
  • Journaling – Journaling is write-ahead logging on disk to provide consistency and durability. Whichever storage engine is chosen writes more often to the journal than to the data disk by default. One can also configure the write concern with the “j” option, which indicates that an operation will be acknowledged only after it’s written to the journal. That allows for more consistency, but overall performance can take a hit (the duration between journal commits is decreased automatically to compensate, though). From a performance perspective, there are a couple of things one can do with respect to journaling: 1. keep the journal on a different volume than the data volume to reduce I/O contention, and/or 2. reduce the interval between journal commits (please test this thoroughly, as too small a value can eventually decrease overall write capacity).
  • Bulk Writes – One can write multiple documents to a collection through a single bulk-write operation. Combined with the WiredTiger storage engine, this can be a great way to make overall writes faster, if it’s possible to chunk multiple writes together. Though if you have an e-commerce system and need to handle write operations per user transaction, it might not be a great option. Bulk writes can be ordered (serial and fail-fast) or unordered (parallel, isolated writes), and the choice depends on the specific requirements of a system.
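The ordered/unordered distinction can be illustrated with a toy executor. This only models the fail-fast vs. attempt-all semantics; it is not the MongoDB bulk API:

```python
def bulk_apply(docs, apply_fn, ordered=True):
    # ordered: stop at the first failure (fail-fast)
    # unordered: attempt every write, collecting errors as they occur
    written, errors = [], []
    for doc in docs:
        try:
            written.append(apply_fn(doc))
        except ValueError as exc:
            errors.append(str(exc))
            if ordered:
                break
    return written, errors

def insert(doc):
    if doc is None:
        raise ValueError("invalid document")
    return doc

docs = [1, 2, None, 4]
print(bulk_apply(docs, insert, ordered=True))   # ([1, 2], ['invalid document'])
print(bulk_apply(docs, insert, ordered=False))  # ([1, 2, 4], ['invalid document'])
```

With an ordered bulk write, everything after the failing document is skipped; with an unordered one, the remaining writes still go through, which is why unordered bulks can also be parallelized server-side.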

There are other general, non-intrusive ways to improve the overall performance of a MongoDB cluster, like increasing memory (to reduce read page faults), using SSDs for the data volume, or increasing disk IOPS (employ these if cost and infrastructure limits permit). What we’ve covered above are specific MongoDB features that can be tuned, provided there’s appetite.

Hope this was a wee bit helpful.