1

У меня есть метки времени набора данных, который находится в форматеPySpark ошибка: AttributeError: объект «NoneType» не имеет атрибута «_jvm»

И я написал UDF в pyspark для обработки этого набора данных и возвращает в качестве карты ключевых значений , Но я становлюсь ниже сообщения об ошибке.

Dataset: df_ts_list

+--------------------+ 
|    ts_list| 
+--------------------+ 
|[1477411200, 1477...| 
|[1477238400, 1477...| 
|[1477022400, 1477...| 
|[1477224000, 1477...| 
|[1477256400, 1477...| 
|[1477346400, 1476...| 
|[1476986400, 1477...| 
|[1477321200, 1477...| 
|[1477306800, 1477...| 
|[1477062000, 1477...| 
|[1477249200, 1477...| 
|[1477040400, 1477...| 
|[1477090800, 1477...| 
+--------------------+ 

Pyspark UDF:

>>> def on_time(ts_list): 
...  import sys 
...  import os 
...  sys.path.append('/usr/lib/python2.7/dist-packages') 
...  os.system("sudo apt-get install python-numpy -y") 
...  import numpy as np 
...  import datetime 
...  import time 
...  from datetime import timedelta 
...  ts = np.array(ts_list) 
...  if ts.size == 0: 
...    count = 0 
...    duration = 0 
...    st = time.mktime(datetime.now()) 
...    ymd = str(datetime.fromtimestamp(st).date()) 
...  else: 
...    ts.sort() 
...    one_tag = [] 
...    start = float(ts[0]) 
...    for i in range(len(ts)): 
...      if i == (len(ts)) - 1: 
...        end = float(ts[i]) 
...        a_round = [start, end] 
...        one_tag.append(a_round) 
...      else: 
...        diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i]))) 
...        if abs(diff.total_seconds()) > 3600: 
...          end = float(ts[i]) 
...          a_round = [start, end] 
...          one_tag.append(a_round) 
...          start = float(ts[i+1]) 
...    one_tag = [u for u in one_tag if u[1] - u[0] > 300] 
...    count = int(len(one_tag)) 
...    duration = int(np.diff(one_tag).sum()) 
...    ymd = str(datetime.datetime.fromtimestamp(time.time()).date()) 
...  return {'count':count,'duration':duration, 'ymd':ymd} 

Pyspark код:

>>> on_time=udf(on_time, MapType(StringType(),StringType())) 
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show() 

Ошибка:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main 
    process() 
    File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda> 
    func = lambda _, it: map(mapper, it) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda> 
    mapper = lambda a: udf(*a) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda> 
    return lambda *a: f(*a) 
    File "<stdin>", line 27, in on_time 
    File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _ 
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col) 
AttributeError: 'NoneType' object has no attribute '_jvm' 

Любая помощь будет оценена!

ответ

5

В сообщении об ошибке говорится, что в 27-й строке udf вы вызываете некоторые функции pyspark sql. Это строка с abs(), поэтому я полагаю, что где-то выше вы вызываете from pyspark.sql.functions import * и она переопределяет функцию abs() python.

0

Ответ Mariusz на самом деле мне не помог. Поэтому, если вам нравится, что я нашел это, потому что это единственный результат в google, и вы новичок в pyspark (и искры в целом), вот что сработало для меня.

В моем случае я получал эту ошибку, потому что я пытался выполнить код pyspark до того, как была создана среда pyspark.

Убедившись, что pyspark был доступен и настроен перед выполнением вызовов в зависимости от pyspark.sql.functions, исправил проблему для меня.