Commit b47499a

Merge pull request #13251 from teskje/internal-sinks
Internal sinks, part 1
2 parents: b1ab66b + 92f52fe

File tree

17 files changed: +192 -140 lines

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default, so this diff is not shown.

src/compute/src/render/sinks.rs

Lines changed: 6 additions & 5 deletions
@@ -18,7 +18,8 @@ use differential_dataflow::operators::arrange::arrangement::ArrangeByKey;
 use differential_dataflow::{Collection, Hashable};
 use timely::dataflow::Scope;
 
-use mz_dataflow_types::sinks::*;
+use mz_dataflow_types::client::controller::storage::CollectionMetadata;
+use mz_dataflow_types::sinks::{SinkConnection, SinkDesc, SinkEnvelope};
 use mz_expr::{permutation_for_arrangement, MapFilterProject};
 use mz_interchange::envelopes::{combine_at_timestamp, dbz_format, upsert_format};
 use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
@@ -36,7 +37,7 @@ where
     tokens: &mut std::collections::BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
     import_ids: BTreeSet<GlobalId>,
     sink_id: GlobalId,
-    sink: &SinkDesc,
+    sink: &SinkDesc<CollectionMetadata>,
 ) {
     let sink_render = get_sink_render_for(&sink.connection);
 
@@ -92,7 +93,7 @@ where
 
 #[allow(clippy::borrowed_box)]
 fn apply_sink_envelope<G>(
-    sink: &SinkDesc,
+    sink: &SinkDesc<CollectionMetadata>,
     sink_render: &Box<dyn SinkRender<G>>,
     collection: Collection<G, Row, Diff>,
 ) -> Collection<G, (Option<Row>, Option<Row>), Diff>
@@ -215,15 +216,15 @@ where
     fn render_continuous_sink(
         &self,
         compute_state: &mut crate::compute_state::ComputeState,
-        sink: &SinkDesc,
+        sink: &SinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        sinked_collection: Collection<G, (Option<Row>, Option<Row>), Diff>,
     ) -> Option<Rc<dyn Any>>
     where
         G: Scope<Timestamp = Timestamp>;
 }
 
-fn get_sink_render_for<G>(connection: &SinkConnection) -> Box<dyn SinkRender<G>>
+fn get_sink_render_for<G>(connection: &SinkConnection<CollectionMetadata>) -> Box<dyn SinkRender<G>>
 where
     G: Scope<Timestamp = Timestamp>,
 {
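
Note: to make the new signatures easier to follow, here is a rough sketch of the type shapes they imply: the sink types become generic over the storage metadata they carry, with a unit default on the coordinator side and CollectionMetadata filled in by the compute controller. The field names are taken from this PR; the concrete field types and defaults are assumptions, not the actual definitions in mz_dataflow_types::sinks.

// Sketch only -- see mz_dataflow_types::sinks for the real definitions.
use mz_dataflow_types::sinks::{KafkaSinkConnection, SinkAsOf, SinkEnvelope, TailSinkConnection};
use mz_repr::{GlobalId, RelationDesc};

pub struct SinkDesc<S = ()> {
    pub from: GlobalId,
    pub from_desc: RelationDesc,
    pub connection: SinkConnection<S>,
    pub envelope: Option<SinkEnvelope>, // assumed; only the field name is confirmed by the diff
    pub as_of: SinkAsOf,
}

pub enum SinkConnection<S = ()> {
    Kafka(KafkaSinkConnection),
    Tail(TailSinkConnection),
    Persist(PersistSinkConnection<S>),
}

pub struct PersistSinkConnection<S> {
    pub value_desc: RelationDesc,
    pub storage_metadata: S,
}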

src/compute/src/sink/kafka.rs

Lines changed: 3 additions & 2 deletions
@@ -22,7 +22,6 @@ use differential_dataflow::{AsCollection, Collection, Hashable};
 use futures::executor::block_on;
 use futures::{StreamExt, TryFutureExt};
 use itertools::Itertools;
-use mz_interchange::json::JsonEncoder;
 use prometheus::core::AtomicU64;
 use rdkafka::client::ClientContext;
 use rdkafka::config::ClientConfig;
@@ -45,6 +44,7 @@ use tokio::sync::Mutex;
 use tracing::{debug, error, info};
 
 use mz_avro::types::Value;
+use mz_dataflow_types::client::controller::storage::CollectionMetadata;
 use mz_dataflow_types::connections::ConnectionContext;
 use mz_dataflow_types::sinks::{
     KafkaSinkConnection, KafkaSinkConsistencyConnection, PublishedSchemaInfo, SinkAsOf, SinkDesc,
@@ -54,6 +54,7 @@ use mz_interchange::avro::{
     self, get_debezium_transaction_schema, AvroEncoder, AvroSchemaGenerator,
 };
 use mz_interchange::encode::Encode;
+use mz_interchange::json::JsonEncoder;
 use mz_kafka_util::client::{create_new_client_config, MzClientContext};
 use mz_ore::cast::CastFrom;
 use mz_ore::collections::CollectionExt;
@@ -88,7 +89,7 @@ where
     fn render_continuous_sink(
         &self,
         compute_state: &mut crate::compute_state::ComputeState,
-        sink: &SinkDesc,
+        sink: &SinkDesc<CollectionMetadata>,
         sink_id: GlobalId,
         sinked_collection: Collection<G, (Option<Row>, Option<Row>), Diff>,
     ) -> Option<Rc<dyn Any>>

src/compute/src/sink/persist_sink.rs

Lines changed: 22 additions & 15 deletions
@@ -21,14 +21,15 @@ use timely::progress::Antichain;
 use timely::progress::Timestamp as TimelyTimestamp;
 use timely::PartialOrder;
 
+use mz_dataflow_types::client::controller::storage::CollectionMetadata;
 use mz_dataflow_types::sinks::{PersistSinkConnection, SinkDesc};
-use mz_persist_client::PersistLocation;
+use mz_dataflow_types::sources::SourceData;
 use mz_repr::{Diff, GlobalId, Row, Timestamp};
 use mz_timely_util::operators_async_ext::OperatorBuilderExt;
 
 use crate::render::sinks::SinkRender;
 
-impl<G> SinkRender<G> for PersistSinkConnection
+impl<G> SinkRender<G> for PersistSinkConnection<CollectionMetadata>
 where
     G: Scope<Timestamp = Timestamp>,
 {
@@ -47,15 +48,24 @@ where
     fn render_continuous_sink(
         &self,
         compute_state: &mut crate::compute_state::ComputeState,
-        _sink: &SinkDesc,
+        _sink: &SinkDesc<CollectionMetadata>,
         sink_id: GlobalId,
         sinked_collection: Collection<G, (Option<Row>, Option<Row>), Diff>,
     ) -> Option<Rc<dyn Any>>
     where
         G: Scope<Timestamp = Timestamp>,
     {
         let scope = sinked_collection.scope();
-        let operator_name = format!("persist_sink({})", self.shard_id);
+
+        let persist_clients = Arc::clone(&compute_state.persist_clients);
+        let persist_location = self.storage_metadata.persist_location.clone();
+        let shard_id = self.storage_metadata.persist_shard;
+
+        // Log the shard ID so we know which shard to read for testing.
+        // TODO(teskje): Remove once we have a built-in way for reading back sinked data.
+        tracing::info!("persist_sink shard ID: {shard_id}");
+
+        let operator_name = format!("persist_sink({})", shard_id);
         let mut persist_op = OperatorBuilder::new(operator_name, scope.clone());
 
         // We want exactly one worker (in the cluster) to send all the data to persist. It's fine
@@ -72,13 +82,6 @@ where
         let mut input =
             persist_op.new_input(&sinked_collection.inner, Exchange::new(move |_| hashed_id));
 
-        let persist_clients = Arc::clone(&compute_state.persist_clients);
-        let persist_location = PersistLocation {
-            consensus_uri: self.consensus_uri.clone(),
-            blob_uri: self.blob_uri.clone(),
-        };
-        let shard_id = self.shard_id.clone();
-
         let token = Rc::new(());
         let token_weak = Rc::downgrade(&token);
 
@@ -113,7 +116,7 @@ where
                 .open(persist_location)
                 .await
                 .expect("could not open persist client")
-                .open_writer::<Row, Row, Timestamp, Diff>(shard_id)
+                .open_writer::<SourceData, (), Timestamp, Diff>(shard_id)
                 .await
                 .expect("could not open persist shard");
 
@@ -131,9 +134,13 @@ where
                     data.swap(&mut buffer);
 
                     for ((key, value), ts, diff) in buffer.drain(..) {
-                        let key = key.unwrap_or_default();
-                        let value = value.unwrap_or_default();
-                        stash.entry(ts).or_default().push(((key, value), ts, diff));
+                        assert!(key.is_none(), "persist_source does not support keys");
+                        let row = value.expect("persist_source must have values");
+                        stash.entry(ts).or_default().push((
+                            (SourceData(Ok(row)), ()),
+                            ts,
+                            diff,
+                        ));
                     }
                 });
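
Note: the rewritten loop above maps each compute update onto the persist shard's key/value pair. A minimal standalone sketch of that mapping, with a hypothetical helper name (SourceData wraps the row in a Result, which is what allows errors to be represented in the same shard):

use mz_dataflow_types::sources::SourceData;
use mz_repr::{Diff, Row, Timestamp};

// Hypothetical helper mirroring the loop body above.
fn to_persist_update(
    ((key, value), ts, diff): ((Option<Row>, Option<Row>), Timestamp, Diff),
) -> ((SourceData, ()), Timestamp, Diff) {
    // Persist sinks carry unkeyed rows; the key slot must be empty.
    assert!(key.is_none(), "persist sinks do not support keys");
    let row = value.expect("persist sinks must have values");
    // The shard key is SourceData(Ok(row)); the shard value is the unit type.
    ((SourceData(Ok(row)), ()), ts, diff)
}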

src/compute/src/sink/tail.rs

Lines changed: 2 additions & 1 deletion
@@ -21,6 +21,7 @@ use timely::progress::timestamp::Timestamp as TimelyTimestamp;
 use timely::progress::Antichain;
 
 use mz_dataflow_types::{
+    client::controller::storage::CollectionMetadata,
     sinks::{SinkAsOf, SinkDesc, TailSinkConnection},
     TailResponse,
 };
@@ -47,7 +48,7 @@ where
     fn render_continuous_sink(
         &self,
         compute_state: &mut crate::compute_state::ComputeState,
-        sink: &SinkDesc,
+        sink: &SinkDesc<CollectionMetadata>,
         sink_id: GlobalId,
         sinked_collection: Collection<G, (Option<Row>, Option<Row>), Diff>,
     ) -> Option<Rc<dyn Any>>

src/coord/src/catalog.rs

Lines changed: 24 additions & 1 deletion
@@ -30,7 +30,10 @@ use mz_dataflow_types::client::{
     ComputeInstanceId, ConcreteComputeInstanceReplicaConfig, ProcessId, ReplicaId,
 };
 use mz_dataflow_types::logging::LoggingConfig as DataflowLoggingConfig;
-use mz_dataflow_types::sinks::{SinkConnection, SinkConnectionBuilder, SinkEnvelope};
+use mz_dataflow_types::sinks::{
+    PersistSinkConnection, PersistSinkConnectionBuilder, SinkConnection, SinkConnectionBuilder,
+    SinkEnvelope,
+};
 use mz_dataflow_types::sources::{ExternalSourceConnection, SourceConnection, Timeline};
 use mz_expr::{ExprHumanizer, MirScalarExpr, OptimizedMirRelationExpr};
 use mz_ore::collections::CollectionExt;
@@ -174,6 +177,26 @@ impl CatalogState {
                    desc: source.desc.clone(),
                })
            }
+            // TODO(teskje): Replace once `CatalogItem::RecordedView` lands.
+            CatalogItem::Sink(Sink {
+                connection:
+                    SinkConnectionState::Ready(SinkConnection::Persist(PersistSinkConnection {
+                        value_desc,
+                        ..
+                    }))
+                    | SinkConnectionState::Pending(SinkConnectionBuilder::Persist(
+                        PersistSinkConnectionBuilder { value_desc },
+                    )),
+                ..
+            }) => {
+                let connection = SourceConnection::Local {
+                    timeline: Timeline::EpochMilliseconds,
+                };
+                Some(mz_dataflow_types::sources::SourceDesc {
+                    connection,
+                    desc: value_desc.clone(),
+                })
+            }
            _ => None,
        }
    }
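
Note: the new match arm synthesizes a source description for persist sinks so their output collection can be treated like a local source. A sketch of the synthesized value, using a hypothetical helper; treating value_desc as an mz_repr::RelationDesc is an assumption:

use mz_dataflow_types::sources::{SourceConnection, SourceDesc, Timeline};
use mz_repr::RelationDesc;

// Hypothetical helper showing the SourceDesc built in the match arm above.
fn persist_sink_source_desc(value_desc: &RelationDesc) -> SourceDesc {
    SourceDesc {
        // The sink's output lives in the local (EpochMilliseconds) timeline.
        connection: SourceConnection::Local {
            timeline: Timeline::EpochMilliseconds,
        },
        // The relation schema is the sink's value_desc.
        desc: value_desc.clone(),
    }
}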

src/coord/src/coord.rs

Lines changed: 57 additions & 1 deletion
@@ -794,6 +794,29 @@ impl<S: Append + 'static> Coordinator<S> {
                }
                CatalogItem::View(_) => (),
                CatalogItem::Sink(sink) => {
+                    // Re-announce the source description.
+                    // TODO(teskje): Remove this once persist sinks are replaced by recorded views.
+                    if let Some(desc) = self.catalog.state().source_description_for(entry.id()) {
+                        let ingestion = IngestionDescription {
+                            id: entry.id(),
+                            desc,
+                            since: Antichain::from_elem(Timestamp::minimum()),
+                            source_imports: BTreeMap::new(),
+                            storage_metadata: (),
+                        };
+                        self.dataflow_client
+                            .storage_mut()
+                            .create_sources(vec![ingestion])
+                            .await
+                            .unwrap();
+                        self.initialize_storage_read_policies(
+                            vec![entry.id()],
+                            self.logical_compaction_window_ms,
+                        )
+                        .await;
+                    }
+
+                    // Re-create the sink on the compute instance.
                    let builder = match &sink.connection {
                        SinkConnectionState::Pending(builder) => builder,
                        SinkConnectionState::Ready(_) => {
@@ -2904,7 +2927,29 @@ impl<S: Append + 'static> Coordinator<S> {
            })
            .await;
        match transact_result {
-            Ok(()) => (),
+            Ok(()) => {
+                // Announce the creation of the sink's corresponding source.
+                // TODO(teskje): Remove this once persist sinks are replaced by recorded views.
+                if let Some(desc) = self.catalog.state().source_description_for(id) {
+                    let ingestion = IngestionDescription {
+                        id,
+                        desc,
+                        since: Antichain::from_elem(Timestamp::minimum()),
+                        source_imports: BTreeMap::new(),
+                        storage_metadata: (),
+                    };
+                    self.dataflow_client
+                        .storage_mut()
+                        .create_sources(vec![ingestion])
+                        .await
+                        .unwrap();
+                    self.initialize_storage_read_policies(
+                        vec![id],
+                        self.logical_compaction_window_ms,
+                    )
+                    .await;
+                }
+            }
            Err(CoordError::Catalog(catalog::Error {
                kind: catalog::ErrorKind::ItemAlreadyExists(_),
                ..
@@ -5077,6 +5122,17 @@ impl<S: Append + 'static> Coordinator<S> {
                .entry(compute_instance)
                .or_insert(vec![])
                .push(id);
+
+            // Persist sinks write to storage collections, which need to be
+            // dropped when their sinks are dropped.
+            // TODO(teskje): Remove this once persist sinks are replaced by recorded views.
+            if self.dataflow_client.storage_mut().collection(id).is_ok() {
+                self.dataflow_client
+                    .storage_mut()
+                    .drop_sources(vec![id])
+                    .await
+                    .unwrap();
+            }
        }
        for (compute_instance, ids) in by_compute_instance {
            // A cluster could have been dropped, so verify it exists.
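
Note: both coordinator paths above (catalog rehydration and sink creation) register the sink's output collection with the storage controller using the same IngestionDescription. A sketch of that registration as a hypothetical helper; the import path of IngestionDescription and its storage-metadata type parameter are assumptions suggested by the storage_metadata: () field:

use std::collections::BTreeMap;

use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;

// Assumed import path; the diff uses IngestionDescription unqualified.
use mz_dataflow_types::sources::{IngestionDescription, SourceDesc};
use mz_repr::{GlobalId, Timestamp};

// Hypothetical helper mirroring the ingestion the coordinator announces above.
fn persist_sink_ingestion(id: GlobalId, desc: SourceDesc) -> IngestionDescription<()> {
    IngestionDescription {
        id,
        desc,
        // Start from the minimum timestamp; read policies are set separately.
        since: Antichain::from_elem(Timestamp::minimum()),
        // The synthesized source imports nothing else.
        source_imports: BTreeMap::new(),
        // The controller fills in the real storage metadata later.
        storage_metadata: (),
    }
}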

src/coord/src/sink_connection.rs

Lines changed: 1 addition & 3 deletions
@@ -342,9 +342,7 @@ fn build_persist_sink(
     _id: GlobalId,
 ) -> Result<SinkConnection, CoordError> {
     Ok(SinkConnection::Persist(PersistSinkConnection {
-        consensus_uri: builder.consensus_uri,
-        blob_uri: builder.blob_uri,
-        shard_id: builder.shard_id,
         value_desc: builder.value_desc,
+        storage_metadata: (),
     }))
 }

src/dataflow-types/src/client/controller/compute.rs

Lines changed: 43 additions & 3 deletions
@@ -37,6 +37,7 @@ use crate::client::replicated::ActiveReplication;
 use crate::client::{ComputeClient, ComputeCommand, ComputeInstanceId, InstanceConfig, ReplicaId};
 use crate::client::{GenericClient, Peek};
 use crate::logging::LoggingConfig;
+use crate::sinks::{PersistSinkConnection, SinkConnection, SinkDesc};
 use crate::{DataflowDescription, SourceInstanceDesc};
 use mz_expr::RowSetFinishing;
 use mz_ore::tracing::OpenTelemetryContext;
@@ -338,8 +339,8 @@ where
            }
        }
 
-        // Here we augment all the imported sources with the appropriate storage metadata needed by
-        // the compute instance to read them
+        // Here we augment all imported sources and all exported sinks with the appropriate
+        // storage metadata needed by the compute instance.
        let mut augmented_dataflows = Vec::with_capacity(dataflows.len());
        for d in dataflows {
            let mut source_imports = BTreeMap::new();
@@ -353,13 +354,37 @@ where
                source_imports.insert(id, desc);
            }
 
+            let mut sink_exports = BTreeMap::new();
+            for (id, se) in d.sink_exports {
+                let connection = match se.connection {
+                    SinkConnection::Persist(conn) => {
+                        let metadata = self.storage_controller.collection_metadata(id)?;
+                        let conn = PersistSinkConnection {
+                            value_desc: conn.value_desc,
+                            storage_metadata: metadata,
+                        };
+                        SinkConnection::Persist(conn)
+                    }
+                    SinkConnection::Kafka(conn) => SinkConnection::Kafka(conn),
+                    SinkConnection::Tail(conn) => SinkConnection::Tail(conn),
+                };
+                let desc = SinkDesc {
+                    from: se.from,
+                    from_desc: se.from_desc,
+                    connection,
+                    envelope: se.envelope,
+                    as_of: se.as_of,
+                };
+                sink_exports.insert(id, desc);
+            }
+
            augmented_dataflows.push(DataflowDescription {
                source_imports,
+                sink_exports,
                // The rest of the fields are identical
                index_imports: d.index_imports,
                objects_to_build: d.objects_to_build,
                index_exports: d.index_exports,
-                sink_exports: d.sink_exports,
                as_of: d.as_of,
                debug_name: d.debug_name,
                id: d.id,
@@ -587,6 +612,21 @@ where
            self.update_read_capabilities(&mut read_capability_changes)
                .await?;
        }
+
+        // Tell the storage controller about new write frontiers for storage
+        // collections that are advanced by compute sinks.
+        // TODO(teskje): The storage controller should have a task to directly
+        // keep track of the frontiers of storage collections, instead of
+        // relying on others for that information.
+        let storage_updates: Vec<_> = updates
+            .iter()
+            .filter(|(id, _)| self.storage_mut().collection(*id).is_ok())
+            .cloned()
+            .collect();
+        self.storage_mut()
+            .update_write_frontiers(&storage_updates)
+            .await?;
+
        Ok(())
    }
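
Note: the augmentation above replaces the coordinator's unit storage metadata with the CollectionMetadata fetched from the storage controller. Inferred from the fields accessed in persist_sink.rs, its shape is roughly the following sketch; the actual definition lives in mz_dataflow_types::client::controller::storage, and ShardId as the type of persist_shard is an assumption:

use mz_persist_client::{PersistLocation, ShardId};

// Sketch of the metadata a persist sink needs to address its output shard.
pub struct CollectionMetadata {
    // Where the persist blob and consensus stores live.
    pub persist_location: PersistLocation,
    // The shard the sink writes to (and that tests can read back).
    pub persist_shard: ShardId,
}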
