As Kafka producer and consumer apps are decoupled, they operate on Kafka topics rather than communicating with each other directly. As described in the Confluent documentation, Schema Registry provides a centralized repository for managing and validating schemas for topic message data, and for serialization and deserialization of the data over the network. Producers and consumers of Kafka topics can use schemas to ensure data consistency and compatibility as schemas evolve. In AWS, the Glue Schema Registry supports features to manage and enforce schemas on data streaming applications using convenient integrations with Apache Kafka, Amazon Managed Streaming for Apache Kafka (MSK), Amazon Kinesis Data Streams, Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda. In this post, we will discuss how to integrate Python Kafka producer and consumer apps in AWS Lambda with the Glue Schema Registry.

Architecture

Fake online order data is generated by a Kafka producer Lambda function that is invoked by an EventBridge schedule rule. The schedule is set to run every minute, and the associated rule has a configurable number (e.g. 5) of targets, each pointing to the same producer function. In this way we can generate test data from multiple concurrent Lambda invocations according to the desired message volume. Note that, before sending a message, the producer validates the schema and registers a new version if it is not registered yet. It then serializes the message and sends it to the cluster.

Once messages are sent to a Kafka topic, they can be consumed by a Lambda function for which Amazon MSK is configured as an event source. The serialized record (message key or value) includes the schema version ID so that the consumer can request the schema from the schema registry (if it is not cached) in order to deserialize it.
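To make this concrete, the snippet below peeks at the schema version ID embedded in a record value. It is only an illustration and assumes the wire format used by the aws-glue-schema-registry Python package (a header version byte, a compression byte and a 16-byte schema version UUID in front of the serialized payload); the actual deserialization is handled by the package itself, as shown in the consumer app later.

import base64
import uuid


def peek_schema_version_id(value_b64: str) -> uuid.UUID:
    """Return the schema version UUID embedded in a base64-encoded record value.

    Assumes the Glue Schema Registry wire format used by the aws-glue-schema-registry
    package: 1 header version byte, 1 compression byte, a 16-byte schema version UUID,
    followed by the serialized data.
    """
    raw = base64.b64decode(value_b64)
    if raw[0] != 3:
        raise ValueError(f"unexpected header version byte: {raw[0]}")
    return uuid.UUID(bytes=raw[2:18])


# example usage against a record taken from a Lambda event payload (hypothetical)
# print(peek_schema_version_id(record["value"]))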

The infrastructure is built with Terraform, and the AWS SAM CLI is used to develop the producer Lambda function locally before deploying it to AWS.

Infrastructure

A VPC with 3 public and 3 private subnets is created using the AWS VPC Terraform module (vpc.tf). Also, a SoftEther VPN server is deployed in order to access the resources in the private subnets from the developer machine (vpn.tf). It is particularly useful for monitoring and managing the MSK cluster and Kafka topic, as well as for developing the Kafka producer Lambda function locally. The details of how to configure the VPN server can be found in an earlier post. The source can be found in the GitHub repository of this post.

MSK

An MSK cluster with 3 brokers is created. The broker nodes are deployed with the kafka.m5.large instance type in the private subnets, and IAM authentication is used as the client authentication method. Finally, additional server configuration properties are added, such as enabling automatic topic creation and topic deletion.

# glue-schema-registry/infra/variable.tf
locals {
  ...
  msk = {
    version          = "3.3.1"
    instance_size    = "kafka.m5.large"
    ebs_volume_size  = 20
    log_retention_ms = 604800000 # 7 days
  }
  ...
}

# glue-schema-registry/infra/msk.tf
resource "aws_msk_cluster" "msk_data_cluster" {
  cluster_name           = "${local.name}-msk-cluster"
  kafka_version          = local.msk.version
  number_of_broker_nodes = length(module.vpc.private_subnets)
  configuration_info {
    arn      = aws_msk_configuration.msk_config.arn
    revision = aws_msk_configuration.msk_config.latest_revision
  }

  broker_node_group_info {
    instance_type   = local.msk.instance_size
    client_subnets  = module.vpc.private_subnets
    security_groups = [aws_security_group.msk.id]
    storage_info {
      ebs_storage_info {
        volume_size = local.msk.ebs_volume_size
      }
    }
  }

  client_authentication {
    sasl {
      iam = true
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.msk_cluster_lg.name
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.default_bucket.id
        prefix  = "logs/msk/cluster-"
      }
    }
  }

  tags = local.tags

  depends_on = [aws_msk_configuration.msk_config]
}

resource "aws_msk_configuration" "msk_config" {
  name = "${local.name}-msk-configuration"

  kafka_versions = [local.msk.version]

  server_properties = <<PROPERTIES
    auto.create.topics.enable = true
    delete.topic.enable = true
    log.retention.ms = ${local.msk.log_retention_ms}
  PROPERTIES
}

Security Groups

Two security groups are created - one for the MSK cluster and the other for the Lambda apps.

The inbound/outbound rules of the former are created to allow the cluster to be accessed by the following.

  • Event Source Mapping (ESM) for Lambda
    • This is for the Lambda consumer that subscribes to the topic of the MSK cluster. As described in the AWS re:Post doc, when a Lambda function is configured with an Amazon MSK trigger or a self-managed Kafka trigger, an ESM resource is automatically created. An ESM is separate from the Lambda function, and it continuously polls records from the topic in the Kafka cluster. The ESM bundles those records into a payload and then calls the Lambda Invoke API to deliver the payload to the Lambda function for processing. Note that it doesn’t inherit the VPC network settings of the Lambda function but uses the subnet and security group settings that are configured on the target MSK cluster. Therefore, the MSK cluster’s security group must include a rule that grants ingress traffic from itself and egress traffic to itself. In our case, the rules are created on port 9098 as the cluster only supports IAM authentication. Also, an additional egress rule is created to access the Glue Schema Registry.
  • Other Resources
    • Two ingress rules are created, one for the VPN server and one for Lambda. The latter is only for the Lambda producer because the consumer doesn’t rely on the Lambda function’s network settings.

The second security group is for the Lambda apps. It is created in this stack, while the Lambda functions themselves are created in a different Terraform stack, because this makes it easier to add it to the inbound rules of the MSK security group. Later we will discuss how to attach it to the Lambda functions. The outbound rule allows all outbound traffic, although only port 9098 for the MSK cluster and port 443 for the Glue Schema Registry would be sufficient.

# glue-schema-registry/infra/msk.tf
resource "aws_security_group" "msk" {
  name   = "${local.name}-msk-sg"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

# for lambda event source mapping
resource "aws_security_group_rule" "msk_ingress_self_broker" {
  type                     = "ingress"
  description              = "msk ingress self"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.msk.id
}

resource "aws_security_group_rule" "msk_egress_self_broker" {
  type                     = "egress"
  description              = "msk egress self"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.msk.id
}

resource "aws_security_group_rule" "msk_all_outbound" {
  type              = "egress"
  description       = "allow outbound all"
  security_group_id = aws_security_group.msk.id
  protocol          = "-1"
  from_port         = "0"
  to_port           = "0"
  cidr_blocks       = ["0.0.0.0/0"]
}

# for other resources
resource "aws_security_group_rule" "msk_vpn_inbound" {
  count                    = local.vpn.to_create ? 1 : 0
  type                     = "ingress"
  description              = "VPN access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.vpn[0].id
}

resource "aws_security_group_rule" "msk_lambda_inbound" {
  type                     = "ingress"
  description              = "lambda access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.kafka_app_lambda.id
}

...

# lambda security group
resource "aws_security_group" "kafka_app_lambda" {
  name   = "${local.name}-lambda-msk-access"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

resource "aws_security_group_rule" "kafka_app_lambda_msk_egress" {
  type              = "egress"
  description       = "allow outbound all"
  security_group_id = aws_security_group.kafka_app_lambda.id
  protocol          = "-1"
  from_port         = 0
  to_port           = 0
  cidr_blocks       = ["0.0.0.0/0"]
}

Kafka Apps

The resources related to the Kafka producer and consumer Lambda functions are managed in a separate Terraform stack. This makes it easier to build the relevant resources iteratively. Note that the SAM CLI builds the whole Terraform stack even for a small code change, which would be inconvenient if all the resources were managed in a single stack.

Producer App

Order Data

Fake order data is generated using the Faker package, and the dataclasses_avroschema package is used to automatically generate the Avro schema from the class attributes. A mixin class called InjectCompatMixin is added to the Order class; it injects a schema compatibility mode into the generated schema. The auto() class method is used to instantiate the class with randomized values. Finally, the OrderMore class is created for the schema evolution demo, which will be discussed later.

# glue-schema-registry/app/producer/src/order.py
import datetime
import string
import json
import typing
import dataclasses
import enum

from faker import Faker
from dataclasses_avroschema import AvroModel


class Compatibility(enum.Enum):
    NONE = "NONE"
    DISABLED = "DISABLED"
    BACKWARD = "BACKWARD"
    BACKWARD_ALL = "BACKWARD_ALL"
    FORWARD = "FORWARD"
    FORWARD_ALL = "FORWARD_ALL"
    FULL = "FULL"
    FULL_ALL = "FULL_ALL"


class InjectCompatMixin:
    @classmethod
    def updated_avro_schema_to_python(cls, compat: Compatibility = Compatibility.BACKWARD):
        schema = cls.avro_schema_to_python()
        schema["compatibility"] = compat.value
        return schema

    @classmethod
    def updated_avro_schema(cls, compat: Compatibility = Compatibility.BACKWARD):
        schema = cls.updated_avro_schema_to_python(compat)
        return json.dumps(schema)


@dataclasses.dataclass
class OrderItem(AvroModel):
    product_id: int
    quantity: int


@dataclasses.dataclass
class Order(AvroModel, InjectCompatMixin):
    "Online fake order item"
    order_id: str
    ordered_at: datetime.datetime
    user_id: str
    order_items: typing.List[OrderItem]

    class Meta:
        namespace = "Order V1"

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def auto(cls, fake: Faker = Faker()):
        rand_int = fake.random_int(1, 1000)
        user_id = "".join(
            [string.ascii_lowercase[int(s)] if s.isdigit() else s for s in hex(rand_int)]
        )[::-1]
        order_items = [
            OrderItem(fake.random_int(1, 9999), fake.random_int(1, 10))
            for _ in range(fake.random_int(1, 4))
        ]
        return cls(fake.uuid4(), datetime.datetime.utcnow(), user_id, order_items)

    def create(self, num: int):
        return [self.auto() for _ in range(num)]


@dataclasses.dataclass
class OrderMore(Order):
    is_prime: bool

    @classmethod
    def auto(cls, fake: Faker = Faker()):
        o = Order.auto()
        return cls(o.order_id, o.ordered_at, o.user_id, o.order_items, fake.pybool())

The generated schema of the Order class can be found below.

{
  "doc": "Online fake order item",
  "namespace": "Order V1",
  "name": "Order",
  "compatibility": "BACKWARD",
  "type": "record",
  "fields": [
    {
      "name": "order_id",
      "type": "string"
    },
    {
      "name": "ordered_at",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "user_id",
      "type": "string"
    },
    {
      "name": "order_items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {
              "name": "product_id",
              "type": "long"
            },
            {
              "name": "quantity",
              "type": "long"
            }
          ]
        },
        "name": "order_item"
      }
    }
  ]
}

An example order record is shown below.

{
  "order_id": "53263c42-81b3-4a53-8067-fcdb44fa5479",
  "ordered_at": 1680745813045,
  "user_id": "dicxa",
  "order_items": [
    {
      "product_id": 5947,
      "quantity": 8
    }
  ]
}

Producer

The aws-glue-schema-registry package is used to serialize the value of order messages. It provides the KafkaSerializer class, which validates, registers and serializes the relevant records. It supports JSON and Avro schemas, and we can pass it to the value_serializer argument of the KafkaProducer class. By default, the schemas are named <topic>-key and <topic>-value, and this can be changed by updating the schema_naming_strategy argument. Note that, when sending a message, the value should be a tuple of data and schema. Note also that the stable version of the kafka-python package does not support the IAM authentication method, so we need to install the package from a forked repository as discussed in this GitHub issue.

# glue-schema-registry/app/producer/src/producer.py
import os
import datetime
import json
import typing

import boto3
import botocore.exceptions
from kafka import KafkaProducer
from aws_schema_registry import SchemaRegistryClient
from aws_schema_registry.avro import AvroSchema
from aws_schema_registry.adapter.kafka import KafkaSerializer
from aws_schema_registry.exception import SchemaRegistryException

from .order import Order


class Producer:
    def __init__(self, bootstrap_servers: list, topic: str, registry: str, is_local: bool = False):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.registry = registry
        self.glue_client = boto3.client(
            "glue", region_name=os.getenv("AWS_DEFAULT_REGION", "ap-southeast-2")
        )
        self.is_local = is_local
        self.producer = self.create()

    @property
    def serializer(self):
        client = SchemaRegistryClient(self.glue_client, registry_name=self.registry)
        return KafkaSerializer(client)

    def create(self):
        params = {
            "bootstrap_servers": self.bootstrap_servers,
            "key_serializer": lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            "value_serializer": self.serializer,
        }
        if not self.is_local:
            params = {
                **params,
                **{"security_protocol": "SASL_SSL", "sasl_mechanism": "AWS_MSK_IAM"},
            }
        return KafkaProducer(**params)

    def send(self, orders: typing.List[Order], schema: AvroSchema):
        if not self.check_registry():
            print(f"registry not found, create {self.registry}")
            self.create_registry()

        for order in orders:
            data = order.asdict()
            try:
                self.producer.send(
                    self.topic, key={"order_id": data["order_id"]}, value=(data, schema)
                )
            except SchemaRegistryException as e:
                raise RuntimeError("fails to send a message") from e
        self.producer.flush()

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj

    def check_registry(self):
        try:
            self.glue_client.get_registry(RegistryId={"RegistryName": self.registry})
            return True
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "EntityNotFoundException":
                return False
            else:
                raise e

    def create_registry(self):
        try:
            self.glue_client.create_registry(RegistryName=self.registry)
            return True
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "AlreadyExistsException":
                return True
            else:
                raise e

Lambda Handler

The Lambda function sends 100 records at a time, followed by sleeping for 1 second. It repeats until the elapsed time exceeds the value of the MAX_RUN_SEC environment variable (e.g. 60). The conditional block at the end is for demonstrating a schema evolution example, which will be discussed later.

# glue-schema-registry/app/producer/lambda_handler.py
import os
import datetime
import time

from aws_schema_registry.avro import AvroSchema

from src.order import Order, OrderMore, Compatibility
from src.producer import Producer


def lambda_function(event, context):
    producer = Producer(
        bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"].split(","),
        topic=os.environ["TOPIC_NAME"],
        registry=os.environ["REGISTRY_NAME"],
    )
    s = datetime.datetime.now()
    ttl_rec = 0
    while True:
        orders = Order.auto().create(100)
        schema = AvroSchema(Order.updated_avro_schema(Compatibility.BACKWARD))
        producer.send(orders, schema)
        ttl_rec += len(orders)
        print(f"sent {len(orders)} messages")
        elapsed_sec = (datetime.datetime.now() - s).seconds
        if elapsed_sec > int(os.getenv("MAX_RUN_SEC", "60")):
            print(f"{ttl_rec} records are sent in {elapsed_sec} seconds ...")
            break
        time.sleep(1)


if __name__ == "__main__":
    producer = Producer(
        bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"].split(","),
        topic=os.environ["TOPIC_NAME"],
        registry=os.environ["REGISTRY_NAME"],
        is_local=True,
    )
    use_more = os.getenv("USE_MORE") is not None
    if not use_more:
        orders = Order.auto().create(1)
        schema = AvroSchema(Order.updated_avro_schema(Compatibility.BACKWARD))
    else:
        orders = OrderMore.auto().create(1)
        schema = AvroSchema(OrderMore.updated_avro_schema(Compatibility.BACKWARD))
    print(orders)
    producer.send(orders, schema)

Lambda Resource

The VPC, subnets, Lambda security group and MSK cluster are created in the infra Terraform stack, and their details need to be obtained in the Kafka app stack. This is achieved using Terraform data sources as shown below. Note that the private subnets can be filtered by a specific tag (Tier: Private), which is added when they are created.

# glue-schema-registry/infra/vpc.tf
module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "~> 3.14"

  name = "${local.name}-vpc"
  cidr = local.vpc.cidr

  azs             = local.vpc.azs
  public_subnets  = [for k, v in local.vpc.azs : cidrsubnet(local.vpc.cidr, 3, k)]
  private_subnets = [for k, v in local.vpc.azs : cidrsubnet(local.vpc.cidr, 3, k + 3)]

  ...

  private_subnet_tags = {
    "Tier" = "Private"
  }

  tags = local.tags
}

# glue-schema-registry/app/variables.tf
data "aws_caller_identity" "current" {}

data "aws_region" "current" {}

data "aws_vpc" "selected" {
  filter {
    name   = "tag:Name"
    values = ["${local.infra_prefix}"]
  }
}

data "aws_subnets" "private" {
  filter {
    name   = "vpc-id"
    values = [data.aws_vpc.selected.id]
  }

  tags = {
    Tier = "Private"
  }
}

data "aws_msk_cluster" "msk_data_cluster" {
  cluster_name = "${local.infra_prefix}-msk-cluster"
}

data "aws_security_group" "kafka_app_lambda" {
  name = "${local.infra_prefix}-lambda-msk-access"
}


locals {
  ...
  infra_prefix = "glue-schema-registry"
  ...
}

The AWS Lambda Terraform module is used to create the producer Lambda function. Note that, in order to develop a Lambda function using AWS SAM, we need a SAM metadata resource, which provides the AWS SAM CLI with the information it needs to locate Lambda functions and layers, along with their source code, build dependencies, and build logic, from within a Terraform project. Conveniently, it is created by default by the Terraform module. Also, the EventBridge rule needs permission to invoke the Lambda function, which is granted by the aws_lambda_permission resource.

# glue-schema-registry/app/variables.tf
locals {
  name        = local.infra_prefix
  region      = data.aws_region.current.name
  environment = "dev"

  infra_prefix = "glue-schema-registry"

  producer = {
    src_path          = "producer"
    function_name     = "kafka_producer"
    handler           = "lambda_handler.lambda_function"
    concurrency       = 5
    timeout           = 90
    memory_size       = 128
    runtime           = "python3.8"
    schedule_rate     = "rate(1 minute)"
    to_enable_trigger = true
    environment = {
      topic_name    = "orders"
      registry_name = "customer"
      max_run_sec   = 60
    }
  }

  ...
}

# glue-schema-registry/app/main.tf
module "kafka_producer_lambda" {
  source = "terraform-aws-modules/lambda/aws"

  function_name          = local.producer.function_name
  handler                = local.producer.handler
  runtime                = local.producer.runtime
  timeout                = local.producer.timeout
  memory_size            = local.producer.memory_size
  source_path            = local.producer.src_path
  vpc_subnet_ids         = data.aws_subnets.private.ids
  vpc_security_group_ids = [data.aws_security_group.kafka_app_lambda.id]
  attach_network_policy  = true
  attach_policies        = true
  policies               = [aws_iam_policy.msk_lambda_producer_permission.arn]
  number_of_policies     = 1
  environment_variables = {
    BOOTSTRAP_SERVERS = data.aws_msk_cluster.msk_data_cluster.bootstrap_brokers_sasl_iam
    TOPIC_NAME        = local.producer.environment.topic_name
    REGISTRY_NAME     = local.producer.environment.registry_name
    MAX_RUN_SEC       = local.producer.environment.max_run_sec
  }

  tags = local.tags
}

resource "aws_lambda_function_event_invoke_config" "kafka_producer_lambda" {
  function_name          = module.kafka_producer_lambda.lambda_function_name
  maximum_retry_attempts = 0
}

resource "aws_lambda_permission" "allow_eventbridge" {
  count         = local.producer.to_enable_trigger ? 1 : 0
  statement_id  = "InvokeLambdaFunction"
  action        = "lambda:InvokeFunction"
  function_name = local.producer.function_name
  principal     = "events.amazonaws.com"
  source_arn    = module.eventbridge.eventbridge_rule_arns["crons"]

  depends_on = [
    module.eventbridge
  ]
}

IAM Permission

The producer Lambda function needs permission to send messages to the orders topic of the MSK cluster. It also needs permissions on the Glue schema registry and schemas. The following IAM policy is added to the Lambda function.

# glue-schema-registry/app/main.tf
resource "aws_iam_policy" "msk_lambda_producer_permission" {
  name = "${local.producer.function_name}-msk-lambda-producer-permission"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:AlterCluster",
          "kafka-cluster:DescribeCluster"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:cluster/${local.infra_prefix}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnTopics"
        Action = [
          "kafka-cluster:*Topic*",
          "kafka-cluster:WriteData",
          "kafka-cluster:ReadData"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:topic/${local.infra_prefix}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnGroups"
        Action = [
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeGroup"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:group/${local.infra_prefix}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnGlueSchema"
        Action = [
          "glue:*Schema*",
          "glue:GetRegistry",
          "glue:CreateRegistry",
          "glue:ListRegistries",
        ]
        Effect   = "Allow"
        Resource = "*"
      }
    ]
  })
}

EventBridge Rule

The AWS EventBridge Terraform module is used to create the EventBridge schedule rule and targets. Note that 5 targets that point to the Kafka producer Lambda function are created so that it is invoked concurrently every minute.

module "eventbridge" {
  source = "terraform-aws-modules/eventbridge/aws"

  create_bus = false

  rules = {
    crons = {
      description         = "Kafka producer lambda schedule"
      schedule_expression = local.producer.schedule_rate
    }
  }

  targets = {
    crons = [for i in range(local.producer.concurrency) : {
      name = "lambda-target-${i}"
      arn  = module.kafka_producer_lambda.lambda_function_arn
    }]
  }

  depends_on = [
    module.kafka_producer_lambda
  ]

  tags = local.tags
}

Consumer App

Lambda Handler

The Lambda event includes records, which is a dictionary where each key is a topic partition (topic_name-partition_number) and the value is a list of consumer records. Each consumer record includes the message metadata (topic, partition, offset, timestamp, ...) as well as the key and value. An example payload is shown below.

{
    "eventSource": "aws:kafka",
    "eventSourceArn": "<msk-cluster-arn>",
    "bootstrapServers": "<bootstrap-server-addresses>",
    "records": {
        "orders-2": [
            {
                "topic": "orders",
                "partition": 2,
                "offset": 10293,
                "timestamp": 1680631941838,
                "timestampType": "CREATE_TIME",
                "key": "eyJvcmRlcl9pZCI6ICJkNmQ4ZDJjNi1hODYwLTQyNTYtYWY1Yi04ZjU3NDkxZmM4YWYifQ==",
                "value": "AwDeD/rgjCxCeawN/ZaIO6VuSGQ2ZDhkMmM2LWE4NjAtNDI1Ni1hZjViLThmNTc0OTFmYzhhZu6pwtfpYQppYWJ4YQa8UBSEHgbkVAYA",
                "headers": []
            }
        ]
    }
}

The ConsumerRecord class parses/formats a consumer record. As the key and value are delivered as base64-encoded strings, they are decoded into bytes and then decoded or deserialized appropriately. The LambdaDeserializer class is created to deserialize the value. Also, the message timestamp is converted into a datetime object. The parse_record() method returns the consumer record with parsed/formatted values.

# glue-schema-registry/app/consumer/lambda_handler.py
import os
import json
import base64
import datetime

import boto3
from aws_schema_registry import SchemaRegistryClient
from aws_schema_registry.adapter.kafka import Deserializer, KafkaDeserializer


class LambdaDeserializer(Deserializer):
    def __init__(self, registry: str):
        self.registry = registry

    @property
    def deserializer(self):
        glue_client = boto3.client(
            "glue", region_name=os.getenv("AWS_DEFAULT_REGION", "ap-southeast-2")
        )
        client = SchemaRegistryClient(glue_client, registry_name=self.registry)
        return KafkaDeserializer(client)

    def deserialize(self, topic: str, bytes_: bytes):
        return self.deserializer.deserialize(topic, bytes_)


class ConsumerRecord:
    def __init__(self, record: dict):
        self.topic = record["topic"]
        self.partition = record["partition"]
        self.offset = record["offset"]
        self.timestamp = record["timestamp"]
        self.timestamp_type = record["timestampType"]
        self.key = record["key"]
        self.value = record["value"]
        self.headers = record["headers"]

    def parse_key(self):
        return base64.b64decode(self.key).decode()

    def parse_value(self, deserializer: LambdaDeserializer):
        parsed = deserializer.deserialize(self.topic, base64.b64decode(self.value))
        return parsed.data

    def format_timestamp(self, to_str: bool = True):
        ts = datetime.datetime.fromtimestamp(self.timestamp / 1000)
        if to_str:
            return ts.isoformat()
        return ts

    def parse_record(
        self, deserializer: LambdaDeserializer, to_str: bool = True, to_json: bool = True
    ):
        rec = {
            **self.__dict__,
            **{
                "key": self.parse_key(),
                "value": self.parse_value(deserializer),
                "timestamp": self.format_timestamp(to_str),
            },
        }
        if to_json:
            return json.dumps(rec, default=self.serialize)
        return rec

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


def lambda_function(event, context):
    deserializer = LambdaDeserializer(os.getenv("REGISTRY_NAME", "customer"))
    for _, records in event["records"].items():
        for record in records:
            cr = ConsumerRecord(record)
            print(cr.parse_record(deserializer))

Lambda Resource

The AWS Lambda Terraform module is used to create the consumer Lambda function as well. A Lambda event source mapping is created so that it polls messages from the orders topic and invokes the consumer function. Also, the MSK cluster needs permission to invoke the Lambda function, which is granted by the aws_lambda_permission resource.

# glue-schema-registry/app/variables.tf
locals {
  name        = local.infra_prefix
  region      = data.aws_region.current.name
  environment = "dev"

  infra_prefix = "glue-schema-registry"

  ...

  consumer = {
    src_path          = "consumer"
    function_name     = "kafka_consumer"
    handler           = "lambda_handler.lambda_function"
    timeout           = 90
    memory_size       = 128
    runtime           = "python3.8"
    topic_name        = "orders"
    starting_position = "TRIM_HORIZON"
    environment = {
      registry_name = "customer"
    }
  }
}

# glue-schema-registry/app/main.tf
module "kafka_consumer_lambda" {
  source = "terraform-aws-modules/lambda/aws"

  function_name          = local.consumer.function_name
  handler                = local.consumer.handler
  runtime                = local.consumer.runtime
  timeout                = local.consumer.timeout
  memory_size            = local.consumer.memory_size
  source_path            = local.consumer.src_path
  vpc_subnet_ids         = data.aws_subnets.private.ids
  vpc_security_group_ids = [data.aws_security_group.kafka_app_lambda.id]
  attach_network_policy  = true
  attach_policies        = true
  policies               = [aws_iam_policy.msk_lambda_consumer_permission.arn]
  number_of_policies     = 1
  environment_variables = {
    REGISTRY_NAME = local.producer.environment.registry_name
  }

  tags = local.tags
}

resource "aws_lambda_event_source_mapping" "kafka_consumer_lambda" {
  event_source_arn  = data.aws_msk_cluster.msk_data_cluster.arn
  function_name     = module.kafka_consumer_lambda.lambda_function_name
  topics            = [local.consumer.topic_name]
  starting_position = local.consumer.starting_position
  amazon_managed_kafka_event_source_config {
    consumer_group_id = "${local.consumer.topic_name}-group-01"
  }
}

resource "aws_lambda_permission" "allow_msk" {
  statement_id  = "InvokeLambdaFunction"
  action        = "lambda:InvokeFunction"
  function_name = local.consumer.function_name
  principal     = "kafka.amazonaws.com"
  source_arn    = data.aws_msk_cluster.msk_data_cluster.arn
}

IAM Permission

As the Lambda event source mapping uses the permissions of the Lambda function, we need to add permissions related to the Kafka cluster, Kafka and networking - see the AWS documentation for details. Finally, permissions on the Glue schema registry and schemas are added, as the consumer should be able to request the relevant schemas.

# glue-schema-registry/app/main.tf
resource "aws_iam_policy" "msk_lambda_consumer_permission" {
  name = "${local.consumer.function_name}-msk-lambda-consumer-permission"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnKafkaCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:DescribeGroup",
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeTopic",
          "kafka-cluster:ReadData",
          "kafka-cluster:DescribeClusterDynamicConfiguration"
        ]
        Effect = "Allow"
        Resource = [
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:cluster/${local.infra_prefix}-msk-cluster/*",
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:topic/${local.infra_prefix}-msk-cluster/*",
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:group/${local.infra_prefix}-msk-cluster/*"
        ]
      },
      {
        Sid = "PermissionOnKafka"
        Action = [
          "kafka:DescribeCluster",
          "kafka:GetBootstrapBrokers"
        ]
        Effect   = "Allow"
        Resource = "*"
      },
      {
        Sid = "PermissionOnNetwork"
        Action = [
          # The first three actions also exist in the network policy attachment of the Lambda module
          # "ec2:CreateNetworkInterface",
          # "ec2:DescribeNetworkInterfaces",
          # "ec2:DeleteNetworkInterface",
          "ec2:DescribeVpcs",
          "ec2:DescribeSubnets",
          "ec2:DescribeSecurityGroups"
        ]
        Effect   = "Allow"
        Resource = "*"
      },
      {
        Sid = "PermissionOnGlueSchema"
        Action = [
          "glue:*Schema*",
          "glue:ListRegistries"
        ]
        Effect   = "Allow"
        Resource = "*"
      }
    ]
  })
}

Schema Evolution Demo

Before testing the Kafka applications, I’ll quickly demonstrate how the schema registry can be used for managing and validating schemas for topic message data. Each schema can have a compatibility mode (or have it disabled), which restricts the scope of allowed changes. For example, the default BACKWARD mode only allows deleting fields or adding optional fields. (See the Confluent documentation for a quick summary.) Therefore, if we add a mandatory field to an existing schema, the new version fails validation, and sending a message to the topic fails. To illustrate this, I created a single-node Kafka cluster using docker-compose as shown below.

# glue-schema-registry/compose-demo.yml
version: "3.5"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    container_name: zookeeper
    ports:
      - "2181"
    networks:
      - kafkanet
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - zookeeper_data:/bitnami/zookeeper
  kafka-0:
    image: docker.io/bitnami/kafka:3.3
    container_name: kafka-0
    expose:
      - 9092
    ports:
      - "9093:9093"
    networks:
      - kafkanet
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    volumes:
      - kafka_0_data:/bitnami/kafka
    depends_on:
      - zookeeper

networks:
  kafkanet:
    name: kafka-network

volumes:
  zookeeper_data:
    driver: local
  kafka_0_data:
    driver: local

Recall that the producer Lambda handler has a conditional block that is executed when it is called as a script. If the USE_MORE environment variable is not set, it sends a message based on the Order class. Otherwise, a message is created from the OrderMore class, which has an additional boolean attribute called is_prime. As the compatibility mode is set to BACKWARD, we can expect the second round of execution to fail. As shown below, I executed the Lambda handler twice, and the second round failed with the following error, which indicates a schema validation failure.

aws_schema_registry.exception.SchemaRegistryException: Schema Found but status is FAILURE

export BOOTSTRAP_SERVERS=localhost:9093
export TOPIC_NAME=demo
export REGISTRY_NAME=customer

cd glue-schema-registry/app/producer

## Round 1 - send message from the Order class
python lambda_handler.py

## Round 2 - send message from the OrderMore class
export USE_MORE=1

python lambda_handler.py

We can check the details of the schema versions, and the second version is marked as failed.
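For comparison, if the new field had to be added in a backward-compatible way, giving it a default value would be one option, since a field with a default is treated as optional. The sketch below is an assumption based on how dataclasses_avroschema maps dataclass defaults to Avro field defaults; it was not part of the demo above.

# a hypothetical backward-compatible variant: the added field has a default,
# so it becomes an optional field in the generated Avro schema
import dataclasses

from faker import Faker

from src.order import Order


@dataclasses.dataclass
class OrderCompat(Order):
    is_prime: bool = False  # the default value keeps the change BACKWARD compatible

    @classmethod
    def auto(cls, fake: Faker = Faker()):
        o = Order.auto()
        return cls(o.order_id, o.ordered_at, o.user_id, o.order_items, fake.pybool())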

Note that schema versioning and validation are more relevant to clients that tightly couple the schema with message records. However, they are still important for a Python client in order to work together with such clients or with Kafka Connect.

Deployment

Topic Creation

We plan to create the orders topic with multiple partitions. Although we can use the Kafka CLI tool, this can be done easily with Kpow, a Kafka monitoring and management tool that provides a web UI. It also supports the Glue Schema Registry and MSK Connect out of the box, which is quite convenient. In the docker-compose file, we add environment variables for the MSK cluster, MSK Connect and Glue Schema Registry details. Note that Kpow fails to start if the schema registry does not exist. I created the registry while demonstrating schema evolution, or it can be created simply as shown below.

$ aws glue create-registry --registry-name customer

# glue-schema-registry/docker-compose.yml
version: "3.5"

services:
  kpow:
    image: factorhouse/kpow-ce:91.2.1
    container_name: kpow
    ports:
      - "3000:3000"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
      # kafka cluster
      BOOTSTRAP: $BOOTSTRAP_SERVERS
      SECURITY_PROTOCOL: SASL_SSL
      SASL_MECHANISM: AWS_MSK_IAM
      SASL_CLIENT_CALLBACK_HANDLER_CLASS: software.amazon.msk.auth.iam.IAMClientCallbackHandler
      SASL_JAAS_CONFIG: software.amazon.msk.auth.iam.IAMLoginModule required;
      # msk connect
      CONNECT_AWS_REGION: $AWS_DEFAULT_REGION
      # glue schema registry
      SCHEMA_REGISTRY_ARN: $SCHEMA_REGISTRY_ARN
      SCHEMA_REGISTRY_REGION: $AWS_DEFAULT_REGION

networks:
  kafkanet:
    name: kafka-network

Once started, we can visit the UI on port 3000. The topic is created in the Topic menu by specifying the topic name and the number of partitions.

Once created, we can check details of the topic by selecting the topic from the drop-down menu.
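As an alternative to the Kpow UI, the topic could also be created programmatically. The following is a minimal sketch using the admin client of kafka-python (assuming the forked build with AWS_MSK_IAM support that the producer app relies on); the partition and replication counts are illustrative.

import os

from kafka.admin import KafkaAdminClient, NewTopic

# connect to the MSK cluster with IAM authentication
admin = KafkaAdminClient(
    bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"].split(","),
    security_protocol="SASL_SSL",
    sasl_mechanism="AWS_MSK_IAM",
)
# create the orders topic with multiple partitions
admin.create_topics([NewTopic(name="orders", num_partitions=3, replication_factor=3)])
admin.close()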

Local Testing with SAM

To simplify development, the EventBridge permission is disabled by setting to_enable_trigger to false. Also, the loop is shortened by reducing max_run_sec to 10 so that the function stops earlier.

# glue-schema-registry/app/variables.tf
locals {
  ...

  producer = {
    ...
    to_enable_trigger = false
    environment = {
      topic_name    = "orders"
      registry_name = "customer"
      max_run_sec   = 10
    }
  }
  ...
}

The Lambda function can be built with the SAM build command while specifying the hook name as terraform and enabling beta features. Once completed, the build artifacts and template are stored in the .aws-sam folder.

$ sam build --hook-name terraform --beta-features

# Apply complete! Resources: 3 added, 0 changed, 0 destroyed.

# Build Succeeded

# Built Artifacts  : .aws-sam/build
# Built Template   : .aws-sam/build/template.yaml

# Commands you can use next
# =========================
# [*] Invoke Function: sam local invoke --hook-name terraform
# [*] Emulate local Lambda functions: sam local start-lambda --hook-name terraform

# SAM CLI update available (1.78.0); (1.70.0 installed)
# To download: https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html

We can invoke the Lambda function locally using the SAM local invoke command. The Lambda function is invoked in a Docker container and the invocation logs are printed in the terminal as shown below. Note that we should be connected to the VPN server in order to send messages into the MSK cluster, which is deployed in private subnets.

$ sam local invoke --hook-name terraform module.kafka_producer_lambda.aws_lambda_function.this[0] --beta-features

# Experimental features are enabled for this session.
# Visit the docs page to learn more about the AWS Beta terms https://aws.amazon.com/service-terms/.

# Skipped prepare hook. Current application is already prepared.
# Invoking lambda_handler.lambda_function (python3.8)
# Skip pulling image and use local one: public.ecr.aws/sam/emulation-python3.8:rapid-1.70.0-x86_64.

# Mounting /home/jaehyeon/personal/kafka-pocs/glue-schema-registry/app/.aws-sam/build/ModuleKafkaProducerLambdaAwsLambdaFunctionThis069E06354 as /var/task:ro,delegated inside runtime container
# START RequestId: fdbba255-e5b0-4e21-90d3-fe0b2ebbf629 Version: $LATEST
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# 1000 records are sent in 11 seconds ...
# END RequestId: fdbba255-e5b0-4e21-90d3-fe0b2ebbf629
# REPORT RequestId: fdbba255-e5b0-4e21-90d3-fe0b2ebbf629  Init Duration: 0.22 ms  Duration: 12146.61 ms   Billed Duration: 12147 ms       Memory Size: 128 MB     Max Memory Used: 128 MB
# null

Once completed, we can check that the value schema (orders-value) is created in the Kpow UI as shown below.

We can also check the messages. In order to view them correctly, we need to select AVRO as the value deserializer and glue1 as the schema registry.

Kafka App Deployment

Now we can deploy the Kafka applications using Terraform as usual, after resetting the configuration variables. Once deployed, we can see that the schedule rule has 5 targets pointing to the same Lambda function.

In the CloudWatch logs, we can see that the Lambda consumer parses the consumer records correctly.

Summary

A schema registry provides a centralized repository for managing and validating schemas for topic message data. In AWS, the Glue Schema Registry supports features to manage and enforce schemas on data streaming applications using convenient integrations with a range of AWS services. In this post, we discussed how to integrate Python Kafka producer and consumer apps in AWS Lambda with the Glue Schema Registry.