Ibis Project Blog

Python productivity framework for the Apache Hadoop ecosystem. Development updates, use cases, and internals.

Ibis Design: Modeling high level analytics tasks

Beyond scalability and high performance on large data sets with Python, Ibis is focused on simplifying analytics tasks for end users. By designing a rich pandas-like domain-specific language (DSL) embedded in Python code, we can hide away the complexities normally associated with expressing analytical concepts in SQL or other tools. This post gives some specific examples and shows how we're solving them in Ibis.

Case Study: The TopK Operation

We've all been there: you have a high-cardinality category and you wish to restrict your analysis to a fixed subset of its values according to a ranking you devise. If you're using pandas, you might do something like:

K = 5

# Average value per category
avg_value = data.groupby('category').value.mean()

# Take the K categories with the highest average value
top_categories = avg_value.sort_values(ascending=False)[:K].index

# Keep only the rows belonging to those categories
filtered_data = data[data.category.isin(top_categories)]
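
As an aside, on newer pandas you can fold the sort-and-slice into a single step with Series.nlargest:

# Equivalent: rank and take the top K categories in one step
top_categories = data.groupby('category').value.mean().nlargest(K).index
filtered_data = data[data.category.isin(top_categories)]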

Let's look at a concrete example using both Ibis and SQL. I'm using the now-classic airlines dataset in Parquet format.

In [1]:
import ibis

ibis.options.interactive = True

hdfs = ibis.hdfs_connect('bottou01.sjc.cloudera.com')
con = ibis.impala.connect('bottou01.sjc.cloudera.com', hdfs_client=hdfs)

airlines = con.parquet_file('/user/wesm/airlines-parquet', persist=True, 
                            name='airlines_parquet', database='wes')
# airlines.compute_stats()

Let's look at the top 5 origin airports by total number of flights:

In [2]:
query = """
SELECT origin, count(*) as `nrows`
FROM wes.airlines_parquet
GROUP BY 1
ORDER BY nrows DESC
LIMIT 5
"""
expr = con.sql(query)
expr
Out[2]:
  origin    nrows
0    ORD  6597442
1    ATL  6100953
2    DFW  5710980
3    LAX  4089012
4    PHX  3491077

Constructed from SQL primitives, the TopK operation consists of three tasks:

  • Aggregate
  • Sort
  • Limit

Here is the Impala query plan for this (use con.explain(expr) with Ibis to see it):

05:MERGING-EXCHANGE [UNPARTITIONED]
|  order by: count(*) DESC
|  limit: 5
|
02:TOP-N [LIMIT=5]
|  order by: count(*) DESC
|
04:AGGREGATE [FINALIZE]
|  output: count:merge(*)
|  group by: origin
|
03:EXCHANGE [HASH(origin)]
|
01:AGGREGATE
|  output: count(*)
|  group by: origin
|
00:SCAN HDFS [wes.airlines_parquet]
   partitions=1/1 files=8 size=1.34GB

As you can see, Impala, like most SQL engines, has a built-in notion of TOP-N in its query engine, yet this is not directly expressible in SQL. Things get more complicated when you want to filter down to the categories in the result:

SELECT dest, avg(arrdelay) as `avg_arrdelay`
FROM wes.airlines_parquet
WHERE origin in (
  SELECT origin 
  FROM (
    SELECT origin, count(*) as `nrows`
    FROM wes.airlines_parquet
    GROUP BY 1
    ORDER BY nrows DESC
    LIMIT 5  
  ) t0
)
GROUP BY 1

If you look at the query plan for this, it's more complicated but functionally contains the same TopK pattern plus a filter (using a LEFT SEMI JOIN, a type of efficient filter join).
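
If you'd like to check this yourself, you can push the query through Ibis and ask for its plan (a quick sketch; query_with_filter here is just a Python string holding the SQL above):

nested = con.sql(query_with_filter)  # query_with_filter: the SQL text shown above
con.explain(nested)                  # the plan: the same TOP-N pattern plus a LEFT SEMI JOIN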

A better TopK workflow with Ibis

Ibis is happy to let you build your own TopK expression using primitive operations:

In [3]:
top_origins = (airlines
               .group_by('origin')
               .aggregate(airlines.count().name('nrows'))
               .sort_by(ibis.desc('nrows'))
               .limit(5))
top_origins.execute()
Out[3]:
  origin    nrows
0    ORD  6597442
1    ATL  6100953
2    DFW  5710980
3    LAX  4089012
4    PHX  3491077

This same aggregated table can be used as a filter, too:

In [4]:
# Average arrival delay; fall back to 0 if the mean comes out null
avg_delay = airlines.arrdelay.mean().fillna(0).name('avg_delay')

subset_airlines = airlines[airlines.origin.isin(top_origins.origin)]
                    
avg_delay_subset = (subset_airlines
                    .group_by('dest')
                    .aggregate(avg_delay)
                    .sort_by(ibis.desc('avg_delay'))
                    .limit(10))
avg_delay_subset.execute()
Out[4]:
  dest  avg_delay
0  RDM  29.558824
1  SOP  25.598706
2  MQT  25.367129
3  HHH  23.744963
4  ACY  20.344538
5  PSE  19.000000
6  CPR  18.383459
7  CWA  18.002533
8  SCE  16.741325
9  ATW  16.138655
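
As an aside, the isin filter above is the pattern that Impala executes with the LEFT SEMI JOIN we saw earlier. If you prefer to spell the join out, Ibis also has a table-level semi_join method; here is a rough sketch of the equivalent expression:

# A sketch: the same filter written as an explicit semi join. Only airlines
# columns come through; rows are kept if their origin appears in top_origins.
subset_semijoin = airlines.semi_join(top_origins, [airlines.origin == top_origins.origin])

avg_delay_semijoin = (subset_semijoin
                      .group_by('dest')
                      .aggregate(avg_delay)
                      .sort_by(ibis.desc('avg_delay'))
                      .limit(10))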

This is quite a bit of data gymnastics, though, and you haven't really modeled the fundamental Top-N task with the API. Because we have the power to create higher-level abstractions and expand them later into concrete operations (backend-dependent), we can craft a topk operation:

In [5]:
top_origins = airlines.origin.topk(5)

The result of topk in Ibis is what's known as an analytic expression. Its behavior depends on the context in which it's used. It is executable as is, yielding the top 5 origins by count:

In [6]:
top_origins.execute()
Out[6]:
  origin    count
0    ORD  6597442
1    ATL  6100953
2    DFW  5710980
3    LAX  4089012
4    PHX  3491077

But you can also use it directly as a table filter:

In [7]:
expr = (airlines[top_origins]
        .group_by('dest')
        .aggregate(avg_delay)
        .sort_by(ibis.desc('avg_delay'))
        .limit(10))
expr.execute()
Out[7]:
  dest  avg_delay
0  RDM  29.558824
1  SOP  25.598706
2  MQT  25.367129
3  HHH  23.744963
4  ACY  20.344538
5  PSE  19.000000
6  CPR  18.383459
7  CWA  18.002533
8  SCE  16.741325
9  ATW  16.138655

What's going on here is that the top_origins expression is a sort of logical data type that hasn't yet been adapted to any physical data operations. There are a number of ways it can be used, and that adaptation process happens automatically:

In [8]:
type(top_origins)
Out[8]:
ibis.expr.operations.TopKExpr
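
One way to watch that adaptation happen is to ask Impala for the plan of an expression that uses top_origins as a filter; the TopK is expanded into the aggregate-plus-filter-join pattern from the hand-written SQL. A quick sketch, reusing con.explain and the avg_delay expression from above:

# A sketch: as a filter, the TopK expression is rewritten into concrete
# operations (an aggregate plus a filter join) before being compiled
con.explain(airlines[top_origins]
            .group_by('dest')
            .aggregate(avg_delay))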

You might notice that I've sneakily done two TopK operations in a row, the first ranking by count and the second by average arrival delay. Luckily, topk gives you a way to provide a ranking metric other than count:

In [9]:
airlines.origin.topk(5, by=avg_delay).execute()
Out[9]:
  origin  avg_delay
0    FMN      164.0
1    OGD      148.4
2    CYS      122.0
3    BFF       93.0
4    PIR       33.5

We can combine all of this to do a double TopK:

In [10]:
# avg_delay is now a function of a table: topk's `by` argument also accepts
# a callable, which computes the ranking metric on the table being ranked
def avg_delay(x):
    return x.arrdelay.mean().fillna(0).name('avg_delay')
    
expr = airlines[top_origins].dest.topk(10, by=avg_delay)
expr.execute()
Out[10]:
  dest  avg_delay
0  RDM  29.558824
1  SOP  25.598706
2  MQT  25.367129
3  HHH  23.744963
4  ACY  20.344538
5  PSE  19.000000
6  CPR  18.383459
7  CWA  18.002533
8  SCE  16.741325
9  ATW  16.138655

Another example: bucketing

Why stop there? Having seen plenty of use cases in the wild where code reuse and composability would make analysts a great deal more productive, we'll be working to build more and more high-level analytics tools into Ibis.

As another example, which I'll present mostly as code, consider the task of creating a discrete binning of arrival delays and computing a number of metrics for each bin. I'll let the code speak for itself:

In [11]:
airlines.arrdelay.summary()
Out[11]:
       count    nulls   min   max        sum      mean  approx_nunique
0  120947440  2587529 -1437  2598  852674931  7.049963            3801
In [12]:
delay_buckets = [1, 30, 60, 120]

bin = (airlines.arrdelay
       .fillna(0)
       .bucket(delay_buckets, 
               include_over=True,
               include_under=True).name('delay_bin'))

delay_summary = airlines.group_by(bin).arrdelay.summary()
delay_summary
Out[12]:
   delay_bin     count    nulls   min   max        sum        mean  \
0          3   4039142        0    60   120  333493735   82.565489   
1          4   1594279        0   121  2598  290961107  182.503255   
2          1  44759224        0     1    29  451748992   10.092869   
3          0  63108269  2587529 -1437     0 -531724696   -8.425595   
4          2   7446526        0    30    59  308195793   41.387862   

   approx_nunique  
0              62  
1            2490  
2              29  
3             471  
4              30  
In [13]:
bin_name = (delay_summary.delay_bin
            .label(['On time', 'Less than 30 min',
                    '30 min to 1 hour', 
                    '1 to 2 hours',
                    'More than 2 hours']))
expr = delay_summary.mutate(bin_name=bin_name)
expr.sort_by('delay_bin')
Out[13]:
   delay_bin     count    nulls   min   max        sum        mean  \
0          0  63108269  2587529 -1437     0 -531724696   -8.425595   
1          1  44759224        0     1    29  451748992   10.092869   
2          2   7446526        0    30    59  308195793   41.387862   
3          3   4039142        0    60   120  333493735   82.565489   
4          4   1594279        0   121  2598  290961107  182.503255   

   approx_nunique           bin_name  
0             471            On time  
1              29   Less than 30 min  
2              30   30 min to 1 hour  
3              62       1 to 2 hours  
4            2490  More than 2 hours  
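
One last peek before wrapping up: the bin expression itself carries a category type, which is what makes the label step above possible (a quick sketch; type() reports an expression's Ibis datatype):

bin.type()  # the bucketed column is reported as a category type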

In future posts I'll go into some more detail on category types in Ibis (like the result of bucket) and some of the other tools used here.