0

Я новичок в многопроцессорности в python. Я извлекаю некоторые функции из списка из 70 000 URL-адресов. Я имею их из 2 разных файлов. После процесса извлечения функции я передаю результат в список, а затем в файл CSV.BrokenPipeError: [WinError 109] Труба завершена во время извлечения данных

Код запускается, но затем останавливается с ошибкой. Я попытался поймать ошибку, но она произвела другую.

версия

Python = 3,5

from feature_extractor import Feature_extraction 

import pandas as pd 

from pandas.core.frame import DataFrame 

import sys 

from multiprocessing.dummy import Pool as ThreadPool 

import threading as thread 

from multiprocessing import Process,Manager,Array 

import time 

class main(): 

lst = None 

def __init__(self): 
    manager = Manager() 
    self.lst = manager.list() 
    self.dostuff() 
    self.read_lst() 

def feature_extraction(self,url): 
     if self.lst is None: 
      self.lst = [] 

     features = Feature_extraction(url) 
     self.lst.append(features.get_features()) 
     print(len(self.lst)) 



def Pool(self,url): 
     pool = ThreadPool(8) 
     results = pool.map(self.feature_extraction, url) 

def dostuff(self): 
    df = pd.read_csv('verified_online.csv',encoding='latin-1') 
    df['label'] = df['phish_id'] * 0 
    mal_urls = df['url'] 

    df2 = pd.read_csv('new.csv') 
    df2['label'] = df['phish_id']/df['phish_id'] 
    ben_urls = df2['urls'] 
    t = Process(target=self.Pool,args=(mal_urls,)) 
    t2 = Process(target=self.Pool,args=(ben_urls,)) 
    t.start() 
    t2.start() 
    t.join() 
    t2.join 

def read_lst(self): 
    nw_df = DataFrame(list(self.lst)) 

    nw_df.columns = ['Redirect count','ssl_classification','url_length','hostname_length','subdomain_count','at_sign_in_url','exe_extension_in_request_url','exe_extension_in_landing_url', 
         'ip_as_domain_name','no_of_slashes_in requst_url','no_of_slashes_in_landing_url','no_of_dots_in_request_url','no_of_dots_in_landing_url','tld_value','age_of_domain', 
         'age_of_last_modified','content_length','same_landing_and_request_ip','same_landing_and_request_url'] 
    frames = [df['label'],df2['label']] 
    new_df = pd.concat(frames) 
    new_df = new_df.reset_index() 
    nw_df['label'] = new_df['label'] 
    nw_df.to_csv('dataset.csv', sep=',', encoding='latin-1') 

if __name__ == '__main__': 



start_time = time.clock() 
try: 
    main() 

except BrokenPipeError: 
    print("broken pipe....") 
    pass 

print (time.clock() - start_time, "seconds") 

Traceback Ошибка

Process Process-3: 
Traceback (most recent call last): 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\connection.py", line 312, in _recv_bytes 
    nread, err = ov.GetOverlappedResult(True) 
BrokenPipeError: [WinError 109] The pipe has been ended 

During handling of the above exception, another exception occurred: 

Traceback (most recent call last): 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\process.py", line 249, in _bootstrap 
    self.run() 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "H:\Projects\newoproject\src\main.py", line 33, in Pool 
    results = pool.map(self.feature_extraction, url) 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\pool.py", line 260, in map 
    return self._map_async(func, iterable, mapstar, chunksize).get() 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\pool.py", line 608, in get 
    raise self._value 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\pool.py", line 119, in worker 
    result = (True, func(*args, **kwds)) 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\pool.py", line 44, in mapstar 
    return list(map(*args)) 
    File "H:\Projects\newoproject\src\main.py", line 26, in feature_extraction 
    self.lst.append(features.get_features()) 
    File "<string>", line 2, in append 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\managers.py", line 717, in _callmethod 
    kind, result = conn.recv() 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\connection.py", line 250, in recv 
    buf = self._recv_bytes() 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\connection.py", line 321, in _recv_bytes 
    raise EOFError 
EOFError 
+0

Я обрезал неприемлемый материал для попрошайничества с вашего поста; [прочитайте это] (http://meta.stackoverflow.com/q/326569/472495) - все это, спасибо! – halfer

+1

Большое спасибо @halfer –

ответ

0

Мой ответ запаздывает и не решает проблему непосредственно отвечал; но, надеюсь, даст ключ к другим, кто сталкивается с подобными ошибками.

Ошибки, с которыми я столкнулся: BrokenPipeError WinError 109 Труба была окончена & WinError 232 Труба закрывается

Наблюдаемые с Python 36 на Windows 7, если: (1) той же функции асинхронной был отправлен несколько раз, каждый раз с разными экземплярами хранилища многопроцессорных данных, в моем случае Queue() (0) AND (2) ссылки на очереди были сохранены в короткий срок локальные переменные в огибающей функции.

Ошибки произошли, несмотря на то, что очереди, совместно используемые с успешно созданными и исполняемыми асинхронными функциями, имели элементы и все равно будут активно использоваться (put() & get()) в момент исключения.

Ошибка была последовательной, когда тот же async_func был вызван во второй раз со вторым экземпляром очереди. Сразу после функции apply_async() этой функции соединение с 1-й очередью, переданное в async_func в первый раз, будет нарушено.

Проблема решена, когда ссылки на очереди были сохранены в неперекрывающихся (например, Queue-list) & переменных продолжительности жизни (таких как переменные, возвращаемые функциям выше в стеке вызовов) в огибающей функции.

 Смежные вопросы

  • Нет связанных вопросов^_^