2017-01-12 12 views
1

Я просеивал сообщения и форумы, но не смог найти способ добиться этого.Deserialize Двоичные данные массива объектов как элементы доступны

У меня есть массив из 10 000 000 объектов Person. Я отправляю эти объекты по сети с помощью веб-службы Streamed WCF Net.Tcp.

Проблема заключается в том, что я хочу, например, первыми, например, объектами Person 5000 массива, как он поступает и обрабатывает только те. После чего я буду продвигать поток и читать еще 5000 и т. Д.

Мне не удалось найти способ сделать это, поскольку, насколько я могу судить, в C# нет явного размера объектов. Как и в, я не могу просто прочитать первые 312 байта потока и сказать «Да, это первый объект Person. Теперь прочитайте следующие 312 байта, чтобы получить следующего человека».

я в идеале хотел бы использовать Protobuf-Net сериализовать свои объекты, но в .NET BinaryFormatter прекрасно, как хорошо.

Я также открыт для отправки данных в виде кусков, например массивов 5000. Но я хочу сделать это, не открывая новое соединение tcp каждый раз. Если бы был только способ сообщить код, который читает поток: «Хорошо, десериализуйте все, что я только что отправил (массив из 5000), а затем я продолжу писать еще 5000 в поток».

Любые идеи? Спасибо.

+0

С библиотекой Net вы должны прочитать все доступные данные и сохранить их во временном буфере (массив байтов в вашем случае). Если запись составляет 312 байт, вы ждете, пока временный буфер не будет иметь более 312 байт, а затем обработает 312 байта. Затем удалите 312 байта из временного буфера и подождите, пока не получите следующие 312 байта. – jdweng

+0

Что делать, если мой объект имеет свойство string. Разве размер каждого объекта не будет отличаться? – Sal

+0

У вас есть двоичные данные, которые не похожи на классы C# с переменными размерами. Любой объект должен иметь метод определения размера. Ему может предшествовать переменная длины, имеющая фиксированный размер или символ окончания. Если вы отправляете объект класса в C# на TCP, метод serialize создает массив байтов, который вы можете определить размер. Затем добавьте размер к началу данных. На стороне приема прочитайте количество байтов, а затем прочитайте, поместите данные в массив байтов и десериализуйте массив байтов. Это будет работать, если классы сериализации по tx имеют ту же структуру, что и классы на rx. – jdweng

ответ

1

Возможно, для большинства объектов в .NET не существует явного размера, но вы можете найти размер сериализованного объекта. Сначала отправьте размер (в байтах) сериализованного объекта, а затем отправьте сериализованный объект.

// psuedo-code 
byte[] serializedObj = DoSerialization(Person); // we see length on an array 

using (var writer = new StreamWriter(stream)) { 
    writer.Write(serializedObj.Length); 
    stream.Write(serializedObj); 
} 

Вы также можете сделать это оптом, изменив, что и как вы отправляете свои объекты. Вы можете создать List<Person>, добавить N число Person, сериализовать Список и отправить как прежде.

Хотя я не уверен, что если отправка размера перед отправкой данных необходима, это может помочь, когда вы читаете поток, чтобы узнать, сколько байтов вы ожидаете.

+0

Очень простой; Не могу поверить, что я не думал об этом. Я запрограммировал принимающий конец на чтение 4 байтов для длины, затем прочитал длину. Он будет делать это до конца потока. – Sal

1

Вы можете сделать это с помощью protobuf-net, просто используя ObservableCollection<Person> в принимающей системе. Когда коллекция увеличивается на 5000 объектов во время десериализации, удалите и обработайте элементы в обратном вызове ObservableCollection<T>.CollectionChanged. Затем обрабатывайте все оставшиеся элементы в обратном вызове [OnDeserialized].

Например, рассмотрим следующий корневой объект:

[ProtoContract] 
public class RootObject 
{ 
    public RootObject() 
    { 
     this.People = new ObservableCollection<Person>(); 
    } 

    [ProtoMember(1)] 
    public ObservableCollection<Person> People { get; private set; } 

    public event EventHandler<EventArgs<StreamingContext>> OnDeserialized; 

    [OnDeserialized] 
    internal void OnDeserializedMethod(StreamingContext context) 
    { 
     var onDeserialized = OnDeserialized; 
     if (onDeserialized != null) 
      onDeserialized(this, new EventArgs<StreamingContext> { Value = context }); 
    } 
} 

public class EventArgs<T> : EventArgs 
{ 
    public T Value { get; set; } 
} 

у вас есть метод, который вы хотите звонить обрабатывать каждый 5000 Person объектов, поскольку они добавляются в коллекцию, например:

const int ProcessIncrement = 5000; 

void ProcessItems(ICollection<Person> people, bool force) 
{ 
    if (people == null || people.Count == 0) 
     return; 
    if (people.Count >= ProcessIncrement || force) 
    { 
     // Remove and process the items, possibly on a different thread. 
     Console.WriteLine(string.Format("Processing {0} people." people.Count)); 
     people.Clear(); 
    } 
} 

Вы можете предварительно выделить свой RootObject и добавить слушателей с необходимой логикой, и слейте содержимое потока сериализации в корень:

// Allocate a new RootObject 
var newRoot = new RootObject(); 

// Add listeners to process chunks of Person objects as they are added 
newRoot.People.CollectionChanged += (o, e) => 
    { 
     // Process each chunk of 5000. 
     var collection = (ICollection<Person>)o; 
     ProcessItems(collection, false); 
    }; 

newRoot.OnDeserialized += (o, e) => 
    { 
     // Forcibly process any remaining no matter how many. 
     ProcessItems(((RootObject)o).People, true); 
    }; 

// Deserialize from the stream onto the pre-allocated newRoot 
Serializer.Merge(stream, newRoot); 

При необходимости ProcessItems будет вызываться каждый раз, когда объект добавляется в коллекцию, обрабатывая их с шагом 5000, а затем обрабатывая остаток безоговорочно.

Вопрос только в том, действительно ли protobuf-net загружает весь поток в память перед десериализацией коллекции или делает ли она десериализацию потоковой передачи? Как выясняется, последний показывает, как показано в этом sample fiddle, который показывает, что положение потока постепенно увеличивается, поскольку элементы в коллекции People добавляются, обрабатываются и удаляются.

Здесь я добавил слушателей в RootObject перед десериализацией. Если бы вы добавили их в конструктор, вы могли бы использовать ProtoBuf.Serializer.Deserialize<RootObject>(Stream stream) вместо Serializer.Merge на предварительно выделенный объект root, который может быть проще интегрировать в вашу текущую архитектуру.

Кстати, эта техника должна работать с XmlSerializer и Json.NET.

+0

Это отлично подходит для создания структуры данных, которая автоматически записывает мои объекты в поток по мере достижения определенного количества. Но насколько я могу судить, это не решает мою проблему в вопросе: как десериализировать объекты Person на принимающей стороне, 5000 за раз. Решение Шона действительно работало для меня, поэтому я думаю, что я буду использовать это для своей проблемы. Но я определенно использую вашу структуру в качестве базовой линии для создания собственного процесса, который будет поддерживать поток. Большое спасибо! – Sal

 Смежные вопросы

  • Нет связанных вопросов^_^