2016-05-27 7 views
1
from __future__ import print_function # Python 2/3 compatibility 
import boto3 
import json 
import decimal 

#kinesis = boto3.resource('kinesis', region_name='eu-west-1') 
client = boto3.client('kinesis') 
with open("questions.json") as json_file: 
    questions = json.load(json_file) 
    Records = [] 
    count = 0 
    for question in questions: 
     value1 = question['value'] 
     if value1 is None: 
      value1 = '0' 
     record = { 'StreamName':'LoadtestKinesis', 'Data':b'question','PartitionKey':'value1' } 
     Records.append(record) 
     count +=1 
     if count == 500: 
      response = client.put_records(Records) 
      Records = [] 

Это мой питон скрипт для загрузки массива JSON файлов кинезиса потока где я сочетающий 500 записей использовать put_records функции. Но я получаю сообщение об ошибке: put_records() only accepts keyword arguments. Как передать список записей на этот метод? Каждая запись представляет собой json с ключом раздела.put_records() принимает только ключевые аргументы в Kinesis boto3 Python API

Образец Json:

[{ 
     "air_date": "2004-12-31", 
     "answer": "FDDDe", 
     "category": "AACC", 
     "question": "'No. 2: 1912 Olympian; football star at Carlisle Indian School; 6 MLB seasons with the Reds, Giants & Braves'", 
     "round": "DDSSS!", 
     "show_number": "233", 
     "value": "$200" 
    }] 

ответ

1

При прохождении нескольких записей, необходимо инкапсулировать записи в списке записей, а затем добавить идентификатор потока.

Формат примерно так:

{ 
    "Records": [ 
     { 
     "Data": blob, 
     "ExplicitHashKey": "string", 
     "PartitionKey": "string" 
     }, 
     { 
     "Data": "another record", 
     "ExplicitHashKey": "string", 
     "PartitionKey": "string" 
     } 
    ], 
    "StreamName": "string" 
} 

Смотрите Kinesis docs для получения дополнительной информации.

+0

Что делать, если у меня есть миллионы записей, я не могу записывать каждую информацию вручную в Records? Я искал петлю и добавлял каждую запись в список. Kinesis имеет лучшую производительность по 500 записей за партию, поэтому мне нужен способ добавить 500 записей сразу –

+0

@AnshumanRanjanyou все еще может обрабатывать пакетные записи. Вам просто нужно немного изменить свой код. – vageli

2
from __future__ import print_function # Python 2/3 compatibility 
    import boto3 
    import json 
    import decimal 
    import time 


    def putdatatokinesis(RecordKinesis): 
     start = time.clock() 
     response = client.put_records(Records=RecordKinesis, StreamName='LoadtestKinesis') 
     print ("Time taken to process" + len(Records) + " is " +time.clock() - start) 
     return response 
client = boto3.client('kinesis') 
firehoseclient = boto3.client('firehose') 
with open("questions.json") as json_file: 
    questions = json.load(json_file) 
    Records = [] 
    RecordKinesis = [] 
    count = 0 
    for question in questions: 
     value1 = question['value'] 
     if value1 is None: 
      value1 = '0' 
     recordkinesis = { 'Data':b'question','PartitionKey':value1 } 
     RecordKinesis.append(recordkinesis) 
     Records.append(record) 
     count +=1 
     if count == 500: 
      putdatatokinesis(RecordKinesis) 
      Records = [] 
      RecordKinesis = [] 

Это сработало. Идея состоит в том, чтобы передать аргумент Records в качестве аргумента ключа.