
Commit 20ffbf7

sunchao authored and dongjoon-hyun committed
[SPARK-37377][SQL] Initial implementation of Storage-Partitioned Join
### What changes were proposed in this pull request?

This PR introduces the initial implementation of Storage-Partitioned Join ([SPIP](https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE)).

Changes:
- `org.apache.spark.sql.connector.read.partitioning.Partitioning` is currently very limited (as mentioned in the SPIP) and cannot be extended to handle join cases. This PR completely replaces it, following the catalyst `Partitioning` interface, and adds two concrete sub-classes: `KeyGroupedPartitioning` and `UnknownPartitioning`. This allows a V2 data source to report its partition transform expressions to Spark via the `SupportsReportPartitioning` interface.
- With the above change, `org.apache.spark.sql.connector.read.partitioning.Distribution` and `org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution` are now replaced by classes with the same names in the `org.apache.spark.sql.connector.distributions` package. Therefore, this PR marks the former two as deprecated.
- `DataSourcePartitioning` used to live in `org.apache.spark.sql.execution.datasources.v2`. This PR moves it into the package `org.apache.spark.sql.catalyst.plans.physical` and renames it to `KeyGroupedPartitioning`, so that it can be extended for more non-V2 use cases, such as Hive bucketing. In addition, it is changed to accommodate the Storage-Partitioned Join feature.
- A new expression type, `TransformExpression`, is introduced to bind syntactic partition transforms to their semantic meaning, represented by a V2 function. This expression is un-evaluable for now, and is used later in `EnsureRequirements` to check whether join children are compatible with each other.
- A new optimizer rule, `V2ScanPartitioning`, is added to recognize `Scan`s that implement `SupportsReportPartitioning`. If they do, this rule converts the V2 partition transform expressions into their catalyst counterparts and annotates `DataSourceV2ScanRelation` with the result. These are later propagated into `DataSourceV2ScanExecBase`.
- Changes are made in `DataSourceV2ScanExecBase` to create a `KeyGroupedPartitioning` for a scan if 1) the scan is annotated with catalyst partition transform expressions, and 2) all input splits implement `HasPartitionKey`.
- A new config, `spark.sql.sources.v2.bucketing.enabled`, is introduced to turn the behavior on or off. It is false by default.

### Why are the changes needed?

Spark currently supports bucketing in DataSource V1, but not in V2. This is the first step towards supporting bucketed joins, in their general form of storage-partitioned joins, for V2 data sources. In addition, the work here can potentially be used to support Hive bucketing as well. Please check the SPIP for details.

### Does this PR introduce _any_ user-facing change?

With the changes, a user can now:
- have V2 data sources report distribution and ordering to Spark on the read path
- Spark will recognize the distribution property and eliminate shuffles in joins/aggregates/windows, etc., when the source distribution matches the distribution these operators require
- a new config, `spark.sql.sources.v2.bucketing.enabled`, is introduced to turn the above behavior on or off

### How was this patch tested?

- Added a new test suite, `KeyGroupedPartitioningSuite`, with end-to-end tests of the new feature
- Extended `EnsureRequirementsSuite` to cover `DataSourcePartitioning`
- Some existing test classes, such as `InMemoryTable`, are extended to cover the changes

Closes #35657 from sunchao/SPARK-37377-partitioning.
Lead-authored-by: Chao Sun <[email protected]>
Co-authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
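
As a rough illustration of the user-facing behavior described in the commit message, the sketch below shows how a session could opt in. It is only a sketch: the catalog and table names (`cat.db.orders`, `cat.db.customers`) are hypothetical and not part of this commit, and the shuffle is only eliminated when both scans report compatible partition transforms.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("storage-partitioned-join-demo")
  .getOrCreate()

// The feature is off by default; opt in per session.
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")

// Hypothetical V2 tables whose scans report a compatible KeyGroupedPartitioning
// on customer_id (e.g. both stored bucketed the same way by the connector).
val joined = spark.table("cat.db.orders")
  .join(spark.table("cat.db.customers"), "customer_id")

// If both sides report matching partition transforms, the planned join should not
// need an Exchange on customer_id; otherwise Spark shuffles as before.
joined.explain()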
1 parent 608f70d commit 20ffbf7


45 files changed, +1839 −338 lines changed


connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ class AvroRowReaderSuite
 
       val df = spark.read.format("avro").load(dir.getCanonicalPath)
       val fileScan = df.queryExecution.executedPlan collectFirst {
-        case BatchScanExec(_, f: AvroScan, _) => f
+        case BatchScanExec(_, f: AvroScan, _, _) => f
       }
       val filePath = fileScan.get.fileIndex.inputFiles(0)
       val fileSize = new File(new URI(filePath)).length

connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -2335,7 +2335,7 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
       })
 
       val fileScan = df.queryExecution.executedPlan collectFirst {
-        case BatchScanExec(_, f: AvroScan, _) => f
+        case BatchScanExec(_, f: AvroScan, _, _) => f
       }
       assert(fileScan.nonEmpty)
       assert(fileScan.get.partitionFilters.nonEmpty)

@@ -2368,7 +2368,7 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
       assert(filterCondition.isDefined)
 
       val fileScan = df.queryExecution.executedPlan collectFirst {
-        case BatchScanExec(_, f: AvroScan, _) => f
+        case BatchScanExec(_, f: AvroScan, _, _) => f
       }
       assert(fileScan.nonEmpty)
       assert(fileScan.get.partitionFilters.isEmpty)

@@ -2449,7 +2449,7 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
         .where("value = 'a'")
 
       val fileScan = df.queryExecution.executedPlan collectFirst {
-        case BatchScanExec(_, f: AvroScan, _) => f
+        case BatchScanExec(_, f: AvroScan, _, _) => f
       }
       assert(fileScan.nonEmpty)
       if (filtersPushdown) {

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala

Lines changed: 1 addition & 1 deletion
@@ -372,7 +372,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
 
   private def checkAggregatePushed(df: DataFrame, funcName: String): Unit = {
     df.queryExecution.optimizedPlan.collect {
-      case DataSourceV2ScanRelation(_, scan, _) =>
+      case DataSourceV2ScanRelation(_, scan, _, _) =>
        assert(scan.isInstanceOf[V1ScanWrapper])
        val wrapper = scan.asInstanceOf[V1ScanWrapper]
        assert(wrapper.pushedDownOperators.aggregation.isDefined)

core/src/main/scala/org/apache/spark/util/collection/Utils.scala

Lines changed: 9 additions & 0 deletions
@@ -36,4 +36,13 @@ private[spark] object Utils {
     }
     ordering.leastOf(input.asJava, num).iterator.asScala
   }
+
+  /**
+   * Only returns `Some` iff ALL elements in `input` are defined. In this case, it is
+   * equivalent to `Some(input.flatten)`.
+   *
+   * Otherwise, returns `None`.
+   */
+  def sequenceToOption[T](input: Seq[Option[T]]): Option[Seq[T]] =
+    if (input.forall(_.isDefined)) Some(input.flatten) else None
 }
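
A minimal sketch of the new helper's contract (the values are illustrative only; note `Utils` is `private[spark]`, so the helper is visible only to Spark-internal code):

import org.apache.spark.util.collection.Utils

Utils.sequenceToOption(Seq(Some(1), Some(2), Some(3)))  // => Some(Seq(1, 2, 3))
Utils.sequenceToOption(Seq(Some(1), None, Some(3)))     // => None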

project/MimaExcludes.scala

Lines changed: 7 additions & 1 deletion
@@ -64,7 +64,13 @@ object MimaExcludes {
     // [SPARK-37600][BUILD] Upgrade to Hadoop 3.3.2
     ProblemFilters.exclude[MissingClassProblem]("org.apache.hadoop.shaded.net.jpountz.lz4.LZ4Compressor"),
     ProblemFilters.exclude[MissingClassProblem]("org.apache.hadoop.shaded.net.jpountz.lz4.LZ4Factory"),
-    ProblemFilters.exclude[MissingClassProblem]("org.apache.hadoop.shaded.net.jpountz.lz4.LZ4SafeDecompressor")
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.hadoop.shaded.net.jpountz.lz4.LZ4SafeDecompressor"),
+
+    // [SPARK-37377][SQL] Initial implementation of Storage-Partitioned Join
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.read.partitioning.Distribution"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.connector.read.partitioning.Partitioning.*"),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.connector.read.partitioning.Partitioning.*")
   )

   // Exclude rules for 3.2.x from 3.1.1

sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/Distribution.java

Lines changed: 0 additions & 44 deletions
This file was deleted.
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/KeyGroupedPartitioning.java

Lines changed: 55 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.read.partitioning;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Expression;

/**
 * Represents a partitioning where rows are split across partitions based on the
 * partition transform expressions returned by {@link KeyGroupedPartitioning#keys}.
 * <p>
 * Note: Data source implementations should make sure for a single partition, all of its rows
 * must be evaluated to the same partition value after being applied by
 * {@link KeyGroupedPartitioning#keys} expressions. Different partitions can share the same
 * partition value: Spark will group these into a single logical partition during planning phase.
 *
 * @since 3.3.0
 */
@Evolving
public class KeyGroupedPartitioning implements Partitioning {
  private final Expression[] keys;
  private final int numPartitions;

  public KeyGroupedPartitioning(Expression[] keys, int numPartitions) {
    this.keys = keys;
    this.numPartitions = numPartitions;
  }

  /**
   * Returns the partition transform expressions for this partitioning.
   */
  public Expression[] keys() {
    return keys;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }
}
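
For context, a connector-side sketch of how this new class might be reported, assuming a hypothetical `BucketedScan` over a table stored in 16 buckets on column `id` (not part of this commit; `toBatch` and the actual split planning are omitted). Per the commit description, Spark only turns this into a catalyst `KeyGroupedPartitioning` when every input split also implements `HasPartitionKey` and `spark.sql.sources.v2.bucketing.enabled` is on.

import org.apache.spark.sql.connector.expressions.{Expression, Expressions}
import org.apache.spark.sql.connector.read.{InputPartition, Scan, SupportsReportPartitioning}
import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
import org.apache.spark.sql.types.StructType

// Hypothetical scan over a table whose files are laid out in 16 buckets on "id".
class BucketedScan(schema: StructType, splits: Array[InputPartition])
  extends Scan with SupportsReportPartitioning {

  override def readSchema(): StructType = schema

  // Advertise the storage layout instead of leaving it unknown.
  override def outputPartitioning(): Partitioning =
    new KeyGroupedPartitioning(
      Array[Expression](Expressions.bucket(16, "id")),  // partition transform keys
      splits.length)                                     // reported number of partitions
}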

sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/Partitioning.java

Lines changed: 9 additions & 17 deletions
@@ -18,33 +18,25 @@
 package org.apache.spark.sql.connector.read.partitioning;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.SupportsReportPartitioning;
 
 /**
  * An interface to represent the output data partitioning for a data source, which is returned by
- * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work
- * like a snapshot. Once created, it should be deterministic and always report the same number of
- * partitions and the same "satisfy" result for a certain distribution.
+ * {@link SupportsReportPartitioning#outputPartitioning()}.
+ * <p>
+ * Note: implementors <b>should NOT</b> directly implement this interface. Instead, they should
+ * use one of the following subclasses:
+ * <ul>
+ *   <li>{@link KeyGroupedPartitioning}</li>
+ *   <li>{@link UnknownPartitioning}</li>
+ * </ul>
  *
  * @since 3.0.0
  */
 @Evolving
 public interface Partitioning {
-
   /**
-   * Returns the number of partitions(i.e., {@link InputPartition}s) the data source outputs.
+   * Returns the number of partitions that the data is split across.
    */
   int numPartitions();
-
-  /**
-   * Returns true if this partitioning can satisfy the given distribution, which means Spark does
-   * not need to shuffle the output data of this data source for some certain operations.
-   * <p>
-   * Note that, Spark may add new concrete implementations of {@link Distribution} in new releases.
-   * This method should be aware of it and always return false for unrecognized distributions. It's
-   * recommended to check every Spark new release and support new distributions if possible, to
-   * avoid shuffle at Spark side for more cases.
-   */
-  boolean satisfy(Distribution distribution);
 }
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/{ClusteredDistribution.java → UnknownPartitioning.java}

Lines changed: 10 additions & 12 deletions

@@ -18,24 +18,22 @@
 package org.apache.spark.sql.connector.read.partitioning;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.read.PartitionReader;
 
 /**
- * A concrete implementation of {@link Distribution}. Represents a distribution where records that
- * share the same values for the {@link #clusteredColumns} will be produced by the same
- * {@link PartitionReader}.
+ * Represents a partitioning where rows are split across partitions in an unknown pattern.
  *
- * @since 3.0.0
+ * @since 3.3.0
  */
 @Evolving
-public class ClusteredDistribution implements Distribution {
+public class UnknownPartitioning implements Partitioning {
+  private final int numPartitions;
 
-  /**
-   * The names of the clustered columns. Note that they are order insensitive.
-   */
-  public final String[] clusteredColumns;
+  public UnknownPartitioning(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
 
-  public ClusteredDistribution(String[] clusteredColumns) {
-    this.clusteredColumns = clusteredColumns;
+  @Override
+  public int numPartitions() {
+    return numPartitions;
   }
 }
sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/util/InternalRowSet.scala

Lines changed: 65 additions & 0 deletions (new file, Scala 2.12 collections API)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.util

import scala.collection.mutable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Murmur3HashFunction, RowOrdering}
import org.apache.spark.sql.types.{DataType, StructField, StructType}

/**
 * A mutable Set with [[InternalRow]] as its element type. It uses Spark's internal murmur hash to
 * compute hash code from an row, and uses [[RowOrdering]] to perform equality checks.
 *
 * @param dataTypes the data types for the row keys this set holds
 */
class InternalRowSet(val dataTypes: Seq[DataType]) extends mutable.Set[InternalRow] {
  private val baseSet = new mutable.HashSet[InternalRowContainer]

  private val structType = StructType(dataTypes.map(t => StructField("f", t)))
  private val ordering = RowOrdering.createNaturalAscendingOrdering(dataTypes)

  override def contains(row: InternalRow): Boolean =
    baseSet.contains(new InternalRowContainer(row))

  private class InternalRowContainer(val row: InternalRow) {
    override def hashCode(): Int = Murmur3HashFunction.hash(row, structType, 42L).toInt

    override def equals(other: Any): Boolean = other match {
      case r: InternalRowContainer => ordering.compare(row, r.row) == 0
      case r => this == r
    }
  }

  override def +=(row: InternalRow): InternalRowSet.this.type = {
    val rowKey = new InternalRowContainer(row)
    baseSet += rowKey
    this
  }

  override def -=(row: InternalRow): InternalRowSet.this.type = {
    val rowKey = new InternalRowContainer(row)
    baseSet -= rowKey
    this
  }

  override def iterator: Iterator[InternalRow] = {
    baseSet.iterator.map(_.row)
  }
}
sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/util/InternalRowSet.scala

Lines changed: 69 additions & 0 deletions (new file, Scala 2.13 collections API)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.util

import scala.collection.mutable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Murmur3HashFunction, RowOrdering}
import org.apache.spark.sql.types.{DataType, StructField, StructType}

/**
 * A mutable Set with [[InternalRow]] as its element type. It uses Spark's internal murmur hash to
 * compute hash code from an row, and uses [[RowOrdering]] to perform equality checks.
 *
 * @param dataTypes the data types for the row keys this set holds
 */
class InternalRowSet(val dataTypes: Seq[DataType]) extends mutable.Set[InternalRow] {
  private val baseSet = new mutable.HashSet[InternalRowContainer]

  private val structType = StructType(dataTypes.map(t => StructField("f", t)))
  private val ordering = RowOrdering.createNaturalAscendingOrdering(dataTypes)

  override def contains(row: InternalRow): Boolean =
    baseSet.contains(new InternalRowContainer(row))

  private class InternalRowContainer(val row: InternalRow) {
    override def hashCode(): Int = Murmur3HashFunction.hash(row, structType, 42L).toInt

    override def equals(other: Any): Boolean = other match {
      case r: InternalRowContainer => ordering.compare(row, r.row) == 0
      case r => this == r
    }
  }

  override def addOne(row: InternalRow): InternalRowSet.this.type = {
    val rowKey = new InternalRowContainer(row)
    baseSet += rowKey
    this
  }

  override def subtractOne(row: InternalRow): InternalRowSet.this.type = {
    val rowKey = new InternalRowContainer(row)
    baseSet -= rowKey
    this
  }

  override def clear(): Unit = {
    baseSet.clear()
  }

  override def iterator: Iterator[InternalRow] = {
    baseSet.iterator.map(_.row)
  }
}
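
Both variants behave the same way. A minimal sketch of the intended semantics (values are made up): partition values that are equal by value, not by reference, collapse into one entry, which is what grouping input splits by partition key needs.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.InternalRowSet
import org.apache.spark.sql.types.{IntegerType, LongType}

val keys = new InternalRowSet(Seq(IntegerType, LongType))
keys += InternalRow(1, 10L)
keys += InternalRow(1, 10L)   // equal by value, so the set still holds a single entry for it
keys += InternalRow(2, 20L)

assert(keys.size == 2)
assert(keys.contains(InternalRow(1, 10L)))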
