2017-01-13 5 views
1

Я столкнулся java.io.IOException: s3n://bucket-name : 400 : Bad Request error при загрузке данных через RedShift spark-redshift library:Amazon S3A возвращает 400 Bad Request с искровым красного смещения библиотеки

Красное смещение кластера и s3 ведро и находятся в районе Мумбаи.

Вот полный стек ошибок:

2017-01-13 13:14:22 WARN TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, master): java.io.IOException: s3n://bucket-name : 400 : Bad Request 
      at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:453) 
      at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427) 
      at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411) 
      at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181) 
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
      at java.lang.reflect.Method.invoke(Method.java:498) 
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) 
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) 
      at org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source) 
      at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:476) 
      at com.databricks.spark.redshift.RedshiftRecordReader.initialize(RedshiftInputFormat.scala:115) 
      at com.databricks.spark.redshift.RedshiftFileFormat$$anonfun$buildReader$1.apply(RedshiftFileFormat.scala:92) 
      at com.databricks.spark.redshift.RedshiftFileFormat$$anonfun$buildReader$1.apply(RedshiftFileFormat.scala:80) 
      at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:279) 
      at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:263) 
      at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116) 
      at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91) 
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
      at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
      at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) 
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
      at org.apache.spark.scheduler.Task.run(Task.scala:86) 
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
      at java.lang.Thread.run(Thread.java:745) 
    Caused by: org.jets3t.service.impl.rest.HttpException: 400 Bad Request 
      at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:425) 
      at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:279) 
      at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1052) 
      at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2264) 
      at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2193) 
      at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120) 
      at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575) 
      at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174) 
      ... 30 more 

И вот мой Java-код для того же:

SparkContext sparkContext = SparkSession.builder().appName("CreditModeling").getOrCreate().sparkContext(); 
sparkContext.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem"); 
sparkContext.hadoopConfiguration().set("fs.s3a.awsAccessKeyId", fs_s3a_awsAccessKeyId); 
sparkContext.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey", fs_s3a_awsSecretAccessKey); 
sparkContext.hadoopConfiguration().set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com"); 

SQLContext sqlContext=new SQLContext(sparkContext); 
Dataset dataset= sqlContext 
     .read() 
     .format("com.databricks.spark.redshift") 
     .option("url", redshiftUrl) 
     .option("query", query) 
     .option("aws_iam_role", aws_iam_role) 
     .option("tempdir", "s3a://bucket-name/temp-dir") 
     .load(); 

Я был в состоянии решить проблему на локальном режиме искрового делая следующие изменения (см. this):

1) Я заменил jets3t jar на 0.9.4

2) Изменение свойств конфигурации Jets3t для поддержки aws4 версии ведра следующим образом:

Jets3tProperties myProperties = Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME); 
myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com"); 
myProperties.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256"); 
myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646"); 

Но теперь я пытаюсь запустить работу в кластерном режиме автономного режим (искры или с менеджер ресурсов MESOS), и ошибка появляется снова :(

Любая помощь будет оценена!

ответ

1

Актуальные проблемы:

Обновление Jets3tProperties, для поддержки AWS s3 подпись версии 4, во время выполнения работал над локальный режим, но не в режиме кластера, потому что свойства только обновлялись в JVM драйвера, но не на любом из JVM-исполнителей.

Решение:

Я нашел обходной путь для обновления Jets3tProperties на всех исполнителей, ссылаясь на this ссылку.

Обращаясь к приведенной выше ссылке, я добавил дополнительный фрагмент кода для обновления Jets3tProperties внутри функции .foreachPartition(), которая будет запускать его для первого раздела, созданного для любого из исполнителей.

Вот код:

Dataset dataset= sqlContext 
      .read() 
      .format("com.databricks.spark.redshift") 
      .option("url", redshiftUrl) 
      .option("query", query) 
      .option("aws_iam_role", aws_iam_role) 
      .option("tempdir", "s3a://bucket-name/temp-dir") 
      .load(); 

dataset.foreachPartition(rdd -> { 
    boolean first=true; 
    if(first){ 
     Jets3tProperties myProperties = 
       Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME); 
     myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com"); 
     myProperties 
       .setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256"); 
     myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646"); 
     first = false; 
    } 
}); 
+0

Есть ли лучшее решение для этого? Я тоже застрял здесь. –

+0

@SudevAmbadi Ответ на ваш вопрос: нет никакого прямого решения в любом месте, это хак, который я должен был поставить. Его нужно обрабатывать через библиотеку Jets3t. –

0

Этот стек предполагает, что вы используете старый разъем s3n, основанный на jets3t. вы устанавливаете разрешения, которые работают только с S3a, новее. Используйте URL-адрес, например s3a: //, чтобы выбрать новую запись.

Учитывая, что вы пытаетесь использовать API V4, вам также необходимо установить fs.s3a.endpoint. Отклик 400/плохой запрос, который вы хотите увидеть, если вы пытались авторизовать с v4 против центральной endpointd

+0

Спасибо за Ваш ответ @Steve Loughran :) Я заменил имена переменных с действительными значениями в вопросе. Теперь, как вы можете видеть, я установил tempdir с URL-адресом s3a: //, как вы уже упоминали. –

+0

Также я поставил фактическое значение fs.s3a.endpoint для региона Мумбаи. И вы правы в отношении причины 400/плохого запроса, как вы упомянули «Ответ на 400/плохой запрос - это тот, который вы видели бы, если бы вы попытались выполнить авторизацию с v4 против центральной конечной точки». –

+0

Но все изменения, которые я здесь сделал, отлично работают с локальным режимом, но не в режиме кластера. Поэтому я предполагаю, что он может просто обновляться на JVM драйвера, а не на JVM исполнителя. Имеет ли это смысл? –

 Смежные вопросы

  • Нет связанных вопросов^_^