2016-05-09 1 views
0

Я хочу использовать kundera в качестве слоя dao для штурма Apache, чтобы хранить данные в базе данных cassandra. Моя топология создана без каких-либо проблем, и я могу получать сообщения, но когда я попытался сохранить данные в базе данных, я получаю сообщение:java.io.NotSerializableException: com.impetus.kundera.persistence.EntityManagerImpl

26240 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 
26275 [main] INFO o.a.s.d.supervisor - Starting supervisor with id 1db18608-a2df-47e0-8ae6-cc634c90f81d at host 10.0.0.4 
26308 [main] ERROR o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Thread Thread[main,5,main] died 
java.lang.IllegalStateException: Bolt 'bolt1' contains a non-serializable field of type com.impetus.kundera.persistence.EntityManagerImpl, which was instantiated prior to topology creation. com.impetus.kundera.persistence.EntityManagerImpl should be instantiated within the prepare method of 'bolt1 at the earliest. 
     at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:127) ~[storm-core-1.0.0.jar:1.0.0] 
     at topology.ConnectorTopology.main(ConnectorTopology.java:52) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] 
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: com.impetus.kundera.persistence.EntityManagerImpl 
     at org.apache.storm.utils.Utils.javaSerialize(Utils.java:167) ~[storm-core-1.0.0.jar:1.0.0] 
     at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:122) ~[storm-core-1.0.0.jar:1.0.0] 
     ... 1 more 
Caused by: java.io.NotSerializableException: com.impetus.kundera.persistence.EntityManagerImpl 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_99] 
     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[?:1.7.0_99] 
     at org.apache.storm.utils.Utils.javaSerialize(Utils.java:163) ~[storm-core-1.0.0.jar:1.0.0] 
     at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:122) ~[storm-core-1.0.0.jar:1.0.0] 
     ... 1 more 

в topologybuilder классе я:

brokerSpout = new BrokerSpout(rabbitMQAMQP); 
      builder.setSpout("spout1", brokerSpout); 
printerbolt = new PrinterBolt(); 
      builder.setBolt("bolt1", printerbolt).shuffleGrouping("spout1"); 

в мой класс дао (который реализует интерфейс сериализации)

private EntityManager em; 
    @Transient 
    private EntityManagerFactory emf; 
    public SensorDAOImpl() { 
     // TODO Auto-generated constructor stub 

     emf = Persistence.createEntityManagerFactory("cassandra_pu"); 
     em = emf.createEntityManager(); 
    } 




    @Override 
    public void insert(Object entity) 
    { 

     em.persist(entity); 

    } 

мой болт код

private OutputCollector collector; 
    public PrinterBolt() { 
     this.index=0; 
    EntityManagerFactory emf = Persistence.createEntityManagerFactory("cassandra_pu"); 
    EntityManager em= emf.createEntityManager(); 
     database= new DatabaseController(em); 
     // TODO Auto-generated constructor stub 
    } 

    /* (non-Javadoc) 
    * @see org.apache.storm.task.IBolt#execute(org.apache.storm.tuple.Tuple) 
    */ 
    public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context, 
      OutputCollector collector) { 
     this.collector = collector; 
    } 
    public void execute(Tuple tuple) { 
     //tuple.getBinaryByField(arg0) 
     String message = tuple.getStringByField("message"); 
     System.out.println("Receive ["+index+"]"+message); 
     database.saveEntitie(tuple.getBinaryByField("message")); 
     index++; 

    } 

Итак, как я могу решить эту проблему, (я не могу прикоснуться к классу com.impetus.kundera.persistence.EntityManagerImpl)

ответ

1

Instantiate поле типа com.impetus.kundera.persistence.EntityManagerImpl в подготовке метод вашего болта.

public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context, 
      OutputCollector collector) { 
    emf = Persistence.createEntityManagerFactory("cassandra_pu"); 
    em = emf.createEntityManager(); 
    database = new DatabaseController(em); 
    this.collector = collector; 
    } 
+0

это не решит мою проблему –

+0

Почтовый код вашего болта, пожалуйста. – f1sherox

+1

Не создавайте экземпляр полей без сериализации в конструкторе. Этот код должен быть подготовлен. 'emf = Persistence.createEntityManagerFactory ("cassandra_pu"); em = emf.createEntityManager(); ' Я отредактировал исходный ответ. – f1sherox

 Смежные вопросы

  • Нет связанных вопросов^_^