0

Я использую платформу Confluent 3.0.1 на окнах. Я выполнил руководство по установке и руководство для разработчиков, чтобы выполнить всю установку и разработать мою топологию.AdminUtils.createTopic API throws kafka.admin.AdminOperationException

Я начал Zookeeper, затем сервер Kafka и попытался запустить мою топологию. Но ошибка ниже на сервере Kafka. Даже если я создаю тему вручную и запускаю топологию, я вижу ту же ошибку.

INFO Topic creation {"version":1,"partitions":{"0":[0]}} (kafka.admin.AdminUtils$) 
[2016-09-21 17:20:08,807] INFO [KafkaApi-0] Auto creation of topic Text4 with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis) 
[2016-09-21 17:20:09,436] ERROR [KafkaApi-0] Error when handling request {group_id=my-first-streams-application1} (kafka.server.KafkaApis) 
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1 
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117) 
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403) 
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createTopic(KafkaApis.scala:629) 
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createGroupMetadataTopic(KafkaApis.scala:651) 
    at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657) 
    at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657) 
    at scala.Option.getOrElse(Option.scala:121) 
    at kafka.server.KafkaApis.getOrCreateGroupMetadataTopic(KafkaApis.scala:657) 
    at kafka.server.KafkaApis.handleGroupCoordinatorRequest(KafkaApis.scala:818) 
    at kafka.server.KafkaApis.handle(KafkaApis.scala:86) 
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) 
    at java.lang.Thread.run(Thread.java:745) 

И моя топология код ниже:

public class MetricTopology implements InitializingBean { 

    @Autowired 
    @Qualifier("getStreamsConfig") 
    private Properties properties; 

    // Method to build topology. 
    public void buildTopology() { 
     System.out.println("MetricTopology.buildTopology()"); 
     TopologyBuilder builder = new TopologyBuilder(); 
     // add the source processor node that takes Kafka topic "Text4" as input 
     builder.addSource("Source", "Text4") 
      // add the Metricsprocessor node which takes the source processor as its upstream processor 
      .addProcessor("Process",() -> new MetricsProcessor(), "Source"); 
     // Building Stream. 
     KafkaStreams streams = new KafkaStreams(builder, properties); 
     streams.start(); 
    } 

    // Called after all properties are set. 
    public void afterPropertiesSet() throws Exception { 
     buildTopology(); 
    } 

} 

Ниже перечислены свойства, которые я использую, который является частью другого исходного Java-файла.

Properties settings = new Properties(); 
// Set a few key parameters. This properties will be picked from property file. 
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application1"); 
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); 
settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "1"); 
+0

И я получаю ниже ошибку при запуске моей топологии на стороне приложения [StreamThread-1] org.apache.kafka.clients.NetworkClient: ошибка при извлечении метаданных с идентификатором корреляции 30: {Text4 = LEADER_NOT_AVAILABLE} – Renukaradhya

ответ

1
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1 

Для того, чтобы создать тему с коэффициентом репликации 3, необходимо, по крайней мере 3 работающих брокеров.