Overview

Sample streaming Dataflow pipeline written in Python

This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

For more details, see the following Beam Summit 2021 talk:

To run this pipeline, you need the Apache Beam SDK installed and a project in Google Cloud Platform, even if you run the pipeline locally with the direct runner.

Description of the pipeline

Data input

We are using a public PubSub topic with data here, so we don't need to set up our own topic to run this pipeline.

The topic is projects/pubsub-public-data/topics/taxirides-realtime.

That topic contains messages from the NYC Taxi Ride dataset. Here is a sample of the data contained in a message in that topic:

{
  "ride_id": "328bec4b-0126-42d4-9381-cb1dbf0e2432",
  "point_idx": 305,
  "latitude": 40.776270000000004,
  "longitude": -73.99111,
  "timestamp": "2020-03-27T21:32:51.48098-04:00",
  "meter_reading": 9.403651,
  "meter_increment": 0.030831642,
  "ride_status": "enroute",
  "passenger_count": 1
}

But the messages also contain metadata that is useful for streaming pipelines. In this case, the messages carry an attribute named ts, which contains the same timestamp as the field named timestamp in the data. Remember that PubSub treats the data as just a string of bytes, so it does not know anything about the data itself. The metadata attributes are normally used to publish messages with specific ids and/or timestamps.
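
As an illustration, this is roughly how a Beam pipeline can tell the SDK to use that attribute as the event timestamp when reading (a sketch, not the repository's actual code; ReadFromPubSub and its timestamp_attribute parameter are part of the Beam Python SDK):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Read the payloads from the public topic, taking each message's event
# timestamp from its "ts" attribute instead of the publish time.
opts = PipelineOptions(streaming=True)
with beam.Pipeline(options=opts) as p:
    messages = p | "Read" >> beam.io.ReadFromPubSub(
        topic="projects/pubsub-public-data/topics/taxirides-realtime",
        timestamp_attribute="ts")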

To inspect the messages from this topic, you can create a subscription, and then pull some messages.

To create a subscription, use the gcloud CLI (installed by default in the Cloud Shell):

export TOPIC=projects/pubsub-public-data/topics/taxirides-realtime
gcloud pubsub subscriptions create taxis --topic $TOPIC

To pull messages:

gcloud pubsub subscriptions pull taxis --limit 3

or, if you have jq installed (for pretty-printing the JSON):

gcloud pubsub subscriptions pull taxis --limit 3 | grep " {" | cut -f 2 -d ' ' | jq

Pay special attention to the Attributes column (the metadata). You will see that the timestamp is included as an attribute in the metadata, as well as in the data. We will leverage that metadata attribute for the timestamps used in our streaming pipeline.

Data output

This pipeline writes the output to BigQuery, in streaming append-only mode.

The destination tables must exist prior to running the pipeline.
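
A minimal sketch of such a write step (the table name my-project:taxi_rides.sessions is hypothetical; the two dispositions shown are what enforce append-only writes to a pre-existing table):

# "rows" stands for the PCollection of result rows computed upstream.
rows | beam.io.WriteToBigQuery(
    "my-project:taxi_rides.sessions",  # hypothetical destination table
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,  # table must already exist
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)   # append-only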

If you have the gcloud CLI installed (for instance, it is installed by default in the Cloud Shell), you can create the tables from the command line.

You need to create a BigQuery dataset too, in the same region:
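
For example, with the bq utility from the Cloud SDK (a sketch: it assumes the dataset name taxi_rides, matching the script below, and a REGION variable set to your region):

bq --location=$REGION mk --dataset taxi_rides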

After that, you can create the destination tables with the provided script:

./scripts/create_tables.sh taxi_rides

Algorithm / business rules

We are using a session window with a gap of 10 seconds. That means that all the messages with the same ride_id will be grouped together, as long as their timestamps are within 10 seconds of each other. A message whose timestamp falls more than 10 seconds outside the session will either be discarded (if its timestamp is too old) or will open a new window (if its timestamp is newer).

With the messages inside each window (that is, each different ride_id will be part of a different window), we will calculate the duration of the session as the difference between the minimum and maximum timestamps in the window. We will also calculate the number of events in that session.

We will use a GroupByKey to operate on all the messages in a window. This will load all the messages in the window into memory. This is fine because in Beam streaming a window is always processed by a single worker (windows cannot be split across different workers).
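
As an illustration, the windowing and grouping described above can be expressed roughly like this (a sketch, not the repository's actual code, reusing the messages collection from the reading sketch above; Sessions, WindowInto and GroupByKey are standard Beam Python transforms):

import json
import apache_beam as beam
from apache_beam.transforms.window import Sessions

class KeyByRideId(beam.DoFn):
    # Emit (ride_id, event_timestamp) pairs, reading the event time that
    # Beam attached to each message (here, taken from the "ts" attribute).
    def process(self, message, ts=beam.DoFn.TimestampParam):
        ride = json.loads(message.decode("utf-8"))
        yield ride["ride_id"], float(ts)

def session_stats(kv):
    ride_id, timestamps = kv
    ts = list(timestamps)  # loads the whole window into memory, as noted above
    return {"ride_id": ride_id,
            "duration": max(ts) - min(ts),  # max/min difference = session duration
            "n_events": len(ts)}

sessions = (messages
            | "KeyByRide" >> beam.ParDo(KeyByRideId())
            | "SessionWindow" >> beam.WindowInto(Sessions(gap_size=10))
            | "GroupByRide" >> beam.GroupByKey()
            | "Stats" >> beam.Map(session_stats))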

This is an example of the kind of logic that can be implemented leveraging windows in streaming pipelines. The grouping of messages by ride_id and event timestamp is done automatically by the pipeline; we just need to express the generic operations to be performed on each window as part of our pipeline.

Running the pipeline

Prerequisites

You need to have a Google Cloud project, and the gcloud SDK configured to run the pipeline. For instance, you could run it from the Cloud Shell in Google Cloud Platform (gcloud would be automatically configured).

Then you need to create a Google Cloud Storage bucket, with the same name as your project id, and in the same region where you will run Dataflow:
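
For instance (a sketch assuming PROJECT and REGION hold your project id and region):

gsutil mb -l $REGION gs://$PROJECT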

Make sure that you have a Python 3 environment with a Python version lower than 3.9, for instance a virtualenv, and install apache-beam[gcp] and python-dateutil in that environment:

pip install "apache-beam[gcp]" python-dateutil

Run the pipeline

Once the tables are created and the dependencies installed, edit scripts/launch_dataflow_runner.sh and set your project id and region, and then run it with:

./scripts/launch_dataflow_runner.sh

The outputs will be written to the BigQuery tables, and in the profile directory in your bucket you should see Python gprof files with profiling information.

CPU profiling

Beam uses the Python profiler to produce files in Python gprof format. You will need some scripting to interpret those files and extract insights from them.
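
For example, the sample file data/beam.prof mentioned below can be inspected with Python's pstats module (an assumption: that the profile files are written in cProfile/pstats format, which is what pstats reads):

import pstats

# Load a profile file and print the 10 entries with the largest cumulative time.
stats = pstats.Stats("data/beam.prof")
stats.sort_stats("cumulative").print_stats(10)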

In this repository, you will find some sample output in data/beam.prof, which you can use to check what the profiling output looks like. Use the following Colab notebook for an example analyzing that sample profiling data:

Refer to this post for more details about how to interpret that file:

License

Copyright 2021 Israel Herraiz

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
