How should we configure to keep the schema after restarting the apicurio deployment? #3145


Closed
LittleWat opened this issue Feb 13, 2023 · 10 comments


@LittleWat

Hello! I am relatively new to Apicurio.

We set up the Apicurio Registry with Kafka as its storage, following the documentation.

The deployment manifest is as follows:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: apicurio
  namespace: apicurio
  labels:
    app: apicurio
spec:
  replicas: 1
  selector:
    matchLabels:
      app: apicurio
  template:
    metadata:
      labels:
        app: apicurio
    spec:
      containers:
      - name: apicurio
        image: "apicurio/apicurio-registry-kafkasql:2.4.1.Final"
        imagePullPolicy: "IfNotPresent"
        ports:
        - name: schema-registry
          containerPort: 8080
          protocol: TCP
        resources:
          {}
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: <MY_KAFKA_BOOTSTRAP_SERVERS>
        - name: REGISTRY_KAFKASQL_TOPIC
          value: "apicurio.data"
        - name: REGISTRY_KAFKASQL_CONSUMER_GROUP_ID
          value: "apicurio.registry"
        - name: REGISTRY_RULES_GLOBAL_VALIDITY
          value: "FULL"

Schema registration via the Apicurio UI works fine; however, after restarting the deployment ($ kubectl rollout restart deployment -n apicurio apicurio), the schema is lost.

I would like to keep the schema after restarting the deployment. Is that possible?
If so, how should I configure it?

Thank you!

@LittleWat
Author

LittleWat commented Feb 14, 2023

I confirmed that the schema is written to the apicurio.data topic:
(screenshot: messages in the apicurio.data topic, 2023-02-14 12:12)

so the problem seems to be with loading the schema 🤔

The consumer group seems to consume the messages successfully:
(screenshot: consumer group offsets, 2023-02-14 15:11)

@jsenko jsenko self-assigned this Feb 14, 2023
@jsenko
Member

jsenko commented Feb 14, 2023

Can you try removing the custom topic and consumer group configuration properties to see if the error disappears? I have a hypothesis that there is a bug in the way we handle those configuration properties. Are there any interesting log messages?

@LittleWat
Author

LittleWat commented Feb 15, 2023

Thank you for your quick reply!

I removed the consumer group configuration (REGISTRY_KAFKASQL_CONSUMER_GROUP_ID) and the registry could reload the schema. The topic configuration (REGISTRY_KAFKASQL_TOPIC) seems to be irrelevant.
So we cannot use the consumer group configuration...?

I could reproduce this on my local laptop; the full log is below. I could not see anything obviously wrong at first...

Comparing the successful run with the failing one, I did find an interesting log line:
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage] (executor-thread-1) Database not initialized.

Maybe the "Checking to see if the DB is initialized." step is not working as expected.

Starting the Java application using /opt/jboss/container/java/run/run-java.sh ...
INFO exec  java -Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager -javaagent:/usr/share/java/jolokia-jvm-agent/jolokia-jvm.jar=config=/opt/jboss/container/jolokia/etc/jolokia.properties -XX:+UseParallelGC -XX:MinHeapFreeRatio=10 -XX:MaxHeapFreeRatio=20 -XX:GCTimeRatio=4 -XX:AdaptiveSizePolicyWeight=90 -XX:+ExitOnOutOfMemoryError -cp "." -jar /deployments/apicurio-registry-storage-kafkasql-2.4.1.Final-runner.jar 
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jolokia.util.ClassUtil (file:/usr/share/java/jolokia-jvm-agent/jolokia-jvm.jar) to constructor sun.security.x509.X500Name(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.lang.String)
WARNING: Please consider reporting this to the maintainers of org.jolokia.util.ClassUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
I> No access restrictor found, access to any MBean is allowed
Jolokia: Agent started with URL https://10.244.0.74:8778/jolokia/
__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
2023-02-15 02:04:12 INFO <> [io.quarkus.bootstrap.runner.Timing] (main) apicurio-registry-storage-kafkasql 2.4.1.Final on JVM (powered by Quarkus 2.14.0.Final) started in 2.016s. Listening on: http://0.0.0.0:8080
2023-02-15 02:04:12 INFO <> [io.quarkus.bootstrap.runner.Timing] (main) Profile prod activated. 
2023-02-15 02:04:12 INFO <> [io.quarkus.bootstrap.runner.Timing] (main) Installed features: [agroal, cdi, jdbc-h2, micrometer, narayana-jta, oidc, rest-client, rest-client-jackson, resteasy, resteasy-jackson, scheduler, security, servlet, smallrye-context-propagation, smallrye-fault-tolerance, smallrye-health, vertx]
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.RegistryStorageProducer] (executor-thread-1) Using RegistryStore: io.apicurio.registry.storage.impl.kafkasql.KafkaSqlRegistryStorage_ClientProxy
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.kafkasql.KafkaSqlRegistryStorage] (executor-thread-1) Using Kafka-SQL artifactStore.
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.config.AbstractConfig] (executor-thread-1) AdminClientConfig values: 
	bootstrap.servers = [my-kafka-kafka-bootstrap.kafka.svc:9092]
	client.dns.lookup = use_all_dns_ips
	client.id = 
	connections.max.idle.ms = 300000
	default.api.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.utils.AppInfoParser$AppInfo] (executor-thread-1) Kafka version: 2.8.1
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.utils.AppInfoParser$AppInfo] (executor-thread-1) Kafka commitId: 839b886f9b732b15
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.utils.AppInfoParser$AppInfo] (executor-thread-1) Kafka startTimeMs: 1676426653153
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.utils.AppInfoParser] (kafka-admin-client-thread | adminclient-1) App info kafka.admin.client for adminclient-1 unregistered
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.metrics.Metrics] (kafka-admin-client-thread | adminclient-1) Metrics scheduler closed
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.metrics.Metrics] (kafka-admin-client-thread | adminclient-1) Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.metrics.Metrics] (kafka-admin-client-thread | adminclient-1) Metrics reporters closed
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.kafkasql.KafkaSqlRegistryStorage] (executor-thread-1) Starting KSQL consumer thread on topic: apicurio.data
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.kafkasql.KafkaSqlRegistryStorage] (executor-thread-1) Bootstrap servers: my-kafka-kafka-bootstrap.kafka.svc:9092
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.utils.kafka.AsyncProducer] (executor-thread-1) Creating new resilient producer.
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.config.AbstractConfig] (executor-thread-1) ProducerConfig values: 
	acks = -1
	batch.size = 16384
	bootstrap.servers = [my-kafka-kafka-bootstrap.kafka.svc:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = apicurio-registry-producer
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = false
	key.serializer = class io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlKeySerializer
	linger.ms = 10
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlValueSerializer

2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.utils.AppInfoParser$AppInfo] (executor-thread-1) Kafka version: 2.8.1
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.utils.AppInfoParser$AppInfo] (executor-thread-1) Kafka commitId: 839b886f9b732b15
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.utils.AppInfoParser$AppInfo] (executor-thread-1) Kafka startTimeMs: 1676426653454
2023-02-15 02:04:13 INFO <> [org.apache.kafka.clients.Metadata] (kafka-producer-network-thread | apicurio-registry-producer) [Producer clientId=apicurio-registry-producer] Cluster ID: SJzDCy2YT5G8S812HQgL1A
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.kafkasql.KafkaSqlRegistryStorage] (KSQL Kafka Consumer Thread) KSQL consumer thread startup lag: 100
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage] (executor-thread-1) SqlRegistryStorage constructed successfully.  JDBC URL: jdbc:h2:mem:registry_db
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage] (executor-thread-1) Checking to see if the DB is initialized.
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage] (executor-thread-1) Database not initialized.
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage] (executor-thread-1) Initializing the Apicurio Registry database.
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage] (executor-thread-1) 	Database type: h2
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.kafkasql.KafkaSqlRegistryStorage] (KSQL Kafka Consumer Thread) Subscribing to apicurio.data
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.config.AbstractConfig] (KSQL Kafka Consumer Thread) ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 1000
	auto.offset.reset = earliest
	bootstrap.servers = [my-kafka-kafka-bootstrap.kafka.svc:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-apicurio.registry-1
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = apicurio.registry
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlKeyDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlValueDeserializer

2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage] (executor-thread-1) Checking to see if the DB is up-to-date.
2023-02-15 02:04:13 INFO <> [io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage] (executor-thread-1) Build's DB version is 11
2023-02-15 02:04:13 WARN <> [org.apache.kafka.common.config.AbstractConfig] (KSQL Kafka Consumer Thread) The configuration 'poll.timeout' was supplied but isn't a known config.
2023-02-15 02:04:13 WARN <> [org.apache.kafka.common.config.AbstractConfig] (KSQL Kafka Consumer Thread) The configuration 'startupLag' was supplied but isn't a known config.
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.utils.AppInfoParser$AppInfo] (KSQL Kafka Consumer Thread) Kafka version: 2.8.1
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.utils.AppInfoParser$AppInfo] (KSQL Kafka Consumer Thread) Kafka commitId: 839b886f9b732b15
2023-02-15 02:04:13 INFO <> [org.apache.kafka.common.utils.AppInfoParser$AppInfo] (KSQL Kafka Consumer Thread) Kafka startTimeMs: 1676426653723
2023-02-15 02:04:13 INFO <> [org.apache.kafka.clients.consumer.KafkaConsumer] (KSQL Kafka Consumer Thread) [Consumer clientId=consumer-apicurio.registry-1, groupId=apicurio.registry] Subscribed to topic(s): apicurio.data
2023-02-15 02:04:13 INFO <> [org.apache.kafka.clients.Metadata] (KSQL Kafka Consumer Thread) [Consumer clientId=consumer-apicurio.registry-1, groupId=apicurio.registry] Cluster ID: SJzDCy2YT5G8S812HQgL1A
2023-02-15 02:04:13 INFO <> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler] (KSQL Kafka Consumer Thread) [Consumer clientId=consumer-apicurio.registry-1, groupId=apicurio.registry] Discovered group coordinator my-kafka-kafka-0.my-kafka-kafka-brokers.kafka.svc:9092 (id: 2147483647 rack: null)
2023-02-15 02:04:13 INFO <> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (KSQL Kafka Consumer Thread) [Consumer clientId=consumer-apicurio.registry-1, groupId=apicurio.registry] (Re-)joining group
2023-02-15 02:04:13 INFO <> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (KSQL Kafka Consumer Thread) [Consumer clientId=consumer-apicurio.registry-1, groupId=apicurio.registry] (Re-)joining group
2023-02-15 02:04:17 WARN <_> [io.sentry.dsn.Dsn] (executor-thread-1) *** Couldn't find a suitable DSN, Sentry operations will do nothing! See documentation: https://docs.sentry.io/clients/java/ ***
2023-02-15 02:04:17 WARN <_> [io.sentry.DefaultSentryClientFactory] (executor-thread-1) No 'stacktrace.app.packages' was configured, this option is highly recommended as it affects stacktrace grouping and display on Sentry. See documentation: https://docs.sentry.io/clients/java/config/#in-application-stack-frames
2023-02-15 02:04:18 INFO <> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler] (KSQL Kafka Consumer Thread) [Consumer clientId=consumer-apicurio.registry-1, groupId=apicurio.registry] Successfully joined group with generation Generation{generationId=5, memberId='consumer-apicurio.registry-1-1caac823-fc06-4483-9f83-c3070d579a99', protocol='range'}
2023-02-15 02:04:18 INFO <> [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (KSQL Kafka Consumer Thread) [Consumer clientId=consumer-apicurio.registry-1, groupId=apicurio.registry] Finished assignment for group at generation 5: {consumer-apicurio.registry-1-1caac823-fc06-4483-9f83-c3070d579a99=Assignment(partitions=[apicurio.data-0])}
2023-02-15 02:04:18 INFO <> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler] (KSQL Kafka Consumer Thread) [Consumer clientId=consumer-apicurio.registry-1, groupId=apicurio.registry] Successfully synced group in generation Generation{generationId=5, memberId='consumer-apicurio.registry-1-1caac823-fc06-4483-9f83-c3070d579a99', protocol='range'}
2023-02-15 02:04:18 INFO <> [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (KSQL Kafka Consumer Thread) [Consumer clientId=consumer-apicurio.registry-1, groupId=apicurio.registry] Notifying assignor about the new Assignment(partitions=[apicurio.data-0])
2023-02-15 02:04:18 INFO <> [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (KSQL Kafka Consumer Thread) [Consumer clientId=consumer-apicurio.registry-1, groupId=apicurio.registry] Adding newly assigned partitions: apicurio.data-0
2023-02-15 02:04:18 INFO <> [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (KSQL Kafka Consumer Thread) [Consumer clientId=consumer-apicurio.registry-1, groupId=apicurio.registry] Setting offset for partition apicurio.data-0 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[my-kafka-kafka-0.my-kafka-kafka-brokers.kafka.svc:9092 (id: 0 rack: null)], epoch=0}}
2023-02-15 02:04:18 INFO <> [io.apicurio.registry.storage.impl.kafkasql.KafkaSqlRegistryStorage] (KSQL Kafka Consumer Thread) KafkaSQL storage bootstrapped in 5267ms.

@LittleWat
Author

If we cannot specify the consumer group ID, we have to use the default one, right?
The default consumer group is named something like apicurio-registry-3838aa78-b28e-49fc-92cf-1643a0274e46.
So is it enough to add the following ACL?

kafka-acls --bootstrap-server <bootstrap_servers> --add \
  --allow-principal <principal> \
  --allow-host "*" \
  --operation Read \
  --group "apicurio-registry-" \
  --resource-pattern-type prefixed

@jsenko
Member

jsenko commented Feb 15, 2023

Thanks for the helpful info; I'll see if I can fix the consumer group issue. The "Database not initialized." log message is expected, I think. The ACL should work for the default group.

@LittleWat
Author

Thank you for your reply! I hope this information helps you fix the issue. For the time being, I will not specify the consumer group ID and will use the default one.

@mattieserver

We have the same issue: when REGISTRY_KAFKASQL_CONSUMER_GROUP_ID is present and you restart the Docker container, the artifacts are no longer visible.

I tried changing some other settings, such as client.id, but nothing works as long as REGISTRY_KAFKASQL_CONSUMER_GROUP_ID is set.
If you need more logs or anything to test, let me know.

@carlesarnal
Member

For anyone looking at this in the future: the way KafkaSQL works is that writes are sent as messages to the Kafka topic, and an internal H2 database is used for reads. For this to work, each Registry replica has to be placed in a different consumer group, so that every replica consumes all the messages in the topic. If the replicas share a consumer group, each one will miss part of the data, and some schemas may not be present.
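The failure mode described above can be illustrated with a small stand-alone sketch (plain Python, no Kafka client; the modulo split below is a simplification of Kafka's actual partition assignment, used only for illustration): consumers sharing a group.id divide the partitions between them, so a single replica never sees the whole topic, while consumers in distinct groups each replay everything.

```python
import uuid

def assign(partitions, consumers_by_group):
    """Return {consumer: set(partitions)} using a simple range-style split.

    Consumers within one group split the partitions; each group as a
    whole receives every partition.
    """
    result = {}
    for group, consumers in consumers_by_group.items():
        for i, consumer in enumerate(consumers):
            # Each consumer in a group gets a disjoint slice of the partitions.
            result[consumer] = {p for p in partitions if p % len(consumers) == i}
    return result

partitions = {0, 1, 2, 3}

# Two replicas sharing one group: each replica misses half the data.
shared = assign(partitions, {"apicurio.registry": ["replica-a", "replica-b"]})
assert shared["replica-a"] | shared["replica-b"] == partitions
assert shared["replica-a"] != partitions  # replica-a alone is incomplete

# Two replicas with unique groups: each replica consumes everything.
unique = assign(partitions, {
    f"apicurio-registry-{uuid.uuid4()}": ["replica-c"],
    f"apicurio-registry-{uuid.uuid4()}": ["replica-d"],
})
assert unique["replica-c"] == partitions
assert unique["replica-d"] == partitions
```

This is why a fixed REGISTRY_KAFKASQL_CONSUMER_GROUP_ID breaks recovery: after a restart, the replica rejoins the same group with committed offsets and never replays the topic from the beginning.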

@LittleWat
Author

LittleWat commented May 22, 2024

@carlesarnal thank you for your explanation! Now I understand the reason for this. 🙇
If so, maybe the code should log a warning or error when REGISTRY_KAFKASQL_CONSUMER_GROUP_ID is set...?
We are prepending [k8s namespace ]. to all the Kafka topics/consumer groups so that Kafka can have multi-tenancy,
so it would be great if REGISTRY_KAFKASQL_CONSUMER_GROUP_ID_PREFIX were supported instead...
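As a sketch of the requested behaviour (note that REGISTRY_KAFKASQL_CONSUMER_GROUP_ID_PREFIX is a hypothetical setting proposed in this thread, not an existing registry option), a configurable prefix could be combined with a per-replica unique suffix, satisfying both the multi-tenancy naming convention and the one-group-per-replica requirement:

```python
import os
import uuid

def consumer_group_id(default_prefix="apicurio-registry-"):
    """Build a unique consumer group ID per replica.

    Honours the (hypothetical) REGISTRY_KAFKASQL_CONSUMER_GROUP_ID_PREFIX
    environment variable if present, falling back to the registry's
    default prefix, and appends a random suffix so that each replica
    lands in its own consumer group.
    """
    prefix = os.environ.get(
        "REGISTRY_KAFKASQL_CONSUMER_GROUP_ID_PREFIX", default_prefix
    )
    return prefix + str(uuid.uuid4())

# A tenant-scoped prefix, as in the multi-tenant naming scheme above.
os.environ["REGISTRY_KAFKASQL_CONSUMER_GROUP_ID_PREFIX"] = "my-namespace.apicurio-registry-"
gid = consumer_group_id()
assert gid.startswith("my-namespace.apicurio-registry-")
assert consumer_group_id() != gid  # each replica still gets its own group
```

A prefixed ACL (as in the kafka-acls example earlier in the thread) would then match every group ID generated this way.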

@carlesarnal
Member

@LittleWat That is certainly a fair ask; do you mind opening a new GH issue for it? Thanks!


4 participants