Apache Kafka is a popular distributed event store and stream processing platform. Previously, loading data from Kafka into Redshift and Athena usually required Kafka connectors (e.g. the Amazon Redshift Sink Connector and the Amazon S3 Sink Connector). Recently these AWS services added features to ingest data from Kafka directly, which allows a simpler architecture that achieves low-latency, high-speed ingestion of streaming data. In part 1 of the simplify streaming ingestion on AWS series, we discuss how to develop an end-to-end streaming ingestion solution using EventBridge, Lambda, MSK and Redshift Serverless on AWS.

Architecture

Fake online order data is generated by multiple Lambda functions that are invoked by an EventBridge schedule rule. The schedule is set to run every minute and the associated rule has a configurable number (e.g. 5) of targets. Each target points to the same Kafka producer Lambda function. In this way we can generate test data with multiple concurrent Lambda invocations, according to the desired volume of messages. Once the messages are sent to a Kafka topic, they can be consumed by a materialized view in an external schema that sources data from the MSK cluster. The infrastructure is built with Terraform, and the AWS SAM CLI is used to develop the producer Lambda function locally before deploying it to AWS.

Infrastructure

A VPC with 3 public and 3 private subnets is created using the AWS VPC Terraform module (vpc.tf). Also, a SoftEther VPN server is deployed in order to access the resources in the private subnets from the developer machine (vpn.tf). It is particularly useful for monitoring and managing the MSK cluster and Kafka topic as well as for developing the Kafka producer Lambda function locally. The details about how to configure the VPN server can be found in an earlier post. The source can be found in the GitHub repository of this post.

MSK

An MSK cluster with 3 brokers is created. The broker nodes are deployed with the kafka.m5.large instance type in the private subnets. IAM authentication is used as the client authentication method. Note that this is the only secured authentication method that works with Redshift, because the external schema supports only two options: no authentication or IAM authentication.

# integration-redshift/infra/variable.tf
locals {
  ...
  msk = {
    version          = "3.3.1"
    instance_size    = "kafka.m5.large"
    ebs_volume_size  = 20
    log_retention_ms = 604800000 # 7 days
  }
  ...
}

# integration-redshift/infra/msk.tf
resource "aws_msk_cluster" "msk_data_cluster" {
  cluster_name           = "${local.name}-msk-cluster"
  kafka_version          = local.msk.version
  number_of_broker_nodes = length(module.vpc.private_subnets)
  configuration_info {
    arn      = aws_msk_configuration.msk_config.arn
    revision = aws_msk_configuration.msk_config.latest_revision
  }

  broker_node_group_info {
    instance_type   = local.msk.instance_size
    client_subnets  = module.vpc.private_subnets
    security_groups = [aws_security_group.msk.id]
    storage_info {
      ebs_storage_info {
        volume_size = local.msk.ebs_volume_size
      }
    }
  }

  client_authentication {
    sasl {
      iam = true
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.msk_cluster_lg.name
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.default_bucket.id
        prefix  = "logs/msk/cluster-"
      }
    }
  }

  tags = local.tags

  depends_on = [aws_msk_configuration.msk_config]
}

resource "aws_msk_configuration" "msk_config" {
  name = "${local.name}-msk-configuration"

  kafka_versions = [local.msk.version]

  server_properties = <<PROPERTIES
    auto.create.topics.enable = true
    delete.topic.enable = true
    log.retention.ms = ${local.msk.log_retention_ms}
  PROPERTIES
}
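The log_retention_ms value is easy to mistype, so it can help to derive it rather than hard-code it. The snippet below is a small sanity check in Python, not part of the Terraform stack; the helper name to_millis is made up for illustration.

```python
from datetime import timedelta


def to_millis(delta: timedelta) -> int:
    """Convert a duration to whole milliseconds (hypothetical helper)."""
    return delta // timedelta(milliseconds=1)


# Broker-level retention configured in the MSK configuration above (7 days).
BROKER_RETENTION_MS = to_millis(timedelta(days=7))

print(BROKER_RETENTION_MS)  # 604800000
```

The same helper can be used for any other retention period that has to be expressed in milliseconds.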

Inbound Rules for MSK Cluster

We need to allow access to the MSK cluster from multiple AWS resources. Specifically, the VPN server needs access for monitoring and managing the cluster and topic as well as for developing the producer Lambda function locally. Also, the Lambda function and the Redshift cluster need access for producing and consuming messages respectively. Only port 9098 is added to the inbound/outbound rules because client access is enabled exclusively through the IAM authentication method. Note that the security group and outbound rule of the Lambda function are created here while the Lambda function itself is created in a different Terraform stack. This makes it easy to reference the Lambda security group in the inbound rule of the MSK security group; we discuss later how the Lambda function makes use of it.

# integration-redshift/infra/msk.tf
resource "aws_security_group" "msk" {
  name   = "${local.name}-msk-sg"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

resource "aws_security_group_rule" "msk_vpn_inbound" {
  count                    = local.vpn.to_create ? 1 : 0
  type                     = "ingress"
  description              = "VPN access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.vpn[0].id
}

resource "aws_security_group_rule" "msk_lambda_inbound" {
  type                     = "ingress"
  description              = "lambda access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.kafka_producer_lambda.id
}

resource "aws_security_group_rule" "msk_redshift_inbound" {
  type                     = "ingress"
  description              = "redshift access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.redshift_serverless.id
}

...

resource "aws_security_group" "kafka_producer_lambda" {
  name   = "${local.name}-lambda-msk-access"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

resource "aws_security_group_rule" "kafka_producer_lambda_msk_egress" {
  type              = "egress"
  description       = "lambda msk access"
  security_group_id = aws_security_group.kafka_producer_lambda.id
  protocol          = "tcp"
  from_port         = 9098
  to_port           = 9098
  cidr_blocks       = ["0.0.0.0/0"]
}

# integration-redshift/infra/redshift.tf
resource "aws_security_group" "redshift_serverless" {
  name   = "${local.name}-redshift-serverless"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

...

resource "aws_security_group_rule" "redshift_msk_egress" {
  type              = "egress"
  description       = "lambda msk access"
  security_group_id = aws_security_group.redshift_serverless.id
  protocol          = "tcp"
  from_port         = 9098
  to_port           = 9098
  cidr_blocks       = ["0.0.0.0/0"]
}
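Once the VPN connection is up, a quick way to confirm that the inbound rule on port 9098 works is to open a plain TCP connection to each broker. The following is a minimal sketch using only the Python standard library; the function names and the example broker addresses are made up for illustration, and a successful TCP connection says nothing about whether IAM authentication will succeed.

```python
import socket


def parse_bootstrap_servers(servers: str) -> list:
    """Split 'host1:port,host2:port' into [(host, port), ...]."""
    pairs = []
    for entry in servers.split(","):
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS failures.
        return False


# Example usage (hypothetical broker addresses):
# for host, port in parse_bootstrap_servers("b-1.example.com:9098,b-2.example.com:9098"):
#     print(host, port, is_reachable(host, port))
```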

Redshift Serverless

A namespace and a workgroup are created to deploy Redshift Serverless. As explained in the Redshift user guide, a namespace is a collection of database objects and users, and a workgroup is a collection of compute resources.

# integration-redshift/infra/redshift.tf
resource "aws_redshiftserverless_namespace" "namespace" {
  namespace_name = "${local.name}-namespace"

  admin_username       = local.redshift.admin_username
  admin_user_password  = random_password.redshift_admin_pw.result
  db_name              = local.redshift.db_name
  default_iam_role_arn = aws_iam_role.redshift_serverless_role.arn
  iam_roles            = [aws_iam_role.redshift_serverless_role.arn]

  tags = local.tags
}

resource "aws_redshiftserverless_workgroup" "workgroup" {
  namespace_name = aws_redshiftserverless_namespace.namespace.id
  workgroup_name = "${local.name}-workgroup"

  base_capacity      = local.redshift.base_capacity
  subnet_ids         = module.vpc.private_subnets
  security_group_ids = [aws_security_group.redshift_serverless.id]

  tags = local.tags
}

IAM Permission for MSK Access

As illustrated in the AWS documentation, we need an IAM policy that provides permission for communication with the Amazon MSK cluster. The applicable policy is added to the default IAM role of the cluster.

# integration-redshift/infra/redshift.tf
resource "aws_iam_role" "redshift_serverless_role" {
  name = "${local.name}-redshift-serverless-role"

  assume_role_policy = data.aws_iam_policy_document.redshift_serverless_assume_role_policy.json
  managed_policy_arns = [
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess",
    "arn:aws:iam::aws:policy/AmazonRedshiftFullAccess",
    "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
    aws_iam_policy.msk_redshift_permission.arn
  ]
}

data "aws_iam_policy_document" "redshift_serverless_assume_role_policy" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type = "Service"
      identifiers = [
        "redshift.amazonaws.com",
        "sagemaker.amazonaws.com",
        "events.amazonaws.com",
        "scheduler.redshift.amazonaws.com"
      ]
    }

    principals {
      type        = "AWS"
      identifiers = ["arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"]
    }
  }
}

resource "aws_iam_policy" "msk_redshift_permission" {
  name = "${local.name}-msk-redshift-permission"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnCluster"
        Action = [
          "kafka-cluster:ReadData",
          "kafka-cluster:DescribeTopic",
          "kafka-cluster:Connect",
        ]
        Effect = "Allow"
        Resource = [
          "arn:aws:kafka:*:${data.aws_caller_identity.current.account_id}:cluster/*/*",
          "arn:aws:kafka:*:${data.aws_caller_identity.current.account_id}:topic/*/*"
        ]
      },
      {
        Sid = "PermissionOnGroups"
        Action = [
          "kafka:GetBootstrapBrokers"
        ]
        Effect   = "Allow"
        Resource = "*"
      }
    ]
  })
}

Kafka Producer

The resources related to the Kafka producer Lambda function are managed in a separate Terraform stack, because this makes it easier to build the relevant resources iteratively. Note that the SAM CLI builds the whole Terraform stack even for a small code change, so it would be inconvenient if all the resources were managed in the same stack.

Producer Source

The Kafka producer sends messages to a topic named orders, and the fake order data is generated using the Faker package. The Order class generates one or more fake order records with its create method, and an order record includes an order ID, order timestamp, user ID and order items. The Lambda function sends 100 records at a time and then sleeps for 1 second. It repeats until the elapsed time exceeds the value of the MAX_RUN_SEC environment variable (e.g. 60). A Kafka message is made up of an order ID as the key and an order record as the value. Both the key and value are serialised as JSON. Note that the stable version of the kafka-python package does not support the IAM authentication method. Therefore we need to install the package from a forked repository as discussed in this GitHub issue.

# integration-redshift/kafka_producer/src/app.py
import os
import datetime
import string
import json
import time
from kafka import KafkaProducer
from faker import Faker


class Order:
    def __init__(self, fake: Faker = None):
        self.fake = fake or Faker()

    def order(self):
        rand_int = self.fake.random_int(1, 1000)
        user_id = "".join(
            [string.ascii_lowercase[int(s)] if s.isdigit() else s for s in hex(rand_int)]
        )[::-1]
        return {
            "order_id": self.fake.uuid4(),
            "ordered_at": datetime.datetime.utcnow(),
            "user_id": user_id,
        }

    def items(self):
        return [
            {
                "product_id": self.fake.random_int(1, 9999),
                "quantity": self.fake.random_int(1, 10),
            }
            for _ in range(self.fake.random_int(1, 4))
        ]

    def create(self, num: int):
        return [{**self.order(), **{"items": self.items()}} for _ in range(num)]


class Producer:
    def __init__(self, bootstrap_servers: list, topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = self.create()

    def create(self):
        return KafkaProducer(
            security_protocol="SASL_SSL",
            sasl_mechanism="AWS_MSK_IAM",
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
        )

    def send(self, orders: list):
        for order in orders:
            self.producer.send(self.topic, key={"order_id": order["order_id"]}, value=order)
        self.producer.flush()

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


def lambda_function(event, context):
    if os.getenv("BOOTSTRAP_SERVERS", "") == "":
        return
    fake = Faker()
    producer = Producer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS").split(","), topic=os.getenv("TOPIC_NAME")
    )
    s = datetime.datetime.now()
    ttl_rec = 0
    while True:
        orders = Order(fake).create(100)
        producer.send(orders)
        ttl_rec += len(orders)
        print(f"sent {len(orders)} messages")
        elapsed_sec = (datetime.datetime.now() - s).seconds
        if elapsed_sec > int(os.getenv("MAX_RUN_SEC", "60")):
            print(f"{ttl_rec} records are sent in {elapsed_sec} seconds ...")
            break
        time.sleep(1)

A sample order record is shown below.

{
  "order_id": "fc72ccf4-8e98-42b1-9a48-4a6222996be4",
  "ordered_at": "2023-02-05T04:30:58.722158",
  "user_id": "hfbxa",
  "items": [
    {
      "product_id": 8576,
      "quantity": 5
    },
    {
      "product_id": 3101,
      "quantity": 8
    }
  ]
}
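The way a record becomes a Kafka key and value can be tried without a Kafka cluster. The following is a minimal sketch using only the standard library; to_kafka_message is a hypothetical helper that mirrors the producer's key_serializer and value_serializer, and unlike the producer's serialize method it raises TypeError for types it does not handle.

```python
import datetime
import json


def serialize(obj):
    """Fallback for json.dumps: render dates and datetimes as ISO 8601 strings."""
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    raise TypeError(f"{type(obj).__name__} is not JSON serializable")


def to_kafka_message(order: dict) -> tuple:
    """Return (key_bytes, value_bytes) for an order record (hypothetical helper)."""
    key = json.dumps({"order_id": order["order_id"]}, default=serialize).encode("utf-8")
    value = json.dumps(order, default=serialize).encode("utf-8")
    return key, value


order = {
    "order_id": "fc72ccf4-8e98-42b1-9a48-4a6222996be4",
    "ordered_at": datetime.datetime(2023, 2, 5, 4, 30, 58, 722158),
    "user_id": "hfbxa",
    "items": [{"product_id": 8576, "quantity": 5}, {"product_id": 3101, "quantity": 8}],
}
key, value = to_kafka_message(order)
print(key.decode("utf-8"))  # {"order_id": "fc72ccf4-8e98-42b1-9a48-4a6222996be4"}
```

Because the timestamp is written as an ISO 8601 string, it can later be cast to a timestamp on the Redshift side.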

Lambda Function

As the VPC, subnets, Lambda security group and MSK cluster are created in the infra Terraform stack, the producer Lambda stack needs to look them up. This can be achieved using Terraform data sources as shown below. Note that the private subnets can be filtered by a specific tag (Tier: Private), which is added when they are created.

# integration-redshift/infra/vpc.tf
module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "~> 3.14"

  name = "${local.name}-vpc"
  cidr = local.vpc.cidr

  azs             = local.vpc.azs
  public_subnets  = [for k, v in local.vpc.azs : cidrsubnet(local.vpc.cidr, 3, k)]
  private_subnets = [for k, v in local.vpc.azs : cidrsubnet(local.vpc.cidr, 3, k + 3)]
  ...

  private_subnet_tags = {
    "Tier" = "Private"
  }

  tags = local.tags
}

# integration-redshift/kafka_producer/variables.tf
data "aws_caller_identity" "current" {}

data "aws_region" "current" {}

data "aws_vpc" "selected" {
  filter {
    name   = "tag:Name"
    values = ["${local.infra_prefix}"]
  }
}

data "aws_subnets" "private" {
  filter {
    name   = "vpc-id"
    values = [data.aws_vpc.selected.id]
  }

  tags = {
    Tier = "Private"
  }
}

data "aws_msk_cluster" "msk_data_cluster" {
  cluster_name = "${local.infra_prefix}-msk-cluster"
}

data "aws_security_group" "kafka_producer_lambda" {
  name = "${local.infra_prefix}-lambda-msk-access"
}

locals {
  ...
  infra_prefix = "integration-redshift"
  ...
}

The AWS Lambda Terraform module is used to create the producer Lambda function. Note that, in order to develop a Lambda function using AWS SAM, we need to create a SAM metadata resource, which provides the AWS SAM CLI with the information it needs to locate Lambda functions and layers, along with their source code, build dependencies and build logic, from within the Terraform project. Conveniently, the Terraform module creates it by default. Also, we need to give the EventBridge rule permission to invoke the Lambda function, which is done with the aws_lambda_permission resource.

# integration-redshift/kafka_producer/variables.tf
locals {
  name        = local.infra_prefix
  region      = data.aws_region.current.name
  environment = "dev"

  infra_prefix = "integration-redshift"

  producer = {
    src_path          = "src"
    function_name     = "kafka_producer"
    handler           = "app.lambda_function"
    concurrency       = 5
    timeout           = 90
    memory_size       = 128
    runtime           = "python3.8"
    schedule_rate     = "rate(1 minute)"
    to_enable_trigger = false
    environment = {
      topic_name  = "orders"
      max_run_sec = 60
    }
  }
  ...
}

# integration-redshift/kafka_producer/main.tf
module "kafka_producer_lambda" {
  source = "terraform-aws-modules/lambda/aws"

  function_name          = local.producer.function_name
  handler                = local.producer.handler
  runtime                = local.producer.runtime
  timeout                = local.producer.timeout
  memory_size            = local.producer.memory_size
  source_path            = local.producer.src_path
  vpc_subnet_ids         = data.aws_subnets.private.ids
  vpc_security_group_ids = [data.aws_security_group.kafka_producer_lambda.id]
  attach_network_policy  = true
  attach_policies        = true
  policies               = [aws_iam_policy.msk_lambda_permission.arn]
  number_of_policies     = 1
  environment_variables = {
    BOOTSTRAP_SERVERS = data.aws_msk_cluster.msk_data_cluster.bootstrap_brokers_sasl_iam
    TOPIC_NAME        = local.producer.environment.topic_name
    MAX_RUN_SEC       = local.producer.environment.max_run_sec
  }

  tags = local.tags
}

resource "aws_lambda_function_event_invoke_config" "kafka_producer_lambda" {
  function_name          = module.kafka_producer_lambda.lambda_function_name
  maximum_retry_attempts = 0
}

resource "aws_lambda_permission" "allow_eventbridge" {
  count         = local.producer.to_enable_trigger ? 1 : 0
  statement_id  = "InvokeLambdaFunction"
  action        = "lambda:InvokeFunction"
  function_name = local.producer.function_name
  principal     = "events.amazonaws.com"
  source_arn    = module.eventbridge.eventbridge_rule_arns["crons"]

  depends_on = [
    module.eventbridge
  ]
}

IAM Permission for MSK

The producer Lambda function needs permission to send messages to the orders topic of the MSK cluster. The following IAM policy is added to the Lambda function according to the AWS documentation.

# integration-redshift/kafka_producer/main.tf
resource "aws_iam_policy" "msk_lambda_permission" {
  name = "${local.name}-msk-lambda-permission"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:AlterCluster",
          "kafka-cluster:DescribeCluster"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:cluster/${local.infra_prefix}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnTopics"
        Action = [
          "kafka-cluster:*Topic*",
          "kafka-cluster:WriteData",
          "kafka-cluster:ReadData"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:topic/${local.infra_prefix}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnGroups"
        Action = [
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeGroup"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:group/${local.infra_prefix}-msk-cluster/*"
      }
    ]
  })
}

EventBridge Rule

The AWS EventBridge Terraform module is used to create the EventBridge schedule rule and targets. Note that 5 targets that point to the Kafka producer Lambda function are created so that it is invoked concurrently every minute.

# integration-redshift/kafka_producer/main.tf
module "eventbridge" {
  source = "terraform-aws-modules/eventbridge/aws"

  create_bus = false

  rules = {
    crons = {
      description         = "Kafka producer lambda schedule"
      schedule_expression = local.producer.schedule_rate
    }
  }

  targets = {
    crons = [for i in range(local.producer.concurrency) : {
      name = "lambda-target-${i}"
      arn  = module.kafka_producer_lambda.lambda_function_arn
    }]
  }

  depends_on = [
    module.kafka_producer_lambda
  ]

  tags = local.tags
}
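For readers less familiar with Terraform for expressions, the targets list above expands to one entry per unit of concurrency, all pointing at the same function. The Python sketch below mirrors that expansion; the function name build_targets and the placeholder ARN are made up for illustration.

```python
def build_targets(function_arn: str, concurrency: int) -> list:
    """Mirror the Terraform for expression: one target per unit of concurrency."""
    return [
        {"name": f"lambda-target-{i}", "arn": function_arn}
        for i in range(concurrency)
    ]


# Placeholder ARN, not a real resource.
targets = build_targets(
    "arn:aws:lambda:us-east-1:123456789012:function:kafka_producer", concurrency=5
)
for target in targets:
    print(target["name"])
```

All targets share the same ARN and differ only by name, which is why a single schedule tick results in several concurrent invocations of the producer.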

Deployment

Topic Creation

We first need to create the Kafka topic and it is done using kafka-ui. The UI can be started using docker-compose with the following compose file. Note the VPN connection has to be established in order to access the cluster from the developer machine.

# integration-redshift/docker-compose.yml
version: "3"

services:
  kafka-ui:
    image: provectuslabs/kafka-ui:master
    container_name: kafka-ui
    ports:
      - "8080:8080"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
      KAFKA_CLUSTERS_0_NAME: msk
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: $BOOTSTRAP_SERVERS
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_SSL
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: AWS_MSK_IAM
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS: software.amazon.msk.auth.iam.IAMClientCallbackHandler
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: software.amazon.msk.auth.iam.IAMLoginModule required;

networks:
  kafkanet:
    name: kafka-network

A topic named orders is created with 3 partitions and a replication factor of 3. Also, it is set to retain data for 4 weeks.

Once created, it redirects to the overview section of the topic.

Local Testing with SAM

To simplify development, the EventBridge permission is disabled by setting to_enable_trigger to false. Also, max_run_sec is reduced to 10 so that the function's loop stops sooner.

# integration-redshift/kafka_producer/variables.tf
locals {
  producer = {
    ...
    to_enable_trigger = false
    environment = {
      topic_name  = "orders"
      max_run_sec = 10
    }
  }
  ...
}

The Lambda function can be built with the SAM build command while specifying the hook name as terraform and enabling beta features. Once completed, it stores the build artifacts and template in the .aws-sam folder.

$ sam build --hook-name terraform --beta-features
# ...
#
# Apply complete! ...

# Build Succeeded

# Built Artifacts  : .aws-sam/build
# Built Template   : .aws-sam/build/template.yaml

# Commands you can use next
# =========================
# [*] Invoke Function: sam local invoke --hook-name terraform
# [*] Emulate local Lambda functions: sam local start-lambda --hook-name terraform

We can invoke the Lambda function locally using the SAM local invoke command. The Lambda function is invoked in a Docker container and the invocation logs are printed in the terminal as shown below.

$ sam local invoke --hook-name terraform module.kafka_producer_lambda.aws_lambda_function.this[0] --beta-features
# Experimental features are enabled for this session.
# Visit the docs page to learn more about the AWS Beta terms https://aws.amazon.com/service-terms/.

# Skipped prepare hook. Current application is already prepared.
# Invoking app.lambda_function (python3.8)
# Skip pulling image and use local one: public.ecr.aws/sam/emulation-python3.8:rapid-1.70.0-x86_64.

# Mounting .../kafka-pocs/integration-redshift/kafka_producer/.aws-sam/build/ModuleKafkaProducerLambdaAwsLambdaFunctionThis069E06354 as /var/task:ro,delegated inside runtime container
# START RequestId: fbfc11be-362a-48df-b894-232cc88234ee Version: $LATEST
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# 1100 records are sent in 11 seconds ...
# END RequestId: fbfc11be-362a-48df-b894-232cc88234ee
# REPORT RequestId: fbfc11be-362a-48df-b894-232cc88234ee  Init Duration: 0.18 ms  Duration: 12494.86 ms   Billed Duration: 12495 ms       Memory Size: 128 MB     Max Memory Used: 128 MB
# null
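The log shows 11 batches and 11 elapsed seconds even though max_run_sec is 10. This follows from the loop logic: the elapsed time is truncated to whole seconds, it is checked after each batch is sent, and the loop only stops when it is strictly greater than the limit. The sketch below replays that logic with a simulated clock instead of real sleeps. The per-batch send time of 100 ms is an assumption picked for illustration, not a measured value, and simulate_producer_loop is a hypothetical name.

```python
def simulate_producer_loop(max_run_sec: int, send_ms: int,
                           sleep_ms: int = 1000, batch_size: int = 100) -> tuple:
    """Replay the producer loop with a simulated clock.

    Returns (total_records, elapsed_sec) at the moment the loop breaks.
    """
    clock_ms = 0
    total = 0
    while True:
        clock_ms += send_ms               # time spent sending one batch (assumed)
        total += batch_size
        elapsed_sec = clock_ms // 1000    # timedelta.seconds keeps whole seconds only
        if elapsed_sec > max_run_sec:     # strict comparison, as in the handler
            return total, elapsed_sec
        clock_ms += sleep_ms              # time.sleep(1)


print(simulate_producer_loop(max_run_sec=10, send_ms=100))  # (1100, 11)
```

Under this assumption the function always runs slightly longer than max_run_sec, which is one reason the Lambda timeout (90 seconds) is set higher than the 60-second run limit.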

We can also check the messages using kafka-ui.

External Schema and Materialized View Creation

As we have messages in the orders topic, we can create a materialized view to consume data from it. First we need to create an external schema that sources data from the MSK cluster. We can use the default IAM role as we have already added the necessary IAM permission to it. Also, we should specify the IAM authentication method as it is the only allowed method for the MSK cluster. The materialized view selects the key Kafka metadata columns and parses the Kafka value as data. The JSON_PARSE function converts the JSON string into the SUPER type, which makes it easy to select individual attributes. Also, the view is configured to refresh automatically so that it ingests up-to-date data without a manual refresh.

CREATE EXTERNAL SCHEMA msk_orders
FROM MSK
IAM_ROLE default
AUTHENTICATION iam
CLUSTER_ARN '<MSK Cluster ARN>';

CREATE MATERIALIZED VIEW orders AUTO REFRESH YES AS
SELECT
  "kafka_partition",
  "kafka_offset",
  "kafka_timestamp_type",
  "kafka_timestamp",
  "kafka_key",
  JSON_PARSE("kafka_value") as data,
  "kafka_headers"
FROM msk_orders.orders;
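As a loose analogy for what JSON_PARSE provides, the Python sketch below parses a raw message value into a nested structure so that individual attributes can be navigated by name and index. It only illustrates the idea and says nothing about how Redshift implements the SUPER type; parse_value is a hypothetical helper and the raw string is the sample record from earlier.

```python
import json


def parse_value(kafka_value: str) -> dict:
    """Parse a JSON message value into a nested, navigable structure."""
    return json.loads(kafka_value)


raw = (
    '{"order_id": "fc72ccf4-8e98-42b1-9a48-4a6222996be4", '
    '"ordered_at": "2023-02-05T04:30:58.722158", "user_id": "hfbxa", '
    '"items": [{"product_id": 8576, "quantity": 5}, {"product_id": 3101, "quantity": 8}]}'
)
data = parse_value(raw)

# Roughly comparable to selecting data.user_id and data.items[0].product_id in SQL.
print(data["user_id"], data["items"][0]["product_id"])  # hfbxa 8576
```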

We can see the ingested Kafka messages as shown below.

Order Items View Creation

The materialized view keeps the entire order data in a single column, which makes it hard to build queries for analytics. As mentioned earlier, we can easily select individual attributes from the data column, but the issue is that each record has an array of order items of variable length. Redshift doesn’t have a function to explode an array into rows, but we can achieve it using a recursive CTE. The view below converts the order items array into rows recursively.

CREATE OR REPLACE VIEW order_items AS
  WITH RECURSIVE exploded_items (order_id, ordered_at, user_id, idx, product_id, quantity) AS (
      WITH cte AS (
          SELECT
              data.order_id::character(36) AS order_id,
              data.ordered_at::timestamp AS ordered_at,
              data.user_id::varchar(10) AS user_id,
              data.items AS items,
              get_array_length(data.items) AS num_items
          FROM orders
      )
      SELECT
          order_id,
          ordered_at,
          user_id,
          0 AS idx,
          items[0].product_id::int AS product_id,
          items[0].quantity::int AS quantity
      FROM cte
      UNION ALL
      SELECT
          cte.order_id,
          cte.ordered_at,
          cte.user_id,
          idx + 1,
          cte.items[idx + 1].product_id::int,
          cte.items[idx + 1].quantity::int
      FROM cte
      JOIN exploded_items ON cte.order_id = exploded_items.order_id
      WHERE idx < cte.num_items - 1
  )
  SELECT *
  FROM exploded_items
  ORDER BY order_id;
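The result the view is expected to produce can be described with a plain nested loop: one output row per (order, item index) pair. The Python sketch below uses iteration rather than recursion, so it is an equivalent of the output shape and not of the SQL technique; explode_order_items is a hypothetical name and the input is the sample record from earlier.

```python
def explode_order_items(orders: list) -> list:
    """Return one row per order item, tagged with its index within the order."""
    rows = []
    for order in orders:
        for idx, item in enumerate(order["items"]):
            rows.append(
                {
                    "order_id": order["order_id"],
                    "ordered_at": order["ordered_at"],
                    "user_id": order["user_id"],
                    "idx": idx,
                    "product_id": int(item["product_id"]),
                    "quantity": int(item["quantity"]),
                }
            )
    # The view orders by order_id; the sort is stable, so idx order is kept.
    return sorted(rows, key=lambda row: row["order_id"])


sample = [
    {
        "order_id": "fc72ccf4-8e98-42b1-9a48-4a6222996be4",
        "ordered_at": "2023-02-05T04:30:58.722158",
        "user_id": "hfbxa",
        "items": [{"product_id": 8576, "quantity": 5}, {"product_id": 3101, "quantity": 8}],
    }
]
for row in explode_order_items(sample):
    print(row["order_id"], row["idx"], row["product_id"], row["quantity"])
```

In the recursive CTE, the anchor member plays the role of idx 0 and each recursive step advances idx by one until it reaches num_items - 1.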

We can see the exploded order items as shown below.

Kafka Producer Deployment

Now we can deploy the Kafka producer Lambda function and the EventBridge schedule rule using Terraform as usual after resetting the configuration variables. Once deployed, we can see that the schedule rule has 5 targets of the same Lambda function.

We can check if the Kafka producer sends messages correctly using kafka-ui. After about 30 minutes, we see about 840,000 messages are created in the orders topic.
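The reported volume can be cross-checked with simple arithmetic: total messages = targets × minutes × batches per invocation × batch size. The sketch below works backwards from the approximate figures quoted above to the number of batches each invocation must have sent; the function names are made up for illustration and the result is only as precise as the rounded inputs.

```python
def estimate_messages(targets: int, minutes: int,
                      batches_per_invocation: int, batch_size: int = 100) -> int:
    """Total messages produced by all scheduled invocations."""
    return targets * minutes * batches_per_invocation * batch_size


def implied_batches_per_invocation(total_messages: int, targets: int,
                                   minutes: int, batch_size: int = 100) -> float:
    """Work backwards from an observed total to batches per invocation."""
    return total_messages / (targets * minutes * batch_size)


# About 840,000 messages after about 30 minutes with 5 targets.
print(implied_batches_per_invocation(840_000, targets=5, minutes=30))  # 56.0
print(estimate_messages(targets=5, minutes=30, batches_per_invocation=56))  # 840000
```

Roughly 56 batches per one-minute invocation is consistent with a loop that sends a batch, sleeps for 1 second and stops shortly after 60 seconds.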

Query Order Items

As the materialized view is set to refresh automatically, we don’t have to refresh it manually. Using the order items view, we can query the top 10 popular products as shown below.

Summary

Streaming ingestion from Kafka (MSK) into Redshift and Athena can be much simpler as they now support direct integration. In part 1 of this series, we discussed an end-to-end streaming ingestion solution using EventBridge, Lambda, MSK and Redshift. We also used AWS SAM integrated with Terraform for developing a Lambda function locally.