Большой набор данных с pyspark - оптимизации соединения, сортировать, сравнивать между строк и группы с агрегацией


У меня есть CSV-файл с более 700,000,000 записи в этой структуре:

product_id      start_date       end_date
1               19-Jan-2000      20-Mar-2000
1               20-Mar-2000      25-Apr-2000
1               20-May-2000      27-Jul-2000
1               27-Jul-2000      
2               20-Mar-2000      25-Apr-2000
3               12-Jan-2010      30-Mar-2010
3               30-Mar-2010

End_date это null означает, что продукт в настоящее время используется.

End_date может означать 2 вещи, 1 - отключить продукт, 2 - аккумуляторная батарея заменить

Если End_date такой же, как следующий start_dateтогда это замена батареи.

Ожидает результат, product_id вместе с start_date своего текущего жизненного цикла (батарея заменить засчитывается в текущего жизненного цикла).

Что имею в виду start_date должна быть дата после последнего инвалидности. На примере выше, на выходе будет:

product_id      start_date       
1               20-May-2000
3               12-Jan-2010      

Мой код, как показано ниже. Какие-то уродливые, так что, если вы могли бы пожалуйста комментарий и посоветовать, если этот код может хорошо работать с 700,000,000 записи или есть лучшие способы/методы, чтобы решить эту проблему. Я запустить этот код, и кажется немного медленным для тестовый файл 100 записей.Спасибо за вашу помощь.

Код:

# csv input
df = spark.read.csv('productlist.csv', header=True, inferSchema=True)

# filter out stopped product id 
df2 = df.select("product_id").filter("end_date is null")
df = df.join(df2, ["product_id"])

# sort dataframe by product id & start date desc
df = df.sort(['product_id', 'start_date'],ascending=False)

# create window to add next start date of the product
w = Window.partitionBy("product_id").orderBy(desc("product_id"))
df = df.withColumn("next_time", F.lag(df.start_date).over(w))

# add column to classify if the change of the current record is product disability or battery change.
df = df.withColumn('diff', F.when(F.isnull(df.end_date), 0)
                  .otherwise(F.when((df.end_date != df.next_start_date), 1).otherwise(0)))

# add column to classify if the product has been disabled at least once or not
df3 = df.groupBy('product_id').agg(F.sum("diff").alias("disable"))
df = df.join(df3, ["product_id"])

# get requested start date for those products have not been disabled
df1 = df.filter(df.disable == 0).groupBy("product_id").agg(F.min("start_date").alias("first_start_date"))

# get requested date for those products have been disabled once, 
# which is the first next start date at the most recent disable date 
df2 = df.filter(df.diff == 1).groupBy("product_id").agg(F.max("next_start_date").alias("first_start_date"))


Комментарии
1 ответ

Я считаю, что ниже решение должны решить, что вы хотите сделать в более эффективным способом. Ваш текущий метод включает в себя много "перетасовать" операций (группировка, сортировка, объединение). Ниже, должны помочь сократить количество операций Shuffle в работе зажигания.


  1. вам ведущих дата начала

  2. сделать записи отключены

  3. добавить столбец, который указывает, является ли продукт когда-либо для инвалидов (Макс отключена)

  4. набор данных замена захват

  5. получите максимум дата замены

  6. создать индикатор для текущего цикла записи

  7. фильтрация данных для текущего цикла записи.


# csv input
df = spark.read.csv('productlist.csv', header=True, inferSchema=True)
# get ordered and unordered windows
wo = Window.partitionBy("product_id").orderBy("start_date")
wu = Window.partitionBy("product_id")
df1 = df.withColumn("lead_start_date", F.lead(col("start_date"), 1).over(wo))\
.withColumn("is_disabled", F.when((col("end_date").isNotNull()) &
((col("end_date") != col("lead_start_date")) | (col("lead_start_date").isNull())), 1).otherwise(0))\
.withColumn("has_been_disabled", F.max(col("is_disabled")).over(wu))\
.withColumn("replacement_date", F.when((col("end_date").isNotNull()) &
(col("end_date") == col("lead_start_date")) & (col("lead_start_date").isNotNull()), col("start_date")).otherwise(lit(None)))\
.withColumn("max_replacement_date", F.max(col("replacement_date")).over(wu))\
.withColumn("is_current_lifecycle_record", F.when(((col("replacement_date") == col("max_replacement_date")) & col("replacement_date").isNotNull()) |
((col("has_been_disabled") == 0) & (col("max_replacement_date").isNull())), 1).otherwise(0)) # never disabled / replaced
# filter for current lifecycle record and select target columns
df_final = df1.filter(col("is_current_lifecycle_record") == 1).select(["product_id", "start_date"])

1
ответ дан 16 апреля 2018 в 09:04 Источник Поделиться