
Sink Connector with Hana Json Store not working #125

Open

mschuch opened this issue Feb 25, 2022 · 5 comments

mschuch commented Feb 25, 2022

Hi, I wanted to test whether the sink connector works with the HANA JSON Document Store, but I have had no luck so far.
Here is my config:

{
    "name": "test_topic_json_sink",
    "config": {
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "connector.class":"com.sap.kafka.connect.sink.hana.HANASinkConnector",
        "value.converter.schemas.enable":"false",
        "topics":"test_json",
        "connection.url":"jdbc:sap://[url]l:33041/?currentschema=SCHEMA&encrypt=true&validateCertificate=false",
        "connection.user":"xxxxx",
        "connection.password":"xxxxx",
        "test_json.table.name":"\"SCHEMA\".\"TEST_COLLECTION\"",
        "test_json.type":"collection",
        "auto.create": "true",
        "principal.service.name": "xxx",
        "principal.service.password": "xxxxt"
    }
}

At first I got a HANAConfigMissingException: "A table name or query must be specified for HANA Kafka Connectors to work".

I got a little further than that, but now I am stuck with a DataException: "Cannot list fields on non-struct type".
What am I doing wrong here? Is there a working example?

elakito (Collaborator) commented Feb 28, 2022

@mschuch Unfortunately, there is no example in the tests. Could you post the exception? It looks like the sink connector is complaining about the incoming schema definition. Can you try to ingest the data into a normal table type to verify that the incoming schema is okay?


mschuch commented Mar 2, 2022

Hi @elakito,
I have tested it with the normal table type and an Avro schema, and there everything is fine.
I also tried it without a schema, with plain JSON (see the connect config above).

I always get the same exception, with or without a schema:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:638)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Cannot list fields on non-struct type
    at org.apache.kafka.connect.data.ConnectSchema.fields(ConnectSchema.java:179)
    at com.sap.kafka.connect.sink.hana.HANASinkRecordsCollector.add(HANASinkRecordsCollector.scala:64)
    at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2(HANAWriter.scala:56)
    at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2$adapted(HANAWriter.scala:44)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
    at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
    at com.sap.kafka.connect.sink.hana.HANAWriter.write(HANAWriter.scala:44)
    at com.sap.kafka.connect.sink.GenericSinkTask.put(GenericSinkTask.scala:36)
    ... 10 more

elakito (Collaborator) commented Mar 4, 2022

@mschuch How are you creating the input message? If your Avro scenario works when the messages are sent to a normal sink table, those messages are valid, and you can also send them to a collection sink table by just changing the table type and name in the sink connector configuration.

The stack trace indicates that the error happens while the structure of the message is being analysed, so something looks wrong with the schema of the message. After the structure is determined from the schema, the message is prepared for the actual storing. When the target table is of collection type, the records are serialised into a JSON document and stored into the collection table; otherwise they are stored as normal table records.
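Roughly, that flow can be sketched like this (a conceptual sketch only; writeAsJsonDocument and writeAsTableRecords are hypothetical names, not the connector's actual API):

import org.apache.kafka.connect.data.Field
import org.apache.kafka.connect.sink.SinkRecord

object FlowSketch {
  def store(record: SinkRecord, isCollectionTable: Boolean): Unit = {
    // ConnectSchema.fields() throws "Cannot list fields on non-struct type"
    // for any non-STRUCT schema -- the failure seen in the stack traces here.
    val fields: java.util.List[Field] = record.valueSchema().fields()
    if (isCollectionTable)
      writeAsJsonDocument(record)          // collection table: serialise to a JSON document
    else
      writeAsTableRecords(fields, record)  // normal table: map fields to table columns
  }

  // Stubs standing in for the actual persistence logic (hypothetical):
  private def writeAsJsonDocument(record: SinkRecord): Unit = ()
  private def writeAsTableRecords(fields: java.util.List[Field], record: SinkRecord): Unit = ()
}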

AndreasFischbach3003 commented

Hi @elakito,
I use the same configuration, with keys converted by org.apache.kafka.connect.storage.StringConverter and values converted by io.confluent.connect.avro.AvroConverter.
I get the following exception:
[2022-08-30 16:02:05,830] DEBUG [sap_kafka_connector|task-0] [Consumer clientId=connector-consumer-sap_kafka_connector-0, groupId=connect-sap_kafka_connector] Received successful Heartbeat response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1180)
[2022-08-30 16:02:06,213] INFO [sap_kafka_connector|task-0] PHASE - 1 ended for task, with assigned partitions [erp.nav.local.variantSalesPriceChanged-0, erp.nav.local.variantSalesPriceChanged-4, erp.nav.local.variantSalesPriceChanged-3, erp.nav.local.variantSalesPriceChanged-2, erp.nav.local.variantSalesPriceChanged-1, erp.nav.local.variantSalesPriceChanged-5] (com.sap.kafka.connect.sink.hana.HANASinkTask:56)
[2022-08-30 16:02:06,213] ERROR [sap_kafka_connector|task-0] WorkerSinkTask{id=sap_kafka_connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot list fields on non-struct type (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
org.apache.kafka.connect.errors.DataException: Cannot list fields on non-struct type
at org.apache.kafka.connect.data.ConnectSchema.fields(ConnectSchema.java:179)
at com.sap.kafka.connect.sink.hana.HANASinkRecordsCollector.add(HANASinkRecordsCollector.scala:124)
at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2(HANAWriter.scala:56)
at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2$adapted(HANAWriter.scala:44)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
at com.sap.kafka.connect.sink.hana.HANAWriter.write(HANAWriter.scala:44)
at com.sap.kafka.connect.sink.GenericSinkTask.put(GenericSinkTask.scala:36)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

It looks like even though the key uses the StringConverter, the keySchema is not null, but it has no fields attached to it.
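
This matches how the converters behave: StringConverter deserialises keys with a plain optional string schema, which is non-null but has a null name and no fields, and calling fields() on it throws exactly this DataException. A minimal, self-contained check (assuming connect-api on the classpath):

import org.apache.kafka.connect.data.Schema

object KeySchemaCheck extends App {
  // What StringConverter yields for a key: a primitive (optional) string schema.
  val keySchema: Schema = Schema.OPTIONAL_STRING_SCHEMA
  println(keySchema != null)        // true: the schema object exists
  println(keySchema.name() == null) // true: primitive schemas carry no name
  keySchema.fields()                // throws DataException: Cannot list fields on non-struct type
}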

I tried changing line 123 of HANASinkRecordsCollector.scala from

      if (recordSchema.keySchema != null) {

to

      if (recordSchema.keySchema != null && recordSchema.keySchema.name() != null) {

and line 63 from

      if (recordSchema.keySchema != null) {

to

      if (recordSchema.keySchema != null && recordSchema.keySchema.name() != null) {

That worked, and I could sink the values into the HANA DB.

Could you perhaps check whether I could have changed my configuration to get this working? Or is this something that needs to be enhanced for

key.converter=org.apache.kafka.connect.storage.StringConverter

Kind regards, and thanks for the help

elakito (Collaborator) commented Sep 22, 2022

Thanks for your investigation. I added a more explicit condition to inspect the structured schema: #135
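
For reference, the shape of such a check (a sketch of the idea only; the actual change is in #135):

import org.apache.kafka.connect.data.Schema

object StructKeyGuard {
  // Inspect the schema type explicitly instead of checking only for null, so a
  // primitive key schema (e.g. from StringConverter) is never treated as a struct.
  def hasStructKeySchema(keySchema: Schema): Boolean =
    keySchema != null && keySchema.`type`() == Schema.Type.STRUCT
}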
