2016-06-23 4 views
0

Я пытаюсь использовать API Flink Table в scala. Нет ошибок при составлении времени, но qhen i'm выполнения задания в моем FLiNK кластера: flink.api.table.TableException: Type is not supported:<GenericType<java.lang.Object>flink.api.table.TableException: Тип не поддерживается

Мои Maven зависимостей:

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-scala_2.11</artifactId> 
    <version>${flink.version}</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-streaming-scala_2.11</artifactId> 
    <version>${flink.version}</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-table_2.11</artifactId> 
    <version>1.1-SNAPSHOT</version> 
</dependency> 

Мой импорт:

import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.api.scala.table._ 
import org.apache.flink.api.table.{Row, Table, TableEnvironment} 

Мой код:

// odo[(Long,String,Double,Long)] 
val inputTable = odo.toTable(tableEnv,'ts,'ty, 'vl, 'dv) 
val resultStream: Table = inputTable.where('ty === "Odometer").select('dv) 
resultStream.toDataStream[Row].print 

Обновление: Я думаю, это могло быть о версии FLiNK (1.0.3), потому что, когда я делаю что-то вроде этого:

val inputTable = odo.toTable(tableEnv, 'ts, 'ty, 'vl, 'dv) 
val result = inputTable.select('dv,'vl.sum).where('dv == 111) 
result.toDataStream[Row].print() 

У меня есть еще одно исключение: org.apache.flink.api.table.TableException: Aggregate on stream tables is currently not supported.

Любая помощь приветствуется. Спасибо.

ответ

1

API таблицы Flink не поддерживает поля, содержащие GenericType в 1.1-SNAPSHOT. Существует Pull Request, который реализует эту функцию. Весьма вероятно, что он будет содержаться в выпуске Flink 1.1.

Что касается вашего второго исключения: исключение в основном объясняет сам. Пока вы не можете делать скопления на потоках. Однако StreamSQL находится под development.

+0

Я попробовал API таблиц с новой версией Flink (1.1.0), и появилось новое исключение. Я не знаю, какая зависимость теперь мне нужна: java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeinfo/SqlTimeTypeInfo' и 'java.lang.ClassNotFoundException: org.apache.flink.api.common. typeinfo.SqlTimeTypeInfo' – jag

+0

«odo» был потоком с водяными знаками, поэтому я сделал «odo» поток без водяных знаков, и это новое исключение: 'java.lang.ClassCastException: org.apache.flink.api.java.typeutils .GenericTypeInfo не может быть передан в org.apache.flink.api.common.typeutils.CompositeType' – jag

+0

Ваша первая проблема звучит как неправильная конфигурация зависимости Maven. Ваша вторая проблема, как ошибка. Не могли бы вы открыть для него проблему. Возможно, с небольшим фрагментом кода, чтобы воспроизвести его. Тогда кто-то может глубже взглянуть на него. – twalthr

 Смежные вопросы

  • Нет связанных вопросов^_^