2016-05-11 2 views
2

Если необходимо прочитать данные от SqlServer в потоковом режиме, для этого есть некоторые возможности. Такие, как с помощью SqlDataReader с CommandBehavior.SequentialAccess, и, в частности, когда необходимо получить доступ двоичные данные столбца существует GetStream(int) метод, что:Поставка потока в качестве источника данных для двоичного столбца при использовании SqlBulkCopy

var cmd = new SqlCommand(); 
cmd.Connection = connection; 
cmd.CommandText = @"select 0xas Data"; 

using (var dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess)) 
{ 
    dr.Read(); 

    var stream = dr.GetStream(0); 
    // access stream 
} 

Но что потоковой передачи данных в противоположном направлении, когда нужно датафид к SqlServer с использованием SqlBulkCopy, и особенно если поток должен быть предоставлен в качестве источника данных для двоичного столбца?

Я попытался следующие

var cmd2 = new SqlCommand(); 
cmd2.Connection = connection; 
cmd2.CommandText = @"create table #Test (ID int, Data varbinary(max))"; 
cmd2.ExecuteNonQuery(); 

using (SqlBulkCopy sbc = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null)) 
{ 
    sbc.DestinationTableName = "#Test"; 
    sbc.EnableStreaming = true; 

    sbc.ColumnMappings.Add(0, "ID"); 
    sbc.ColumnMappings.Add(1, "Data"); 

    sbc.WriteToServer(new TestDataReader()); 
} 

Где TestDataReader реализует IDataReader следующим образом:

class TestDataReader : IDataReader 
{ 
    public int FieldCount { get { return 2; } } 
    int rowCount = 1; 
    public bool Read() { return (rowCount++) < 3; } 
    public bool IsDBNull(int i) { return false; } 

    public object GetValue(int i) 
    { 
     switch (i) 
     { 
      case 0: return rowCount; 
      case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 }; 
      default: throw new Exception(); 
     } 
    } 

    //the rest members of IDataReader 
} 

, и она работала, как ожидалось.

Однако изменение

case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 }; 

к

case 1: return new MemoryStream(new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 }); 

вызвало исключение System.InvalidOperationException с сообщением

Данное значение типа MemoryStream из источника данных, не может быть преобразован в тип VARBINARY указанного целевого столбца.

Есть ли способ, чтобы поставить Stream от IDataReader (или, возможно, DbDataReader), чтобы SqlBulkCopy в качестве источника данных для двоичного столбца, без копирования всех данных в памяти (массив байт) в первую очередь?

+0

См сайт: https://msdn.microsoft.com/en-us/library/system.data.sqlclient.sqlbulkcopy (v = vs.110) .aspx – jdweng

+0

Вам интересно, как это сделать с помощью пользовательского IDataReader, или на самом деле вы будете использовать какой-то существующий считыватель данных (например, SqlDataReader)? – Evk

+0

@Evk с пользовательским 'IDataReader' или' DbDataReader' (я использую собственную реализацию, которая передает данные из двоичных/xml-файлов, однако достаточно фиктивной реализации, как в моем вопросе). –

ответ

3

Не уверен, что это документально в любом месте, но если сделать короткий осмотр SqlBulkCopy исходного кода вы можете узнать, что он рассматривает разные читатель данных по-разному. Во-первых, SqlBulkCopy поддерживает потоковое вещание и GetStream, но вы можете заметить, что интерфейс IDataReader не содержит GetStream метода. Поэтому, когда вы загружаете пользовательскую реализацию IDataReader в SqlBulkCopy - она ​​не будет обрабатывать двоичные столбцы как потоковые и не будет принимать значения Stream.

С другой стороны - DbDataReader. есть этот способ. Если вы подаете SqlBulkCopy экземпляр класса DbDataReader - унаследованный класс - он будет обрабатывать все двоичные столбцы потоковым способом, и он назовет DbDataReader.GetStream.

Так, чтобы исправить вашу проблему - наследоваться от DbDataReader как это:

class TestDataReader : DbDataReader 
{ 
    public override bool IsDBNull(int ordinal) { 
     return false; 
    } 

    public override int FieldCount { get; } = 2; 
    int rowCount = 1; 

    public override bool HasRows { get; } = true; 
    public override bool IsClosed { get; } = false; 

    public override bool Read() 
    { 
     return (rowCount++) < 3; 
    } 

    public override object GetValue(int ordinal) { 
     switch (ordinal) { 
      // do not return anything for binary column here - it will not be called 
      case 0: 
       return rowCount; 
      default: 
       throw new Exception(); 
     } 
    } 

    public override Stream GetStream(int ordinal) { 
     // instead - return your stream here 
     if (ordinal == 1) 
      return new MemoryStream(new byte[] {0x01, 0x23, 0x45, 0x67, 0x89}); 
     throw new Exception(); 
    } 
    // bunch of irrelevant stuff 

} 
+0

Хорошо, я нашел, почему «GetStream» не получил вызов в моей второй попытке реализации ридера (тот, который унаследован от «DbDataReader», а не от «IDataReader»). Вероятно, играя с кодом в разных направлениях, я ввел ошибку (я изменил целевой столбец на 'varbinary (100)' вместо 'varbinary (max)' в какой-то момент). Ваш код в сочетании с кодом из вопроса работал нормально. Отлично, что это возможно! Спасибо. –

0

Смотреть следующий код

static int SendOrders(int totalToSend) 
    { 
     using (SqlConnection con = new SqlConnection(connectionString)) 
     { 
     con.Open(); 
     using (SqlTransaction tran = con.BeginTransaction()) 
     { 
      var newOrders = 
        from i in Enumerable.Range(0, totalToSend) 
        select new Order 
        { 
        customer_name = "Customer " + i % 100, 
        quantity = i % 9, 
        order_id = i, 
        order_entry_date = DateTime.Now 
        }; 

      SqlBulkCopy bc = new SqlBulkCopy(con, 
      SqlBulkCopyOptions.CheckConstraints | 
      SqlBulkCopyOptions.FireTriggers | 
      SqlBulkCopyOptions.KeepNulls, tran); 

      bc.BatchSize = 1000; 
      bc.DestinationTableName = "order_queue"; 
      bc.WriteToServer(newOrders.AsDataReader()); 

      tran.Commit(); 
     } 
     con.Close(); 

     } 

     return totalToSend; 

    } 
+1

Извините, но это не представляется нам полезным. У меня уже есть образец использования 'SqlBulkCopy' в моем вопросе. Речь идет о деталях этого использования. В частности, возможно ли передавать данные «Stream» в двоичный столбец через «IDataReader» + 'SqlBulkCopy' без копирования всех данных потока в память в первую очередь. –