ПРИМЕЧАНИЕ. Я предпочитаю использовать потоки для разрешения имен DNS, но такое же поведение может быть воспроизведено с помощью любого типа аналогичной операции.Ruby - Неожиданные результаты с использованием многопоточности (модель производителя/потребителя) при повторении
Я получаю неожиданные результаты, когда я пытаюсь переместить свой код (ранее работающий) из стандартного однопоточных исполнения многопоточности. В частности, мой код выполняет итерацию по массиву хэшей и добавляет пару ключ/значение для каждого хэша в массиве.
Проблема, с которой я сталкиваюсь, исходит из цикла dns_cname.map
, где создается новая пара ключей/значений. Вместо ключа "external_dns_entry"
, имеющего правильное значение (то есть result.name.to_s
, которое содержит имя, разрешенное DNS), я получаю имя для одного из многих других серверов в url_nameserver_mapping
.
У меня такое ощущение, что разрешения DNS происходят по мере того, как потоки становятся доступными, а хеш обновляется не по порядку, но я даже не знаю, как начать отслеживать такую проблему.
Проблемные результаты: Резолюция DNS, выполняемая против сервера1, сопоставляется с сервером 17. Аналогично, сервер 17 отображает сервер 99 и т. Д. Остальная часть Хэша остается в такте.
Любая помощь ВОЗМОЖНО высоко ценится. Большое спасибо заранее!
Вот мой код когда многопоточность НЕ включен (работает нормально):
url_nameserver_mapping = { "server1" => "dallasdns.dns.com",
"server2" => "portlanddns.dns.com",
"server3" => "losangelesdns.dns.com" }
# Parse the JSON string response from the API into a valid Ruby Hash
# The net/http GET request is not shown here for brevity but it was stored in 'response'
unsorted_urls = JSON.parse(response.body)
# Sort (not sure this is relevant)
# I left it since my data is being populated to the Hash incorrectly (w/ threading enabled)
url_properties = unsorted_urls['hostnames']['items'].sort_by { |k| k["server"]}
url_nameserver_mapping.each do |server,location|
dns = Resolv::DNS.new(:nameserver => ['8.8.8.8'])
dns_cname = dns.getresources(server, Resolv::DNS::Resource::IN::CNAME)
dns_cname.map do |result|
# Create a new key/value for each Hash in url_properties Array
# Occurs if the server compared matches the value of url['server'] key
url_properties.each do |url|
url["external_dns_entry"] = result.name.to_s if url['server'] == server
end
end
end
Я последовал за руководство по https://blog.engineyard.cm/2013/ruby-concurrency для реализации модели производитель/потребитель многопоточности.
Вот мой адаптированный код когда многопоточность включена (НЕ РАБОТАЕТ):
require 'thread'
require 'monitor'
thread_count = 8
threads = Array.new(thread_count)
producer_queue = SizedQueue.new(thread_count)
threads.extend(MonitorMixin)
threads_available = threads.new_cond
sysexit = false
url_nameserver_mapping = { "server1" => "dallasdns.dns.com",
"server2" => "portlanddns.dns.com",
"server3" => "losangelesdns.dns.com" }
unsorted_urls = JSON.parse(response.body)
url_properties = unsorted_urls['hostnames']['items'].sort_by { |k| k["server"]}
####################
##### Consumer #####
####################
consumer_thread = Thread.new do
loop do
break if sysexit && producer_queue.length == 0
found_index = nil
threads.synchronize do
threads_available.wait_while do
threads.select { |thread| thread.nil? ||
thread.status == false ||
thread["finished"].nil? == false}.length == 0
end
# Get the index of the available thread
found_index = threads.rindex { |thread| thread.nil? ||
thread.status == false ||
thread["finished"].nil? == false }
end
@domain = producer_queue.pop
threads[found_index] = Thread.new(@domain) do
dns = Resolv::DNS.new(:nameserver => ['8.8.8.8'])
dns_cname = dns.getresources(@domain, Resolv::DNS::Resource::IN::CNAME)
dns_cname.map do |result|
url_properties.each do |url|
url["external_dns_entry"] = result.name.to_s if url['server'] == @domain
end
end
Thread.current["finished"] = true
# Notify the consumer that another batch of work has been completed
threads.synchronize { threads_available.signal }
end
end
end
####################
##### Producer #####
####################
producer_thread = Thread.new do
url_nameserver_mapping.each do |server,location|
producer_queue << server
threads.synchronize do
threads_available.signal
end
end
sysexit = true
end
# Join on both the producer and consumer threads so the main thread doesn't exit
producer_thread.join
consumer_thread.join
# Join on the child processes to allow them to finish
threads.each do |thread|
thread.join unless thread.nil?
end
Пожалуйста, дайте мне знать, если это поможет, если я разместил структуру данных в массиве 'unsorted_urls' хэшей. Это всего лишь куча хэшей с данными, которые не имеют значения минус ключ 'server', который имеет значение имени сервера, которое используется для сравнения. Я потратил более часа на то, чтобы решить свой вопрос и попытаться сохранить его кратким - я оставил его для краткости. –
Есть ли болванка '}' в 'url [" external_dns_entry "] = result.name.to_s, если url ['server'] == server}'? – Aetherus
Да, спасибо, что поймали это! Я двигал эту линию от одного лайнера до нескольких, чтобы людям не пришлось слишком много прокручивать. Я исправлю это сейчас. –