Google BigQuery Sink Connector for Confluent Platform
The Kafka Connect Google BigQuery Sink connector is used to stream data into
BigQuery tables. When streaming data from Apache Kafka® topics that have registered
schemas, the sink connector can create BigQuery tables with the appropriate
BigQuery table schema. The BigQuery table schema is based upon information in
the Kafka schema for the topic.
Important
Google BigQuery Sink connector version 2.0.0 is not backward compatible with 1.x.x versions. For more information, see the Upgrading to 2.0.0 section.
If you plan to use multiple connectors with a high number of tasks, be sure to review the BigQuery rate limits.
This connector supports routing invalid records to the Dead Letter Queue (DLQ). This includes any records having a 400 code (invalid error message) from BigQuery. Note that DLQ routing does not work if allowSchemaUnionization is set to false, allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation are set to true (which is equivalent to setting autoUpdateSchemas to true in versions earlier than 2.0.0 of this connector), and the connector detects that the failure is due to a schema mismatch. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
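DLQ routing is configured with the standard Kafka Connect error-handling properties. The following is a minimal sketch; the DLQ topic name and replication factor are placeholders:
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-bigquery-sink
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true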
The Google BigQuery Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter, which can lead to performance gains when the connector consumes from multiple topic partitions.
Even though the BigQuery connector streams records one at a time by default (as
opposed to running in batch mode), the connector is scalable because it contains
an internal thread pool that allows it to stream records in parallel. Note that
the internal thread pool defaults to 10 threads, which is configurable.
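For example, the following is a hedged excerpt of connector properties that scales parallelism through both tasks and the streaming thread pool; threadPoolSize is the connector property that controls the pool size, and the values shown are placeholders:
tasks.max=2
threadPoolSize=20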
The BigQuery Sink connector has the following limitations:
The connector doesn’t support schemas with recursion.
The connector doesn’t support schemas having float fields with NaN or +Infinity values.
When the connector is configured with upsertEnabled or deleteEnabled, it does not support Single Message Transformations (SMTs) that modify the topic name. Additionally, transformations that route records to different topics (for example, RegexRouter) are not allowed in these modes.
Streaming into BigQuery is not available with the GCP free tier. If you try to use streaming without enabling billing, you receive the following error: BigQuery: Streaming insert is not allowed in the free tier. For more details, see Streaming data into BigQuery.
You must install the connector on every machine where Connect will run.
If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
Connect: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
Java 1.8
Active Google Cloud Platform (GCP) account with authorization to create resources
To install the latest connector version using the Confluent Hub Client, navigate to your Confluent Platform installation directory and run the following command:
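A typical invocation looks like the following; wepay/kafka-connect-bigquery is the commonly published plugin name for this connector, so confirm the exact coordinates on Confluent Hub:
confluent-hub install wepay/kafka-connect-bigquery:latest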
The following changes aren’t backward compatible in the BigQuery connector:
datasets was removed and defaultDataset has been introduced. The connector now infers the dataset from the topic name if the topic is in the form <dataset>:<tableName>. If the topic name is in the form <tableName>, the connector defaults to defaultDataset.
topicsToTables was removed. Use the RegexRouter SMT to route topics to tables (see the sketch after this list).
autoUpdateSchemas was replaced by allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation.
value.converter.enhanced.avro.schema.support should be set to false or removed. If this property is not removed or set to false, you may receive the following error: Invalid field name "com.examples.project-super-important.v1.MyData". Fields must contain only letters, numbers, and underscores, start with a letter or underscore, and be at most 300 characters long.
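As a replacement for the removed topicsToTables mapping, a RegexRouter transform can rewrite each record's topic into the <dataset>:<tableName> form that the connector parses. The following is a hypothetical sketch; the topic pattern and the sales dataset name are assumptions:
transforms=RouteToDataset
transforms.RouteToDataset.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.RouteToDataset.regex=(.*)
transforms.RouteToDataset.replacement=sales:$1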
The Confluent BigQuery Sink Connector can stream table records into BigQuery
from Kafka topics. These records are streamed at high throughput rates to
facilitate analytical queries in near real-time.
To install the BigQuery connector, complete the following steps. Note that to run the following steps, you must have Confluent Platform running locally.
Navigate to your Confluent Platform installation directory and enter the following command:
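For example, using the same plugin name assumed above:
confluent-hub install wepay/kafka-connect-bigquery:latest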
Use the Confluent CLI to restart Connect, as adding a new connector plugin requires restarting Kafka Connect.
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
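For a local development setup managed by the Confluent CLI, restarting Connect looks like the following sketch:
confluent local services connect stop
confluent local services connect start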
A service account that can access the BigQuery project containing the dataset is required. You can create this service account in the Google Cloud Console. You create and download a key when creating the service account. You must download the key as a JSON file, as shown in the following example:
"type": "service_account",
"project_id": "confluent-842583",
"private_key_id": "...omitted...",
"private_key": "-----BEGIN PRIVATE ...omitted... =\n-----END PRIVATE KEY-----\n",
"client_email": "[email protected]",
"client_id": "...omitted...",
"auth_uri": "https://accounts.google.com/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/metadata/confluent2%40confluent-842583.iam.gserviceaccount.com"
According to the GCP specifications (https://cloud.google.com/bigquery/docs/access-control), the service account needs either the BigQueryEditor primitive IAM role or the bigquery.dataEditor predefined IAM role. The minimum permissions are as follows:
bigquery.datasets.get
bigquery.tables.create
bigquery.tables.get
bigquery.tables.getData
bigquery.tables.list
bigquery.tables.update
bigquery.tables.updateData
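The Cloud Console steps above can also be scripted. The following is a hedged gcloud sketch; the service account name connect-bigquery, the project ID confluent-842583, and the key path are placeholders, and roles/bigquery.dataEditor is the predefined role mentioned above:
gcloud iam service-accounts create connect-bigquery --project=confluent-842583
gcloud projects add-iam-policy-binding confluent-842583 \
  --member="serviceAccount:connect-bigquery@confluent-842583.iam.gserviceaccount.com" \
  --role="roles/bigquery.dataEditor"
gcloud iam service-accounts keys create /tmp/bigquery-key.json \
  --iam-account=connect-bigquery@confluent-842583.iam.gserviceaccount.com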
Note that the project property is the ID value of the BigQuery project in GCP. For datasets, the value ConfluentDataSet is the ID of the dataset entered by the user during GCP dataset creation. keyfile is the location of the service account key JSON file.
If you don’t want this connector to create a BigQuery table automatically, create a BigQuery table with Partitioning: Partition by ingestion time and a proper schema.
Also, note that the properties prefixed with transforms are used to set up SMTs. The following is an example regex router SMT that strips kcbq_ from the topic name. Replace it with a relevant regex to replace the topic of each sink record with the destination dataset and table name in the format <dataset>:<tableName>, or with only the destination table name in the format <tableName>.
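Since the full quickstart configuration is not reproduced here, the following is a hedged sketch of a connector configuration that ties the properties above together. The connector class and the RegexRouter transform class are the standard ones; the connector name, keyfile path, and exact regex are assumptions:
{
  "name": "kcbq-connect1",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "topics": "kcbq-quickstart1",
    "project": "confluent-842583",
    "defaultDataset": "ConfluentDataSet",
    "keyfile": "/tmp/bigquery-key.json",
    "sanitizeTopics": "true",
    "autoCreateTables": "true",
    "allowNewBigQueryFields": "true",
    "allowBigQueryRequiredFieldRelaxation": "true",
    "transforms": "RegexTransformation",
    "transforms.RegexTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.RegexTransformation.regex": "kcbq[-_](.*)",
    "transforms.RegexTransformation.replacement": "$1"
  }
}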
Enter text for two test records, and press Enter after typing each line.
./kafka-avro-console-producer --broker-list localhost:9092 --topic kcbq-quickstart1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"Testing the Kafka-BigQuery Connector!"}
{"f1":"Testing the Kafka-BigQuery Connector for a second time!"}
To check the results in BigQuery, complete the following steps:
Go to the BigQuery editor in GCP.
Enter the following SQL SELECT statement.
SELECT * FROM ConfluentDataSet.quickstart1;
ConfluentDataSet is the dataset ID and quickstart1 is the name of the BigQuery table, taken from the Kafka topic. In this case, the SMT strips kcbq_ from the topic: the connector converts the topic name kcbq_quickstart1 to the table name quickstart1.
To use timestamp partitioning by field name, you must set bigQueryPartitionDecorator to false and then set the timestampPartitionFieldName property to the field name that contains the partitioning timestamps (for example, timestampPartitionFieldName=f2). In the BigQuery console, you can use the following query to output a list of existing partitions:
SELECT f2 as pt, FORMAT_TIMESTAMP("%Y%m%d", f2) as partition_id
FROM `PROJECT_ID.DATASET_ID.TABLE_ID`
GROUP BY f2
ORDER BY f2
The output lists all rows that have different dates and an additional
partition_id column. Records that have a timestamp within a day’s range will
have the same partition_id.
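As a sketch, the two properties described above would appear in the connector configuration as follows (f2 matches the field used in the example query):
bigQueryPartitionDecorator=false
timestampPartitionFieldName=f2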
By clicking "SIGN UP" you agree that your personal data will be processed in accordance with
our Privacy Policy.