Мой DASK dataframe около 120 мм строк и 4 столбца:Ошибка при экспорте DASK dataframe в CSV
df_final.dtypes
cust_id int64
score float64
total_qty float64
update_score float64
dtype: object
и я делаю эту операцию на jupyter ноутбуков, подключенных к Linux машине:
%time df_final.to_csv('/path/claritin-files-*.csv')
и он бросает эту ошибку:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-24-46468ae45023> in <module>()
----> 1 get_ipython().magic(u"time df_final.to_csv('path/claritin-files-*.csv')")
/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s)
2334 magic_name, _, magic_arg_s = arg_s.partition(' ')
2335 magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2336 return self.run_line_magic(magic_name, magic_arg_s)
2337
2338 #-------------------------------------------------------------------------
/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line)
2255 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
2256 with self.builtin_trap:
-> 2257 result = fn(*args,**kwargs)
2258 return result
2259
/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns)
/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k)
191 **# but it's overkill for just that one bit of state.**
192 def magic_deco(arg):
--> 193 call = lambda f, *a, **k: f(*a, **k)
194
195 if callable(arg):
/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns)
1161 if mode=='eval':
1162 st = clock2()
-> 1163 out = eval(code, glob, local_ns)
1164 end = clock2()
1165 else:
<timed eval> in <module>()
/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/core.pyc in to_csv(self, filename, **kwargs)
936 """ See dd.to_csv docstring for more information """
937 from .io import to_csv
--> 938 return to_csv(self, filename, **kwargs)
939
940 def to_delayed(self):
/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.pyc in to_csv(df, filename, name_function, compression, compute, get, **kwargs)
411 if compute:
412 from dask import compute
--> 413 compute(*values, get=get)
414 else:
415 return values
/home/mspra/anaconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
177 dsk = merge(var.dask for var in variables)
178 keys = [var._keys() for var in variables]
--> 179 results = get(dsk, keys, **kwargs)
180
181 results_iter = iter(results)
/home/mspra/anaconda2/lib/python2.7/site-packages/dask/threaded.pyc in get(dsk, result, cache, num_workers, **kwargs)
74 results = get_async(pool.apply_async, len(pool._pool), dsk, result,
75 cache=cache, get_id=_thread_get_id,
---> 76 **kwargs)
77
78 # Cleanup pools associated to dead threads
/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.pyc in get_async(apply_async, num_workers, dsk, result, cache, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, dumps, loads, **kwargs)
491 _execute_task(task, data) # Re-execute locally
492 else:
--> 493 raise(remote_exception(res, tb))
494 state['cache'][key] = res
495 finish_task(dsk, key, state, results, keyorder.get)
**ValueError: invalid literal for long() with base 10: 'total_qty'**
Traceback
---------
File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 268, in execute_task
result = _execute_task(task, data)
File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task
return func(*args2)
File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 55, in pandas_read_text
coerce_dtypes(df, dtypes)
File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 83, in coerce_dtypes
df[c] = df[c].astype(dtypes[c])
File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 3054, in astype
raise_on_error=raise_on_error, **kwargs)
File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3189, in astype
return self.apply('astype', dtype=dtype, **kwargs)
File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3056, in apply
applied = getattr(b, f)(**kwargs)
File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 461, in astype
values=values, **kwargs)
File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 504, in _astype
values = _astype_nansafe(values.ravel(), dtype, copy=True)
File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/types/cast.py", line 534, in _astype_nansafe
return lib.astype_intsafe(arr.ravel(), dtype).reshape(arr.shape)
File "pandas/lib.pyx", line 980, in pandas.lib.astype_intsafe (pandas/lib.c:17409)
File "pandas/src/util.pxd", line 93, in util.set_value_at_unsafe (pandas/lib.c:72777)
У меня есть несколько вопросов:
1) В первую очередь этот экспорт работал нормально в пятницу, он выплескивает 100 CSV-файлов (так как он имеет 100 разделов), которые я позже агрегировал. Итак, что сегодня не так - что-нибудь из журнала ошибок?
2) Может быть, этот вопрос предназначен для создателей этого пакета, что является наиболее эффективным с точки зрения времени способом получения csv-экстракта из dask dataframe такого размера, поскольку он занимал от 1,5 до 2 часов, в последний раз он работал.
Я не использую распределенный пакет, и это находится на одном ядре кластера Linux.
Спасибо !!, да, предположил, что раньше, так как я читаю dataframe из формата csv. Не уверен, почему он не читает его правильно. относительно вашего предложения по второму вопросу, это для чтения и письма в паркетном формате (я знаком с паркетным домом). –
Общей причиной является то, что целочисленный столбец имеет некоторые отсутствующие значения, поэтому панда решает, что ему нужно использовать float partway through. Я не понимаю вашего комментария о паркете. – MRocklin
Я имел в виду, когда вы сказали использовать паркет или HDF5, вы имели в виду чтение паркетных файлов для преобразования в dask dataframes, а затем запись в формат паркета вместо формата csv? могут ли файлы csv экспортироваться быстрее (мой фреймворк составляет 130 мм x 4 столбца), если я использую dask, распределенный по кластеру машин? –