Maglevity

MagLev, Ruby, …

Simple Worker Queue with Procs in MagLev

At the last PDX.rb meeting, Jesse Cooke presented “A very simple implementation of a background queue/worker” in MagLev. His program uses Ruby Proc objects as the unit of work. Since procs are just objects, MagLev can save them to disk and run them in any MagLev VM connected to the repository. Instant, native distributed worker queue!
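For contrast, here is a small sketch of what happens when you try to serialize a Proc in standard Ruby (MRI) with Marshal. It runs without MagLev, and it only illustrates why "procs are just objects you can persist" is worth remarking on; it says nothing about how MagLev stores them internally.

```ruby
# Plain Ruby (MRI), no MagLev required.
job = Proc.new { puts "job #0" }

begin
  Marshal.dump(job)
  puts "dumped a Proc (not expected on standard Ruby)"
rescue TypeError => e
  # Standard Ruby's Marshal has no serialized form for a Proc,
  # so the dump is refused with a TypeError.
  puts "Marshal cannot dump a Proc: #{e.message}"
end
```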

Jesse’s Worker Queue Example

Jesse’s code implements a single-producer, multiple-consumer queue using MagLev persistence and plain old Ruby objects. Here is the producer:

Maglev.abort_transaction

Maglev::PERSISTENT_ROOT[:q] ||= []

20.times do |t|
  Maglev::PERSISTENT_ROOT[:q] << Proc.new { sleep 1; puts "job ##{t}" }
end

Maglev.commit_transaction

Maglev::PERSISTENT_ROOT is a Hash that is persistent (i.e., it and its contents are saved in the MagLev repository and are visible to all MagLev VMs that connect to the repository).

Because MagLev implements Persistence by Reachability, the array assigned to PERSISTENT_ROOT[:q] and the contents of that array will also be persisted to the repository. So, when Jesse puts twenty Proc objects into the queue and commits them, they become available to any worker in any MagLev VM connected to the same repository. Here’s the worker (consumer) code:

work = "let's do something"
while work do
  begin
    Maglev.abort_transaction
    work = Maglev::PERSISTENT_ROOT[:q].shift
    Maglev.commit_transaction
    work.call if work
  rescue Maglev::CommitFailedException
    puts "Dang, someone hijacked my job... gonna grab another."
    redo
  end
end

Workers spend their dreary lives in the confines of a loop, endlessly pulling jobs off of the queue and executing them. The unique aspect MagLev brings is that these workers can be in a different VM and on a different node from the VM that created the proc. The system is coordinated using MagLev transactions.

The worker begins his day with Maglev.abort_transaction. This starts a fresh transactional context and gives the worker the latest committed version of the job queue. The worker then pulls the next job off of the queue:

work = Maglev::PERSISTENT_ROOT[:q].shift

commits the new state of the queue:

Maglev.commit_transaction

and gets down to work:

work.call if work

Running the Example

First run queue.rb to set up the queue:

$ maglev-ruby queue.rb

Next, inspect the queue to see that MagLev has indeed saved a bunch of Proc objects, and that they are available in a different VM:

$ maglev-ruby -e 'p Maglev::PERSISTENT_ROOT[:q]'
[#<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>]

Finally, simultaneously run several worker VMs. I ran three, and this was the output:

$ maglev-ruby worker.rb
job #1
job #3
job #5
job #8
job #11
job #14
job #17
$ maglev-ruby worker.rb
job #0
job #2
job #4
job #6
job #9
job #12
job #15
job #18
$ maglev-ruby worker.rb
job #7
job #10
job #13
job #16
job #19

Each job is run once, and only once. Pretty sweet for a few lines of code.

Potential Trouble

We saw the happy path through the worker code above, but what if two workers (in different MagLev VMs) try to get a job at about the same time? Here is a scenario that might get you into trouble. Suppose the queue is [job1, job2, job3], and we have Fred and Barney as workers:

Fred     Maglev.abort_transaction       # queue is [job1, job2, job3]
Barney   Maglev.abort_transaction       # queue is [job1, job2, job3]

Fred     work = Maglev::PERSISTENT_ROOT[:q].shift  # gets job1
Barney   work = Maglev::PERSISTENT_ROOT[:q].shift  # Also gets job1 !

We don’t want both workers working on the same job. But Jesse, clever chap that he is, makes the workers commit their updates to the queue before they start working. The first worker to do so will have no problem:

Fred     Maglev.commit_transaction   # Success!
Fred     work.call if work           # grind_stone.apply(nose)

But since Fred committed before Barney, and Fred updated an object (pulled job1 off the queue) that Barney also changed, Barney’s VM will raise an exception:

Barney   Maglev.commit_transaction   # raises Maglev::CommitFailedException

So now Barney knows something is wrong, and executes the rescue clause:

Barney   rescue Maglev::CommitFailedException
Barney     puts "Dang, someone hijacked my job... gonna grab another."
Barney     redo

which gets Barney back to the top of the loop where he tries grabbing another job. When Barney freshens his transactional view, he will see the updated queue, and everything works out well:

Barney   Maglev.abort_transaction                  # queue is [job2, job3]
Barney   work = Maglev::PERSISTENT_ROOT[:q].shift  # gets job2
Barney   Maglev.commit_transaction   # Success!
Barney   work.call if work
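The Fred and Barney walkthrough can be replayed in plain Ruby with a toy stand-in for the repository. This is only a sketch: ToyRepository, ToySession and ConflictError are made-up names, and a single version counter is an assumed simplification. MagLev's real conflict detection works on the objects a transaction touched and is not shown here.

```ruby
# A toy, in-memory stand-in for a MagLev repository (hypothetical names).
class ConflictError < StandardError; end

class ToyRepository
  def initialize(queue)
    @queue = queue
    @version = 0
  end

  # Hand out a private copy of the committed queue plus its version.
  def snapshot
    [@queue.dup, @version]
  end

  # Accept the new queue only if nobody committed since `seen_version`.
  def commit(new_queue, seen_version)
    raise ConflictError, "queue changed since version #{seen_version}" if seen_version != @version
    @queue = new_queue
    @version += 1
  end
end

class ToySession
  def initialize(repo)
    @repo = repo
  end

  # Plays the role of Maglev.abort_transaction: drop local changes, refresh the view.
  def abort_transaction
    @queue, @seen = @repo.snapshot
  end

  def shift
    @queue.shift
  end

  # Plays the role of Maglev.commit_transaction.
  def commit_transaction
    @repo.commit(@queue, @seen)
  end
end

def replay_fred_and_barney
  repo = ToyRepository.new(%w[job1 job2 job3])
  fred = ToySession.new(repo)
  barney = ToySession.new(repo)
  log = []

  fred.abort_transaction
  barney.abort_transaction
  freds_job = fred.shift        # job1
  barneys_job = barney.shift    # also job1, from his private view

  fred.commit_transaction       # first committer wins
  log << [:fred, freds_job]

  begin
    barney.commit_transaction   # stale view, so this is rejected
    log << [:barney, barneys_job]
  rescue ConflictError
    barney.abort_transaction    # refreshed view: [job2, job3]
    barneys_job = barney.shift
    barney.commit_transaction
    log << [:barney, barneys_job]
  end
  log
end

replay_fred_and_barney.each { |who, job| puts "#{who} runs #{job}" }
# prints "fred runs job1" then "barney runs job2"
```

The point of the sketch is the ordering: the job is only run after a successful commit, so a worker holding a stale view never gets as far as work.call.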

Inducing the Race Condition

In the previous run of workers, we didn’t see any complaints about hijacked jobs. This is because the window for the race condition is so narrow that it is very unlikely to occur. But we can modify worker.rb to make the race condition more likely:

work = "let's do something"
while work do
  begin
    Maglev.abort_transaction
    work = Maglev::PERSISTENT_ROOT[:q].shift

    # Sleep a bit *before* we commit, to increase the probability someone
    # else will get the same version of the queue we did.
    sleep rand(3)

    Maglev.commit_transaction
    work.call if work
  rescue Maglev::CommitFailedException
    puts "Dang, someone hijacked my job... gonna grab another."
    redo
  end
end

To see the race, first reload the queue:

$ maglev-ruby queue.rb

then simultaneously fire off several workers to consume the jobs:

$ maglev-ruby worker.rb
job #0
job #1
Dang, someone hijacked my job... gonna grab another.
job #4
job #5
job #7
Dang, someone hijacked my job... gonna grab another.
Dang, someone hijacked my job... gonna grab another.
job #11
job #12
Dang, someone hijacked my job... gonna grab another.
job #14
Dang, someone hijacked my job... gonna grab another.
Dang, someone hijacked my job... gonna grab another.
$ maglev-ruby worker.rb
Dang, someone hijacked my job... gonna grab another.
job #2
Dang, someone hijacked my job... gonna grab another.
Dang, someone hijacked my job... gonna grab another.
job #6
job #8
Dang, someone hijacked my job... gonna grab another.
job #10
Dang, someone hijacked my job... gonna grab another.
Dang, someone hijacked my job... gonna grab another.
Dang, someone hijacked my job... gonna grab another.
Dang, someone hijacked my job... gonna grab another.
job #18
job #19
$ maglev-ruby worker.rb
Dang, someone hijacked my job... gonna grab another.
job #3
Dang, someone hijacked my job... gonna grab another.
Dang, someone hijacked my job... gonna grab another.
Dang, someone hijacked my job... gonna grab another.
job #9
Dang, someone hijacked my job... gonna grab another.
Dang, someone hijacked my job... gonna grab another.
job #13
job #15
job #16
job #17
Dang, someone hijacked my job... gonna grab another.

All the jobs get executed once, and only once, but we see that there was indeed contention for the queue, resulting in a lot of swearing in the workplace!
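If you don't have MagLev handy, the same behaviour can be approximated with threads in plain Ruby. The sketch below is an assumption-laden simulation, not MagLev code: ToyQueueRepo and run_toy_workers are hypothetical names, threads stand in for separate VMs, and a mutex-guarded version counter stands in for the repository's commit check.

```ruby
# Toy in-memory stand-in for the shared repository (hypothetical names).
class ToyQueueRepo
  class Conflict < StandardError; end

  def initialize(jobs)
    @jobs = jobs
    @version = 0
    @lock = Mutex.new
  end

  # Private copy of the committed queue, plus the version it was read at.
  def snapshot
    @lock.synchronize { [@jobs.dup, @version] }
  end

  # First committer wins; anyone holding an older version is rejected.
  def commit(jobs, seen_version)
    @lock.synchronize do
      raise Conflict if seen_version != @version
      @jobs = jobs
      @version += 1
    end
  end
end

def run_toy_workers(job_count: 20, worker_count: 3)
  repo = ToyQueueRepo.new((0...job_count).to_a)
  executed = Queue.new    # thread-safe logs
  conflicts = Queue.new

  workers = worker_count.times.map do |id|
    Thread.new do
      loop do
        jobs, seen = repo.snapshot
        job = jobs.shift
        break if job.nil?            # queue drained
        sleep(rand * 0.005)          # widen the race window before committing
        begin
          repo.commit(jobs, seen)
        rescue ToyQueueRepo::Conflict
          conflicts << id            # "someone hijacked my job"
          next                       # refresh the view and try again
        end
        executed << job              # only the winning committer runs the job
      end
    end
  end
  workers.each(&:join)

  { executed: Array.new(executed.size) { executed.pop }, conflicts: conflicts.size }
end

result = run_toy_workers
puts "ran #{result[:executed].size} jobs with #{result[:conflicts]} conflicts"
```

The number of conflicts changes from run to run, but the set of executed jobs should always be exactly 0 through 19, each once.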

Summary

The example is not perfect (e.g., it does not protect against lost jobs if a worker dies after the commit but before the job is finished, nor does it support multiple producers), but for a few lines of code, Jesse’s queue nicely shows the power that MagLev’s persistence and optimistic concurrency control bring to Ruby.
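One possible way to approach the lost-job problem is to move a job into an "in progress" slot in the same transaction that takes it off the queue, and to remove it only when the job finishes. The sketch below is not part of Jesse's code. The method names and the :in_progress key are hypothetical, `root` is a plain Hash standing in for Maglev::PERSISTENT_ROOT, and how you learn that a worker has died is assumed to be solved elsewhere. In MagLev, each method body would sit between Maglev.abort_transaction and Maglev.commit_transaction with the same rescue/redo as worker.rb.

```ruby
# `root` is a plain Hash standing in for Maglev::PERSISTENT_ROOT (assumption).

# Move the next job from the queue into this worker's "in progress" slot.
def claim_job(root, worker_id)
  job = root[:q].shift
  root[:in_progress][worker_id] = job if job
  job
end

# Forget the job once it has run to completion.
def finish_job(root, worker_id)
  root[:in_progress].delete(worker_id)
end

# Put jobs held by workers known to be dead back at the front of the queue.
def requeue_orphans(root, dead_worker_ids)
  dead_worker_ids.each do |id|
    job = root[:in_progress].delete(id)
    root[:q].unshift(job) if job
  end
end

def demo_lost_job_recovery
  root = { q: [proc { :a }, proc { :b }], in_progress: {} }

  claim_job(root, "fred")                  # fred takes job a, then "dies"

  barneys_job = claim_job(root, "barney")  # barney takes job b
  first_result = barneys_job.call
  finish_job(root, "barney")

  requeue_orphans(root, ["fred"])          # job a goes back on the queue
  recovered_result = claim_job(root, "barney").call
  finish_job(root, "barney")

  [first_result, recovered_result, root[:q].size, root[:in_progress].size]
end

p demo_lost_job_recovery   # prints [:b, :a, 0, 0]
```

Note the trade-off: a job whose worker died after finishing the work but before calling finish_job would be run again, so this gives at-least-once rather than exactly-once execution.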

Written by maglevdevelopment

July 8, 2011 at 8:51 am

Posted in MagLev, Ruby

4 Responses

  1. Can you tell more about how to handle the cases which are in the summary?

    s

    November 7, 2011 at 12:28 am

  2. Enjoyed your RubyConf talk! I modified your example to run jobs asynchronously and poll for new jobs when the queue is empty:

    Store = Maglev::PERSISTENT_ROOT

    Maglev.abort_transaction

    Store[:queue] ||= []

    20.times do |t|
      Store[:queue] << Proc.new do
        sleep 1
        puts "job ##{t}"
      end
    end

    Maglev.commit_transaction

    loop do
      Maglev.abort_transaction

      until Store[:queue].empty?
        Thread.new do
          Store[:queue].shift.call
        end

        Maglev.commit_transaction
      end

      sleep 0.5
    end

    Then from another Maglev irb you can add a job to the queue:

    Store = Maglev::PERSISTENT_ROOT

    Maglev.abort_transaction
    Store[:queue] << Proc.new { puts 'another task!'}
    Maglev.commit_transaction

    havenn

    November 1, 2012 at 11:46 am

