From 6c585347b7d31b094b7d3b8dbdc5efb9b7fbe2ea Mon Sep 17 00:00:00 2001
From: Robert Yokota <rayokota@gmail.com>
Date: Mon, 6 Jan 2025 16:48:41 -0800
Subject: [PATCH] DGS-19492 Handle records nested in arrays/maps when searching
 for tags (#1890)

* Handle records nested in arrays/maps when searching for tags

* Fix formatting
---
 src/confluent_kafka/schema_registry/avro.py |  6 +-
 tests/schema_registry/test_avro_serdes.py   | 75 +++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/src/confluent_kafka/schema_registry/avro.py b/src/confluent_kafka/schema_registry/avro.py
index 7a4a48319..6c800f977 100644
--- a/src/confluent_kafka/schema_registry/avro.py
+++ b/src/confluent_kafka/schema_registry/avro.py
@@ -752,7 +752,11 @@ def _get_inline_tags_recursively(
         return
     else:
         schema_type = schema.get("type")
-        if schema_type == 'record':
+        if schema_type == 'array':
+            _get_inline_tags_recursively(ns, name, schema.get("items"), tags)
+        elif schema_type == 'map':
+            _get_inline_tags_recursively(ns, name, schema.get("values"), tags)
+        elif schema_type == 'record':
             record_ns = schema.get("namespace")
             record_name = schema.get("name")
             if record_ns is None:
diff --git a/tests/schema_registry/test_avro_serdes.py b/tests/schema_registry/test_avro_serdes.py
index ce7c8839b..aeb58d984 100644
--- a/tests/schema_registry/test_avro_serdes.py
+++ b/tests/schema_registry/test_avro_serdes.py
@@ -757,6 +757,81 @@ def test_avro_cel_field_transform_complex_with_none():
     assert obj2 == newobj
 
 
+def test_avro_cel_field_transform_complex_nested():
+    conf = {'url': _BASE_URL}
+    client = SchemaRegistryClient.new_client(conf)
+    ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
+    schema = {
+        'type': 'record',
+        'name': 'UnionTest',
+        'namespace': 'test',
+        'fields': [
+            {
+                'name': 'emails',
+                'type': [
+                    'null',
+                    {
+                        'type': 'array',
+                        'items': {
+                            'type': 'record',
+                            'name': 'Email',
+                            'fields': [
+                                {
+                                    'name': 'email',
+                                    'type': [
+                                        'null',
+                                        'string'
+                                    ],
+                                    'doc': 'Email address',
+                                    'confluent:tags': [
+                                        'PII'
+                                    ]
+                                }
+                            ]
+                        }
+                    }
+                ],
+                'doc': 'Communication Email',
+            }
+        ]
+    }
+
+    rule = Rule(
+        "test-cel",
+        "",
+        RuleKind.TRANSFORM,
+        RuleMode.WRITE,
+        "CEL_FIELD",
+        None,
+        None,
+        "typeName == 'STRING' ; value + '-suffix'",
+        None,
+        None,
+        False
+    )
+    client.register_schema(_SUBJECT, Schema(
+        json.dumps(schema),
+        "AVRO",
+        [],
+        None,
+        RuleSet(None, [rule])
+    ))
+
+    obj = {
+        'emails': [{'email': 'john@acme.com'}]
+    }
+    ser = AvroSerializer(client, schema_str=None, conf=ser_conf)
+    ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
+    obj_bytes = ser(obj, ser_ctx)
+
+    obj2 = {
+        'emails': [{'email': 'john@acme.com-suffix'}]
+    }
+    deser = AvroDeserializer(client)
+    newobj = deser(obj_bytes, ser_ctx)
+    assert obj2 == newobj
+
+
 def test_avro_cel_field_condition():
     conf = {'url': _BASE_URL}
     client = SchemaRegistryClient.new_client(conf)