1

Как установить схему для потоковой передачи DataFrame в PySpark.искрообразование с использованием сокетов, установка SCHEMA, отображение DATAFRAME в консоли

from pyspark.sql import SparkSession 
from pyspark.sql.functions import explode 
from pyspark.sql.functions import split 
# Import data types 
from pyspark.sql.types import * 

spark = SparkSession\ 
    .builder\ 
    .appName("StructuredNetworkWordCount")\ 
    .getOrCreate() 

# Create DataFrame representing the stream of input lines from connection to localhost:5560 
lines = spark\ 
    .readStream\ 
    .format('socket')\ 
    .option('host', '192.168.0.113')\ 
    .option('port', 5560)\ 
    .load() 

Например мне нужна таблица, как:

Name, lastName, PhoneNumber  
Bob, Dylan, 123456  
Jack, Ma, 789456 
.... 

Как я могу установить заголовок/схемы для [ 'Имя', 'Фамилия', 'PhoneNumber'] с их типами данных.

Кроме того, можно ли отображать эту таблицу непрерывно или сказать верхние 20 строк DataFrame. Когда я попробовал это я получаю ошибку

«pyspark.sql.utils.AnalysisException:«Режим Полного выхода не поддерживается, когда нет потокового агрегирования на потоковом DataFrames/наборы данных ;; \ nProject»

ответ

4

TextSocketSource не содержит интегрированных параметров синтаксического анализа. Можно использовать только один из двух форматов:

  • метки времени и текст, если includeTimestamp установлен в true со следующей схемой:

    StructType([ 
        StructField("value", StringType()), 
        StructField("timestamp", TimestampType()) 
    ]) 
    
  • текст только если includeTimestamp установлен в false с схема, как показано ниже:

    StructType([StructField("value", StringType())])) 
    

Если вы хотите изменить этот формат, вам нужно будет преобразовать поток для извлечения интересующих полей, например, с помощью регулярных выражений:

from pyspark.sql.functions import regexp_extract 
from functools import partial 

fields = partial(
    regexp_extract, str="value", pattern="^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$" 
) 

lines.select(
    fields(idx=1).alias("name"), 
    fields(idx=2).alias("last_name"), 
    fields(idx=3).alias("phone_number") 
)