
[SPARK-50618][SS][SQL] Make DataFrameReader and DataStreamReader leverage the analyzer more #49238

Open · wants to merge 6 commits into master
Conversation

@brkyvz (Contributor) commented Dec 19, 2024

What changes were proposed in this pull request?

Introduces two logical nodes:

  • UnresolvedDataSource
  • UnresolvedJDBCRelation

DataFrameReader and DataStreamReader now create these unresolved nodes instead and call the analyzer to resolve them. The nodes are then analyzed as part of the ResolveDataSource rule, into which all of the resolution logic from DataFrameReader and DataStreamReader has been moved.

Some logic around parsing text-based formats from an existing Dataset remains; I will refactor it in a subsequent PR.
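The node-plus-rule pattern described above can be sketched roughly as follows. This is a simplified, Spark-free illustration: the class and rule names mirror the PR, but the plan hierarchy here is a hypothetical stand-in, not Catalyst's actual LogicalPlan API.

```scala
// Simplified stand-ins for Catalyst's LogicalPlan hierarchy (not Spark's real classes).
sealed trait LogicalPlan { def resolved: Boolean }

// Leaf node carrying only what .load() received; resolution is deferred to the analyzer.
case class UnresolvedDataSource(
    format: String,
    options: Map[String, String],
    paths: Seq[String]) extends LogicalPlan {
  val resolved = false
}

// What the analyzer would produce once the data source has been looked up.
case class ResolvedRelation(format: String, paths: Seq[String]) extends LogicalPlan {
  val resolved = true
}

// Analogue of the ResolveDataSource analyzer rule: rewrite unresolved nodes in place,
// so the same rule fires regardless of which API (SQL, Scala, Python) built the plan.
object ResolveDataSource {
  def apply(plan: LogicalPlan): LogicalPlan = plan match {
    case UnresolvedDataSource(fmt, _, paths) => ResolvedRelation(fmt, paths)
    case other => other
  }
}
```

The point of the refactor is that `.load()` only builds the unresolved leaf; everything that used to happen eagerly inside the reader now happens when the analyzer runs this rule.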

Why are the changes needed?

DataFrameReader and DataStreamReader typically create already-analyzed relations as part of their respective .load() methods.

This causes inconsistencies in which Catalyst rules get applied to the query plan depending on the API of choice, e.g. SQL vs. Python or SQL vs. Scala.

The goal of this JIRA is to refactor the DataFrameReader and DataStreamReader classes to create unresolved plans that are analyzed as part of Catalyst.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing unit tests; new tests will be added.

Was this patch authored or co-authored using generative AI tooling?

No

}
val relation = JDBCRelation(parts, options)(sparkSession)
sparkSession.baseRelationToDataFrame(relation)
Dataset.ofRows(sparkSession, UnresolvedJDBCRelation(url, table, predicates, params))
Contributor
shall we reuse UnresolvedDataSource with the source set as "jdbc"?

Contributor
hmm, the parameters are different. I'm wondering how this is done in SQL. It seems people can't use arbitrary predicates to build JDBCPartition with the SQL API.
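For context on the discussion above: the DataFrame-side `spark.read.jdbc(url, table, predicates, connectionProperties)` overload lets callers pass one arbitrary WHERE-clause predicate per partition, which the OPTIONS-based SQL JDBC source cannot express. A rough sketch of that predicate-to-partition mapping, with `JDBCPartition` as a simplified stand-in for Spark's internal partition descriptor:

```scala
// Simplified stand-in for Spark's internal JDBC partition descriptor:
// each predicate becomes the WHERE clause of one partition's query.
case class JDBCPartition(whereClause: String, idx: Int)

// Turn caller-supplied predicates into one read partition per predicate.
// Each partition would issue roughly: SELECT ... FROM table WHERE <whereClause>
def partitionsFromPredicates(predicates: Seq[String]): Array[JDBCPartition] =
  predicates.zipWithIndex.map { case (pred, i) => JDBCPartition(pred, i) }.toArray
```

Because the predicates are free-form strings chosen by the caller, there is no obvious way to smuggle them through a generic `UnresolvedDataSource(source = "jdbc", options, paths)` node, which is why this overload is the awkward case.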

Contributor
maybe we should add this feature to SQL first, and then unify it.

Contributor Author
having this available at the moment can help us unify it in SQL afterwards :) This seems to be the only edge case - it looks like a new API added since I last looked at Spark. Do you want me to leave it as is and just migrate the file-based and generic data sources?

Contributor
yea let's leave out the JDBC one for now. Once we have feature parity between SQL and DataFrame, we can revisit this.

@brkyvz (Contributor Author) commented Dec 20, 2024

Thanks for the feedback @cloud-fan! Addressed

@HyukjinKwon HyukjinKwon changed the title [SPARK-50618] Make DataFrameReader and DataStreamReader leverage the analyzer more [SPARK-50618][SS] Make DataFrameReader and DataStreamReader leverage the analyzer more Dec 22, 2024
@HyukjinKwon HyukjinKwon changed the title [SPARK-50618][SS] Make DataFrameReader and DataStreamReader leverage the analyzer more [SPARK-50618][SS][SQL] Make DataFrameReader and DataStreamReader leverage the analyzer more Dec 22, 2024
assertFirstUnresolved(spark.read.format("org.apache.spark.sql.test").load())
assertFirstUnresolved(spark.read.format("org.apache.spark.sql.test").load(dir))
assertFirstUnresolved(spark.read.format("org.apache.spark.sql.test").load(dir, dir, dir))
assertFirstUnresolved(spark.read.format("org.apache.spark.sql.test").load(Seq(dir, dir): _*))
Option(dir).map(spark.read.format("org.apache.spark.sql.test").load)
Contributor
unrelated to your PR but what does this line test?

2 participants