Recent Posts

JSON Pointer Faraday Middleware

Posted on 20 Oct 2014

At Ruby DCamp I spent some time writing a Faraday middleware that automatically dereferences JSON Pointers (RFC 6901). It was a fun exercise in reading an RFC and writing something that implements it. I wanted to do this after some cool demonstrations I saw at RESTFest this year. One was hyper+json, which uses JSON Pointers.

Below is what I came up with. It is also hosted on GitHub. The GitHub project also contains specs that better highlight what is happening.

This code will take the following JSON:

{
  "name": "/first_name",
  "first_name": "Eric"
}

And convert it into:

{
  "name": "Eric",
  "first_name": "Eric"
}

It even allows deep linking of attributes.

json_faraday_middleware.rb
require 'faraday'
require 'json'

class JsonPointerMiddleware < Faraday::Middleware
  JSON_MIME_TYPE = "application/json"

  def call(env)
    @app.call(env).on_complete do |env|
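      # Use `next`, not `return`: a `return` inside this block would exit
      # #call itself and hand nil back to the rest of the middleware stack.
      # The header may also carry a charset, so only compare the prefix.
      content_type = env.response_headers["Content-Type"].to_s
      next unless content_type.start_with?(JSON_MIME_TYPE)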

      @body = JSON.parse(env.body)
      pointerize(@body)
      begin
        env.body = @body.to_json
      rescue JSON::NestingError
        # there is a circular nest, skip dereferencing
      end
    end
  end

  def pointerize(body)
    body.each do |key, value|
      if value.is_a?(Hash)
        pointerize(value)
      end
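      # Only strings shaped like "/path" can be pointers. This skips nested
      # hashes, numbers, and a bare "/" that would otherwise raise below.
      next unless value.is_a?(String) && value =~ %r{\A/.}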

      pointer_keys = value.split("/")[1..-1]
      pointer_keys = escape_slash(pointer_keys)
      pointer_keys = escape_tilde(pointer_keys)
      pointer_keys = convert_indices(pointer_keys)
      new_value = pointer_keys.inject(@body) do |body, pointer_key|
        next if body.nil?
        body[pointer_key]
      end
      body[key] = new_value if new_value
    end
  end

  # Decode "~1" to "/". RFC 6901 requires this to happen before "~0".
  def escape_slash(keys)
    keys.map do |key|
      key.gsub("~1", "/")
    end
  end

  def escape_tilde(keys)
    keys.map do |key|
      key.gsub("~0", "~")
    end
  end

  # Convert array indices to Integers
  def convert_indices(keys)
    keys.map do |key|
      Integer(key) rescue key
    end
  end
end
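
To illustrate the deep linking outside of Faraday, here is a minimal standalone sketch of resolving a single pointer. It is not the middleware's code: the `resolve_pointer` helper and the sample document (apart from the "Eric" value) are made up for this example.

```ruby
require 'json'

# Hypothetical helper: resolve an RFC 6901 JSON Pointer such as
# "/person/first_name" against a parsed document.
# Returns nil when any step of the path is missing.
def resolve_pointer(document, pointer)
  return document if pointer.empty?
  return nil unless pointer.start_with?("/")

  # The -1 limit keeps trailing empty tokens; [1..-1] drops the empty
  # token that sits in front of the leading slash.
  tokens = pointer.split("/", -1)[1..-1]

  tokens.inject(document) do |node, token|
    # "~1" is decoded before "~0" so that "~01" ends up as "~1"
    key = token.gsub("~1", "/").gsub("~0", "~")

    case node
    when Hash
      node[key]
    when Array
      key =~ /\A\d+\z/ ? node[key.to_i] : nil
    end
  end
end

document = JSON.parse(<<-JSON)
  {
    "name": "/person/first_name",
    "first_tag": "/tags/0",
    "person": { "first_name": "Eric" },
    "tags": ["ruby", "faraday"],
    "a/b": "slash"
  }
JSON

resolve_pointer(document, document["name"])      # => "Eric"
resolve_pointer(document, document["first_tag"]) # => "ruby"
resolve_pointer(document, "/a~1b")               # => "slash"
resolve_pointer(document, "/missing/key")        # => nil
```

Checking the node type at each step means an array is only ever indexed with an integer and a hash only with a string key.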

Detecting Primary Color in Photo

Posted on 15 Oct 2014

For one of our clients at SmartLogic, we had to find the primary color of a photo to use as a background while the photo was loading. This was pretty easy given ImageMagick and a lot of tutorials online, but the primary color was always some extremely dark color, or gray/black.

This sidekiq worker detects the “best” primary color of the photo, skipping really dark colors and any version of gray.

This uses the ImageMagick convert command to produce a color histogram, which is then sorted by how often each color is used. It then strips out “bad” colors to find the most common color we can use.

class ProcessPhotoWorker
  include Sidekiq::Worker

  # Regex for the output of ImageMagick
  COLOR_REGEX =
     /\d+: \(\s{0,2}\d{1,3},\s{0,2}\d{1,3},\s{0,2}\d{1,3}\) (?<color>#.{6}).*/
  # These are all very dark colors
  COLORS_TO_IGNORE = [
    "#000000", "#000033", "#000066",
    "#003300", "#003333", "#003366",
    "#330000", "#330033", "#330066",
    "#333300", "#333333", "#333366",
    "#660000", "#660033", "#660066",
    "#663300", "#663333", "#663366",
  ]

  def perform(photo_id)
    @photo = Photo.find(photo_id)

    # Save a local version of the S3 photo
    tmpfile = Tempfile.new("photo")
    begin
      tmpfile.binmode
      tmpfile.write(@photo.image.read)
      tmpfile.rewind

      image = ::MiniMagick::Image.read(tmpfile.read)

      capture_geometry(image)
      capture_main_color(image, tmpfile)
    ensure
      tmpfile.close
      tmpfile.unlink
    end

    @photo.save
  end

  private

  def capture_geometry(image)
    @photo.image_height = image[:height]
    @photo.image_width = image[:width]
  end

  # Uses the convert tool to find the most common color
  # Output is similar to:
  #   67655: ( 50, 18, 18) #321212 srgb(50,18,18)
  #  240295: ( 15,  3,  6) #0F0306 srgb(15,3,6)
  def capture_main_color(image, tmpfile)
    output = image.run(command_to_run(tmpfile))

    sorted_colors = output_to_colors(output)
    sorted_colors = sorted_colors - COLORS_TO_IGNORE
    sorted_colors = skip_gray(sorted_colors)

    @photo.image_main_color = sorted_colors.last
  end

  def command_to_run(tmpfile)
    command = ::MiniMagick::CommandBuilder.new("convert")
    command << tmpfile.path << "-resize" << "100x100" << "+dither"
    command << "-remap" << "netscape:" << "-format" << "'%c'" << "histogram:info:"
    command
  end

  def output_to_colors(command_output)
    lines = command_output.split("\n")

    sorted_lines = lines.map do |line|
      # convert white space to single space inbetween for the regex
      # first line contains a "'"
      line.gsub(/\s+/, " ").gsub("'", "").strip
    end.sort_by(&:to_i)

    sorted_colors = sorted_lines.map do |color|
      match = COLOR_REGEX.match(color)
      match[:color] if match
    end

    sorted_colors.reject(&:blank?)
  end

  def skip_gray(sorted_colors)
    sorted_colors.reject do |color|
      color = Color::RGB.from_html(color)
      color.red == color.green && color.green == color.blue
    end
  end
end
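
The parsing and filtering steps can be tried without ImageMagick or Sidekiq. The sketch below is a simplified stand-in, not the worker itself: it swaps the explicit ignore list for a hypothetical darkness threshold (`DARKEST_ALLOWED`), and the last two lines of the sample histogram are invented for the example.

```ruby
# Matches one histogram line and captures the count and the hex color
HISTOGRAM_LINE = /\A\s*(?<count>\d+):\s*\([^)]*\)\s*(?<color>#[0-9A-Fa-f]{6})/

# Hypothetical threshold: a color is "too dark" when every channel
# is at or below this value.
DARKEST_ALLOWED = 0x33

# "#CC6633" => [204, 102, 51]
def channels(hex)
  hex.delete("#").scan(/../).map { |pair| pair.to_i(16) }
end

def gray?(hex)
  channels(hex).uniq.size == 1
end

def too_dark?(hex)
  channels(hex).all? { |channel| channel <= DARKEST_ALLOWED }
end

# Returns the most frequent color that is neither gray nor too dark,
# or nil when nothing usable is left.
def main_color(histogram)
  counted = histogram.each_line.map do |line|
    match = HISTOGRAM_LINE.match(line)
    [match[:count].to_i, match[:color].upcase] if match
  end.compact

  usable = counted.reject { |_count, color| gray?(color) || too_dark?(color) }
  most_common = usable.max_by { |count, _color| count }
  most_common && most_common.last
end

sample = <<-OUTPUT
   67655: ( 50, 18, 18) #321212 srgb(50,18,18)
  240295: ( 15,  3,  6) #0F0306 srgb(15,3,6)
    9000: (102,102,102) #666666 srgb(102,102,102)
    1200: (204,102, 51) #CC6633 srgb(204,102,51)
OUTPUT

main_color(sample) # => "#CC6633"
```

The two most common colors are dropped for being too dark and the third for being gray, so the far less frequent orange wins.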

"Remote" Faraday: Respecting API Rate Limits

Posted on 06 Oct 2014

This post will continue with the last post about Running an extra process inside of sidekiq.

My side project connects to a lot of APIs (themoviedb.org, thetvdb.com, GoodReads, and BoardGameGeek). Each one of these has its own rate limit. Previously I was scheduling sidekiq jobs 10 seconds apart to make sure I didn’t hit any of their limits. This was really lame and I wanted a nicer way of handling the rate limit.

What I came up with was what I referred to as “remote” faraday. The faraday connection is in a single spot inside of the extra process I stuck inside of sidekiq.

I have classes that handle each API for two specific needs, searching for items and viewing a single item. A sample for BoardGameGeek is below.

class BGG
  def search(board_game_name)
    response = remote_faraday.
      get(:bgg, "/xmlapi/search?search=#{CGI.escape(board_game_name)}")
    xml = Nokogiri::XML(response)
    # process the xml
  end

  def board_game(board_game_id)
    response = remote_faraday.
      get(:bgg, "/xmlapi/boardgame/#{board_game_id}")
    xml = Nokogiri::XML(response)
    # process the xml
  end

  private

  def remote_faraday
    @remote_faraday ||= RemoteFaraday.new
  end
end

There’s not much difference here between a regular Faraday connection and my “remote” connection. This was nice because this class shouldn’t care about the change.

Here is what the RemoteFaraday class looks like:

class RemoteFaraday
  MAX_LOOPS = 200 # a minute or so of waiting

  class TimeoutError < StandardError
  end

  def initialize(container = AppContainer)
    @redis_pool = container.redis_connection_pool
    @uuid_generator = container.uuid_generator
  end

  def get(client, path)
    uuid = @uuid_generator.uuid

    @redis_pool.with do |redis|
      redis.lpush("remote-faraday:requests", {
        :method => :get,
        :client => client,
        :path => path,
        :uuid => uuid,
      }.to_json)
    end

    count = 0
    begin
      response = @redis_pool.with do |redis|
        redis.get(redis_key(client, :get, path, uuid))
      end
      sleep 0.3
      count += 1
    end while response.nil? && count < MAX_LOOPS

    if response.nil?
      raise TimeoutError
    end

    @redis_pool.with do |redis|
      redis.del(redis_key(client, :get, path, uuid))
    end

    response
  end

  private

  def redis_key(client, method, path, uuid)
    "#{client}:response:#{method}:#{path}:#{uuid}"
  end
end

The class takes a redis pool and UUID generator from an application container. The UUID is used to make sure each request is saved uniquely inside of redis. We don’t want to overwrite a different request/response.

The RemoteFaraday class pushes a request into the redis queue and waits about a minute for a response. In order to not fill up redis we clear out the key that the response was saved into.
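
The wait-for-a-response part can be sketched on its own. This is only an illustration under a few assumptions: a plain Hash stands in for redis, a thread stands in for the remote worker, and the `poll` helper and `PollTimeout` error are names made up for the example. Unlike the loop above, it returns as soon as a value shows up rather than sleeping once more.

```ruby
require 'securerandom'

# Hypothetical error raised when no response arrives in time
class PollTimeout < StandardError; end

# Call the block until it returns a non-nil value, sleeping between
# attempts and giving up after max_attempts tries.
def poll(max_attempts:, interval:)
  max_attempts.times do
    value = yield
    return value unless value.nil?
    sleep interval
  end
  raise PollTimeout
end

# A plain Hash stands in for redis in this sketch
store = {}
uuid = SecureRandom.uuid
key = "bgg:response:get:/xmlapi/boardgame/1:#{uuid}"

# Simulate the remote worker writing the response a little later
writer = Thread.new do
  sleep 0.05
  store[key] = "<boardgames/>"
end

response = poll(max_attempts: 50, interval: 0.01) { store[key] }
store.delete(key) # clean up, like the redis.del call above
writer.join

response # => "<boardgames/>"
```

Because the UUID is part of the key, two callers asking for the same path never read or delete each other's response.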

This class only handles GETs at the moment because I haven’t needed to use any other method yet.

The other end of the queue is the RemoteOrchestrator that was seen in the previous post. Here is the full one.

class RemoteOrchestrator
  include Celluloid
  include Celluloid::Logger

  def initialize(redis_pool = AppContainer.redis_connection_pool)
    @redis_pool = redis_pool

    info "Starting orchestrator for remote connections"

    after(1) do
      loop!
    end
  end

  def loop!
    message = @redis_pool.with do |redis|
      redis.brpop("remote-faraday:requests", :timeout => 5)
    end

    unless message
      async.loop!
      return
    end

    message = JSON.parse(message.last)

    info "Received: #{message.inspect}"

    worker = workers[message["client"]]
    worker.async.send(message["method"], message["path"], message["uuid"])

    async.loop!
  end

  private

  def workers
    @workers ||= {
      "bgg" => Celluloid::Actor[:bgg],
    }
  end

  def connection_pool
    @redis_pool
  end
end

This is virtually the same as what I put in the last post. The #loop! method calls itself again after handling each message, or after a brpop call times out.

Below is the RemoteConnection class that uses faraday to connect to an API.

class RemoteConnection
  include Celluloid
  include Celluloid::Logger

  def initialize(connection_pool = AppContainer.redis_connection_pool)
    @connection_pool = connection_pool
  end

  def get(path, uuid)
    exclusive do
      info "Fetching #{path} with uuid #{uuid}"
      response = connection.get(path)

      @connection_pool.with do |redis|
        redis.set(redis_key(:get, path, uuid), response.body)
        info "Set response to #{redis_key(:get, path, uuid)}"
      end

      sleep timeout
    end
  end

  def timeout
    1
  end

  private

  def redis_key(method, path, uuid)
    "#{client}:response:#{method}:#{path}:#{uuid}"
  end

  def connection
    @connection ||= Faraday.new(host)
  end
end

The important bit in the previous class is exclusive. This is a Celluloid directive that stops the actor from working on anything else while the block runs, including during the sleep. Since the entire method body is inside the block, each worker makes only one request at a time and pauses before starting the next.
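
For readers without Celluloid at hand, the same one-at-a-time idea can be approximated in plain Ruby. The sketch below swaps in a Mutex instead of exclusive, so it is an analogy rather than what the class above does, and `Throttle` is a name made up for the example.

```ruby
# Hypothetical helper: runs blocks one at a time and keeps at least
# `interval` seconds between the end of one call and the next start.
class Throttle
  def initialize(interval)
    @interval = interval
    @mutex = Mutex.new
    @last_finished_at = nil
  end

  def run
    @mutex.synchronize do
      if @last_finished_at
        wait = @interval - (now - @last_finished_at)
        sleep wait if wait > 0
      end
      begin
        yield
      ensure
        @last_finished_at = now
      end
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

throttle = Throttle.new(0.05)
started_at = []

# Three "requests" fired at once still run one after another
threads = 3.times.map do
  Thread.new do
    throttle.run { started_at << Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  end
end
threads.each(&:join)

# Time between consecutive starts, each roughly 0.05 seconds or more
gaps = started_at.each_cons(2).map { |earlier, later| later - earlier }
```

The lock is held during the sleep on purpose: that is what keeps a second caller from sneaking in while the first one is still waiting out the rate limit.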

Below is a specific subclass for BoardGameGeek.

class RemoteBGG < RemoteConnection
  def host
    "http://boardgamegeek.com"
  end

  def client
    :bgg
  end
end

This was a fun little project that has let me speed up my sidekiq jobs since I no longer need to arbitrarily wait 10 seconds in between jobs. I’ve thought about making this into a gem, but I haven’t had a reason to yet. I’m also not sure how useful it would be as a gem.

Running an extra process inside of sidekiq

Posted on 25 Sep 2014

For one of my side projects I wanted to have an always-on process. I didn’t really want to have to set up something like daemon-kit for it as that felt like too much for what this was going to do. I remembered that sidetiq was able to accomplish this and started looking at the source code.

Sidetiq very simply starts running inside the sidekiq process by using celluloid. This was the first time I played with celluloid so it was a good time to start learning about it.

Launching your process inside of sidekiq is easy.

if Sidekiq.server?
  MySupervisor.run!
end

A supervisor simply keeps celluloid actors going if they die. See supervisors for more information. My supervisor looked like:

class MySupervisor < Celluloid::SupervisionGroup
  supervise CelluloidActor, :as => :actor
  supervise OtherCelluloidActor, :as => :other_actor
  supervise RemoteOrchestrator, :as => :orchestrator
end

I created an orchestrator actor that does all of the main work for me. It looks at redis and pops messages off a queue as they are entered.

class RemoteOrchestrator
  include Celluloid
  include Celluloid::Logger

  def initialize(redis_pool = AppContainer.redis_connection_pool)
    @redis_pool = redis_pool

    info "Starting orchestrator for remote connections"

    after(1) do
      loop!
    end
  end

  def loop!
    message = connection_pool.with do |redis|
      redis.brpop("requests", :timeout => 5)
    end

    unless message
      async.loop!
      return
    end

    message = JSON.parse(message.last)

    info "Received: #{message.inspect}"

    worker = workers[message["client"]]
    worker.async.process(message["body"])

    async.loop!
  end

  private

  def workers
    @workers ||= {
      "actor" => Celluloid::Actor[:actor],
      "other_actor" => Celluloid::Actor[:other_actor],
    }
  end

  def connection_pool
    @redis_pool
  end
end

The important part about this class is the after(1) call. It queues the block to run in 1 second, which puts it in the background. The actor also loops itself by sending another loop! message async at the end of loop!. This makes sure it will continue running.

I’ve run this in my side project for a few months now and I haven’t noticed any ill effects.

Redis Application Configuration Class

Posted on 18 Sep 2014

At work I needed to make a configuration class that worked well with ActiveAdmin, which meant implementing a lot of the ActiveModel modules. Below is what I came up with. It was pretty overkill for this project, but I like what came out nonetheless.

One nice thing I want to point out is that I’m passing in a redis instance, in this case the default is the AppContainer.redis. This lets you easily set up a Redis::Namespace for the configuration class, and the class itself doesn’t have to know how to do that.

To define new attributes you use the attribute class method. It lets you set a default and a type. The type is only special for :integer at the moment, but it’s not hard to add more.

class Configuration
  extend ActiveModel::Naming
  include ActiveModel::Conversion
  include ActiveModel::Dirty
  include ActiveModel::Serialization
  include ActiveModel::Serializers::JSON

  def self.reset!(redis = AppContainer.redis)
    attributes.each do |attribute|
      redis.del("config_#{attribute}")
    end
  end

  def self.attributes
    @attributes ||= []
  end

  def self.attribute(name, default: nil, type: nil)
    attributes << name

    define_attribute_methods name

    define_method(name) do
      attr = @attributes[name] || default

      case type
      when :integer
        attr.to_i
      else
        attr
      end
    end
    define_method("#{name}=") do |value|
      send("#{name}_will_change!") unless value == @attributes[name]
      @attributes[name] = value
    end
  end

  attribute :timeout, default: 1500, type: :integer

  def initialize(redis = AppContainer.redis)
    @redis = redis
    @attributes = {}
    load_attributes
  end

  def attributes
    attrs = self.class.attributes.map do |attribute|
      [attribute, send(attribute)]
    end
    Hash[attrs]
  end

  def update(attributes)
    attributes.each do |attribute, value|
      send("#{attribute}=", value)
    end
    save
  end

  def save
    changes.each do |attribute, (old, new)|
      @redis.set("config_#{attribute}", new)
    end
    changes_applied
    true
  end

  private

  def load_attributes
    self.class.attributes.each do |attribute|
      @attributes[attribute] = @redis.get("config_#{attribute}")
    end
  end
end

To use it:

config = Configuration.new
config.timeout
# => 1500
config.timeout = 1600
config.changed?
# => true
config.changes
# => { "timeout" => [1500, 1600] }
config.save
# => true
config = Configuration.new
config.timeout
# => 1600
config.update("timeout" => 1000)
config.timeout
# => 1000
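
Adding more types mostly means growing the case statement in the reader method. Here is a rough sketch of what that could look like as a standalone helper. The `cast` method, the :float and :boolean types, and the list of truthy strings are all assumptions for illustration, not part of the class above.

```ruby
# Hypothetical helper: cast a raw value (redis hands back strings)
# to the declared type. Unlike the reader above, nil is passed
# through instead of being turned into 0.
def cast(value, type)
  return value if value.nil?

  case type
  when :integer
    value.to_i
  when :float
    value.to_f
  when :boolean
    # assumed convention for which strings count as true
    %w[true 1 yes on].include?(value.to_s.downcase)
  else
    value
  end
end

cast("1600", :integer) # => 1600
cast("0.5", :float)    # => 0.5
cast("true", :boolean) # => true
cast("off", :boolean)  # => false
cast("name", nil)      # => "name"
```

Casting on read keeps the stored value as whatever redis returns, so a new type never requires migrating existing keys.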
This site's content is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License unless otherwise specified. Code on this site is licensed under the MIT License unless otherwise specified.