At Citrine, we have a long data pipeline that processes data uploaded by our users. This data can be as simple as a basic PDF document, or as complex as a rigidly structured set of materials properties in a CSV or JSON file. To make it easy for clients in multiple languages to join the fun, we chose RabbitMQ for messaging. This post has a few tips & tricks you might find handy when using RabbitMQ with Ruby and Rails.
Subscribe with Sneakers
Sneakers is a fantastic library for letting your Rails application subscribe to RabbitMQ message queues. It’s very easy to configure with your Rails environment, so you have access to ActiveRecord or anything else you need. It’s also easy to get going on Heroku with a simple Procfile configuration:
-- Procfile
web: bundle exec puma -p $PORT -c config/puma.rb
worker: WORKERS='EmailWorker' bundle exec rake sneakers:run
bigworker: WORKERS='BigWorker' bundle exec rake sneakers:run
We spread our workers out across different dynos to maximize resource utilization and to easily scale our bigger, slower jobs. For example, a particular job might need a lot of memory or CPU but the rest of your workers are lightweight.
You can also override the global Sneakers configuration (set in config/sneakers.rb) per worker, which is handy for jobs that need a different timeout or prefetch setting. Sneakers makes this really easy:
-- app/workers/one_worker.rb
class EmailWorker
  include Sneakers::Worker
  from_queue 'queue.email_queue', :timeout_job_after => 180 # override config/sneakers.rb
end
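To picture how a per-worker override relates to the global settings, here is a plain-Ruby sketch of the idea. It is not Sneakers’ internal code: the GLOBAL_DEFAULTS hash, its values, and the effective_options helper are hypothetical names made up for illustration. Only the timeout_job_after and prefetch option names come from the text above.

```ruby
# Conceptual sketch only: per-worker options layered over global defaults.
# GLOBAL_DEFAULTS and effective_options are hypothetical, not Sneakers APIs,
# and the default values below are invented for the example.
GLOBAL_DEFAULTS = { timeout_job_after: 5, prefetch: 10 }.freeze

def effective_options(per_worker = {})
  # Keys given per worker win; everything else falls back to the global value.
  GLOBAL_DEFAULTS.merge(per_worker)
end

email_opts = effective_options(timeout_job_after: 180)
puts email_opts.inspect
```

The point is simply that a worker only has to name the settings it wants to change; everything it leaves out keeps the global value.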
Test your Workflow
Testing your Sneakers workers (or your entire RabbitMQ workflow) can be cumbersome. We found a library, Moqueue, that eases a lot of this pain. It’s quite old and documentation is sparse to non-existent (though you can view a cached version of the introductory blog post here), but thankfully it’s not rocket science. The gist is that it manages your message queues in memory and doesn’t require a running AMQP broker. To get started, you’ll need a bit of boilerplate. First, you need a few methods in your test helper:
-- test/test_helper.rb
module MoqueueRunner
  def run
    logger.info 'Worker subscribed.'
    @queue.subscribe { |header, msg| self.work msg }
    logger.info 'Worker alive.'
  end
end

def make_saferun(clazz)
  clazz.include(MoqueueRunner) # custom Moqueue runner for sneakers workers
end
This code just makes our Sneakers workers play nicely with Moqueue’s AMQP setup. Each test also needs to set up the queues it uses, set the worker to use the MoqueueRunner, and finally make our internal AMQP publishers use Moqueue. Note that our publishing class is custom, but it allows a custom AMQP channel to be injected for testing & flexibility.
-- test/models/one_worker_test.rb
def setup
  overload_amqp             # start moqueue
  make_saferun(EmailWorker) # set the runner

  # setup the queue for this test
  @mq = MQ.new
  @qname = 'queue.email_queue'
  @topic = @mq.topic('queue')
  @queue = @mq.queue(@qname)
  @queue.bind(@topic, :key => @qname)

  # make your publisher use Moqueue by stubbing get_publisher
  AMQP::ConnectionManager.stub :get_publisher, { pub: @topic, keyname: key } do
    @publisher = Publisher::MyPublisher.new("queue.another_queue", @mq)
  end
end

def teardown
  reset_broker # reset the queues for each test
end
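Our actual publisher class isn’t shown here, but the injection idea is easy to sketch in plain Ruby. The names below (FakeExchange, SketchPublisher) are hypothetical and exist only for this illustration; they are not part of our codebase, Moqueue, or any AMQP library. The exchange is passed in through the constructor, so a test can hand over a fake that just records what was published.

```ruby
require 'json'

# Hypothetical stand-in for an AMQP exchange: it only records what was published.
class FakeExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(payload, opts = {})
    @published << { payload: payload, key: opts[:key] }
  end
end

# Hypothetical publisher: the exchange is passed in rather than created
# internally, so tests can supply a fake and production code a real one.
class SketchPublisher
  def initialize(routing_key, exchange)
    @routing_key = routing_key
    @exchange = exchange
  end

  # Serialize the message and publish it under this publisher's routing key.
  def publish(message)
    @exchange.publish(JSON.generate(message), key: @routing_key)
  end
end

exchange = FakeExchange.new
publisher = SketchPublisher.new('queue.another_queue', exchange)
publisher.publish('email' => 'president@whitehouse.gov')
puts exchange.published.first[:key]
```

Because the publisher never opens a connection itself, the same class can run against a real broker or against a fake without any changes.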
OK! You’re almost ready to test. Here’s a simple test that makes sure a worker reacts to a message the way we expect. This particular test makes sure that, upon delivery of a certain message, an e-mail gets sent.
-- test/models/one_worker_test.rb
test 'email sent on message receive' do
  worker = EmailWorker.new(@queue)
  worker.run
  @topic.publish(
    { 'message' => "send email", email: 'president@whitehouse.gov' },
    key: "queue.email_queue"
  )
  assert_not ActionMailer::Base.deliveries.empty?
end
You can also test some information about the queues themselves.
-- test/models/one_worker_test.rb
test 'message contains email' do
  d = EmailObject.create(email: 'president@whitehouse.gov')
  d.publish_self
  assert @queue.received_messages.size >= 1
  assert @queue.received_messages[0]['email'] == 'president@whitehouse.gov'
end
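If it helps to see why assertions like these can run right after publishing, here is a toy in-memory broker in plain Ruby. It is a conceptual sketch under simplifying assumptions (exact-match routing keys, synchronous delivery) and is not how Moqueue is actually implemented; SketchTopic and SketchQueue are hypothetical names.

```ruby
# Conceptual sketch of an in-memory broker; NOT Moqueue's implementation.
# SketchTopic and SketchQueue are hypothetical names for this example.
class SketchQueue
  attr_reader :name, :received_messages

  def initialize(name)
    @name = name
    @received_messages = []
    @subscriber = nil
  end

  # Register a block to be called for every delivered message.
  def subscribe(&block)
    @subscriber = block
  end

  # Called by the topic: record the message, then hand it to the subscriber.
  def deliver(message)
    @received_messages << message
    @subscriber&.call(message)
  end
end

class SketchTopic
  def initialize
    @bindings = Hash.new { |hash, key| hash[key] = [] }
  end

  # Exact-match binding only; real topic exchanges also support wildcards.
  def bind(queue, key:)
    @bindings[key] << queue
  end

  # Delivery happens synchronously inside publish, in the caller's thread.
  def publish(message, key:)
    @bindings.fetch(key, []).each { |queue| queue.deliver(message) }
  end
end

topic = SketchTopic.new
queue = SketchQueue.new('queue.email_queue')
topic.bind(queue, key: queue.name)

handled = []
queue.subscribe { |msg| handled << msg['email'] }
topic.publish({ 'email' => 'president@whitehouse.gov' }, key: 'queue.email_queue')
topic.publish({ 'email' => 'nobody@example.com' }, key: 'queue.other')

puts queue.received_messages.size
puts handled.inspect
```

In this sketch the message published to the unbound key is simply dropped, and the bound queue has already recorded and handled its message by the time publish returns.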
There you have it! It’s not the easiest thing in the world, but you can at least test some behavior with your asynchronous workers.