This connector leverages ClickHouse-specific optimizations, such as advanced partitioning and predicate pushdown, to
improve query performance and data handling.
The connector is based on ClickHouse's official JDBC connector, and
manages its own catalog.
Before Spark 3.0, Spark lacked a built-in catalog concept, so users typically relied on external catalog systems such as
Hive Metastore or AWS Glue.
With these external solutions, users had to register their data source tables manually before accessing them in Spark.
With the catalog concept introduced in Spark 3.0, however, Spark can discover tables automatically through registered
catalog plugins.
Spark's default catalog is spark_catalog, and tables are identified by {catalog name}.{database}.{table}. With the new
catalog feature, it is now possible to add and work with multiple catalogs in a single Spark application.
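For example, assuming a catalog has been registered under the name clickhouse (as shown later in this guide), a table can be referenced with the three-part identifier directly in Spark SQL:
-- 'clickhouse', 'default', and 'my_table' are placeholders; substitute your own catalog, database, and table names
SELECT * FROM clickhouse.default.my_table;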
Choosing Between Catalog API and TableProvider API
The ClickHouse Spark connector supports two access patterns: the Catalog API and the TableProvider API (format-based access). Understanding the differences helps you choose the right approach for your use case.
Catalog API vs TableProvider API
| Feature | Catalog API | TableProvider API |
|---|---|---|
| Configuration | Centralized via Spark configuration | Per-operation via options |
| Table Discovery | Automatic via catalog | Manual table specification |
| DDL Operations | Full support (CREATE, DROP, ALTER) | Limited (automatic table creation only) |
| Spark SQL Integration | Native (clickhouse.database.table) | Requires format specification |
| Use Case | Long-term, stable connections with centralized config | Ad-hoc, dynamic, or temporary access |
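As a quick, minimal sketch of the difference, the snippet below reads the same hypothetical table default.my_table both ways. The host and table names are placeholders, and the Catalog API variant assumes a catalog named clickhouse has already been registered as described in the sections that follow.
# Catalog API: connection details live in Spark configuration,
# so the table is addressed by its three-part name.
df_catalog = spark.sql("SELECT * FROM clickhouse.default.my_table")

# TableProvider API: connection details are supplied per operation.
df_format = (spark.read.format("clickhouse")
    .option("host", "your-clickhouse-host")  # placeholder
    .option("database", "default")
    .option("table", "my_table")             # placeholder
    .load())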
Requirements
- Java 8 or 17 (Java 17+ required for Spark 4.0)
- Scala 2.12 or 2.13 (Spark 4.0 only supports Scala 2.13)
- Apache Spark 3.3, 3.4, 3.5, or 4.0
Compatibility matrix
| Version | Compatible Spark Versions | ClickHouse JDBC version |
|---|---|---|
| main | Spark 3.3, 3.4, 3.5, 4.0 | 0.9.4 |
| 0.9.0 | Spark 3.3, 3.4, 3.5, 4.0 | 0.9.4 |
| 0.8.1 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.7.3 | Spark 3.3, 3.4 | 0.4.6 |
| 0.6.0 | Spark 3.3 | 0.3.2-patch11 |
| 0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 |
| 0.4.0 | Spark 3.2, 3.3 | Not dependent |
| 0.3.0 | Spark 3.2, 3.3 | Not dependent |
| 0.2.1 | Spark 3.2 | Not dependent |
| 0.1.2 | Spark 3.2 | Not dependent |
Installation & setup
For integrating ClickHouse with Spark, there are multiple installation options to suit different project setups.
You can add the ClickHouse Spark connector as a dependency directly in your project's build file (such as in pom.xml
for Maven or build.sbt for SBT).
Alternatively, you can put the required JAR files in your $SPARK_HOME/jars/ folder, or pass them directly as a Spark
option using the --jars flag in the spark-submit command.
Both approaches ensure the ClickHouse connector is available in your Spark environment.
Import as a Dependency
<dependency>
<groupId>com.clickhouse.spark</groupId>
<artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
<version>{{ stable_version }}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<classifier>all</classifier>
<version>{{ clickhouse_jdbc_version }}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
Add the following repository if you want to use a SNAPSHOT version:
<repositories>
<repository>
<id>sonatype-oss-snapshots</id>
<name>Sonatype OSS Snapshots Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>
dependencies {
implementation("com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}")
implementation("com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all") { transitive = false }
}
Add the following repository if you want to use the SNAPSHOT version:
repositories {
maven { url = "https://s01.oss.sonatype.org/content/repositories/snapshots" }
}
libraryDependencies += "com.clickhouse" % "clickhouse-jdbc" % {{ clickhouse_jdbc_version }} classifier "all"
libraryDependencies += "com.clickhouse.spark" %% clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }} % {{ stable_version }}
When working with Spark's shell options (Spark SQL CLI, Spark Shell CLI, and Spark Submit command), the dependencies can be
registered by passing the required jars:
$SPARK_HOME/bin/spark-sql \
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
If you want to avoid copying the JAR files to your Spark client node, you can use the following instead:
--repositories https://{maven-central-mirror or private-nexus-repo} \
--packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}
Note: For SQL-only use cases, Apache Kyuubi is recommended
for production.
Download the library
The name pattern of the binary JAR is:
clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar
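For example, for Spark 3.5, Scala 2.12, and connector release 0.8.1, the JAR name would be:
clickhouse-spark-runtime-3.5_2.12-0.8.1.jar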
You can find all available released JAR files
in the Maven Central Repository
and all daily build SNAPSHOT JAR files in the Sonatype OSS Snapshots Repository.
Register the catalog (required)
In order to access your ClickHouse tables, you must configure a new Spark catalog with the following configs:
| Property | Value | Default Value | Required |
|---|---|---|---|
| spark.sql.catalog.<catalog_name> | com.clickhouse.spark.ClickHouseCatalog | N/A | Yes |
| spark.sql.catalog.<catalog_name>.host | <clickhouse_host> | localhost | No |
| spark.sql.catalog.<catalog_name>.protocol | http | http | No |
| spark.sql.catalog.<catalog_name>.http_port | <clickhouse_port> | 8123 | No |
| spark.sql.catalog.<catalog_name>.user | <clickhouse_username> | default | No |
| spark.sql.catalog.<catalog_name>.password | <clickhouse_password> | (empty string) | No |
| spark.sql.catalog.<catalog_name>.database | <database> | default | No |
| spark.<catalog_name>.write.format | json | arrow | No |
These settings can be set in one of the following ways:
- Edit or create spark-defaults.conf.
- Pass the configuration to your spark-submit command (or to your spark-shell/spark-sql CLI commands), as sketched below.
- Add the configuration when initializing your SparkSession.
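For example, the catalog can be registered inline when launching the Spark SQL CLI; the host, port, and credentials below are placeholders for your own deployment:
$SPARK_HOME/bin/spark-sql \
  --conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
  --conf spark.sql.catalog.clickhouse.host=127.0.0.1 \
  --conf spark.sql.catalog.clickhouse.protocol=http \
  --conf spark.sql.catalog.clickhouse.http_port=8123 \
  --conf spark.sql.catalog.clickhouse.user=default \
  --conf spark.sql.catalog.clickhouse.password=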
Note
When working with a ClickHouse cluster, you need to set a unique catalog name for each instance.
For example:
spark.sql.catalog.clickhouse1 com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host 10.0.0.1
spark.sql.catalog.clickhouse1.protocol https
spark.sql.catalog.clickhouse1.http_port 8443
spark.sql.catalog.clickhouse1.user default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database default
spark.sql.catalog.clickhouse1.option.ssl true
spark.sql.catalog.clickhouse2 com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host 10.0.0.2
spark.sql.catalog.clickhouse2.protocol https
spark.sql.catalog.clickhouse2.http_port 8443
spark.sql.catalog.clickhouse2.user default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database default
spark.sql.catalog.clickhouse2.option.ssl true
This way, you can access the clickhouse1 table <ck_db>.<ck_table> from Spark SQL as
clickhouse1.<ck_db>.<ck_table>, and the clickhouse2 table <ck_db>.<ck_table> as clickhouse2.<ck_db>.<ck_table>.
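Because both catalogs are available in the same Spark session, you can even move data between the two ClickHouse instances with a single Spark SQL statement. This is a sketch: the target table must already exist, and the database and table names are placeholders.
INSERT INTO clickhouse2.<ck_db>.<ck_table>
SELECT * FROM clickhouse1.<ck_db>.<ck_table>;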
Using the TableProvider API (Format-based Access)
In addition to the catalog-based approach, the ClickHouse Spark connector supports a format-based access pattern via the TableProvider API.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Read from ClickHouse using format API
df = spark.read \
.format("clickhouse") \
.option("host", "your-clickhouse-host") \
.option("protocol", "https") \
.option("http_port", "8443") \
.option("database", "default") \
.option("table", "your_table") \
.option("user", "default") \
.option("password", "your_password") \
.option("ssl", "true") \
.load()
df.show()
val df = spark.read
.format("clickhouse")
.option("host", "your-clickhouse-host")
.option("protocol", "https")
.option("http_port", "8443")
.option("database", "default")
.option("table", "your_table")
.option("user", "default")
.option("password", "your_password")
.option("ssl", "true")
.load()
df.show()
Dataset<Row> df = spark.read()
.format("clickhouse")
.option("host", "your-clickhouse-host")
.option("protocol", "https")
.option("http_port", "8443")
.option("database", "default")
.option("table", "your_table")
.option("user", "default")
.option("password", "your_password")
.option("ssl", "true")
.load();
df.show();
# Write to ClickHouse using format API
df.write \
.format("clickhouse") \
.option("host", "your-clickhouse-host") \
.option("protocol", "https") \
.option("http_port", "8443") \
.option("database", "default") \
.option("table", "your_table") \
.option("user", "default") \
.option("password", "your_password") \
.option("ssl", "true") \
.mode("append") \
.save()
df.write
.format("clickhouse")
.option("host", "your-clickhouse-host")
.option("protocol", "https")
.option("http_port", "8443")
.option("database", "default")
.option("table", "your_table")
.option("user", "default")
.option("password", "your_password")
.option("ssl", "true")
.mode("append")
.save()
df.write()
.format("clickhouse")
.option("host", "your-clickhouse-host")
.option("protocol", "https")
.option("http_port", "8443")
.option("database", "default")
.option("table", "your_table")
.option("user", "default")
.option("password", "your_password")
.option("ssl", "true")
.mode("append")
.save();
TableProvider Features
The TableProvider API provides several powerful features:
Automatic Table Creation
When writing to a non-existent table, the connector automatically creates the table with an appropriate schema. The connector provides intelligent defaults:
- Engine: Defaults to MergeTree() if not specified. You can specify a different engine using the engine option (e.g., ReplacingMergeTree(), SummingMergeTree(), etc.)
- ORDER BY: Required - you must explicitly specify the order_by option when creating a new table. The connector validates that all specified columns exist in the schema.
- Nullable Key Support: Automatically adds settings.allow_nullable_key=1 if ORDER BY contains nullable columns
# Table will be created automatically with explicit ORDER BY (required)
df.write \
.format("clickhouse") \
.option("host", "your-host") \
.option("database", "default") \
.option("table", "new_table") \
.option("order_by", "id") \
.mode("append") \
.save()
# Specify table creation options with custom engine
df.write \
.format("clickhouse") \
.option("host", "your-host") \
.option("database", "default") \
.option("table", "new_table") \
.option("order_by", "id, timestamp") \
.option("engine", "ReplacingMergeTree()") \
.option("settings.allow_nullable_key", "1") \
.mode("append") \
.save()
// Table will be created automatically with explicit ORDER BY (required)
df.write
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "new_table")
.option("order_by", "id")
.mode("append")
.save()
// With explicit table creation options and custom engine
df.write
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "new_table")
.option("order_by", "id, timestamp")
.option("engine", "ReplacingMergeTree()")
.option("settings.allow_nullable_key", "1")
.mode("append")
.save()
// Table will be created automatically with explicit ORDER BY (required)
df.write()
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "new_table")
.option("order_by", "id")
.mode("append")
.save();
// With explicit table creation options and custom engine
df.write()
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "new_table")
.option("order_by", "id, timestamp")
.option("engine", "ReplacingMergeTree()")
.option("settings.allow_nullable_key", "1")
.mode("append")
.save();
Note
ORDER BY Required: The order_by option is required when creating a new table via the TableProvider API. You must explicitly specify which column(s) to use for the ORDER BY clause. The connector validates that all specified columns exist in the schema and will throw an error if any columns are missing.
Engine Selection: The default engine is MergeTree(), but you can specify any ClickHouse table engine using the engine option (e.g., ReplacingMergeTree(), SummingMergeTree(), AggregatingMergeTree(), etc.).
TableProvider Connection Options
When using the format-based API, the following connection options are available:
Connection Options
| Option | Description | Default Value | Required |
|---|---|---|---|
| host | ClickHouse server hostname | localhost | Yes |
| protocol | Connection protocol (http or https) | http | No |
| http_port | HTTP/HTTPS port | 8123 | No |
| database | Database name | default | Yes |
| table | Table name | N/A | Yes |
| user | Username for authentication | default | No |
| password | Password for authentication | (empty string) | No |
| ssl | Enable SSL connection | false | No |
| ssl_mode | SSL mode (NONE, STRICT, etc.) | STRICT | No |
| timezone | Timezone for date/time operations | server | No |
Table Creation Options
These options are used when the table doesn't exist and needs to be created:
| Option | Description | Default Value | Required |
|---|---|---|---|
| order_by | Column(s) to use for the ORDER BY clause. Comma-separated for multiple columns | N/A | Yes* |
| engine | ClickHouse table engine (e.g., MergeTree(), ReplacingMergeTree(), SummingMergeTree(), etc.) | MergeTree() | No |
| settings.allow_nullable_key | Enable nullable keys in ORDER BY (for ClickHouse Cloud) | Auto-detected** | No |
| settings.<key> | Any ClickHouse table setting | N/A | No |
| cluster | Cluster name for Distributed tables | N/A | No |
| clickhouse.column.<name>.variant_types | Comma-separated list of ClickHouse types for Variant columns (e.g., String, Int64, Bool, JSON). Type names are case-sensitive. Spaces after commas are optional. | N/A | No |
* The order_by option is required when creating a new table. All specified columns must exist in the schema.
** Automatically set to 1 if ORDER BY contains nullable columns and not explicitly provided.
Tip
Best Practice: For ClickHouse Cloud, explicitly set settings.allow_nullable_key=1 if your ORDER BY columns might be nullable, as ClickHouse Cloud requires this setting.
Writing Modes
The Spark connector (both TableProvider API and Catalog API) supports the following Spark write modes:
- append: Add data to existing table
- overwrite: Replace all data in the table (truncates table)
Note
Partition Overwrite Not Supported: The connector does not currently support partition-level overwrite operations (e.g., overwrite mode with partitionBy). This feature is in progress. See GitHub issue #34 for tracking this feature.
# Overwrite mode (truncates table first)
df.write \
.format("clickhouse") \
.option("host", "your-host") \
.option("database", "default") \
.option("table", "my_table") \
.mode("overwrite") \
.save()
// Overwrite mode (truncates table first)
df.write
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "my_table")
.mode("overwrite")
.save()
// Overwrite mode (truncates table first)
df.write()
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "my_table")
.mode("overwrite")
.save();
Configuring ClickHouse Options
Both the Catalog API and TableProvider API support configuring ClickHouse-specific options (not connector options). These are passed through to ClickHouse when creating tables or executing queries.
ClickHouse options allow you to configure ClickHouse-specific settings like allow_nullable_key, index_granularity, and other table-level or query-level settings. These are different from connector options (like host, database, table) which control how the connector connects to ClickHouse.
Using TableProvider API
With the TableProvider API, use the settings.<key> option format:
df.write \
.format("clickhouse") \
.option("host", "your-host") \
.option("database", "default") \
.option("table", "my_table") \
.option("order_by", "id") \
.option("settings.allow_nullable_key", "1") \
.option("settings.index_granularity", "8192") \
.mode("append") \
.save()
df.write
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "my_table")
.option("order_by", "id")
.option("settings.allow_nullable_key", "1")
.option("settings.index_granularity", "8192")
.mode("append")
.save()
df.write()
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "my_table")
.option("order_by", "id")
.option("settings.allow_nullable_key", "1")
.option("settings.index_granularity", "8192")
.mode("append")
.save();
Using Catalog API
With the Catalog API, use the spark.sql.catalog.<catalog_name>.option.<key> format in your Spark configuration:
spark.sql.catalog.clickhouse.option.allow_nullable_key 1
spark.sql.catalog.clickhouse.option.index_granularity 8192
Or set them when creating tables via Spark SQL:
CREATE TABLE clickhouse.default.my_table (
id INT,
name STRING
) USING ClickHouse
TBLPROPERTIES (
engine = 'MergeTree()',
order_by = 'id',
'settings.allow_nullable_key' = '1',
'settings.index_granularity' = '8192'
)
ClickHouse Cloud settings
When connecting to ClickHouse Cloud, make sure to enable SSL and set the appropriate SSL mode. For example:
spark.sql.catalog.clickhouse.option.ssl true
spark.sql.catalog.clickhouse.option.ssl_mode NONE
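Putting it together, a catalog configuration for a ClickHouse Cloud service might look like the following sketch, where the hostname and password are placeholders for your own service:
spark.sql.catalog.clickhouse                  com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse.host             <your-instance-hostname>
spark.sql.catalog.clickhouse.protocol         https
spark.sql.catalog.clickhouse.http_port        8443
spark.sql.catalog.clickhouse.user             default
spark.sql.catalog.clickhouse.password         <your_password>
spark.sql.catalog.clickhouse.database         default
spark.sql.catalog.clickhouse.option.ssl       true
spark.sql.catalog.clickhouse.option.ssl_mode  NONE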
Read data
public static void main(String[] args) {
// Create a Spark session
SparkSession spark = SparkSession.builder()
.appName("example")
.master("local[*]")
.config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
.config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
.config("spark.sql.catalog.clickhouse.protocol", "http")
.config("spark.sql.catalog.clickhouse.http_port", "8123")
.config("spark.sql.catalog.clickhouse.user", "default")
.config("spark.sql.catalog.clickhouse.password", "123456")
.config("spark.sql.catalog.clickhouse.database", "default")
.config("spark.clickhouse.write.format", "json")
.getOrCreate();
Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");
df.show();
spark.stop();
}
object NativeSparkRead extends App {
val spark = SparkSession.builder
.appName("example")
.master("local[*]")
.config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
.config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
.config("spark.sql.catalog.clickhouse.protocol", "http")
.config("spark.sql.catalog.clickhouse.http_port", "8123")
.config("spark.sql.catalog.clickhouse.user", "default")
.config("spark.sql.catalog.clickhouse.password", "123456")
.config("spark.sql.catalog.clickhouse.database", "default")
.config("spark.clickhouse.write.format", "json")
.getOrCreate
val df = spark.sql("select * from clickhouse.default.example_table")
df.show()
spark.stop()
}
from pyspark.sql import SparkSession
packages = [
"com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.8.0",
"com.clickhouse:clickhouse-client:0.7.0",
"com.clickhouse:clickhouse-http-client:0.7.0",
"org.apache.httpcomponents.client5:httpclient5:5.2.1"
]
spark = (SparkSession.builder
.config("spark.jars.packages", ",".join(packages))
.getOrCreate())
spark.conf.set("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
spark.conf.set("spark.sql.catalog.clickhouse.host", "127.0.0.1")
spark.conf.set("spark.sql.catalog.clickhouse.protocol", "http")
spark.conf.set("spark.sql.catalog.clickhouse.http_port", "8123")
spark.conf.set("spark.sql.catalog.clickhouse.user", "default")
spark.conf.set("spark.sql.catalog.clickhouse.password", "123456")
spark.conf.set("spark.sql.catalog.clickhouse.database", "default")
spark.conf.set("spark.clickhouse.write.format", "json")
df = spark.sql("select * from clickhouse.default.example_table")
df.show()
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:ch://localhost:8123/default",
dbtable "schema.tablename",
user "username",
password "password",
driver "com.clickhouse.jdbc.ClickHouseDriver"
);
SELECT * FROM jdbcTable;
Write data
Note
Partition Overwrite Not Supported: The Catalog API does not currently support partition-level overwrite operations (e.g., overwrite mode with partitionBy). This feature is in progress. See GitHub issue #34 for tracking this feature.
public static void main(String[] args) throws AnalysisException {
// Create a Spark session
SparkSession spark = SparkSession.builder()
.appName("example")
.master("local[*]")
.config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
.config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
.config("spark.sql.catalog.clickhouse.protocol", "http")
.config("spark.sql.catalog.clickhouse.http_port", "8123")
.config("spark.sql.catalog.clickhouse.user", "default")
.config("spark.sql.catalog.clickhouse.password", "123456")
.config("spark.sql.catalog.clickhouse.database", "default")
.config("spark.clickhouse.write.format", "json")
.getOrCreate();
// Define the schema for the DataFrame
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
});
List<Row> data = Arrays.asList(
RowFactory.create(1, "Alice"),
RowFactory.create(2, "Bob")
);
// Create a DataFrame
Dataset<Row> df = spark.createDataFrame(data, schema);
df.writeTo("clickhouse.default.example_table").append();
spark.stop();
}
object NativeSparkWrite extends App {
// Create a Spark session
val spark: SparkSession = SparkSession.builder
.appName("example")
.master("local[*]")
.config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
.config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
.config("spark.sql.catalog.clickhouse.protocol", "http")
.config("spark.sql.catalog.clickhouse.http_port", "8123")
.config("spark.sql.catalog.clickhouse.user", "default")
.config("spark.sql.catalog.clickhouse.password", "123456")
.config("spark.sql.catalog.clickhouse.database", "default")
.config("spark.clickhouse.write.format", "json")
.getOrCreate
// Define the schema for the DataFrame
val rows = Seq(Row(1, "John"), Row(2, "Doe"))
val schema = List(
StructField("id", DataTypes.IntegerType, nullable = false),
StructField("name", StringType, nullable = true)
)
// Create the df
val df: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(rows),
StructType(schema)
)
df.writeTo("clickhouse.default.example_table").append()
spark.stop()
}
from pyspark.sql import SparkSession
from pyspark.sql import Row
# Feel free to use any other package combination satisfying the compatibility matrix provided above.
packages = [
"com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.8.0",
"com.clickhouse:clickhouse-client:0.7.0",
"com.clickhouse:clickhouse-http-client:0.7.0",
"org.apache.httpcomponents.client5:httpclient5:5.2.1"
]
spark = (SparkSession.builder
.config("spark.jars.packages", ",".join(packages))
.getOrCreate())
spark.conf.set("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
spark.conf.set("spark.sql.catalog.clickhouse.host", "127.0.0.1")
spark.conf.set("spark.sql.catalog.clickhouse.protocol", "http")
spark.conf.set("spark.sql.catalog.clickhouse.http_port", "8123")
spark.conf.set("spark.sql.catalog.clickhouse.user", "default")
spark.conf.set("spark.sql.catalog.clickhouse.password", "123456")
spark.conf.set("spark.sql.catalog.clickhouse.database", "default")
spark.conf.set("spark.clickhouse.write.format", "json")
# Create DataFrame
data = [Row(id=11, name="John"), Row(id=12, name="Doe")]
df = spark.createDataFrame(data)
# Write DataFrame to ClickHouse
df.writeTo("clickhouse.default.example_table").append()
-- resultTable is the Spark intermediate df we want to insert into clickhouse.default.example_table
INSERT INTO TABLE clickhouse.default.example_table
SELECT * FROM resultTable;
DDL operations
You can perform DDL operations on your ClickHouse instance using Spark SQL, with all changes immediately persisted in
ClickHouse.
Spark SQL allows you to write queries exactly as you would in ClickHouse,
so you can directly execute commands such as CREATE TABLE, TRUNCATE, and more - without modification, for instance:
Note
When using Spark SQL, only one statement can be executed at a time.
CREATE TABLE test_db.tbl_sql (
create_time TIMESTAMP NOT NULL,
m INT NOT NULL COMMENT 'part key',
id BIGINT NOT NULL COMMENT 'sort key',
value STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
engine = 'MergeTree()',
order_by = 'id',
settings.index_granularity = 8192
);
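Other DDL statements follow the same pattern and are applied directly to ClickHouse; run them one at a time. The statements below operate on the hypothetical test_db.tbl_sql table created above:
-- Inspect the table definition
DESC test_db.tbl_sql;

-- Remove all rows
TRUNCATE TABLE test_db.tbl_sql;

-- Drop the table
DROP TABLE test_db.tbl_sql;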
The examples above demonstrate Spark SQL queries, which you can run within your application using any API: Java, Scala,
PySpark, or the SQL shell.
Working with VariantType
Note
VariantType support is available in Spark 4.0+ and requires ClickHouse 25.3+ with experimental JSON/Variant types enabled.
The connector supports Spark's VariantType for working with semi-structured data. VariantType maps to ClickHouse's JSON and Variant types, allowing you to store and query flexible schema data efficiently.
Note
This section focuses specifically on VariantType mapping and usage. For a complete overview of all supported data types, see the Supported data types section.
ClickHouse Type Mapping
| ClickHouse Type | Spark Type | Description |
|---|---|---|
| JSON | VariantType | Stores JSON objects only (must start with {) |
| Variant(T1, T2, ...) | VariantType | Stores multiple types including primitives, arrays, and JSON |
Reading VariantType Data
When reading from ClickHouse, JSON and Variant columns are automatically mapped to Spark's VariantType:
// Read JSON column as VariantType
val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")
// Access variant data
df.show()
// Convert variant to JSON string for inspection
import org.apache.spark.sql.functions._
df.select(
col("id"),
to_json(col("data")).as("data_json")
).show()
# Read JSON column as VariantType
df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")
# Access variant data
df.show()
# Convert variant to JSON string for inspection
from pyspark.sql.functions import to_json
df.select(
"id",
to_json("data").alias("data_json")
).show()
// Read JSON column as VariantType
Dataset<Row> df = spark.sql("SELECT id, data FROM clickhouse.default.json_table");
// Access variant data
df.show();
// Convert variant to JSON string for inspection
import static org.apache.spark.sql.functions.*;
df.select(
col("id"),
to_json(col("data")).as("data_json")
).show();
Writing VariantType Data
You can write VariantType data to ClickHouse using either JSON or Variant column types:
import org.apache.spark.sql.functions._
// Create DataFrame with JSON data
val jsonData = Seq(
(1, """{"name": "Alice", "age": 30}"""),
(2, """{"name": "Bob", "age": 25}"""),
(3, """{"name": "Charlie", "city": "NYC"}""")
).toDF("id", "json_string")
// Parse JSON strings to VariantType
val variantDF = jsonData.select(
col("id"),
parse_json(col("json_string")).as("data")
)
// Write to ClickHouse with JSON type (JSON objects only)
variantDF.writeTo("clickhouse.default.user_data").create()
// Or specify Variant with multiple types
spark.sql("""
CREATE TABLE clickhouse.default.mixed_data (
id INT,
data VARIANT
) USING clickhouse
TBLPROPERTIES (
'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON',
'engine' = 'MergeTree()',
'order_by' = 'id'
)
""")
from pyspark.sql.functions import parse_json
# Create DataFrame with JSON data
json_data = [
(1, '{"name": "Alice", "age": 30}'),
(2, '{"name": "Bob", "age": 25}'),
(3, '{"name": "Charlie", "city": "NYC"}')
]
df = spark.createDataFrame(json_data, ["id", "json_string"])
# Parse JSON strings to VariantType
variant_df = df.select(
"id",
parse_json("json_string").alias("data")
)
# Write to ClickHouse with JSON type
variant_df.writeTo("clickhouse.default.user_data").create()
# Or specify Variant with multiple types
spark.sql("""
CREATE TABLE clickhouse.default.mixed_data (
id INT,
data VARIANT
) USING clickhouse
TBLPROPERTIES (
'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON',
'engine' = 'MergeTree()',
'order_by' = 'id'
)
""")
import static org.apache.spark.sql.functions.*;
// Create DataFrame with JSON data
List<Row> jsonData = Arrays.asList(
RowFactory.create(1, "{\"name\": \"Alice\", \"age\": 30}"),
RowFactory.create(2, "{\"name\": \"Bob\", \"age\": 25}"),
RowFactory.create(3, "{\"name\": \"Charlie\", \"city\": \"NYC\"}")
);
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("json_string", DataTypes.StringType, false)
});
Dataset<Row> jsonDF = spark.createDataFrame(jsonData, schema);
// Parse JSON strings to VariantType
Dataset<Row> variantDF = jsonDF.select(
col("id"),
parse_json(col("json_string")).as("data")
);
// Write to ClickHouse with JSON type (JSON objects only)
variantDF.writeTo("clickhouse.default.user_data").create();
// Or specify Variant with multiple types
spark.sql("CREATE TABLE clickhouse.default.mixed_data (" +
"id INT, " +
"data VARIANT" +
") USING clickhouse " +
"TBLPROPERTIES (" +
"'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON', " +
"'engine' = 'MergeTree()', " +
"'order_by' = 'id'" +
")");
Creating VariantType Tables with Spark SQL
You can create VariantType tables using Spark SQL DDL:
-- Create table with JSON type (default)
CREATE TABLE clickhouse.default.json_table (
id INT,
data VARIANT
) USING clickhouse
TBLPROPERTIES (
'engine' = 'MergeTree()',
'order_by' = 'id'
)
-- Create table with Variant type supporting multiple types
CREATE TABLE clickhouse.default.flexible_data (
id INT,
data VARIANT
) USING clickhouse
TBLPROPERTIES (
'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON',
'engine' = 'MergeTree()',
'order_by' = 'id'
)
Configuring Variant Types
When creating tables with VariantType columns, you can specify which ClickHouse types to use:
JSON Type (Default)
If no variant_types property is specified, the column defaults to ClickHouse's JSON type, which only accepts JSON objects:
CREATE TABLE clickhouse.default.json_table (
id INT,
data VARIANT
) USING clickhouse
TBLPROPERTIES (
'engine' = 'MergeTree()',
'order_by' = 'id'
)
This generates the following ClickHouse DDL statement:
CREATE TABLE json_table (id Int32, data JSON) ENGINE = MergeTree() ORDER BY id
Variant Type with Multiple Types
To support primitives, arrays, and JSON objects, specify the types in the variant_types property:
CREATE TABLE clickhouse.default.flexible_data (
id INT,
data VARIANT
) USING clickhouse
TBLPROPERTIES (
'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON',
'engine' = 'MergeTree()',
'order_by' = 'id'
)
This generates the following ClickHouse DDL statement:
CREATE TABLE flexible_data (
id Int32,
data Variant(String, Int64, Float64, Bool, Array(String), JSON)
) ENGINE = MergeTree() ORDER BY id
Supported Variant Types
The following ClickHouse types can be used in Variant():
- Primitives: String, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64, Bool
- Arrays: Array(T) where T is any supported type, including nested arrays
- JSON: JSON for storing JSON objects
By default, JSON and Variant columns are read as VariantType. You can override this behavior to read them as strings:
// Read JSON/Variant as strings instead of VariantType
spark.conf.set("spark.clickhouse.read.jsonAs", "string")
val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")
// data column will be StringType containing JSON strings
# Read JSON/Variant as strings instead of VariantType
spark.conf.set("spark.clickhouse.read.jsonAs", "string")
df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")
# data column will be StringType containing JSON strings
// Read JSON/Variant as strings instead of VariantType
spark.conf().set("spark.clickhouse.read.jsonAs", "string");
Dataset<Row> df = spark.sql("SELECT id, data FROM clickhouse.default.json_table");
// data column will be StringType containing JSON strings
VariantType write support varies by format:
| Format | Support | Notes |
|---|---|---|
| JSON | ✅ Full | Supports both JSON and Variant types. Recommended for VariantType data |
| Arrow | ⚠️ Partial | Supports writing to ClickHouse JSON type. Does not support ClickHouse Variant type. Full support is pending resolution of https://github.com/ClickHouse/ClickHouse/issues/92752 |
Configure the write format:
spark.conf.set("spark.clickhouse.write.format", "json") // Recommended for Variant types
Tip
If you need to write to a ClickHouse Variant type, use JSON format. Arrow format only supports writing to JSON type.
Best Practices
- Use JSON type for JSON-only data: If you only store JSON objects, use the default JSON type (no variant_types property)
- Specify types explicitly: When using Variant(), explicitly list all types you plan to store
- Enable experimental features: Ensure ClickHouse has allow_experimental_json_type = 1 enabled
- Use JSON format for writes: JSON format is recommended for VariantType data for better compatibility
- Consider query patterns: JSON/Variant types support ClickHouse's JSON path queries for efficient filtering
- Column hints for performance: When using JSON fields in ClickHouse, adding column hints improves query performance. Currently, adding column hints via Spark is not supported. See GitHub issue #497 for tracking this feature.
Example: Complete Workflow
import org.apache.spark.sql.functions._
// Enable experimental JSON type in ClickHouse
spark.sql("SET allow_experimental_json_type = 1")
// Create table with Variant column
spark.sql("""
CREATE TABLE clickhouse.default.events (
event_id BIGINT,
event_time TIMESTAMP,
event_data VARIANT
) USING clickhouse
TBLPROPERTIES (
'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON',
'engine' = 'MergeTree()',
'order_by' = 'event_time'
)
""")
// Prepare data with mixed types
val events = Seq(
(1L, "2024-01-01 10:00:00", """{"action": "login", "user_id": 123}"""),
(2L, "2024-01-01 10:05:00", """{"action": "purchase", "amount": 99.99}"""),
(3L, "2024-01-01 10:10:00", """{"action": "logout", "duration": 600}""")
).toDF("event_id", "event_time", "json_data")
// Convert to VariantType and write
val variantEvents = events.select(
col("event_id"),
to_timestamp(col("event_time")).as("event_time"),
parse_json(col("json_data")).as("event_data")
)
variantEvents.writeTo("clickhouse.default.events").append()
// Read and query
val result = spark.sql("""
SELECT event_id, event_time, event_data
FROM clickhouse.default.events
WHERE event_time >= '2024-01-01'
ORDER BY event_time
""")
result.show(false)
from pyspark.sql.functions import parse_json, to_timestamp
# Enable experimental JSON type in ClickHouse
spark.sql("SET allow_experimental_json_type = 1")
# Create table with Variant column
spark.sql("""
CREATE TABLE clickhouse.default.events (
event_id BIGINT,
event_time TIMESTAMP,
event_data VARIANT
) USING clickhouse
TBLPROPERTIES (
'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON',
'engine' = 'MergeTree()',
'order_by' = 'event_time'
)
""")
# Prepare data with mixed types
events = [
(1, "2024-01-01 10:00:00", '{"action": "login", "user_id": 123}'),
(2, "2024-01-01 10:05:00", '{"action": "purchase", "amount": 99.99}'),
(3, "2024-01-01 10:10:00", '{"action": "logout", "duration": 600}')
]
df = spark.createDataFrame(events, ["event_id", "event_time", "json_data"])
# Convert to VariantType and write
variant_events = df.select(
"event_id",
to_timestamp("event_time").alias("event_time"),
parse_json("json_data").alias("event_data")
)
variant_events.writeTo("clickhouse.default.events").append()
# Read and query
result = spark.sql("""
SELECT event_id, event_time, event_data
FROM clickhouse.default.events
WHERE event_time >= '2024-01-01'
ORDER BY event_time
""")
result.show(truncate=False)
import static org.apache.spark.sql.functions.*;
// Enable experimental JSON type in ClickHouse
spark.sql("SET allow_experimental_json_type = 1");
// Create table with Variant column
spark.sql("CREATE TABLE clickhouse.default.events (" +
"event_id BIGINT, " +
"event_time TIMESTAMP, " +
"event_data VARIANT" +
") USING clickhouse " +
"TBLPROPERTIES (" +
"'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON', " +
"'engine' = 'MergeTree()', " +
"'order_by' = 'event_time'" +
")");
// Prepare data with mixed types
List<Row> events = Arrays.asList(
RowFactory.create(1L, "2024-01-01 10:00:00", "{\"action\": \"login\", \"user_id\": 123}"),
RowFactory.create(2L, "2024-01-01 10:05:00", "{\"action\": \"purchase\", \"amount\": 99.99}"),
RowFactory.create(3L, "2024-01-01 10:10:00", "{\"action\": \"logout\", \"duration\": 600}")
);
StructType eventSchema = new StructType(new StructField[]{
DataTypes.createStructField("event_id", DataTypes.LongType, false),
DataTypes.createStructField("event_time", DataTypes.StringType, false),
DataTypes.createStructField("json_data", DataTypes.StringType, false)
});
Dataset<Row> eventsDF = spark.createDataFrame(events, eventSchema);
// Convert to VariantType and write
Dataset<Row> variantEvents = eventsDF.select(
col("event_id"),
to_timestamp(col("event_time")).as("event_time"),
parse_json(col("json_data")).as("event_data")
);
variantEvents.writeTo("clickhouse.default.events").append();
// Read and query
Dataset<Row> result = spark.sql("SELECT event_id, event_time, event_data " +
"FROM clickhouse.default.events " +
"WHERE event_time >= '2024-01-01' " +
"ORDER BY event_time");
result.show(false);
Configurations
The following are the adjustable configurations available in the connector.
Note
Using Configurations: These are Spark-level configuration options that apply to both the Catalog API and the TableProvider API. They can be set in two ways:
- Global Spark configuration (applies to all operations):
spark.conf.set("spark.clickhouse.write.batchSize", "20000")
spark.conf.set("spark.clickhouse.write.compression.codec", "lz4")
- Per-operation override (TableProvider API only; can override global settings):
df.write \
.format("clickhouse") \
.option("host", "your-host") \
.option("database", "default") \
.option("table", "my_table") \
.option("spark.clickhouse.write.batchSize", "20000") \
.option("spark.clickhouse.write.compression.codec", "lz4") \
.mode("append") \
.save()
Alternatively, set them in spark-defaults.conf or when creating the Spark session.
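For example, the equivalent spark-defaults.conf entries might look like this (values are illustrative):
spark.clickhouse.write.batchSize          20000
spark.clickhouse.write.compression.codec  lz4
spark.clickhouse.read.compression.codec   lz4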
| Key | Default | Description | Since |
|---|---|---|---|
| spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouse supports using complex expressions as sharding keys or partition values, e.g. cityHash64(col_1, col_2), which are currently not supported by Spark. If true, ignore the unsupported expressions, otherwise fail fast w/ an exception. Note, when spark.clickhouse.write.distributed.convertLocal is enabled, ignoring unsupported sharding keys may corrupt the data. | 0.4.0 |
| spark.clickhouse.read.compression.codec | lz4 | The codec used to decompress data for reading. Supported codecs: none, lz4. | 0.5.0 |
| spark.clickhouse.read.distributed.convertLocal | true | When reading Distributed table, read local table instead of itself. If true, ignore spark.clickhouse.read.distributed.useClusterNodes. | 0.1.0 |
| spark.clickhouse.read.fixedStringAs | binary | Read ClickHouse FixedString type as the specified Spark data type. Supported types: binary, string | 0.8.0 |
| spark.clickhouse.read.format | json | Serialize format for reading. Supported formats: json, binary | 0.6.0 |
| spark.clickhouse.read.runtimeFilter.enabled | false | Enable runtime filter for reading. | 0.8.0 |
| spark.clickhouse.read.splitByPartitionId | true | If true, construct input partition filter by virtual column _partition_id, instead of partition value. There are known issues with assembling SQL predicates by partition value. This feature requires ClickHouse Server v21.6+ | 0.4.0 |
| spark.clickhouse.useNullableQuerySchema | false | If true, mark all the fields of the query schema as nullable when executing CREATE/REPLACE TABLE ... AS SELECT ... on creating the table. Note, this configuration requires SPARK-43390(available in Spark 3.5), w/o this patch, it always acts as true. | 0.8.0 |
| spark.clickhouse.write.batchSize | 10000 | The number of records per batch on writing to ClickHouse. | 0.1.0 |
| spark.clickhouse.write.compression.codec | lz4 | The codec used to compress data for writing. Supported codecs: none, lz4. | 0.3.0 |
| spark.clickhouse.write.distributed.convertLocal | false | When writing Distributed table, write local table instead of itself. If true, ignore spark.clickhouse.write.distributed.useClusterNodes. | 0.1.0 |
| spark.clickhouse.write.distributed.useClusterNodes | true | Write to all nodes of cluster when writing Distributed table. | 0.1.0 |
| spark.clickhouse.write.format | arrow | Serialize format for writing. Supported formats: json, arrow | 0.4.0 |
| spark.clickhouse.write.localSortByKey | true | If true, do local sort by sort keys before writing. | 0.3.0 |
| spark.clickhouse.write.localSortByPartition | value of spark.clickhouse.write.repartitionByPartition | If true, do local sort by partition before writing. If not set, it equals to spark.clickhouse.write.repartitionByPartition. | 0.3.0 |
| spark.clickhouse.write.maxRetry | 3 | The maximum number of retries for a single batch write that failed with retryable codes. | 0.1.0 |
| spark.clickhouse.write.repartitionByPartition | true | Whether to repartition data by ClickHouse partition keys to meet the distributions of ClickHouse table before writing. | 0.3.0 |
| spark.clickhouse.write.repartitionNum | 0 | Repartitioning data to match the distribution of the ClickHouse table is required before writing; use this conf to specify the repartition number. A value less than 1 means no requirement. | 0.1.0 |
| spark.clickhouse.write.repartitionStrictly | false | If true, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523(available in Spark 3.4), w/o this patch, it always acts as true. | 0.3.0 |
| spark.clickhouse.write.retryInterval | 10s | The interval in seconds between write retries. | 0.1.0 |
| spark.clickhouse.write.retryableErrorCodes | 241 | The retryable error codes returned by the ClickHouse server when a write fails. | 0.1.0 |
Supported data types
This section outlines the mapping of data types between Spark and ClickHouse. The tables below provide quick references
for converting data types when reading from ClickHouse into Spark and when inserting data from Spark into ClickHouse.
Reading data from ClickHouse into Spark
| ClickHouse Data Type | Spark Data Type | Supported | Is Primitive | Notes |
|---|---|---|---|---|
| Nothing | NullType | ✅ | Yes | |
| Bool | BooleanType | ✅ | Yes | |
| UInt8, Int16 | ShortType | ✅ | Yes | |
| Int8 | ByteType | ✅ | Yes | |
| UInt16, Int32 | IntegerType | ✅ | Yes | |
| UInt32, Int64, UInt64 | LongType | ✅ | Yes | |
| Int128, UInt128, Int256, UInt256 | DecimalType(38, 0) | ✅ | Yes | |
| Float32 | FloatType | ✅ | Yes | |
| Float64 | DoubleType | ✅ | Yes | |
| String, UUID, Enum8, Enum16, IPv4, IPv6 | StringType | ✅ | Yes | |
| FixedString | BinaryType, StringType | ✅ | Yes | Controlled by configuration READ_FIXED_STRING_AS |
| Decimal | DecimalType | ✅ | Yes | Precision and scale up to Decimal128 |
| Decimal32 | DecimalType(9, scale) | ✅ | Yes | |
| Decimal64 | DecimalType(18, scale) | ✅ | Yes | |
| Decimal128 | DecimalType(38, scale) | ✅ | Yes | |
| Date, Date32 | DateType | ✅ | Yes | |
| DateTime, DateTime32, DateTime64 | TimestampType | ✅ | Yes | |
| Array | ArrayType | ✅ | No | Array element type is also converted |
| Map | MapType | ✅ | No | Keys are limited to StringType |
| IntervalYear | YearMonthIntervalType(Year) | ✅ | Yes | |
| IntervalMonth | YearMonthIntervalType(Month) | ✅ | Yes | |
| IntervalDay, IntervalHour, IntervalMinute, IntervalSecond | DayTimeIntervalType | ✅ | No | Specific interval type is used |
| JSON, Variant | VariantType | ✅ | No | Requires Spark 4.0+ and ClickHouse 25.3+. Can be read as StringType with spark.clickhouse.read.jsonAs=string |
| Object | | ❌ | | |
| Nested | | ❌ | | |
| Tuple | StructType | ✅ | No | Supports both named and unnamed tuples. Named tuples map to struct fields by name, unnamed tuples use _1, _2, etc. Supports nested structs and nullable fields |
| Point | | ❌ | | |
| Polygon | | ❌ | | |
| MultiPolygon | | ❌ | | |
| Ring | | ❌ | | |
| IntervalQuarter | | ❌ | | |
| IntervalWeek | | ❌ | | |
| Decimal256 | | ❌ | | |
| AggregateFunction | | ❌ | | |
| SimpleAggregateFunction | | ❌ | | |
Inserting data from Spark into ClickHouse
| Spark Data Type | ClickHouse Data Type | Supported | Is Primitive | Notes |
|---|---|---|---|---|
| BooleanType | Bool | ✅ | Yes | Mapped to Bool type (not UInt8) since version 0.9.0 |
| ByteType | Int8 | ✅ | Yes | |
| ShortType | Int16 | ✅ | Yes | |
| IntegerType | Int32 | ✅ | Yes | |
| LongType | Int64 | ✅ | Yes | |
| FloatType | Float32 | ✅ | Yes | |
| DoubleType | Float64 | ✅ | Yes | |
| StringType | String | ✅ | Yes | |
| VarcharType | String | ✅ | Yes | |
| CharType | String | ✅ | Yes | |
| DecimalType | Decimal(p, s) | ✅ | Yes | Precision and scale up to Decimal128 |
| DateType | Date | ✅ | Yes | |
| TimestampType | DateTime | ✅ | Yes | |
| ArrayType (list, tuple, or array) | Array | ✅ | No | Array element type is also converted |
| MapType | Map | ✅ | No | Keys are limited to StringType |
| StructType | Tuple | ✅ | No | Converted to named Tuple with field names. |
| VariantType | JSON or Variant | ✅ | No | Requires Spark 4.0+ and ClickHouse 25.3+. Defaults to JSON type. Use clickhouse.column.<name>.variant_types property to specify Variant with multiple types. |
| Object | | ❌ | | |
| Nested | | ❌ | | |
Contributing and support
If you'd like to contribute to the project or report any issues, we welcome your input!
Visit our GitHub repository to open an issue, suggest
improvements, or submit a pull request.
Contributions are welcome! Please check the contribution guidelines in the repository before starting.
Thank you for helping improve our ClickHouse Spark connector!