2016-09-21 3 views
1

Я заметил, что в java sdk есть функция, которая позволяет вам писать заголовки csv-файла. https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/TextIO.Write.html#withHeader-java.lang.String-Как добавить заголовки для вывода csv для потока данных потока Apache?

Являются ли эти функции зеркалированными на skyth python?

ответ

2

Теперь вы можете написать текст и указать заголовок, используя текстовую раковину.

Из документации:

class apache_beam.io.textio.WriteToText(file_path_prefix, file_name_suffix='', append_trailing_newlines=True, num_shards=0, shard_name_template=None, coder=ToStringCoder, compression_type='auto', header=None) 

Таким образом, вы можете использовать следующий фрагмент кода:

beam.io.WriteToText(bucket_name, file_name_suffix='.csv', header='colname1, colname2') 

Полная документация доступна здесь, если вы хотите подробностей или проверить, как она реализуется : https://beam.apache.org/documentation/sdks/pydoc/2.0.0/_modules/apache_beam/io/textio.html#WriteToText

0

Эта функция еще не существует в Python SDK

1

Это не реализуется в данный момент. Однако вы можете реализовать/расширить его самостоятельно (см. attached notebook для примера + теста с моей версией apache_beam).

Это основано на note in the docstring суперкласса FileSink, отметив, что вы должны переписать open функцию:

новый класс, который работает для моей версии apache_beam («0.3.0-incubating.dev»):

import apache_beam as beam 
from apache_beam.io import TextFileSink 
from apache_beam.io.fileio import ChannelFactory,CompressionTypes 
from apache_beam import coders 


class TextFileSinkWithHeader(TextFileSink): 
    def __init__(self, 
       file_path_prefix, 
       file_name_suffix='', 
       append_trailing_newlines=True, 
       num_shards=0, 
       shard_name_template=None, 
       coder=coders.ToStringCoder(), 
       compression_type=CompressionTypes.NO_COMPRESSION, 
       header=None): 
     super(TextFileSinkWithHeader, self).__init__(
      file_path_prefix, 
      file_name_suffix=file_name_suffix, 
      num_shards=num_shards, 
      shard_name_template=shard_name_template, 
      coder=coder, 

      compression_type=compression_type, 
      append_trailing_newlines=append_trailing_newlines) 
     self.header = header 

    def open(self, temp_path): 
     channel_factory = ChannelFactory.open(
      temp_path, 
      'wb', 
      mime_type=self.mime_type) 
     channel_factory.write(self.header+"\n") 
     return channel_factory 

Вы можете впоследствии использовать его следующим образом:

beam.io.Write(TextFileSinkWithHeader('./names_w_headers',header="names")) 

См the notebook для полного обзора.

 Смежные вопросы

  • Нет связанных вопросов^_^