Configuring Kafka Connect for Managed Service for Apache Kafka® clusters
Note
Managed Service for Apache Kafka® has built-in support for certain connectors and allows you to manage them. For a list of available connectors, see Connectors. If you need other connectors or want to manage Kafka Connect manually, refer to this tutorial.
Kafka Connect is a tool for streaming data between Apache Kafka® and other data stores.
Data in Kafka Connect is handled using processes called workers. You can deploy the tool either in distributed mode with multiple workers or standalone mode with a single worker.
Data is moved using connectors that are run in separate worker threads.
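For orientation, both modes are started with scripts shipped in the Apache Kafka® distribution. The commands below are only an illustration with placeholder property file names; the actual commands for this tutorial appear further down:
# Standalone mode: a single worker, offsets stored in a local file
./connect-standalone.sh worker.properties connector.properties
# Distributed mode: workers form a group and store their state in Kafka® topics
./connect-distributed.sh worker.properties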
To learn more about Kafka Connect, see the Apache Kafka® documentation.
Next, we will configure Kafka Connect to interact with a Managed Service for Apache Kafka® cluster. The tool will be deployed on a Yandex Cloud VM as a separate installation. SSL encryption will be used to protect the connection.
We will also set up a simple FileStreamSource connector. You can use any other Kafka Connect connector to interact with Managed Service for Apache Kafka® clusters.
To configure Kafka Connect to work with a Managed Service for Apache Kafka® cluster:
If you no longer need the resources you created, delete them.
Getting started
Create a topic named messages for exchanging messages between Kafka Connect and the Managed Service for Apache Kafka® cluster.
Create a user named user and grant them the following permissions for the messages topic:
ACCESS_ROLE_CONSUMER
ACCESS_ROLE_PRODUCER
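If you prefer to describe this step declaratively, the topic and the user can also be defined with the Terraform provider's yandex_mdb_kafka_topic and yandex_mdb_kafka_user resources. This is a minimal sketch assuming the cluster resource is named kafka-cluster; the names and the password placeholder are illustrative:
resource "yandex_mdb_kafka_topic" "messages" {
  cluster_id         = yandex_mdb_kafka_cluster.kafka-cluster.id
  name               = "messages"
  partitions         = 1
  replication_factor = 1
}

resource "yandex_mdb_kafka_user" "user" {
  cluster_id = yandex_mdb_kafka_cluster.kafka-cluster.id
  name       = "user"
  password   = "<user_password>"

  permission {
    topic_name = "messages"
    role       = "ACCESS_ROLE_CONSUMER"
  }

  permission {
    topic_name = "messages"
    role       = "ACCESS_ROLE_PRODUCER"
  }
}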
In the network hosting the Managed Service for Apache Kafka® cluster, create a virtual machine with Ubuntu 20.04 and a public IP address.
Get the authentication credentials. You can add them to environment variables or specify them later in the provider configuration file.
Configure and initialize a provider. There is no need to create a provider configuration file manually: you can download it.
Place the configuration file in a separate working directory and specify the parameter values. If you did not add the authentication credentials to environment variables, specify them in the configuration file.
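For reference, a minimal provider configuration file looks roughly like this; the zone is an illustrative value, and credentials are assumed to come from environment variables:
terraform {
  required_providers {
    yandex = {
      source = "yandex-cloud/yandex"
    }
  }
}

provider "yandex" {
  zone = "ru-central1-a"
}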
Download the kafka-connect.tf configuration file.
This file describes:
Network
Subnet
Default security group and rules required to connect to the cluster and VM from the internet.
Virtual machine with Ubuntu 20.04.
Properly configured Managed Service for Apache Kafka® cluster.
In the file, specify the password for the user named user, which will be used to access the Managed Service for Apache Kafka® cluster, as well as a username and the public SSH key for the virtual machine. If the virtual machine has Ubuntu 20.04 installed from the recommended image list, the username you specify here will be ignored. In that case, use ubuntu as the username for the connection.
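For orientation, the cluster description in such a file follows the yandex_mdb_kafka_cluster resource schema. This abbreviated sketch uses illustrative names and values, not the exact contents of kafka-connect.tf:
resource "yandex_mdb_kafka_cluster" "kafka-cluster" {
  name        = "<cluster_name>"
  environment = "PRODUCTION"
  network_id  = yandex_vpc_network.network.id

  config {
    version = "<Apache Kafka® version>"
    zones   = ["ru-central1-a"]

    kafka {
      resources {
        resource_preset_id = "s2.micro"
        disk_type_id       = "network-ssd"
        disk_size          = 10
      }
    }
  }
}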
Make sure the Terraform configuration files are correct using this command:
terraform validate
View the planned changes using this command:
terraform plan
If the resource configuration descriptions are correct, the terminal will display a list of the resources to create and their parameters. This is a test step: no resources are created.
If you are happy with the planned changes, apply them:
terraform apply
Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set a password of at least 6 characters using the -storepass parameter for additional storage protection:
sudo keytool -importcert \
-alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
-keystore /etc/kafka-connect-worker/client.truststore.jks \
-storepass <certificate_store_password> \
--noprompt
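To make sure the certificate was added, you can list the contents of the store; the path and password are the same as in the previous command:
keytool -list \
-keystore /etc/kafka-connect-worker/client.truststore.jks \
-storepass <certificate_store_password>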
Prepare the test data
Create a /var/log/sample.json file with test data. This file contains data from car sensors in JSON format:
sample.json
{"device_id":"iv9a94th6rzt********","datetime":"2020-06-05 17:27:00","latitude":55.70329032,"longitude":37.65472196,"altitude":427.5,"speed":0,"battery_voltage":23.5,"cabin_temperature":17,"fuel_level":null}
{"device_id":"rhibbh3y08qm********","datetime":"2020-06-06 09:49:54","latitude":55.71294467,"longitude":37.66542005,"altitude":429.13,"speed":55.5,"battery_voltage":null,"cabin_temperature":18,"fuel_level":32}
{"device_id":"iv9a94th6rzt********","datetime":"2020-06-07 15:00:10","latitude":55.70985913,"longitude":37.62141918,"altitude":417,"speed":15.7,"battery_voltage":10.3,"cabin_temperature":17,"fuel_level":null}
Create a file named /etc/kafka-connect-worker/worker.properties with the worker settings:
# AdminAPI connect properties
bootstrap.servers=<broker_host_FQDN>:9091
sasl.mechanism=SCRAM-SHA-512
security.protocol=SASL_SSL
ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks
ssl.truststore.password=<certificate_store_password>
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<password_of_the_user_named_user>";
# Producer connect properties
producer.sasl.mechanism=SCRAM-SHA-512
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks
producer.ssl.truststore.password=<certificate_store_password>
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<password_of_the_user_named_user>";
# Worker properties
plugin.path=/etc/kafka-connect-worker/plugins
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/etc/kafka-connect-worker/worker.offset
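With schemas.enable=true, the JsonConverter wraps each record in a schema-plus-payload envelope. For the string lines read by FileStreamSource, a message in the topic will look roughly like this (abridged, illustrative values):
{"schema":{"type":"string","optional":false},"payload":"{\"device_id\":\"iv9a94th6rzt********\",...}"}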
Kafka Connect will connect to the Managed Service for Apache Kafka® cluster as the previously created user named user.
You can get the FQDNs of the broker hosts from the list of hosts in the cluster.
Create a file named /etc/kafka-connect-worker/file-connector.properties
with connector settings:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/var/log/sample.json
topic=messages
To send the test data to the topic, start the worker in standalone mode:
cd ~/opt/kafka/bin/ && \
sudo ./connect-standalone.sh \
/etc/kafka-connect-worker/worker.properties \
/etc/kafka-connect-worker/file-connector.properties
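Optionally, while the worker is running, you can check the connector state through the Kafka Connect REST interface, which listens on port 8083 by default; local-file-source is the connector name set in file-connector.properties:
curl http://localhost:8083/connectors/local-file-source/status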
To verify that the data reached the topic, get the messages from it in a separate terminal using the kafkacat utility; the topic name, port, and security settings match the worker configuration above:
kafkacat -C \
-b <broker_host_FQDN>:9091 \
-t messages \
-X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X sasl.username=user \
-X sasl.password="<password_of_the_user_named_user>" \
-X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:
You can get the FQDNs of the broker hosts from the list of hosts in the cluster.
The command output will contain the contents of the /var/log/sample.json
test file passed in the previous step.
Delete the resources you created
Delete the resources you no longer need to avoid paying for them:
Manually
Terraform
Delete the VM.
If you reserved a public static IP address for the VM, delete it.
Delete the Managed Service for Apache Kafka® cluster.
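For example, with the yc command-line interface the manual cleanup can be sketched as follows; the resource names are placeholders, and you can check the exact syntax with each command's --help flag:
yc compute instance delete <VM_name>
yc vpc address delete <static_IP_address_name>
yc managed-kafka cluster delete <cluster_name>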
To delete the infrastructure created with Terraform:
In the terminal window, go to the directory containing the infrastructure plan.
Delete the kafka-connect.tf
configuration file.
Make sure the Terraform configuration files are correct using this command:
terraform validate
View the planned changes using this command:
terraform plan
If the resource configuration descriptions are correct, the terminal will display a list of the resources to delete and their parameters. This is a test step: no resources are deleted.
If you are happy with the planned changes, apply them:
terraform apply