2015-10-27 3 views
1

Я пытаюсь использовать пользователя Rust для чтения из нескольких тем. Это код, у меня сейчас:Потребление нескольких тем Кафки у потребителя ржавчины

extern crate kafka; 
use kafka::client::KafkaClient; 
use kafka::consumer::Consumer; 
use kafka::utils; 
fn main(){ 
    let mut client = KafkaClient::new(vec!("localhost:9092".to_owned())); 
    let res = client.load_metadata_all(); 
    let topics = client.topic_partitions.keys().cloned().collect(); 
    let offsets = client.fetch_offsets(topics, -1); 
    for topic in &topics { 
    let mut con = Consumer::new(client, "test-consumer-group".to_owned(), "topic".to_owned()).partition(0); 
    let mut messagedata = 0; 
    for msg in con { 
     println!("{}", str::from_utf8(&msg.message).unwrap().to_string()); 
    } 
    } 
} 

ниже ошибка:

src/main.rs:201:19: 201:25 error: use of moved value: `topics` [E0382] 
src/main.rs:201  for topic in &topics { 
            ^~~~~~ 
    note: in expansion of for loop expansion 
    src/main.rs:201:5: 210:6 note: expansion site 
    src/main.rs:167:40: 167:46 note: `topics` moved here because it has type `collections::vec::Vec<collections::string::String>`, which is non-copyable 
    src/main.rs:167  let offsets = client.fetch_offsets(topics, -1); 
                 ^~~~~~ 
    src/main.rs:203:37: 203:43 error: use of moved value: `client` [E0382] 
    src/main.rs:203  let mut con = Consumer::new(client, "test-consumer-group".to_owned(), "topicname".to_owned()).partition(0); 
                ^~~~~~ 
    note: in expansion of for loop expansion 
    src/main.rs:201:5: 210:6 note: expansion site 
    note: `client` was previously moved here because it has type  `kafka::client::KafkaClient`, which is non-copyable 
    error: aborting due to 2 previous errors 

Чтобы лучше объяснить мой вопрос, вот мой частичный выполнимый код только одной теме:

let mut con = Consumer::new(client, "test-consumer-group".to_owned(), "testtopic".to_owned()).partition(0); 

for msg in con { 
    println!("{}", str::from_utf8(&msg.message).unwrap().to_string()); 
} 

И я протестировал функцию fetch_message, она работает для нескольких тем, но в результате у меня есть (msgs) тема Topicmessage, я не знаю, как получить сообщение от Topicmessage.

let msgs = client.fetch_messages_multi(vec!(utils::TopicPartitionOffset{ 
              topic: "topic1".to_string(), 
              partition: 0, 
              offset: 0 //from the begining 
              }, 
             utils::TopicPartitionOffset{ 
              topic: "topic2".to_string(), 
              partition: 0, 
              offset: 0 
             }, 
             utils::TopicPartitionOffset{ 
              topic: "topic3".to_string(), 
              partition: 0, 
              offset: 0 
             })); 
for msg in msgs{ 
    println!("{}", msg); 
} 
+0

@Shepmaster Спасибо за головы, я пересмотрел свой вопрос. – coco

+2

Эти вопросы на самом деле являются основой Rust: права собственности. Пожалуйста, ознакомьтесь с другими проблемами stackoverflow, которые имеют ошибку «использование перемещенного значения» и прочитайте главу книги о праве собственности: https://doc.rust-lang.org/stable/book/ownership.html –

+0

@ker Спасибо за ваш Помогите! – coco

ответ

1

В конце я изменил код, как этот, и он работает.

let msgs = client.fetch_messages_multi(...).unwrap(); 
for msg in msgs{ 
    println!("{}", msg.message); 
} 
+1

, пожалуйста, напишите объяснение о том, что не так в вопросе и почему ваше решение верно –