0

Существует приложение-загрузчик, которое выполняет различные виды обработки на элементах загрузки в нескольких потоках. Некоторые потоки анализируют входные данные, некоторые выполняют загрузку, извлечение, сохранение состояния и т. Д. Таким образом, каждый тип потока работает с определенными элементами данных, и некоторые из этих потоков могут выполняться одновременно. Загрузить пункт можно описать следующим образом:Сохранение данных при многопоточной обработке

class File; 

class Download 
{ 
public: 
    enum State 
    { 
     Parsing, Downloading, Extracting, Repairing, Finished 
    }; 

    Download(const std::string &filePath): filePath(filePath) { } 

    void save() 
    { 
     // TODO: save data consistently 

     StateFile f; // state file for this download 

     // save general download parameters 
     f << filePath << state << bytesWritten << totalFiles << processedFiles; 

     // Now we are to save the parameters of the files which belong to this download, 
     // (!) but assume the downloading thread kicks in, downloads some data and 
     // changes the state of a file. That causes "bytesWritten", "processedFiles" 
     // and "state" to be different from what we have just saved. 

     // When we finally save the state of the files their parameters don't match 
     // the parameters of the download (state, bytesWritten, processedFiles). 
     for (File *f : files) 
     { 
      // save the file... 
     } 
    } 

private: 
    std::string filePath; 
    std::atomic<State> state = Parsing; 
    std::atomic<int> bytesWritten = 0; 
    int totalFiles = 0; 
    std::atomic<int> processedFiles = 0; 
    std::mutex fileMutex; 
    std::vector<File*> files; 
}; 

Мне интересно, как сохранить эти данные последовательно. Например, состояние и количество обработанных файлов уже были сохранены, и мы собираемся сохранить список файлов. Между тем некоторые другие потоки могут изменять состояние файла и, следовательно, количество обработанных файлов или состояние загрузки, что делает сохраненные данные непоследовательными.

Первой идеей, которая приходит на ум, является добавление одного мьютекса для всех элементов данных и блокировка его каждый раз, когда имеет доступ к любому из них. Но это было бы, вероятно, неэффективно, так как большинство потоков времени обращаются к различным членам данных, а сохранение происходит только один раз за несколько минут.

Мне кажется, что такая задача довольно распространена в многопоточном программировании, поэтому я надеюсь, что опытные люди могут предложить лучший способ.

+0

* "Первая мысль, которая приходит на ум, чтобы добавить один семафор для всех членов данных и блокировать его каждый раз, когда любой из них будет доступен ». * - Почему вы не можете использовать несколько мьютексов и блокировать доступ к отдельным членам? И почему бы не разделить класс на несколько разных классов, чтобы каждый поток мог спокойно работать над своими собственными фрагментами данных до тех пор, пока он не будет закончен, а частичные результаты будут собраны в конечный результат? –

+0

Ну, как я описал выше, блокировка отдельных элементов не препятствует сохранению целого набора данных. Например. сохраненное состояние загрузки и количество обработанных файлов могут не совпадать с сохраненным списком файлов. Ну, потоки могут использовать одни и те же элементы данных. Я просто хотел, чтобы они использовали не все из них. – mentalmushroom

ответ

0

Я предлагаю вам использовать образец потребителя-производителя.

Загружающий производит парсер и уведомляет его о потреблении, парсер производит экстрактор и уведомляет его об использовании, а экстрактор - для ремонта. После этого у вас будет очередь для каждой задачи. Синхронизация может быть оптимизирована с использованием переменных условия, поэтому потребитель тянет только тогда, когда он уведомляется, как только что-то было создано. В конечном итоге вы будете использовать очень мало мьютексов и гораздо более читаемый и эффективный дизайн.

Вот пример кода для очереди и как делать, если вы должны одновременно загружать, анализировать, извлекать и сохранять:

#include <thread> 
#include <condition_variable> 
#include <mutex> 
#include <queue> 
template<typename T> 
class synchronized_queu 
{ 
public: 
    T consume_one() 
    { 
     std::unique_lock<std::mutex> lock(lock_); 
     while (queue_.size() == 0) 
      condition_.wait(lock); //release and obtain again 
     T available_data = queue_.front(); 
     queue_.pop(); 
     return available_data; 
    } 
    void produce_one(const T& data) 
    { 
     std::unique_lock<std::mutex> lock(lock_); 
     queue_.push(data); 
     condition_.notify_one();//notify only one or all as per your design... 
    } 
private: 
    std::queue<T> queue_; 
    std::mutex lock_; 
    std::condition_variable condition_; 
}; 
struct data 
{ 
    //..... 
}; 

void download(synchronized_queu<data>& q) 
{ 
    //... 
    data data_downloaded = ; //data downloaded; 
    q.produce_one(data_downloaded); 
} 

void parse(synchronized_queu<data>& q1, synchronized_queu<data>& q2) 
{ 
    //... 
    data data_downloaded = q1.consume_one(); 
    //parse 
    data data_parsed = ;//.... 
    q2.produce_one(data_parsed); 
} 

void extract(synchronized_queu<data>& q1, synchronized_queu<data>& q2) 
{ 
    //... 
    data data_parsed = q1.consume_one(); 
    //parse 
    data data_extracted = ;//.... 
    q2.produce_one(data_extracted); 
} 
void save(synchronized_queu<data>& q) 
{ 
    data data_extracted = q.consume_one(); 
    //save.... 
} 

int main() 
{ 
    synchronized_queu<data> dowlowded_queue; 
    synchronized_queu<data> parsed_queue; 
    synchronized_queu<data> extracted_queue; 

    std::thread downloader(download, dowlowded_queue); 
    std::thread parser(parse, dowlowded_queue, parsed_queue); 
    std::thread extractor(extract, parsed_queue, extracted_queue); 
    std::thread saver(save, extracted_queue); 
    while (/*condition to stop program*/) 
    { 

    } 
    downloader.join(); 
    parser.join(); 
    extractor.join(); 
    saver.join(); 
    return 0; 
} 
+0

Дело в том, что эти задачи предназначены для одновременного запуска, поэтому очереди нет. Сохранение требуется периодически, независимо от того, что делают другие потоки. – mentalmushroom

+0

Да, и это то, что я хотел предложить. Для простого объяснения давайте только загрузчик и экстрактор; вы запускаете одновременно два потока загрузчика и экстрактора, которые используют одну и ту же очередь Q1. Загружающий загрузчик после того, как он загружает все, что он подталкивает в очередь, и когда экстрактор вытягивается из очереди, как только что-то там обрабатывает его. –

+0

Вопрос был скорее в сохранении данных, а в том, что они обрабатываются другими потоками. – mentalmushroom