0

У меня есть соединитель Kafka со следующим кодом для метода poll() в реализации SourceTask.Почему метаданные добавляются на выход этого разъема Kafka?

@Override 
public List<SourceRecord> poll() throws InterruptedException 
{ 
    SomeType item = mQueue.take(); 
    List<SourceRecord> records = new ArrayList<>(); 
    SourceRecord[] sourceRecords = new SourceRecord[]{ 
     new SourceRecord(null, null, "data", null, 
         Schema.STRING_SCHEMA, "foo", 
         Schema.STRING_SCHEMA, "bar") 
    }; 
    Collections.addAll(records, sourceRecords); 

    return records; 
} 

Если я придаю потребитель к теме данных, я получаю следующее сообщение, посланное через от разъема:

{"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"} 

Если я публикую сообщение прямо к теме, используя следующие команды:

echo -e 'foo,bar' > /tmp/test_kafka.txt 
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=, 

Затем присоедините же потребитель, я получаю сообщение:

foo bar 

Это то, что я ожидал увидеть как результат реализации коннектора, а не сообщение {"schema":..., которое я получил.

Как изменить реализацию poll() так, чтобы сообщение было отправлено без метаданных схемы, отображаемых в фактическом ключе и значении сообщения?

ответ

1

Хорошо, оказывается, что это было только потому, что я имел следующие строки в connect-standalone.properties

key.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter=org.apache.kafka.connect.json.JsonConverter 

я должен был

key.converter=org.apache.kafka.connect.storage.StringConverter 
value.converter=org.apache.kafka.connect.storage.StringConverter 

В качестве альтернативного решения, я также был в состоянии изменить следующие от истинного до ложного

value.converter.schemas.enable=false 

Тогда в моем классе процессора I cha nged код на:

SourceRecord[] sourceRecords = new SourceRecord[]{ 
    new SourceRecord(null, null, "data", null, 
        Schema.STRING_SCHEMA, "foo", 
        null, "bar") 
}; 

Это отличается, потому что я больше не определяя схему для значения.