2015-11-16 10 views
4

Я использую библиотеку python geoip2 и pySpark, чтобы получить географический адрес некоторых IP-адресов. Мой код, как:Библиотека python Geoip2 не работает в функции карты pySpark

geoDBpath = 'somePath/geoDB/GeoLite2-City.mmdb' 
geoPath = os.path.join(geoDBpath) 
sc.addFile(geoPath) 
reader = geoip2.database.Reader(SparkFiles.get(geoPath)) 
def ip2city(ip): 
    try: 
     city = reader.city(ip).city.name 
    except: 
     city = 'not found' 
    return city 

Я попытался

print ip2city("128.101.101.101") 

Он работает. Но когда я попытался сделать это в rdd.map:

rdd = sc.parallelize([ip1, ip2, ip3, ip3, ...]) 
print rdd.map(lambda x: ip2city(x)) 

Он сообщил

Traceback (most recent call last): 
    File "/home/worker/software/spark/python/pyspark/rdd.py", line 1299, in take 
    res = self.context.runJob(self, takeUpToNumLeft, p) 
    File "/home/worker/software/spark/python/pyspark/context.py", line 916, in runJob 
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 
    File "/home/worker/software/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/home/worker/software/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main 
    command = pickleSer._read_with_length(infile) 
    File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads 
    return pickle.loads(obj) 
TypeError: Required argument 'fileno' (pos 1) not found 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Может ли один подскажите, как сделать функцию ip2city работу в rdd.map(). Спасибо!

ответ

4

Похоже, что проблема с вашим кодом исходит от объекта reader. Он не может быть правильно сериализован как часть закрытия и отправлен рабочим. Чтобы справиться с этим, вы создаете экземпляр для рабочих. Один из способов справиться с этим - использовать mapPartitions:

from pyspark import SparkFiles 

geoDBpath = 'GeoLite2-City.mmdb' 
sc.addFile(geoDBpath) 

def partitionIp2city(iter): 
    from geoip2 import database 

    def ip2city(ip): 
     try: 
      city = reader.city(ip).city.name 
     except: 
      city = 'not found' 
     return city 

    reader = database.Reader(SparkFiles.get(geoDBpath)) 
    return [ip2city(ip) for ip in iter] 

rdd = sc.parallelize(['128.101.101.101', '85.25.43.84']) 
rdd.mapPartitions(partitionIp2city).collect() 

## ['Minneapolis', None] 
+0

что означает «mapPartitions»? Будет ли он создавать новый 'database.Reader' для каждой партии? – avocado

+0

@loganecolss После раздела. – zero323

 Смежные вопросы

  • Нет связанных вопросов^_^