SparkSQL: fetch rows before and after from a DataFrame after grouping
Given the DataFrame df:
    +--------+--------------------+------+----------+
    |custnumb|        purchasedate| price|activeflag|
    +--------+--------------------+------+----------+
    |       3|2013-07-17 00:00:...|  17.9|         0|
    |       3|2013-08-27 00:00:...| 61.13|         0|
    |       3|2013-08-28 00:00:...| 25.07|         1|
    |       3|2013-08-29 00:00:...| 24.23|         0|
    |       3|2013-09-06 00:00:...|  3.94|         0|
    |      20|2013-02-28 00:00:...|354.64|         0|
    |      20|2013-04-07 00:00:...|  15.0|         0|
    |      20|2013-05-10 00:00:...| 545.0|         0|
    |      28|2013-02-17 00:00:...| 190.0|         0|
    |      28|2013-04-08 00:00:...|  20.0|         0|
    |      28|2013-04-16 00:00:...|  89.0|         0|
    |      28|2013-05-18 00:00:...| 260.0|         0|
    |      28|2013-06-06 00:00:...|586.57|         1|
    |      28|2013-06-09 00:00:...| 250.0|         0|
    +--------+--------------------+------+----------+
I want a result that, whenever it finds an inactive flag of '1', returns the average price of the 2 rows before and the 2 rows after (ordered by purchase date). Here is the result I am looking for:
    +--------+--------------------+------+----------+---------+
    |custnumb|        purchasedate| price|activeflag|outputval|
    +--------+--------------------+------+----------+---------+
    |       3|2013-07-17 00:00:...|  17.9|         0|     17.9|
    |       3|2013-08-27 00:00:...| 61.13|         0|    61.13|
    |       3|2013-08-28 00:00:...| 25.07|         1|     26.8| (avg of 2 prices before, 2 after)
    |       3|2013-08-29 00:00:...| 24.23|         0|    24.23|
    |       3|2013-09-06 00:00:...|  3.94|         0|     3.94|
    |      20|2013-02-28 00:00:...|354.64|         0|   354.64|
    |      20|2013-04-07 00:00:...|  15.0|         0|     15.0|
    |      20|2013-05-10 00:00:...| 545.0|         0|    545.0|
    |      28|2013-02-17 00:00:...| 190.0|         0|    190.0|
    |      28|2013-04-08 00:00:...|  20.0|         0|     20.0|
    |      28|2013-04-16 00:00:...|  89.0|         0|     89.0|
    |      28|2013-05-18 00:00:...| 260.0|         0|    260.0|
    |      28|2013-06-06 00:00:...|586.57|         1|    199.6| (avg of 2 prices before, 1 after)
    |      28|2013-06-09 00:00:...| 250.0|         0|    250.0|
    +--------+--------------------+------+----------+---------+
In the example above, custnumb 3 and 28 have activeflag 1, so I need to calculate the average over up to 2 rows before and 2 rows after, where those rows exist for the same custnumb.
I am thinking of using window functions on the DataFrame, but I am unable to work out how to resolve this in Spark, as I am quite new to Spark programming:
    val w = Window.partitionBy("custnumb").orderBy("purchasedate")
How can I achieve this? Is it achievable with a window function, or is there a better way of doing it?
If you already have the window defined, a simple conditional expression should work fine:
    val cond = ($"activeflag" === 1) && (lag($"activeflag", 1).over(w) === 0)

    // windows covering the rows before and after the current row
    val before = w.rowsBetween(-2, -1)
    val after = w.rowsBetween(1, 2)

    // expressions for the sum of prices and the number of surrounding rows
    val sumPrice = sum($"price").over(before) + sum($"price").over(after)
    val countPrice = sum($"ones_").over(before) + sum($"ones_").over(after)

    val expr = when(cond, sumPrice / countPrice).otherwise($"price")

    df.withColumn("ones_", lit(1)).withColumn("outputval", expr)
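For completeness, here is a minimal end-to-end sketch of the same idea. It assumes a local SparkSession (names like `spark` and the reduced sample rows are illustrative), and it swaps the `ones_` helper column for `count(...)` over the frames, which counts only the rows that actually exist; `coalesce` guards the case where one side of the frame is empty so the sum does not become null:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object AvgAroundFlag {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only; use your own in a real job.
    val spark = SparkSession.builder
      .appName("avg-around-flag")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Reduced sample of the question's data.
    val df = Seq(
      (3, "2013-07-17", 17.9, 0),
      (3, "2013-08-27", 61.13, 0),
      (3, "2013-08-28", 25.07, 1),
      (3, "2013-08-29", 24.23, 0),
      (3, "2013-09-06", 3.94, 0)
    ).toDF("custnumb", "purchasedate", "price", "activeflag")

    val w = Window.partitionBy("custnumb").orderBy("purchasedate")
    val before = w.rowsBetween(-2, -1)
    val after = w.rowsBetween(1, 2)

    // coalesce turns a null sum (empty frame) into 0.0; count over an
    // empty frame is already 0, so partial neighborhoods average correctly.
    val sumPrice = coalesce(sum($"price").over(before), lit(0.0)) +
      coalesce(sum($"price").over(after), lit(0.0))
    val numRows = count($"price").over(before) + count($"price").over(after)

    val outputVal =
      when($"activeflag" === 1, round(sumPrice / numRows, 2))
        .otherwise($"price")

    df.withColumn("outputval", outputVal).show()

    spark.stop()
  }
}
```

For the flagged row of custnumb 3 this averages 17.9, 61.13, 24.23 and 3.94 to 26.8, matching the expected output; all other rows keep their original price.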