2017-01-22 10 views
0

Я работаю над заданием PySpark с большими данными в формате ниже.Count Duplicates Значения в течение интервала времени в PySpark

ID-1234567 iplong agent partner client country timestamp category reference 

Мне нужно найти среднее количество повторяющихся записей, основываясь на колонках 2(iplong), 3(agent), 5(client), 6(country), 9(reference) в течение одной минуты с интервалом времени для каждого партнера.

Я понимаю, что мне нужно

  1. Разделить записей в один-минутными интервалами.
  2. Карта все по partner
  3. Group все по partner
  4. Сокращение каждого интервала по количеству от общего числа записей и подсчета различных записей и принять разницу, чтобы получить количество повторяющихся записей (Также нужно определить функцию сравнения двух записывается только со значениями 2(iplong), 3(agent), 5(client), 6(country), 9(reference) columns.)
  5. Добавить all the partner со всех интервалов и совпадений в двух экземплярах. И разделите на счет их появления.

Я понимаю этот процесс, но не точную реализацию в pyspark.

Может кто-нибудь, пожалуйста, помогите мне с выполнением любого из вышеуказанных шагов в pyspark.

Образец данных:

9794474 1000460030 Samsung_S5233 dv4gs dswae in 2012-03-08 00:00:00 mg riflql2a0yv8xoa9sq0recx4x 
9794471 3386480130 Nokia_C3-00 duq7h dr75h py 2012-03-08 00:00:00 co 
9794468 1907980030 Nokia_5233 dv6i3 ds3xq vn 2012-03-08 00:00:00 es gp53lqr9njqd6z2ap5d364sip 
9794467 1791990020 MAUI duxto dvb8g in 2012-03-08 00:00:00 ad 
9794466 1791000060 Nokia_3110c dusg4 dvb8g in 2012-03-08 00:00:00 ad 
9794477 1353590020 Blackberry_9300 du6dt dtr0u es 2012-03-08 00:00:00 es h5njsswvxorsau9u8fxh0e9se 
9794478 1402290050 NokiaC6-01.3 dusnc dsgcn ru 2012-03-08 00:00:00 mc 
9794481 1848749950 Nokia_C3-00 dvry3 dr6sg th 2012-03-08 00:00:01 mc oj0rekb51pvirnjuqjt10zn4b 

Update:

До сих пор я пытался положить все данные в MySQL и чтение из него. Но в операциях чтения требуется слишком много времени.

Для подхода mapreduce я пробовал разные вещи. Но не понимаю, как я буду подходить к этому дальше в коде. Следовательно, не в состоянии двигаться вперед с одним подходом.

clicks_rdd = sc.parallelize(list(clicks_reader)[1:]) 
minwise_clicks = clicks_rdd.groupby(clicks_rdd.index.map(lambda t: t.minute)) # Didn't work 
clicks_mapped_publishers = clicks_rdd.map(lambda x : (x.pop(3), x)) # Works fine but need the records divided into minute intervals first. 

Пробовал и другие вещи здесь и там. Но ничего сплошного.

Ниже перечислены первые 25 записей моего первоначального файла набора данных.

id,iplong,agent,partnerid,cid,cntr,timeat,category,referer 
9794476,1071324855,SonyEricsson_K70,dv3va,dsfag,us,2012-03-08 00:00:00.0,ad, 
9794474,1000461055,Samsung_S5233,dv4gs,dswae,in,2012-03-08 00:00:00.0,mg,riflql2a0yv8xoa9sq0recx4x 
9794471,3386484265,Nokia_C3-00,duq7h,dr75h,py,2012-03-08 00:00:00.0,co, 
9794468,1907981997,Nokia_5233,dv6i3,ds3xq,vn,2012-03-08 00:00:00.0,es,gp53lqr9njqd6z2ap5d364sip 
9794467,1791989091,MAUI,duxto,dvb8g,in,2012-03-08 00:00:00.0,ad, 
9794466,1791002478,Nokia_3110c,dusg4,dvb8g,in,2012-03-08 00:00:00.0,ad, 
9794477,1353590316,Blackberry_9300,du6dt,dtr0u,es,2012-03-08 00:00:00.0,es,h5njsswvxorsau9u8fxh0e9se 
9794478,1402285217,NokiaC6-01.3,dusnc,dsgcn,ru,2012-03-08 00:00:00.0,mc, 
9794481,1848747204,Nokia_C3-00,dvry3,dr6sg,th,2012-03-08 00:00:01.0,mc,oj0rekb51pvirnjuqjt10zn4b 
9794482,1893182670,NokiaC2-03,du77a,dr6x2,id,2012-03-08 00:00:01.0,co,r63f8uhijvr2irvka3glwyb38 
9794483,1912930086,MAUI,dvwdj,dvb8g,id,2012-03-08 00:00:01.0,ad, 
9794485,2098816838,GT-S5360B,dvjtq,dr72e,th,2012-03-08 00:00:01.0,co, 
9794486,3309473440,MAUI,dv6i3,ds3k0,za,2012-03-08 00:00:01.0,es, 
9794492,702295934,Nokia_9300,dv6i3,dtqrw,ng,2012-03-08 00:00:01.0,es,onbw7na2mi8a62g4p6y3av2qt 
9794493,694135362,Nokia_N95,dupgf,dvb8g,sd,2012-03-08 00:00:01.0,ad,hoq05psulkszxm4izlql4g962 
9794495,1791428359,Samsung_S8300,dvpo7,dvb8g,in,2012-03-08 00:00:02.0,co,im387req0zp1ucygamhgadgtm 
9794496,1783607271,GT-S5570,du56s,dsgq2,in,2012-03-08 00:00:02.0,mc,immfap8948rebeym8ri0vf5cr 
9794498,1860189232,Samsung_GT-B3313,du56s,ds22r,in,2012-03-08 00:00:02.0,mc,r81nrzjemr5jrfvjjeoxmdm4y 
9794499,1868310973,Nokia_2730c,dv3va,drvnr,au,2012-03-08 00:00:02.0,ad, 
9794500,1893182511,Nokia_5233,dv6i7,dr6tn,id,2012-03-08 00:00:02.0,co,tq09jycwii12iul7hzalucue3 
9794501,1884230403,Samsung_GT-S3653,dvjil,ds92x,in,2012-03-08 00:00:02.0,mc,h0z1j3bwiverubvwg851e9eon 
9794503,1945382244,GT-S5360,dvijt,dsgq2,in,2012-03-08 00:00:02.0,mc,fbbenjzmoe0oc7x4e2080nj8x 
9794508,2928534854,Samsung_R310,dunsq,dsg3q,us,2012-03-08 00:00:02.0,ad,kl9j183hop90uwq2p82iidjsb 
9794510,3063717709,Samsung_GT-S3653,dvjjf,dr751,in,2012-03-08 00:00:02.0,ad,rpdt9h4kpooxiedeuuxvk6gi5 
9794511,3557769762,Samsung_C3050,du53k,dr71b,hr,2012-03-08 00:00:02.0,se, 

Update 2

Пример вывода. Это формат разделенных вкладками. Вы можете скопировать и вставить его в Excel для правильного просмотра. Здесь avg_spiky_ReAgCnIpCi - средний подсчет голосов reference,, Country, IP, Client комбинация повторяющихся каждую секунду. Который меня интересует. И затем я могу внести изменения, чтобы получить другие функции.

partnerid status avg_spiky_ReAgCnIpCi std_spiky_ReAgCnIpCi night_avg_spiky_ReAgCnIpCi night_std_spiky_ReAgCnIpCi morning_avg_spiky_ReAgCnIpCi morning_std_spiky_ReAgCnIpCi afternoon_avg_spiky_ReAgCnIpCi afternoon_std_spiky_ReAgCnIpCi evening_avg_spiky_ReAgCnIpCi evening_std_spiky_ReAgCnIpCi avg_spiky_ReAgCnIp std_spiky_ReAgCnIp avg_spiky_ReAgCn std_spiky_ReAgCn avg_spiky_iplong std_spiky_iplong avg_spiky_agent std_spiky_agent night_avg_spiky_agent night_std_spiky_agent morning_avg_spiky_agent morning_std_spiky_agent afternoon_avg_spiky_agent afternoon_std_spiky_agent evening_avg_spiky_agent evening_std_spiky_agent avg_spiky_cid std_spiky_cid avg_spiky_cntr std_spiky_cntr avg_spiky_referer std_spiky_referer night_avg_spiky_referer night_std_spiky_referer morning_avg_spiky_referer morning_std_spiky_referer afternoon_avg_spiky_referer afternoon_std_spiky_referer evening_avg_spiky_referer evening_std_spiky_referer category_es category_mc category_ad category_co category_se category_mg category_pp category_in category_gd category_ow total_clicks distinct_iplong distinct_agent distinct_cid distinct_cntr distinct_referer night_click_percent morning_click_percent afternoon_click_percent evening_click_percent night_referer_percent morning_referer_percent afternoon_referer_percent evening_referer_percent night_agent_percent morning_agent_percent afternoon_agent_percent evening_agent_percent avg_total_clicks std_total_clicks avg_distinct_iplong std_distinct_iplong avg_distinct_agent std_distinct_agent avg_distinct_cid std_distinct_cid avg_distinct_cntr std_distinct_cntr avg_distinct_referer std_distinct_referer avg_null_agent std_null_agent avg_null_referer std_null_referer night_avg_null_referer night_std_null_referer morning_avg_null_referer morning_std_null_referer afternoon_avg_null_referer afternoon_std_null_referer evening_avg_null_referer evening_std_null_referer first_15_minute_percent second_15_minute_percent third_15_minute_percent last_15_minute_percent brand_MAUI_percent brand_Nokia_percent brand_Generic_percent brand_Apple_percent brand_Blackberry_percent brand_Samsung_percent brand_SonyEricsson_percent brand_LG_percent brand_other_percent avg_per_hour_density std_per_hour_density cntr_az_percent cntr_id_percent cntr_in_percent cntr_us_percent cntr_ng_percent cntr_tr_percent cntr_ru_percent cntr_th_percent cntr_sg_percent cntr_uk_percent cntr_other_percent 
du3nk 0 1.23 8.47 0 0 0 0 0 0 1.23 8.47 1.24 8.48 1.27 8.61 4.14 11.73 8.73 16.06 0 0 0 0 0 0 8.73 16.06 38.18 240.99 60 248 1.8 10.35 0 0 0 0 0 0 1.8 10.35 0 1 0 0 0 0 0 0 0 0 3360 644 250 61 31 1696 0 0 0 1 0 0 0 1 0 0 0 1 3360 0 644 0 250 0 61 0 31 0 1696 0 0 0 598 0 0 0 0 0 0 0 598 0 0.16 0.17 0.33 0.35 0.01 0 0.05 0 0 0 0 0 0 2 0 0 0 0.13 0 0 0 0 0 0.01 0 0.04 
du3nq 1 8.38 5.83 0 0 0 0 0 0 8.38 5.83 25.13 9.27 25.13 9.27 188.5 49.5 188.5 49.5 0 0 0 0 0 0 188.5 49.5 53.86 39.03 188.5 49.5 25.13 9.27 0 0 0 0 0 0 25.13 9.27 1 0 0 0 0 0 0 0 0 0 377 1 1 5 1 8 0 0 0 1 0 0 0 1 0 0 0 1 377 0 1 0 1 0 5 0 1 0 8 0 0 0 0 0 0 0 0 0 0 0 0 0 0.09 0.14 0.33 0.44 0 0 0 1 0 0 0 0 0 2 0 0 0 0 0 0 0 0 0 0 0 1 
du3op 0 30.43 46.87 0 0 0 0 44.67 59.63 19.75 30.19 35.5 48.84 35.5 48.84 71 52.27 71 52.27 0 0 0 0 134 0 39.5 33.5 13.31 8.24 71 52.27 35.5 48.84 0 0 0 0 67 62 19.75 30.19 0 0 1 0 0 0 0 0 0 0 213 1 1 6 1 1 0 0 0.63 0.37 0 0 1 1 0 0 1 1 213 0 1 0 1 0 6 0 1 0 1 0 0 0 205 0 0 0 0 0 129 0 76 0 0 0.09 0.25 0.66 0 1 0 0 0 0 0 0 0 3 0 0 0 0 0 0 0 0 0 0 0 1 
du3or 0 1 0 0 0 0 0 1 0 1 0 1 0 1 0 1 0 1 0 0 0 0 0 1 0 1 0 1 0 1 0 1 0 0 0 0 0 1 0 1 0 0 1 0 0 0 0 0 0 0 0 2 2 1 1 1 1 0 0 0.5 0.5 0 0 1 1 0 0 1 1 2 0 2 0 1 0 1 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0.5 0 0 0.5 0 0 0 0 0 0 1 0 0 2 0 0 1 0 0 0 0 0 0 0 0 0 
du3ov 0 1.01 0.11 0 0 0 0 0 0 1.01 0.11 1.01 0.11 1.01 0.11 44 30 29.33 31.63 0 0 0 0 0 0 29.33 31.63 6.29 5.59 44 30 1.02 0.21 0 0 0 0 0 0 1.02 0.21 0 0 0 0 1 0 0 0 0 0 88 1 2 10 1 86 0 0 0 1 0 0 0 1 0 0 0 1 88 0 1 0 2 0 10 0 1 0 86 0 0 0 0 0 0 0 0 0 0 0 0 0 0.84 0 0 0.16 0 0.94 0 0.06 0 0 0 0 0 2 0 0 0 0 0 0 0 0 0 0 0 1 
du3ox 0 1 0 0 0 0 0 0 0 1 0 1 0 1 0 1 0 1 0 0 0 0 0 0 0 1 0 1 0 1 0 1 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 1 1 1 1 1 1 0 0 0 1 0 0 0 1 0 0 0 1 1 0 1 0 1 0 1 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 
du3oy 0 1.02 0.12 0 0 0 0 0 0 1.02 0.12 1.02 0.15 1.02 0.15 64.5 31.5 32.25 35.55 0 0 0 0 0 0 32.25 35.55 7.59 6.03 64.5 31.5 1.03 0.28 0 0 0 0 0 0 1.03 0.28 0 0 0 0 1 0 0 0 0 0 129 1 3 12 1 124 0 0 0 1 0 0 0 1 0 0 0 1 129 0 1 0 3 0 12 0 1 0 124 0 0 0 0 0 0 0 0 0 0 0 0 0 0.26 0.58 0.16 0 0 0.95 0 0.04 0 0 0 0 0 2 0 0 0 0 0 0 0 0 0 0 0 1 
du3oz 1 1 0 0 0 0 0 1 0 0 0 1 0 33 3.35 1.01 0.08 165 0 0 0 0 0 165 0 0 0 27.5 8.18 165 0 33 3.35 0 0 0 0 33 3.35 0 0 1 0 0 0 0 0 0 0 0 0 165 164 1 6 1 5 0 0 1 0 0 0 1 0 0 0 1 0 165 0 164 0 1 0 6 0 1 0 5 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 1 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 1 0 0 
du3p1 0 1 0 0 0 0 0 1 0 0 0 1 0 18.2 16.11 1.01 0.07 91 80 0 0 0 0 91 80 0 0 15.17 14.82 91 80 18.2 16.11 0 0 0 0 18.2 16.11 0 0 1 0 0 0 0 0 0 0 0 0 182 181 1 6 1 5 0 0 1 0 0 0 1 0 0 0 1 0 182 0 181 0 1 0 6 0 1 0 5 0 0 0 0 0 0 0 0 0 0 0 0 0 0.06 0 0 0.94 0 1 0 0 0 0 0 0 0 2 0 0 0 0 0 0 0 0 0 1 0 0 
du3r7 0 3.63 1.32 0 0 0 0 0 0 3.63 1.32 29 0 29 0 29 0 29 0 0 0 0 0 0 0 29 0 3.63 1.32 29 0 29 0 0 0 0 0 0 0 29 0 0 0 0 0 1 0 0 0 0 0 29 1 1 8 1 1 0 0 0 1 0 0 0 1 0 0 0 1 29 0 1 0 1 0 8 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 
+0

Вы можете поделиться своими данными в воспроизводимом формате? что вы пробовали? – mtoto

+0

Обновление вопроса с помощью того, что я пробовал, и первых 25 записей исходного набора данных. –

+0

Нам нужна дополнительная информация и более подробная информация. Когда вы говорите «в течение одной минуты временных интервалов», вы имеете в виду на каждый минутный интервал времени или вы имеете в виду в течение одной минуты каждой записи. (т. е. все дубликаты в пределах 0min-1min, 1min-2min и т. д. .. запись записи 0 на 59 секунд и запись 1 в 1 минуту1 находятся в пределах одной минуты друг от друга). Могу ли я предложить простой пример ввода и вывода, который вы ожидаете. – gnicholas

ответ

1

Init:

from pyspark import * 
from pyspark.sql import * 
from pyspark.sql.types import * 
from pyspark.sql import functions as f 

Это равно ваши 'первые 25 записей моего первоначального набора данных файла'.

df = spark.read.load(path="file:///home/zht/PycharmProjects/test/disk_file", format='csv', sep=',', header=True) 

Просто, чтобы получить замечательный результат, этот шаг может быть игнорировать

df = df.withColumn('iplong', f.substring('iplong', pos=0, len=1)) \ 
    .withColumn('agent', f.substring('agent', pos=0, len=1)) \ 
    .withColumn('client', f.substring('client', pos=0, len=2)) \ 
    .withColumn('partner', f.substring('partner', pos=0, len=2)) \ 
    .withColumn('timestamp',f.when(f.substring('id', pos=6, len=1) % 2 == 1, '2012-03-08 00:01:00.0').otherwise(df['timestamp'])) 
df.show() 

+-------+------+-----+-------+------+-------+--------------------+--------+--------------------+ 
|  id|iplong|agent|partner|client|country|   timestamp|category|   reference| 
+-------+------+-----+-------+------+-------+--------------------+--------+--------------------+ 
|9794476|  1| S|  dv| ds|  us|2012-03-08 00:01:...|  ad|    null| 
|9794474|  1| S|  dv| ds|  in|2012-03-08 00:01:...|  mg|riflql2a0yv8xoa9s...| 
|9794471|  3| N|  du| dr|  py|2012-03-08 00:01:...|  co|    null| 
|9794468|  1| N|  dv| ds|  vn|2012-03-08 00:00:...|  es|gp53lqr9njqd6z2ap...| 
|9794467|  1| M|  du| dv|  in|2012-03-08 00:00:...|  ad|    null| 
|9794466|  1| N|  du| dv|  in|2012-03-08 00:00:...|  ad|    null| 
|9794477|  1| B|  du| dt|  es|2012-03-08 00:01:...|  es|h5njsswvxorsau9u8...| 
|9794478|  1| N|  du| ds|  ru|2012-03-08 00:01:...|  mc|    null| 
|9794481|  1| N|  dv| dr|  th|2012-03-08 00:00:...|  mc|oj0rekb51pvirnjuq...| 
|9794482|  1| N|  du| dr|  id|2012-03-08 00:00:...|  co|r63f8uhijvr2irvka...| 
|9794483|  1| M|  dv| dv|  id|2012-03-08 00:00:...|  ad|    null| 
|9794485|  2| G|  dv| dr|  th|2012-03-08 00:00:...|  co|    null| 
|9794486|  3| M|  dv| ds|  za|2012-03-08 00:00:...|  es|    null| 
|9794492|  7| N|  dv| dt|  ng|2012-03-08 00:01:...|  es|onbw7na2mi8a62g4p...| 
|9794493|  6| N|  du| dv|  sd|2012-03-08 00:01:...|  ad|hoq05psulkszxm4iz...| 
|9794495|  1| S|  dv| dv|  in|2012-03-08 00:01:...|  co|im387req0zp1ucyga...| 
|9794496|  1| G|  du| ds|  in|2012-03-08 00:01:...|  mc|immfap8948rebeym8...| 
|9794498|  1| S|  du| ds|  in|2012-03-08 00:01:...|  mc|r81nrzjemr5jrfvjj...| 
|9794499|  1| N|  dv| dr|  au|2012-03-08 00:01:...|  ad|    null| 
|9794500|  1| N|  dv| dr|  id|2012-03-08 00:00:...|  co|tq09jycwii12iul7h...| 
+-------+------+-----+-------+------+-------+--------------------+--------+--------------------+ 

И ключ операции:

res = df.groupBy([f.window('timestamp', windowDuration='1 minutes'),'partner', 'iplong', 'agent']).count() 
res = res.withColumn('total',f.sum('count').over(Window.partitionBy(["window", "partner"]))) 
res.show(n=30, truncate=False) 

+---------------------------------------------+-------+------+-----+-----+-----+ 
|window          |partner|iplong|agent|count|total| 
+---------------------------------------------+-------+------+-----+-----+-----+ 
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du  |1  |N |1 |7 | 
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du  |3  |N |1 |7 | 
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du  |3  |S |1 |7 | 
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du  |6  |N |1 |7 | 
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du  |1  |B |1 |7 | 
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du  |1  |G |1 |7 | 
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du  |1  |S |1 |7 | 
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|dv  |3  |M |1 |8 | 
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|dv  |1  |N |3 |8 | 
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|dv  |2  |G |1 |8 | 
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|dv  |1  |G |1 |8 | 
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|dv  |1  |M |1 |8 | 
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|dv  |1  |S |1 |8 | 
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|dv  |3  |S |1 |6 | 
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|dv  |7  |N |1 |6 | 
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|dv  |1  |S |3 |6 | 
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|dv  |1  |N |1 |6 | 
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|du  |2  |S |1 |4 | 
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|du  |1  |M |1 |4 | 
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|du  |1  |N |2 |4 | 
+---------------------------------------------+-------+------+-----+-----+-----+ 

Колонка Количество означает количество записей в каждые 1 мин & партнер & iplong & агент

Колонка общая означает количество записей за каждый 1 мин & партнера

ли вы имеете в виду это?

+0

Вывод основной операции (последняя напечатанная таблица) - это каждая запись в ней, соответствующая партнеру? Если это так, то это действительно большая помощь, и это то, что мне нужно, чтобы идти в правильном направлении. Это не точный результат, который мне нужен. Но он служит цели, чтобы указать мне в правильном направлении. –

+0

@KeyurGolani Я обновляю код и добавляю пояснения –

+0

Да. Мне нужны разные комбинации столбцов, чем «за каждые 1 минуту, партнер и iplong & agent», но это больше, чем я мог ожидать. Это было бы полезно для получения желаемых результатов. Большое спасибо человеку !!! Я очень ценю усилия. –