Posted on 20 Oct 2014
At Ruby DCamp I spent some time writing a faraday middleware that automatically dereferences JSON Pointers (RFC 6901). It was a fun exercise in reading an RFC and writing something that implements it. I wanted to do this after some cool demonstrations I saw at RESTFest this year. One was hyper+json, which uses JSON Pointers.
Below is what I came up with. It is also hosted on GitHub. The GitHub project also contains specs which better highlight what is happening.
This code will take the following JSON:
{
  "name": "/first_name",
  "first_name": "Eric"
}
And convert it into:
{
  "name": "Eric",
  "first_name": "Eric"
}
It even allows deep linking of attributes.
json_faraday_middleware.rb
require 'faraday'
require 'json'

class JsonPointerMiddleware < Faraday::Middleware
  JSON_MIME_TYPE = "application/json"

  def call(env)
    @app.call(env).on_complete do |response_env|
      # Use `next`, not `return`: a `return` here would exit #call itself
      # and drop the response.
      content_type = response_env.response_headers["Content-Type"].to_s
      # Ignore parameters such as "; charset=utf-8"
      next unless content_type.split(";").first.to_s.strip == JSON_MIME_TYPE

      @body = JSON.parse(response_env.body)
      pointerize(@body)

      begin
        response_env.body = @body.to_json
      rescue JSON::NestingError
        # there is a circular nest, skip dereferencing
      end
    end
  end

  def pointerize(body)
    body.each do |key, value|
      pointerize(value) if value.is_a?(Hash)

      # Only strings can be pointers; numbers, arrays, etc. are left alone
      next unless value.is_a?(String) && value.include?("/")

      pointer_keys = value.split("/").drop(1)
      pointer_keys = escape_slash(pointer_keys)
      pointer_keys = escape_tilde(pointer_keys)
      pointer_keys = convert_indices(pointer_keys)

      # Walk down from the top of the document, one key at a time
      new_value = pointer_keys.inject(@body) do |node, pointer_key|
        next if node.nil?
        node[pointer_key]
      end

      body[key] = new_value if new_value
    end
  end

  # "~1" is the escaped form of "/"
  def escape_slash(keys)
    keys.map do |key|
      key.gsub("~1", "/")
    end
  end

  # "~0" is the escaped form of "~"; this must run after escape_slash
  def escape_tilde(keys)
    keys.map do |key|
      key.gsub("~0", "~")
    end
  end

  # Convert array indices to Integers
  def convert_indices(keys)
    keys.map do |key|
      Integer(key) rescue key
    end
  end
end
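Deep linking is easier to see outside of Faraday. The following is a minimal sketch, not the middleware's own code: resolve_pointer is a hypothetical helper that walks a parsed document one reference token at a time, and the sample document is made up for illustration.

```ruby
require "json"

# Hypothetical helper (not part of the middleware above): resolve a JSON
# Pointer string against an already parsed document.
def resolve_pointer(document, pointer)
  return document if pointer.empty?

  # Drop the empty token before the leading "/", then unescape each token.
  # "~1" must be decoded before "~0".
  tokens = pointer.split("/", -1).drop(1).map do |token|
    token.gsub("~1", "/").gsub("~0", "~")
  end

  tokens.inject(document) do |node, token|
    case node
    when Hash
      node[token]
    when Array
      # Array tokens must be plain non-negative integers
      token.match?(/\A(0|[1-9]\d*)\z/) ? node[token.to_i] : nil
    else
      return nil # cannot descend into a scalar or a missing value
    end
  end
end

document = JSON.parse(<<~DOC)
  {
    "person": { "first_name": "Eric", "tags": ["ruby", "json"] },
    "a/b": 1,
    "m~n": 2
  }
DOC

puts resolve_pointer(document, "/person/first_name") # prints Eric
puts resolve_pointer(document, "/person/tags/1")     # prints json
puts resolve_pointer(document, "/a~1b")              # prints 1
```

Unlike the middleware, this sketch only treats a token as an index when the current node is an Array, so a hash key such as "1" still resolves as a string key.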
Posted on 15 Oct 2014
For one of our clients at SmartLogic, we had to find the primary color of a photo to use as a background while the photo was loading. This was pretty easy given ImageMagick and a lot of tutorials online, but the primary color was always some extremely dark color, or gray/black.
This sidekiq worker detects the “best” primary color of the photo, skipping really dark colors and any version of gray.
This uses the ImageMagick convert command to find the color histogram and sorts it by most used color. It then strips out “bad” colors to find the most common color we can use.
require "tempfile"

class ProcessPhotoWorker
  include Sidekiq::Worker

  # Regex for the output of ImageMagick
  COLOR_REGEX =
    /\d+: \(\s{0,2}\d{1,3},\s{0,2}\d{1,3},\s{0,2}\d{1,3}\) (?<color>#.{6}).*/

  # These are all very dark colors
  COLORS_TO_IGNORE = [
    "#000000", "#000033", "#000066",
    "#003300", "#003333", "#003366",
    "#330000", "#330033", "#330066",
    "#333300", "#333333", "#333366",
    "#660000", "#660033", "#660066",
    "#663300", "#663333", "#663366",
  ]

  def perform(photo_id)
    @photo = Photo.find(photo_id)

    # Save a local version of the S3 photo
    tmpfile = Tempfile.new("photo")

    begin
      tmpfile.binmode
      tmpfile.write(@photo.image.read)
      tmpfile.rewind

      image = ::MiniMagick::Image.read(tmpfile.read)

      capture_geometry(image)
      capture_main_color(image, tmpfile)
    ensure
      tmpfile.close
      tmpfile.unlink
    end

    @photo.save
  end

  private

  def capture_geometry(image)
    @photo.image_height = image[:height]
    @photo.image_width = image[:width]
  end

  # Uses the convert tool to find the most common color
  # Output is similar to:
  # 67655: ( 50, 18, 18) #321212 srgb(50,18,18)
  # 240295: ( 15, 3, 6) #0F0306 srgb(15,3,6)
  def capture_main_color(image, tmpfile)
    output = image.run(command_to_run(tmpfile))
    sorted_colors = output_to_colors(output)
    sorted_colors = sorted_colors - COLORS_TO_IGNORE
    sorted_colors = skip_gray(sorted_colors)
    # Colors are sorted by ascending pixel count, so the last one wins
    @photo.image_main_color = sorted_colors.last
  end

  def command_to_run(tmpfile)
    command = ::MiniMagick::CommandBuilder.new("convert")
    command << tmpfile.path << "-resize" << "100x100" << "+dither"
    command << "-remap" << "netscape:" << "-format" << "'%c'" << "histogram:info:"
    command
  end

  def output_to_colors(command_output)
    lines = command_output.split("\n")

    sorted_lines = lines.map do |line|
      # convert white space to single space inbetween for the regex
      # first line contains a "'"
      line.gsub(/\s+/, " ").gsub("'", "").strip
    end.sort_by(&:to_i)

    sorted_colors = sorted_lines.map do |color|
      match = COLOR_REGEX.match(color)
      match[:color] if match
    end

    # Lines that did not match the regex produce nil
    sorted_colors.compact
  end

  def skip_gray(sorted_colors)
    sorted_colors.reject do |color|
      color = Color::RGB.from_html(color)

      # Gray means all three channels are equal
      color.red == color.green && color.green == color.blue
    end
  end
end
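The filtering logic can be tried without ImageMagick or Sidekiq. The following is a minimal sketch in plain Ruby, assuming histogram text has already been captured: pick_main_color, gray? and HISTOGRAM_LINE are hypothetical names, the first two sample lines come from the comment in the worker, and the last two are made up for illustration.

```ruby
# Hypothetical pattern for one histogram line: pixel count, then the hex color
HISTOGRAM_LINE = /\A\s*(?<count>\d+):\s*\(.*?\)\s*(?<color>#\h{6})/

# A color is gray when its red, green and blue hex pairs are identical
def gray?(hex)
  hex[1, 2] == hex[3, 2] && hex[3, 2] == hex[5, 2]
end

# Returns the most common color that is neither ignored nor gray,
# or nil when nothing usable is left.
def pick_main_color(histogram_output, ignored: [])
  counted = histogram_output.lines.map do |line|
    match = HISTOGRAM_LINE.match(line)
    [match[:count].to_i, match[:color].upcase] if match
  end.compact

  # Sort by ascending pixel count so the most common color is last
  candidates = counted.sort_by(&:first).map(&:last)
  candidates -= ignored
  candidates.reject { |hex| gray?(hex) }.last
end

sample = <<~OUTPUT
  67655: ( 50, 18, 18) #321212 srgb(50,18,18)
  240295: ( 15,  3,  6) #0F0306 srgb(15,3,6)
  90000: (102,102,102) #666666 srgb(102,102,102)
  1200: (255,204,  0) #FFCC00 srgb(255,204,0)
OUTPUT

puts pick_main_color(sample, ignored: ["#0F0306"]) # prints #321212
```

With no ignore list the darkest color wins because it has the highest count, which is the problem the worker was written to avoid.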
Posted on 06 Oct 2014
This post continues the previous post, Running an extra process inside of sidekiq.
My side project connects to a lot of APIs (themoviedb.org, thetvdb.com, GoodReads, and BoardGameGeek). Each one of these has its own rate limit. Previously I was scheduling sidekiq jobs 10 seconds apart to make sure I didn’t hit any of their limits. This was really lame and I wanted a nicer way of handling the rate limit.
What I came up with was what I referred to as “remote” faraday. The faraday connection is in a single spot inside of the extra process I stuck inside of sidekiq.
I have classes that handle each API for two specific needs, searching for items and viewing a single item. A sample for BoardGameGeek is below.
class BGG
  def search(board_game_name)
    response = remote_faraday.
      get(:bgg, "/xmlapi/search?search=#{CGI.escape(board_game_name)}")
    xml = Nokogiri::XML(response)
    # process the xml
  end

  def board_game(board_game_id)
    response = remote_faraday.
      get(:bgg, "/xmlapi/boardgame/#{board_game_id}")
    xml = Nokogiri::XML(response)
    # process the xml
  end

  private

  def remote_faraday
    @remote_faraday ||= RemoteFaraday.new
  end
end
There’s not much difference here between a regular faraday connection and my “remote” connection. This was nice because this class shouldn’t care about the change.
Here is what the RemoteFaraday class looks like:
class RemoteFaraday
  MAX_LOOPS = 200 # a minute or so of waiting

  class TimeoutError < StandardError
  end

  def initialize(container = AppContainer)
    @redis_pool = container.redis_connection_pool
    @uuid_generator = container.uuid_generator
  end

  def get(client, path)
    uuid = @uuid_generator.uuid
    key = redis_key(client, :get, path, uuid)

    @redis_pool.with do |redis|
      redis.lpush("remote-faraday:requests", {
        :method => :get,
        :client => client,
        :path => path,
        :uuid => uuid,
      }.to_json)
    end

    # Poll for the response, sleeping only while it is still missing
    response = nil
    MAX_LOOPS.times do
      response = @redis_pool.with do |redis|
        redis.get(key)
      end
      break if response
      sleep 0.3
    end

    raise TimeoutError if response.nil?

    @redis_pool.with do |redis|
      redis.del(key)
    end

    response
  end

  private

  def redis_key(client, method, path, uuid)
    "#{client}:response:#{method}:#{path}:#{uuid}"
  end
end
The class takes a redis pool and UUID generator from an application container. The UUID is used to make sure each request is saved uniquely inside of redis. We don’t want to overwrite a different request/response.
The RemoteFaraday class pushes a request into the redis queue and waits about a minute for a response. In order to not fill up redis we clear out the key that the response was saved into.
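The push, poll and clean-up cycle can be tried without redis. The following is a minimal sketch, assuming an in-memory Queue and Hash as stand-ins for the redis list and keys; REQUESTS, RESPONSES, response_key and remote_get are hypothetical names, and the worker thread fakes the HTTP call.

```ruby
require "json"
require "securerandom"

# In-memory stand-ins for redis (assumption: for illustration only)
REQUESTS = Queue.new
RESPONSES = {}
LOCK = Mutex.new

def response_key(client, method, path, uuid)
  "#{client}:response:#{method}:#{path}:#{uuid}"
end

# Worker side: pop a request, "fetch" it, store the body under its unique key
Thread.new do
  loop do
    message = JSON.parse(REQUESTS.pop)
    key = response_key(message["client"], message["method"],
                       message["path"], message["uuid"])
    LOCK.synchronize { RESPONSES[key] = "body for #{message["path"]}" }
  end
end

# Caller side: push a request, then poll until a response shows up
def remote_get(client, path, max_loops: 100, interval: 0.01)
  uuid = SecureRandom.uuid
  REQUESTS << { method: :get, client: client, path: path, uuid: uuid }.to_json
  key = response_key(client, :get, path, uuid)

  max_loops.times do
    # delete returns the stored value and clears the key in one step
    response = LOCK.synchronize { RESPONSES.delete(key) }
    return response if response
    sleep interval
  end

  raise "timed out waiting for #{key}"
end

puts remote_get(:bgg, "/xmlapi/boardgame/1")
```

Because every request carries its own UUID, two callers asking for the same path never read each other's response.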
This class only handles GETs at the moment because I haven’t needed to use any other method yet.
The other end of the queue is the RemoteOrchestrator that was seen in the previous post. Here is the full one.
class RemoteOrchestrator
  include Celluloid
  include Celluloid::Logger

  def initialize(redis_pool = AppContainer.redis_connection_pool)
    @redis_pool = redis_pool

    info "Starting orchestrator for remote connections"

    after(1) do
      loop!
    end
  end

  def loop!
    message = @redis_pool.with do |redis|
      redis.brpop("remote-faraday:requests", :timeout => 5)
    end

    unless message
      async.loop!
      return
    end

    message = JSON.parse(message.last)
    info "Received: #{message.inspect}"

    worker = workers[message["client"]]
    worker.async.send(message["method"], message["path"], message["uuid"])

    async.loop!
  end

  private

  def workers
    @workers ||= {
      "bgg" => Celluloid::Actor[:bgg],
    }
  end

  def connection_pool
    @redis_pool
  end
end
This is virtually the same as what I put in the last post. The #loop! method calls itself again after each message, or after a brpop that timed out.
Below is the RemoteConnection class that uses faraday to connect to an API.
class RemoteConnection
  include Celluloid
  include Celluloid::Logger

  def initialize(connection_pool = AppContainer.redis_connection_pool)
    @connection_pool = connection_pool
  end

  def get(path, uuid)
    exclusive do
      info "Fetching #{path} with uuid #{uuid}"

      response = connection.get(path)

      @connection_pool.with do |redis|
        redis.set(redis_key(:get, path, uuid), response.body)
        info "Set response to #{redis_key(:get, path, uuid)}"
      end

      sleep timeout
    end
  end

  # Seconds to wait after each request; subclasses can override this
  def timeout
    1
  end

  private

  def redis_key(method, path, uuid)
    "#{client}:response:#{method}:#{path}:#{uuid}"
  end

  # host and client are provided by subclasses
  def connection
    @connection ||= Faraday.new(host)
  end
end
The important bit in the previous class is exclusive. This is a celluloid directive that makes sure the actor runs only one call of the block at a time. Since the entire method body is inside the block, each actor makes only 1 connection at a time, and the sleep at the end keeps requests at least a timeout apart.
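The same "one request at a time, then pause" idea can be tried in plain Ruby. The following is a rough analogy and not how Celluloid implements exclusive: it is a minimal sketch that uses a Mutex, with SerializedClient as a hypothetical class and the HTTP call replaced by log entries.

```ruby
# Hypothetical stand-in for an actor that serializes its requests
class SerializedClient
  attr_reader :log

  def initialize(delay)
    @delay = delay
    @lock = Mutex.new
    @log = []
  end

  def get(path)
    # Only one thread at a time may be inside this block
    @lock.synchronize do
      @log << [:start, path]
      sleep @delay # holds the lock, so the next request waits
      @log << [:finish, path]
    end
  end
end

client = SerializedClient.new(0.05)
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

threads = %w[/a /b /c].map do |path|
  Thread.new { client.get(path) }
end
threads.each(&:join)

elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts "3 requests took #{elapsed.round(2)}s"
```

Even though three threads call get at once, every start entry in the log is followed by its own finish entry, and the total time is roughly three delays rather than one.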
Below is a specific subclass for BoardGameGeek.
class RemoteBGG < RemoteConnection
  def host
    "http://boardgamegeek.com"
  end

  def client
    :bgg
  end
end
This was a fun little project that has let me speed up my sidekiq jobs since I no longer need to arbitrarily wait 10 seconds in between jobs. I’ve thought about making this into a gem, but I haven’t had a reason to yet. I’m also not sure how useful it would be as a gem.
Posted on 25 Sep 2014
For one of my side projects I wanted to have an always-on process. I didn’t really want to have to set up something like daemon-kit for it, as that felt like too much for what this was going to do. I remembered that sidetiq was able to accomplish this and started looking at the source code.
Sidetiq very simply starts running inside the sidekiq process by using celluloid. This was the first time I played with celluloid so it was a good time to start learning about it.
Launching your process inside of sidekiq is easy.
if Sidekiq.server?
  MySupervisor.run!
end
A supervisor simply keeps celluloid actors going if they die. See supervisors for more information. My supervisor looked like:
class MySupervisor < Celluloid::SupervisionGroup
  supervise CelluloidActor, :as => :actor
  supervise OtherCelluloidActor, :as => :other_actor
  supervise Orchestrator, :as => :orchestrator
end
I created an orchestrator actor that does all of the main work for me. It looks at redis and pops messages off a queue as they are entered.
class RemoteOrchestrator
  include Celluloid
  include Celluloid::Logger

  def initialize(redis_pool = AppContainer.redis_connection_pool)
    @redis_pool = redis_pool

    info "Starting orchestrator for remote connections"

    after(1) do
      loop!
    end
  end

  def loop!
    message = connection_pool.with do |redis|
      redis.brpop("requests", :timeout => 5)
    end

    unless message
      async.loop!
      return
    end

    message = JSON.parse(message.last)
    info "Received: #{message.inspect}"

    worker = workers[message["client"]]
    worker.async.process(message["body"])

    async.loop!
  end

  private

  def workers
    @workers ||= {
      "actor" => Celluloid::Actor[:actor],
      "other_actor" => Celluloid::Actor[:other_actor],
    }
  end

  def connection_pool
    @redis_pool
  end
end
The important part about this class is the after(1) call. It queues the block to run in 1 second, which puts it in the background. The actor also loops itself by sending another loop! message async at the end of loop!. This makes sure it will continue running.
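The pop-and-dispatch step inside loop! can be tried without Celluloid or redis. The following is a minimal sketch, assuming a plain Ruby Queue in place of the redis list; RecordingWorker, dispatch and drain are hypothetical names. Unlike the orchestrator above, it skips messages for unknown clients instead of raising.

```ruby
require "json"

# Hypothetical worker that just remembers what it was asked to process
class RecordingWorker
  attr_reader :seen

  def initialize
    @seen = []
  end

  def process(body)
    @seen << body
  end
end

# Parse one raw message and hand its body to the matching worker.
# Returns false when no worker is registered for the client.
def dispatch(raw_message, workers)
  message = JSON.parse(raw_message)
  worker = workers[message["client"]]
  return false unless worker

  worker.process(message["body"])
  true
end

# Keep dispatching until the queue is empty; returns how many were handled
def drain(queue, workers)
  handled = 0
  handled += 1 if dispatch(queue.pop, workers) until queue.empty?
  handled
end

workers = { "actor" => RecordingWorker.new }
queue = Queue.new
queue << { client: "actor", body: "first" }.to_json
queue << { client: "unknown", body: "ignored" }.to_json
queue << { client: "actor", body: "second" }.to_json

puts drain(queue, workers)          # prints 2
puts workers["actor"].seen.inspect  # prints ["first", "second"]
```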
I’ve run this in my side project for a few months now and I haven’t noticed any ill effects.
Posted on 18 Sep 2014
At work I needed to make a configuration class that worked well with ActiveAdmin, which meant implementing a lot of the ActiveModel modules. Below is what I came up with. It was pretty overkill for this project, but I like what came out nonetheless.
One nice thing I want to point out is that I’m passing in a redis instance; the default is AppContainer.redis. This lets you easily set up a Redis::Namespace for the configuration class, and the class itself doesn’t have to know how to do that.
To define new attributes you use the attribute class method. It lets you set a default and a type. The type is only special for :integer at the moment, but it’s not hard to add more.
class Configuration
  extend ActiveModel::Naming
  include ActiveModel::Conversion
  include ActiveModel::Dirty
  include ActiveModel::Serialization
  include ActiveModel::Serializers::JSON

  def self.reset!(redis = AppContainer.redis)
    attributes.each do |attribute|
      redis.del("config_#{attribute}")
    end
  end

  def self.attributes
    @attributes ||= []
  end

  # `default: default` was a circular argument reference; use nil instead
  def self.attribute(name, default: nil, type: nil)
    attributes << name

    define_attribute_methods name

    define_method(name) do
      attr = @attributes[name] || default

      # Values loaded from redis come back as strings
      case type
      when :integer
        attr.to_i
      else
        attr
      end
    end

    define_method("#{name}=") do |value|
      send("#{name}_will_change!") unless value == @attributes[name]
      @attributes[name] = value
    end
  end

  attribute :timeout, default: 1500, type: :integer

  def initialize(redis = AppContainer.redis)
    @redis = redis
    @attributes = {}

    load_attributes
  end

  def attributes
    attrs = self.class.attributes.map do |attribute|
      [attribute, send(attribute)]
    end
    Hash[attrs]
  end

  def update(attributes)
    attributes.each do |attribute, value|
      send("#{attribute}=", value)
    end

    save
  end

  def save
    changes.each do |attribute, (old, new)|
      @redis.set("config_#{attribute}", new)
    end

    changes_applied

    true
  end

  private

  def load_attributes
    self.class.attributes.each do |attribute|
      @attributes[attribute] = @redis.get("config_#{attribute}")
    end
  end
end
To use it:
config = Configuration.new
config.timeout
# => 1500
config.timeout = 1600
config.changed?
# => true
config.changes
# => { "timeout" => [1500, 1600] }
config = Configuration.new
config.timeout
# => 1600
config.update("timeout" => 1000)
config.timeout
# => 1000
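Adding more types mostly means adding more casts. The following is a minimal sketch without ActiveModel or redis, assuming a Hash as the backing store; MiniConfig, CASTS and the :boolean type are hypothetical and not part of the Configuration class above.

```ruby
# Hypothetical, stripped-down variant of the attribute macro
class MiniConfig
  # One cast per supported type; unknown types fall through unchanged
  CASTS = {
    integer: ->(value) { value.to_i },
    boolean: ->(value) { ["true", "1", true].include?(value) },
  }

  def self.attribute(name, default: nil, type: nil)
    cast = CASTS.fetch(type, ->(value) { value })

    define_method(name) do
      raw = @store["config_#{name}"]
      cast.call(raw.nil? ? default : raw)
    end

    define_method("#{name}=") do |value|
      # Stored as a string, the way a redis GET would return it
      @store["config_#{name}"] = value.to_s
    end
  end

  attribute :timeout, default: 1500, type: :integer
  attribute :maintenance, default: false, type: :boolean

  def initialize(store = {})
    @store = store
  end
end

config = MiniConfig.new
puts config.timeout      # prints 1500
config.timeout = 1600
puts config.timeout      # prints 1600
config.maintenance = true
puts config.maintenance  # prints true
```

Keeping the casts in one hash means a new type is a one-line change, and the getter never needs another case branch.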