2012-09-13 2 views
12

Что я хотел бы сделать, это создать своего рода «трубу» (например, канал между процессами), но между C++ iostreams в рамках одной и той же программы. У меня есть функция, которая требует входного потока в качестве аргумента, но мои данные поступают из потока вывода. Так есть стандартный способ для вывода вывода std::ostream на вход std::istream?C++ подключить выходной поток к потоку ввода

+2

Соответствует ли std :: stringstream вашей потребности? Если нет, объясните, почему. – AProgrammer

+1

Существует iostream (обратите внимание, что в начале он имеет 'i' и' o'). Вы накачиваете данные на одном конце и заканчиваете его. Это то, что вы хотите. –

+0

-1 вопрос не указан. –

ответ

13

Вы можете создать std::streambuf, где выходные данные поступают в один буфер и std::overflow() блокируются, когда буфер заполняется. На другом конце у вас будет входной буфер, который блокируется на underflow(), когда буфер становится пустым. Очевидно, что чтение и письмо были бы в двух разных потоках.

Сложное дело - синхронизировать два буфера: потоки не используют никаких операций синхронизации при доступе к буферам. Только когда вызывается любая из виртуальных функций, вы можете перехватить операцию и выполнить синхронизацию. С другой стороны, не использовать буфер довольно неэффективно. Способ решения этой проблемы заключается в использовании относительно небольшого выходного буфера (например, 256 char с), а также переопределить sync(), чтобы использовать эту функцию для передачи символов во входной буфер. streambuf будет использовать мьютекс для синхронизации и переменную условия для блокировки на полном входном буфере на выходе и пустой входной буфер на входе. Чтобы поддерживать чистое выключение, также должна быть функция, устанавливающая флаг, что больше не поступает вход, и все дальнейшие операции вывода должны завершиться неудачно.

Создание фактической реализации показывает, что двух буферов недостаточно: потоки, обращающиеся к входному и выходному буферам, могут быть активны, когда соответствующие другие блоки блокируют. Таким образом, необходим третий промежуточный буфер. С этим небольшим изменением к вышеуказанному плану ниже приведен некоторый код (он использует крошечные буферы, чтобы убедиться, что есть фактические переполнения и потоки, для реального использования, по крайней мере, входной буфер должен быть, вероятно, больше).

// threadbuf.cpp              -*-C++-*- 
// ---------------------------------------------------------------------------- 
// Copyright (C) 2013 Dietmar Kuehl http://www.dietmar-kuehl.de   
//                  
// Permission is hereby granted, free of charge, to any person   
// obtaining a copy of this software and associated documentation  
// files (the "Software"), to deal in the Software without restriction, 
// including without limitation the rights to use, copy, modify,   
// merge, publish, distribute, sublicense, and/or sell copies of   
// the Software, and to permit persons to whom the Software is   
// furnished to do so, subject to the following conditions:    
//                  
// The above copyright notice and this permission notice shall be  
// included in all copies or substantial portions of the Software.  
//                  
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,  
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES  
// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND    
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT   
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,   
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR   
// OTHER DEALINGS IN THE SOFTWARE. 
// ---------------------------------------------------------------------------- 


#include <algorithm> 
#include <condition_variable> 
#include <iostream> 
#include <mutex> 
#include <stdexcept> 
#include <streambuf> 
#include <string> 
#include <thread> 

// ---------------------------------------------------------------------------- 

class threadbuf 
    : public std::streambuf 
{ 
private: 
    typedef std::streambuf::traits_type traits_type; 
    typedef std::string::size_type  string_size_t; 

    std::mutex    d_mutex; 
    std::condition_variable d_condition; 
    std::string    d_out; 
    std::string    d_in; 
    std::string    d_tmp; 
    char*     d_current; 
    bool     d_closed; 

public: 
    threadbuf(string_size_t out_size = 16, string_size_t in_size = 64) 
     : d_out(std::max(string_size_t(1), out_size), ' ') 
     , d_in(std::max(string_size_t(1), in_size), ' ') 
     , d_tmp(std::max(string_size_t(1), in_size), ' ') 
     , d_current(&this->d_tmp[0]) 
     , d_closed(false) 
    { 
     this->setp(&this->d_out[0], &this->d_out[0] + this->d_out.size() - 1); 
     this->setg(&this->d_in[0], &this->d_in[0], &this->d_in[0]); 
    } 
    void close() 
    { 
     { 
      std::unique_lock<std::mutex> lock(this->d_mutex); 
      this->d_closed = true; 
      while (this->pbase() != this->pptr()) { 
       this->internal_sync(lock); 
      } 
     } 
     this->d_condition.notify_all(); 
    } 

private: 
    int_type underflow() 
    { 
     if (this->gptr() == this->egptr()) 
     { 
      std::unique_lock<std::mutex> lock(this->d_mutex); 
      while (&this->d_tmp[0] == this->d_current && !this->d_closed) { 
       this->d_condition.wait(lock); 
      } 
      if (&this->d_tmp[0] != this->d_current) { 
       std::streamsize size(this->d_current - &this->d_tmp[0]); 
       traits_type::copy(this->eback(), &this->d_tmp[0], 
            this->d_current - &this->d_tmp[0]); 
       this->setg(this->eback(), this->eback(), this->eback() + size); 
       this->d_current = &this->d_tmp[0]; 
       this->d_condition.notify_one(); 
      } 
     } 
     return this->gptr() == this->egptr() 
      ? traits_type::eof() 
      : traits_type::to_int_type(*this->gptr()); 
    } 
    int_type overflow(int_type c) 
    { 
     std::unique_lock<std::mutex> lock(this->d_mutex); 
     if (!traits_type::eq_int_type(c, traits_type::eof())) { 
      *this->pptr() = traits_type::to_char_type(c); 
      this->pbump(1); 
     } 
     return this->internal_sync(lock) 
      ? traits_type::eof() 
      : traits_type::not_eof(c); 
    } 
    int sync() 
    { 
     std::unique_lock<std::mutex> lock(this->d_mutex); 
     return this->internal_sync(lock); 
    } 
    int internal_sync(std::unique_lock<std::mutex>& lock) 
    { 
     char* end(&this->d_tmp[0] + this->d_tmp.size()); 
     while (this->d_current == end && !this->d_closed) { 
      this->d_condition.wait(lock); 
     } 
     if (this->d_current != end) 
     { 
      std::streamsize size(std::min(end - d_current, 
              this->pptr() - this->pbase())); 
      traits_type::copy(d_current, this->pbase(), size); 
      this->d_current += size; 
      std::streamsize remain((this->pptr() - this->pbase()) - size); 
      traits_type::move(this->pbase(), this->pptr(), remain); 
      this->setp(this->pbase(), this->epptr()); 
      this->pbump(remain); 
      this->d_condition.notify_one(); 
      return 0; 
     } 
     return traits_type::eof(); 
    } 
}; 

// ---------------------------------------------------------------------------- 

static void writer(std::ostream& out) 
{ 
    for (std::string line; std::getline(std::cin, line);) 
    { 
     out << "writer: '" << line << "'\n"; 
    } 
} 

// ---------------------------------------------------------------------------- 

static void reader(std::istream& in) 
{ 
    for (std::string line; std::getline(in, line);) 
    { 
     std::cout << "reader: '" << line << "'\n"; 
    } 
} 

// ---------------------------------------------------------------------------- 

int main() 
{ 
    try 
    { 
     threadbuf sbuf; 
     std::ostream out(&sbuf); 
     std::istream in(&sbuf); 

     std::thread write(&::writer, std::ref(out)); 
     std::thread read(&::reader, std::ref(in)); 

     write.join(); 
     sbuf.close(); 
     read.join(); 
    } 
    catch (std::exception const& ex) 
    { 
     std::cerr << "ERROR: " << ex.what() << "\n"; 
    } 
} 
+1

+1 для усилий; ОП, безусловно, искал гораздо более быстрое и легкое решение. – Walter

+0

Ну, используя код выше ** ** быстро для snybody, но я :) Я видел подобный запрос раньше, т. Е. Он может быть полезен и для других. ... и это было интересное упражнение, чтобы на самом деле реализовать то, о чем я только что говорил. Наконец: я не знаю о более легком решении! –

+0

Вы перевернутая легенда Дитмар. Я использую это в модульном тесте, и это работает. Благодарю. – MattSmith

 Смежные вопросы

  • Нет связанных вопросов^_^