0

У меня есть пара функций, и каждая функция создает журналы, относящиеся к одной транзакции; Это многопоточное приложение, поэтому запись функции func1 может быть случайной для транзакции, но для одной транзакции она будет выполняться только с помощью func1, func2 и func3.Способ отправки половинных журналов в RabbitMQ перед отправкой полного журнала на logstash/elasticsearch

func1(transactionId) { 
    log("%d Now in func1", transactionId); 
} 

func2(transactionId) { 
    log("%d Now in func2", transactionId); 
} 

func3(transactionId) { 
    log("%d Now in func3", transactionId); 
} 

Теперь я хочу написать в logstash сразу для каждой транзакции ТОЛЬКО за раз; то есть

1 Now in func1 Now in func2 Now in fun3 

, а затем это необходимо, наконец, перейти на поиск elasticsearch;

Я подумывал написать половину транзакционного журнала в временную очередь RabbitMQ, а затем по завершении полной транзакции я передам его в очередь производителей RabbitMQ для отправки сообщения в logstash;

Как

func1(transactionId) { 
    add2RMQ(transactionId, "Now in func1"); 
} 

func2(transactionId) { 
    add2RMQ("transactionId, "Now in func2"); 
} 

func3(transactionId) { 
     add2RMQ("transactionId, "Now in func3"); 
     /* Last point of transaction */ 
     commit2RMQ(transactionId); 
} 

Время commit2RMQ выполнить logstash должен получить полное сообщение, специфичную для операции записи в elasticsearch.

Вопрос:

  1. Что является правильным решением для решения этой проблемы, чтобы отправить данные, относящиеся к сделке сразу elasticsearch?
  2. Можем ли мы решить эту проблему с помощью RabbitMQ? Если да, то какой правильный API я должен использовать для этого?
  3. Есть ли какой-либо способ, которым я могу достичь этого без RabbitMQ, но только с помощью logstash и elasticsearch?
  4. Я не хочу использовать API обновления elasticsearch, так как он может потреблять много операций поиска для каждого сообщения журнала, специфичного для трансакции.

ответ

3

Попытка объединить различные строки журнала, относящиеся к одной транзакции, не является простой проблемой для решения, особенно если вы добавляете систему очередности сообщений в микс в качестве промежуточного хранилища журналов для агрегирования. Я бы пошел другим путем, который не связан с другой подсистемой, такой как RabbitMQ.

Кроме того, если вы попытаетесь объединить несколько строк журнала в один, вы потеряете подробную информацию, которую может предоставить каждая строка журнала, например, количество времени, которое каждая функция выполняла для выполнения, например. Также, что произойдет, если func2, соответственно func3, выдает исключение? Если вы храните частичный журнал, состоящий только из func1, то только func1 и func2?

То, что я собираюсь писать, возможно, может быть перенесено на любой язык и любое решение для регистрации, но с целью иллюстрации я предполагаю, что ваша программа написана на Java, и вы используете Log4J.

Таким образом, я бы использовал Log4J's Mapped Diagnostic Context (MDC) для хранения идентификатора транзакции (и, возможно, других данных, таких как имя пользователя и т. Д.) В каждой строке вашего журнала. Таким образом, вы можете легко получить все строки журнала, относящиеся к одной транзакции. Преимущество этого заключается в том, что вам не нужно ничего собирать, вы просто предоставляете достаточную информацию о контенте, чтобы Kibana могла это сделать для вас позже.

В вашем псевдокоде вы добавляете идентификатор транзакции непосредственно к своему сообщению.Преимущество использования MDC для этого, а не для регистрации идентификатора в вашем сообщении, заключается в том, что он избавляет вас от разбора всех ваших сообщений в Logstash для повторного обнаружения идентификатора транзакции, который вы уже знали при создании строки журнала.

Так идея заключается в том, что в вашем коде, как только у вас есть идентификатор транзакции, добавить его к текущему контексту каждого потока, как это:

import org.apache.log4j.MDC; 

... 
func1(transactionId) { 
    // add the transaction ID to the logging context 
    MDC.put("transactionID", transactionId); 
    log("Now in func1"); 
} 

func2(transactionId) { 
    log("Now in func2"); 
} 

func3(transactionId) { 
    log("Now in func3"); 
} 

Затем в файле конфигурации Log4J, вы можете указать Appender используя %X{transactionID} шаблон для того, чтобы сохранить его, в данном случае я добавляю его только после того, как имя потока, но вы можете поместить его где угодно:

log4j.appender.consoleAppender.layout.ConversionPattern = %d [%t] [%X{transactionID}] %5p %c - %m%n 

Ваши журналы будут выглядеть к этому:

2015-09-28T05:07:28.425Z [http-8084-2] [625562271762] INFO YourClass - Now in func1 
2015-09-28T05:07:29.776Z [http-8084-2] [625562271762] INFO YourClass - Now in func2 
2015-09-28T05:07:30.652Z [http-8084-2] [625562271762] INFO YourClass - Now in func3 
              ^
               | 
            the transaction ID is here 

Если у вас есть лог такие строки, это то кусок пирога, чтобы получить идентификатор транзакции через фильтр Logstash grok и хранить его в своем собственном transactionID поле в индексе logstash. В Kibana вы можете искать идентификатор транзакции и сортировать по timestamp desc, и у вас будет весь контекст этой транзакции.

Дайте ему шанс!

+0

Большое спасибо; Я очень ценю то, как вы решили проблему с точки зрения предоставления мне решения, характерного для случаев сбоев. Я сделаю этот эксперимент. – Viswesn