Skip to content

Commit b297574

Browse files
authored
Instead of a key, use an incrementing index for events. (#3672)
## Motivation The system needs to enforce that events are never overwritten anyway. It makes them much harder to use for applications if the applications need to keep track of that. ## Proposal Replace the general `key: Vec<u8>` with `index: u32`. Instead of the application, the system now chooses the index, and always uses the next available number. ## Test Plan The tests have been updated, and already use system events. The user application events will be tested more thoroughly later, when we have an example (probably `social`) that uses them. ## Release Plan - Nothing to do / These changes follow the usual release cycle. ## Links - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent cf2ecfb commit b297574

File tree

23 files changed

+95
-115
lines changed

23 files changed

+95
-115
lines changed

examples/meta-counter/src/contract.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,8 @@ impl Contract for MetaCounterContract {
4040
// Send a no-op message to ourselves. This is only for testing contracts that send messages
4141
// on initialization. Since the value is 0 it does not change the counter value.
4242
let this_chain = self.runtime.chain_id();
43-
self.runtime.emit(
44-
StreamName(b"announcements".to_vec()),
45-
b"updates",
46-
b"instantiated",
47-
);
43+
self.runtime
44+
.emit(StreamName(b"announcements".to_vec()), b"instantiated");
4845
self.runtime.send_message(this_chain, Message::Increment(0));
4946
}
5047

linera-base/src/data_types.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -1198,10 +1198,8 @@ impl BcsHashable<'_> for Blob {}
11981198
pub struct Event {
11991199
/// The ID of the stream this event belongs to.
12001200
pub stream_id: StreamId,
1201-
/// The event key.
1202-
#[debug(with = "hex_debug")]
1203-
#[serde(with = "serde_bytes")]
1204-
pub key: Vec<u8>,
1201+
/// The event index, i.e. the number of events in the stream before this one.
1202+
pub index: u32,
12051203
/// The payload data.
12061204
#[debug(with = "hex_debug")]
12071205
#[serde(with = "serde_bytes")]
@@ -1214,7 +1212,7 @@ impl Event {
12141212
EventId {
12151213
chain_id,
12161214
stream_id: self.stream_id.clone(),
1217-
key: self.key.clone(),
1215+
index: self.index,
12181216
}
12191217
}
12201218
}

linera-base/src/identifiers.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -523,8 +523,8 @@ pub struct EventId {
523523
pub chain_id: ChainId,
524524
/// The ID of the stream this event belongs to.
525525
pub stream_id: StreamId,
526-
/// The event key.
527-
pub key: Vec<u8>,
526+
/// The event index, i.e. the number of events in the stream before this one.
527+
pub index: u32,
528528
}
529529

530530
/// The destination of a message, relative to a particular application.

linera-core/src/chain_worker/state/temporary_changes.rs

+1-23
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33

44
//! Operations that don't persist any changes to the chain state.
55
6-
use std::collections::HashMap;
7-
86
use linera_base::{
97
data_types::{ApplicationDescription, ArithmeticError, Blob, Timestamp},
108
ensure,
@@ -19,7 +17,7 @@ use linera_chain::{
1917
};
2018
use linera_execution::{Query, QueryOutcome};
2119
use linera_storage::{Clock as _, Storage};
22-
use linera_views::views::{View, ViewError};
20+
use linera_views::views::View;
2321
#[cfg(with_testing)]
2422
use {
2523
linera_base::{crypto::CryptoHash, data_types::BlockHeight},
@@ -233,26 +231,6 @@ where
233231
.await?
234232
};
235233

236-
// Verify that no event values are overwritten.
237-
let mut new_events = HashMap::new();
238-
for event in outcome.events.iter().flatten() {
239-
let event_id = event.id(chain.chain_id());
240-
if let Some(old_value) = new_events.insert(event_id.clone(), &event.value) {
241-
ensure!(
242-
*old_value == event.value,
243-
WorkerError::OverwritingEvent(Box::new(event_id))
244-
);
245-
}
246-
match self.0.storage.read_event(event_id.clone()).await {
247-
Ok(old_value) => ensure!(
248-
old_value == event.value,
249-
WorkerError::OverwritingEvent(Box::new(event_id))
250-
),
251-
Err(ViewError::EventsNotFound(_)) => {}
252-
Err(err) => return Err(err.into()),
253-
}
254-
}
255-
256234
let executed_block = outcome.with(block.clone());
257235
ensure!(
258236
!round.is_fast() || !executed_block.outcome.has_oracle_responses(),

linera-core/src/client/mod.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -3108,14 +3108,17 @@ where
31083108
(min_epoch, epoch.try_add_one()?)
31093109
};
31103110
let mut epoch_change_ops = Vec::new();
3111-
while self.has_admin_event(EPOCH_STREAM_NAME, next_epoch).await? {
3111+
while self
3112+
.has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
3113+
.await?
3114+
{
31123115
epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
31133116
next_epoch,
31143117
)));
31153118
next_epoch.try_add_assign_one()?;
31163119
}
31173120
while self
3118-
.has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch)
3121+
.has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
31193122
.await?
31203123
{
31213124
epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
@@ -3131,12 +3134,12 @@ where
31313134
async fn has_admin_event(
31323135
&self,
31333136
stream_name: &[u8],
3134-
key: impl Serialize,
3137+
index: u32,
31353138
) -> Result<bool, ChainClientError> {
31363139
let event_id = EventId {
31373140
chain_id: self.admin_id,
31383141
stream_id: StreamId::system(stream_name),
3139-
key: bcs::to_bytes(&key).unwrap(),
3142+
index,
31403143
};
31413144
match self.client.storage.read_event(event_id).await {
31423145
Ok(_) => Ok(true),

linera-core/src/unit_tests/wasm_client_tests.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ where
349349
application_id: application_id2.forget_abi().into(),
350350
stream_name: StreamName(b"announcements".to_vec()),
351351
},
352-
key: b"updates".to_vec(),
352+
index: 0,
353353
value: b"instantiated".to_vec(),
354354
}]]
355355
);

linera-core/src/unit_tests/worker_tests.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -2450,7 +2450,7 @@ where
24502450
let event_id = EventId {
24512451
chain_id: admin_id,
24522452
stream_id: StreamId::system(NEW_EPOCH_STREAM_NAME),
2453-
key: bcs::to_bytes(&Epoch::from(1)).unwrap(),
2453+
index: 1,
24542454
};
24552455
let committee_blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
24562456
// `PublishCommitteeBlob` is tested e.g. in `client_tests::test_change_voting_rights`, so we
@@ -2470,7 +2470,7 @@ where
24702470
events: vec![
24712471
vec![Event {
24722472
stream_id: event_id.stream_id.clone(),
2473-
key: event_id.key.clone(),
2473+
index: event_id.index,
24742474
value: bcs::to_bytes(&blob_hash).unwrap(),
24752475
}],
24762476
Vec::new(),
@@ -2560,7 +2560,7 @@ where
25602560
EventId {
25612561
chain_id: admin_id,
25622562
stream_id: StreamId::system(NEW_EPOCH_STREAM_NAME),
2563-
key: bcs::to_bytes(&Epoch::from(1)).unwrap(),
2563+
index: 1,
25642564
},
25652565
bcs::to_bytes(&blob_hash).unwrap(),
25662566
),
@@ -2698,7 +2698,7 @@ where
26982698
previous_message_blocks: BTreeMap::new(),
26992699
events: vec![vec![Event {
27002700
stream_id: StreamId::system(NEW_EPOCH_STREAM_NAME),
2701-
key: bcs::to_bytes(&Epoch::from(1)).unwrap(),
2701+
index: 1,
27022702
value: bcs::to_bytes(&committee_blob.id().hash).unwrap(),
27032703
}]],
27042704
blobs: vec![Vec::new()],
@@ -2837,13 +2837,13 @@ where
28372837
events: vec![
28382838
vec![Event {
28392839
stream_id: StreamId::system(NEW_EPOCH_STREAM_NAME),
2840-
key: bcs::to_bytes(&Epoch::from(1)).unwrap(),
2840+
index: 1,
28412841
value: bcs::to_bytes(&committee_blob.id().hash).unwrap(),
28422842
}],
28432843
vec![Event {
2844-
value: Vec::new(),
28452844
stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
2846-
key: bcs::to_bytes(&Epoch::from(0)).unwrap(),
2845+
index: 0,
2846+
value: Vec::new(),
28472847
}],
28482848
],
28492849
blobs: vec![Vec::new(); 2],

linera-core/src/worker.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use linera_base::{
1919
},
2020
doc_scalar,
2121
hashed::Hashed,
22-
identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId},
22+
identifiers::{AccountOwner, ApplicationId, BlobId, ChainId},
2323
time::timer::{sleep, timeout},
2424
};
2525
use linera_chain::{
@@ -210,8 +210,6 @@ pub enum WorkerError {
210210
BlobsNotFound(Vec<BlobId>),
211211
#[error("The block proposal is invalid: {0}")]
212212
InvalidBlockProposal(String),
213-
#[error("Trying to overwrite an event: {0:?}")]
214-
OverwritingEvent(Box<EventId>),
215213
#[error("The worker is too busy to handle new chains")]
216214
FullChainWorkerCache,
217215
#[error("Failed to join spawned worker task")]

linera-execution/src/execution.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ use std::{mem, vec};
66
use futures::{FutureExt, StreamExt};
77
use linera_base::{
88
data_types::{Amount, BlockHeight},
9-
identifiers::{Account, AccountOwner, BlobType, Destination},
9+
identifiers::{Account, AccountOwner, BlobType, Destination, StreamId},
1010
};
1111
use linera_views::{
1212
context::Context,
1313
key_value_store_view::KeyValueStoreView,
14+
map_view::MapView,
1415
reentrant_collection_view::HashedReentrantCollectionView,
1516
views::{ClonableView, View},
1617
};
@@ -41,6 +42,8 @@ pub struct ExecutionStateView<C> {
4142
pub system: SystemExecutionStateView<C>,
4243
/// User applications.
4344
pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
45+
/// The number of events in the streams that this chain is writing to.
46+
pub stream_event_counts: MapView<C, StreamId, u32>,
4447
}
4548

4649
/// How to interact with a long-lived service runtime.

linera-execution/src/execution_state_actor.rs

+23-2
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ use linera_base::prometheus_util::{
1515
exponential_bucket_latencies, register_histogram_vec, MeasureLatency as _,
1616
};
1717
use linera_base::{
18-
data_types::{Amount, ApplicationPermissions, BlobContent, BlockHeight, Timestamp},
18+
data_types::{
19+
Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, Timestamp,
20+
},
1921
ensure, hex_debug, hex_vec_debug, http,
20-
identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, MessageId},
22+
identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, MessageId, StreamId},
2123
ownership::ChainOwnership,
2224
};
2325
use linera_views::{batch::Batch, context::Context, views::View};
@@ -433,6 +435,19 @@ where
433435
callback.respond(self.system.blob_used(None, blob_id).await?)
434436
}
435437

438+
NextEventIndex {
439+
stream_id,
440+
callback,
441+
} => {
442+
let count = self
443+
.stream_event_counts
444+
.get_mut_or_default(&stream_id)
445+
.await?;
446+
let index = *count;
447+
*count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
448+
callback.respond(index)
449+
}
450+
436451
GetApplicationPermissions { callback } => {
437452
let app_permissions = self.system.application_permissions.get();
438453
callback.respond(app_permissions.clone());
@@ -689,6 +704,12 @@ pub enum ExecutionRequest {
689704
callback: Sender<bool>,
690705
},
691706

707+
NextEventIndex {
708+
stream_id: StreamId,
709+
#[debug(skip)]
710+
callback: Sender<u32>,
711+
},
712+
692713
GetApplicationPermissions {
693714
#[debug(skip)]
694715
callback: Sender<ApplicationPermissions>,

linera-execution/src/lib.rs

+2-11
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ pub use crate::{
7777
transaction_tracker::{TransactionOutcome, TransactionTracker},
7878
};
7979

80-
/// The maximum length of an event key in bytes.
81-
const MAX_EVENT_KEY_LEN: usize = 64;
8280
/// The maximum length of a stream name.
8381
const MAX_STREAM_NAME_LEN: usize = 64;
8482

@@ -262,8 +260,6 @@ pub enum ExecutionError {
262260
local_time: Timestamp,
263261
},
264262

265-
#[error("Event keys can be at most {MAX_EVENT_KEY_LEN} bytes.")]
266-
EventKeyTooLong,
267263
#[error("Stream names can be at most {MAX_STREAM_NAME_LEN} bytes.")]
268264
StreamNameTooLong,
269265
#[error("Blob exceeds size limit")]
@@ -732,13 +728,8 @@ pub trait ContractRuntime: BaseRuntime {
732728
argument: Vec<u8>,
733729
) -> Result<Vec<u8>, ExecutionError>;
734730

735-
/// Adds a new item to an event stream.
736-
fn emit(
737-
&mut self,
738-
name: StreamName,
739-
key: Vec<u8>,
740-
value: Vec<u8>,
741-
) -> Result<(), ExecutionError>;
731+
/// Adds a new item to an event stream. Returns the new event's index in the stream.
732+
fn emit(&mut self, name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError>;
742733

743734
/// Queries a service.
744735
fn query_service(

linera-execution/src/revm.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -569,12 +569,9 @@ where
569569
let mut runtime = self.db.runtime.lock().expect("The lock should be possible");
570570
let stream_name = bcs::to_bytes("ethereum_event")?;
571571
let stream_name = StreamName(stream_name);
572-
for (log, index) in logs.iter().enumerate() {
573-
let mut key = bcs::to_bytes(&contract_address)?;
574-
bcs::serialize_into(&mut key, origin)?;
575-
bcs::serialize_into(&mut key, index)?;
576-
let value = bcs::to_bytes(&log)?;
577-
runtime.emit(stream_name.clone(), key, value)?;
572+
for log in &logs {
573+
let value = bcs::to_bytes(&(origin, contract_address, log))?;
574+
runtime.emit(stream_name.clone(), value)?;
578575
}
579576
}
580577
Ok(())

linera-execution/src/runtime.rs

+11-13
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::{
3636
FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation, OperationContext,
3737
OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, TransactionTracker,
3838
UserContractCode, UserContractInstance, UserServiceCode, UserServiceInstance,
39-
MAX_EVENT_KEY_LEN, MAX_STREAM_NAME_LEN,
39+
MAX_STREAM_NAME_LEN,
4040
};
4141

4242
#[cfg(test)]
@@ -1284,17 +1284,8 @@ impl ContractRuntime for ContractSyncRuntimeHandle {
12841284
Ok(value)
12851285
}
12861286

1287-
fn emit(
1288-
&mut self,
1289-
stream_name: StreamName,
1290-
key: Vec<u8>,
1291-
value: Vec<u8>,
1292-
) -> Result<(), ExecutionError> {
1287+
fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
12931288
let mut this = self.inner();
1294-
ensure!(
1295-
key.len() <= MAX_EVENT_KEY_LEN,
1296-
ExecutionError::EventKeyTooLong
1297-
);
12981289
ensure!(
12991290
stream_name.0.len() <= MAX_STREAM_NAME_LEN,
13001291
ExecutionError::StreamNameTooLong
@@ -1304,8 +1295,15 @@ impl ContractRuntime for ContractSyncRuntimeHandle {
13041295
stream_name,
13051296
application_id,
13061297
};
1307-
this.transaction_tracker.add_event(stream_id, key, value);
1308-
Ok(())
1298+
let index = this
1299+
.execution_state_sender
1300+
.send_request(|callback| ExecutionRequest::NextEventIndex {
1301+
stream_id: stream_id.clone(),
1302+
callback,
1303+
})?
1304+
.recv_response()?;
1305+
this.transaction_tracker.add_event(stream_id, index, value);
1306+
Ok(index)
13091307
}
13101308

13111309
fn query_service(

0 commit comments

Comments
 (0)