2017-01-27 14 views
2

Есть ли способ использовать datastax/spark-cassandra-connector, чтобы выбрать самую последнюю версию каждой клавиши раздела, которая эквивалентна Cassandra 3.6 and later PER PARTITION LIMIT option?spark-cassandra-connector на лимит раздела

В Cassandra 3.6 и более поздних версиях параметр ЛИМИТ в одной группе устанавливает максимальное число строк, что запрос возвращает из каждого раздела. Для примера создайте таблицу, которая будет сортировать данные в более чем одной секции .

Я пробовал методы ниже без успеха:

Кассандры Версия

[cqlsh 5.0.1 | Cassandra 3.9.0 | CQL spec 3.4.2 | Родной v4 протокол]

Главная

import org.apache.commons.lang.StringUtils; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.Encoder; 
import org.apache.spark.sql.Encoders; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.SaveMode; 
import org.apache.spark.sql.SparkSession; 

import com.datastax.spark.connector.japi.CassandraRow; 
import com.datastax.spark.connector.japi.rdd.CassandraJavaPairRDD; 
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD; 
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD; 
import com.google.common.collect.ImmutableMap; 

import lombok.extern.slf4j.Slf4j; 
import scala.Tuple2; 
import scala.Tuple3; 

import static java.lang.Double.*; 
import static java.lang.Integer.*; 

@Slf4j 
public class Main extends Configured implements Tool { 

    public static void main(String[] args) throws Exception { 
     System.exit(ToolRunner.run(new Main(), args)); 
    } 

    @Override 
    public int run(String[] args) throws Exception { 
     SPARK_SESSION = SparkSession 
      .builder() 
      .master(SPARK_MASTER) 
      .appName(APP_NAME) 
      .config("spark.cassandra.connection.host", CASSANDRA_HOST_IPS) 
      .config("spark.cassandra.auth.username", CASSANDRA_USER_NAME) 
      .config("spark.cassandra.auth.password", CASSANDRA_PASSWORD) 
      .config("pushdown", "true") 
      .getOrCreate(); 

     try (JavaSparkContext sc = new JavaSparkContext(SPARK_SESSION.sparkContext())) { 
      insertPerPartitionLimitTestList(); 
      getJavaRddPerPartitionLimitTest(sc); 
      getTypedJavaRddPerPartitionLimitTest(sc); 
      getJavaPairRddPerPartitionLimitTest(sc); 
      getCassandraJavaRddPerPartitionLimitTest(sc); 
      getTypedCassandraJavaRddPerPartitionLimitTest(sc); 
      getCassandraTableScanJavaRddPerPartitionLimitTest(sc); 
      getTypedCassandraTableScanJavaRddPerPartitionLimitTest(sc); 
      getCassandraJavaRddToJavaRddPerPartitionLimitTest(sc); 
      getSparkDatasetPerPartitionLimitTest(sc); 
      getSparkSqlDatasetPerPartitionLimitTest(); 
      log.info("Done"); 
      return 0; // success exit code 
     } catch (Throwable t) { 
      log.error("Spark transform failed.", t); 
      return 1; // failure exit code 
     } 
    } 

    public final Map<String, String> cassandraConfig(String keyspace, String table) { 
     return ImmutableMap.<String, String>builder() 
      .put("spark.cassandra.connection.host", CASSANDRA_HOST_IPS) 
      .put("spark.cassandra.auth.username", CASSANDRA_USER_NAME) 
      .put("spark.cassandra.auth.password", CASSANDRA_PASSWORD) 
      .put("pushdown", "true") 
      .put("keyspace", keyspace) 
      .put("table", table) 
      .build(); 
    } 

    /** 
    * Generate test data to INSERT INTO the Cassandra bug.per_partition_limit_test table. 
    * 
    * @param listSize The number of rows of test data to generate. 
    * @return {@link List} of {@link PerPartitionLimitTest} containing test data. 
    */ 
    public List<PerPartitionLimitTest> buildPerPartitionLimitTestList(Integer listSize){ 
     final Timestamp timeSeriesDate = Timestamp.from(LocalDateTime.now().atZone(ZoneId.of("UTC")).toInstant()); 
     final List<PerPartitionLimitTest> perPartitionLimitTests = new ArrayList<>(listSize); 
     // Populate List of objects with test data. 
     for(int i = 0; i < listSize; i++){ 
      final String itemUuid = UUID.randomUUID().toString(); 
      perPartitionLimitTests.add(
       PerPartitionLimitTest.of(
        itemUuid, 
        timeSeriesDate, 
        String.format("/items/%s", itemUuid.toString()) 
       ) 
      ); 
     } 
     return perPartitionLimitTests; 
    } 

    /** 
    * Generate test data and INSERT Dataset data into Cassandra table 
    */ 
    public void insertPerPartitionLimitTestList(){ 
     final Map<String, String> cassandraConfig = cassandraConfig("bug", "per_partition_limit_test"); 
     createDatasetFromList(
      PerPartitionLimitTest.class, 
      buildPerPartitionLimitTestList(20) 
     ) 
      .select("itemUuid", "timeSeriesDate", "itemUri") 
      .toDF("item_uuid", 
       "time_series_date", 
       "item_uri") 
      .write() 
      .format("org.apache.spark.sql.cassandra") 
      .mode(SaveMode.Append) 
      .options(cassandraConfig) 
      .save(); 
    } 

    private PerPartitionLimitTestRowReaderFactory perPartitionLimitTestRowReaderFactory = new PerPartitionLimitTestRowReaderFactory(); 

    public String getPerPartitionLimitTestItemUuidMin(JavaSparkContext sc){ 
     return String.valueOf(
      getPerPartitionLimitTestDataset(
       PerPartitionLimitTest.class, 
       "org.apache.spark.sql.cassandra", 
       cassandraConfig("bug", "per_partition_limit_test") 
      ) 
       .first() 
       .getItemUuid()); 
    } 

    public void getJavaRddPerPartitionLimitTest(JavaSparkContext sc){ 
     final String itemUuidMin = String.valueOf(
      getPerPartitionLimitTestDataset(
       PerPartitionLimitTest.class, 
       "org.apache.spark.sql.cassandra", 
       cassandraConfig("bug", "per_partition_limit_test") 
      ) 
      .first() 
      .getItemUuid()); 

     JavaRDD<CassandraRow> javaRDD = javaFunctions(sc) 
      .cassandraTable("bug", "per_partition_limit_test") 
      .where(String.format("TOKEN(item_uuid) > TOKEN(%s) PER PARTITION LIMIT 1", itemUuidMin)); 
     log.info(String.format("javaRDD.count() = %s", javaRDD.count())); 
    } 

    public void getTypedJavaRddPerPartitionLimitTest(JavaSparkContext sc){ 
     JavaRDD<PerPartitionLimitTest> javaRDD = javaFunctions(sc) 
      .cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory) 
      .where("PER PARTITION LIMIT 1"); 
     log.info(String.format("javaRDD.count() = %s", javaRDD.count())); 
    } 

    public void getJavaPairRddPerPartitionLimitTest(JavaSparkContext sc){ 
     JavaPairRDD<String, PerPartitionLimitTest> javaPairRDD = javaFunctions(sc) 
      .cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory) 
      .where("PER PARTITION LIMIT 1") 
      .keyBy((Function<PerPartitionLimitTest, String>) PerPartitionLimitTest::getItemUuid); 
     log.info(String.format("javaPairRDD.count() = %s", javaPairRDD.count())); 
    } 

    public void getTypedCassandraJavaRddPerPartitionLimitTest(JavaSparkContext sc){ 
     CassandraJavaRDD<PerPartitionLimitTest> cassandraJavaRDD = javaFunctions(sc) 
      .cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory) 
      .where("PER PARTITION LIMIT 1"); 
     log.info(String.format("cassandraJavaRDD.count() = %s", cassandraJavaRDD.count())); 
    } 

    public void getCassandraTableScanJavaRddPerPartitionLimitTest(JavaSparkContext sc){ 
     CassandraTableScanJavaRDD<CassandraRow> cassandraTableScanJavaRDD = javaFunctions(sc) 
      .cassandraTable("bug", "per_partition_limit_test") 
      .where("PER PARTITION LIMIT 1"); 
     log.info(String.format("cassandraTableScanJavaRDD.count() = %s", cassandraTableScanJavaRDD.count())); 
    } 

    public void getTypedCassandraTableScanJavaRddPerPartitionLimitTest(JavaSparkContext sc){ 
     CassandraTableScanJavaRDD<PerPartitionLimitTest> cassandraTableScanJavaRDD = javaFunctions(sc) 
      .cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory) 
      .where("PER PARTITION LIMIT 1"); 
     log.info(String.format("cassandraTableScanJavaRDD.count() = %s", cassandraTableScanJavaRDD.count())); 
    } 

    public void getCassandraJavaRddToJavaRddPerPartitionLimitTest(JavaSparkContext sc){ 
     CassandraJavaRDD<CassandraRow> cassandraJavaRDD = javaFunctions(sc) 
      .cassandraTable("bug", "per_partition_limit_test"); 
     JavaRDD<PerPartitionLimitTest> javaRDD = cassandraJavaRDD 
      .where("PER PARTITION LIMIT 1") 
      .map((Function<CassandraRow, PerPartitionLimitTest>) cassandraRow -> PerPartitionLimitTest.of(
       cassandraRow.getUUID("item_uuid").toString(), 
       new Timestamp(cassandraRow.getDateTime("time_series_date").getMillis()), 
       cassandraRow.getString("item_uri") 
      )); 
     log.info(String.format("javaRDD.count() = %s", javaRDD.count())); 
    } 

    /** 
    * SELECT data from an input data source into a typed {@link Dataset}. 
    * 
    * @param clazz {@link Class} The class of type T that Spark should used to convert the internal Spark SQL representation into. This 
    *         tells Spark the type of object each row in this Dataset should be encoded as. 
    * @param format Specifies the input data source format. 
    * @param config {@link Map} of {@link String} containing options defining the input data source connection. 
    * @param <T> type of class. 
    * @return Typed {@link Dataset} containing table data selected from the input data source. 
    */ 
    public <T> Dataset<T> getPerPartitionLimitTestDataset(Class<T> clazz, String format, Map<String, String> config) { 
     final Encoder<T> encoder = Encoders.bean(clazz); 
     return SPARK_SESSION 
      .read() 
      .format(format) 
      .options(config) 
      .load() 
      .select("item_uuid", "time_series_date", "item_uri") 
      .toDF("itemUuid", "timeSeriesDate", "itemUri") 
      .as(encoder); 
    } 

    public void getSparkDatasetPerPartitionLimitTest(JavaSparkContext sc){ 
     final Dataset<PerPartitionLimitTest> perPartitionLimitTestDataset = 
      getPerPartitionLimitTestDataset(
       PerPartitionLimitTest.class, 
       "org.apache.spark.sql.cassandra", 
       cassandraConfig("bug", "per_partition_limit_test") 
      ) 
      .where("PER PARTITION LIMIT 1"); 
     log.info(String.format("perPartitionLimitTestDataset.count() = %s", perPartitionLimitTestDataset.count())); 
    } 

    public void getSparkDatasetPerPartitionLimitTestWithTokenGreaterThan(JavaSparkContext sc){ 
     final String itemUuidMin = getPerPartitionLimitTestItemUuidMin(sc); 
     final Dataset<PerPartitionLimitTest> perPartitionLimitTestDataset = 
      getPerPartitionLimitTestDataset(
       PerPartitionLimitTest.class, 
       "org.apache.spark.sql.cassandra", 
       cassandraConfig("bug", "per_partition_limit_test") 
      ) 
      .where(String.format("TOKEN(item_uuid) > TOKEN(%s) PER PARTITION LIMIT 1", itemUuidMin)); 
     log.info(String.format("perPartitionLimitTestDataset.count() = %s", perPartitionLimitTestDataset.count())); 
    } 

    public void getSparkSqlDatasetPerPartitionLimitTest(){ 
     final Dataset<PerPartitionLimitTest> perPartitionLimitTestDataset = 
      getPerPartitionLimitTestDataset(PerPartitionLimitTest.class, "org.apache.spark.sql.cassandra", cassandraConfig("bug", "per_partition_limit_test")); 
     // Register the DataFrame as a SQL temporary view 
     perPartitionLimitTestDataset.createOrReplaceTempView("perPartitionLimitTests"); 
     final Encoder<PerPartitionLimitTest> perPartitionLimitTestEncoder = Encoders.bean(PerPartitionLimitTest.class); 
     // Modify data using Spark SQL 
     final Dataset<PerPartitionLimitTest> perPartitionLimitTestSqlDS = SPARK_SESSION.sql(
      "SELECT item_uuid, " 
       + "time_series_date, " 
       + "'item_uri " 
       + "FROM perPartitionLimitTests " 
       + "PER PARTITION LIMIT 1") 
      .as(perPartitionLimitTestEncoder); 
     log.info(String.format("perPartitionLimitTestSqlDS.count() = %s", perPartitionLimitTestSqlDS.count())); 
    } 
} 

PerPartitionLimitTestRowReader

import java.io.Serializable; 
    import java.sql.Timestamp; 

    import com.datastax.driver.core.Row; 
    import com.datastax.spark.connector.CassandraRowMetadata; 
    import com.datastax.spark.connector.ColumnRef; 
    import com.datastax.spark.connector.cql.TableDef; 
    import com.datastax.spark.connector.rdd.reader.RowReader; 
    import com.datastax.spark.connector.rdd.reader.RowReaderFactory; 

    import scala.collection.IndexedSeq; 

    public class PerPartitionLimitTestRowReader extends GenericRowReader<PerPartitionLimitTest> { 
     private static final long serialVersionUID = 1L; 
     private static RowReader<PerPartitionLimitTest> reader = new PerPartitionLimitTestRowReader(); 

     public static class PerPartitionLimitTestRowReaderFactory implements RowReaderFactory<PerPartitionLimitTest>, Serializable{ 
      private static final long serialVersionUID = 1L; 

      @Override 
      public RowReader<PerPartitionLimitTest> rowReader(TableDef arg0, IndexedSeq<ColumnRef> arg1) { 
       return reader; 
      } 

      @Override 
      public Class<PerPartitionLimitTest> targetClass() { 
       return PerPartitionLimitTest.class; 
      } 
     } 

     @Override 
     public PerPartitionLimitTest read(Row row, CassandraRowMetadata rowMetaData) { 
      PerPartitionLimitTest perPartitionLimitTest = new PerPartitionLimitTest(); 
      perPartitionLimitTest.setItemUuid(row.getUUID("item_uuid").toString()); 
      perPartitionLimitTest.setTimeSeriesDate(new Timestamp(row.getTimestamp("time_series_date").getTime())); 
      perPartitionLimitTest.setItemUri(row.getString("item_uri")); 
      return perPartitionLimitTest; 
     } 
    } 
} 

GenericRowReader

import java.io.Serializable; 

import com.datastax.spark.connector.ColumnRef; 
import com.datastax.spark.connector.rdd.reader.RowReader; 

import scala.Option; 
import scala.collection.Seq; 

public abstract class GenericRowReader<T> implements RowReader<T>, Serializable { 
    private static final long serialVersionUID = 1L; 

    @Override 
    public Option<Seq<ColumnRef>> neededColumns() { 
     return Option.empty(); 
    } 

} 

PerPartitionLimitTest домена Entity

import java.io.Serializable; 
import java.sql.Timestamp; 

import javax.validation.Valid; 
import javax.validation.constraints.NotNull; 
import javax.xml.bind.annotation.XmlRootElement; 
import javax.xml.bind.annotation.XmlType; 

import com.datastax.driver.mapping.annotations.Column; 
import com.datastax.driver.mapping.annotations.Table; 

import lombok.Data; 
import lombok.NoArgsConstructor; 
import lombok.NonNull; 
import lombok.RequiredArgsConstructor; 

@Data 
@NoArgsConstructor 
@Table(keyspace = "bug", name = "per_partition_limit_test") 
@RequiredArgsConstructor(staticName = "of") 
@XmlType(name = "PerPartitionLimitTest") 
@XmlRootElement(name = "perPartitionLimitTest") 
public class PerPartitionLimitTest implements Serializable { 

    /** 
    * Type 4 uuid that uniquely identifies the item. 
    */ 
    @Valid 
    @NotNull @NonNull 
    @Column(name = "item_uuid") 
    private String itemUuid; 

    /** 
    * The timestamp when the data was inserted into Cassandra. 
    */ 
    @NotNull @NonNull 
    @Column(name = "time_series_date")//, codec = TimestampTypeCodec.class) 
    private Timestamp timeSeriesDate; 

    /** 
    * URI that points to an itme. 
    */ 
    @Column(name = "item_uri") 
    @NotNull @NonNull 
    private String itemUri; 

} 

Cassandra Таблица:

USE bug; 

DROP TABLE IF EXISTS bug.per_partition_limit_test; 

CREATE TABLE bug.per_partition_limit_test (
    item_uuid uuid, 
    time_series_date timestamp, 
    item_uri text static, 
    PRIMARY KEY ((item_uuid), time_series_date) 
) WITH CLUSTERING ORDER BY (time_series_date DESC) 
AND comment = 'Table Properties: 
default_time_to_live - set to 518400 seconds which is 6 days, data will be automatically dropped after 6 days 
Compaction 
class - set to TimeWindowCompactionStrategy which is used for time series data stored in tables that use the default TTL for all data 
compaction_window_unit - set to DAYS which is time unit used to define the bucket size 
compaction_window_size - set to 6 which is how many units per bucket' 
AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy', 'compaction_window_size': '6', 'compaction_window_unit': 'DAYS'} 
AND default_time_to_live = 518400 
AND gc_grace_seconds = 519400; 

Maven Ссылки:

<dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector_2.10</artifactId> 
     <version>2.0.0-M3</version> 
    </dependency> 
    <dependency> 
     <groupId>com.datastax.cassandra</groupId> 
     <artifactId>cassandra-driver-core</artifactId> 
     <version>3.1.2</version> 
    </dependency> 
    <dependency> 
     <groupId>com.datastax.cassandra</groupId> 
     <artifactId>cassandra-driver-mapping</artifactId> 
     <version>3.1.2</version> 
    </dependency> 
    <dependency> 
     <groupId>com.datastax.cassandra</groupId> 
     <artifactId>cassandra-driver-extras</artifactId> 
     <version>3.1.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-catalyst_2.10</artifactId> 
     <version>2.0.2</version> 
     <scope>compile</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.10</artifactId> 
     <version>2.0.2</version> 
     <scope>compile</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.10</artifactId> 
     <version>2.0.2</version> 
     <scope>compile</scope> 
    </dependency> 

Ошибки

[Stage 0:>               (0 + 8)/18]ERROR [2017-01-27 04:24:38,061] (Executor task launch worker-1) org.apache.spark.executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1) 
com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...) 
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:132) 
    at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:224) 
    at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:200) 
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:906) 
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1$1.run(Futures.java:635) 
    ... 3 common frames omitted 
Wrapped by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...) 
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58) 
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24) 
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37) 
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:113) 
    at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45) 
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28) 
    at com.sun.proxy.$Proxy8.prepare(Unknown Source) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:279) 
    ... 16 common frames omitted 
Wrapped by: java.io.IOException: Exception during preparation of SELECT "item_uuid", "time_series_date", "item_uri" FROM "bug"."per_partition_limit_test" WHERE token("item_uuid") > ? AND token("item_uuid") <= ? AND PER PARTITION LIMIT 1 ALLOW FILTERING: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293) 


[Stage 0:>               (0 + 8)/18]ERROR [2017-01-27 04:26:02,044] (Executor task launch worker-3) org.apache.spark.executor.Executor: Exception in task 3.0 in stage 0.0 (TID 3) 
com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...) 
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:132) 
    at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:224) 
    at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:200) 
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:906) 
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1$1.run(Futures.java:635) 
    ... 3 common frames omitted 
Wrapped by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...) 
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58) 
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24) 
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37) 
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:113) 
    at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45) 
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28) 
    at com.sun.proxy.$Proxy8.prepare(Unknown Source) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:279) 
    ... 16 common frames omitted 
Wrapped by: java.io.IOException: Exception during preparation of SELECT "item_uuid", "time_series_date", "item_uri" FROM "bug"."per_partition_limit_test" WHERE token("item_uuid") > ? AND token("item_uuid") <= ? AND PER PARTITION LIMIT 1 ALLOW FILTERING: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293) 

ERROR [2017-01-27 01:41:50,369] (main) Main: Spark transform failed. 
org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'PARTITION' expecting <EOF>(line 1, pos 67) 

== SQL == 
TOKEN(item_uuid) > TOKEN(13432d97-3849-4158-8405-804447d1b0c3) PER PARTITION LIMIT 1 
-------------------------------------------------------------------^^^ 



ERROR [2017-01-27 04:27:31,265] (main) Main: Spark transform failed. 
org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input ''' expecting {'(', 'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IF', '+', '-', '*', 'DIV', '~', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'USING', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', 'CURRENT_DATE', 'CURRENT_TIMESTAMP', STRING, BIGINT_LITERAL, SMALLINT_LITERAL, TINYINT_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, SCIENTIFIC_DECIMAL_VALUE, DOUBLE_LITERAL, BIGDECIMAL_LITERAL, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 36) 

== SQL == 
SELECT item_uuid, time_series_date, 'item_uri FROM perPartitionLimitTests PER PARTITION LIMIT 1 
------------------------------------^^^ 

    at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197) 
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99) 
    at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45) 
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53) 
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) 
    at Main.getSparkSqlDatasetPerPartitionLimitTest(Main.java:397) 
    at Main.run(Main.java:177) 
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) 
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84) 

ответ

 Смежные вопросы

  • Нет связанных вопросов^_^