2016-11-30 3 views
0

максимальное значение У меня есть pyspark dataframe какpyspark GroupBy и выбор

name city  date 
satya Mumbai 13/10/2016 
satya Pune 02/11/2016 
satya Mumbai 22/11/2016 
satya Pune 29/11/2016 
satya Delhi 30/11/2016 
panda Delhi 29/11/2016 
brata BBSR 28/11/2016 
brata Goa  30/10/2016 
brata Goa  30/10/2016 

Мне нужно найти выход наиболее предпочтительный город для каждого имени и логики является "взять город как fav_city, если город, имеющий максимум нет. Города возникновение по совокупности 'name' + 'city' pair. И если обнаруживается многократное обнаружение, то рассмотрим город с последней датой. Поясню:

d = df.groupby('name','city').count() 
#name city count 
brata Goa 2 #clear favourite 
brata BBSR 1 
panda Delhi 1 #as single so clear favourite 
satya Pune 2 ##Confusion 
satya Mumbai 2 ##confusion 
satya Delhi 1 ##shd be discard as other cities having higher count than this city 

#So get cities having max count 
dd = d.groupby('name').agg(F.max('count').alias('count')) 
ddd = dd.join(d,['name','count'],'left') 
#name count city 
brata 2 Goa #fav found 
panda 1 Delhi #fav found 
satya 2 Mumbai #can't say 
satya 2 Pune #can't say 

В случае пользователя «сатья» я должен вернуться к trx_history и получить последнюю дату для городов, имеющих equal_max подсчитывать I: E от «Мумбаи» или «Пуна», который является последним транзакционных (макс дата), считайте этот город как fav_city. В этом случае «Pune» как «29/11/2016» является последней/максимальной датой.

Но я не могу продолжить дальше, как это сделать.

Пожалуйста, помогите мне с логикой или если какое-либо лучшее решение (более быстрый/компактный способ), пожалуйста, предложите. Благодарю.

ответ

2

Первое свидание новообращенный DateType:

df_with_date = df.withColumn(
    "date", 
    F.unix_timestamp("date", "dd/MM/yyyy").cast("timestamp").cast("date") 
) 

Следующая groupBy пользователя и город, но расширить агрегацию как это:

df_agg = (df_with_date 
    .groupBy("name", "city") 
    .agg(F.count("city").alias("count"), F.max("date").alias("max_date"))) 

Определить окно:

from pyspark.sql.window import Window 

w = Window().partitionBy("name").orderBy(F.desc("count"), F.desc("max_date")) 

Добавить ранг:

df_with_rank = (df_agg 
    .withColumn("rank", F.dense_rank().over(w))) 

И фильтр:

result = df_with_rank.where(F.col("rank") == 1) 

Вы можете обнаружить оставшиеся дубликаты с помощью кода, как это:

import sys 

final_w = Window().partitionBy("name").rowsBetween(-sys.maxsize, sys.maxsize) 
result.withColumn("tie", F.count("*").over(final_w) != 1)