Modern, hybrid, open analytics
Introduction
As a Python data user, I’ve wanted a more modular, composable, and scalable ecosystem. I think it’s here. Wes McKinney released pandas c. 2009 to bring dataframes into Python and it became one of the most used software packages. It was built when small data was smaller and has some downsides Wes wrote about in his “Apache Arrow and the ‘10 things I hate about pandas’” blog post. Wes created Ibis (and co-created Apache Arrow) to address these issues and more with a different approach than any other Python dataframe library by decoupling the dataframe API from the backend implementation. This allows Ibis to support 20+ backends today including pandas, DuckDB, Polars, Postgres, BigQuery, Snowflake, PySpark, and over a dozen more.
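To make that concrete, here is a tiny sketch (not from this project; the table name and columns are made up) of what that decoupling looks like: one expression, compiled for whichever backend you point it at.

import ibis

# define an expression against a schema, with no backend attached yet
t = ibis.table({"user": "string", "amount": "float64"}, name="events")
expr = t.group_by("user").agg(total=t.amount.sum())

# the same expression compiles to SQL for different backends
print(ibis.to_sql(expr, dialect="duckdb"))
print(ibis.to_sql(expr, dialect="postgres"))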
I’ve experienced many pains of the ecosystem myself. I’ve downloaded a 1 GB CSV from a database GUI and used pandas to munge it because I couldn’t figure it out in SQL. When building larger ML demos that included data preprocessing on GBs of data, I hit OOM errors and learned how much data expands in memory. Schema inference issues and file read speeds led me to understand why storing data in compressed Parquet files might be better for my use cases. I learned about partitioning strategies and the joy of subtle differences and incompatibilities across data systems. I started using PySpark and Dask to overcome hurdles. I slowly learned about databases. Fortunately, while I’ve been learning, others (including many Ibis contributors) have been building a modern, open-source Python data ecosystem! Now I can use what they’ve built to create a modern analytics application.
In this blog, we’ll look at how to build an end-to-end analytics project using Ibis as the frontend for data in Python. Ibis plays a key role, but is only one component in any real data project.
We’ll combine a variety of open-source tools and freemium services including:
- Ibis (dataframe library)
- DuckDB (database and query engine)
- Dagster (orchestration library)
- Plotly (visualization library)
- Streamlit (dashboard library)
- justfile (command runner)
- TOML (configuration)
- GitHub (source control, CI/CD)
- Azure VM (self-hosted runner)
- GitHub web APIs (source data)
- Google BigQuery (source data)
- Zulip (source data)
- MotherDuck (cloud service for DuckDB)
- Streamlit Community Cloud (cloud service for Streamlit)
to build an open-source, modular, composable, scalable, automated, hybrid-cloud, end-to-end analytics project processing a few million rows of data that provides real business value.
What’s the business value?
Today, Ibis is a thriving open-source project primarily backed by Voltron Data, with contributors at Google, Claypot AI, SingleStore, Exasol, RisingWave, Starburst Data, and anyone who submits a PR and makes a commit. It aims to be a standard frontend for data in Python that scales to your backend. To understand the project’s health, we want to track some key adoption metrics. There was already an internal dashboard at Voltron Data, but it was written in R and I don’t know R. I do know Ibis and a few other OSS tools, and I saw this as a cool opportunity to try out a few more while showcasing Ibis in a real-world project.
Let’s get started!
Finished product
The final result is a MotherDuck database powering a dashboard deployed to Streamlit Community Cloud.
Since writing this blog, I’ve added documentation metrics. This is ongoing and that page may be broken.
Look out for a follow up post on how new metrics are added to the dashboard!
While I consider this “production”, it does go down occasionally, usually because of a stale connection to MotherDuck. There are screenshots below in case that happens while you’re reading this, or you can reproduce it locally (an exercise for the reader).
The dashboard can be viewed as a Streamlit app in your browser or embedded in this page below.
Architecture
The source of truth for the project’s architecture is the repository of code. A point-in-time description and snapshots of the code are below.
Goals
I had the following goals in mind:
- Modular: while this project uses Dagster, it should be easily swappable for other orchestration tools like Airflow or Prefect or Kedro. While this project uses DuckDB, it should be easily swappable for Polars or Clickhouse or any other engine. While this project uses Streamlit, it should be swappable for Quarto dashboards…
- Composable: the project should be easy to understand and extend. It should be easy to add new data sources, new data transformations, new metrics, and new visualizations.
- Automated: the project should be easy to run locally and in production. It should be easy to run the entire pipeline or just a single step. It should be easy to run the pipeline on a schedule or on a pull request.
- Iterative: the project should be easy to iterate on. It should be easy to explore the data, transform it, and visualize it. It should be easy to switch between local and production data sources.
Our open-source tools and freemium services allow us to accomplish these goals.
Overview
The architecture consists of Python code. Steps in the overall data pipeline are captured in a justfile, which is a Makefile-like command runner. This allows us to run the pipeline locally or in automated CI/CD workflows. The data ingestion is a Python script that makes raw HTTP calls (via the requests library) to GitHub web APIs, uses the Zulip Python client for Zulip web APIs, and uses Ibis + SQL to extract PyPI data from a public Google BigQuery project.
The data is then extracted from its ingested file formats (JSON and Parquet), transformed with Ibis using the default DuckDB backend, and loaded into final DuckDB database files. Further postprocessing is done to combine these into a single DuckDB database file for convenience, though this step isn’t necessary. The tables are then copied using Ibis to connect to MotherDuck, a serverless cloud service for DuckDB. This allows us to use MotherDuck’s servers to access the production data from anywhere, including my laptop and a Streamlit Community Cloud server. The dashboard deployed there updates automatically when the data is updated. Finally, we can run some tests by executing the dashboard code.
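Condensed down, each hop in that pipeline is only a handful of Ibis calls. A rough sketch (paths and table names are illustrative, not the project’s exact code):

import ibis

# extract: read ingested files with the default DuckDB backend
downloads = ibis.read_parquet("data/ingest/pypi/ibis-framework/*.parquet")

# transform: ordinary Ibis expressions
daily = downloads.group_by(downloads.timestamp.truncate("D").name("day")).agg(
    downloads=ibis._.count()
)

# load: write the result into a local DuckDB database file
local = ibis.duckdb.connect("data/app.ddb")
local.create_table("downloads", daily.to_pyarrow(), overwrite=True)

# deploy: the same call works against MotherDuck via an "md:" path
# prod = ibis.connect("duckdb://md:ibis_analytics")
# prod.create_table("downloads", daily.to_pyarrow(), overwrite=True)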
For the most part, you just need a Python environment with pip installs including Ibis and a few other packages.
Python pip install requirements
# python
ruff
python-dotenv
# web clients
zulip
PyGithub # unused for now
# data
duckdb>=0.9.0
ibis-framework[duckdb,bigquery,deltalake]
# ML
ibisml # unused for now
# viz
plotly
streamlit
# ops
dagster
dagster-webserver
Various environment variables used to access the data sources and cloud services are stored in a .env file locally and set as secrets in the respective cloud services (GitHub Actions and Streamlit Community Cloud).
.env file
BQ_PROJECT_ID="XXXXXXXX"
GITHUB_TOKEN="ghp_XXXXXXXX"
MOTHERDUCK_TOKEN="XXXXXXX"
ZULIP_KEY="XXXXXXX"
Configuration
We use a config.toml file to configure the project. This makes it easy to switch out the database (e.g. changing over to a local file for development) and to add new data sources. Most of the ingested data is currently unused, but this makes it easy to set up additional dashboards for other Ibis projects in the future as they grow and need to be tracked.
[app]
#database="data/app.ddb"
database="md:ibis_analytics"
[eda]
database="md:ibis_analytics"
#database="data/data.ddb"
[ingest.pypi]
packages = [
"ibis-framework",
"ibis-examples",
"ibis-substrait",
"ibisml",
"ibis-birdbrain",
]
[ingest.github]
repos = [
"ibis-project/ibis",
"ibis-project/ibis-examples",
"ibis-project/ibis-substrait",
"ibis-project/ibisml",
"ibis-project/ibis-birdbrain",
]
endpoints = [
"repo",
"stargazers",
"subscribers",
"commits",
"releases",
"forks",
"issues",
"contributors",
]
[ingest.zulip]
url = "https://ibis-project.zulipchat.com"
Automation
Anything run frequently is put in a justfile. This makes it easy to run the same code locally or in a GitHub Action.
Show the justfile of project commands
# justfile

# load environment variables
set dotenv-load

# variables
module := "dag"

# aliases
alias fmt:=format
alias etl:=run
alias open:=open-dash
alias dag-open:=open-dag
alias preview:=app

# format
format:
    @ruff format .

# smoke-test
smoke-test:
    @ruff format --check .

# list justfile recipes
default:
    just --list

# setup
setup:
    @pip install --upgrade -r requirements.txt

# eda
eda:
    @ipython -i eda.py

# ingest
ingest:
    @python {{module}}/ingest.py

# run
run:
    @dagster job execute -j all_assets -m {{module}}

# postprocess
postprocess:
    @python {{module}}/postprocess.py

# deploy
deploy:
    @python {{module}}/deploy.py

# test
test:
    @python metrics.py
    @python pages/0_github.py
    @python pages/1_pypi.py
    @python pages/2_zulip.py
    @python pages/3_about.py

# dag
dag:
    @dagster dev -m {{module}}

# streamlit stuff
app:
    @streamlit run metrics.py

# clean
clean:
    @rm -r *.ddb* || true
    @rm -r data/system || true
    @rm -r data/backup || true
    @rm data/backup.ddb || true

# open dag
open-dag:
    @open http://localhost:3000/asset-groups

# open dash
open-dash:
    @open https://ibis-analytics.streamlit.app

# cicd
cicd:
    @gh workflow run cicd.yaml
Then our CI/CD workflow in cicd.yaml is just:
name: cicd

on:
  workflow_dispatch:
  schedule:
    - cron: "0 0/3 * * *"
  pull_request:
    paths:
      - '.github/workflows/cicd.yaml'
      - 'requirements.txt'
      - '**.py'
      - 'justfile'

jobs:
  ingest-etl-postprocess-deploy-test:
    runs-on: self-hosted
    steps:
      - uses: actions/checkout@v3
      - uses: google-github-actions/auth@v1
        with:
          credentials_json: ${{ secrets.GCLOUD_JSON }}
      - uses: extractions/setup-just@v1
      - uses: actions/setup-python@v4
        with:
          python-version: 3.11
      - name: install requirements
        run: just setup
      - name: ingest
        run: just ingest
        env:
          BQ_PROJECT_ID: voltrondata-demo
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          ZULIP_KEY: ${{ secrets.ZULIP_KEY }}
      - name: run ETL job
        run: just run
      - name: postprocess
        run: just postprocess
      - name: deploy to prod
        run: just deploy
        env:
          MOTHERDUCK_TOKEN: ${{ secrets.MOTHERDUCK_TOKEN }}
      - name: test
        run: just test
        env:
          MOTHERDUCK_TOKEN: ${{ secrets.MOTHERDUCK_TOKEN }}
This will run the entire pipeline on a schedule, on a pull request, or manually. Everything is automated!
Data ingestion
Note that ingesting the PyPI download data from BigQuery will incur a cost of 8 euros if you are above the free tier. To save costs, you can download a snapshot of the data from 23 June 2024 here.
After you just ingest data, you end up with:
data/ingest/
├── github/
│ └── ibis-project/
│ ├── ibis/
│ │ ├── commits.000001.json
│ │ ├── commits.000002.json
│ │ ├── ...
│ │ ├── forks.000001.json
│ │ ├── forks.000002.json
│ │ ├── ...
│ │ ├── issues.000001.json
│ │ ├── issues.000002.json
│ │ ├── ...
│ │ ├── pullRequests.000001.json
│ │ ├── pullRequests.000054.json
│ │ ├── ...
│ │ ├── stargazers.000001.json
│ │ ├── stargazers.000002.json
│ │ ├── ...
│ │ └── watchers.000001.json
│ ├── ibis-birdbrain/
│ │ ├── commits.000001.json
│ │ ├── forks.000001.json
│ │ ├── issues.000001.json
│ │ ├── pullRequests.000001.json
│ │ ├── stargazers.000001.json
│ │ └── watchers.000001.json
│ ├── ibis-examples/
│ │ ├── commits.000001.json
│ │ ├── forks.000001.json
│ │ ├── issues.000001.json
│ │ ├── pullRequests.000001.json
│ │ ├── stargazers.000001.json
│ │ └── watchers.000001.json
│ ├── ibis-substrait/
│ │ ├── commits.000001.json
│ │ ├── commits.000002.json
│ │ ├── ...
│ │ ├── forks.000001.json
│ │ ├── issues.000001.json
│ │ ├── pullRequests.000001.json
│ │ ├── pullRequests.000002.json
│ │ ├── ...
│ │ ├── stargazers.000001.json
│ │ └── watchers.000001.json
│ └── ibisml/
│ ├── commits.000001.json
│ ├── forks.000001.json
│ ├── issues.000001.json
│ ├── pullRequests.000001.json
│ ├── stargazers.000001.json
│ └── watchers.000001.json
├── pypi/
│ ├── ibis-birdbrain/
│ │ └── file_downloads.parquet
│ ├── ibis-examples/
│ │ └── file_downloads.parquet
│ ├── ibis-framework/
│ │ └── file_downloads.parquet
│ ├── ibis-substrait/
│ │ └── file_downloads.parquet
│ └── ibisml/
│ └── file_downloads.parquet
└── zulip/
├── members.json
└── messages.json
This runs a Python data ingestion script and takes a few minutes. It’s not the best code, but it works!
Show the data ingestion script
import os
import ibis
import toml
import json
import zulip
import inspect
import requests

import logging as log

from ibis import _
from dotenv import load_dotenv
from datetime import datetime, timedelta, date

from graphql_queries import (
    issues_query,
    pulls_query,
    forks_query,
    commits_query,
    stargazers_query,
    watchers_query,
)


# main function
def main():
    # load environment variables
    load_dotenv()

    # ingest data
    ingest_zulip()
    ingest_pypi()
    ingest_gh()
    # ingest_ci()  # TODO: fix permissions, add assets


# helper functions
def write_json(data, filename):
    # write the data to a file
    with open(filename, "w") as f:
        json.dump(data, f, indent=4)


# ingest functions
def ingest_gh():
    """
    Ingest the GitHub data.
    """
    # configure logger
    log.basicConfig(level=log.INFO)

    # constants
    GRAPH_URL = "https://api.github.com/graphql"

    # load environment variables
    GH_TOKEN = os.getenv("GITHUB_TOKEN")

    # load config
    config = toml.load("config.toml")["ingest"]["github"]
    log.info(f"Using repos: {config['repos']}")

    # construct header
    headers = {
        "Authorization": f"Bearer {GH_TOKEN}",
    }

    # map queries
    queries = {
        "issues": issues_query,
        "pullRequests": pulls_query,
        "commits": commits_query,
        "forks": forks_query,
        "stargazers": stargazers_query,
        "watchers": watchers_query,
    }

    # define helper functions
    def get_filename(query_name, page):
        # return the filename
        return f"{query_name}.{page:06}.json"

    def get_next_link(link_header):
        # if there is no link header, return None
        if link_header is None:
            return None

        # split the link header into links
        links = link_header.split(", ")
        for link in links:
            # split the link into segments
            segments = link.split("; ")

            # if there are two segments and the second segment is rel="next"
            if len(segments) == 2 and segments[1] == 'rel="next"':
                # remove the < and > around the link
                return segments[0].strip("<>")

        # if there is no next link, return None
        return None

    def fetch_data(client, owner, repo, query_name, query, output_dir, num_items=100):
        # initialize variables
        variables = {
            "owner": owner,
            "repo": repo,
            "num_items": num_items,
            "before": "null",
        }

        # initialize page number
        page = 1

        # while True
        while True:
            # request data
            try:
                log.info(f"\t\tFetching page {page}...")
                resp = requests.post(
                    GRAPH_URL,
                    headers=headers,
                    json={"query": query, "variables": variables},
                )
                json_data = resp.json()

                log.info(f"\t\t\tStatus code: {resp.status_code}")
                # log.info(f"\t\t\tResponse: {resp.text}")
                # log.info(f"\t\t\tJSON: {json_data}")
                if resp.status_code != 200:
                    log.error(
                        f"\t\tFailed to fetch data for {owner}/{repo}; url={GRAPH_URL}\n\n {resp.status_code}\n {resp.text}"
                    )
                    return

                # extract data
                if query_name == "commits":
                    data = json_data["data"]["repository"]["defaultBranchRef"][
                        "target"
                    ]["history"]["edges"]
                    # get the next link
                    cursor = json_data["data"]["repository"]["defaultBranchRef"][
                        "target"
                    ]["history"]["pageInfo"]["endCursor"]
                    has_next_page = json_data["data"]["repository"]["defaultBranchRef"][
                        "target"
                    ]["history"]["pageInfo"]["hasNextPage"]
                else:
                    data = json_data["data"]["repository"][query_name]["edges"]
                    cursor = json_data["data"]["repository"][query_name]["pageInfo"][
                        "endCursor"
                    ]
                    has_next_page = json_data["data"]["repository"][query_name][
                        "pageInfo"
                    ]["hasNextPage"]

                # save json to a file
                filename = get_filename(query_name, page)
                output_path = os.path.join(output_dir, filename)
                log.info(f"\t\tWriting data to {output_path}")
                write_json(data, output_path)

                variables["cursor"] = f"{cursor}"
                print(f"has_next_page={has_next_page}")
                print(f"cursor={cursor}")
                if not has_next_page:
                    break

                # increment page number
                page += 1
            except:
                # print error if response
                log.error(f"\t\tFailed to fetch data for {owner}/{repo}")
                try:
                    log.error(f"\t\t\tResponse: {resp.text}")
                except:
                    pass
                break

    # create a requests session
    with requests.Session() as client:
        for repo in config["repos"]:
            log.info(f"Fetching data for {repo}...")
            for query in queries:
                owner, repo_name = repo.split("/")
                output_dir = os.path.join(
                    "data",
                    "ingest",
                    "github",
                    owner,
                    repo_name,
                )
                os.makedirs(output_dir, exist_ok=True)
                log.info(f"\tFetching data for {owner}/{repo_name} {query}...")
                fetch_data(client, owner, repo_name, query, queries[query], output_dir)


def ingest_pypi():
    """
    Ingest the PyPI data.
    """
    # constants
    # set DEFAULT_BACKFILL to the number of days
    # since July 19th, 2015 until today
    DEFAULT_BACKFILL = (datetime.now() - datetime(2015, 7, 19)).days
    BIGQUERY_DATASET = "bigquery-public-data.pypi.file_downloads"

    # configure logger
    log.basicConfig(level=log.INFO)

    # load environment variables
    project_id = os.getenv("BQ_PROJECT_ID")
    log.info(f"Project ID: {project_id}")

    # load config
    config = toml.load("config.toml")["ingest"]["pypi"]
    log.info(f"Packages: {config['packages']}")

    # configure lookback window
    backfill = config["backfill"] if "backfill" in config else DEFAULT_BACKFILL
    log.info(f"Backfill: {backfill}")

    # for each package
    for package in config["packages"]:
        log.info(f"Package: {package}")
        # create output directory
        output_dir = os.path.join("data", "ingest", "pypi", package)
        os.makedirs(output_dir, exist_ok=True)

        # construct query
        query = f"""
        SELECT *
        FROM `{BIGQUERY_DATASET}`
        WHERE file.project = '{package}'
        AND DATE(timestamp)
            BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL {backfill} DAY)
            AND CURRENT_DATE()
        """.strip()
        query = inspect.cleandoc(query)

        # connect to bigquery and execute query
        con = ibis.connect(f"bigquery://{project_id}")
        log.info(f"Executing query:\n{query}")
        t = con.sql(query)

        # write to parquet
        filename = f"file_downloads.parquet"
        output_path = os.path.join(output_dir, filename)
        log.info(f"Writing to: {output_path}")
        t.to_parquet(output_path)


def ingest_ci():
    """
    Ingest the CI data.
    """
    # constants
    # set DEFAULT_BACKFILL to the number of days
    # since July 19th, 2015 until today
    DEFAULT_BACKFILL = (datetime.now() - datetime(2015, 7, 19)).days

    # configure logger
    log.basicConfig(level=log.INFO)

    # load environment variables
    project_id = os.getenv("BQ_PROJECT_ID")
    log.info(f"Project ID: {project_id}")

    # load config
    config = toml.load("config.toml")["ingest"]["ci"]

    # configure lookback window
    backfill = config["backfill"] if "backfill" in config else DEFAULT_BACKFILL
    log.info(f"Backfill: {backfill}")

    # make sure the data directory exists
    os.makedirs("data/ingest/ci/ibis", exist_ok=True)

    # connect to databases
    con = ibis.connect("duckdb://data/ingest/ci/ibis/raw.ddb")
    bq_con = ibis.connect(f"bigquery://{project_id}/workflows")

    # copy over tables
    for table in bq_con.list_tables():
        log.info(f"Writing table: {table}")
        con.create_table(table, bq_con.table(table).to_pyarrow(), overwrite=True)


def ingest_zulip():
    """Ingest the Zulip data."""
    # constants
    email = "cody@dkdc.dev"

    # load config
    config = toml.load("config.toml")["ingest"]["zulip"]
    log.info(f"Using url: {config['url']}")

    # configure logger
    log.basicConfig(level=log.INFO)

    # load environment variables
    zulip_key = os.getenv("ZULIP_KEY")

    # create the client
    client = zulip.Client(email=email, site=config["url"], api_key=zulip_key)

    # get the users
    r = client.get_members()
    if r["result"] != "success":
        log.error(f"Failed to get users: {r}")
    else:
        members = r["members"]
        # make sure the directory exists
        os.makedirs("data/ingest/zulip", exist_ok=True)

        # write the users to a file
        filename = "members.json"
        output_path = os.path.join("data", "ingest", "zulip", filename)
        log.info(f"Writing members to: {output_path}")
        write_json(members, output_path)

    # get the messages
    all_messages = []
    r = client.get_messages(
        {"anchor": "newest", "num_before": 100, "num_after": 0, "type": "stream"}
    )
    if r["result"] != "success":
        log.error(f"Failed to get messages: {r}")
    else:
        messages = r["messages"]
        all_messages.extend(messages)

        while len(messages) > 1:
            r = client.get_messages(
                {
                    "anchor": messages[0]["id"],
                    "num_before": 100,
                    "num_after": 0,
                    "type": "stream",
                }
            )
            if r["result"] != "success":
                log.error(f"Failed to get messages: {r}")
                break
            else:
                messages = r["messages"]
                all_messages.extend(messages)

        # make sure the directory exists
        os.makedirs("data/ingest/zulip", exist_ok=True)

        # write the messages to a file
        filename = "messages.json"
        output_path = os.path.join("data", "ingest", "zulip", filename)
        log.info(f"Writing messages to: {output_path}")
        write_json(all_messages, output_path)


if __name__ == "__main__":
    main()
Python’s requests module is used for web API calls to GitHub’s REST and GraphQL APIs. The Zulip Python client is used for extracting data from Zulip. Both of these sets of data are saved as JSON files, which we can read with the Ibis DuckDB backend or other local backends.
The PyPI data is extracted using Ibis from BigQuery using raw SQL strings and saved as Parquet files. This is over ten million rows of data for the main Ibis package, so it takes a few minutes to run. We can also read in the Parquet files with Ibis.
Extract, transform, load (ETL)
It’s fun to just run and watch my system usage spike as DuckDB goes brrrrr:
This executes the entire Dagster DAG, processing over 10 million rows of data in under a minute on my laptop.
Directed acyclic graph (DAG) and input/output management
We use Dagster to manage the ETL pipeline. I chose Dagster to try something new, and I like its software-defined assets. Relationships between assets in the DAG don’t need to be explicitly defined; rather, they are inferred from function names and input parameters. Outside of the input/output management code (shown below), there is almost no Dagster-specific code other than some Python decorators. The rest is just Python code, using Ibis for data processing.
We can just dag and view the DAG in a web browser with the Dagster GUI:
Since this pipeline is so simple, there are no joins and thus no assets with multiple inputs. Of course, joins and other more complex operations are possible with Ibis.
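If we did need one, a multi-input asset would just take both upstream assets as parameters and join them with Ibis, something like this hypothetical sketch (not part of the project):

import dagster


# hypothetical asset: Dagster infers both dependencies from the parameter names
@dagster.asset
def transform_contributor_stars(extract_stars, extract_pulls):
    """Join stargazers to the pull requests they opened."""
    return extract_stars.join(extract_pulls, "login", how="left")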
The DAG is defined in the assets directory, separated into the three conventional ETL stages:
dag/assets/
├── __init__.py
├── extract/
│ ├── __init__.py
│ ├── backends.py
│ ├── github.py
│ ├── pypi.py
│ └── zulip.py
├── load/
│ ├── __init__.py
│ ├── backends.py
│ ├── github.py
│ ├── pypi.py
│ └── zulip.py
└── transform/
├── __init__.py
├── backends.py
├── github.py
├── pypi.py
└── zulip.py
Ibis makes data input/output management easy with file formats or tables in backends. I define some table IO managers and use the DuckDBIOManager as the default for the project.
Show the input/output management code
import os
import ibis

from dagster import ConfigurableIOManager


class ParquetIOManager(ConfigurableIOManager):
    """
    Manage tables as parquet files.
    """

    extension: str = "parquet"
    base_path: str = os.path.join("data", "system", "parquet")

    def handle_output(self, context, obj):
        dirname, filename = self._get_paths(context)
        os.makedirs(dirname, exist_ok=True)
        output_path = os.path.join(dirname, filename)
        obj.to_parquet(output_path)

    def load_input(self, context):
        dirname, filename = self._get_paths(context)
        input_path = os.path.join(dirname, filename)
        return ibis.read_parquet(input_path)

    def _get_paths(self, context):
        group_name = context.step_context.job_def.asset_layer.assets_def_for_asset(
            context.asset_key
        ).group_names_by_key[context.asset_key]
        dirname = os.path.join(self.base_path, group_name, *context.asset_key.path[:-1])
        filename = f"{context.asset_key.path[-1]}.{self.extension}"
        return dirname, filename


class DeltaIOManager(ConfigurableIOManager):
    """
    Manage tables as delta tables.
    """

    extension: str = "delta"
    base_path: str = os.path.join("data", "system", "delta")
    delta_write_mode: str = "overwrite"

    def handle_output(self, context, obj):
        dirname, filename = self._get_paths(context)
        os.makedirs(dirname, exist_ok=True)
        output_path = os.path.join(dirname, filename)
        obj.to_delta(output_path, mode=self.delta_write_mode)

    def load_input(self, context):
        dirname, filename = self._get_paths(context)
        input_path = os.path.join(dirname, filename)
        return ibis.read_delta(input_path)

    def _get_paths(self, context):
        group_name = context.step_context.job_def.asset_layer.assets_def_for_asset(
            context.asset_key
        ).group_names_by_key[context.asset_key]
        dirname = os.path.join(self.base_path, *context.asset_key.path)
        filename = f"{context.asset_key.path[-1]}.{self.extension}"
        return dirname, filename


class DuckDBIOManager(ConfigurableIOManager):
    """
    Manage tables as duckdb files.
    """

    extension: str = "ddb"
    base_path: str = os.path.join("data", "system", "duckdb")

    def handle_output(self, context, obj):
        dirname, filename = self._get_paths(context)
        os.makedirs(dirname, exist_ok=True)
        output_path = os.path.join(dirname, filename)
        con = ibis.duckdb.connect(output_path)
        con.create_table(context.asset_key.path[-1], obj.to_pyarrow(), overwrite=True)

    def load_input(self, context):
        dirname, filename = self._get_paths(context)
        input_path = os.path.join(dirname, filename)
        con = ibis.duckdb.connect(input_path)
        return con.table(context.asset_key.path[-1])

    def _get_paths(self, context):
        group_name = context.step_context.job_def.asset_layer.assets_def_for_asset(
            context.asset_key
        ).group_names_by_key[context.asset_key]
        dirname = os.path.join(self.base_path, *context.asset_key.path)
        filename = f"{context.asset_key.path[-1]}.{self.extension}"
        return dirname, filename
With this setup, we can swap file formats or backends for storage as desired. After executing the DAG, the following data is written to our local filesystem:
data/system/
└── duckdb/
├── extract_backends/
│ └── extract_backends.ddb
├── extract_commits/
│ └── extract_commits.ddb
├── extract_downloads/
│ └── extract_downloads.ddb
├── extract_forks/
│ └── extract_forks.ddb
├── extract_issues/
│ └── extract_issues.ddb
├── extract_pulls/
│ └── extract_pulls.ddb
├── extract_stars/
│ └── extract_stars.ddb
├── extract_watchers/
│ └── extract_watchers.ddb
├── extract_zulip_members/
│ └── extract_zulip_members.ddb
├── extract_zulip_messages/
│ └── extract_zulip_messages.ddb
├── load_backends/
│ └── load_backends.ddb
├── load_commits/
│ └── load_commits.ddb
├── load_downloads/
│ └── load_downloads.ddb
├── load_forks/
│ └── load_forks.ddb
├── load_issues/
│ └── load_issues.ddb
├── load_pulls/
│ └── load_pulls.ddb
├── load_stars/
│ └── load_stars.ddb
├── load_watchers/
│ └── load_watchers.ddb
├── load_zulip_members/
│ └── load_zulip_members.ddb
├── load_zulip_messages/
│ └── load_zulip_messages.ddb
├── transform_backends/
│ └── transform_backends.ddb
├── transform_commits/
│ └── transform_commits.ddb
├── transform_downloads/
│ └── transform_downloads.ddb
├── transform_forks/
│ └── transform_forks.ddb
├── transform_issues/
│ └── transform_issues.ddb
├── transform_pulls/
│ └── transform_pulls.ddb
├── transform_stars/
│ └── transform_stars.ddb
├── transform_watchers/
│ └── transform_watchers.ddb
├── transform_zulip_members/
│ └── transform_zulip_members.ddb
└── transform_zulip_messages/
└── transform_zulip_messages.ddb
We can change the project’s configuration to store these as Parquet or Delta Lake tables instead.
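That wiring lives in the Dagster definitions. A minimal sketch of how an IO manager might be registered as the project-wide default (module paths assumed, not the project’s exact code):

import dagster

from dag.assets import extract, load, transform  # asset modules from the project layout
from dag.resources import DuckDBIOManager  # assumed location of the IO managers above

defs = dagster.Definitions(
    assets=[
        *dagster.load_assets_from_package_module(extract),
        *dagster.load_assets_from_package_module(load),
        *dagster.load_assets_from_package_module(transform),
    ],
    # swapping in ParquetIOManager() or DeltaIOManager() here changes the storage format project-wide
    resources={"io_manager": DuckDBIOManager()},
)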
Common Python functions are defined in dag/functions.py, including a fancy user-defined function (UDF) for regex matching.
An LLM wrote the entire clean_version function; I’m never writing a regex again.
import re
import ibis
import datetime

import ibis.selectors as s


# udfs
@ibis.udf.scalar.python
def clean_version(version: str, patch: bool = True) -> str:
    pattern = r"(\d+\.\d+\.\d+)" if patch else r"(\d+\.\d+)"
    match = re.search(pattern, version)
    if match:
        return match.group(1)
    else:
        return version


# functions
def now():
    return datetime.datetime.now()


def today():
    return now().date()


def clean_data(t):
    t = t.rename("snake_case")
    # t = t.mutate(s.across(s.of_type("timestamp"), lambda x: x.cast("timestamp('')")))
    return t


def add_ingested_at(t, ingested_at=now()):
    t = t.mutate(ingested_at=ingested_at).relocate("ingested_at")
    return t
These are imported as from dag import functions as f by convention in the project.
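For example, the clean_version UDF above can be applied to a column like any other Ibis expression (a quick sketch against a throwaway memtable, not project code):

import ibis

from dag import functions as f

versions = ibis.memtable({"version": ["7.1.0", "7.1.0.dev123", "0.4"]})
versions.mutate(clean=f.clean_version(versions["version"], patch=True))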
Extract
We extract the data from the ingested files using Ibis with the default DuckDB backend.
The PyPI download data is in Parquet format:
import ibis
import dagster

from dag import functions as f


# assets
@dagster.asset
def extract_downloads():
    """
    Extract the ingested PyPI downloads data.
    """
    downloads = f.clean_data(
        ibis.read_parquet("data/ingest/pypi/ibis-framework/*.parquet")
    )
    return downloads
While the GitHub (omitted for brevity) and Zulip data is in JSON format:
import ibis
import dagster

from dag import functions as f


# assets
@dagster.asset
def extract_zulip_members():
    """
    Extract the ingested Zulip members data.
    """
    members = f.clean_data(ibis.read_json("data/ingest/zulip/members.json"))
    return members


@dagster.asset
def extract_zulip_messages():
    """
    Extract the ingested Zulip messages data.
    """
    messages = f.clean_data(ibis.read_json("data/ingest/zulip/messages.json"))
    return messages
As shown above, Dagster assets are configured to use the DuckDB table manager, so the output of these functions is written to separate DuckDB database files for use downstream in the DAG.
Transform
With the data extracted, we can now transform it into its desired shape for downstream analytics. The most interesting transformation code is for the PyPI data, shown here:
import ibis
import dagster

from dag import functions as f


# assets
@dagster.asset
def transform_downloads(extract_downloads):
    """
    Transform the PyPI downloads data.
    """
    downloads = f.clean_data(
        extract_downloads.drop("project").unpack("file").unpack("details")
    )
    downloads = downloads.mutate(
        major_minor_patch=f.clean_version(downloads["version"], patch=True),
        major_minor=f.clean_version(downloads["version"], patch=False).cast("float"),
    )
    downloads = downloads.rename(
        {
            "version_raw": "version",
            "version": "major_minor_patch",
        }
    )
    downloads = (
        downloads.group_by(
            [
                ibis._.timestamp.truncate("D").name("timestamp"),
                ibis._.country_code,
                ibis._.version,
                ibis._.python,
                ibis._.system["name"].name("system"),
            ]
        )
        .agg(
            ibis._.count().name("downloads"),
        )
        .order_by(ibis._.timestamp.desc())
        .mutate(
            ibis._.downloads.sum()
            .over(
                rows=(0, None),
                group_by=["country_code", "version", "python", "system"],
                order_by=ibis._.timestamp.desc(),
            )
            .name("total_downloads")
        )
        .order_by(ibis._.timestamp.desc())
    )
    downloads = downloads.mutate(ibis._["python"].fillna("").name("python_full"))
    downloads = downloads.mutate(
        f.clean_version(downloads["python_full"], patch=False).name("python")
    )
    return downloads
Like the extract stage, the output of the transform stage is written to DuckDB database files for use downstream in the DAG.
Load
Honestly, this step isn’t doing much. It could use Ibis to directly upload the tables to MotherDuck, but as implemented these assets just pass the transformed data through. This is a bit wasteful but allows for a three-stage DAG that conforms to the ETL paradigm (a sketch of the direct-upload alternative follows the code below).
import ibis
import dagster

from dag import functions as f


# assets
@dagster.asset
def load_downloads(transform_downloads):
    """
    Finalize the PyPI downloads data.
    """
    return transform_downloads
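For reference, a hypothetical version of this asset that uploaded directly to MotherDuck might look like the following (a sketch, not the project’s code; it relies on MOTHERDUCK_TOKEN being set in the environment, as in the deploy step):

import ibis
import dagster


@dagster.asset
def load_downloads(transform_downloads):
    """Hypothetical alternative: push the table straight to MotherDuck."""
    con = ibis.duckdb.connect("md:ibis_analytics")
    con.create_table("downloads", transform_downloads.to_pyarrow(), overwrite=True)
    return transform_downloads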
Postprocess
We run a postprocessing script to combine the separate data into a single DuckDB database. This step is not necessary, but it makes it easier to query the data locally from a single Ibis connection.
Show the postprocessing script
import os
import ibis
import fnmatch

import logging as log

from datetime import datetime, timedelta, date

## local imports
from dag import functions as f


def main():
    postprocess()


def postprocess() -> None:
    """
    Postprocess the data.
    """
    # configure logger
    log.basicConfig(
        level=log.INFO,
    )

    # backup loaded data as Delta Lake tables and a DuckDB database
    source_path = "data/system/duckdb"
    target_path = "data/data.ddb"

    os.makedirs(source_path, exist_ok=True)

    target = ibis.duckdb.connect(target_path)

    ingested_at = f.now()

    for root, dirs, files in os.walk(source_path):
        for file in files:
            if fnmatch.fnmatch(file, "load_*.ddb"):
                full_path = os.path.join(root, file)
                con = ibis.duckdb.connect(full_path)
                tablename = file.replace(".ddb", "")
                table = con.table(tablename)
                tablename = tablename.replace("load_", "")

                log.info(f"Backing up {tablename} to {target_path}...")
                target.create_table(tablename, table.to_pyarrow(), overwrite=True)

                log.info(f"Backing up {tablename} to data/backup/{tablename}.delta...")
                table.mutate(ingested_at=ingested_at).to_delta(
                    f"data/backup/{tablename}.delta", mode="overwrite"
                )


if __name__ == "__main__":
    main()
This script also backs up the data as Delta Lake tables for good measure. After this, our data directory looks like:
data/
├── backup/
│ ├── backends.delta/
│ ├── commits.delta/
│ ├── downloads.delta/
│ ├── forks.delta/
│ ├── issues.delta/
│ ├── pulls.delta/
│ ├── stars.delta/
│ ├── watchers.delta/
│ ├── zulip_members.delta/
│ └── zulip_messages.delta/
├── data.ddb
├── ingest/
│ ├── github/
│ ├── pypi/
│ └── zulip/
└── system/
└── duckdb/
Exploratory data analysis (EDA) and iteration
While EDA is not part of the production pipeline, it is an essential part of the development workflow. The config.toml shown earlier makes it easy to switch between the local data or the production MotherDuck database.
The eda.py script in the root of the repo imports useful stuff and connects to the database with Ibis:
import re
import os
import sys
import toml
import ibis
import requests

import logging as log
import plotly.io as pio
import ibis.selectors as s
import plotly.express as px

from rich import print
from dotenv import load_dotenv
from datetime import datetime, timedelta, date

## local imports
from dag import functions as f
from dag.assets import extract, load, transform

# configuration
## logger
log.basicConfig(level=log.INFO)

## config.toml
config = toml.load("config.toml")["eda"]

## load .env file
load_dotenv()

## ibis config
ibis.options.interactive = True
ibis.options.repr.interactive.max_rows = 20
ibis.options.repr.interactive.max_columns = None

# variables
NOW = datetime.now()
NOW_7 = NOW - timedelta(days=7)
NOW_30 = NOW - timedelta(days=30)
NOW_90 = NOW - timedelta(days=90)
NOW_180 = NOW - timedelta(days=180)
NOW_365 = NOW - timedelta(days=365)
NOW_10 = NOW - timedelta(days=3650)

# connect to database
database = config["database"]
log.info(f"database: {database}")
con = ibis.connect(f"duckdb://{database}")
I can then just eda to open a quick IPython session and explore:
(venv) cody@voda ibis-analytics % just eda
Python 3.11.5 (main, Sep 14 2023, 13:17:51) [Clang 14.0.3 (clang-1403.0.22.14.1)]
Type 'copyright', 'credits' or 'license' for more information
IPython 8.20.0 -- An enhanced Interactive Python. Type '?' for help.

INFO:root:database: md:ibis_analytics

[ins] In [1]: con.list_tables()
Out[1]:
['backends',
 'commits',
 'downloads',
 'forks',
 'issues',
 'pulls',
 'stars',
 'watchers',
 'zulip_members',
 'zulip_messages']

[ins] In [2]: t = con.table("stars")

[ins] In [3]: t.schema()
Out[3]:
ibis.Schema {
  starred_at   timestamp
  id           string
  login        string
  name         string
  company      string
  created_at   timestamp
  updated_at   timestamp
  total_stars  int64
}

[ins] In [4]: t = t.select("starred_at", "login", "company")

[ins] In [5]: t.filter(t.company.lower().contains("voltron"))
Out[5]:
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ starred_at          ┃ login               ┃ company      ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ timestamp           │ string              │ string       │
├─────────────────────┼─────────────────────┼──────────────┤
│ 2023-08-24 21:09:55 │ ywc88               │ @voltrondata │
│ 2023-08-10 22:47:14 │ EpsilonPrime        │ Voltron Data │
│ 2023-08-10 08:14:52 │ fmichonneau         │ @voltrondata │
│ 2023-08-09 20:16:42 │ paleolimbot         │ @voltrondata │
│ 2023-08-09 18:31:02 │ ian-flores          │ @voltrondata │
│ 2023-08-09 18:29:57 │ MattBBaker          │ Voltron Data │
│ 2023-08-09 18:27:29 │ zeroshade           │ @voltrondata │
│ 2023-08-09 18:27:10 │ kkraus14            │ @VoltronData │
│ 2023-08-09 18:26:59 │ cryos               │ @VoltronData │
│ 2023-08-09 18:26:48 │ austin3dickey       │ Voltron Data │
│ 2023-08-09 18:26:41 │ assignUser          │ @voltrondata │
│ 2023-08-09 18:26:39 │ boshek              │ Voltron Data │
│ 2023-05-04 00:05:23 │ lostmygithubaccount │ Voltron Data │
│ 2023-04-20 20:42:57 │ richtia             │ Voltron Data │
│ 2023-04-12 18:58:06 │ ksuarez1423         │ @voltrondata │
│ 2023-04-06 16:35:01 │ wmalpica            │ Voltron Data │
│ 2023-03-15 17:04:46 │ felipecrv           │ Voltron Data │
│ 2023-03-15 15:46:25 │ alistaire47         │ Voltron Data │
│ 2023-03-14 18:41:58 │ mariusvniekerk      │ @voltrondata │
│ 2023-02-23 15:36:15 │ andrewseidl         │ @voltrondata │
│ …                   │ …                   │ …            │
└─────────────────────┴─────────────────────┴──────────────┘

[ins] In [6]:
Or I can add from eda import * in an untitled.ipynb for a classic notebook experience in VSCode, which I prefer when plotting or just saving my work.
Defining metrics and dashboarding
The EDA setup makes it easy to iteratively define metrics and plot the data. Once I’ve settled on the code I want, I add it to metrics.py with sufficient comments to make it appear in the Streamlit dashboard. It’s convenient to have the metrics defined as code right where the visualization code happens: I can comment them to explain the metric definitions and ensure a level of consistency.
Show the metrics code
import toml
import ibis

import streamlit as st
import plotly.express as px

from dotenv import load_dotenv
from datetime import datetime, timedelta

# options
## load .env
load_dotenv()

## config.toml
config = toml.load("config.toml")["app"]

## streamlit config
st.set_page_config(layout="wide")

## ibis config
con = ibis.connect(f"duckdb://{config['database']}", read_only=True)

# use precomputed data
stars = con.table("stars")
forks = con.table("forks")
pulls = con.table("pulls")
issues = con.table("issues")
backends = con.table("backends")
downloads = con.table("downloads")
members = con.table("zulip_members")
messages = con.table("zulip_messages")

# display header stuff
with open("readme.md") as f:
    readme_code = f.read()

f"""
{readme_code} See [the about page](/about) for more details.
"""

with open("requirements.txt") as f:
    metrics_code = f.read()

with st.expander("show `requirements.txt`", expanded=False):
    st.code(metrics_code, line_numbers=True, language="python")

with open("config.toml") as f:
    config_code = f.read()

with st.expander("show `config.toml`", expanded=False):
    st.code(config_code, line_numbers=True, language="toml")

with open("justfile") as f:
    justfile_code = f.read()

with st.expander("show `justfile`", expanded=False):
    st.code(justfile_code, line_numbers=True, language="makefile")

with open(".github/workflows/cicd.yaml") as f:
    cicd_code = f.read()

with st.expander("show `cicd.yaml`", expanded=False):
    st.code(cicd_code, line_numbers=True, language="yaml")

with open("metrics.py") as f:
    metrics_code = f.read()

with st.expander("show `metrics.py` (source for this page)", expanded=False):
    st.code(metrics_code, line_numbers=True, language="python")

"""
---
"""

"""
## supported backends
"""


def fmt_number(value):
    return f"{value:,}"


current_backends_total = (
    backends.filter(backends.ingested_at == backends.ingested_at.max())
    .num_backends.max()
    .to_pandas()
)
current_backends = backends.backends.unnest().name("backends").as_table()

st.metric("Total", f"{current_backends_total:,}")
st.dataframe(current_backends, use_container_width=True)

"""
## totals (all time)
"""

total_stars_all_time = stars.login.nunique().to_pandas()
total_forks_all_time = forks.login.nunique().to_pandas()

total_closed_issues_all_time = issues.number.nunique(
    where=issues.state == "closed"
).to_pandas()

total_merged_pulls_all_time, total_contributors_all_time = (
    pulls.agg(
        total_merged_pulls_all_time=pulls.number.nunique(where=pulls.state == "merged"),
        total_contributors_all_time=pulls.login.nunique(
            where=pulls.merged_at.notnull()
        ),
    )
    .to_pandas()
    .squeeze()
)

downloads_all_time = downloads["downloads"].sum().to_pandas()

total_members_all_time = members.user_id.nunique().to_pandas()
total_messages_all_time = messages.id.nunique().to_pandas()

col0, col1, col2, col3 = st.columns(4)
with col0:
    st.metric(
        label="GitHub stars",
        value=fmt_number(total_stars_all_time),
        help=f"{total_stars_all_time:,}",
    )
    st.metric(
        label="PyPI downloads",
        value=fmt_number(downloads_all_time),
        help=f"{downloads_all_time:,}",
    )
with col1:
    st.metric(
        label="GitHub contributors",
        value=fmt_number(total_contributors_all_time),
        help=f"{total_contributors_all_time:,}",
    )
    st.metric(
        label="GitHub forks",
        value=fmt_number(total_forks_all_time),
        help=f"{total_forks_all_time:,}",
    )
with col2:
    st.metric(
        label="GitHub PRs merged",
        value=fmt_number(total_merged_pulls_all_time),
        help=f"{total_merged_pulls_all_time:,}",
    )
    st.metric(
        label="GitHub issues closed",
        value=fmt_number(total_closed_issues_all_time),
        help=f"{total_closed_issues_all_time:,}",
    )
with col3:
    st.metric(
        label="Zulip members",
        value=fmt_number(total_members_all_time),
        help=f"{total_members_all_time:,}",
    )
    st.metric(
        label="Zulip messages",
        value=fmt_number(total_messages_all_time),
        help=f"{total_messages_all_time:,}",
    )

# variables
with st.form(key="app"):
    days = st.number_input(
        "X days",
        min_value=1,
        max_value=365,
        value=90,
        step=30,
        format="%d",
    )
    update_button = st.form_submit_button(label="update")

START = datetime.now() - timedelta(days=days * 2)
STOP = datetime.now() - timedelta(days=days)

# compute metrics
total_stars, total_stars_prev = (
    stars.agg(
        total_stars=stars.login.nunique(where=stars.starred_at >= STOP),
        total_stars_prev=stars.login.nunique(
            where=stars.starred_at.between(START, STOP)
        ),
    )
    .to_pandas()
    .squeeze()
)

total_forks, total_forks_prev = (
    forks.agg(
        total_forks=forks.login.nunique(where=forks.created_at >= STOP),
        total_forks_prev=forks.login.nunique(
            where=forks.created_at.between(START, STOP)
        ),
    )
    .to_pandas()
    .squeeze()
)

(
    total_issues,
    total_issues_prev,
    total_issues_closed,
    total_issues_closed_prev,
) = (
    issues.agg(
        total_issues=issues.login.nunique(where=issues.created_at >= STOP),
        total_issues_prev=issues.login.nunique(
            where=issues.created_at.between(START, STOP)
        ),
        total_issues_closed=issues.number.nunique(where=issues.closed_at >= STOP),
        total_issues_closed_prev=issues.number.nunique(
            where=issues.closed_at.between(START, STOP)
        ),
    )
    .to_pandas()
    .squeeze()
)

(
    total_pulls,
    total_pulls_prev,
    total_pulls_merged,
    total_pulls_merged_prev,
    total_contributors,
    total_contributors_prev,
) = (
    pulls.agg(
        total_pulls=pulls.number.nunique(where=pulls.created_at >= STOP),
        total_pulls_prev=pulls.number.nunique(
            where=pulls.created_at.between(START, STOP)
        ),
        total_pulls_merged=pulls.number.nunique(where=pulls.merged_at >= STOP),
        total_pulls_merged_prev=pulls.number.nunique(
            where=pulls.merged_at.between(START, STOP)
        ),
        total_contributors=pulls.login.nunique(where=pulls.merged_at >= STOP),
        total_contributors_prev=pulls.login.nunique(
            where=pulls.merged_at.between(START, STOP)
        ),
    )
    .to_pandas()
    .squeeze()
)

total_downloads, total_downloads_prev = (
    downloads.agg(
        total_downloads=downloads.downloads.sum(where=downloads.timestamp >= STOP),
        total_downloads_prev=downloads.downloads.sum(
            where=downloads.timestamp.between(START, STOP)
        ),
    )
    .to_pandas()
    .squeeze()
)


def delta(current, previous):
    delta = current - previous
    pct_change = int(round(100.0 * delta / previous, 0))
    return f"{fmt_number(delta)} ({pct_change:d}%)"


f"""
## totals (last {days} days)
"""
col1, col2, col3, col4 = st.columns(4)
with col1:
    st.metric(
        label="GitHub stars",
        value=fmt_number(total_stars),
        delta=delta(total_stars, total_stars_prev),
        help=f"{total_stars:,}",
    )
    st.metric(
        label="PyPI downloads",
        value=fmt_number(total_downloads),
        delta=delta(total_downloads, total_downloads_prev),
        help=f"{total_downloads:,}",
    )
with col2:
    st.metric(
        label="GitHub contributors",
        value=fmt_number(total_contributors),
        delta=delta(total_contributors, total_contributors_prev),
        help=f"{total_contributors:,}",
    )
    st.metric(
        label="GitHub forks created",
        value=fmt_number(total_forks),
        delta=delta(total_forks, total_forks_prev),
        help=f"{total_forks:,}",
    )
with col3:
    st.metric(
        label="GitHub PRs opened",
        value=fmt_number(total_pulls),
        delta=delta(total_pulls, total_pulls_prev),
        help=f"{total_pulls:,}",
    )
    st.metric(
        label="GitHub issues opened",
        value=fmt_number(total_issues),
        delta=delta(total_issues, total_issues_prev),
        help=f"{total_issues:,}",
    )
with col4:
    st.metric(
        label="GitHub PRs merged",
        value=fmt_number(total_pulls_merged),
        delta=delta(total_pulls_merged, total_pulls_merged_prev),
        help=f"{total_pulls_merged:,}",
    )
    st.metric(
        label="GitHub issues closed",
        value=fmt_number(total_issues_closed),
        delta=delta(total_issues_closed, total_issues_closed_prev),
        help=f"{total_issues_closed:,}",
    )

f"""
## data (last {days} days)
"""

"""
### downloads by system and version
"""
c0 = px.bar(
    downloads.group_by([ibis._.system, ibis._.version])
    .agg(downloads=lambda t: t.downloads.sum(where=t.timestamp > STOP))
    .order_by(ibis._.version.desc()),
    x="version",
    y="downloads",
    color="system",
    title="downloads by system and version",
)
st.plotly_chart(c0, use_container_width=True)

"""
### stars by company
"""
st.dataframe(
    stars.group_by(ibis._.company)
    .agg(stars=lambda t: t.count(where=t.starred_at > STOP))
    .filter(ibis._.stars > 0)
    .order_by(ibis._.stars.desc())
    .to_pandas(),
    use_container_width=True,
)

"""
### issues by login
"""
c1 = px.bar(
    issues.group_by([ibis._.login, ibis._.state])
    .agg(issues=lambda t: t.count(where=t.created_at > STOP))
    .filter(ibis._.issues > 0)
    .order_by(ibis._.issues.desc()),
    x="login",
    y="issues",
    color="state",
    title="issues by login",
)
st.plotly_chart(c1, use_container_width=True)

"""
### PRs by login
"""
c2 = px.bar(
    pulls.group_by([ibis._.login, ibis._.state])
    .agg(pulls=lambda t: t.count(where=t.created_at > STOP))
    .filter(ibis._.pulls > 0)
    .order_by(ibis._.pulls.desc()),
    x="login",
    y="pulls",
    color="state",
    title="PRs by login",
)
st.plotly_chart(c2, use_container_width=True)
I can just app to view the dashboard locally in a web browser:
If I need more detailed analysis, I can always go back to EDA above and iterate.
Deploy
To deploy to production, we use MotherDuck for a SaaS database and Streamlit Community Cloud for a SaaS dashboard.
Deploy the database with MotherDuck
We just deploy to upload the data to the production MotherDuck database.
This runs some Python code that takes the loaded tables and copies them to MotherDuck:
import os
import toml
import ibis
import fnmatch

import logging as log

from datetime import datetime, timedelta, date

import functions as f


def main():
    deploy()


def deploy() -> None:
    """
    Deploy the data.
    """
    # constants
    path = "data/system/duckdb"

    # configure logging
    log.basicConfig(
        level=log.INFO,
    )

    # load config
    config = toml.load("config.toml")["app"]
    log.info(f"Deploying to {config['database']}...")

    # connect to the database
    target = ibis.duckdb.connect(f"{config['database']}")

    for root, dirs, files in os.walk(path):
        for file in files:
            if fnmatch.fnmatch(file, "load_*.ddb"):
                full_path = os.path.join(root, file)
                con = ibis.duckdb.connect(full_path)
                tablename = file.replace(".ddb", "")
                table = con.table(tablename)
                tablename = tablename.replace("load_", "")

                log.info(f"\tDeploying {tablename} to {config['database']}...")
                target.create_table(tablename, table.to_pyarrow(), overwrite=True)


if __name__ == "__main__":
    main()
Deploy the dashboard with Streamlit Community Cloud
Then, just deploy a Streamlit app from your GitHub repository (a few clicks in the GUI) and you’re done!
Conclusion
The future is now: it’s open-source, modular, composable, and scalable.
An end-to-end analytics pipeline this easy for a product manager to build was not possible just a few years ago. The increasingly modular, composable, and scalable Python data ecosystem has seen an abundance of new libraries that are pushing the limits of what individuals or small teams of data professionals can accomplish, all while efficiently utilizing local and cloud hardware at little to no cost.
I would love any feedback on this project. The best part is if you don’t like any one component (or prefer another), just swap it out! Altair or plotnine or another visualization library for Plotly. The Polars or DataFusion or Snowflake or PySpark or BigQuery backend for Ibis. A different dashboard library. A different cloud service. A different CI/CD service…
While I wouldn’t consider this a good project template for teams of engineers, it could be used to create one, or adapted with very little change for other open-source Python projects. I hope you enjoyed it!
The source code can be found (and stolen from) in the ibis-analytics repository – feel free to open an issue, PR, or comment below!
Next steps
I plan to add some basic ML forecasting and likely an LLM interface to the dashboard. Follow along with the Ibis project to see what’s next!