2016-09-01 4 views
2

У меня есть очередь производителей-потребителей, которая обновляется параллельными программами. Очередь запрашивается для различных статистических данных, таких как среднее или стандартное отклонение или дисперсия, или что-то еще в текущем содержимом очереди. Для среднего значения, это код, я используюЭффективная общая очередь буферов для последовательной обработки

class BufferQueue { 
    const int nMaxQueueSize_; 
    int* values; 
    int head, tail; 
    double sum; 
    ::utils::FastMutex queue_mutex; 

public: 

    BufferQueue(const int nMaxQueueSize) : 
    nMaxQueueSize_(nMaxQueueSize) { 
     head = tail = 0; 
     sum = 0; 
     values = new int[nMaxQueueSize_]; 
    } 

    void enqueue(int val) { 
     values[head] = val; 
     if ((head + 1) % nMaxQueueSize_ == tail) { 
      queue_mutex.lock(); 
      sum = val.value_point - values[tail].value_point; 
      utils::memory_barrier(); 
      head = (1 + head) % nMaxQueueSize_; 
      tail = (1 + tail) % nMaxQueueSize_; 
      queue_mutex.unlock(); 
     } else { 
      queue_mutex.lock(); 
      sum += val.value_point; 
      utils::memory_barrier(); 
      head = (1 + head) % nMaxQueueSize_; 
      queue_mutex.unlock(); 
     } 
    } 

    bool dequeue() { 
     if (head != tail) { 
      queue_mutex.lock(); 
      sum -= values[tail].value_point; 
      utils::memory_barrier(); 
      tail = (1 + tail) % nMaxQueueSize_; 
      queue_mutex.unlock(); 
      return true; 
     } else { 
      sum = 0; 
      return false; 
     } 
    } 

    MarketSpreadPoint& operator[](int i) { 
     return values[ (tail + i) % nMaxQueueSize_ ]; 
    } 

    inline int getSize() { 
     return (head - tail + nMaxQueueSize_) % nMaxQueueSize_; 
    } 

    inline double average() { 
     queue_mutex.lock(); 
     double result = sum/getSize(); 
     queue_mutex.unlock(); 
     return result; 
    } 

    ~BufferQueue() { 
     delete values; 
    } 
}; 

ПРИМЕЧАНИЕ: Одна важная вещь, чтобы помнить о том, что только одна операция выполняется. Я тоже не хочу повторять код, записи отдельных реализаций как BufferQueueAverage, BufferQueueVariance и т.д. Я хочу очень предел кода избыточности (оптимизации компилятора). Даже кондиционирование по типу очереди для каждого обновления кажется субоптимальным.

inline double average() { 
     queue_mutex.lock(); 
     if(type_is_average){ 
      double result = sum/getSize(); 
     }else if(type_is_variance){ 
      /// update accordingly. 
     } 
     double result = sum/getSize(); 
     queue_mutex.unlock(); 
     return result; 
    } 

Что может быть хорошей альтернативой этой идее?

Примечание: В этом варианте осуществления, если очередь полна, голова автоматически сделать хвост, чтобы двигаться вперед. Другими словами, самый старый элемент удаляется автоматически.

Thanks

+0

что такое, что 'ИНТ * значение = новый MarketSpreadPoint [nMaxQueueSize _];' –

+0

@D? ieterLücking, извините, исправлено – v78

+1

Ваша очередь не является потокобезопасной. Я говорю это, потому что ваш код выглядит так, как будто вы пытаетесь сделать это, но он ошибочен. Условия «enqueue» и «dequeue» имеют условия гонки. – Smeeheey

ответ

1

Таким образом, вы хотите отделить очередь от статистики. Я вижу два возможных решения:

  1. использовать шаблон Шаблон Метод или стратегии факторизовать зависимость.
  2. Используйте шаблон, который делает это.

Предполагая, что все статистические данные вы можете собрать gathered incrementally, последний может выглядеть примерно следующим образом (просто имел в виду, как псевдо-код):

class StatisticsMean 
{ 
private: 
    int n = 0; 
    double mean = 0.0; 
public: 
    void addSample(int s) { ++n; mean += (s - mean)/n; } 
    void removeSample(int s) { ... } 
    double getStatistic() const { return mean; } 
} 

template <typename TStatistics> 
class BufferQueue 
{ 
    TStatistics statistics; 
    ... 

    void enqueue(int val) 
    { 
     ... 
     statistics.addSample(val); 
    } 
    ... 
    double getStatistic() const { return statistics.getStatistic(); } 
} 

Шаблон подход дает полную оптимизацию времени компиляции. Вы можете достичь этого с помощью шаблона . Это также позволит вам иметь разные имена для геттеров (getStatistic() в приведенном выше примере).

Это может выглядеть примерно так:

class AbstractBufferQueue 
{ 
    virtual void addSample(int s) = 0; 
    virtual void removeSample(int s) = 0; 

    void enqueue(int val) 
    { 
     ... 
     addSample(val); 
    } 
} 

class BufferQueueAverage : public AbstractBufferQueue 
{ 
    int n; 
    double mean; 

    void addSample(int s) { ++n; mean += (s - mean)/n; } 
    void removeSample(int s) { ... } 
    double getAverage() const { return mean; } 
} 
1

Один из способов сделать то, что вы спрашиваете, с помощью шаблонных классов.

Сначала определите общий интерфейс, который будет иметь аккумулятор .Это может быть что-то вроде:

class accumulator 
{ 
public: 
    typedef double value_type; 

public: 
    void push(int v); // Called when pushing a new value. 
    void pop(int v); // Called when popping a new value; 
    value_type result(size_t n) const; // Returns the current accumulation. 
}; 

Как частный случай, mean_accumulator может быть таким:

class mean_accumulator 
{ 
public: 
    typedef double value_type; 

public: 
    mean_accumulator() : m_sum{0} {} 

    void push(int v) { m_sum += v; } 
    void pop(int v); { m_sum -= v; } 
    double result(size_t n) const { return m_sum/n; }; 

private: 
    int m_sum; 
}; 

Теперь параметризировать вашу очередь на Accumulator, и вызвать его в случае необходимости (в то время как вы на него Обратите внимание, что boost::circular_buffer многое из того, что вам нужно для реализации:

template<class Accumulator> 
class queue 
{ 
private: 
    boost::circular_buffer<int> m_buf; 
    std::mutex m_m; 

public: 
    void push(int v) 
    { 
     // Lock the mutex, push to the circular buffer, and the accumulator 
    } 

    bool pop() 
    { 
     // Lock the mutex; if relevant, update the accumulator and pop the circular buffer 
    } 

    typename Accumulator::value_type result() const 
    { 
     // Lock the mutex and return the accumulator's result. 
    } 
};