Posted on 31 May 2016
For a project at work we needed to reindex a large Elasticsearch index and couldn’t do it via the _reindex API, because the source documents needed to be processed slightly on their way into the new index. We were also reindexing to gain more shards. This is the rake task that helped us reindex. It is based on this Elasticsearch Guide.
Usage
First create the new index via Postman.
POST /new_index
{
  "settings": {
    "index": {
      "number_of_shards": 3
    }
  }
}
Next run the rake task. This will handle the reindex from the old index to the new one via scan/scroll and the bulk API. I ran this on my local machine to avoid dealing with Heroku timeouts. I simply set the correct environment variables to have Chewy pick up the production Elasticsearch. This is risky, but I was not worried because we were only writing to a brand new index.
bundle exec rake elasticsearch:reindex[old_index,new_index]
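Chewy can be pointed at the production cluster via Chewy.settings. A minimal sketch, assuming a hypothetical ELASTICSEARCH_URL environment variable holds the production host:

# config/initializers/chewy.rb
# A minimal sketch -- ELASTICSEARCH_URL is a hypothetical variable name;
# adjust to however your app actually configures Chewy.
Chewy.settings = {
  host: ENV.fetch("ELASTICSEARCH_URL", "localhost:9200"),
}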
This rake task comes with a nice progress bar to track how far along the reindex is.
Once the reindex is complete you can update the alias you use to have production start using the new index.
POST /_aliases
{
  "actions": [
    { "remove": { "index": "old_index", "alias": "alias" } },
    { "add": { "index": "new_index", "alias": "alias" } }
  ]
}
Rake Task
namespace :elasticsearch do
  desc "Reindex an index"
  task :reindex, [:old_index_name, :new_index_name] => [:environment] do |t, args|
    client = Chewy.client

    # Start a scroll over the old index, 1000 documents at a time
    results = client.search({
      index: args[:old_index_name],
      scroll: '10m',
      body: {
        "query" => { "match_all" => {} },
        "sort" => ["_doc"],
        "size" => 1000,
      },
    })

    progressbar = ProgressBar.create({
      :title => "Documents (thousands)",
      :total => results["hits"]["total"] / 1000 + 1,
      :format => '%a |%B| %p%% %t %c of %C',
    })

    loop do
      break if results["hits"]["hits"].empty?

      bulk_body = results["hits"]["hits"].map do |result|
        source = result["_source"]

        # process the source

        {
          index: {
            _index: args[:new_index_name],
            _type: result["_type"],
            _id: result["_id"],
            data: source,
          },
        }
      end

      response = client.bulk(body: bulk_body)
      if response["errors"]
        raise "Problem reindexing - #{response.inspect}"
      end

      progressbar.increment

      # Fetch the next page of results from the scroll
      results = client.scroll({
        scroll: '10m',
        scroll_id: results["_scroll_id"],
      })
    end

    progressbar.finish
  end
end
Improvements
This rake task isn’t perfect, but it gets most of the way there. Future improvements would be better handling of timeouts when talking to Elasticsearch, and dealing with indexing errors when bulk importing into the new index.
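For the timeout handling, one possible approach (a sketch, not part of the task above) is to wrap the scroll and bulk calls in a small retry helper. The exception classes to rescue are assumptions and depend on the transport your Elasticsearch client uses:

# Hypothetical retry helper -- the exact exceptions to rescue depend on the
# Elasticsearch client and transport in use.
def with_retries(attempts: 3)
  tries = 0
  begin
    yield
  rescue Faraday::TimeoutError, Elasticsearch::Transport::Transport::Errors::GatewayTimeout
    tries += 1
    retry if tries < attempts
    raise
  end
end

results = with_retries do
  client.scroll({ scroll: '10m', scroll_id: results["_scroll_id"] })
end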
Posted on 18 May 2016
Several of my projects involve integrating with APIs. In order to verify that all is well with our integrations, we perform a nightly check up. In this check up we perform token validation and other sanity checks, such as making sure billing is valid. We check for anything that might change and end up breaking our integration.
Base Worker
class NightlyCheckWorker
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  recurrence { daily }

  def perform
    User.active.each do |user|
      CheckWorker.perform_async(user.id)
    end
  end
end
This worker is here to fan out to separate jobs for each active user. The NightlyCheckWorker::CheckWorker is listed below.
Validation Class
class NightlyCheckWorker
  class Validation
    include ActiveModel::Validations

    attr_reader :user

    def initialize(user)
      @user = user
    end

    def disable?
      false
    end

    # Hook to send mail on validation failure
    def deliver_notice
    end
  end
end
This class acts as an interface for the rest of the validations. It has a basic constructor and a few methods with defaults that can be overridden. It includes ActiveModel::Validations to allow each validation class to perform validations similar to normal Active Record objects.
A Validator
class NightlyCheckWorker
  class GoogleTokenValidator < Validation
    def disable?
      true
    end

    def message
      "Google token is invalid"
    end

    validate do
      response = user.google_client.get("/ping")

      case response.status
      when 401
        errors.add(:google, "token has been rejected")
      when 403
        errors.add(:google, "is invalid")
      end
    end
  end
end
This is a sample validator class. We can see that failing it should disable the user, and it gives a message we can use when sending an email. Note that the validation code is just a sample and is almost certainly wrong. It is provided to show how we check a response and give different errors depending on the status or message contained within.
NightlyCheckWorker::CheckWorker
class NightlyCheckWorker
  class CheckWorker
    include Sidekiq::Worker

    def perform(user_id)
      @user = User.find(user_id)

      validators.each do |validator_class|
        validator = validator_class.new(user)

        unless validator.valid?
          user.disable! if validator.disable?
          validator.deliver_notice
        end
      end

      update_user_info
    end

    private

    attr_reader :user

    def validators
      [
        GoogleTokenValidator,
        # ...
      ]
    end
  end
end
This is the class that handles all of the work. The #validators method contains all of the validator classes and will loop over them performing checks. The only thing worth pointing out in this code is the last method call of #perform. I find it a good spot to include general caching in these workers. Every night we’ll cache any counts or update information that might have gotten out of date.
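As an illustration, #update_user_info might look something like this. The column and association names are hypothetical, not from the real project:

class NightlyCheckWorker
  class CheckWorker
    private

    # Hypothetical example: refresh cached counts and timestamps that can
    # drift out of date between nightly runs.
    def update_user_info
      user.update({
        orders_count: user.orders.count,
        last_checked_at: Time.current,
      })
    end
  end
end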
Improvements
One of the things that we changed after using this for a long time is saving the validation errors in the database. Previously they were always emailed to us and that was it. Now we’re saving them in a JSON column attached to the user. I will leave this as an exercise for the reader.
Posted on 07 May 2016
For one of my projects at SmartLogic, we process events in the background from webhooks. These talk to an API that needs to be rate limited. Because of this, we made a Sidekiq job that locks itself so only one job will run per ID.
For a better understanding of what this will accomplish, here is a brief example:
MyWorker.perform_async(1) # will process
MyWorker.perform_async(2) # will process
MyWorker.perform_async(1) # will immediately exit, 1 still processing
sleep 10 # all jobs have finished
MyWorker.perform_async(1) # will process
Recurrence
It should be pointed out that the worker example will tick with Sidetiq. This ensures we have a steady stream of jobs into the queue. The worker example will also reschedule itself to beat the tick if there are events left to process.
class MySchedulerWorker
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  recurrence { hourly.minute_of_hour(0, 15, 30, 45) }

  def perform
    User.all.each do |user|
      MyWorker.perform_async(user.id)
    end
  end
end
Worker
Let’s start with what it looks like to use the locked job. This job will process a user’s event stream and pull off a certain number of events each run. The job will lock around the user, not the class itself.
class MyWorker
  include Sidekiq::Worker
  include LockedJob

  def perform(user_id)
    return if already_processing?(user_id)
    lock_processing!(user_id)

    # ... process events

    if events_left?
      @schedule_next_run = true
    end
  ensure
    clear_processing_lock!(user_id)

    if @schedule_next_run
      self.class.perform_async(user_id)
    end
  end

  private

  # ...

  def processing_key(user_id)
    "users:#{user_id}:processing:#{jid}"
  end

  def processing_key_star(user_id)
    "users:#{user_id}:processing:*"
  end
end
To start out we check #already_processing?. If something is already processing, we just want this job to stop now. Once we know it’s safe to start working, we call #lock_processing!.
This job will reschedule itself if there are still items to process, so we set an instance variable to trigger a new run once the lock has been cleared.
The last two private methods are very important for this. They build a unique redis key that uses the job id from Sidekiq, plus a star version that we’ll use to determine how many of these jobs are currently running.
LockedJob
Note: AppContainer is a container for configuration in my projects. #redis_pool is a connection_pool for redis.
module LockedJob
  SIXTY_MINUTES = 60 * 60

  private

  def already_processing?(model_id)
    AppContainer.redis_pool.with do |redis|
      redis.keys(processing_key_star(model_id)).count >= 1
    end
  end

  def lock_processing!(model_id)
    AppContainer.redis_pool.with do |redis|
      redis.setex(processing_key(model_id), SIXTY_MINUTES, "true")
    end
  end

  def clear_processing_lock!(model_id)
    # We are done processing, clean out the key
    AppContainer.redis_pool.with do |redis|
      redis.del(processing_key(model_id))
    end
  end
end
This module has the special methods that the worker used. #already_processing? checks with the star key to see how many other jobs are running. This can easily be changed to allow larger numbers if you want to lock at different rates, as sketched below.
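For example, allowing two jobs per user instead of one could look roughly like this. MAX_CONCURRENT_JOBS is a hypothetical constant, not part of the module above:

MAX_CONCURRENT_JOBS = 2

def already_processing?(model_id)
  AppContainer.redis_pool.with do |redis|
    redis.keys(processing_key_star(model_id)).count >= MAX_CONCURRENT_JOBS
  end
end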
#lock_processing! sets a key that will expire in 60 minutes. We set an expiration to make sure that other jobs can still start if this one hangs. We don’t want to lock forever.
#clear_processing_lock! deletes the model-specific key to allow other jobs to start.
Benefits of Locking
This set up has worked very successfully for the last few months. We went from a separate job per event and trying to rate limit around that; it was very complicated to limit once fanned out like that. The worst version of that was very similar to this one, with keys for locking, but it cycled through hundreds of millions of jobs, a very large strain on the system for doing virtually nothing.
It helps out a lot with handling rate limits for talking to external APIs.
Posted on 23 Mar 2016
Part 3 will wrap up this series and show off a full example of how Nagare can be used.
Name fun fact: Nagare (流れ) is Japanese for a stream or flow.
Railtie
The railtie included with Nagare changes the default _render_with_renderer_json method on controllers, similar to ActiveModel::Serializers. You can see a full version of it on GitHub.
A Full Example
Controllers
We start by defining a context that carries the current user. Anything else a serializer might want can be added here.
class ApplicationController < ActionController::Base
  private

  def nagare_context
    @nagare_context ||= Nagare::Context.new({
      current_user: current_user,
    })
  end
end
A regular controller looks like this:
class AdminOrdersController < ApplicationController
  def index
    render({
      json: orders,
      serializers: { collection: AdminOrdersSerializer, item: AdminOrderSerializer },
      context: {
        href: orders_url,
      },
    })
  end

  def show
    render({
      json: order,
      serializers: { item: AdminOrderSerializer },
      context: {
        href: order_url(order),
      },
    })
  end
end
The collection key is required if you have a collection. I found this to be a good pattern when working on Artifact.
Serializers
The serializers are pretty simple. They can customize which attributes will be output. In Artifact I extend these with a custom DSL for Collection+JSON to include links, queries, and templates.
class AdminOrdersSerializer < Nagare::Collection
  # This is the key that will contain all of the serialized items.
  key "orders"

  # You can also have extra attributes on the collection
  attributes :count

  def count
    collection.count
  end
end
Serializers have several ways of obtaining attributes. An attribute can come from a method on the serializer itself, a method on the object, or from the context.
class AdminOrderSerializer < Nagare::Item
  # email is a method off of the order
  # item_count is a method we define
  # href comes from the passed in context
  attributes :email, :item_count, :href

  def item_count
    object.items.count
  end
end
attributes defines a method per attribute that will try the object or the context for the attribute value. Otherwise you can simply define your own method to use.
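As a rough sketch of the idea (this is not Nagare’s actual source), such a macro can be built with define_method, preferring a method on the object and falling back to the context:

# A simplified, hypothetical version of an attributes macro.
class SketchItemSerializer
  attr_reader :object, :context

  def initialize(object, context = {})
    @object = object
    @context = context
  end

  def self.attributes(*names)
    names.each do |name|
      define_method(name) do
        # Prefer a method on the object, otherwise fall back to the context
        object.respond_to?(name) ? object.public_send(name) : context[name]
      end
    end
  end
end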
Wrapping Up
This brings the Rethinking Rails API Serialization series to a close. I hope you found something useful. Please read over the source for Nagare. I welcome issues or pull requests.
Posted on 22 Mar 2016
Part 2 of this series will show off snippets of a new gem I created, Nagare. The gem is pretty small, so give it a read through. Part 3 will show off more of the gem; this post will concentrate on the specific issues I had in the first post.
General Context
The first thing I wanted to tackle is injecting context into the serializer. I do this by making you specify everything you want the serializer to know about. The default context has nothing.
class ApplicationController < ActionController::Base
  def nagare_context
    @nagare_context ||= Nagare::Context.new({
      current_user: current_user,
    })
  end
end
It’s simple to create a new context and you can even override it in specific controllers because it’s just a method. I tend to use my letter gem here to handle memoization for me.
Resource Context
Next up is context at the resource level. This is similar to how I injected context with ActiveModel::Serializers; the biggest difference is the key change.
def index
  render({
    context: {
      href: books_url,
    },
  })
end
This hash extends the general context so you can easily override specific keys if necessary.
Collection Serializers
Collection serializers are a subclass of an item serializer in Nagare. This gives them the full capability of attributes and the context.
class BooksSerializer < Nagare::Collection
  attributes :count, :href

  def count
    collection.count
  end
end
href comes from the context, if we think of this being rendered in the index method shown above.
Explicit Resources
Nagare requires you to explicitly set the serializer for it to do anything. This is how I prefer it because I almost never used the automatic serializer choice.
def index
  render({
    serializers: { collection: BooksSerializer, item: BookSerializer },
  })
end
Adapters
This was one thing I really liked from ActiveModel::Serializers and copied for Nagare. ActiveModel::Serializers lets you define an adapter that can completely reshape the JSON before it leaves your app. I used this for Collection+JSON with ActiveModel::Serializers.
Nagare’s version of adapters is pretty simple to start. The only interface is #as_json. Here is the full default adapter:
class Adapter
  def initialize(serializer, collection: false)
    @serializer = serializer
    @collection = collection
  end

  def as_json(options = nil)
    serializer.as_json(options)
  end

  private

  attr_reader :serializer, :collection
end
This is pretty simple, but it gives you a hook for doing more complicated things like Collection+JSON or JSON API.
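For example, an adapter that nests everything under a top-level data key could be as small as this (a sketch, not an adapter that ships with Nagare):

class DataWrapperAdapter < Adapter
  # Wrap the serialized output in a top-level "data" key.
  def as_json(options = nil)
    { data: serializer.as_json(options) }
  end
end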
Part 3
Next post I’ll show off how to use Nagare further.