2016-12-10 5 views
2

Я использую DatastoreIO из моего потокового конвейера потоков данных и получаю сообщение об ошибке при записи объекта с тем же ключом.Как использовать транзакцию DatastoreIO

2016-12-10T22:51:04.385Z: Error: (af00222cfd901860): Exception: com.google.datastore.v1.client.DatastoreException: A non-transactional commit may not contain multiple mutations affecting the same entity., code=INVALID_ARGUMENT 

Если я использую случайное число в ключе то вещи работают, но мне нужно обновить один и тот же ключ, есть транзакционный способ сделать это с помощью DataStoreIO?

static class CreateEntityFn extends DoFn<KV<String, Tile>, Entity> { 
    private static final long serialVersionUID = 0; 

    private final String namespace; 
    private final String kind; 

    CreateEntityFn(String namespace, String kind) { 
    this.namespace = namespace; 
    this.kind = kind; 
    } 

    public Entity makeEntity(String key, Tile tile) { 
    Entity.Builder entityBuilder = Entity.newBuilder(); 
    Key.Builder keyBuilder = makeKey(kind, key); 
    if (namespace != null) { 
     keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace); 
    } 
    entityBuilder.setKey(keyBuilder.build()); 
    entityBuilder.getMutableProperties().put("tile", makeValue(tile.toString()).build()); 
    return entityBuilder.build(); 
    } 

    @Override 
    public void processElement(ProcessContext c) { 
    String key = c.element().getKey(); 
    // this works key = key.concat(":" + UUID.randomUUID().toString()); 
    c.output(makeEntity(key, c.element().getValue())); 
    } 
} 

... 

... 
inputData = pipeline 
       .apply(PubsubIO.Read.topic(pubsubTopic)); 
windowedDataStreaming = inputData 
       .apply(Window.<String>into(
         SlidingWindows.of(Duration.standardMinutes(15)) 
            .every(Duration.standardSeconds(31)))); 


          ... 
          ... 
          ... 
//Create a Datastore entity 
PCollection<Entity> siteTileEntities = tileSiteKeyed 
     .apply(ParDo.named("CreateSiteEntities").of(new CreateEntityFn(options.getNamespace(), options.getKind())));  

// write site tiles to datastore 
siteTileEntities 
     .apply(DatastoreIO.v1().write().withProjectId(options.getDataset())); 

// Run the pipeline 
pipeline.run(); 
+0

Спасибо, что поделились своим кодом. Не могли бы вы также поделиться, как вы строите 'inputData'? Меня особенно интересует объект Dustastore 'Query'. –

+0

У меня нет объекта Datastore Query. Я читаю данные из pubsub. –

ответ

4

Ваш фрагмент кода не объясняет, как создается tileSiteKeyed. Предположительно это PCollection<KV<String, Tile>, но если он может иметь дубликаты String ключей, это объясняет проблему.

Обычно PCollection<KV<K, V>> может содержать несколько пар KV с одним и тем же ключом. Если вы хотите обеспечить уникальные ключи для каждого окна, вы можете использовать GroupByKey для этого. Это даст вам PCollection<KV<K, Iterable<V>>> с уникальными ключами на окно. Затем добавьте CreateEntityFn, чтобы принять Iterable<Tile> и создать единую мутацию с изменениями, которые вам нужно внести.

+0

Большое спасибо, я думаю, это проблема. Я попробую ваше предложение. –

2

Эта ошибка указывает на то, что облако Datastore получил Commit запрос с двумя мутациями для того же ключа (т.е. он пытается вставить один и тот же объект дважды или модифицировать тот же объект дважды).

Вы можете избежать ошибки, указав только одну мутацию за ключ за Commit.

+0

Он работает в потоковом потоке потока данных, и я предполагаю, что несколько рабочих одновременно могут писать один и тот же ключ. Мой вопрос просят об этом. –

+0

А это может быть связано с тем, как Dataflow выполняет дозирование под капотом. Вы могли бы поделиться образцом кода: в частности, запрос, который вы используете, и преобразования? –

+0

Спасибо, я обновил свой вопрос. –

 Смежные вопросы

  • Нет связанных вопросов^_^