2016-11-29 7 views
2

Я зову искровым представить прохождение Maxrate, у меня есть один KINESIS приемник и партии 1sСпарк Streaming + Kinesis: Приемник Maxrate нарушается

spark-submit --conf spark.streaming.receiver.maxRate=10 ....

однако одна партия может значительно превышать тверда Maxrate. i.e: Im получает 300 записей.

Не хватает ли каких-либо настроек?

ответ

2

Это похоже на ошибку. От ковыряния в коде, похоже, Kinesis полностью игнорирует конфигурацию spark.streaming.receiver.maxRate.

Если заглянуть внутрь KinesisReceiver.onStart, вы видите:

val kinesisClientLibConfiguration = 
    new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId) 
    .withKinesisEndpoint(endpointUrl) 
    .withInitialPositionInStream(initialPositionInStream) 
    .withTaskBackoffTimeMillis(500) 
    .withRegionName(regionName) 

Этот конструктор заканчивает вызов другой конструктор, который имеет много значений по умолчанию для конфигурации:

public KinesisClientLibConfiguration(String applicationName, 
     String streamName, 
     AWSCredentialsProvider kinesisCredentialsProvider, 
     AWSCredentialsProvider dynamoDBCredentialsProvider, 
     AWSCredentialsProvider cloudWatchCredentialsProvider, 
     String workerId) { 
    this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, 
      dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, 
      DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, 
      DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, 
      DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, 
      new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), 
      DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, 
      DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null); 
} 

Тот, который вы заботитесь о DEFAULT_MAX_RECORDS, который постоянно установлен в 10 000 записей. Существует метод на KinesisClientLibConfiguration под названием withMaxRecords, который вы вызываете для установки фактического количества записей. Это должно быть легко исправить.

Но на данный момент похоже, что приемник Kinesis не соблюдает этот параметр.

+0

определенно, что стало причиной вопросов! благодаря –

2

Для справок в будущем.

Это известная bug фиксируется в Spark 2.2.0 выпуске

 Смежные вопросы

  • Нет связанных вопросов^_^