
Compatibility with ksqlDB - path /apis/ccompat/v6/schemas/ids/0 not found #2151


Closed
barnyrelph opened this issue Jan 4, 2022 · 17 comments
Labels
type/question Further information is requested

Comments

@barnyrelph

Hi All, I'm using Debezium to capture Postgres events, stream them into Kafka and hopefully process them in ksqlDB, then stream them out to a separate Postgres DB for analysis.

I have a Connector set up watching a couple of tables. On inserting to the table, I can see Schemas created in the registry.
I have ksqlDB set up using:

        - name: KSQL_KSQL_SCHEMA_REGISTRY_URL
          value: http://apicurio-registry-service:8080/apis/ccompat/v6

I then create a stream in ksqlDB:

CREATE OR REPLACE STREAM apicurio_person_id (  
      id INT,
      person_identifier varchar  
        )
  WITH (kafka_topic='ksqldb-schema-connector.public.person_topic', key_format='avro', value_format='avro', partitions=1);

Upon insert, the schema registry is populated and I can see ksqlDB go looking for the schema, but in the logs for the apicurio registry, I can see this:

2022-01-04 17:29:13 INFO <_> [io.apicurio.registry.logging.audit.AuditLogService] (executor-thread-405) registry.audit action="createArtifact" result="success" src_ip="10.215.226.55" if_exists="RETURN_OR_UPDATE" artifact_type="AVRO" canonical="false" artifact_id="ksqldb-schema-connector.public.finance_provider-key"
2022-01-04 17:29:13 INFO <_> [io.apicurio.registry.logging.audit.AuditLogService] (executor-thread-405) registry.audit action="createArtifact" result="success" src_ip="10.215.226.55" if_exists="RETURN_OR_UPDATE" artifact_type="AVRO" canonical="false" artifact_id="ksqldb-schema-connector.public.finance_provider-value"
2022-01-04 17:29:14 INFO <_> [io.apicurio.registry.logging.audit.AuditLogService] (executor-thread-405) registry.audit action="createArtifact" result="success" src_ip="10.215.226.55" if_exists="RETURN_OR_UPDATE" artifact_type="AVRO" canonical="false" artifact_id="ksqldb-schema-connector.public.person_finance_provider-key"
2022-01-04 17:29:14 INFO <_> [io.apicurio.registry.logging.audit.AuditLogService] (executor-thread-405) registry.audit action="createArtifact" result="success" src_ip="10.215.226.55" if_exists="RETURN_OR_UPDATE" artifact_type="AVRO" canonical="false" artifact_id="ksqldb-schema-connector.public.person_finance_provider-value"
2022-01-04 17:29:14 INFO <_> [io.apicurio.registry.logging.audit.AuditLogService] (executor-thread-405) registry.audit action="request" result="failure" src_ip="10.215.226.77" path="/apis/ccompat/v6/schemas/ids/0" response_code="404" method="GET" user=""

That last request seems to be where things are going awry. Is this a problem with my configuration (most likely), or is some aspect of the Confluent compatibility API missing?

Connector setup snippet for reference:

        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://apicurio-registry-service:8080/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://apicurio-registry-service:8080/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true",

Thanks in advance for any guidance

@ofelix03

ofelix03 commented Sep 6, 2022

Hi @barnyrelph

I encountered this very issue just this past week. After scouring the web with no luck, I eventually traced the issue to a missing option in the Debezium source connector configuration.

Add this option to your configuration to rectify this.

"value.converter.apicurio.registry.as-confluent": true

While introducing the above option ensures the lookup uses the Global ID assigned to the schema, the lookup still returns a 404. The reason appears to be that Apicurio expects the value of the ID to be the registered Content ID instead of the Global ID.

Yet to figure this out.

@EricWittmann
Member

So there are two compatibility issues you might run into when using both Confluent and Apicurio tools. The first is a client setting in Apicurio tooling that instructs our tools to use integers instead of longs for all IDs that we encode in the messages. Confluent uses 4-byte integers while we use 8-byte longs. That is the option you discovered, and it is one piece of the puzzle.
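
To make the size mismatch concrete, here is a minimal Python sketch (an illustration, not Apicurio's actual serializer code), assuming the common layout of a magic byte followed by the encoded schema ID. It also shows why a 4-byte reader pointed at an 8-byte header ends up asking for ID 0, which matches the /apis/ccompat/v6/schemas/ids/0 lookups in the logs above:

import struct

# Confluent framing: 1 magic byte + 4-byte big-endian int schema ID.
confluent_header = struct.pack(">bi", 0, 8)
# Apicurio's legacy framing: 1 magic byte + 8-byte big-endian long global ID.
apicurio_header = struct.pack(">bq", 0, 8)

# A Confluent-style deserializer reads only 4 bytes after the magic byte.
# Against the 8-byte framing it sees the high-order half of the long, which
# is 0 for any realistically small ID -- hence the lookups for .../ids/0.
magic, misread_id = struct.unpack_from(">bi", apicurio_header)
print(misread_id)  # prints 0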

The other piece is that early versions of Apicurio used global identifiers rather than content identifiers as the unique lookup in those tools. Global IDs are unique for every version of every artifact, even if the same content is uploaded multiple times. Content IDs, on the other hand, are shared, so if you upload the same content multiple times you will get back the same ID.

Confluent server has always used content IDs (although I think they call them global IDs) in their API. As of version 2.0 of Apicurio Registry, our API uses content IDs as well, so it should be easier to interoperate between our tools and theirs.
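
For a concrete check, Registry 2.x exposes lookups for both ID spaces in the core v2 API, while the ccompat API resolves /schemas/ids/{id} by content ID. A minimal Python sketch (the base URL follows the converter snippets above; the ID value is hypothetical):

import urllib.request

# Hypothetical ID value; the base URL follows the converter config above.
base = "http://apicurio-registry-service:8080/apis/registry/v2"
for path in ("ids/globalIds/1", "ids/contentIds/1"):
    # Each endpoint returns the stored schema content for that ID, so you
    # can see which kind of ID your clients are actually encoding.
    with urllib.request.urlopen(f"{base}/{path}") as resp:
        print(path, "->", resp.status)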

However, if you are still using any legacy Apicurio tools, or the v1 edition of our REST API, there may still be an incompatibility.

Does that context help at all? It's possible we still have a mismatch somewhere between the Confluent compatibility API and our core API.

@choucavalier

choucavalier commented Jan 17, 2023

We have this issue as well, while setting up a data infrastructure at a Parisian hospital. I think we'll have to switch to the Confluent Schema Registry because of this :/ We tried all the solutions mentioned in this thread. Context: we use Debezium and ksqlDB.

@carlesarnal
Member

carlesarnal commented Jan 17, 2023

> We have this issue as well, while setting up a data infrastructure at a Parisian hospital. I think we'll have to switch to the Confluent Schema Registry because of this :/ We tried all the solutions mentioned in this thread. Context: we use Debezium and ksqlDB.

Sorry for being so late to the party. In addition to what Eric described, you can still force the Confluent compatibility API to use the global ID across the entire API by setting the environment variable ENABLE_CCOMPAT_LEGACY_ID_MODE to true in your Apicurio Registry instance. This is the important point: this is a configuration for the server, not the converter. Alternatively (and probably easier), you can instruct the converter to use the contentId instead of the globalId by setting apicurio.registry.use-id to contentId. The converter uses the globalId by default, which is why it's trying to use it to fetch your schemas.
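
To make the distinction concrete, here is a sketch of where each setting lives, following the snippet conventions used elsewhere in this thread. The server-side option goes in the environment of the registry container, e.g. in docker-compose:

      apicurio-registry:
        environment:
          - ENABLE_CCOMPAT_LEGACY_ID_MODE=true

The converter-side alternative goes in the Debezium connector configuration:

        "key.converter.apicurio.registry.use-id": "contentId",
        "value.converter.apicurio.registry.use-id": "contentId",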

@ofelix03

--UPDATE (to my comment above)--

@tgy

Resolved this issue some months back. I have data pipelines running smoothly now with Kafka, Debezium Connectors, Apicurio Registry and ksqlDB.
The issue was resolved by adding this to my Debezium connector configurations.

"value.converter.apicurio.registry.as-confluent": true,
"value.converter.apicurio.registry.use-id": "contentId",

Repeat the same if you are serializing/deserializing your message key data with a schema registry (Apicurio Registry in my case):

"key.converter.apicurio.registry.as-confluent": true,
"key.converter.apicurio.registry.use-id": "contentId",

@carlesarnal
Member

Hi,

Yes, that is the expected configuration; you can't use as-confluent without also setting use-id.

@tgy can you please confirm if setting this configuration fixes the problem for you?

Thanks!

@choucavalier

Hi guys, thanks a lot for your answers. We're looking into the solutions you shared and will let you know if it worked!

@jaj42

jaj42 commented Jan 18, 2023

Hi @carlesarnal and @ofelix03,
thank you very much for your help. I'm from the same team as @tgy.
We tried the solution you provided, but unfortunately it does not work for us.
ksqlDB is able to read the schema in order to create a stream:

ksql> create stream dwcnumeric_ with
(
  kafka_topic='dwcavro.Philips.PatientData._Export.Numeric_', format='AVRO'
);

 Message
----------------
 Stream created
----------------

It does correctly identify the fields in the schema:

ksql> describe dwcnumeric_ extended;

Name                 : DWCNUMERIC_
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : AVRO
Value format         : AVRO
Kafka topic          : dwcavro.Philips.PatientData._Export.Numeric_ (partitions: 1, replication: 1)
Statement            : CREATE STREAM DWCNUMERIC_ (ROWKEY STRUCT<ID BIGINT> KEY, ID BIGINT, TIMESTAMP STRING, BASEPHYSIOID BIGINT, PHYSIOID BIGINT, LABEL STRING, UNITLABEL STRING, SUBPHYSIOID BIGINT, SUBLABEL STRING, __OP STRING) WITH (FORMAT='AVRO', KAFKA_TOPIC='dwcavro.Philips.PatientData._Export.Numeric_');

 Field        | Type                    
----------------------------------------
 ROWKEY       | STRUCT<ID BIGINT> (key) 
 ID           | BIGINT                  
 TIMESTAMP    | VARCHAR(STRING)         
 BASEPHYSIOID | BIGINT                  
 PHYSIOID     | BIGINT                  
 LABEL        | VARCHAR(STRING)         
 UNITLABEL    | VARCHAR(STRING)         
 SUBPHYSIOID  | BIGINT                  
 SUBLABEL     | VARCHAR(STRING)         
 __OP         | VARCHAR(STRING)         
----------------------------------------

However, when I want to query the data, I obtain no result:

ksql> select * from dwcnumeric_ limit 10;
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+           
|ROWKEY            |ID                |TIMESTAMP         |BASEPHYSIOID      |PHYSIOID          |LABEL             |UNITLABEL         |SUBPHYSIOID       |SUBLABEL          |__OP              |           
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+           
Query Completed
Query terminated

When I run this query, I see the following error repeating in the apicurio log:

2023-01-18 10:46:16 INFO <_> [io.apicurio.common.apps.logging.audit.AuditLogService] (executor-thread-12) apicurio.audit action="request" result="failure" src_ip="172.28.0.8" path="/apis/ccompat/v6/schemas/ids/0" response_code="404" method="GET" user=""

This error message repeats many times (maybe as many times as there are messages in the topic).

At the same time, the ksqldb-server logs show the following error:

[2023-01-18 11:17:44,442] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Error deserializing message from topic: dwcavro.Philips.PatientData._Export.Numeric_","recordB64":null,"cause":["Failed to deserialize data for topic dwcavro.Philips.PatientData._Export.Numeric_ to Avro: ","Error retrieving Avro key schema for id 0","No content with id/hash 'contentId-0' was found.; error code: 40403"],"topic":"dwcavro.Philips.PatientData._Export.Numeric_"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.transient_DWCNUMERIC__9209023474268862486.KsqlTopic.Source.deserializer:44)

It looks like ksqlDB queries ID 0, which is strange since my IDs start at 1. I have schema IDs 1 to 17. The relevant schema IDs for the topic in question are 8 for dwcavro.Philips.PatientData._Export.Numeric_-key and 9 for dwcavro.Philips.PatientData._Export.Numeric_-value.

If I request the correct IDs from Apicurio, it actually returns a result. I don't understand why ksqlDB requests ID 0 over and over again.

➜  ~ curl http://localhost:8082/apis/ccompat/v6/schemas/ids/0
{"message":"No content with id/hash 'contentId-0' was found.","error_code":40403}
➜  ~ curl http://localhost:8082/apis/ccompat/v6/schemas/ids/8
{"schema":"{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"dwcavro.Philips.PatientData._Export.Numeric_\",\"fields\":[{\"name\":\"Id\",\"type\":\"long\"}],\"connect.name\":\"dwcavro.Philips.PatientData._Export.Numeric_.Key\"}","references":[]}
➜  ~ curl http://localhost:8082/apis/ccompat/v6/schemas/ids/9
{"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"dwcavro.Philips.PatientData._Export.Numeric_\",\"fields\":[{\"name\":\"Id\",\"type\":\"long\"},{\"name\":\"TimeStamp\",\"type\":{\"type\":\"string\",\"connect.version\":1,\"connect.name\":\"io.debezium.time.ZonedTimestamp\"}},{\"name\":\"BasePhysioId\",\"type\":\"long\"},{\"name\":\"PhysioId\",\"type\":\"long\"},{\"name\":\"Label\",\"type\":\"string\"},{\"name\":\"UnitLabel\",\"type\":\"string\"},{\"name\":\"SubPhysioId\",\"type\":\"long\"},{\"name\":\"SubLabel\",\"type\":\"string\"},{\"name\":\"__op\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"dwcavro.Philips.PatientData._Export.Numeric_.Value\"}","references":[]}

Best regards,
Jona Joachim

@ofelix03

@jaj42

To better understand what's going on, please share some details:

  1. Which setup of Apicurio Registry are you running (in-memory, SQL database, Kafka storage)?

  2. Do you have apicurio.registry.auto-register in your configuration? If yes, what's the current value?

@choucavalier

choucavalier commented Jan 18, 2023

@ofelix03 Thanks for taking the time to look at our setup

Our Apicurio Registry is stored in a PostgreSQL database (in docker-compose.yml we use REGISTRY_DATASOURCE_URL=jdbc:postgresql://host.docker.internal/apicurio with the apicurio/apicurio-registry-sql:latest-snapshot image).

We're not setting any AUTO_REGISTER_ARTIFACT environment variable, so I guess we're not touching the apicurio.registry.auto-register config, which defaults to false.

@jaj42

jaj42 commented Jan 18, 2023

@ofelix03 @tgy
We run on Docker using the debezium/connect:nightly and apicurio/apicurio-registry-sql:latest-snapshot images.
We do, as a matter of fact, use the apicurio.registry.auto-register configuration knob.
The Debezium Kafka Connect container is configured with these environment variables, following the Debezium documentation (https://debezium.io/documentation/reference/stable/configuration/avro.html):

      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://apicurio-registry:8080/apis/registry/v2
      - CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
      - CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_URL=http://apicurio-registry:8080/apis/registry/v2
      - CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true
      - CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true
      - CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
      - CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio-registry:8080/apis/registry/v2
      - CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true
      - CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true
      - CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE=avro

@jaj42

jaj42 commented Jan 20, 2023

@ofelix03 @tgy
just as a follow-up, I ran the same setup with confluent schema registry and it works as expected with no error message.
In the Confluent registry, schema IDs start at 1 as well, and requesting schema 0 results in much the same error as with Apicurio:

➜  ~ curl http://localhost:8181/schemas/ids/0
{"error_code":40403,"message":"Schema 0 not found"}
➜  ~ curl http://localhost:8181/schemas/ids/5
{"schema":"{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"LRB_DWC01._Export.Numeric_\",\"fields\":[{\"name\":\"Id\",\"type\":\"long\"}],\"connect.name\":\"LRB_DWC01._Export.Numeric_.Key\"}"}
➜  ~ curl http://localhost:8181/schemas/ids/6
{"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"LRB_DWC01._Export.Numeric_\",\"fields\":[{\"name\":\"Id\",\"type\":\"long\"},{\"name\":\"TimeStamp\",\"type\":{\"type\":\"string\",\"connect.version\":1,\"connect.name\":\"io.debezium.time.ZonedTimestamp\"}},{\"name\":\"BasePhysioId\",\"type\":\"long\"},{\"name\":\"PhysioId\",\"type\":\"long\"},{\"name\":\"Label\",\"type\":\"string\"},{\"name\":\"UnitLabel\",\"type\":\"string\"},{\"name\":\"SubPhysioId\",\"type\":\"long\"},{\"name\":\"SubLabel\",\"type\":\"string\"},{\"name\":\"__op\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"LRB_DWC01._Export.Numeric_.Value\"}"}

However, ksqlDB is able to parse the topic with no error. It doesn't even try to query ids/0. Something must be wrong in the Apicurio emulation of the Confluent API, prompting ksqlDB to request schema ID 0 instead of the correct schema ID.
The same holds true for kafka-ui, which could not deserialize Avro with apicurio-registry and showed binary data. With confluent-registry it now shows correctly deserialized data.

Best regards,
Jona

@carlesarnal
Member

Hi @jaj42 @tgy,

Since you mentioned you're running docker-compose, it would be very helpful if you could share your full setup (obviously omitting any confidential information), not only some snippets. I think you might be missing some essential configuration on the Apicurio Registry side of things, so I would like to take a look at your configuration. Keep in mind that although we provide a compatibility API, both the tooling and the server must be configured properly to make things work.

Thanks in advance.

@choucavalier

choucavalier commented Jan 23, 2023

Hi Carles. Thanks for taking the time. Here's our docker-compose.yaml:

version: "2"

services:
  zookeeper-lrb:
    restart: "unless-stopped"
    image: debezium/zookeeper:1.9
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888

  kafka-lrb:
    restart: "unless-stopped"
    image: debezium/kafka:1.9
    ports:
      - 9092:9092
      - 9011:9011
    links:
      - zookeeper-lrb
    environment:
      - ZOOKEEPER_CONNECT=zookeeper-lrb:2181
      - KAFKA_LOG_RETENTION_MS=3600000
      - KAFKA_LOG_ROLL_MS=3600000
      - KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS=5000
      - KAFKA_LISTENERS=INTERN://kafka-lrb:29092,EXTERN://kafka-lrb:9092
      - KAFKA_ADVERTISED_LISTENERS=INTERN://kafka-lrb:29092,EXTERN://10.130.192.132:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERN:PLAINTEXT,EXTERN:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERN
      - JMXPORT=9011
      - JMXHOST=localhost

  apicurio-registry:
    profiles:
      - donotstart
    restart: "unless-stopped"
    image: apicurio/apicurio-registry-sql:latest-snapshot
    ports:
      - 8082:8080
    environment:
      - REGISTRY_DATASOURCE_URL=jdbc:postgresql://host.docker.internal/apicurio
      - REGISTRY_DATASOURCE_USERNAME=X
      - REGISTRY_DATASOURCE_PASSWORD=X
    extra_hosts:
      - host.docker.internal:host-gateway

  confluent-registry:
    restart: "unless-stopped"
    image: confluentinc/cp-schema-registry:latest
    ports:
      - 8181:8081
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka-lrb:9092
      - SCHEMA_REGISTRY_HOST_NAME=confluent-registry
      - SCHEMA_REGISTRY_LISTENERS=http://confluent-registry:8081

  connect-lrb:
    restart: "unless-stopped"
    build:
      context: connect-jmx
      args:
        DEBEZIUM_VERSION: 1.9
        JMX_AGENT_VERSION: 0.15.0
    ports:
      - 8083:8083
      - 9012:9012
    links:
      - kafka-lrb
    environment:
      - BOOTSTRAP_SERVERS=kafka-lrb:29092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=physiodata_configs
      - OFFSET_STORAGE_TOPIC=physiodata_offsets
      - STATUS_STORAGE_TOPIC=physiodata_statuses
      - KAFKA_CONNECT_PLUGINS_DIR=/kafka/connect/
      - KAFKA_HEAP_OPTS=-Xms1G -Xmx20G
      - KAFKA_OPTS=-javaagent:/kafka/etc/jmx_prometheus_javaagent.jar=8080:/kafka/etc/config.yml
      - JMXPORT=9012
      - JMXHOST=10.130.192.132
    depends_on:
      - zookeeper-lrb
      - kafka-lrb

  kafka-bridge:
    restart: "unless-stopped"
    image: quay.io/strimzi/kafka-bridge
    ports:
      - 9080:8080
    volumes:
      - ./kafka-bridge/config/:/opt/strimzi/config/
    entrypoint: /opt/strimzi/bin/kafka_bridge_run.sh
    command: --config-file=config/application.properties
    depends_on:
      - kafka-lrb

  prometheus:
    restart: "unless-stopped"
    build:
      context: prometheus
      args:
        PROMETHEUS_VERSION: v2.26.0
    ports:
      - 9090:9090
    links:
      - connect-lrb
    user: root
    volumes:
      - /data/infra/prometheus:/prometheus

  prometheus-alertmanager:
    image: prom/alertmanager:v0.25.0
    restart: unless-stopped
    ports:
      - 9093:9093
    volumes:
      - /data/infra/alertmanager/conf:/config
      - /data/infra/alertmanager/data:/data
    command: --config.file=/config/alertmanager.yml --log.level=debug

  kafka-ui:
    restart: "unless-stopped"
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8100:8080
    environment:
      - KAFKA_CLUSTERS_0_NAME=lariboisiere
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-lrb:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper-lrb:2181
      #- KAFKA_CLUSTERS_0_SCHEMAREGISTRY=http://apicurio-registry:8080/apis/ccompat/v6
      - KAFKA_CLUSTERS_0_SCHEMAREGISTRY=http://confluent-registry:8081
      - KAFKA_CLUSTERS_0_KSQLDBSERVER=http://ksqldb-server:8088
      - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME=debezium
      - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS=http://debezium:8083
      - KAFKA_CLUSTERS_0_KAFKACONNECT_1_NAME=prometheus
      - KAFKA_CLUSTERS_0_KAFKACONNECT_1_ADDRESS=http://connect-lrb:8083

  ksqldb-server:
    restart: "unless-stopped"
    image: confluentinc/ksqldb-server:latest
    container_name: ksqldb-server
    depends_on:
      - kafka-lrb
    ports:
      - "8088:8088"
    volumes:
      - "./connect-plugins:/usr/share/kafka/plugins"
    environment:
      - KSQL_LISTENERS=http://0.0.0.0:8088
      - KSQL_BOOTSTRAP_SERVERS=kafka-lrb:9092
      - KSQL_KSQL_SCHEMA_REGISTRY_URL=http://apicurio-registry:8080/apis/ccompat/v6
      # Configuration to embed - Kafka Connect support.
      - KSQL_CONNECT_GROUP_ID=ksql-connect-cluster
      - KSQL_CONNECT_BOOTSTRAP_SERVERS=kafka-lrb:9092
      - KSQL_CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
      - KSQL_CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://apicurio-registry:8080/apis/ccompat/v6
      - KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://apicurio-registry:8080/apis/ccompat/v6
      - KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
      #- KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=true
      - KSQL_CONNECT_CONFIG_STORAGE_TOPIC=ksql-connect-configs
      - KSQL_CONNECT_OFFSET_STORAGE_TOPIC=ksql-connect-offsets
      - KSQL_CONNECT_STATUS_STORAGE_TOPIC=ksql-connect-statuses
      - KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
      - KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
      - KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
      - KSQL_CONNECT_PLUGIN_PATH=/usr/share/kafka/plugins
    extra_hosts:
      - host.docker.internal:host-gateway

  ksqldb-cli:
    restart: "unless-stopped"
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  debezium:
    restart: "unless-stopped"
    image: debezium/connect:nightly
    depends_on:
      - kafka-lrb
    ports:
      - 8084:8083
    volumes:
      - "./debezium-plugins:/kafka/plugins"
    environment:
      - ENABLE_APICURIO_CONVERTERS=true
      - GROUP_ID=2
      - BOOTSTRAP_SERVERS=kafka-lrb:29092
      - CONFIG_STORAGE_TOPIC=dbz_connect_configs
      - OFFSET_STORAGE_TOPIC=dbz_connect_offsets
      - KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
      - VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://apicurio-registry:8080/apis/registry/v2
      - CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
      - CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_URL=http://apicurio-registry:8080/apis/registry/v2
      - CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true
      - CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true
      - CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
      - CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio-registry:8080/apis/registry/v2
      - CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true
      - CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true
      - CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE=avro

@choucavalier

and this connector configuration:

{
  "name": "dwc-connector-avro",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.hostname": "...",
    "database.port": "...",
    "database.encrypt": "false",
    "database.user": "...",
    "database.password": "...",
    "database.dbname": "...",
    "database.names": "...",
    "database.server.name": "...",
    "topic.prefix": "dwcavro",
    "snapshot.mode": "schema_only",
    "schema.history.internal.kafka.topic": "...",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-lrb:29092",
    "key.converter.apicurio.registry.as-confluent": true,
    "key.converter.apicurio.registry.use-id": "contentId",
    "value.converter.apicurio.registry.as-confluent": true,
    "value.converter.apicurio.registry.use-id": "contentId",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op",
    "decimal.handling.mode": "double",
    "table.include.list": "_Export.Patient_,_Export.PatientStringAttribute_,_Export.PatientDateAttribute_,_Export.Numeric_,_Export.NumericValue_,_Export.Wave_,_Export.WaveSample_",
    "message.key.columns": "_Export.Wave_:Id;_Export.WaveSample_:WaveId;_Export.Numeric_:Id;_Export.NumericValue_:NumericId;_Export.PatientStringAttribute_:PatientId,Timestamp"
  }
}

@carlesarnal
Member

carlesarnal commented Jan 23, 2023

Hi,

After some testing, I think you're missing some key configuration points in your connector. Here is an example I just used to get the whole stack (kafka-ui, apicurio-registry, debezium, ksqldb) working together.

{
  "name": "Test",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "apicurio-studio-db",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "******",
  "database.dbname": "apicuriodb",
  "database.server.name": "apicuriodb",
  "database.history.kafka.bootstrap.servers": "kafka:29092",
  "database.history.kafka.topic": "schema-changes.inventory",
  "schema.include.list": "public",
  "plugin.name": "pgoutput",
  "topic.prefix": "postgre-changes",
  "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "key.converter.apicurio.registry.url": "http://schema-registry:8080/apis/registry/v2",
  "key.converter.apicurio.registry.auto-register": "true",
  "key.converter.apicurio.registry.as-confluent": "true",
  "key.converter.apicurio.registry.use-id": "contentId",
  "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "value.converter.apicurio.registry.url": "http://schema-registry:8080/apis/registry/v2",
  "value.converter.apicurio.registry.auto-register": "true",
  "value.converter.apicurio.registry.as-confluent": "true",
  "value.converter.apicurio.registry.use-id": "contentId"
}

Notice especially the difference in the value.converter and key.converter configurations (ignore the ones related to PG). I have tried it: with only your config, ksql tries to query the schema with ID 0; with my configuration it just works. If this does not work either, let me know the result and we can try to see what happens.

@carlesarnal
Member

Closing as stale. If someone finds this in the future, please re-open the issue.
