2014-10-05 3 views
1

Основываясь на моем предыдущем вопросе Spark and Python use custom file format/generator as input for RDD Я думаю, что я должен разбираться в основном с любым вводом sc.textFile(), а затем использовать мои или из некоторых пользовательских функций библиотеки.Spark и Python пытаются разобрать википедию с помощью gensim

Теперь я особенно стараюсь разбирать википедии с использованием gensim framework. Я уже установил gensim на мой главный узел и все мои рабочие узлы, и теперь я хотел бы использовать функцию gensim build in для анализа страниц wikipedia, вдохновленных этим вопросом List (or iterator) of tuples returned by MAP (PySpark).

Мой код выглядит следующим образом:

import sys 
import gensim 
from pyspark import SparkContext 


if __name__ == "__main__": 
    if len(sys.argv) != 2: 
     print >> sys.stderr, "Usage: wordcount <file>" 
     exit(-1) 

    sc = SparkContext(appName="Process wiki - distributed RDD") 

    distData = sc.textFile(sys.argv[1]) 
    #take 10 only to see how the output would look like 
    processed_data = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10) 

    print processed_data 
    sc.stop() 

Исходный код extract_pages можно найти на https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py и на основе моей поездке через это, кажется, что он должен работать с искрой.

Но, к сожалению, когда я запускаю код я получаю следующее журнал ошибок:

14/10/05 13:21:11 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <ip address>.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
File "/root/spark/python/pyspark/worker.py", line 79, in main 
serializer.dump_stream(func(split_index, iterator), outfile) 
File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream 
self.serializer.dump_stream(self._batched(iterator), stream) 
File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream 
for obj in iterator: 
File "/root/spark/python/pyspark/serializers.py", line 185, in _batched 
for item in iterator: 
File "/root/spark/python/pyspark/rdd.py", line 1148, in takeUpToNumLeft 
yield next(iterator) 
File "/usr/lib64/python2.6/site-packages/gensim/corpora/wikicorpus.py", line 190, in extract_pages 
elems = (elem for _, elem in iterparse(f, events=("end",))) 
File "<string>", line 52, in __init__ 
IOError: [Errno 2] No such file or directory: u'<mediawiki xmlns="http://www.mediawiki.org/xml/export-0.9/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.9/ http://www.mediawiki.org/xml/export-0.9.xsd" version="0.9" xml:lang="en">' 
    org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124) 
    org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154) 
    org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87) 
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    org.apache.spark.scheduler.Task.run(Task.scala:54) 
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    java.lang.Thread.run(Thread.java:745) 

И тогда некоторые, вероятно, Искробезопасная журнал:

14/10/05 13:21:12 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0 
14/10/05 13:21:12 INFO scheduler.DAGScheduler: Failed to run runJob at PythonRDD.scala:296 

и

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Я вы пробовали это без Spark успешно, поэтому проблема должна быть где-то в сочетании Spark и gensim, но я не очень понять ошибку, которую я получаю. В строке 190 gensim wikicorpus.py я не вижу никаких файлов.

EDIT:

Добавлено еще несколько журналов из Спарк:

EDIT2:

gensim использует из xml.etree.cElementTree import iterparse, документации here, что может вызвать проблемы. Он фактически ожидает имя файла или файл, содержащий данные xml. Может ли RDD рассматриваться как файл, содержащий данные xml?

+0

«IOError: [Errno 2] Нет такого файла или каталога» - не вы смогли отследить, где эта ошибка идет от? Можете ли вы получить более простую версию этой работы на обычном Python (т. Е. Нет PySpark)? –

+0

Мне удалось запустить более простую версию на обычном Python без PySpark. И, к сожалению, единственная дальнейшая идея, где возможно, может быть проблемой, - в Edit2. – ziky90

+0

'flatMap' ожидает, что' extact_pages' вернет список. Я не знаю, будет ли это работать с генератором. Вы пытались обернуть вывод 'extract_data' с помощью' list() 'в лямбда-функцию? –

ответ

1

Обычно я работаю с Spark в Scala. Тем не менее, вот мои мысли:

Когда вы загружаете файл через sc.textFile, это своего рода линейный итератор, который распределяется между вашими искрями. Я думаю, что с учетом XML-файла википедии одна строка не обязательно соответствует анализируемому элементу xml, и, таким образом, вы получаете эту проблему.

т.е:

Line 1 : <item> 
Line 2 : <title> blabla </title> <subitem> 
Line 3 : </subItem> 
Line 4 : </item> 

При попытке разобрать каждую строку по себе, он будет выплевывать исключения, как те, которые вы получили.

Мне обычно приходится возиться с дампом википедии, поэтому прежде всего я должен превратить его в «READable version», который легко усваивается Spark. i.e: одна строка для каждой статьи. Как только у вас есть это, вы можете легко накормить его в искру и сделать все виды обработки. Это не займет много ресурсов, чтобы превратить его

Взгляните на ReadableWiki: https://github.com/idio/wiki2vec