
Internal sinks, part 1 #13251


Merged

teskje merged 5 commits into MaterializeInc:main from the internal-sinks branch on Jun 27, 2022
Conversation

teskje
Contributor

@teskje teskje commented Jun 24, 2022

This PR is the first step towards enabling internal sinks in Materialize. It includes various changes to the existing persist sinks, to make them work like the envisioned internal sinks.

The PR does not make significant changes to the UI; the CREATE SINK ... INTO PERSIST syntax remains (albeit in a simpler form). Replacing it with the new CREATE RECORDED VIEW syntax will be the subject of a follow-up PR. The intention of this split is to keep the amount of change that needs to be reviewed manageable.

Changes made in this PR (see the separate commits for more detail):

  • SinkDesc receives a type parameter for storage metadata, akin to SourceInstanceDesc (a rough sketch of this shape follows the list).
  • The persist location parameters are removed from CREATE SINK ... INTO PERSIST. Instead, the persist location is now provided by the storage controller.
  • When creating a persist sink, a source is registered with the storage controller. Right now this is only done to get a storage collection, but later we will use that source to read back sinked data.
  • computed's persist_sink is made compatible with storaged's persist_source, so sinked data can be read back.
  • The compute controller is extended to report updates of persist sink uppers to the storage controller, so the storage controller can track the frontiers of sink collections.
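
For a rough idea of the shape of the first change, here is a minimal sketch; the type and field names below are simplified stand-ins rather than the actual Materialize definitions:

```rust
/// Stand-in for the metadata the storage controller hands out for a
/// collection (persist location, shard ID, ...).
#[derive(Clone, Debug)]
pub struct CollectionMetadata {
    pub persist_location: String,
    pub shard_id: String,
}

/// A sink description generic over its storage metadata, akin to
/// `SourceInstanceDesc`. The coordinator only deals with `SinkDesc<()>`.
#[derive(Clone, Debug)]
pub struct SinkDesc<S = ()> {
    pub from_id: u64,
    pub storage_metadata: S,
}

impl SinkDesc<()> {
    /// The compute controller fills in the storage metadata before handing the
    /// description to a compute instance.
    pub fn augment(self, metadata: CollectionMetadata) -> SinkDesc<CollectionMetadata> {
        SinkDesc {
            from_id: self.from_id,
            storage_metadata: metadata,
        }
    }
}
```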

Usage

Sinking data now looks as follows from SQL:

CREATE TABLE table (a int);
CREATE SINK sink FROM table INTO PERSIST;
INSERT INTO table VALUES (1), (2), (2);
DELETE FROM table WHERE a != 1;

It is not currently possible to read back sinked data from SQL. Instead, you can write a simple tail command using the mz-persist-client crate directly, or clone https://github.com/teskje/persist-test.

When you create a persist sink, computed prints the shard ID in an INFO trace:

[...] INFO mz_compute::sink::persist_sink: persist_sink shard ID: s00e6f38f-ccfc-4471-bb8c-49718abccc19

You can plug that into your tail command, e.g.:

$ cargo run --bin tail -- 'postgres://jan@%2Ftmp?options=--search_path=consensus' 'file:///Users/jan/devel/materialize/mzdata/persist/blob' 's00e6f38f-ccfc-4471-bb8c-49718abccc19'
[...]
(2), 1656079696243, -2
(1), 1656079696243, 1
(2), 1656079696243, 2

(Each output line is a (data, timestamp, diff) triple; the -2 records the deletion of the two rows with value 2. Inserts and deletes appear to happen at the same time because the since of the persist shard has already advanced beyond the times of both events. If you want to see them happen in real time, attach the tail first, before inserting into/deleting from the table.)

Caveats

Persist sinks should behave correctly on replicated compute instances and when compute replicas are killed and restarted. They currently don't survive restarts of environmentd: the sink is recreated but receives a new persist shard ID, so data is written to a new storage collection instead of the existing one. This happens because the storage controller doesn't yet persist shard IDs across restarts.

Motivation

Tips for reviewer

I would suggest looking at the individual commits separately.

Testing

  • This PR has adequate test coverage / QA involvement has been duly considered.

Release notes

This PR includes no user-facing behavior changes.

@teskje teskje marked this pull request as ready for review June 24, 2022 15:17
Contributor

@benesch benesch left a comment


Nice! I love these sorts of incremental steps.

teskje added 5 commits June 27, 2022 09:11
Following the example of `SourceInstanceDesc`, this commit adds a
type parameter for the storage metadata type to `SinkDesc`. Storage
metadata is only required for persist sink connections, where it
replaces the persist location fields.

As with `SourceInstanceDesc`, the coordinator only handles `SinkDesc`s
without storage metadata (i.e. `SinkDesc<()>`). The compute controller
is responsible for augmenting sink descriptions with their corresponding
storage metadata, converting them to `SinkDesc<CollectionMetadata>`,
which then are used by the compute instance to connect to the sink
collection.

Note that augmenting sink descriptions currently fails because the
storage controller does not yet know about the collections used by
persist sinks.

This commit removes the persist params (BLOB, CONSENSUS, SHARD) from the
`CREATE SINK ... INTO PERSIST` statement. Persist connection
information will be collected from the storage controller instead.

This commit makes the coordinator announce a source to the storage
controller when creating persist sinks. This makes the storage
controller create a storage collection which we can sink into.

It will also later give us a way to read sinked data back through the
source. This doesn't work currently because we don't add a catalog item
for the source.

Storage's persist_source expects data in persist shards to be of type
`(SourceData, ())`, so compute's persist_sink needs to write data of
that type too. Prior to this commit, persist_sinks wrote data of type
`(Row, Row)`.

We perform the translation by dropping the key and wrapping the value
into `SourceData`. This is correct because persist_sinks (like tail
sinks) don't use keys, so only the sinked values are non-empty.

Kafka sinks are the only sinks that use keys. Once they get removed from
compute (and moved to storage), we will be able to get rid of the
awkward translation step.
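
For illustration, a minimal sketch of that translation using stand-in types; the `Row` and `SourceData` below are simplified placeholders, not the real `mz_repr`/storage definitions:

```rust
/// Stand-in for `mz_repr::Row`.
#[derive(Clone, Debug)]
struct Row(Vec<i64>);

/// Stand-in for the value wrapper that storage's persist_source expects.
#[derive(Clone, Debug)]
struct SourceData(Row);

/// persist_sink used to emit `(Row, Row)` key/value pairs, while
/// persist_source expects `(SourceData, ())`. Since compute's persist sinks
/// never use keys, the key is dropped and the value wrapped.
fn to_persist_format((_key, value): (Row, Row)) -> (SourceData, ()) {
    (SourceData(value), ())
}

fn main() {
    let update = (Row(vec![]), Row(vec![2]));
    let (data, ()) = to_persist_format(update);
    println!("{:?}", data);
}
```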

This commit makes the compute controller forward updates of sink storage
collection `upper`s to the storage controller. With this, the storage
controller is able to correctly advance the collections' `since`
frontiers, unlocking compaction.

Eventually, we'll want the storage controller to keep track of storage
collection frontiers on its own, rendering this extra code in the
compute controller unnecessary.
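
As a rough illustration of the idea (not the real controller API), here is a sketch that tracks reported uppers and derives a lagging `since` per collection, using plain integers instead of timely antichains:

```rust
use std::collections::BTreeMap;

/// Hypothetical collection identifier; the real code uses `GlobalId`.
type CollectionId = u64;
/// Hypothetical timestamp; the real code uses frontiers (antichains) of timestamps.
type Timestamp = u64;

#[derive(Default)]
struct FrontierTracker {
    /// Write frontiers (`upper`s) reported by the compute controller.
    uppers: BTreeMap<CollectionId, Timestamp>,
    /// Read frontiers (`since`s) the storage controller may advance,
    /// unlocking compaction of the sink collections.
    sinces: BTreeMap<CollectionId, Timestamp>,
}

impl FrontierTracker {
    /// Called with each upper update forwarded by the compute controller.
    fn report_upper(&mut self, id: CollectionId, upper: Timestamp) {
        let entry = self.uppers.entry(id).or_insert(0);
        if upper > *entry {
            *entry = upper;
            // Illustrative compaction policy: keep the since one tick behind
            // the upper.
            self.sinces.insert(id, upper.saturating_sub(1));
        }
    }
}
```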
Member

@antiguru antiguru left a comment


This looks great, thanks!

@teskje teskje merged commit b47499a into MaterializeInc:main Jun 27, 2022
@teskje teskje deleted the internal-sinks branch June 27, 2022 08:21
Contributor

@aljoscha aljoscha left a comment


I'll approve retroactively, since I didn't get to this in time. 👍

@teskje
Contributor Author

teskje commented Jun 27, 2022

@aljoscha Thanks! Sorry, I was too eager with the merge trigger!

@aljoscha
Contributor

Not at all, enough people had reviewed it by then! 👍

@teskje teskje restored the internal-sinks branch June 29, 2022 07:57
@teskje teskje deleted the internal-sinks branch June 29, 2022 08:44