Я решаю две проблемы арбитража для протокола FAST. Пожалуйста, не волнуйтесь, если вы не знакомы с ним, мой вопрос довольно общий. Но я добавляю описание проблемы для тех, кто интересуется (вы можете пропустить его).избегая столкновений при свертывании бесконечного блокировочного буфера в кольцевой буфер
данных во всех UDP-каналы распространяются в двух идентичных каналов (А и В) на двух различных IP-адресов многоадресной рассылки. Настоятельно рекомендуется, чтобы клиент получал и обрабатывал оба канала из-за возможной потери UDP-пакета. Обработка двух идентичных каналов позволяет статистически уменьшить вероятность потери пакетов. Не указывается в каком конкретном канале (A или B) сообщение появляется в первый раз. Для арбитража этих каналов следует использовать порядковый номер сообщения, указанный в преамбуле или в теге 34-MsgSeqNum. Использование преамбулы позволяет определить номер последовательности сообщений без декодирования сообщения FAST. обработку сообщений из-ленты А и В должны быть выполнены с использованием следующего алгоритма:
- Послушаю каналы А и В сообщения
- способа в соответствии с их порядковыми номерами.
- Игнорируйте сообщение, если один из тех же порядковых номеров уже был обработан раньше.
Если появляется пробел в порядковом номере, это указывает на потерю пакетов в обоих каналах (A и B). Клиент должен инициировать один из процессов восстановления. Но прежде всего клиент должен ждать разумного времени, возможно, потерянный пакет придет немного позже из-за переупорядочения пакетов. Протокол UDP не может гарантировать доставку пакетов в последовательности.
// ТСР восстановить алгоритм далее
Я написал такой очень простой класс. Он предопределяет все необходимые классы, а затем первый поток, который получает конкретный seqNum
, может его обработать. Другой поток будет падать позже:
class MsgQueue
{
public:
MsgQueue();
~MsgQueue(void);
bool Lock(uint32_t msgSeqNum);
Msg& Get(uint32_t msgSeqNum);
void Commit(uint32_t msgSeqNum);
private:
void Process();
static const int QUEUE_LENGTH = 1000000;
// 0 - available for use; 1 - processing; 2 - ready
std::atomic<uint16_t> status[QUEUE_LENGTH];
Msg updates[QUEUE_LENGTH];
};
Реализация:
MsgQueue::MsgQueue()
{
memset(status, 0, sizeof(status));
}
MsgQueue::~MsgQueue(void)
{
}
// For the same msgSeqNum should return true to only one thread
bool MsgQueue::Lock(uint32_t msgSeqNum)
{
uint16_t expected = 0;
return status[msgSeqNum].compare_exchange_strong(expected, 1);
}
void MsgQueue::Commit(uint32_t msgSeqNum)
{
status[msgSeqNum] = 2;
Process();
}
// this method probably should be combined with "Lock" but please ignore! :)
Msg& MsgQueue::Get(uint32_t msgSeqNum)
{
return updates[msgSeqNum];
}
void MsgQueue::Process()
{
// ready packets must be processed,
}
Использование:
if (!msgQueue.Lock(seq)) {
return;
}
Msg msg = msgQueue.Get(seq);
msg.Ticker = "HP"
msg.Bid = 100;
msg.Offer = 101;
msgQueue.Commit(seq);
Это прекрасно работает, если мы предположим, что QUEUE_LENGTH бесконечность. Потому что в этом случае один элемент msgSeqNum = один updates
.
Но я должен сделать буфер круглым, потому что невозможно хранить всю историю (много миллионов пакетов), и нет причин для этого. На самом деле мне нужно достаточно буферизировать пакеты для восстановления сеанса, и как только сессия будет восстановлена, я могу их удалить.
Но наличие кругового буфера значительно усложняет алгоритм. Например, предположим, что у нас есть круговой буфер длиной 1000. И в то же время мы пытаемся обработать seqNum = 10 000 и seqNum = 11 000 (это очень маловероятно, но все же возможно). Оба этих пакета будут отображаться в массив updates
по индексу 0
и поэтому происходит столкновение. В таком случае буфер должен «отбрасывать» старые пакеты и обрабатывать новые пакеты.
Это тривиально, чтобы реализовать то, что я хочу, используя locks
, но написать lock-free
код на круглом буфере, который используется из разных потоков, действительно сложный. Поэтому я приветствую любые предложения и советы, как это сделать. Благодаря!
Если длина вашего буфера равна 1000, seqNum 10000 или 11000 должны быть недоступны. –
@MareInfinitus вот почему мне нужно использовать круговой буфер – javapowered