2012-07-02 4 views
7

У меня есть несколько потоков, которым необходимо потреблять данные из потока TCP. Я хочу использовать круговой буфер/очередь в общей памяти для чтения из сокета TCP. Прием TCP будет записываться непосредственно в круговую очередь. Потребители будут читать из очереди.Как реализовать tcp с нулевым копированием, используя замкнутый кольцевой буфер в C++

Эта конструкция должна включать нулевую копию и блокировку нуля. Однако здесь есть два разных вопроса.

  1. Возможно ли/полезно читать только 1 логическое сообщение из сокета TCP? Если нет, и я прочитал более 1 сообщения, мне придется скопировать остатки с этого на этот-> следующий.

  2. Возможно ли реализовать блокирующую очередь? Я знаю, что есть атомные операции, но они также могут быть дорогостоящими. потому что весь кэш ЦП должен быть недействительным. Это повлияет на все операции на всех моих 24 ядрах.

Я немного ржавый в низкоуровневом TCP, и не совсем ясно, как сказать, когда сообщение будет завершено. Я ищу \ 0 или это конкретная реализация?

ти

+4

Что такое сообщение в вашем пространстве? У вас высокие цели, но недостаточно ли простого простого прямого решения? – Nim

+0

Lock-free не означает «не дорого», хотя на практике блокировка на практике обычно на порядок быстрее, чем переключатели контекста, связанные с блокировкой. То, что вы хотите, может быть достигнуто исключительно с помощью атомных приращений, что очень быстро, при условии, что вы сделаете свой буфер подходящим размером, например. 65536. В общем, я согласен с Нимом. – Damon

+0

@Damon, интересно, я не вижу реализации кольца, которая полагается на soley на атомных приращениях, может быть, у вас есть бумага (ссылка) что-то для этого? Единственные реализации, которые я видел (и сделали), полагаются на операции сравнения/обмена. – Nim

ответ

8

К сожалению, TCP не может передавать сообщения, только потоки байтов. Если вы хотите передавать сообщения, вам нужно будет применить протокол сверху. Лучшие протоколы для высокой производительности - это те, которые используют заголовок, проверяющий здравомыслие, с указанием длины сообщения - это позволяет вам считывать правильную сумму данных непосредственно в подходящий объект буфера без повторения байтов байтов по байтам, символа сообщения. Затем буфер POINTER можно поставить в очередь на другой поток и создать новый буферный объект для следующего сообщения. Это позволяет избежать любого копирования массовых данных и для больших сообщений достаточно эффективно, что использование неблокирующей очереди для указателей объектов сообщений несколько бессмысленно.

Следующая оптимизация avaialble заключается в объединении буферов объекта *, чтобы избежать непрерывного ввода/удаления, утилизации буферов * в потребительском потоке для повторного использования в потоке приема сети. Это довольно легко сделать с ConcurrentQueue, предпочтительно блокировать, чтобы позволить управление потоком вместо искажения данных или segfaults/AV, если пул временно опустеет.

Затем добавьте [мертвую зону] размер [кеш-пойнт] в начале каждого элемента данных буфера *, поэтому предотвратите любой поток из данных с ложным обменом данными с любым другим.

Результатом должен быть поток сообщений с высокой полосой пропускания в потребительский поток с очень малой задержкой, отходы ЦП или переполнение кэша. Все ваши 24 ядра могут работать на разных данных.

Копирование объемных данных в многопоточных приложениях - это допущение плохого дизайна и поражения.

Развейте ..

Похоже, вы застряли перебор данных из-за различные протоколы :(

Ложных разделениями свободного PDU объекта буфера, например:

typedef struct{ 
    char deadZone[256]; // anti-false-sharing 
    int dataLen; 
    char data[8388608]; // 8 meg of data 
} SbufferData; 

class TdataBuffer: public{ 
private: 
    TbufferPool *myPool; // reference to pool used, in case more than one 
    EpduState PDUstate; // enum state variable used to decode protocol 
protected: 
    SbufferData netData; 
public: 
    virtual reInit(); // zeros dataLen, resets PDUstate etc. - call when depooling a buffer 
    virtual int loadPDU(char *fromHere,int len); // loads protocol unit 
    release(); // pushes 'this' back onto 'myPool' 
}; 

loadPDU получает указатель на длину, необработанные сетевые данные.Он возвращает либо 0, либо означает, что он еще не полностью собрал PDU, или количество байтов, которое он потреблял из необработанных сетевых данных, чтобы полностью собрать PDU, и в этом случае очередь выключить, depool другой и вызвать loadPDU() с неиспользованным остатком необработанных данных, затем продолжите следующие исходные данные.

Вы можете использовать различные пулы разных классов буферов-буферов, чтобы при необходимости обслуживать различные протоколы - массив TbufferPool [Eprotocols]. TbufferPool может быть просто очередью BlockingCollection. Управление становится почти тривиальным: буферы могут быть отправлены в очереди по всей вашей системе, в графический интерфейс для отображения статистики, а затем, возможно, для журнала, если в конце цепочки очередей происходит вызов release().

Очевидно, что в «реальном» объекте PDU было бы больше загружать методы, объединения данных/структуры, итераторы и механизм состояния для работы протокола, но это основная идея. Главное - простое управление, инкапсуляция и, поскольку ни один из двух потоков не может работать в одном экземпляре буфера, никакой блокировки/синхронизации не требуется для синтаксического анализа/доступа к данным.

О, да, и поскольку никакая очередь не должна оставаться заблокированной дольше, чем требуется, чтобы нажать/поместить один указатель, шансы на фактическое соперничество очень низки - даже обычные блокирующие очереди вряд ли когда-либо понадобится использовать блокировку ядра.

+0

спасибо! к сожалению, я не контролирую протокол сообщений. Мой код будет подключаться к различным общественным биржам и частным предприятиям-продавцам. мы надеемся, что они используют заголовки/длины. Если это не стандарт, мне, вероятно, придется разработать код приема TCP, не зависящий от протокола сообщений. Я уверен, что могу найти шаблон дизайна для этого или даже посмотреть на источник ACE? спасибо @Martin James – jaybny

+0

«Затем добавьте [мертвую зону] размер [кешлайн]] в начале каждого элемента данных буфера * Я не совсем уверен, что понимаю проблему, которую это решает. Кроме того, что такое «размер cachline»? Специфический процессор? 32/64 бит? ty – jaybny

+0

@jaybny - Google 'false sharing' –

0

Если вы используете Windows 8 или Windows Server 2012, можно использовать зарегистрированный ввод-вывод, который предлагает более высокую пропускную способность для более низкого процессора, чем обычный IOCP; он делает это путем вырезания переходов ядра, нулевой копии, среди прочего

API: http://msdn.microsoft.com/en-us/library/windows/desktop/ms740642%28v=vs.85%29.aspx

Справочная информация: http://www.serverframework.com/asynchronousevents/rio/