Processing market data - ActiveMQ, ActiveMessaging and Stomp

For EVE Metrics we ran into scalability issues with the market data almost immediately. Uploading market history data was causing some major problems, so we had to break out the big guns in terms of distributed workers.

Basically, whenever our upload client sends us a chunk of data (say, 100 different orders for a type and the price history for the last month), we’re running at a minimum 128 SQL insert queries. This all adds up, even with bulk inserts. Then you have to factor in delta calculations for histories, which add another two SQL queries for each history value: again, more queries, more time. As the complexity increased, we ended up needing around 3 seconds to run all the queries for larger logfiles, which is no good in practice. The uploader can sometimes be trying to push 3 to 5 files at a time up to the cluster, and this immediately ties up half the app servers for 3 seconds or so.

So, the upload controllers now avoid MySQL entirely, and we use some enterprise-grade messaging to handle distributed processing of logs, fully asynchronously from the upload controllers and Mongrel. Details after the break.

System Design

The basics are: we build a hash of logfile, user ID, and developer key ID (for tracking), and turn it into YAML. We then load it into a broker, in our case ActiveMQ, using ActiveMessaging. ActiveMessaging uses Stomp to communicate with ActiveMQ, and ActiveMQ holds a few queues of messages (jobs) to which we push our YAML-serialized objects. We can then define which processors should listen to which queues, and write handlers to do the jobs asynchronously. Which is awesome.
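As a rough sketch of the payload step described above, the snippet below builds a job hash and round-trips it through YAML using only Ruby's standard library. The helper name `build_upload_job`, the `:developer_key_id` key, and the sample values are assumptions made for illustration, not taken from the EVE Metrics code. It also assumes a Ruby recent enough (2.6 or later) for the keyword form of `YAML.safe_load`.

```ruby
require 'yaml'

# Hypothetical helper: bundle everything a worker needs into one hash.
def build_upload_job(log, user_id, developer_key_id)
  { :body => log, :user_id => user_id, :developer_key_id => developer_key_id }
end

job = build_upload_job("sample log contents", 42, 7)

# This string is what would be handed to the broker as the message body.
payload = job.to_yaml

# A worker reverses the step. Permitting Symbol keeps the symbol keys
# while still refusing to instantiate arbitrary Ruby objects.
decoded = YAML.safe_load(payload, permitted_classes: [Symbol])
```

Because the message is just a string, the broker never needs to know anything about Ruby objects, and workers can be restarted or moved without losing queued jobs.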

Some code

So, we’d start by doing the basics:

require 'activemessaging/processor'
include ActiveMessaging::MessageSender

Once this is done we can do this to add a new job:

publish :market_order_uploads, {:body => params[:log], :user_id => @user.id}.to_yaml

In our RAILS_ROOT/config/messaging.rb file we have something like this:

ActiveMessaging::Gateway.define do |s|
  s.destination :market_order_uploads, '/queue/MarketOrderUploads'
end

Once we’ve done this, we can go ahead and write our processor to do the job that our slow controller used to take ages over.

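class MarketOrderUploadsProcessor < ApplicationProcessor
  # Listen on the destination defined in config/messaging.rb
  subscribes_to :market_order_uploads

  # Called by the poller once for each message taken off the queue
  def on_message(raw_message)
    # Turn the YAML payload back into the hash the controller published
    message = YAML.load(raw_message)
    logger.debug "MarketOrderUploadsProcessor received: #{message.inspect}"
  end
end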
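The processor above only logs what it receives. As a hedged sketch of what a handler might do before touching the database, the standalone function below decodes a message and rejects anything that is not a hash with the expected keys. The name `decode_upload` and the `REQUIRED_KEYS` list are hypothetical, and the code deliberately avoids ActiveMessaging so it can be run on its own. It assumes Ruby 2.6 or later for the keyword form of `YAML.safe_load`.

```ruby
require 'yaml'

# Assumed minimum contents of an upload job (matches the publish call).
REQUIRED_KEYS = [:body, :user_id].freeze

# Returns the decoded hash, or nil when the message is unusable.
def decode_upload(raw_message)
  message = YAML.safe_load(raw_message, permitted_classes: [Symbol])
  return nil unless message.is_a?(Hash)

  missing = REQUIRED_KEYS.reject { |key| message.key?(key) }
  missing.empty? ? message : nil
rescue Psych::Exception
  # Malformed YAML or a disallowed class counts as a bad message.
  nil
end

good = { :body => "sample log", :user_id => 42 }.to_yaml
accepted = decode_upload(good)
rejected = decode_upload("just a string")
```

Returning nil for bad input lets the caller log and skip the message rather than raising inside the worker.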

Then we just add a restart command for script/poller in our Capistrano deployment recipe and we’re all set for distributed processing. The clever bit is that we can start up more workers on the fly to handle demand, and can even add additional brokers and workers on other servers (say, on Amazon EC2) for additional processing power.
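The reason extra workers help is that several consumers can pull from the same queue, with each job going to exactly one of them. The sketch below imitates that pattern inside a single Ruby process, using the standard library's thread-safe `Queue` as a stand-in for the broker. It is not ActiveMQ or ActiveMessaging code, and `run_workers` and the job names are made up for illustration.

```ruby
require 'thread'

# Drain job_names with worker_count threads; returns [worker_id, job] pairs.
def run_workers(job_names, worker_count)
  jobs = Queue.new
  job_names.each { |name| jobs << name }
  worker_count.times { jobs << :stop } # one shutdown marker per worker

  processed = Queue.new # thread-safe collector for results

  workers = Array.new(worker_count) do |id|
    Thread.new do
      # Each pop hands a job to exactly one worker.
      while (job = jobs.pop) != :stop
        processed << [id, job]
      end
    end
  end
  workers.each(&:join)

  results = []
  results << processed.pop until processed.empty?
  results
end

names = (1..20).map { |i| "logfile-#{i}" }
results = run_workers(names, 4)
```

Which worker handles which job varies from run to run, but every job is handled once. Raising `worker_count` is the in-process analogue of starting another poller.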

Working with ActiveMQ has been challenging. There’s not much in the way of documentation for the configuration files, and there’s an assumption that you know as much as a Java developer about things like beans and whatnot, but guesswork and some googling got me set up and running smoothly. ActiveMessaging is an awesome project and works just fine. Some of the docs on the wiki are a little out of date, but running ‘rake rdoc’ on the plugin once it was installed gave me something a little more up to date. All in all, it’s not hard at all.

The best bit of this is that ActiveMQ is cross-platform (unlike systems like memcached or BackgrounDRb), so I can run this all on my Windows development box for testing.

Posted by James Harrison on Friday, July 04, 2008