I came across a problem where a slow processing job needed to run. The results of the job did not need to be available in real time so we could wait 5-10 minutes for the result, no problem.

The trouble began when the results had changed and the results would no longer be valid. A new job would be enqueued to Sidekiq again to handle the re-processing. This is where things got interesting because the original job had already begun work on those rows, they were already locked.

MyAwesomeModel.transaction do
  query = MyAwesomeModel.where(awesome_id: 123).where(day: '1999-12-31')
  # lock the rows
  query.lock(true)
  # Delete to start fresh
  MyAwesomeModel.where(awesome_id: 123).where(day: '1999-12-31').delete_all
  # Import
  MyAwesomeModel.connection.query populate_awesome_model(awesome_id: 123, day: '1999-12-31')
end

At first, letting MySQL handle the deadlocks seemed to work. Just handle the exception and requeue the worker, letting Sidekiq Unique jobs filter out any duplicate processing jobs. However, it would be much easier to handle this in the worker and not add any extra requests to MySQL.

redis = Redis.new
redis.lock("my-awesome-lock-name-#{123}-#{'1999-12-31'}") do |lock|
  # ... do the work
end

This approach using the redis-lock gem was great. Just handle the Redis::Lock::LockNotAcquired exception and requeue. Retrying in this fashion until the new job can start. This worked until I realized that the locks have a timeout to ensure that any lock holder has a finite time with the lock unless the ask for more time.

Enter Threads

Before this, I typically avoid writing my own multi-threaded code. It’s not worth the hassle for thinking in concurrent terms. My brain was just not meant for it. For a simple task like this I am fine with it. I needed two threads to handle this. One for keeping the lock alive, and a second for doing the work.

After writing a class to handle creating the two threads, the API looked something like this.

redis = Redis.new
redis.lock("my-awesome-lock-name-#{123}-#{'1999-12-31'}") do |lock|
  RedisLockUtils.execute_with_lock(lock: lock, logger: Rails.logger) do
    # do the work
  emd
end

For those with a hungrier appetite, this is the full source for the RedisLockUtils class.

class RedisLockUtils
  WARNING_MSG = "Execution with lock is still running!"

  # Uses a two threads, one to keep the lock alive, one to execute the passed in block
  # @param [Redis::Lock] lock
  def self.execute_with_lock(lock:, logger: Rails.logger)
    helper_progname      = "#{self}-Helper L-#{lock.key}"
    worker_progname      = "#{self}-Worker L-#{lock.key}"
    finished             = false
    execution_started_at = nil
    already_warned       = false

    helper = RedisLockThread.new do
      until finished

        unless execution_started_at
          sleep(0.1)
        end

        lock_expires_in = lock.xval.to_i - Time.now.to_i

        if lock_expires_in < 30.seconds
          logger.info(helper_progname) { "Waiting since #{execution_started_at}. Extending. lock=#{lock.key} lock_expires_in=#{lock_expires_in}" }
          lock.extend_life(1.minutes)
        end

        unless already_warned or Time.now.to_i - execution_started_at.to_i < 10.minutes
          logger.warn(helper_progname) { "#{WARNING_MSG} lock=#{lock.key}, execution_started_at=#{execution_started_at}" }
          already_warned = true
        end

        sleep(rand(1..5))
      end
    end

    worker = RedisLockThread.new do
      execution_started_at = Time.now
      logger.info(worker_progname) { "Beginning execution. Yielding to block." }

      begin
        yield
      ensure
        logger.debug(worker_progname) { "Marking as finished." }
        finished = true
      end

      logger.info(worker_progname) { "Finished execution." }
    end

    helper.join
    worker.join
    nil
  rescue => e
    logger.error(self) { "Encountered exception while executing with lock. #{e.class} #{e} lock=#{lock.key}" }
    raise e
  ensure
    helper.kill rescue nil
    worker.kill rescue nil
  end

  class RedisLockThread < Thread
    def initialize
      @abort_on_exception = true
      super
    end
  end
end