2016-06-04 2 views
0

ПРИМЕЧАНИЕ. Я предпочитаю использовать потоки для разрешения имен DNS, но такое же поведение может быть воспроизведено с помощью любого типа аналогичной операции.Ruby - Неожиданные результаты с использованием многопоточности (модель производителя/потребителя) при повторении


Я получаю неожиданные результаты, когда я пытаюсь переместить свой код (ранее работающий) из стандартного однопоточных исполнения многопоточности. В частности, мой код выполняет итерацию по массиву хэшей и добавляет пару ключ/значение для каждого хэша в массиве.

Проблема, с которой я сталкиваюсь, исходит из цикла dns_cname.map, где создается новая пара ключей/значений. Вместо ключа "external_dns_entry", имеющего правильное значение (то есть result.name.to_s, которое содержит имя, разрешенное DNS), я получаю имя для одного из многих других серверов в url_nameserver_mapping.

У меня такое ощущение, что разрешения DNS происходят по мере того, как потоки становятся доступными, а хеш обновляется не по порядку, но я даже не знаю, как начать отслеживать такую ​​проблему.

Проблемные результаты: Резолюция DNS, выполняемая против сервера1, сопоставляется с сервером 17. Аналогично, сервер 17 отображает сервер 99 и т. Д. Остальная часть Хэша остается в такте.

Любая помощь ВОЗМОЖНО высоко ценится. Большое спасибо заранее!

Вот мой код когда многопоточность НЕ включен (работает нормально):

url_nameserver_mapping = { "server1" => "dallasdns.dns.com", 
          "server2" => "portlanddns.dns.com", 
          "server3" => "losangelesdns.dns.com" } 


# Parse the JSON string response from the API into a valid Ruby Hash 
# The net/http GET request is not shown here for brevity but it was stored in 'response' 
unsorted_urls = JSON.parse(response.body) 

# Sort (not sure this is relevant) 
# I left it since my data is being populated to the Hash incorrectly (w/ threading enabled) 
url_properties = unsorted_urls['hostnames']['items'].sort_by { |k| k["server"]} 

url_nameserver_mapping.each do |server,location| 

     dns = Resolv::DNS.new(:nameserver => ['8.8.8.8']) 
     dns_cname = dns.getresources(server, Resolv::DNS::Resource::IN::CNAME) 

     dns_cname.map do |result| 
     # Create a new key/value for each Hash in url_properties Array 
     # Occurs if the server compared matches the value of url['server'] key 
     url_properties.each do |url| 
      url["external_dns_entry"] = result.name.to_s if url['server'] == server 
     end 
     end 

end 

Я последовал за руководство по https://blog.engineyard.cm/2013/ruby-concurrency для реализации модели производитель/потребитель многопоточности.

Вот мой адаптированный код когда многопоточность включена (НЕ РАБОТАЕТ):

require 'thread' 
require 'monitor' 

thread_count = 8 
threads = Array.new(thread_count) 
producer_queue = SizedQueue.new(thread_count) 
threads.extend(MonitorMixin) 
threads_available = threads.new_cond 
sysexit = false 

url_nameserver_mapping = { "server1" => "dallasdns.dns.com", 
          "server2" => "portlanddns.dns.com", 
          "server3" => "losangelesdns.dns.com" } 


unsorted_urls = JSON.parse(response.body) 

url_properties = unsorted_urls['hostnames']['items'].sort_by { |k| k["server"]} 

#################### 
##### Consumer ##### 
#################### 

consumer_thread = Thread.new do 

    loop do 

    break if sysexit && producer_queue.length == 0 
    found_index = nil 

    threads.synchronize do 
     threads_available.wait_while do 
     threads.select { |thread| thread.nil? || 
            thread.status == false || 
            thread["finished"].nil? == false}.length == 0 
     end 
     # Get the index of the available thread 
     found_index = threads.rindex { |thread| thread.nil? || 
               thread.status == false || 
               thread["finished"].nil? == false } 
    end 

    @domain = producer_queue.pop 

     threads[found_index] = Thread.new(@domain) do 

     dns = Resolv::DNS.new(:nameserver => ['8.8.8.8']) 
     dns_cname = dns.getresources(@domain, Resolv::DNS::Resource::IN::CNAME) 

     dns_cname.map do |result| 
      url_properties.each do |url| 
      url["external_dns_entry"] = result.name.to_s if url['server'] == @domain 
      end 
     end 

     Thread.current["finished"] = true 

     # Notify the consumer that another batch of work has been completed 
     threads.synchronize { threads_available.signal } 
     end 
    end 
end 

#################### 
##### Producer ##### 
#################### 

producer_thread = Thread.new do 

    url_nameserver_mapping.each do |server,location| 

    producer_queue << server 

    threads.synchronize do 
     threads_available.signal 
    end 
    end 
    sysexit = true 
end 

# Join on both the producer and consumer threads so the main thread doesn't exit 
producer_thread.join 
consumer_thread.join 

# Join on the child processes to allow them to finish 
threads.each do |thread| 
    thread.join unless thread.nil? 
end 
+0

Пожалуйста, дайте мне знать, если это поможет, если я разместил структуру данных в массиве 'unsorted_urls' хэшей. Это всего лишь куча хэшей с данными, которые не имеют значения минус ключ 'server', который имеет значение имени сервера, которое используется для сравнения. Я потратил более часа на то, чтобы решить свой вопрос и попытаться сохранить его кратким - я оставил его для краткости. –

+0

Есть ли болванка '}' в 'url [" external_dns_entry "] = result.name.to_s, если url ['server'] == server}'? – Aetherus

+0

Да, спасибо, что поймали это! Я двигал эту линию от одного лайнера до нескольких, чтобы людям не пришлось слишком много прокручивать. Я исправлю это сейчас. –

ответ

0

@domain разделяется всеми нитями - это разделение является корнем вашей проблемы: когда он обновляется выбирая следующий блок работы из очереди, все ваши потоки видят это изменение. Вы можете избежать этой проблемы, делая

Thread.new(producer_queue.pop) do |domain| 
    #domain isn't shared with anyone (as long as there 
    #is no local variable called domain in the enclosing scope 
end 

Тангенциальных на ваш вопрос, но это, кажется, понравилось очень overengineered подход. Гораздо проще развернуть кучу потребительских потоков заблаговременно и прочитать их непосредственно из очереди работы.

+0

Спасибо за это! Если пример, который я вытащил из этого сайта, слишком надуман, у вас есть предложение о том, как я могу сделать то, что вы сказали (закрутить потоки потребителей заранее)? Спасибо! –

+0

Вы хотите сказать: «threads [found_index] = Thread.new (производитель_queue.pop) do' вместо:' threads [found_index] = Thread.new (@domain) do' - - - как эти разные ? –

+0

Главное отличие заключается в том, что вместо ваших потоков, которыми пользуются @domain (каждый раз, когда начинается новый поток, вы меняете это значение для всех потоков), мой пример создает не общую локальную переменную. Полностью переписать ваш код потока немного из объема здесь и не поможет прояснить ваше недоразумение. –