Работа с Kafka (v2.11-0.10.1.0) -spark-streaming (v-2.0.1-bin-hadoop2.7).Как получить смещение потребителя Кафки?
У меня есть Kafka Producer и Spark-streaming потребитель для производства и потребления. Все работает нормально, пока я не остановлю потребителя (около 2 минут) и начните снова. Потребитель начинает и читает данные, абсолютно совершенные. Но я потерялся с данными за 2 минуты, когда потребитель отключился.
Kafka consumer/server.properties не изменился.
производитель Кафка со свойствами:
Properties properties = new Properties();
properties.put("bootstrap.servers", AppCoding.KAFKA_HOST);
properties.put("auto.create.topics.enable", true);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("retries", 1);
logger.info("Initializing Kafka Producer.");
Producer<String, String> producer = new KafkaProducer<>(properties);
producer.send(new ProducerRecord<String, String>(AppCoding.KAFKA_TOPIC, "", documentAsString));
Потребляя с помощью искровой потокового API, как:
SparkConf sparkConf = new SparkConf().setMaster(args[4]).setAppName("Streaming");
// Create the context with 60 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(60000 * 5));
//input arguments:localhost:2181 sparkS incoming 10 local[*]
Set<String> topicsSet = new HashSet<>(Arrays.asList(args[2].split(";")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", args[0]);
//input arguments: localhost:9092 "" incoming 10 local[*]
JavaPairInputDStream<String, String> kafkaStream =
KafkaUtils.createDirectStream(jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet);
На другом конце я использую ActiveMQ. В то время как ActiveMQ Consumer мог получать данные во время их отключения. Помогите мне, если есть проблема с запутанностью.
Спасибо. Нет такой величины смещения, как раньше. Я должен использовать только малый/самый большой. Самый маленький: - дает все данные каждый раз, когда я запускаю потребитель. В результате программа снова и снова будет выполнять одни и те же данные. Самый большой: - в любом случае дает самое последнее значение по умолчанию. Я ищу подход, который потребитель мог бы прочитать, откуда он ушел. – srikanth
Потребитель прочитает, откуда он ушел автоматически. Config 'auto.offset.reset' предназначен для случаев отсутствия смещения инициализации или смещения вне диапазона. – amethystic