Это похоже на ошибку. От ковыряния в коде, похоже, Kinesis полностью игнорирует конфигурацию spark.streaming.receiver.maxRate
.
Если заглянуть внутрь KinesisReceiver.onStart
, вы видите:
val kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
Этот конструктор заканчивает вызов другой конструктор, который имеет много значений по умолчанию для конфигурации:
public KinesisClientLibConfiguration(String applicationName,
String streamName,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}
Тот, который вы заботитесь о DEFAULT_MAX_RECORDS
, который постоянно установлен в 10 000 записей. Существует метод на KinesisClientLibConfiguration
под названием withMaxRecords
, который вы вызываете для установки фактического количества записей. Это должно быть легко исправить.
Но на данный момент похоже, что приемник Kinesis не соблюдает этот параметр.
определенно, что стало причиной вопросов! благодаря –