2016-03-01 2 views
1

Когда delayed_job вытаскивает новое задание из очереди, он сначала сортирует очередь по приоритету? Если нет, то я предполагаю, что задания с низким приоритетом могут выполняться до высокого приоритета из-за «read_ahead».Как приоритет взаимодействует с read_ahead в delayed_job?

Из документации delayed_job:

поведения

по умолчанию для чтения 5 заданий из очереди при нахождении доступной работы. Вы можете настроить это, установив Delayed :: Worker.read_ahead.

Пример: Я добавляю 100 заданий с приоритетом 10 (сначала выполняются более низкие приоритеты). Затем я добавляю 1 задание с приоритетом 0. Если я использую значение read_ahead по умолчанию из 5, будет ли delayed_job сначала обработать 96 заданий, прежде чем найти одно из моих высокоприоритетных заданий?

ответ

2

У меня был аналогичный вопрос и выкопаны в исходный код, чтобы найти ответ - предполагается, что вы используете delayed_job_active_record. В администраторской/active_record.rb:

class Job < ::ActiveRecord::Base 
    scope :by_priority, lambda { order("priority ASC, run_at ASC") } 

    def self.reserve(worker, max_run_time = Worker.max_run_time) # rubocop:disable CyclomaticComplexity 
     # scope to filter to records that are "ready to run" 
     ready_scope = ready_to_run(worker.name, max_run_time) 

     # scope to filter to the single next eligible job 
     ready_scope = ready_scope.where("priority >= ?", Worker.min_priority) if Worker.min_priority 
     ready_scope = ready_scope.where("priority <= ?", Worker.max_priority) if Worker.max_priority 
     ready_scope = ready_scope.where(queue: Worker.queues) if Worker.queues.any? 
     ready_scope = ready_scope.by_priority 

     reserve_with_scope(ready_scope, worker, db_time_now) 
    end 

    def self.reserve_with_scope(ready_scope, worker, now) 
     # Optimizations for faster lookups on some common databases 
     case connection.adapter_name 
     when "PostgreSQL" 
     quoted_table_name = connection.quote_table_name(table_name) 
     subquery_sql  = ready_scope.limit(1).lock(true).select("id").to_sql 
     reserved   = find_by_sql(["UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql}) RETURNING *", now, worker.name]) 
     reserved[0] 
     when "MySQL", "Mysql2" 
     now = now.change(usec: 0) 
     count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name) 
     return nil if count == 0 
     where(locked_at: now, locked_by: worker.name, failed_at: nil).first 
     when "MSSQL", "Teradata" 
     subsubquery_sql = ready_scope.limit(1).to_sql 
     subquery_sql = "SELECT id FROM (#{subsubquery_sql}) AS x" 
     quoted_table_name = connection.quote_table_name(table_name) 
     sql = ["UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql})", now, worker.name] 
     count = connection.execute(sanitize_sql(sql)) 
     return nil if count == 0 
     where(locked_at: now, locked_by: worker.name, failed_at: nil).first 
     else 
     reserve_with_scope_using_default_sql(ready_scope, worker, now) 
     end 
    end 

    def self.reserve_with_scope_using_default_sql(ready_scope, worker, now) 
     # This is our old fashion, tried and true, but slower lookup 
     ready_scope.limit(worker.read_ahead).detect do |job| 
     count = ready_scope.where(id: job.id).update_all(locked_at: now, locked_by: worker.name) 
     count == 1 && job.reload 
     end 
    end 

Так это выглядит, как приоритет имеет преимущество - когда DelayedJob находит следующую доступную работу, чтобы зарезервировать и запустить, она занимает по приоритету первым, прежде чем пытаться ограничить результаты по «read_ahead» ,

Фактически последний метод reserve_with_scope_using_default_sql - это единственное место, где упоминается «read_ahead», поэтому, если вы используете Postgres или MySQL, он автоматически выбирает работу с наивысшим приоритетом (ограничение 1) и игнорирует «read_ahead», ,