Building a SQL processor with Apache Calcite
17 Jul 2024 by dzlab
In a previous article, we saw how to create an adapter for Apache Calcite and then run SQL queries against a random data source. In this article, we will see step by step how to use Apache Calcite to implement a SQL processor that parses an input query, validates it, and then executes it.
As an example, we will use the following simple JOIN query between two tables, customer and orders:
SELECT `C_NAME`, `O_ORDERKEY`, `O_ORDERDATE`
FROM `CUSTOMER`
INNER JOIN `ORDERS` ON `CUSTOMER`.`c_custkey` = `ORDERS`.`o_custkey`
WHERE `CUSTOMER`.`c_custkey` < 3
ORDER BY `C_NAME`, `O_ORDERKEY`
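Throughout the rest of the article, we will assume the query text is held in a sqlQuery variable, for instance as follows (identifiers left unquoted so that the default parser configuration accepts them):
String sqlQuery = "SELECT c_name, o_orderkey, o_orderdate "
    + "FROM customer "
    + "INNER JOIN orders ON customer.c_custkey = orders.o_custkey "
    + "WHERE customer.c_custkey < 3 "
    + "ORDER BY c_name, o_orderkey";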
Catalog
We need to build the catalog of metadata that Calcite uses to resolve the query.
First, we need to create the root schema and type factory:
CalciteSchema schema = CalciteSchema.createRootSchema(false);
RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
Then, create the metadata of the two tables (columns and data types) and register them with the root schema:
RelDataTypeFactory.Builder builder1 = typeFactory.builder();
builder1.add("c_custkey", typeFactory.createJavaType(Integer.class).getSqlTypeName());
builder1.add("c_name", typeFactory.createJavaType(String.class).getSqlTypeName());
schema.add("customer", new MyTable(builder1.build(), ...){...});
RelDataTypeFactory.Builder builder2 = typeFactory.builder();
builder2.add("o_orderkey", typeFactory.createJavaType(Integer.class).getSqlTypeName());
builder2.add("o_custkey", typeFactory.createJavaType(Integer.class).getSqlTypeName());
builder2.add("o_orderdate", typeFactory.createJavaType(Date.class).getSqlTypeName());
schema.add("orders", new MyTable(builder2.build(), ...){...});
Note: MyTable should be replaced with the actual class used to access the data; it must implement Calcite's Table/ScannableTable interface, as sketched below.
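For illustration, here is a minimal sketch of such a table backed by an in-memory list of rows; the MyTable name and its constructor are placeholders, not a Calcite API:
import java.util.List;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.impl.AbstractTable;

// Hypothetical table over in-memory rows; each row is an Object[] matching the row type.
public class MyTable extends AbstractTable implements ScannableTable {
  private final RelDataType rowType;
  private final List<Object[]> rows;

  MyTable(RelDataType rowType, List<Object[]> rows) {
    this.rowType = rowType;
    this.rows = rows;
  }

  @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    return rowType;
  }

  // Full scan: the table scan in the physical plan ends up calling this.
  @Override public Enumerable<Object[]> scan(DataContext root) {
    return Linq4j.asEnumerable(rows);
  }
}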
After that, configure and instantiate a catalog reader that Calcite can use to access the metadata:
CalciteConnectionConfig readerConfig = CalciteConnectionConfig.DEFAULT
.set(CalciteConnectionProperty.CASE_SENSITIVE, "false");
CalciteCatalogReader catalogReader = new CalciteCatalogReader(schema, Collections.emptyList(), typeFactory, readerConfig);
Note: we set case-sensitivity to false so that the all-uppercase table and column names used in the query resolve against the lowercase names registered in the schema.
Query to AST
To parse the text query into an Abstract Syntax Tree (AST), we first create a SQL parser:
SqlParser parser = SqlParser.create(sqlQuery);
Then, we can use it to parse the query into an AST as follows:
SqlNode parseAst = parser.parseQuery();
We can get back the query text from the AST with parseAst.toString(); Calcite prints the identifiers backtick-quoted, which is the form shown at the top of this article.
Once we have the AST, we can validate it against the catalog. First, create a SQL validator using the standard operator table and default configuration.
SqlValidator sqlValidator = SqlValidatorUtil.newValidator(SqlStdOperatorTable.instance(),
catalogReader, typeFactory, SqlValidator.Config.DEFAULT);
Now we can validate the initial AST:
SqlNode validAst = sqlValidator.validate(parseAst);
Similarly, we can get back the query text from the validated AST with validAst.toString().
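Parsing and validation are also where errors surface: parseQuery() throws a checked SqlParseException on syntax errors, while the validator reports semantic errors (unknown tables or columns, type mismatches) as runtime exceptions such as CalciteContextException. A minimal sketch of handling both, using the parser and validator built above:
try {
  SqlNode parseAst = parser.parseQuery();              // syntax errors surface here
  SqlNode validAst = sqlValidator.validate(parseAst);  // semantic errors surface here
} catch (SqlParseException e) {
  System.err.println("Syntax error: " + e.getMessage());
} catch (CalciteContextException e) {
  System.err.println("Validation error: " + e.getMessage());
}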
AST to Logical plan
Query optimization cannot be applied to an AST; the latter must first be converted into a relational algebra expression.
First, create the optimization cluster which maintains planning information:
RelOptPlanner planner = new VolcanoPlanner();
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
Then, configure and instantiate an AST-to-logical-plan converter, with the default configuration and the standard convertlet table for expression normalization:
RelOptTable.ViewExpander NOOP_EXPANDER = (type, query, schema, path) -> null;
SqlToRelConverter sqlToRelConverter = new SqlToRelConverter(NOOP_EXPANDER,
sqlValidator, catalogReader, cluster,
StandardConvertletTable.INSTANCE,
SqlToRelConverter.config());
Now, we can convert the validated AST into a logical plan and print it to standard output
RelNode logPlan = sqlToRelConverter.convertQuery(validAst, false, true).rel;
// Display the logical plan with explain attributes
System.out.println(
RelOptUtil.dumpPlan("[Logical plan]", logPlan, SqlExplainFormat.TEXT, SqlExplainLevel.EXPPLAN_ATTRIBUTES)
);
We should see a logical plan that looks like this:
LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
  LogicalProject(C_NAME=[$1], O_ORDERKEY=[$8], O_ORDERDATE=[$12])
    LogicalFilter(condition=[<($0, 3)])
      LogicalJoin(condition=[=($0, $9)], joinType=[inner])
        LogicalTableScan(table=[[CUSTOMER]])
        LogicalTableScan(table=[[ORDERS]])
Logical to Physical plan
We need to optimize the Logical Plan and convert it to a plan that can be executed by the underlying storage system.
First, retrieve the planner from the cluster (this is the VolcanoPlanner we created earlier) and register the rules that will be used to transform the logical plan:
RelOptPlanner planner = cluster.getPlanner();
planner.addRule(CoreRules.FILTER_TO_CALC);
planner.addRule(CoreRules.PROJECT_TO_CALC);
planner.addRule(EnumerableRules.ENUMERABLE_SORT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_CALC_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_JOIN_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);
Next, define the desired traits of the output plan; in this case, we want a physical plan in the EnumerableConvention:
logPlan = planner.changeTraits(logPlan, logPlan.getTraitSet().replace(EnumerableConvention.INSTANCE));
planner.setRoot(logPlan);
Start the optimization process to obtain the most efficient physical plan based on the provided rule set.
EnumerableRel phyPlan = (EnumerableRel) planner.findBestExp();
We can visualize the physical plan:
System.out.println(
RelOptUtil.dumpPlan("[Physical plan]", phyPlan, SqlExplainFormat.TEXT, SqlExplainLevel.EXPPLAN_ATTRIBUTES)
);
Which will give us something like this:
EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
  EnumerableCalc(expr#0..16=[{inputs}], C_NAME=[$t1], O_ORDERKEY=[$t8], O_ORDERDATE=[$t12])
    EnumerableCalc(expr#0..16=[{inputs}], expr#17=[3], expr#18=[<($t0, $t17)], proj#0..16=[{exprs}], $condition=[$t18])
      EnumerableHashJoin(condition=[=($0, $9)], joinType=[inner])
        EnumerableTableScan(table=[[CUSTOMER]])
        EnumerableTableScan(table=[[ORDERS]])
Alternatively, we can dump the plan in Graphviz DOT format and render it as a graph.
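A sketch of producing the DOT output with the same dumpPlan utility, by switching the format to SqlExplainFormat.DOT; the resulting string can then be rendered with Graphviz (e.g. dot -Tpng):
String dotPlan = RelOptUtil.dumpPlan("[Physical plan]", phyPlan,
    SqlExplainFormat.DOT, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
System.out.println(dotPlan); // feed this string to Graphviz to render the plan graph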
Physical to Executable plan
With the physical plan at hand we can now execute it.
First, create a simple DataContext that, for simplicity, provides only the schema information:
// A bare-bones DataContext that only exposes the schema; that is enough
// for a plan that just scans the registered tables.
private static final class SchemaOnlyDataContext implements DataContext {
  private final SchemaPlus schema;

  SchemaOnlyDataContext(CalciteSchema calciteSchema) {
    this.schema = calciteSchema.plus();
  }

  @Override public SchemaPlus getRootSchema() {
    return schema;
  }

  @Override public JavaTypeFactory getTypeFactory() {
    return new JavaTypeFactoryImpl();
  }

  @Override public QueryProvider getQueryProvider() {
    return null;
  }

  @Override public Object get(final String name) {
    return null;
  }
}
Next, compile the generated code and obtain an executable program:
Bindable<Object[]> execPlan = EnumerableInterpretable.toBindable(new HashMap<>(), null, phyPlan, EnumerableRel.Prefer.ARRAY);
Finally, run the program using a context that simply provides access to the schema, and print the resulting rows:
for (Object[] row : execPlan.bind(new SchemaOnlyDataContext(schema))) {
  System.out.println(Arrays.toString(row));
}
That’s all folks
In this article, we saw step by step how to process a SQL query and execute it. Calcite can do all of these steps for us when we simply create a CalciteConnection, as seen in a previous article.
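For reference, a minimal sketch of that shortcut; the caseSensitive property mirrors the setting we used for the catalog reader:
Properties info = new Properties();
info.setProperty("caseSensitive", "false");
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// register schemas on calciteConnection.getRootSchema(), then run queries through createStatement()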
I hope you enjoyed this article; feel free to leave a comment or reach out on Twitter @bachiirc.