-
Notifications
You must be signed in to change notification settings - Fork 41
[PECO-1752] Refactor CloudFetch downloader #234
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…ator interface (simplifies usage and encapsulates implementation details) Signed-off-by: Levko Kravets <[email protected]>
Signed-off-by: Levko Kravets <[email protected]>
Signed-off-by: Levko Kravets <[email protected]>
Signed-off-by: Levko Kravets <[email protected]>
Signed-off-by: Levko Kravets <[email protected]>
Signed-off-by: Levko Kravets <[email protected]>
Signed-off-by: Levko Kravets <[email protected]>
jackyhu-db
reviewed
Aug 8, 2024
jackyhu-db
reviewed
Aug 8, 2024
jackyhu-db
approved these changes
Aug 23, 2024
atzoum
pushed a commit
to rudderlabs/sqlconnect-go
that referenced
this pull request
Sep 9, 2024
….6.0 to 1.6.1 (#174) Bumps [github.com/databricks/databricks-sql-go](https://github.com/databricks/databricks-sql-go) from 1.6.0 to 1.6.1. <details> <summary>Release notes</summary> <p><em>Sourced from <a href="https://github.com/databricks/databricks-sql-go/releases">github.com/databricks/databricks-sql-go's releases</a>.</em></p> <blockquote> <h2>v1.6.1</h2> <p><code>databricks/databricks-sql-go#234</code></p> <p><strong>Full changelog:</strong> <a href="https://github.com/databricks/databricks-sql-go/compare/v1.6.0...v1.6.1">https://github.com/databricks/databricks-sql-go/compare/v1.6.0...v1.6.1</a></p> </blockquote> </details> <details> <summary>Changelog</summary> <p><em>Sourced from <a href="https://github.com/databricks/databricks-sql-go/blob/main/CHANGELOG.md">github.com/databricks/databricks-sql-go's changelog</a>.</em></p> <blockquote> <h2>v1.6.1 (2024-08-27)</h2> <ul> <li><code>databricks/databricks-sql-go#234</code></li> </ul> </blockquote> </details> <details> <summary>Commits</summary> <ul> <li><a href="https://github.com/databricks/databricks-sql-go/commit/0ddea3730ba688650e9fb46e521e079267e726cd"><code>0ddea37</code></a> Prepare release 1.6.1 (<a href="https://redirect.github.com/databricks/databricks-sql-go/issues/245">#245</a>)</li> <li><a href="https://github.com/databricks/databricks-sql-go/commit/f70e5a5686a5648d14aa9e57279b5b0349e97542"><code>f70e5a5</code></a> [PECO-1752] Refactor CloudFetch downloader (<a href="https://redirect.github.com/databricks/databricks-sql-go/issues/234">#234</a>)</li> <li>See full diff in <a href="https://github.com/databricks/databricks-sql-go/compare/v1.6.0...v1.6.1">compare view</a></li> </ul> </details> <br /> [](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores) Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`. [//]: # (dependabot-automerge-start) [//]: # (dependabot-automerge-end) --- <details> <summary>Dependabot commands and options</summary> <br /> You can trigger Dependabot actions by commenting on this PR: - `@dependabot rebase` will rebase this PR - `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it - `@dependabot merge` will merge this PR after your CI passes on it - `@dependabot squash and merge` will squash and merge this PR after your CI passes on it - `@dependabot cancel merge` will cancel a previously requested merge and block automerging - `@dependabot reopen` will reopen this PR if it is closed - `@dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually - `@dependabot show <dependency name> ignore conditions` will show all of the ignore conditions of the specified dependency - `@dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself) </details> Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
PECO-1752
This PR is an attempt to fix a CloudFetch error "row number N is not contained in any arrow batch".
See also databricks/databricks-sql-python#405 - basically, the same issue, the same root cause, similar solution.
The problem
In current implementation, all the links from a single TRowSet are added to a concurrent thread (goroutine) pool. The pool downloads them in a random order (all the tasks have the same priority and as a result - same chance to be executed first). To maintain the order of results,
startRowOffset
/rowCount
fields from each CloudFetch link are used: library keeps track of the current row number, and use it to pick the right CloudFetch link (looking for the file where the current row is within [startRowOffset; startRowOffset + rowCount]).This solution has several caveats. First of all, library allows to fetch data only from beginning to the end. With a concurrent thread pool, you never know which file will be downloaded first. In the worst case, while the user is waiting for the very first file, the library may download all the other ones and keep them in memory because the user may need them in future. This increases the latency (on average it will be okay, but we have no control over it), and also memory consumption.
Another problem with this approach is that if any of the files cannot be downloaded - there is no need to download the remaining files, the user won’t be able to process them anyway. But because files are downloaded in arbitrary order - nobody knows how many files will be downloaded before the user reaches the failed one.
Also, seems that error handling wasn't done quite right, but that part of code was a bit unclear to me. Anyway, with this fix all the errors are properly handled and propagated to user when needed.
The solution
This PR changes CloudFetch downloader to use a queue. Downloader keeps a list of pending links (not scheduled), and current tasks. Number of tasks is limited, so new files are scheduled only when previous task is completed and extracted from queue. As user requests next files, downloader will pick the first task from the queue, and schedule the new one to run in background - to keep the queue full. Then, downloader will wait for the task it picked from the queue, and then return it to user. Tasks are still running in in parallel in background. Also, each task itself is reponsible for handling errors (e.g. retry failed downloads), so when task completes - it is either eventually successfull, or failed after all possible retries.
With this approach, the proper order of files is automatically assured. All errors are either handled in downloader or propagated to user. If some file cannot be downloaded due to error - library will not download the remaining ones (like it did previously). Because new files are downloaded only when user consumes previous ones - library will not keep the whole dataset in memory.