2015-09-30 5 views
0

Я использую Spark для загрузки некоторых данных в BigQuery. Идея состоит в том, чтобы читать данные с S3 и использовать API клиента Spark и BigQuery для загрузки данных. Ниже приведен код, который вставляет в BigQuery.BigQuery - Как установить время ожидания чтения в клиентской библиотеке Java

val bq = createAuthorizedClientWithDefaultCredentialsFromStream(appName, credentialStream) 
val bqjob = bq.jobs().insert(pid, job, data).execute() // data is a InputStream content 

При таком подходе я вижу много исключений SocketTimeoutException.

Caused by: java.net.SocketTimeoutException: Read timed out 
at java.net.SocketInputStream.socketRead0(Native Method) 
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
at java.net.SocketInputStream.read(SocketInputStream.java:170) 
at java.net.SocketInputStream.read(SocketInputStream.java:141) 
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:954) 
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:911) 
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) 
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) 
at java.io.BufferedInputStream.read(BufferedInputStream.java:345) 
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:703) 
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647) 
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1534) 
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1439) 
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480) 
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338) 
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37) 
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94) 
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972) 
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545) 
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562) 
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419) 
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336) 
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427) 
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352) 
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469) 

Похоже, что задержка при чтении с S3 приводит к тому, что HTTP-клиент Google отключается. Я хотел увеличить тайм-аут и попробовал следующие варианты.

val req = bq.jobs().insert(pid, job, data).buildHttpRequest() 
req.setReadTimeout(3 * 60 * 1000) 
val res = req.execute() 

Но это вызывает отказ в предварительном условии в BigQuery. Он ожидает, что mediaUploader будет нулевым, но не уверен, почему.

Exception in thread "main" java.lang.IllegalArgumentException 
    at com.google.api.client.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:76) 
    at com.google.api.client.util.Preconditions.checkArgument(Preconditions.java:37) 
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.buildHttpRequest(AbstractGoogleClientRequest.java:297) 

Это заставило меня попробовать второй вставки API на BigQuery

val req = bq.jobs().insert(pid, job).buildHttpRequest().setReadTimeout(3 * 60 * 1000).setContent(data) 
val res = req.execute() 

и на этот раз не удалось с другой ошибкой.

Exception in thread "main" com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request 
{ 
    "code" : 400, 
    "errors" : [ { 
    "domain" : "global", 
    "message" : "Job configuration must contain exactly one job-specific configuration object (e.g., query, load, extract, spreadsheetExtract), but there were 0: ", 
    "reason" : "invalid" 
    } ], 
    "message" : "Job configuration must contain exactly one job-specific configuration object (e.g., query, load, extract, spreadsheetExtract), but there were 0: " 
} 

Просьба предложить мне, как установить таймаут. Также укажите, если я делаю что-то неправильно.

+0

Я не уверен, что полностью соблюдаю это. Пара вопросов: (1) Если проблема в том, что чтение S3 происходит медленно, можете ли вы сначала прочитать содержимое S3, а затем запустить операцию BigQuery после локальных данных? BigQuery не поддерживает чтение непосредственно с S3, поэтому вы должны переносить контент локально. (2) Можете ли вы привести пример запроса на работу json, который отключен? –

+0

Что касается последней проблемы, которую вы упомянули, «Конфигурация задания должна содержать ровно один заданный для конкретной задачи объект конфигурации, но было 0», вы выбрасываете исходную конфигурацию и затем отправляете пустой запрос json в файл bigquery.jobs.insert. Я думаю, что это будет исправлено установкой таймаута правильно - см. Ниже ответ, как это сделать. –

+0

вы должны попробовать API переноса данных Google - https://cloud.google.com/storage/transfer/index.Предполагается, что S3 поддерживается как источник, а gcs - в качестве адресата. –

ответ

1

Ответ на главный вопрос из заголовка: как установить таймауты с использованием клиентской библиотеки Java.

Чтобы установить тайм-ауты, вам нужен пользовательский HttpRequestInitializer, настроенный на вашем клиенте. Например:

Bigquery.Builder builder = 
    new Bigquery.Builder(new UrlFetchTransport(), new JacksonFactory(), credential); 
final HttpRequestInitializer existing = builder.getHttpRequestInitializer(); 
builder.setHttpRequestInitializer(new HttpRequestInitializer() { 
    @Override 
    public void initialize(HttpRequest request) throws IOException { 
     existing.initialize(request); 
     request 
      .setReadTimeout(READ_TIMEOUT) 
      .setConnectTimeout(CONNECTION_TIMEOUT); 
     } 
    }); 
Bigquery client = builder.build(); 

Я не думаю, что это решит все проблемы, с которыми вы сталкиваетесь. Несколько идей, которые могут быть полезны, но я не совсем понимаю, сценарий, поэтому они могут быть выключены дорожки:

  • При перемещении больших файлов: рассмотрим постановку их на ГКС перед их загрузкой в ​​BigQuery.
  • Если вы используете загрузку мультимедиа для отправки данных по вашему запросу: они не могут быть слишком большими или вы рискуете таймаутами или сбоями сетевого подключения.
  • Если вы выполняете смущающую параллельную миграцию данных, а фрагменты данных относительно невелики, bigquery.tabledata.insertAll может быть более подходящим для сценариев с большими вентиляторами. См. https://cloud.google.com/bigquery/streaming-data-into-bigquery для получения более подробной информации.

Благодарим за вопрос!

+0

Спасибо, Майкл за ваши ценные материалы. Я попробую это и обновить. И использование Streaming API для пакетной операции выглядело не так. Особенно с большим количеством данных, я думаю, имеет смысл использовать пакетную нагрузку, такую ​​как insert API. – Jegan

+0

К сожалению, это не работает для меня. Он все еще разгоняется с 20 секундами, хотя я установил более высокий тайм-аут. Теперь мне интересно, пойду ли я в правильном направлении. Существуют ли какие-либо другие варианты для использования в случае загрузки записей из партии файлов в хранилище, отличном от gcs? – Jegan