Building Apache Flink Applications in Java is a course provided by Confluent that introduces Apache Flink through a series of hands-on exercises. Utilising the Flink DataStream API, the course develops three Flink applications that populate multiple source data sets, collect them into a standardised data set, and aggregate the result to produce usage statistics. As part of learning the Flink DataStream API in Pyflink, I converted the Java apps into their Python equivalents while working through the course exercises. This post summarises the progress of the conversion and shows the final output.

Architecture

There are two airlines (SkyOne and Sunset), and each has its own flight data in a different schema. While the course ingests the source data into the corresponding topics using a Flink application that makes use of the DataGen DataStream Connector, we use a Kafka producer application here because the DataGen connector is not available for Python.
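
Below is a minimal sketch of such a producer. It assumes the kafka-python package, an external listener on localhost:29092 and a reduced set of record fields; the actual script (src/s05_data_gen.py) generates complete flight records for both airlines and also handles topic creation.

# Minimal producer sketch; assumes the kafka-python package and illustrative
# field names - the real script generates full flight records for both airlines.
import json
from datetime import datetime, timezone

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:29092",  # assumed external listener of the Kafka cluster
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

record = {
    "email_address": "jane.doe@example.com",
    "departure_time": datetime.now(timezone.utc).isoformat(),
    "flight_number": "SKY1234",
}
producer.send("skyone", value=record)  # sunset records are sent to the sunset topic
producer.flush()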

The flight importer job reads the messages from the source topics, standardises them into the flight data schema, and pushes them into another Kafka topic named flightdata. It is developed in Pyflink.
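
A simplified sketch of the importer pipeline is shown below. It assumes Pyflink 1.17, plain JSON strings, an internal bootstrap server named broker:9092 and the Kafka connector JARs on the classpath (the custom Docker image takes care of the latter). For brevity it reads both topics with a single source and applies one mapping, whereas the actual job defines a source per airline, applies airline-specific transformations and merges the streams in a shared define_workflow function.

# Sketch of a Kafka-to-Kafka Pyflink job: read JSON strings, standardise, write out.
import json

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaOffsetsInitializer,
    KafkaRecordSerializationSchema,
    KafkaSink,
    KafkaSource,
)

env = StreamExecutionEnvironment.get_execution_environment()

source = (
    KafkaSource.builder()
    .set_bootstrap_servers("broker:9092")  # assumed internal listener
    .set_topics("skyone", "sunset")
    .set_group_id("flight-importer")
    .set_starting_offsets(KafkaOffsetsInitializer.earliest())
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)

sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("broker:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("flightdata")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)

def to_flight_data(value: str) -> str:
    # Standardise an airline-specific record into the common flight data schema.
    # Only a couple of (assumed) fields are shown; the real mapping is per airline.
    record = json.loads(value)
    return json.dumps(
        {
            "email_address": record.get("email_address"),
            "flight_number": record.get("flight_number"),
        }
    )

flights = env.from_source(source, WatermarkStrategy.no_watermarks(), "flight_sources")
flights.map(to_flight_data, output_type=Types.STRING()).sink_to(sink)
env.execute("flight_importer")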

The usage statistics calculator reads from the flightdata topic and calculates usage statistics over a one-minute window, grouped by email address. Moreover, by accessing the global state, it produces cumulative usage statistics that carry information from one window to the next. It is also developed in Pyflink.
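
The sketch below condenses this idea, assuming Pyflink 1.17 and simple (email, duration, count, timestamp) tuples in place of the course's flight data and user statistics classes; the actual job reads from and writes to Kafka but keys the stream by email address and combines a reduce function with a window function in the same way.

# Sketch of a stateful windowed aggregation: per-window totals are merged with
# the global (cross-window) state kept per email address.
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessWindowFunction
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.datastream.window import TumblingEventTimeWindows

STATS_TYPE = Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.LONG()])

class FlightTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        return value[3]  # event time in epoch milliseconds

class CumulativeStats(ProcessWindowFunction):
    def process(self, key, context, stats):
        # 'stats' holds the single record pre-aggregated by the reduce function
        # for this window; merge it with the global state of the key.
        state = context.global_state().get_state(
            ValueStateDescriptor("user-statistics", STATS_TYPE)
        )
        accumulated, current = state.value(), next(iter(stats))
        if accumulated is not None:
            current = (key, accumulated[1] + current[1], accumulated[2] + current[2], current[3])
        state.update(current)
        yield current

def merge(left, right):
    # Within-window aggregation: sum flight durations and counts per email address.
    return left[0], left[1] + right[1], left[2] + right[2], max(left[3], right[3])

env = StreamExecutionEnvironment.get_execution_environment()
flights = env.from_collection(
    [("jane.doe@example.com", 90, 1, 1000), ("jane.doe@example.com", 120, 1, 2000)],
    type_info=STATS_TYPE,
).assign_timestamps_and_watermarks(
    WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
        FlightTimestampAssigner()
    )
)
(
    flights.key_by(lambda r: r[0], key_type=Types.STRING())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(merge, window_function=CumulativeStats(), output_type=STATS_TYPE)
    .print()
)
env.execute("usage_statistics")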

Course Contents

The course contents are described below. ✅ and ☑️ indicate exercises and course materials respectively. Lesson 3 covers how to set up Kafka and Flink clusters using Docker Compose. The Kafka producer app is created in the Lesson 5 exercise. The final versions of the flight importer job and the usage statistics calculator can be found in the exercises of Lessons 16 and 20 respectively.

  1. Apache Flink with Java - An Introduction
  2. Datastream Programming
  3. ✅ How to Start Flink and Get Setup (Exercise)
    • Built Kafka and Flink clusters using Docker
    • Bitnami images are used for the Kafka cluster - see this page for details.
    • A custom Docker image (building-pyflink-apps:1.17.1) is created to install Python and the Pyflink package as well as to save dependent Jar files
      • See the Dockerfile, and it can be built by docker build -t=building-pyflink-apps:1.17.1 .
    • See the docker-compose.yml and the clusters can be started by docker-compose up -d
  4. ☑️ The Flink Job Lifecycle
    • A minimal example of executing a Pyflink app is added.
    • See course content(s) below
  5. ✅ Running a Flink Job (Exercise)
    • Pyflink doesn’t have the DataGen DataStream connector, so a Kafka producer is used instead to create topics and send messages.
      • 4 topics are created (skyone, sunset, flightdata and userstatistics) and messages are sent to the first two topics.
    • See course content(s) below
      • s05_data_gen.py
        • Topics are created when the --create flag is passed, so include it the first time you run the script, i.e. python src/s05_data_gen.py --create. The flag deletes the topics if they exist and recreates them.
  6. Anatomy of a Stream
  7. Flink Data Sources
  8. ✅ Creating a Flink Data Source (Exercise)
    • It reads from the skyone topic and prints the values. The values are deserialized as strings in this exercise.
    • This and all the other Pyflink applications can be executed locally or run in the Flink cluster. See the script for details.
    • See course content(s) below
  9. Serializers & Deserializers
  10. ✅ Deserializing Messages in Flink (Exercise)
    • The skyone message values are deserialized as JSON strings and returned as the named Row type. As the Flink type is not convenient for processing, the records are converted into Python objects, specifically data classes (a sketch of this conversion appears after this list).
    • See course content(s) below
  11. ☑️ Transforming Data in Flink
  12. ✅ Flink Data Transformations (Exercise)
    • The source data is transformed into the flight data schema. Later, data from both skyone and sunset will be converted into this schema so that they can be merged.
    • The transformation is performed in a function called define_workflow so that it can be unit-tested. This function is updated gradually in the following exercises.
    • See course content(s) below
  13. Flink Data Sinks
  14. ✅ Creating a Flink Data Sink (Exercise)
    • The converted data from skyone will be pushed into a Kafka topic (flightdata).
    • Note that, as Python data classes cannot be serialized, records are converted into the named Row type before being sent.
    • See course content(s) below
  15. ☑️ Creating Branching Data Streams in Flink
    • Various branching methods are illustrated, covering Union, CoProcessFunction, CoMapFunction, CoFlatMapFunction, and Side Outputs.
    • See course content(s) below
  16. ✅ Merging Flink Data Streams (Exercise)
    • Records from the skyone and sunset topics are merged and sent to the flightdata topic after being converted into the flight data schema.
    • See course content(s) below
  17. Windowing and Watermarks in Flink
  18. ✅ Aggregating Flink Data using Windowing (Exercise)
    • Usage statistics (total flight duration and number of flights) are calculated by email address, and they are sent into the userstatistics topic.
    • Note that the transformation is stateless in the sense that the aggregation is performed entirely within a one-minute tumbling window.
    • See course content(s) below
  19. Working with Keyed State in Flink
  20. ✅ Managing State in Flink (Exercise)
    • The transformation becomes stateful so that the usage statistics are continuously updated by accessing the state values.
    • The reduce operation is combined with a window function that allows access to the global state. The window function is responsible for keeping the global state up to date and returning the updated values.
    • See course content(s) below
  21. Closing Remarks
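
As mentioned in the Lesson 10 notes above, the sketch below illustrates converting between the named Row type and a Python data class. The field names are an assumed, reduced subset of the flight data schema, and the helper methods are illustrative rather than the exact ones defined in the course's models.py.

# Minimal sketch of Row <-> data class conversion with assumed field names.
from dataclasses import dataclass

from pyflink.common import Row
from pyflink.common.typeinfo import Types

@dataclass
class FlightData:
    email_address: str
    flight_number: str

    @classmethod
    def from_row(cls, row: Row):
        # Row -> data class, convenient for plain-Python processing (Lesson 10).
        return cls(**row.as_dict())

    def to_row(self) -> Row:
        # Data class -> named Row, required before serialising to Kafka (Lesson 14).
        return Row(email_address=self.email_address, flight_number=self.flight_number)

    @staticmethod
    def get_value_type_info():
        # Type information matching the named Row, used by the JSON (de)serialization schemas.
        return Types.ROW_NAMED(
            ["email_address", "flight_number"], [Types.STRING(), Types.STRING()]
        )

if __name__ == "__main__":
    row = Row(email_address="jane.doe@example.com", flight_number="SKY1234")
    flight = FlightData.from_row(row)
    print(flight, flight.to_row())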

Start Applications

After creating the Kafka and Flink clusters using Docker Compose, we need to start the Python producer in one terminal. Then we can submit the other Pyflink applications in another terminal.

#### build docker image for Pyflink
docker build -t=building-pyflink-apps:1.17.1 .

#### create kafka and flink clusters and kafka-ui
docker-compose up -d

#### start kafka producer in one terminal
python -m venv venv
source venv/bin/activate
# upgrade pip (optional)
pip install pip --upgrade
# install required packages
pip install -r requirements-dev.txt
## start with --create flag to create topics before sending messages
python src/s05_data_gen.py --create

#### submit pyflink apps in another terminal
## flight importer
docker exec jobmanager /opt/flink/bin/flink run \
    --python /tmp/src/s16_merge.py \
    --pyFiles file:///tmp/src/models.py,file:///tmp/src/utils.py \
    -d

## usage calculator
docker exec jobmanager /opt/flink/bin/flink run \
    --python /tmp/src/s20_manage_state.py \
    --pyFiles file:///tmp/src/models.py,file:///tmp/src/utils.py \
    -d

We can check that the Pyflink jobs are running on the Flink Dashboard via localhost:8081.

Also, we can check the Kafka topics on kafka-ui via localhost:8080.

Unit Testing

Four lessons have unit test cases, and they are expected to be run separately by specifying a test script. For example, below shows how to run the unit tests of the final usage statistics calculator job.