Simple Worker Queue with Procs in MagLev
At the last PDX.rb meeting, Jesse Cooke presented “A very simple implementation of a background queue/worker” in MagLev. His program uses Ruby Proc objects as the unit of work. Since procs are just objects, MagLev can save them to disk and run them in any MagLev VM connected to the repository. Instant, native distributed worker queue!
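For contrast, here is a small check you can run in a regular Ruby interpreter (MRI, not MagLev). It shows that the standard Marshal serializer refuses to dump a Proc, which is why saving procs for another VM to run is not something you get for free elsewhere. The helper name marshal_outcome is mine, purely for illustration:

```ruby
# Plain Ruby (MRI), not MagLev. marshal_outcome is a hypothetical helper
# that reports what happens when we try to serialize an object.
def marshal_outcome(obj)
  Marshal.dump(obj)
  :dumped
rescue TypeError => e
  # MRI raises TypeError for objects it cannot dump, such as Procs.
  e.class
end

job = Proc.new { puts "job #0" }

puts "Array: #{marshal_outcome([1, 2, 3])}"
puts "Proc:  #{marshal_outcome(job)}"
```

The array dumps fine; the proc does not. MagLev sidesteps Marshal entirely by persisting the objects themselves.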
Jesse’s Worker Queue Example
Jesse’s code implements a single producer, multiple consumer queue using MagLev persistence and plain old Ruby objects. Here is the producer:
    Maglev.abort_transaction
    Maglev::PERSISTENT_ROOT[:q] ||= []
    20.times do |t|
      Maglev::PERSISTENT_ROOT[:q] << Proc.new { sleep 1; puts "job ##{t}" }
    end
    Maglev.commit_transaction
Maglev::PERSISTENT_ROOT is a Hash that is persistent (i.e., it and its contents are saved in the MagLev repository and are visible to all MagLev VMs that connect to the repository). Because MagLev implements Persistence by Reachability, the array assigned to PERSISTENT_ROOT[:q] and the contents of that array are also persisted to the repository. So when Jesse puts twenty Proc objects into the queue and commits them, they become available to any worker in any MagLev VM connected to the same repository. Here’s the worker (consumer) code:
    work = "let's do something"
    while work do
      begin
        Maglev.abort_transaction
        work = Maglev::PERSISTENT_ROOT[:q].shift
        Maglev.commit_transaction
        work.call if work
      rescue Maglev::CommitFailedException
        puts "Dang, someone hijacked my job... gonna grab another."
        redo
      end
    end
Workers spend their dreary lives in the confines of a loop, endlessly pulling jobs off of the queue and executing them. The unique aspect MagLev brings is that these workers can be in a different VM and on a different node from the VM that created the proc. The system is coordinated using MagLev transactions.
The worker begins his day with Maglev.abort_transaction. This creates a new transactional context and gives the worker the latest version of the job queue. The worker then pulls the next job off of the queue:
    work = Maglev::PERSISTENT_ROOT[:q].shift
commits the new state of the queue:
    Maglev.commit_transaction
and gets down to work:
    work.call if work
Running the Example
First run queue.rb to set up the queue:
    $ maglev-ruby queue.rb
Next, inspect the queue to see that MagLev has indeed saved a bunch of Proc
objects, and that they are available in a different VM:
$ maglev-ruby -e 'p Maglev::PERSISTENT_ROOT[:q]' [#<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>, #<Proc>]
Finally, simultaneously run several worker VMs. I ran three, and this was the output:
    $ maglev-ruby worker.rb
    job #1
    job #3
    job #5
    job #8
    job #11
    job #14
    job #17

    $ maglev-ruby worker.rb
    job #0
    job #2
    job #4
    job #6
    job #9
    job #12
    job #15
    job #18

    $ maglev-ruby worker.rb
    job #7
    job #10
    job #13
    job #16
    job #19
Each job is run once, and only once. Pretty sweet for a few lines of code.
Potential Trouble
We saw the happy path through the worker code above, but what if two workers (in different MagLev VMs) try to get a job at about the same time? Here is a scenario that might get you into trouble. Suppose the queue is [job1, job2, job3], and we have Fred and Barney as workers:
    Fred    Maglev.abort_transaction                  # queue is [job1, job2, job3]
    Barney  Maglev.abort_transaction                  # queue is [job1, job2, job3]
    Fred    work = Maglev::PERSISTENT_ROOT[:q].shift  # gets job1
    Barney  work = Maglev::PERSISTENT_ROOT[:q].shift  # Also gets job1 !
We don’t want both workers working on the same job. But Jesse, clever chap that he is, makes the workers commit their updates to the queue before they start working. The first worker to do so will have no problem:
    Fred    Maglev.commit_transaction                 # Success!
    Fred    work.call if work                         # grind_stone.apply(nose)
But since Fred committed before Barney, and Fred updated an object (pulled job1 off the queue) that Barney also changed, Barney’s VM will raise an exception:
    Barney  Maglev.commit_transaction                 # raises Maglev::CommitFailedException
So now Barney knows something is wrong, and executes the rescue clause:
    Barney  rescue Maglev::CommitFailedException
    Barney    puts "Dang, someone hijacked my job... gonna grab another."
    Barney    redo
which gets Barney back to the top of the loop, where he tries grabbing another job. When Barney refreshes his transactional view, he will see the updated queue, and everything works out well:
    Barney  Maglev.abort_transaction                  # queue is [job2, job3]
    Barney  work = Maglev::PERSISTENT_ROOT[:q].shift  # gets job2
    Barney  Maglev.commit_transaction                 # Success!
    Barney  work.call if work
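If you want to replay the Fred and Barney story without a MagLev install, here is a toy, single-process model in plain Ruby. Everything in it (ToyRepository, ToySession, ToyCommitFailed, replay_fred_and_barney) is made up for illustration and is not MagLev's API or implementation. In particular, I assume a single version counter for the whole queue, whereas the description above talks about conflicts on individual objects that both workers changed:

```ruby
# Toy model of optimistic concurrency. All names are hypothetical;
# this is a sketch of the idea, not how MagLev works internally.
class ToyCommitFailed < StandardError; end

class ToyRepository
  attr_reader :version, :queue

  def initialize(queue)
    @version = 0
    @queue = queue.dup.freeze
  end

  # Accept the new queue only if nobody committed since base_version.
  def commit(base_version, new_queue)
    raise ToyCommitFailed, "stale view" unless base_version == @version
    @queue = new_queue.dup.freeze
    @version += 1
  end
end

class ToySession
  def initialize(repo)
    @repo = repo
  end

  # Plays the role of abort_transaction in the walkthrough: discard
  # local changes and take a fresh private view of the queue.
  def abort_transaction
    @base_version = @repo.version
    @view = @repo.queue.dup
  end

  def shift
    @view.shift
  end

  def commit_transaction
    @repo.commit(@base_version, @view)
  end
end

def replay_fred_and_barney
  repo = ToyRepository.new([:job1, :job2, :job3])
  fred = ToySession.new(repo)
  barney = ToySession.new(repo)
  log = []

  fred.abort_transaction
  barney.abort_transaction
  fred_work = fred.shift        # job1
  barney_work = barney.shift    # also job1, from Barney's private view
  fred.commit_transaction       # first committer wins
  log << [:fred, fred_work]

  begin
    barney.commit_transaction   # Barney's view is now stale
    log << [:barney, barney_work]
  rescue ToyCommitFailed
    log << [:barney, :conflict]
    barney.abort_transaction    # refreshed view: [job2, job3]
    barney_work = barney.shift
    barney.commit_transaction
    log << [:barney, barney_work]
  end

  [log, repo.queue]
end

log, remaining = replay_fred_and_barney
p log        # [[:fred, :job1], [:barney, :conflict], [:barney, :job2]]
p remaining  # [:job3]
```

The key point the toy preserves is that taking a job and committing the shortened queue happen before any work starts, so a stale view is caught at commit time and costs only a retry.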
Inducing the Race Condition
In the previous run of workers, we didn’t see any complaints about hijacked jobs. This is because the window for the race condition is so narrow that it is very unlikely to occur. But we can modify worker.rb to make the race condition more likely:
    work = "let's do something"
    while work do
      begin
        Maglev.abort_transaction
        work = Maglev::PERSISTENT_ROOT[:q].shift
        # Sleep a bit *before* we commit, to increase the probability someone
        # else will get the same version of the queue we did.
        sleep rand(3)
        Maglev.commit_transaction
        work.call if work
      rescue Maglev::CommitFailedException
        puts "Dang, someone hijacked my job... gonna grab another."
        redo
      end
    end
To see the race, first reload the queue:
    $ maglev-ruby queue.rb
then simultaneously fire off several workers to consume the jobs:
    $ maglev-ruby worker.rb
    job #0
    job #1
    Dang, someone hijacked my job... gonna grab another.
    job #4
    job #5
    job #7
    Dang, someone hijacked my job... gonna grab another.
    Dang, someone hijacked my job... gonna grab another.
    job #11
    job #12
    Dang, someone hijacked my job... gonna grab another.
    job #14
    Dang, someone hijacked my job... gonna grab another.
    Dang, someone hijacked my job... gonna grab another.

    $ maglev-ruby worker.rb
    Dang, someone hijacked my job... gonna grab another.
    job #2
    Dang, someone hijacked my job... gonna grab another.
    Dang, someone hijacked my job... gonna grab another.
    job #6
    job #8
    Dang, someone hijacked my job... gonna grab another.
    job #10
    Dang, someone hijacked my job... gonna grab another.
    Dang, someone hijacked my job... gonna grab another.
    Dang, someone hijacked my job... gonna grab another.
    Dang, someone hijacked my job... gonna grab another.
    job #18
    job #19

    $ maglev-ruby worker.rb
    Dang, someone hijacked my job... gonna grab another.
    job #3
    Dang, someone hijacked my job... gonna grab another.
    Dang, someone hijacked my job... gonna grab another.
    Dang, someone hijacked my job... gonna grab another.
    job #9
    Dang, someone hijacked my job... gonna grab another.
    Dang, someone hijacked my job... gonna grab another.
    job #13
    job #15
    job #16
    job #17
    Dang, someone hijacked my job... gonna grab another.
All the jobs get executed once, and only once, but we see that there was indeed contention for the queue, resulting in a lot of swearing in the workplace!
Summary
The example is not perfect (e.g., it does not protect against lost jobs if a worker dies after the commit but before the job is finished, nor does it support multiple producers), but for a few lines of code, Jesse’s queue is a nice example of the power that MagLev’s persistence and optimistic concurrency control bring to Ruby.
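One possible way to approach the lost-job case is to record a claimed job somewhere before running it, so that it can be put back if its worker dies. The sketch below is plain Ruby with an ordinary Hash standing in for Maglev::PERSISTENT_ROOT. The helper names (claim_job, finish_job, requeue_abandoned) and the :in_progress key are my own inventions, not part of Jesse's code or of MagLev. Under MagLev, I would expect each helper call to need the same abort_transaction / commit_transaction / rescue wrapping as the worker loop above. How to decide that a worker is actually dead is left out entirely:

```ruby
# Hypothetical sketch: a plain Hash stands in for
# Maglev::PERSISTENT_ROOT, and none of these helpers exist in MagLev.

# Take the next job and record it under the worker's id in one step,
# so a claimed job is never missing from both structures.
def claim_job(store, worker_id)
  job = store[:q].shift
  store[:in_progress][worker_id] = job if job
  job
end

# Forget the claim once the job has run to completion.
def finish_job(store, worker_id)
  store[:in_progress].delete(worker_id)
end

# Put the job of a worker known to be dead back at the head of the queue.
def requeue_abandoned(store, dead_worker_id)
  job = store[:in_progress].delete(dead_worker_id)
  store[:q].unshift(job) if job
  job
end

def demo_lost_job_recovery
  results = []
  store = {
    q: [proc { results << :job0 }, proc { results << :job1 }],
    in_progress: {}
  }

  claim_job(store, :fred)           # Fred claims job0, then "dies"
  requeue_abandoned(store, :fred)   # someone notices and puts job0 back

  # Barney drains the queue, clearing his claim after each job.
  while (job = claim_job(store, :barney))
    job.call
    finish_job(store, :barney)
  end

  [results, store[:q].size, store[:in_progress].size]
end

p demo_lost_job_recovery  # [[:job0, :job1], 0, 0]
```

Note the trade-off: a requeued job may already have partly run before its worker died, so with this approach jobs have to tolerate being run more than once.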
Can you tell more about how to handle the cases which are in the summary?
s
November 7, 2011 at 12:28 am
Enjoyed your RubyConf talk! I modified your example to run jobs asynchronously and poll for new jobs when the queue is empty:
Store = Maglev::PERSISTENT_ROOT
Maglev.abort_transaction
Store[:queue] ||= []
20.times do |t|
Store[:queue] << Proc.new do
sleep 1
puts "job ##{t}"
end
end
Maglev.commit_transaction
loop do
Maglev.abort_transaction
until Store[:queue].empty?
Thread.new do
Store[:queue].shift.call
end
Maglev.commit_transaction
end
sleep 0.5
end
Then from another Maglev irb you can add a job to the queue:
Store = Maglev::PERSISTENT_ROOT
Maglev.abort_transaction
Store[:queue] << Proc.new { puts 'another task!'}
Maglev.commit_transaction
havenn
November 1, 2012 at 11:46 am
Oops, the code I posted lost its indentation. Sorry about that! Here is a gist: https://gist.github.com/3996000
havenn
November 1, 2012 at 11:50 am
*I* (Peter) did not give the talk at RubyConf 2012, that was Jesse. But I’ll forward on the compliment.
pbmclain
November 1, 2012 at 12:21 pm