2013-04-24 3 views
1

Я решаю две проблемы арбитража для протокола FAST. Пожалуйста, не волнуйтесь, если вы не знакомы с ним, мой вопрос довольно общий. Но я добавляю описание проблемы для тех, кто интересуется (вы можете пропустить его).избегая столкновений при свертывании бесконечного блокировочного буфера в кольцевой буфер


данных во всех UDP-каналы распространяются в двух идентичных каналов (А и В) на двух различных IP-адресов многоадресной рассылки. Настоятельно рекомендуется, чтобы клиент получал и обрабатывал оба канала из-за возможной потери UDP-пакета. Обработка двух идентичных каналов позволяет статистически уменьшить вероятность потери пакетов. Не указывается в каком конкретном канале (A или B) сообщение появляется в первый раз. Для арбитража этих каналов следует использовать порядковый номер сообщения, указанный в преамбуле или в теге 34-MsgSeqNum. Использование преамбулы позволяет определить номер последовательности сообщений без декодирования сообщения FAST. обработку сообщений из-ленты А и В должны быть выполнены с использованием следующего алгоритма:

  1. Послушаю каналы А и В сообщения
  2. способа в соответствии с их порядковыми номерами.
  3. Игнорируйте сообщение, если один из тех же порядковых номеров уже был обработан раньше.
  4. Если появляется пробел в порядковом номере, это указывает на потерю пакетов в обоих каналах (A и B). Клиент должен инициировать один из процессов восстановления. Но прежде всего клиент должен ждать разумного времени, возможно, потерянный пакет придет немного позже из-за переупорядочения пакетов. Протокол UDP не может гарантировать доставку пакетов в последовательности.

    // ТСР восстановить алгоритм далее


Я написал такой очень простой класс. Он предопределяет все необходимые классы, а затем первый поток, который получает конкретный seqNum, может его обработать. Другой поток будет падать позже:

class MsgQueue 
{ 
public: 
    MsgQueue(); 
    ~MsgQueue(void); 
    bool Lock(uint32_t msgSeqNum); 
    Msg& Get(uint32_t msgSeqNum); 
    void Commit(uint32_t msgSeqNum); 
private: 
    void Process(); 
    static const int QUEUE_LENGTH = 1000000; 

    // 0 - available for use; 1 - processing; 2 - ready 
    std::atomic<uint16_t> status[QUEUE_LENGTH]; 
    Msg updates[QUEUE_LENGTH]; 
}; 

Реализация:

MsgQueue::MsgQueue() 
{ 
     memset(status, 0, sizeof(status)); 
} 

MsgQueue::~MsgQueue(void) 
{ 
} 

// For the same msgSeqNum should return true to only one thread 
bool MsgQueue::Lock(uint32_t msgSeqNum) 
{ 
    uint16_t expected = 0; 
    return status[msgSeqNum].compare_exchange_strong(expected, 1); 
} 

void MsgQueue::Commit(uint32_t msgSeqNum) 
{ 
    status[msgSeqNum] = 2; 
      Process(); 
} 

    // this method probably should be combined with "Lock" but please ignore! :) 
Msg& MsgQueue::Get(uint32_t msgSeqNum) 
{ 
    return updates[msgSeqNum]; 
} 

void MsgQueue::Process() 
{ 
     // ready packets must be processed, 
} 

Использование:

if (!msgQueue.Lock(seq)) { 
    return; 
} 
Msg msg = msgQueue.Get(seq); 
msg.Ticker = "HP" 
msg.Bid = 100; 
msg.Offer = 101; 
msgQueue.Commit(seq); 

Это прекрасно работает, если мы предположим, что QUEUE_LENGTH бесконечность. Потому что в этом случае один элемент msgSeqNum = один updates.

Но я должен сделать буфер круглым, потому что невозможно хранить всю историю (много миллионов пакетов), и нет причин для этого. На самом деле мне нужно достаточно буферизировать пакеты для восстановления сеанса, и как только сессия будет восстановлена, я могу их удалить.

Но наличие кругового буфера значительно усложняет алгоритм. Например, предположим, что у нас есть круговой буфер длиной 1000. И в то же время мы пытаемся обработать seqNum = 10 000 и seqNum = 11 000 (это очень маловероятно, но все же возможно). Оба этих пакета будут отображаться в массив updates по индексу 0 и поэтому происходит столкновение. В таком случае буфер должен «отбрасывать» старые пакеты и обрабатывать новые пакеты.

Это тривиально, чтобы реализовать то, что я хочу, используя locks, но написать lock-free код на круглом буфере, который используется из разных потоков, действительно сложный. Поэтому я приветствую любые предложения и советы, как это сделать. Благодаря!

+0

Если длина вашего буфера равна 1000, seqNum 10000 или 11000 должны быть недоступны. –

+0

@MareInfinitus вот почему мне нужно использовать круговой буфер – javapowered

ответ

0

Я не верю, что вы можете использовать кольцевой буфер . A хешированный индекс может использоваться в массиве status[]. Т.е., hash = seq % 1000. Проблема в том, что порядковый номер продиктован сетью, и у вас нет контроля над его заказом. Вы хотите, чтобы заблокировал на основе этого порядкового номера. Ваш массив не должен быть бесконечным, только диапазон порядковый номер; но это, вероятно, больше, чем практично.

Я не уверен, что происходит, когда порядковый номер заблокирован. Означает ли это, что другой поток обрабатывает его? Если это так, вы должны поддерживать подкатегорию для хэш-коллизий для разрешения конкретного порядкового номера.

Вы также можете рассмотреть размер массива как мощность 2. Например, 1024 позволит hash = seq & 1023;, который должен быть достаточно эффективным.

+0

благодарю вас за ответ! в конце концов, я создал TWO круглый буфер. порядковый номер создается фондовой биржей и увеличивается один за другим. однако некоторые из них могут быть пропущены, и у меня есть механизм для обнаружения и устранения таких проблем. поэтому решение было - создать TWO круглые буферы вместо ONE, а затем добавить еще один поток SPINNING, который объединяет эти циклические буферы и процесс. – javapowered

+0

Хм, я был бы обеспокоен тем, что иногда пакеты могут быть повторно заказаны. –

+0

Я делаю восстановление порядка. поэтому у меня есть круговой буфер. (в реальной жизни, кстати, они никогда не меняются в моей конфигурации, я тестировал несколько дней :) – javapowered