Recent Posts

Elasticsearch Cluster Snapshot & Restore

Posted on 01 Nov 2016

We recently needed to do a cross-cluster snapshot and restore for Elasticsearch. We were hosted on Elastic Cloud and found out the hard way that the tooling in place there does not work. We did find out that Elasticsearch has a snapshot repository system in place that works very well. We would have saved ourselves some time if we had just started with this.

Create a user in AWS IAM

First, create a user in AWS IAM that has access to S3. You can attach the full-access policy or limit the user to a single bucket, as shown in the Elastic Cloud documentation.

  "Statement": [
      "Action": [
      "Effect": "Allow",
      "Resource": [

Create the Repository

Create the repository in each cluster. This is also taken from that guide. You also need the repository-s3 plugin installed in each cluster for this to work.

sudo bin/elasticsearch-plugin install repository-s3

curl -X PUT localhost:9200/_snapshot/bucket-name -d '{
  "type": "s3",
  "settings": {
    "bucket": "bucket-name",
    "region": "us-east-1",
    "access_key": "AKIAYOURKEYHERE",
    "secret_key": "secret-key",
    "compress": true
  }
}'


On the old cluster, create a snapshot. You can check on the status as it processes.

curl -X PUT localhost:9200/_snapshot/bucket-name/snapshot-backup-name
curl -X GET localhost:9200/_snapshot/bucket-name/snapshot-backup-name/_status

There are a lot of options you can provide to the snapshot, including limiting to certain indices.
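As a rough illustration of those options, here is a minimal Ruby sketch that builds (but does not send) a snapshot request limited to a couple of indices. The helper name `build_snapshot_request` and the second index name are made up for this example; `indices`, `ignore_unavailable`, and `include_global_state` are options of the snapshot API.

```ruby
require "json"
require "net/http"

# Hypothetical helper: builds the PUT request for a snapshot of selected
# indices. Nothing is sent until the request is handed to Net::HTTP.
def build_snapshot_request(repository, snapshot, indices)
  request = Net::HTTP::Put.new(
    "/_snapshot/#{repository}/#{snapshot}",
    { "Content-Type" => "application/json" }
  )
  request.body = JSON.generate({
    "indices" => indices.join(","),   # only snapshot these indices
    "ignore_unavailable" => true,     # skip indices that do not exist
    "include_global_state" => false,  # leave cluster-wide state out
  })
  request
end

request = build_snapshot_request("bucket-name", "snapshot-backup-name", ["one-index", "two-index"])
puts request.path
puts request.body

# To actually send it (assumes a node listening on localhost:9200):
# Net::HTTP.start("localhost", 9200) { |http| puts http.request(request).body }
```

Building the request separately from sending it makes it easy to inspect the body before pointing it at a production cluster.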


On the new cluster, restore the snapshot. You can easily view the status of the restore with regular Elasticsearch monitoring tools. The index health will be shown as shards come online.

curl -X POST localhost:9200/_snapshot/bucket-name/snapshot-backup-name/_restore -d '{
  "indices": "one-index"
}'


Hopefully this is of use to others. Snapshotting and restoring manually is a very simple process and was much easier than trying to figure out a custom solution from your elasticsearch host.

Backup with Duply and Duplicity

Posted on 17 Jun 2016

I recently stopped using Crashplan as my backup service of choice. I still wanted backups so I started looking for alternatives. Duply and duplicity came up as a nice choice. This is how I set it up.

This assumes you already have duplicity and duply installed. My Linux distribution of choice is Arch, where duplicity is in the repo and duply is easily installed from the AUR.


We need to start by generating a GPG key. This will be used to encrypt backups.

gpg --gen-key

After generating the key, make sure to save the key ID.


I had trouble with pinentry on a GUI-less system. You may need to symlink the pinentry-curses version of pinentry, as suggested in this forum post.

sudo ln -s /usr/bin/pinentry-curses /usr/bin/pinentry

I also needed to configure GPG to accept the key password from an environment variable. This is required with GPG 2.1.


pinentry-mode loopback
# ...


# ...


Start by creating a profile, then edit the configuration.

$ duply eric create


This file sets up basic configuration for duply. The file contains a lot of commented out options. These are the only options I have set right now. I commented what they do inline.

GPG_KEY='...' # your gpg key id, get from `gpg --list-keys`
GPG_PW='...' # your gpg password
GPG_OPTS='--pinentry-mode loopback' # required to use GPG_PW

TARGET='sftp://eric@hostname/backups/hostname' # backing up to another linux machine via SSH
SOURCE='/home/eric/' # grab my home folder

PYTHON="python2" # arch has python 3 default, set for python 2 instead

# run a full backup once a week


I wanted to only include certain folders in my home directory. Duplicity has a nice exclude file format you can use to deal with this. Duply has it baked in. The exclude file is created when creating your profile.

# keep out junk folders
- /home/eric/prog/*/*/log/
- /home/eric/prog/*/*/tmp/

# Include Folders
+ /home/eric/bin
+ /home/eric/Desktop
+ /home/eric/Documents
+ /home/eric/dotfiles
+ /home/eric/Music
+ /home/eric/ownCloud
+ /home/eric/Pictures
+ /home/eric/prog
+ /home/eric/.ssh

# Exclude everything else
- /home/eric/

The important thing to remember is duplicity goes down the list when determining if something is excluded or included. That's why the log and tmp folders are excluded first. Otherwise they would be included by the /home/eric/prog line if it was first.
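To make the ordering rule concrete, here is a small Ruby sketch of a first-match-wins rule list. It is a simplified model for illustration, not duplicity's actual matcher: the `decide` and `path_with_parents` helpers are hypothetical, the patterns drop the trailing slash, and a rule is treated as matching a file when it matches the file or any of its parent folders.

```ruby
# Simplified model of an ordered include/exclude list: the first rule that
# matches a path (or one of its parent directories) decides the outcome.
# This is an illustration only, not duplicity's real matching code.
FLAGS = File::FNM_PATHNAME | File::FNM_DOTMATCH

# "/a/b/c" => ["/a", "/a/b", "/a/b/c"]
def path_with_parents(path)
  parts = path.split("/").reject(&:empty?)
  (1..parts.length).map { |n| "/" + parts.first(n).join("/") }
end

def decide(path, rules)
  candidates = path_with_parents(path)
  rules.each do |action, pattern|
    return action if candidates.any? { |c| File.fnmatch?(pattern, c, FLAGS) }
  end
  :include # nothing matched
end

rules = [
  [:exclude, "/home/eric/prog/*/*/log"],
  [:include, "/home/eric/prog"],
  [:exclude, "/home/eric"],
]

puts decide("/home/eric/prog/web/app/log/dev.log", rules) # exclude
puts decide("/home/eric/prog/web/app/main.rb", rules)     # include
puts decide("/home/eric/Downloads/big.iso", rules)        # exclude
```

Moving the `/home/eric/prog` rule to the top of the list makes the log file come back as `include`, which is exactly the situation the ordering in the exclude file avoids.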

Automatic backup

Set cron to run the backup script nightly. I use keychain and needed to source the correct file to get SSH working.

crontab -e

@daily . ~/.keychain/$(hostname)-sh && /usr/bin/duply eric backup


Sometime in the future I want to switch to Google Cloud Storage instead of SSH. I'm currently only backing up between local machines while I'm testing this out. I would like to have an onsite and a cloud backup.

Reindexing Elasticsearch with Ruby

Posted on 31 May 2016

For a project at work we needed to reindex a large Elasticsearch index and couldn't do it via the _reindex API. The source needed to be processed slightly in the new index. We were reindexing to gain more shards.

This is the rake task that helped reindex. It is based on this Elasticsearch Guide.


First create the new index via Postman.

POST /new_index
{
  "settings": {
    "index": {
      "number_of_shards": 3
    }
  }
}

Next run the rake task. This will handle the reindex from one to the other via scan/scroll and the bulk API. I ran this on my local machine to not deal with Heroku timeouts. I simply set the correct environment variables to have Chewy pick up the production Elasticsearch. This is risky but I was not worried because we were dealing with only a new index.

bundle exec rake elasticsearch:reindex[old_index,new_index]

This rake task comes with a nice progress bar to track how far along the reindex is.
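One small note on the arithmetic: the rake task below sizes the bar as the hit count divided by 1000, plus one, which comes out one step too high when the hit count is an exact multiple of the batch size. A ceiling division is one possible refinement. The `batch_count` helper here is hypothetical and not part of the task.

```ruby
# Number of scroll batches needed to cover a given hit count.
def batch_count(total_hits, batch_size = 1000)
  (total_hits + batch_size - 1) / batch_size # integer ceiling division
end

puts batch_count(2500) # 3
puts batch_count(2000) # 2
puts 2000 / 1000 + 1   # 3, one more than the batches actually processed
```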

Once the reindex is complete you can update the alias you use to have production start using the new index.

POST /_aliases
{
  "actions": [
    { "remove": { "index": "old_index", "alias": "alias" } },
    { "add": { "index": "new_index", "alias": "alias" } }
  ]
}

Rake Task

namespace :elasticsearch do
  desc "Reindex an index"
  task :reindex, [:old_index_name, :new_index_name] => [:environment] do |t, args|
    client = Chewy.client

    # Open a scroll over the old index, 1000 documents at a time
    results = client.search({
      index: args[:old_index_name],
      scroll: '10m',
      body: {
        "query" => { "match_all" => {} },
        "sort" => ["_doc"],
        "size" => 1000,
      },
    })

    progressbar = ProgressBar.create({
      :title => "Documents (thousands)",
      :total => results["hits"]["total"] / 1000 + 1,
      :format => '%a |%B| %p%% %t %c of %C',
    })

    loop do
      break if results["hits"]["hits"].empty?

      bulk_body = results["hits"]["hits"].map do |result|
        source = result["_source"]

        # process the source

        {
          index: {
            _index: args[:new_index_name],
            _type: result["_type"],
            _id: result["_id"],
            data: source,
          },
        }
      end

      response = client.bulk(body: bulk_body)

      if response["errors"]
        raise "Problem reindexing - #{response.inspect}"
      end

      # One tick per batch of 1000 documents
      progressbar.increment

      # Fetch the next batch from the scroll
      results = client.scroll({
        scroll: '10m',
        scroll_id: results["_scroll_id"],
      })
    end
  end
end


This rake task isn't perfect, but it gets most of the way there. Some future improvements will be better handling around timeouts when talking to Elasticsearch. It would also be good to deal with indexing errors when bulk importing into the new index.
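As a starting point for the error handling, here is a minimal sketch of picking the failed entries out of a bulk response instead of raising on the first problem. The `failed_bulk_items` helper is hypothetical and the response hash is hand-written in the shape the bulk API returns, so treat it as an outline rather than tested production code.

```ruby
# Pull the failed entries out of a bulk API response hash. Each item is keyed
# by its action ("index" here) and carries an "error" only when it failed.
def failed_bulk_items(response)
  return [] unless response["errors"]

  response["items"]
    .map { |item| item.values.first }
    .select { |result| result.key?("error") }
end

# Hand-written response in the shape the bulk API returns.
response = {
  "errors" => true,
  "items" => [
    { "index" => { "_id" => "1", "status" => 201 } },
    { "index" => { "_id" => "2", "status" => 400,
                   "error" => { "type" => "mapper_parsing_exception" } } },
  ],
}

failed_bulk_items(response).each do |result|
  puts "#{result["_id"]} failed with status #{result["status"]}"
end
```

The failed IDs could then be logged or retried while the rest of the reindex carries on.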

Nightly Status Check Worker

Posted on 18 May 2016

Several of my projects involve integrating with APIs. To verify that all is well with our integrations, we perform a nightly checkup. In this checkup we perform token validation and other sanity checks, such as confirming that billing is valid. We check for anything that might change and end up breaking our integration.

Base Worker

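class NightlyCheckWorker
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  recurrence { daily }

  def perform
    # Fan out one job per active user (assumes an `active` scope on User)
    User.active.find_each do |user|
      CheckWorker.perform_async(user.id)
    end
  end
end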

This worker is here to fan out to separate jobs for each active user. The NightlyCheckWorker::CheckWorker is listed below.

Validation Class

class NightlyCheckWorker
  class Validation
    include ActiveModel::Validations

    attr_reader :user

    def initialize(user)
      @user = user
    end

    # Default assumed here: a failed validation does not disable the user
    def disable?
      false
    end

    # Hook to send mail on validation fail
    def deliver_notice
    end
  end
end

This class acts as an interface for the rest of validations. It has a basic constructor and a few methods with defaults that can be overridden.

It includes ActiveModel::Validations to allow each validation class to perform validations similar to normal active record objects.

A Validator

class NightlyCheckWorker
  class GoogleTokenValidator < Validation
    # A failed token check should disable the user
    def disable?
      true
    end

    def message
      "Google token is invalid"
    end

    validate do
      response = user.google_client.get("/ping")
      case response.status
      when 401
        errors.add(:google, "token has been rejected")
      when 403
        errors.add(:google, "is invalid")
      end
    end
  end
end

This is a sample validator class. We can see that this validator disables the user when it fails and provides a message we can use when sending an email. Note that the validation code is just a sample and is almost certainly wrong. It is provided to show how we check a response and give different errors depending on the status or message contained within.


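class NightlyCheckWorker
  class CheckWorker
    include Sidekiq::Worker

    def perform(user_id)
      user = User.find(user_id)

      validators.each do |validator_class|
        validator = validator_class.new(user)
        unless validator.valid?
          user.disable! if validator.disable?
          validator.deliver_notice
        end
      end

      # ...
    end

    def validators
      [
        # ...
      ]
    end
  end
end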

This is the class that handles all of the work. The #validators method contains all of the validator classes and will loop over them performing checks. The only thing worth pointing out in this code is the last method call of #perform. I find it a good spot to include general caching in these workers. Every night we'll cache any counts or update information that might have gotten out of date.


One of the things that we changed after using this for a long time is saving the validation errors in the database. Previously they were always emailed to us and that was it. Now we're saving them in a JSON column attached to the user. I will leave this as an exercise for the reader.
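For anyone attempting it, here is one possible shape for the stored data, sketched in plain Ruby. Everything in it is hypothetical: `CheckedValidator` is a stand-in for a validator that has already run, and the payload layout is just an assumption about what might be useful to keep.

```ruby
require "json"

# Hypothetical stand-in for a validator that has already been run; in the
# real worker this would be one of the Validation subclasses.
CheckedValidator = Struct.new(:name, :messages) do
  def valid?
    messages.empty?
  end
end

# Collect the failures into a plain hash that can go into a JSON column.
def validation_errors_payload(validators, checked_at)
  failures = validators.reject(&:valid?)
  {
    "checked_at" => checked_at,
    "errors" => failures.map { |v| [v.name, v.messages] }.to_h,
  }
end

validators = [
  CheckedValidator.new("google_token", ["token has been rejected"]),
  CheckedValidator.new("billing", []),
]

puts JSON.generate(validation_errors_payload(validators, "2016-05-18"))
```

With ActiveModel validators, `errors.full_messages` would be a natural source for the messages.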

Sidekiq Job Locking - Lock Around an ID

Posted on 07 May 2016

For one of my projects at SmartLogic, we process events in the background from webhooks. These talk to an API that needs to be rate limited. Because of this we made a Sidekiq job that locks itself so only one job will run per ID.

For a better understanding of what this will accomplish, here is a brief example:

MyWorker.perform_async(1) # will process
MyWorker.perform_async(2) # will process
MyWorker.perform_async(1) # will immediately exit, 1 still processing

sleep 10 # all jobs have finished

MyWorker.perform_async(1) # will process


It should be pointed out that the worker example will tick with Sidetiq. This ensures we have a steady stream of jobs into the queue. The worker example will also reschedule itself to beat the tick if there are events to process.

class MySchedulerWorker
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  recurrence { hourly.minute_of_hour(0, 15, 30, 45) }

  def perform
    # Enqueue one locked job per user on every tick
    User.all.each do |user|
      MyWorker.perform_async(user.id)
    end
  end
end


Let's start with what it looks like to use the locked job. This job will process an event stream of a user and pull off a certain amount of events each job run. The job will lock around the user, not the class itself.

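class MyWorker
  include Sidekiq::Worker
  include LockedJob

  def perform(user_id)
    return if already_processing?(user_id)

    lock_processing!(user_id)

    # ... process events

    if events_left?
      @schedule_next_run = true
    end

    clear_processing_lock!(user_id)

    # Reschedule only after the lock has been cleared
    if @schedule_next_run
      self.class.perform_async(user_id)
    end
  end

  private

  # ...

  def processing_key(user_id)
    # ... unique Redis key that includes the Sidekiq job ID
  end

  def processing_key_star(user_id)
    # ... same key with a * in place of the job ID
  end
end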

To start out we check for #already_processing?. If something is already processing, we just want this job to stop now. Once we know it's safe to start working, we call #lock_processing!.

This job will reschedule itself if there are still items to process, so we set an instance variable to trigger a new run once the lock has been cleared.

The last two private methods are very important for this. The first builds a unique Redis key that uses the job ID from Sidekiq. The second builds a star version of that key, which we use to determine how many of these jobs are currently running.


Note: AppContainer is a container for configuration in my projects. #redis_pool is a connection_pool for redis.

module LockedJob
  SIXTY_MINUTES = 60 * 60

  def already_processing?(model_id)
    AppContainer.redis_pool.with do |redis|
      redis.keys(processing_key_star(model_id)).count >= 1
    end
  end

  def lock_processing!(model_id)
    AppContainer.redis_pool.with do |redis|
      redis.setex(processing_key(model_id), SIXTY_MINUTES, "true")
    end
  end

  def clear_processing_lock!(model_id)
    # We are done processing, clean out the key
    AppContainer.redis_pool.with do |redis|
      redis.del(processing_key(model_id))
    end
  end
end

This module has the special methods that the worker uses. #already_processing? checks the star key to see how many other jobs are running. The threshold can easily be raised if you want to allow more than one job per ID at a time.

#lock_processing! sets a key that will expire in 60 minutes. We set an expiration to make sure that other jobs can still start if this one hangs. We don't want to lock forever.

#clear_processing_lock! deletes the model specific key to allow other jobs to start.
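To see the locking behaviour without a Redis server, here is a small in-memory sketch. `FakeRedis` is a hypothetical stand-in that implements only the three commands used above and ignores the TTL, the `locked-job:<id>:<jid>` key layout is an assumption made for this example, and the `limit` argument illustrates allowing more than one job per ID.

```ruby
# In-memory stand-in for the few Redis commands the module uses.
# It ignores the TTL, so it only models the happy path.
class FakeRedis
  def initialize
    @store = {}
  end

  def setex(key, _ttl, value)
    @store[key] = value
  end

  def keys(pattern)
    @store.keys.select { |key| File.fnmatch?(pattern, key) }
  end

  def del(key)
    @store.delete(key)
  end
end

# Hypothetical key layout: one key per (model id, job id) pair.
def processing_key(model_id, jid)
  "locked-job:#{model_id}:#{jid}"
end

def already_processing?(redis, model_id, limit = 1)
  redis.keys("locked-job:#{model_id}:*").count >= limit
end

redis = FakeRedis.new
puts already_processing?(redis, 1)     # false, nothing holds the lock
redis.setex(processing_key(1, "jid-a"), 3600, "true")
puts already_processing?(redis, 1)     # true, a job for ID 1 is running
puts already_processing?(redis, 2)     # false, other IDs are unaffected
puts already_processing?(redis, 1, 2)  # false, a limit of 2 is not reached
redis.del(processing_key(1, "jid-a"))
puts already_processing?(redis, 1)     # false again after clearing
```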

Benefits of Locking

This setup has worked very successfully for the last few months. Before this, we ran a separate job per event and tried to rate limit around that. It was very complicated to limit when fanned out like that. The worst version of that was very similar to this, with keys for locking, but cycled hundreds of millions of jobs. That was a very large strain on the system for doing virtually nothing.

It helps out a lot with handling rate limits for talking to external APIs.

Eric Oestrich
Creative Commons License
This site's content is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License unless otherwise specified. Code on this site is licensed under the MIT License unless otherwise specified.