In Part 3, we discussed the portability layer of Apache Beam because it helps us understand (1) how Python pipelines run on the Flink Runner and (2) how multiple SDKs can be used in a single pipeline, and we then demonstrated how to create local Flink and Kafka clusters for developing streaming pipelines. In this post, we build a streaming pipeline that aggregates page visits by user in a fixed time window of 20 seconds. Two versions of the pipeline are created, one with and one without relying on Beam SQL.

Streaming Pipeline

The streaming pipeline we discuss in this post aggregates website visits by user ID in a fixed time window of 20 seconds. Two versions of the pipeline are created, one with and one without relying on Beam SQL, and both run on a Flink cluster at the end. The source of this post can be found in this GitHub repository.

Traffic Aggregation

The pipeline begins by reading and decoding messages from a Kafka topic named website-visit, followed by parsing the decoded JSON string into a custom type named EventLog. Note that a coder for this custom type is registered, although it is not strictly required here because no cross-language transform deals with it. For the SQL version, on the other hand, the coder has to be registered because it is used by the SQL transform, which is executed by the Java SDK.

After that, the element timestamp is re-assigned based on the event_datetime attribute, and each element is converted into a key-value pair where the user ID is taken as the key and 1 is given as the value. By default, the Kafka reader assigns the processing time (wall clock) as the element timestamp; when the record timestamp differs from the wall clock, re-assigning it based on the record timestamp produces more relevant results.

The key-value elements are aggregated in a fixed time window of 20 seconds and written to a Kafka topic named traffic-agg. The output messages include two additional attributes (window_start and window_end) to indicate which window they belong to.
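For illustration, each aggregated record emitted by the AddWindowTS DoFn (defined in the script below) takes roughly the following shape; the user ID and count are made-up values, and the window boundaries follow the isoformat(timespec="seconds") formatting used there.

# Hypothetical example of a single aggregated record before create_message
# serialises it into a Kafka message; the values are made up.
{
    "id": "676397447776623774",  # user ID (the grouping key)
    "window_start": "2024-04-28T23:12:40",
    "window_end": "2024-04-28T23:13:00",
    "page_views": 4,  # page views by this user in the 20-second window
}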

# section3/traffic_agg.py
import os
import datetime
import argparse
import json
import logging
import typing

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class EventLog(typing.NamedTuple):
    ip: str
    id: str
    lat: float
    lng: float
    user_agent: str
    age_bracket: str
    opted_into_marketing: bool
    http_request: str
    http_response: int
    file_size_bytes: int
    event_datetime: str
    event_ts: int


beam.coders.registry.register_coder(EventLog, beam.coders.RowCoder)


def decode_message(kafka_kv: tuple):
    return kafka_kv[1].decode("utf-8")


def create_message(element: dict):
    key = {"event_id": element["id"], "window_start": element["window_start"]}
    print(element)
    return json.dumps(key).encode("utf-8"), json.dumps(element).encode("utf-8")


def parse_json(element: str):
    row = json.loads(element)
    # lat/lng sometimes empty string
    if not row["lat"] or not row["lng"]:
        row = {**row, **{"lat": -1, "lng": -1}}
    return EventLog(**row)


def assign_timestamp(element: EventLog):
    ts = datetime.datetime.strptime(
        element.event_datetime, "%Y-%m-%dT%H:%M:%S.%f"
    ).timestamp()
    return beam.window.TimestampedValue(element, ts)


class AddWindowTS(beam.DoFn):
    def process(self, element: tuple, window=beam.DoFn.WindowParam):
        window_start = window.start.to_utc_datetime().isoformat(timespec="seconds")
        window_end = window.end.to_utc_datetime().isoformat(timespec="seconds")
        output = {
            "id": element[0],
            "window_start": window_start,
            "window_end": window_end,
            "page_views": element[1],
        }
        yield output


def run():
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
    )
    parser.add_argument(
        "--use_own",
        action="store_true",
        default=False,
        help="Flag to indicate whether to use an own local cluster",
    )
    opts = parser.parse_args()

    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "traffic-agg",
        "environment_type": "LOOPBACK",
        "streaming": True,
        "parallelism": 3,
        "experiments": [
            "use_deprecated_read"
        ],  ## https://github.com/apache/beam/issues/20979
        "checkpointing_interval": "60000",
    }
    if opts.use_own is True:
        pipeline_opts = {**pipeline_opts, **{"flink_master": "localhost:8081"}}
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required; otherwise workers complain when importing functions defined in this module
    options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=options)
    (
        p
        | "Read from Kafka"
        >> kafka.ReadFromKafka(
            consumer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                "auto.offset.reset": "earliest",
                # "enable.auto.commit": "true",
                "group.id": "traffic-agg",
            },
            topics=["website-visit"],
        )
        | "Decode messages" >> beam.Map(decode_message)
        | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
        | "Assign timestamp" >> beam.Map(assign_timestamp)
        | "Form key value pair" >> beam.Map(lambda e: (e.id, 1))
        | "Tumble window of 20 seconds" >> beam.WindowInto(beam.window.FixedWindows(20))
        | "Sum by key" >> beam.CombinePerKey(sum)
        | "Add window timestamp" >> beam.ParDo(AddWindowTS())
        | "Create messages"
        >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
        | "Write to Kafka"
        >> kafka.WriteToKafka(
            producer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                )
            },
            topic="traffic-agg",
        )
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()


if __name__ == "__main__":
    run()

SQL Traffic Aggregation

The main difference in this version is that multiple transformations are performed by a single SQL transform. Specifically, it aggregates the number of page views by user in a fixed time window of 20 seconds. The SQL transform is performed in a separate Docker container using the Java SDK, and thus the output type has to be specified before it. Otherwise, an error is thrown because the Java SDK doesn't know how to encode/decode the elements.

# section3/traffic_agg_sql.py
import os
import datetime
import argparse
import json
import logging
import typing

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.transforms.sql import SqlTransform
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class EventLog(typing.NamedTuple):
    ip: str
    id: str
    lat: float
    lng: float
    user_agent: str
    age_bracket: str
    opted_into_marketing: bool
    http_request: str
    http_response: int
    file_size_bytes: int
    event_datetime: str
    event_ts: int


beam.coders.registry.register_coder(EventLog, beam.coders.RowCoder)


def decode_message(kafka_kv: tuple):
    return kafka_kv[1].decode("utf-8")


def create_message(element: dict):
    key = {"event_id": element["event_id"], "window_start": element["window_start"]}
    print(element)
    return json.dumps(key).encode("utf-8"), json.dumps(element).encode("utf-8")


def parse_json(element: str):
    row = json.loads(element)
    # lat/lng sometimes empty string
    if not row["lat"] or not row["lng"]:
        row = {**row, **{"lat": -1, "lng": -1}}
    return EventLog(**row)


def format_timestamp(element: EventLog):
    event_ts = datetime.datetime.fromisoformat(element.event_datetime)
    temp_dict = element._asdict()
    temp_dict["event_datetime"] = datetime.datetime.strftime(
        event_ts, "%Y-%m-%d %H:%M:%S"
    )
    return EventLog(**temp_dict)


def run():
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
    )
    parser.add_argument(
        "--use_own",
        action="store_true",
        default=False,
        help="Flag to indicate whether to use an own local cluster",
    )
    opts = parser.parse_args()

    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "traffic-agg-sql",
        "environment_type": "LOOPBACK",
        "streaming": True,
        "parallelism": 3,
        "experiments": [
            "use_deprecated_read"
        ],  ## https://github.com/apache/beam/issues/20979
        "checkpointing_interval": "60000",
    }
    if opts.use_own is True:
        pipeline_opts = {**pipeline_opts, **{"flink_master": "localhost:8081"}}
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required; otherwise workers complain when importing functions defined in this module
    options.view_as(SetupOptions).save_main_session = True

    query = """
    WITH cte AS (
        SELECT
            id,
            CAST(event_datetime AS TIMESTAMP) AS ts
        FROM PCOLLECTION
    )
    SELECT
        id AS event_id,
        CAST(TUMBLE_START(ts, INTERVAL '20' SECOND) AS VARCHAR) AS window_start,
        CAST(TUMBLE_END(ts, INTERVAL '20' SECOND) AS VARCHAR) AS window_end,
        COUNT(*) AS page_view
    FROM cte
    GROUP BY
        TUMBLE(ts, INTERVAL '20' SECOND), id
    """

    p = beam.Pipeline(options=options)
    (
        p
        | "Read from Kafka"
        >> kafka.ReadFromKafka(
            consumer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                "auto.offset.reset": "earliest",
                # "enable.auto.commit": "true",
                "group.id": "traffic-agg-sql",
            },
            topics=["website-visit"],
        )
        | "Decode messages" >> beam.Map(decode_message)
        | "Parse elements" >> beam.Map(parse_json)
        | "Format timestamp" >> beam.Map(format_timestamp).with_output_types(EventLog)
        | "Count per window" >> SqlTransform(query)
        | "To dictionary" >> beam.Map(lambda e: e._asdict())
        | "Create messages"
        >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
        | "Write to Kafka"
        >> kafka.WriteToKafka(
            producer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                )
            },
            topic="traffic-agg-sql",
        )
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()


if __name__ == "__main__":
    run()

Run Pipeline

We can use the local Flink and Kafka clusters as discussed in Part 3. The Flink cluster is optional because Beam runs a pipeline on an embedded Flink cluster if we do not specify a cluster URL.
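Both pipeline scripts toggle this behaviour with the use_own flag; as a minimal sketch (reusing the option values from the scripts above), the only difference is whether flink_master appears in the pipeline options.

from apache_beam.options.pipeline_options import PipelineOptions

# Minimal sketch: the deployment mode depends solely on the flink_master option.
# The values match those used in the pipeline scripts above.
pipeline_opts = {
    "runner": "FlinkRunner",
    "streaming": True,
    # "flink_master": "localhost:8081",  # include to submit to the local session cluster;
    #                                    # omit it and Beam starts an embedded Flink cluster
}
options = PipelineOptions([], **pipeline_opts)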

Start Flink/Kafka Clusters

As shown later, I ran into an issue running the SQL version of the pipeline on a local cluster, so it has to be deployed on an embedded cluster instead. With the -a option, we can deploy both local Flink and Kafka clusters, which are used for the pipeline without SQL, while the -k option launches only a local Kafka cluster for the SQL version.

# start both flink and kafka cluster for traffic aggregation
$ ./setup/start-flink-env.sh -a

# start only kafka cluster for sql traffic aggregation
$ ./setup/start-flink-env.sh -k

Data Generation

For streaming data generation, we can use the website visit log generator that was introduced in Part 1. We execute the script with the source argument set to streaming. Below is an example of generating Kafka messages for the streaming pipeline.

$ python datagen/generate_data.py --source streaming --num_users 5 --delay_seconds 0.5
...
10 events created so far...
{'ip': '151.21.93.137', 'id': '2142139324490406578', 'lat': 45.5253, 'lng': 9.333, 'user_agent': 'Mozilla/5.0 (iPad; CPU iPad OS 14_2_1 like Mac OS X) AppleWebKit/536.0 (KHTML, like Gecko) FxiOS/16.3w0588.0 Mobile/66I206 Safari/536.0', 'age_bracket': '26-40', 'opted_into_marketing': True, 'http_request': 'GET amoebozoa.html HTTP/1.0', 'http_response': 200, 'file_size_bytes': 453, 'event_datetime': '2024-04-28T23:12:50.484', 'event_ts': 1714309970484}
20 events created so far...
{'ip': '146.13.4.138', 'id': '5642783739616136718', 'lat': 39.0437, 'lng': -77.4875, 'user_agent': 'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_7_5 rv:4.0; bg-BG) AppleWebKit/532.16.6 (KHTML, like Gecko) Version/4.0.5 Safari/532.16.6', 'age_bracket': '41-55', 'opted_into_marketing': False, 'http_request': 'GET archaea.html HTTP/1.0', 'http_response': 200, 'file_size_bytes': 207, 'event_datetime': '2024-04-28T23:12:55.526', 'event_ts': 1714309975526}
30 events created so far...
{'ip': '36.255.131.188', 'id': '676397447776623774', 'lat': 31.2222, 'lng': 121.4581, 'user_agent': 'Mozilla/5.0 (compatible; MSIE 7.0; Windows 98; Win 9x 4.90; Trident/4.0)', 'age_bracket': '26-40', 'opted_into_marketing': False, 'http_request': 'GET fungi.html HTTP/1.0', 'http_response': 200, 'file_size_bytes': 440, 'event_datetime': '2024-04-28T23:13:00.564', 'event_ts': 1714309980564}

Execute Pipeline Script

Traffic Aggregation

The traffic aggregation pipeline can be executed using the local Flink cluster by specifying the use_own argument.

$ python section3/traffic_agg.py --use_own

After a while, we can check both the input and output topics in the Topics section of kafka-ui, which can be accessed on localhost:8080.
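Alternatively, we can peek at the aggregated output with a small consumer script. The sketch below is a hypothetical helper (not part of the repository); it assumes the kafka-python package is installed and that the broker is reachable from the host at localhost:29092.

# Hypothetical helper (not part of the repository): print aggregated records from
# the output topic. Assumes kafka-python is installed and the broker is reachable
# from the host at localhost:29092.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "traffic-agg",
    bootstrap_servers="localhost:29092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    print(message.value)  # e.g. {'id': ..., 'window_start': ..., 'window_end': ..., 'page_views': ...}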

We can use the Flink web UI to monitor the pipeline as a Flink job. When we click the traffic-agg job in the Running Jobs section, we see four linked operations in the Overview tab. The first two operations poll and read the Kafka source description, all the transformations up to windowing the keyed elements are performed in the third operation, and the elements are aggregated and written to the Kafka output topic in the last operation.

SQL Traffic Aggregation

I see the following error when I execute the SQL version of the pipeline with the use_own option. It seems that the Java SDK container for the SQL transform fails to download its expansion service and does not complete its initialisation steps - see Part 3 for details about how multiple SDKs can be used in a single pipeline. As a result, the Flink job fails to access the SDK container and keeps recreating new containers.

We can see that lots of containers have been stopped and recreated.

$ docker ps -a --format "table {{.ID}}\t{{.Image}}\t{{.Status}}" | grep apache/beam_java11_sdk
46c51d89e966   apache/beam_java11_sdk:2.53.0   Up 7 seconds
2ad755fc66df   apache/beam_java11_sdk:2.53.0   Up 7 seconds
cf023d9bf39f   apache/beam_java11_sdk:2.53.0   Exited (1) 13 seconds ago
a549729318e3   apache/beam_java11_sdk:2.53.0   Exited (1) 38 seconds ago
95626f645252   apache/beam_java11_sdk:2.53.0   Exited (1) 57 seconds ago
38b56216e29a   apache/beam_java11_sdk:2.53.0   Exited (1) About a minute ago
3aee486b472f   apache/beam_java11_sdk:2.53.0   Exited (1) About a minute ago

Instead, we can run the pipeline on an embedded Flink cluster by not adding the use_own option. Note that we need to stop the existing clusters, start only a Kafka cluster with the -k option, and re-generate data before executing this pipeline script.

$ python section3/traffic_agg_sql.py

As with the earlier version, we can check the input and output topics on localhost:8080.

Summary

In this post, we developed a streaming pipeline that aggregates website visits by user in a fixed time window of 20 seconds. Two versions of the pipeline were created, one with and one without relying on Beam SQL. The first version, which doesn't rely on SQL, was deployed on a local Flink cluster, and we checked how it runs as a Flink job on the Flink web UI. The second version, however, could not be deployed on a local Flink cluster, so it was run on an embedded Flink cluster instead.