Я использую Spark для загрузки некоторых данных в BigQuery. Идея состоит в том, чтобы читать данные с S3 и использовать API клиента Spark и BigQuery для загрузки данных. Ниже приведен код, который вставляет в BigQuery.BigQuery - Как установить время ожидания чтения в клиентской библиотеке Java
val bq = createAuthorizedClientWithDefaultCredentialsFromStream(appName, credentialStream)
val bqjob = bq.jobs().insert(pid, job, data).execute() // data is a InputStream content
При таком подходе я вижу много исключений SocketTimeoutException.
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:954)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:911)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:703)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1534)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1439)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
Похоже, что задержка при чтении с S3 приводит к тому, что HTTP-клиент Google отключается. Я хотел увеличить тайм-аут и попробовал следующие варианты.
val req = bq.jobs().insert(pid, job, data).buildHttpRequest()
req.setReadTimeout(3 * 60 * 1000)
val res = req.execute()
Но это вызывает отказ в предварительном условии в BigQuery. Он ожидает, что mediaUploader будет нулевым, но не уверен, почему.
Exception in thread "main" java.lang.IllegalArgumentException
at com.google.api.client.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:76)
at com.google.api.client.util.Preconditions.checkArgument(Preconditions.java:37)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.buildHttpRequest(AbstractGoogleClientRequest.java:297)
Это заставило меня попробовать второй вставки API на BigQuery
val req = bq.jobs().insert(pid, job).buildHttpRequest().setReadTimeout(3 * 60 * 1000).setContent(data)
val res = req.execute()
и на этот раз не удалось с другой ошибкой.
Exception in thread "main" com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
"code" : 400,
"errors" : [ {
"domain" : "global",
"message" : "Job configuration must contain exactly one job-specific configuration object (e.g., query, load, extract, spreadsheetExtract), but there were 0: ",
"reason" : "invalid"
} ],
"message" : "Job configuration must contain exactly one job-specific configuration object (e.g., query, load, extract, spreadsheetExtract), but there were 0: "
}
Просьба предложить мне, как установить таймаут. Также укажите, если я делаю что-то неправильно.
Я не уверен, что полностью соблюдаю это. Пара вопросов: (1) Если проблема в том, что чтение S3 происходит медленно, можете ли вы сначала прочитать содержимое S3, а затем запустить операцию BigQuery после локальных данных? BigQuery не поддерживает чтение непосредственно с S3, поэтому вы должны переносить контент локально. (2) Можете ли вы привести пример запроса на работу json, который отключен? –
Что касается последней проблемы, которую вы упомянули, «Конфигурация задания должна содержать ровно один заданный для конкретной задачи объект конфигурации, но было 0», вы выбрасываете исходную конфигурацию и затем отправляете пустой запрос json в файл bigquery.jobs.insert. Я думаю, что это будет исправлено установкой таймаута правильно - см. Ниже ответ, как это сделать. –
вы должны попробовать API переноса данных Google - https://cloud.google.com/storage/transfer/index.Предполагается, что S3 поддерживается как источник, а gcs - в качестве адресата. –