2017-02-02 7 views
2

Я использую Avro для сериализации объектов, а затем добавляю их в сообщения Kafka, которые будут потребляться и десериализоваться клиентами. Я пробовал несколько разных подходов к сериализации, но ни один из них, похоже, не встраивает схему в поток данных. Вот последняя версия моего кода сериализации. Вы можете увидеть прокомментированные попытки использовать различные доступные авторы.C# Avro Schema, не закодированная потоком данных

public static byte[] Serialize<T>(T recordObj) where T : ISpecificRecord 
    { 
     Log.Info("Serializing {0} object to Avro.", typeof(T)); 
     try 
     { 
      using (var ms = new MemoryStream()) 
      { 
       var encoder = new BinaryEncoder(ms); 
       //var writer = new SpecificDefaultWriter(recordObj.Schema); 
       var writer = new SpecificDatumWriter<T>(recordObj.Schema); 
       //writer.Write(recordObj.Schema, recordObj, encoder); 
       writer.Write(recordObj, encoder); 
       return ms.ToArray(); 
      } 
     } 
     catch (Exception ex) 
     { 
      Log.Error("Failed to Avro serialize object. {0}", ex); 
      return null; 
     } 
    } 

Я не совсем уверен, что еще попробовать.

ответ

1

После того, как вы нашли нужный код в Avro, я обнаружил, что мне нужен FileWriter, но не смог понять, как его создать, поскольку DataFileWriter не имеет открытого конструктора. Оказывается, существует статический метод класса DataFileWriter под названием OpenWriter, который принимает DatumWriter и поток и возвращает DataFileWriter. Теперь приведенный ниже код правильно включает метаданные объекта в поток данных результата.

public static byte[] Serialize<T>(T recordObj) where T : ISpecificRecord 
    { 
     Log.Info("Serializing {0} object to Avro.",typeof(T)); 
     try 
     { 
      using(var ms = new MemoryStream()) 
      { 
       var specDatumWriter = new SpecificDatumWriter<T>(recordObj.Schema); 
       var specDataWriter = Avro.File.DataFileWriter<T>.OpenWriter(specDatumWriter, ms); 
       specDataWriter.Append(recordObj); 
       specDataWriter.Flush(); 
       specDataWriter.Close(); 
       return ms.ToArray(); 
      } 
     } 
     catch(Exception ex) 
     { 
      Log.Error("Failed to Avro serialize object. {0}",ex); 
      return null; 
     } 
    }