2017-01-16 20 views
5

У меня есть заголовок и данные, которые мне нужно представлять в одном байт-массиве. И у меня есть конкретный формат для упаковки заголовка в байт-массиве, а также другой формат для упаковки данных в байт-массив. После того, как у меня есть эти два, мне нужно сделать из него последний байт-массив.Пакет заголовка и макета данных в одном массиве байтов с использованием ByteBuffer эффективным способом?

Ниже приведено расположение, как определено на C++, и, соответственно, я должен делать это на Java.

// below is my header offsets layout 

// addressedCenter must be the first byte 
static constexpr uint32_t addressedCenter  = 0; 
static constexpr uint32_t version    = addressedCenter + 1; 
static constexpr uint32_t numberOfRecords  = version + 1; 
static constexpr uint32_t bufferUsed   = numberOfRecords + sizeof(uint32_t); 
static constexpr uint32_t location    = bufferUsed + sizeof(uint32_t); 
static constexpr uint32_t locationFrom   = location + sizeof(CustomerAddress); 
static constexpr uint32_t locationOrigin  = locationFrom + sizeof(CustomerAddress); 
static constexpr uint32_t partition   = locationOrigin + sizeof(CustomerAddress); 
static constexpr uint32_t copy     = partition + 1; 

// this is the full size of the header 
static constexpr uint32_t headerOffset = copy + 1; 

И CustomerAddress является ЬурейеЕ для uint64_t и состоит, как это -

typedef uint64_t CustomerAddress; 

void client_data(uint8_t datacenter, 
       uint16_t clientId, 
       uint8_t dataId, 
       uint32_t dataCounter, 
       CustomerAddress& customerAddress) 
{ 
    customerAddress = (uint64_t(datacenter) << 56) 
        + (uint64_t(clientId) << 40) 
        + (uint64_t(dataId) << 32) 
        + dataCounter; 
} 

и ниже мой макет данных -

// below is my data layout - 
// 
// key type - 1 byte 
// key len - 1 byte 
// key (variable size = key_len) 
// timestamp (sizeof uint64_t) 
// data size (sizeof uint16_t) 
// data (variable size = data size) 

Постановка задачи: -

Теперь для части p roject, я пытаюсь представить общий материал в одном конкретном классе на Java, чтобы я мог просто передать необходимые поля, и он может сделать из меня окончательный Byte Array, из которого будет первый заголовок, а затем данные:

ниже мой DataFrame класс:

public final class DataFrame { 
    private final byte addressedCenter; 
    private final byte version; 
    private final Map<byte[], byte[]> keyDataHolder; 
    private final long location; 
    private final long locationFrom; 
    private final long locationOrigin; 
    private final byte partition; 
    private final byte copy; 

    public DataFrame(byte addressedCenter, byte version, 
     Map<byte[], byte[]> keyDataHolder, long location, long locationFrom, 
     long locationOrigin, byte partition, byte copy) { 
    this.addressedCenter = addressedCenter; 
    this.version = version; 
    this.keyDataHolder = keyDataHolder; 
    this.location = location; 
    this.locationFrom = locationFrom; 
    this.locationOrigin = locationOrigin; 
    this.partition = partition; 
    this.copy = copy; 
    } 

    public byte[] serialize() { 
    // All of the data is embedded in a binary array with fixed maximum size 70000 
    ByteBuffer byteBuffer = ByteBuffer.allocate(70000); 
    byteBuffer.order(ByteOrder.BIG_ENDIAN); 

    int numOfRecords = keyDataHolder.size(); 
    int bufferUsed = getBufferUsed(keyDataHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2; 

    // header layout 
    byteBuffer.put(addressedCenter); // byte 
    byteBuffer.put(version); // byte 
    byteBuffer.putInt(numOfRecords); // int 
    byteBuffer.putInt(bufferUsed); // int 
    byteBuffer.putLong(location); // long 
    byteBuffer.putLong(locationFrom); // long 
    byteBuffer.putLong(locationOrigin); // long 
    byteBuffer.put(partition); // byte 
    byteBuffer.put(copy); // byte 

    // now the data layout 
    for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) { 
     byte keyType = 0; 
     byte keyLength = (byte) entry.getKey().length; 
     byte[] key = entry.getKey(); 
     byte[] data = entry.getValue(); 
     short dataSize = (short) data.length; 

     ByteBuffer dataBuffer = ByteBuffer.wrap(data); 
     long timestamp = 0; 

     if (dataSize > 10) { 
     timestamp = dataBuffer.getLong(2);    
     }  

     byteBuffer.put(keyType); 
     byteBuffer.put(keyLength); 
     byteBuffer.put(key); 
     byteBuffer.putLong(timestamp); 
     byteBuffer.putShort(dataSize); 
     byteBuffer.put(data); 
    } 
    return byteBuffer.array(); 
    } 

    private int getBufferUsed(final Map<byte[], byte[]> keyDataHolder) { 
    int size = 36; 
    for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) { 
     size += 1 + 1 + 8 + 2; 
     size += entry.getKey().length; 
     size += entry.getValue().length; 
    } 
    return size; 
    } 
} 

И ниже, как я использую мой выше DataFrame класс:

public static void main(String[] args) throws IOException { 
    // header layout 
    byte addressedCenter = 0; 
    byte version = 1; 

    long location = packCustomerAddress((byte) 12, (short) 13, (byte) 32, (int) 120); 
    long locationFrom = packCustomerAddress((byte) 21, (short) 23, (byte) 41, (int) 130); 
    long locationOrigin = packCustomerAddress((byte) 21, (short) 24, (byte) 41, (int) 140); 

    byte partition = 3; 
    byte copy = 0; 

    // this map will have key as the actual key and value as the actual data, both in byte array 
    // for now I am storing only two entries in this map 
    Map<byte[], byte[]> keyDataHolder = new HashMap<byte[], byte[]>(); 
    for (int i = 1; i <= 2; i++) { 
     keyDataHolder.put(generateKey(), getMyData()); 
    } 

    DataFrame records = 
     new DataFrame(addressedCenter, version, keyDataHolder, location, locationFrom, 
      locationOrigin, partition, copy); 

    // this will give me final packed byte array 
    // which will have header and data in it. 
    byte[] packedArray = records.serialize(); 
    } 

    private static long packCustomerAddress(byte datacenter, short clientId, byte dataId, 
     int dataCounter) { 
    return ((long) (datacenter) << 56) | ((long) clientId << 40) | ((long) dataId << 32) 
     | ((long) dataCounter); 
    } 

как вы можете увидеть в моем DataFrame классе, я выделяющий ByteBuffer с предопределенным размером 70000. Есть ли лучший способ, с помощью которого я могу выделить размер, который я использую, делая ByteBuffer вместо того, чтобы использовать hardcoded 70000?

Также есть лучший способ по сравнению с тем, что я делаю, который упаковывает мой заголовок и данные в один массив байтов? Я также должен убедиться, что он потокобезопасен, поскольку он может быть вызван несколькими потоками.

+1

ByteBuffer не должен быть статичным в многопоточном контексте. –

ответ

1

Есть ли лучший способ, с помощью которого я могу выделить размер я использую, делая ByteBuffer вместо того, чтобы использовать жёстко прописанные 70000?

Существует не менее двух неперекрывающихся подходов. Вы можете использовать оба варианта.

Один буферный пул. Вы должны узнать, сколько буферов вам нужно в пиковые периоды, и использовать максимум над ним, например. max + max/2, max + средний, max + режим, 2 * макс.

import java.nio.ByteBuffer; 
import java.nio.ByteOrder; 
import java.util.concurrent.CompletionStage; 
import java.util.concurrent.LinkedBlockingDeque; 
import java.util.function.Consumer; 
import java.util.function.Function; 

public class ByteBufferPool { 
    private final int bufferCapacity; 
    private final LinkedBlockingDeque<ByteBuffer> queue; 

    public ByteBufferPool(int limit, int bufferCapacity) { 
     if (limit < 0) throw new IllegalArgumentException("limit must not be negative."); 
     if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative."); 

     this.bufferCapacity = bufferCapacity; 
     this.queue = (limit == 0) ? null : new LinkedBlockingDeque<>(limit); 
    } 

    public ByteBuffer acquire() { 
     ByteBuffer buffer = (queue == null) ? null : queue.pollFirst(); 
     if (buffer == null) { 
      buffer = ByteBuffer.allocate(bufferCapacity); 
     } 
     else { 
      buffer.clear(); 
      buffer.order(ByteOrder.BIG_ENDIAN); 
     } 
     return buffer; 
    } 

    public boolean release(ByteBuffer buffer) { 
     if (buffer == null) throw new IllegalArgumentException("buffer must not be null."); 
     if (buffer.capacity() != bufferCapacity) throw new IllegalArgumentException("buffer has unsupported capacity."); 
     if (buffer.isDirect()) throw new IllegalArgumentException("buffer must not be direct."); 
     if (buffer.isReadOnly()) throw new IllegalArgumentException("buffer must not be read-only."); 

     return (queue == null) ? false : queue.offerFirst(buffer); 
    } 

    public void withBuffer(Consumer<ByteBuffer> action) { 
     if (action == null) throw new IllegalArgumentException("action must not be null."); 

     ByteBuffer buffer = acquire(); 
     try { 
      action.accept(buffer); 
     } 
     finally { 
      release(buffer); 
     } 
    } 

    public <T> T withBuffer(Function<ByteBuffer, T> function) { 
     if (function == null) throw new IllegalArgumentException("function must not be null."); 

     ByteBuffer buffer = acquire(); 
     try { 
      return function.apply(buffer); 
     } 
     finally { 
      release(buffer); 
     } 
    } 

    public <T> CompletionStage<T> withBufferAsync(Function<ByteBuffer, CompletionStage<T>> asyncFunction) { 
     if (asyncFunction == null) throw new IllegalArgumentException("asyncFunction must not be null."); 

     ByteBuffer buffer = acquire(); 
     CompletionStage<T> future = null; 
     try { 
      future = asyncFunction.apply(buffer); 
     } 
     finally { 
      if (future == null) { 
       release(buffer); 
      } 
      else { 
       future = future.whenComplete((result, throwable) -> release(buffer)); 
      } 
     } 
     return future; 
    } 
} 

The withBuffer методы позволяют прямо вперед использование бассейна, в то время как acquire и release позволяют отделить приобретение и освобождение точек.

Еще один разделет сериализационный интерфейс, например. put, putInt и putLong, где вы можете реализовать класс подсчета байтов и фактический класс буферизации байтов. Вы должны добавить метод к такому интерфейсу, чтобы узнать, подсчитывает ли сериализатор байты или буферизацию, чтобы избежать ненужного генерации байтов, а также другой метод для увеличения использования байта напрямую, полезно при вычислении размера строки в некотором кодировании без фактической сериализации ,

public interface ByteSerializer { 
    ByteSerializer put(byte value); 

    ByteSerializer putInt(int value); 

    ByteSerializer putLong(long value); 

    boolean isSerializing(); 

    ByteSerializer add(int bytes); 

    int position(); 
} 

 

public class ByteCountSerializer implements ByteSerializer { 
    private int count = 0; 

    @Override 
    public ByteSerializer put(byte value) { 
     count += 1; 
     return this; 
    } 

    @Override 
    public ByteSerializer putInt(int value) { 
     count += 4; 
     return this; 
    } 

    @Override 
    public ByteSerializer putLong(long value) { 
     count += 8; 
     return this; 
    } 

    @Override 
    public boolean isSerializing() { 
     return false; 
    } 

    @Override 
    public ByteSerializer add(int bytes) { 
     if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative."); 

     count += bytes; 
     return this; 
    } 

    @Override 
    public int position() { 
     return count; 
    } 
} 

 

import java.nio.ByteBuffer; 

public class ByteBufferSerializer implements ByteSerializer { 
    private final ByteBuffer buffer; 

    public ByteBufferSerializer(int bufferCapacity) { 
     if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative."); 

     this.buffer = ByteBuffer.allocate(bufferCapacity); 
    } 

    @Override 
    public ByteSerializer put(byte value) { 
     buffer.put(value); 
     return this; 
    } 

    @Override 
    public ByteSerializer putInt(int value) { 
     buffer.putInt(value); 
     return this; 
    } 

    @Override 
    public ByteSerializer putLong(long value) { 
     buffer.putLong(value); 
     return this; 
    } 

    @Override 
    public boolean isSerializing() { 
     return true; 
    } 

    @Override 
    public ByteSerializer add(int bytes) { 
     if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative."); 

     for (int b = 0; b < bytes; b++) { 
      buffer.put((byte)0); 
     } 
     return this; 
     // or throw new UnsupportedOperationException(); 
    } 

    @Override 
    public int position() { 
     return buffer.position(); 
    } 

    public ByteBuffer buffer() { 
     return buffer; 
    } 
} 

В вашем коде, вы могли бы сделать что-то вдоль этих линий (не проверено):

ByteCountSerializer counter = new ByteCountSerializer(); 
dataFrame.serialize(counter); 
ByteBufferSerializer serializer = new ByteByfferSerializer(counter.position()); 
dataFrame.serialize(serializer); 
ByteBuffer buffer = serializer.buffer(); 
// ... write buffer, ?, profit ... 

Ваш DataFrame.serialize метод должен быть реорганизован для принятия ByteSerializer, и в тех случаях, когда он будет генерировать данные, он должен проверить isSerializing, чтобы узнать, должен ли он вычислять размер или фактически писать байты.

Я оставляю комбинацию обоих подходов в качестве упражнения, главным образом потому, что это зависит от того, как вы решите это сделать.

Например, вы можете сделать ByteBufferSerializer использовать бассейн напрямую и сохранить произвольную емкость (например, ваш 70000), вы можете объединить ByteBuffer по емкости (но вместо необходимой мощности используйте наименьшую мощность 2 больше, чем необходимую емкость, и установите предел буфера перед возвратом с acquire), или вы можете пустить ByteBufferSerializer s до тех пор, пока вы добавляете метод reset().

Также есть лучший способ по сравнению с тем, что я делаю, который упаковывает мой заголовок и данные в один массив байтов?

Да. Передайте экземпляр буферизации байта вместо того, чтобы иметь определенные методы возвращать массивы байтов, которые отбрасываются через момент после проверки их длины или их содержимого.

Мне также нужно убедиться, что он потокобезопасен, так как он может быть вызван несколькими потоками.

Пока каждый буфер используется только одним потоком, с надлежащей синхронизацией, вам не нужно беспокоиться.

Правильная синхронизация означает, что ваш менеджер пула получает и освобождает семантику в своих методах, и что если буфер используется несколькими потоками между извлечением и возвратом в пул, вы добавляете семантику выпуска в поток, который останавливается используя буфер и добавляя семантику получения в поток, который начинает использовать буфер.Например, если вы передаете буфер через CompletableFuture с, вам не стоит беспокоиться об этом или если вы прямо сообщаете между потоками с Exchanger или правильной реализацией BlockingQueue.

Из описания пакета java.util.concurrent «ы:

Методы всех классов в java.util.concurrent и его подпакеты распространить эти гарантии синхронизации более высокого уровня. В частности:

  • Действие в потоке до размещения объекта в любую параллельную коллекцию произойти прежде, чем- действий, следующих за доступ или удаление этого элемента из коллекции в другом потоке.

  • Действие в потоке до начала подачи Runnable к Executorпроизойдет, до начала его исполнения. Аналогично для Callables, представленного на ExecutorService.

  • Меры, принимаемые асинхронного вычисления, представленного в Futureпроизойти прежде, чем- действия после извлечения результата через Future.get() в другом потоке.

  • Действие до «рилизинг» метода синхронизатора, такие как Lock.unlock, Semaphore.release и CountDownLatch.countDownпроизойдет, перед тем действия после успешного «приобретения» способа, таким как Lock.lock, Semaphore.acquire, Condition.await и CountDownLatch.await на тот же объект синхронизатора в другом потоке.

  • Для каждой пары нитей, которые успешно обмениваются объекты через Exchanger, действия до exchange() в каждом потоке произойти прежде, чем- те после соответствующего exchange() в другом потоке.

  • действия до вызова CyclicBarrier.await и Phaser.awaitAdvance (а также его варианты) случиться, перед тем действий, выполняемых барьерного действия, и действия, выполняемые под действием барьерной произойдет, перед тем действия после успешного возвращения из соответствующий await в других потоках.

+0

Спасибо за ваше предложение. Можете ли вы представить мне пример для первых двух предложений в вашем ответе, чтобы я мог лучше понять? Сейчас я смущаюсь, как это будет работать. – john

+0

Хорошо, я добавил примеры. – acelent

+0

Я буду очень честным .. Я смог понять часть ваших примеров. Но я не могу понять, как буду использовать ваше предложение в своем коде.Вы упомянули, что вы оставляете это как упражнение для меня, и я боюсь, что не знаю, как я буду интегрироваться с моим. Я всегда работаю с API низкого уровня байтов и занимаюсь байтами, поэтому прошу прощения за это. Мне нужно узнать, как все это выглядит. Если вы можете помочь мне привести пример, как это будет интегрировано с моим, тогда это будет очень полезно. – john

0

Другой способ сделать это будет через DataOutputStream вокруг ByteArrayOutputStream, но вы должны сконцентрироваться на настройке производительности вокруг мест, где это необходимо, и это не один из них. Эффективность здесь не проблема. Сетевой ввод-вывод будет доминировать на порядки.

Другая причина использования ByteArrayOutputStream заключается в том, что вам не нужно заранее гадать размер буфера: он будет расти по мере необходимости.

Чтобы сохранить его в потоковом режиме, используйте только локальные переменные.

+0

Даже если сетевой ввод-вывод менее эффективен на порядки, что делать, если вы реализуете сервер, который должен обрабатывать миллионы одновременно активных подключений? Распределение буфера (и копирование) является одним из виновников производительности на таких серверах, реализованных в Java и .NET. – acelent

+0

@acelent Затем вам нужно много мощности процессора. Но нет ничего особо неэффективного в отношении кода OP, кроме предварительного выделения 7000 байт. – EJP