2017-02-21 19 views
0

У меня есть задача Apache Beam, которая читает из источника MySQL с использованием JDBC, и она должна записывать данные так же, как и в таблицу BigQuery. На данный момент никаких преобразований не произойдет, что произойдет позже, на данный момент я просто хочу, чтобы вывод базы данных был непосредственно записан в BigQuery.Использование MySQL в качестве источника ввода и записи в Google BigQuery

Это основной метод пытается выполнить эту операцию:

public static void main(String[] args) { 
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); 

     Pipeline p = Pipeline.create(options); 

     // Build the table schema for the output table. 
     List<TableFieldSchema> fields = new ArrayList<>(); 
     fields.add(new TableFieldSchema().setName("phone").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("url").setType("STRING")); 
     TableSchema schema = new TableSchema().setFields(fields); 

     p.apply(JdbcIO.<KV<String, String>>read() 
     .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
      "com.mysql.jdbc.Driver", "jdbc:mysql://host:3306/db_name") 
      .withUsername("user") 
      .withPassword("pass")) 
      .withQuery("SELECT phone_number, identity_profile_image FROM scraper_caller_identities LIMIT 100") 
      .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() { 
       public KV<String, String> mapRow(ResultSet resultSet) throws Exception { 
       return KV.of(resultSet.getString(1), resultSet.getString(2)); 
      } 
      }) 
     .apply(BigQueryIO.Write 
      .to(options.getOutput()) 
      .withSchema(schema) 
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE))); 

     p.run(); 
    } 

Но когда я выполнить шаблон используя Maven, я получаю следующее сообщение об ошибке:

Test.java:[184,6] cannot find symbol symbol: method apply(com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.Bound)
location: class org.apache.beam.sdk.io.jdbc.JdbcIO.Read<com.google.cloud.dataflow.sdk.values.KV<java.lang.String,java.lang.String>>

Кажется, что я не передавая BigQueryIO. Запишите ожидаемый сбор данных, и с этим я сейчас борюсь.

Как я могу заставить данные, поступающие из MySQL, удовлетворять ожиданиям BigQuery в этом случае?

ответ

1

Я думаю, что вам нужно, чтобы обеспечить PCollection <TableRow> в BigQueryIO.Write вместо PCollection < < KV String, String > > типа, что RowMapper выводит.

Также, при настройке TableRow, используйте правильные названия столбцов и значений. Примечание: Я думаю, что ваши KV - это значения телефона и url (например, {"555-555-1234": "http://www.url.com"}), а не имена и пары имен столбцов (например, {"phone": "555-555- 1234" , „URL“: «http://www.url.com»})

Смотрите пример здесь: https://beam.apache.org/documentation/sdks/javadoc/0.5.0/

могли бы вы дать это попробовать и дайте мне знать, если это работает для вас? Надеюсь это поможет.