3

Каждый раз, когда я пытаюсь создать новую таблицу в cassandra с новым TableDef, я получаю порядок кластеризации по возрастанию, и я пытаюсь получить спуск.Set Cassandra Clustering Order on TableDef с коннектором Spark Cassandra от Datastax

Я использую Cassandra 2.1.10, Spark 1.5.1 и Datastax Spark Cassandra Connector 1.5.0-M2.

Я создаю новый TableDef

val table = TableDef("so", "example", 
    Seq(ColumnDef("parkey", PartitionKeyColumn, TextType)), 
    Seq(ColumnDef("ts", ClusteringColumn(0), TimestampType)), 
    Seq(ColumnDef("name", RegularColumn, TextType))) 

rdd.saveAsCassandraTableEx(table, SomeColumns("key", "time", "name")) 

Что я ожидал увидеть в Кассандре является

CREATE TABLE so.example (
    parkey text, 
    ts timestamp, 
    name text, 
    PRIMARY KEY ((parkey), ts) 
) WITH CLUSTERING ORDER BY (ts DESC); 

То, что я в конечном итоге является

CREATE TABLE so.example (
    parkey text, 
    ts timestamp, 
    name text, 
    PRIMARY KEY ((parkey), ts) 
) WITH CLUSTERING ORDER BY (ts ASC); 

Как может Я заставляю его спускать порядок кластеризации?

ответ

2

Мне не удалось найти прямой способ сделать это. Кроме того, есть много других параметров, которые вы можете указать. Я закончил расширение ColumnDef и TableDef и переопределив метод cql в TableDef. Ниже приведен пример решения, которое я придумал. Если у кого-то есть лучший способ, или это станет естественным, я с удовольствием изменил бы ответ.

// Scala Enum 
object ClusteringOrder { 
    abstract sealed class Order(val ordinal: Int) extends Ordered[Order] 
    with Serializable { 
    def compare(that: Order) = that.ordinal compare this.ordinal 

    def toInt: Int = this.ordinal 
    } 

    case object Ascending extends Order(0) 
    case object Descending extends Order(1) 

    def fromInt(i: Int): Order = values.find(_.ordinal == i).get 

    val values = Set(Ascending, Descending) 
} 

// extend the ColumnDef case class to add enum support 
class ColumnDefEx(columnName: String, columnRole: ColumnRole, columnType: ColumnType[_], 
    indexed: Boolean = false, val clusteringOrder: ClusteringOrder.Order = ClusteringOrder.Ascending) 
    extends ColumnDef(columnName, columnRole, columnType, indexed) 

// Mimic the ColumnDef object 
object ColumnDefEx { 
    def apply(columnName: String, columnRole: ColumnRole, columnType: ColumnType[_], 
    indexed: Boolean, clusteringOrder: ClusteringOrder.Order): ColumnDef = { 
    new ColumnDefEx(columnName, columnRole, columnType, indexed, clusteringOrder) 
    } 

    def apply(columnName: String, columnRole: ColumnRole, columnType: ColumnType[_], 
    clusteringOrder: ClusteringOrder.Order = ClusteringOrder.Ascending): ColumnDef = { 
    new ColumnDefEx(columnName, columnRole, columnType, false, clusteringOrder) 
    } 

    // copied from ColumnDef object 
    def apply(column: ColumnMetadata, columnRole: ColumnRole): ColumnDef = { 
    val columnType = ColumnType.fromDriverType(column.getType) 
    new ColumnDefEx(column.getName, columnRole, columnType, column.getIndex != null) 
    } 
} 

// extend the TableDef case class to override the cql method 
class TableDefEx(keyspaceName: String, tableName: String, partitionKey: Seq[ColumnDef], 
    clusteringColumns: Seq[ColumnDef], regularColumns: Seq[ColumnDef], options: String) 
    extends TableDef(keyspaceName, tableName, partitionKey, clusteringColumns, regularColumns) { 

    override def cql = { 
    val stmt = super.cql 
    val ordered = if (clusteringColumns.size > 0) 
     s"$stmt\r\nWITH CLUSTERING ORDER BY (${clusteringColumnOrder(clusteringColumns)})" 
    else stmt 
    appendOptions(ordered, options) 
    } 

    private[this] def clusteringColumnOrder(clusteringColumns: Seq[ColumnDef]): String = 
    clusteringColumns.map { col => 
     col match { 
     case c: ColumnDefEx => if (c.clusteringOrder == ClusteringOrder.Descending) 
      s"${c.columnName} DESC" else s"${c.columnName} ASC" 
     case c: ColumnDef => s"${c.columnName} ASC" 
     } 
    }.toList.mkString(", ") 

    private[this] def appendOptions(stmt: String, opts: String) = 
    if (stmt.contains("WITH") && opts.startsWith("WITH")) s"$stmt\r\nAND ${opts.substring(4)}" 
    else if (!stmt.contains("WITH") && opts.startsWith("AND")) s"WITH ${opts.substring(3)}" 
    else s"$stmt\r\n$opts" 
} 

// Mimic the TableDef object but return new TableDefEx 
object TableDefEx { 
    def apply(keyspaceName: String, tableName: String, partitionKey: Seq[ColumnDef], 
    clusteringColumns: Seq[ColumnDef], regularColumns: Seq[ColumnDef], options: String = "") = 
    new TableDefEx(keyspaceName, tableName, partitionKey, clusteringColumns, regularColumns, 
     options) 

    def fromType[T: ColumnMapper](keyspaceName: String, tableName: String): TableDef = 
    implicitly[ColumnMapper[T]].newTable(keyspaceName, tableName) 
} 

Это позволило мне создать новые таблицы таким образом:

val table = TableDefEx("so", "example", 
    Seq(ColumnDef("parkey", PartitionKeyColumn, TextType)), 
    Seq(ColumnDefEx("ts", ClusteringColumn(0), TimestampType, ClusteringOrder.Descending)), 
    Seq(ColumnDef("name", RegularColumn, TextType))) 

rdd.saveAsCassandraTableEx(table, SomeColumns("key", "time", "name")) 

 Смежные вопросы

  • Нет связанных вопросов^_^