Ibis Project Blog

Python productivity framework for the Apache Hadoop ecosystem. Development updates, use cases, and internals.

Leveraging SQL window functions in Ibis

Window (also known as analytic) functions are a valuable technique in analytic SQL, but unfortunately they are generally considered to be an advanced skill among SQL programmers. Conceptually, they are relatively simple, and indeed many everyday pandas and R operations can be expressed in SQL through their use. Mechanically, they can be difficult to use, largely because of the SQL syntax.

Ibis has had comprehensive support for window functions from 0.3 onward, and I invested quite a bit of effort to design an API to make them available to users in a much simpler way. I also made sure that you don't have to be a SQL expert to use them.

What are SQL window functions?

Standard SQL functions do not have "data visibility" outside the context of the current row. This makes group-wise data analysis more difficult. In pandas, you can write code that looks like:

demeaned_col = df.col_name - df.col_name.mean()

first_order_diff = df.col_name - df.col_name.shift(1)

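But you can't write this SQL, because in standard SQL an aggregate like AVG collapses the rows and cannot be combined with per-row values in the same expression: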

SELECT col_name - AVG(col_name) AS demeaned_col
FROM df

This is where window functions come in. They give you a way to group and order an entire data set and compute either an aggregation or some other manipulation of ordered groups. Depending on your SQL engine, you probably have functions like:

  • Aggregations: standard aggregates like AVG and SUM can be used in window function contexts
  • Shifts: LAG and LEAD
  • Quantiling: NTILE, and others

These collectively are called analytic functions.
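To see the three families side by side, here is a minimal sketch on a toy table. It uses Python's built-in sqlite3 module purely as a convenient SQL engine (my assumption for a self-contained example; it needs SQLite 3.25 or newer), and the table contents are made up.

```python
import sqlite3

# Toy data: an ordering column t and a value column col_name (made-up values)
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE df (t INTEGER, col_name REAL)")
con.executemany("INSERT INTO df VALUES (?, ?)",
                [(1, 10.0), (2, 20.0), (3, 60.0), (4, 30.0)])

query = """
SELECT t,
       col_name - AVG(col_name) OVER ()           AS demeaned_col,
       col_name - LAG(col_name) OVER (ORDER BY t) AS first_order_diff,
       NTILE(2) OVER (ORDER BY col_name)          AS half
FROM df
ORDER BY t
"""
rows = con.execute(query).fetchall()
for row in rows:
    print(row)
```

The first two output columns mirror the two pandas expressions above. The first row has no predecessor, so its first_order_diff comes back as NULL (None in Python).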

Mechanically, a SQL window function is an ordinary function call followed by the OVER keyword and up to three components:

  • Group clause: PARTITION BY
  • Sort clause: ORDER BY
  • Window frame: it's complicated
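To make the three components concrete, here is a small sketch with each clause labelled. The sales table, its columns, and its values are hypothetical, and Python's sqlite3 module (SQLite 3.25 or newer) stands in for a real analytic engine.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE sales (state TEXT, month INTEGER, value REAL)")
con.executemany("INSERT INTO sales VALUES (?, ?, ?)", [
    ("CA", 1, 1.0), ("CA", 2, 3.0), ("CA", 3, 5.0),
    ("NY", 1, 10.0), ("NY", 2, 20.0), ("NY", 3, 60.0),
])

query = """
SELECT state, month,
       AVG(value) OVER (
           PARTITION BY state                         -- group clause
           ORDER BY month                             -- sort clause
           ROWS BETWEEN 1 PRECEDING AND CURRENT ROW   -- window frame
       ) AS trailing_2m_avg
FROM sales
ORDER BY state, month
"""
result = con.execute(query).fetchall()
for row in result:
    print(row)
```

Each state is averaged separately, and the frame never reaches across a partition boundary: the first month of every state averages only itself.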

Without going into great detail about all the ins and outs of SQL OVER clauses, here's an example: on some monthly data, compute for each state the trailing 1-year average of a value's deviation from its full-sample average. In pandas, this would be:

def apply_func(g):
    g = g.sort_values('date')
    # trailing 12-month mean of the deviation from the full-sample average
    metric = (g.value - df.value.mean()).rolling(12).mean()
    return pd.DataFrame({'metric': metric, 'state': g.state})

result = df.groupby('state').apply(apply_func)

In SQL, here it is:

SELECT AVG(value - AVG(value) OVER ())
       OVER(PARTITION BY state
            ORDER BY date
            ROWS BETWEEN 11 PRECEDING AND CURRENT ROW)
       AS metric
FROM df

I feel like we can do better than this.

More pain-free group-wise analytics in Ibis

In many cases, window functions are only an implementation detail. In Ibis, I've tried to make using them as pain-free as possible, and often invisible. There's still plenty more to do: further usability improvements and higher-level tools (for getting around SQL limitations) will appear in upcoming releases.

Let's look at the airlines data again.

In [1]:
import ibis
ibis.options.interactive = True

con = ibis.impala.connect('bottou01.sjc.cloudera.com')

db = con.database('wes')
table = db.airlines_parquet

Suppose you wanted to demean the arrdelay column in this dataset. Here you can write:

In [2]:
expr = table.arrdelay - table.arrdelay.mean()
expr.execute()[:10]
Out[2]:
0   -14.049963
1   -11.049963
2    -0.049963
3   -13.049963
4    -5.049963
5   -12.049963
6    -0.049963
7   -13.049963
8   -19.049963
9   -10.049963
Name: tmp, dtype: float64

Let's take a closer look:

In [3]:
t = table
expr = t[t.arrdelay, t.arrdelay.mean(), 
         (t.arrdelay - t.arrdelay.mean()).name('demeaned')]
expr.limit(10)
Out[3]:
   arrdelay      mean   demeaned
0        -7  7.049963 -14.049963
1        -4  7.049963 -11.049963
2         7  7.049963  -0.049963
3        -6  7.049963 -13.049963
4         2  7.049963  -5.049963
5        -5  7.049963 -12.049963
6         7  7.049963  -0.049963
7        -6  7.049963 -13.049963
8       -12  7.049963 -19.049963
9        -3  7.049963 -10.049963

What if you wanted to demean by dest instead? Now you can write:

In [4]:
t = table['arrdelay', 'dest']
expr = (t.group_by('dest')
        .mutate(dest_avg=t.arrdelay.mean(),
                dev=t.arrdelay - t.arrdelay.mean()))

worst = expr[expr.dev.notnull()].sort_by(ibis.desc('dev')).limit(10)
worst
Out[4]:
   arrdelay dest  dest_avg          dev
0      2598  DTW  4.998577  2593.001423
1      2461  MSP  4.476601  2456.523399
2      2453  MSP  4.476601  2448.523399
3      2137  MSP  4.476601  2132.523399
4      1951  DTW  4.998577  1946.001423
5      1946  DTW  4.998577  1941.001423
6      1942  MSP  4.476601  1937.523399
7      1925  SEA  8.158243  1916.841757
8      1879  SEA  8.158243  1870.841757
9      1864  MSP  4.476601  1859.523399

For the SQL junkies in the room, here is the actual query that Ibis executed:

SELECT *
FROM (
  SELECT `arrdelay`, `dest`,
         avg(`arrdelay`) OVER (PARTITION BY `dest`) AS `dest_avg`,
         `arrdelay` - avg(`arrdelay`) OVER (PARTITION BY `dest`) AS `dev`
  FROM wes.`airlines_parquet`
) t0
WHERE `dev` IS NOT NULL
ORDER BY `dev` DESC
LIMIT 10
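If you want to poke at this query shape without an Impala cluster, the sketch below runs the same structure on a handful of made-up rows. The sqlite3 engine (SQLite 3.25 or newer), the table name airlines, and the values are my assumptions for illustration only.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE airlines (arrdelay REAL, dest TEXT)")
con.executemany("INSERT INTO airlines VALUES (?, ?)", [
    (10.0, "DTW"), (30.0, "DTW"), (None, "DTW"),  # one missing delay
    (5.0, "MSP"), (7.0, "MSP"),
])

query = """
SELECT *
FROM (
  SELECT arrdelay, dest,
         AVG(arrdelay) OVER (PARTITION BY dest) AS dest_avg,
         arrdelay - AVG(arrdelay) OVER (PARTITION BY dest) AS dev
  FROM airlines
) t0
WHERE dev IS NOT NULL
ORDER BY dev DESC
LIMIT 10
"""
worst = con.execute(query).fetchall()
for row in worst:
    print(row)
```

The subquery is there for a reason: window functions are evaluated after the WHERE clause of their own SELECT, so filtering on dev has to happen one level up. The row with the missing arrdelay is ignored by AVG and then dropped by the filter.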

You can also construct a window explicitly and apply it to an analytic function with the over expression method:

In [5]:
w = ibis.window(group_by=table.dest, order_by=table.year)

yearly_avg_delay = (table.group_by(['dest', 'year'])
                    .aggregate(table.arrdelay.mean().name('avg_delay')))

delay = yearly_avg_delay.avg_delay
yoy_chg = (delay - delay.lag().over(w))
yearly_avg_delay = yearly_avg_delay.mutate(yoy_change=yoy_chg)

yearly_avg_delay[(yearly_avg_delay.dest == 'ORD')
                 & (yearly_avg_delay.year >= 2000)]
Out[5]:
  dest  year  avg_delay  yoy_change
0  ORD  2000  16.630993    4.742508
1  ORD  2001   7.849993   -8.781000
2  ORD  2002   3.597082   -4.252911
3  ORD  2003   7.268648    3.671566
4  ORD  2004  12.812650    5.544002
5  ORD  2005   8.563716   -4.248934
6  ORD  2006  15.744943    7.181227
7  ORD  2007  16.804941    1.059998
8  ORD  2008  15.224999   -1.579942
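Notice that the 2000 row has a yoy_change even though earlier years were filtered out: the lag is computed over the whole grouped table first, and the filter is applied afterwards. Here is a rough pure-Python sketch of that logic. The helper lag_diff and the numbers are hypothetical and only mimic what LAG does within each ordered group; this is not how Ibis or Impala implement it.

```python
from itertools import groupby
from operator import itemgetter


def lag_diff(rows, group_key, order_key, value_key):
    """Add value minus the previous value within each ordered group."""
    out = []
    ordered = sorted(rows, key=itemgetter(group_key, order_key))
    for _, group in groupby(ordered, key=itemgetter(group_key)):
        previous = None  # LAG yields NULL for the first row of a partition
        for row in group:
            change = None if previous is None else row[value_key] - previous
            out.append({**row, "yoy_change": change})
            previous = row[value_key]
    return out


rows = [
    {"dest": "ORD", "year": 2000, "avg_delay": 16.0},
    {"dest": "ORD", "year": 1999, "avg_delay": 12.0},
    {"dest": "JFK", "year": 2000, "avg_delay": 9.5},
    {"dest": "ORD", "year": 2001, "avg_delay": 8.0},
]

changes = lag_diff(rows, "dest", "year", "avg_delay")
# Filter only after the lag has been computed, as in the Ibis expression
recent_ord = [r for r in changes if r["dest"] == "ORD" and r["year"] >= 2000]
for r in recent_ord:
    print(r)
```

Had the filter run first, the 2000 row would be the first in its partition and its change would be missing.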

Moving window functions and ordering

Aggregations (like sum, mean, etc.) used in a window function context operate by default on the full window data sample. This is configurable, though: you can choose which values to include in the aggregated sample based on a set ordering. Let's take the annual delays from above for a couple of airports:

In [6]:
annual_delay = (table[table.dest.isin(['JFK', 'SFO'])]                
                .group_by(['dest', 'year'])
                .aggregate(table.arrdelay.mean().name('avg_delay')))
annual_delay.limit(20)
Out[6]:
   dest  year  avg_delay
0   JFK  1993   6.441211
1   JFK  2002   2.723808
2   JFK  1999   7.005718
3   SFO  2003   3.977823
4   SFO  1998  17.903815
5   JFK  1994   9.017850
6   SFO  1999  12.464576
7   JFK  2001   8.875575
8   SFO  1990   8.622028
9   JFK  1998   5.931865
10  JFK  2003   2.930521
11  SFO  1995  10.024748
12  SFO  2006  12.377844
13  JFK  2007  18.385748
14  JFK  1987   9.168451
15  JFK  1989   9.387740
16  JFK  2008  13.774854
17  JFK  2000   9.554254
18  JFK  2004   6.992708
19  SFO  2001   8.233889

If you wanted to add a full-sample average-of-averages for each dest, you could write:

In [7]:
enriched = (annual_delay
            .group_by('dest')
            .mutate(grand_avg=annual_delay.avg_delay.mean()))
enriched.limit(10).execute()
Out[7]:
  dest  year  avg_delay  grand_avg
0  SFO  1991   9.211260   10.58818
1  SFO  2004   6.034957   10.58818
2  SFO  1993   6.788378   10.58818
3  SFO  2002   4.721505   10.58818
4  SFO  1994   6.266924   10.58818
5  SFO  1992   4.347605   10.58818
6  SFO  2005   9.145214   10.58818
7  SFO  1988   8.897241   10.58818
8  SFO  2000  19.235428   10.58818
9  SFO  1997   9.026597   10.58818

Suppose now you wanted to compare with a trailing 10-year average instead. Ibis gives you two ways to do that: the generic ibis.window or the convenience function ibis.trailing_window:

In [8]:
# 9 + current period = 10
w = ibis.window(group_by=annual_delay.dest, 
                order_by=annual_delay.year,
                preceding=9, following=0)
w2 = ibis.trailing_window(9, group_by=annual_delay.dest,
                          order_by=annual_delay.year)
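If the "9 + current period" arithmetic feels abstract, this pure-Python sketch shows what a frame with preceding=N and following=0 averages over within a single, already sorted group. The function trailing_mean is a hypothetical stand-in, not part of Ibis, and the numbers are made up.

```python
def trailing_mean(values, preceding):
    """Mean over the current row and up to `preceding` rows before it."""
    result = []
    for i in range(len(values)):
        start = max(0, i - preceding)  # the frame is clipped at the group start
        frame = values[start:i + 1]
        result.append(sum(frame) / len(frame))
    return result


delays = [4.0, 8.0, 6.0, 10.0]
means = trailing_mean(delays, preceding=2)  # 2 + current row = 3 values
print(means)
```

One thing to keep in mind: a SQL frame simply shrinks near the start of a group, so the earliest rows average fewer values. That differs from a pandas rolling mean, which by default yields NaN until the window is full.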

Now, you can use the over method:

In [9]:
metric = annual_delay.avg_delay.mean().over(w2)
enriched = annual_delay[annual_delay, metric]
enriched.sort_by(ibis.desc('year')).limit(10)
Out[9]:
  dest  year  avg_delay       mean
0  SFO  2008  13.651567  10.233152
1  JFK  2008  13.774854   9.566769
2  JFK  2007  18.385748   8.782470
3  SFO  2007  12.488720  10.658377
4  SFO  2006  12.377844  10.312165
5  JFK  2006  13.109659   7.560915
6  JFK  2005  12.314841   7.784602
7  SFO  2005   9.145214  10.572521
8  SFO  2004   6.034957  10.660475
9  JFK  2004   6.992708   7.443993

I could also have achieved the same result by doing:

In [10]:
w3 = ibis.trailing_window(9)

expr = (annual_delay
        .group_by('dest')
        .order_by('year')
        .mutate(avg_10y=annual_delay.avg_delay.mean().over(w3))
        .sort_by(ibis.desc('year'))
        .limit(10))
expr
Out[10]:
  dest  year  avg_delay    avg_10y
0  SFO  2008  13.651567  10.233152
1  JFK  2008  13.774854   9.566769
2  JFK  2007  18.385748   9.236323
3  SFO  2007  12.488720  10.930485
4  SFO  2006  12.377844  10.771828
5  JFK  2006  13.109659   8.980813
6  JFK  2005  12.314841   9.470483
7  SFO  2005   9.145214  11.095642
8  SFO  2004   6.034957  11.019149
9  JFK  2004   6.992708   9.430359

Some functions, such as cumsum and the other cum* methods, automatically turn into the right window functions under the hood.
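I haven't shown the SQL that Ibis generates for these, but a cumulative sum is conventionally written as a SUM over a frame that runs from the start of the partition to the current row. Here is a sketch of that idea on made-up data, using Python's sqlite3 module (SQLite 3.25 or newer) as a stand-in engine; treat the query as an assumption about the general shape, not as Ibis output.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE t (year INTEGER, flights INTEGER)")
con.executemany("INSERT INTO t VALUES (?, ?)",
                [(2006, 5), (2007, 7), (2008, 3)])

query = """
SELECT year,
       SUM(flights) OVER (
           ORDER BY year
           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
       ) AS cum_flights
FROM t
ORDER BY year
"""
running = con.execute(query).fetchall()
for row in running:
    print(row)
```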

All the window frames with Ibis

I made a handy illustration (hat tip to Hadley Wickham and his post on dplyr) showing how to create each kind of window with Ibis:

Helping more: window composability

You can think of a Window object as providing a description of how to group a data set. So why not make the windows composable? Let's consider a case where you have two different windows in an analysis:

In [12]:
w1 = ibis.window(group_by='dest', order_by=['year', 'month'])
w2 = ibis.trailing_window(12)

To make a window that's the union of all these parameters, we can simply do:

In [13]:
w = w1.combine(w2)

You can even construct the combined window yourself and check that they're the same:

In [14]:
w3 = ibis.window(group_by='dest', order_by=['year', 'month'],
                 preceding=12, following=0)
w.equals(w3)
Out[14]:
True
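To give a feel for why composing windows is cheap to support, here is a toy model of the idea. ToyWindow and toy_trailing_window are hypothetical names of my own and say nothing about how Ibis implements its Window class; the sketch only assumes that a window is a plain description (grouping keys, ordering keys, frame bounds) that can be merged field by field.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class ToyWindow:
    """A window is just data: grouping keys, ordering keys, frame bounds."""
    group_by: Tuple[str, ...] = ()
    order_by: Tuple[str, ...] = ()
    preceding: Optional[int] = None
    following: Optional[int] = None

    def combine(self, other: "ToyWindow") -> "ToyWindow":
        """Union the keys; take each frame bound from whichever side sets it."""
        def pick(a, b):
            return a if a is not None else b

        return ToyWindow(
            group_by=self.group_by + other.group_by,
            order_by=self.order_by + other.order_by,
            preceding=pick(self.preceding, other.preceding),
            following=pick(self.following, other.following),
        )


def toy_trailing_window(rows: int) -> ToyWindow:
    """Frame covering `rows` preceding rows plus the current row."""
    return ToyWindow(preceding=rows, following=0)


w1 = ToyWindow(group_by=("dest",), order_by=("year", "month"))
w2 = toy_trailing_window(12)
w = w1.combine(w2)

w3 = ToyWindow(group_by=("dest",), order_by=("year", "month"),
               preceding=12, following=0)
same = (w == w3)  # dataclass equality compares field by field
print(same)
```

Because the window holds no data of its own, combining two of them is just merging their fields, and equality is a field-by-field comparison.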

Takeaway

There's no reason for analytics in a SQL engine requiring window functions to be so tedious. If you're an avid user of the tools described in this post, I'd love to hear your feedback about how to make things even more productive.