Skip to content

Commit 3537052

Browse files
authored
Add blob size limits. (#2705) (#2728)
Add blob size limits. (#2705) (#2728)

* Add blob size limits. (#2705)
  * Add a blob size limit.
  * Add a bytecode size limit.
  * Add unit tests for limits.
  * Don't enforce the limit for already published bytecode.
  * Simplify LimitedWriter; add unit test.
  * Add decompressed_size_at_most.
  * Update and copy comment about #2710.
* Don't cache blobs that are too large. (#2726)
  * Don't cache blobs that are too large.
  * Check size first, then decompress.
1 parent 522aa9a commit 3537052

File tree

8 files changed

+191
-21
lines changed

8 files changed

+191
-21
lines changed

linera-base/src/data_types.rs

+53-15
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::{
3737
ApplicationId, BlobId, BlobType, BytecodeId, Destination, GenericApplicationId, MessageId,
3838
UserApplicationId,
3939
},
40+
limited_writer::{LimitedWriter, LimitedWriterError},
4041
time::{Duration, SystemTime},
4142
};
4243

@@ -861,14 +862,8 @@ impl fmt::Debug for Bytecode {
861862
#[derive(Error, Debug)]
862863
pub enum DecompressionError {
863864
/// Compressed bytecode is invalid, and could not be decompressed.
864-
#[cfg(not(target_arch = "wasm32"))]
865-
#[error("Bytecode could not be decompressed")]
866-
InvalidCompressedBytecode(#[source] io::Error),
867-
868-
/// Compressed bytecode is invalid, and could not be decompressed.
869-
#[cfg(target_arch = "wasm32")]
870-
#[error("Bytecode could not be decompressed")]
871-
InvalidCompressedBytecode(#[from] ruzstd::frame_decoder::FrameDecoderError),
865+
#[error("Bytecode could not be decompressed: {0}")]
866+
InvalidCompressedBytecode(#[from] io::Error),
872867
}
873868

874869
/// A compressed WebAssembly module's bytecode.
@@ -880,20 +875,53 @@ pub struct CompressedBytecode {
880875
pub compressed_bytes: Vec<u8>,
881876
}
882877

878+
#[cfg(not(target_arch = "wasm32"))]
883879
impl CompressedBytecode {
880+
/// Returns `true` if the decompressed size does not exceed the limit.
881+
pub fn decompressed_size_at_most(&self, limit: u64) -> Result<bool, DecompressionError> {
882+
let mut decoder = zstd::stream::Decoder::new(&*self.compressed_bytes)?;
883+
let limit = usize::try_from(limit).unwrap_or(usize::MAX);
884+
let mut writer = LimitedWriter::new(io::sink(), limit);
885+
match io::copy(&mut decoder, &mut writer) {
886+
Ok(_) => Ok(true),
887+
Err(error) => {
888+
error.downcast::<LimitedWriterError>()?;
889+
Ok(false)
890+
}
891+
}
892+
}
893+
884894
/// Decompresses a [`CompressedBytecode`] into a [`Bytecode`].
885-
#[cfg(not(target_arch = "wasm32"))]
886895
pub fn decompress(&self) -> Result<Bytecode, DecompressionError> {
887896
#[cfg(with_metrics)]
888897
let _decompression_latency = BYTECODE_DECOMPRESSION_LATENCY.measure_latency();
889-
let bytes = zstd::stream::decode_all(&*self.compressed_bytes)
890-
.map_err(DecompressionError::InvalidCompressedBytecode)?;
898+
let bytes = zstd::stream::decode_all(&*self.compressed_bytes)?;
891899

892900
Ok(Bytecode { bytes })
893901
}
902+
}
903+
904+
#[cfg(target_arch = "wasm32")]
905+
impl CompressedBytecode {
906+
/// Returns `true` if the decompressed size does not exceed the limit.
907+
pub fn decompressed_size_at_most(&self, limit: u64) -> Result<bool, DecompressionError> {
908+
let compressed_bytes = &*self.compressed_bytes;
909+
let limit = usize::try_from(limit).unwrap_or(usize::MAX);
910+
let mut writer = LimitedWriter::new(io::sink(), limit);
911+
let mut decoder = ruzstd::streaming_decoder::StreamingDecoder::new(compressed_bytes)
912+
.map_err(io::Error::other)?;
913+
914+
// TODO(#2710): Decode multiple frames, if present
915+
match io::copy(&mut decoder, &mut writer) {
916+
Ok(_) => Ok(true),
917+
Err(error) => {
918+
error.downcast::<LimitedWriterError>()?;
919+
Ok(false)
920+
}
921+
}
922+
}
894923

895924
/// Decompresses a [`CompressedBytecode`] into a [`Bytecode`].
896-
#[cfg(target_arch = "wasm32")]
897925
pub fn decompress(&self) -> Result<Bytecode, DecompressionError> {
898926
use ruzstd::{io::Read, streaming_decoder::StreamingDecoder};
899927

@@ -902,10 +930,9 @@ impl CompressedBytecode {
902930

903931
let compressed_bytes = &*self.compressed_bytes;
904932
let mut bytes = Vec::new();
905-
let mut decoder = StreamingDecoder::new(compressed_bytes)?;
933+
let mut decoder = StreamingDecoder::new(compressed_bytes).map_err(io::Error::other)?;
906934

907-
// Decode multiple frames, if present
908-
// (https://github.com/KillingSpark/zstd-rs/issues/57)
935+
// TODO(#2710): Decode multiple frames, if present
909936
while !decoder.get_ref().is_empty() {
910937
decoder
911938
.read_to_end(&mut bytes)
@@ -1035,6 +1062,17 @@ impl BlobContent {
10351062
pub fn blob_bytes(&self) -> BlobBytes {
10361063
BlobBytes(self.inner_bytes())
10371064
}
1065+
1066+
/// Returns the size of the blob content in bytes.
1067+
pub fn size(&self) -> usize {
1068+
match self {
1069+
BlobContent::Data(bytes) => bytes.len(),
1070+
BlobContent::ContractBytecode(compressed_bytecode)
1071+
| BlobContent::ServiceBytecode(compressed_bytecode) => {
1072+
compressed_bytecode.compressed_bytes.len()
1073+
}
1074+
}
1075+
}
10381076
}
10391077

10401078
impl From<Blob> for BlobContent {

linera-base/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub mod crypto;
1818
pub mod data_types;
1919
mod graphql;
2020
pub mod identifiers;
21+
mod limited_writer;
2122
pub mod ownership;
2223
#[cfg(not(target_arch = "wasm32"))]
2324
pub mod port;

linera-base/src/limited_writer.rs

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (c) Zefchain Labs, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::io::{self, Write};
5+
6+
use thiserror::Error;
7+
8+
use crate::ensure;
9+
10+
#[derive(Error, Debug)]
11+
#[error("Writer limit exceeded")]
12+
pub struct LimitedWriterError;
13+
14+
/// Custom writer that enforces a byte limit.
15+
pub struct LimitedWriter<W: Write> {
16+
inner: W,
17+
limit: usize,
18+
written: usize,
19+
}
20+
21+
impl<W: Write> LimitedWriter<W> {
22+
pub fn new(inner: W, limit: usize) -> Self {
23+
Self {
24+
inner,
25+
limit,
26+
written: 0,
27+
}
28+
}
29+
}
30+
31+
impl<W: Write> Write for LimitedWriter<W> {
32+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
33+
// Calculate the number of bytes we can write without exceeding the limit.
34+
// Fail if the buffer doesn't fit.
35+
ensure!(
36+
self.limit
37+
.checked_sub(self.written)
38+
.is_some_and(|remaining| buf.len() <= remaining),
39+
io::Error::other(LimitedWriterError)
40+
);
41+
// Forward to the inner writer.
42+
let n = self.inner.write(buf)?;
43+
self.written += n;
44+
Ok(n)
45+
}
46+
47+
fn flush(&mut self) -> io::Result<()> {
48+
self.inner.flush()
49+
}
50+
}
51+
52+
#[cfg(test)]
53+
mod tests {
54+
use super::*;
55+
56+
#[test]
57+
fn test_limited_writer() {
58+
let mut out_buffer = Vec::new();
59+
let mut writer = LimitedWriter::new(&mut out_buffer, 5);
60+
assert_eq!(writer.write(b"foo").unwrap(), 3);
61+
assert_eq!(writer.write(b"ba").unwrap(), 2);
62+
assert!(writer
63+
.write(b"r")
64+
.unwrap_err()
65+
.downcast::<LimitedWriterError>()
66+
.is_ok());
67+
}
68+
}

linera-core/src/chain_worker/state/temporary_changes.rs

+32-3
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33

44
//! Operations that don't persist any changes to the chain state.
55
6-
use std::borrow::Cow;
6+
use std::{borrow::Cow, collections::BTreeSet};
77

88
use linera_base::{
9-
data_types::{ArithmeticError, Timestamp, UserApplicationDescription},
9+
data_types::{ArithmeticError, BlobContent, Timestamp, UserApplicationDescription},
1010
ensure,
1111
identifiers::{GenericApplicationId, UserApplicationId},
1212
};
@@ -28,6 +28,7 @@ use {
2828

2929
use super::{check_block_epoch, ChainWorkerState};
3030
use crate::{
31+
client::{MAXIMUM_BLOB_SIZE, MAXIMUM_BYTECODE_SIZE},
3132
data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
3233
worker::WorkerError,
3334
};
@@ -207,12 +208,20 @@ where
207208
self.0.chain.remove_bundles_from_inboxes(block).await?;
208209
// Verify that all required bytecode hashed certificate values and blobs are available, and no
209210
// unrelated ones provided.
211+
let published_blob_ids = block.published_blob_ids();
210212
self.0
211-
.check_no_missing_blobs(block.published_blob_ids(), blobs)
213+
.check_no_missing_blobs(published_blob_ids.clone(), blobs)
212214
.await?;
213215
for blob in blobs {
216+
Self::check_blob_size(blob.content())?;
214217
self.0.cache_recent_blob(Cow::Borrowed(blob)).await;
215218
}
219+
let checked_blobs = blobs.iter().map(|blob| blob.id()).collect::<BTreeSet<_>>();
220+
for blob in self.0.get_blobs(published_blob_ids).await? {
221+
if !checked_blobs.contains(&blob.id()) {
222+
Self::check_blob_size(blob.content())?;
223+
}
224+
}
216225

217226
let local_time = self.0.storage.clock().current_time();
218227
ensure!(
@@ -333,6 +342,26 @@ where
333342
}
334343
Ok(ChainInfoResponse::new(info, self.0.config.key_pair()))
335344
}
345+
346+
fn check_blob_size(content: &BlobContent) -> Result<(), WorkerError> {
347+
ensure!(
348+
u64::try_from(content.size())
349+
.ok()
350+
.is_some_and(|size| size <= MAXIMUM_BLOB_SIZE),
351+
WorkerError::BlobTooLarge
352+
);
353+
match content {
354+
BlobContent::ContractBytecode(compressed_bytecode)
355+
| BlobContent::ServiceBytecode(compressed_bytecode) => {
356+
ensure!(
357+
compressed_bytecode.decompressed_size_at_most(MAXIMUM_BYTECODE_SIZE)?,
358+
WorkerError::BytecodeTooLarge
359+
);
360+
}
361+
BlobContent::Data(_) => {}
362+
}
363+
Ok(())
364+
}
336365
}
337366

338367
impl<StorageClient> Drop for ChainWorkerStateWithTemporaryChanges<'_, StorageClient>

linera-core/src/client/mod.rs

+7
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ mod chain_state;
8383
#[path = "../unit_tests/client_tests.rs"]
8484
mod client_tests;
8585

86+
const MEBIBYTE: u64 = 1024 * 1024;
87+
88+
/// The maximum size of a data or bytecode blob, in bytes.
89+
pub(crate) const MAXIMUM_BLOB_SIZE: u64 = 3 * MEBIBYTE;
90+
/// The maximum size of decompressed bytecode, in bytes.
91+
pub(crate) const MAXIMUM_BYTECODE_SIZE: u64 = 30 * MEBIBYTE;
92+
8693
#[cfg(with_metrics)]
8794
mod metrics {
8895
use std::sync::LazyLock;

linera-core/src/unit_tests/client_tests.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use crate::test_utils::ServiceStorageBuilder;
4141
use crate::{
4242
client::{
4343
BlanketMessagePolicy, ChainClient, ChainClientError, ClientOutcome, MessageAction,
44-
MessagePolicy,
44+
MessagePolicy, MAXIMUM_BLOB_SIZE,
4545
},
4646
local_node::LocalNodeError,
4747
node::{
@@ -2458,5 +2458,9 @@ where
24582458
assert_eq!(executed_block.block.incoming_bundles.len(), 1);
24592459
assert_eq!(executed_block.required_blob_ids().len(), 1);
24602460

2461+
let large_blob_bytes = vec![0; MAXIMUM_BLOB_SIZE as usize + 1];
2462+
let result = client1.publish_data_blob(large_blob_bytes).await;
2463+
assert_matches!(result, Err(ChainClientError::LocalNodeError(_)));
2464+
24612465
Ok(())
24622466
}

linera-core/src/unit_tests/wasm_client_tests.rs

+16-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ use crate::client::client_tests::RocksDbStorageBuilder;
4141
use crate::client::client_tests::ScyllaDbStorageBuilder;
4242
#[cfg(feature = "storage-service")]
4343
use crate::client::client_tests::ServiceStorageBuilder;
44-
use crate::client::client_tests::{MemoryStorageBuilder, StorageBuilder, TestBuilder};
44+
use crate::client::{
45+
client_tests::{MemoryStorageBuilder, StorageBuilder, TestBuilder},
46+
ChainClientError, MAXIMUM_BYTECODE_SIZE,
47+
};
4548

4649
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
4750
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
@@ -148,6 +151,18 @@ where
148151
let balance_after_init = creator.local_balance().await?;
149152
assert!(balance_after_init < balance_after_messaging);
150153

154+
let large_bytecode = Bytecode::new(vec![0; MAXIMUM_BYTECODE_SIZE as usize + 1]);
155+
let small_bytecode = Bytecode::new(vec![]);
156+
// Publishing bytecode that exceeds the limit fails.
157+
let result = publisher
158+
.publish_bytecode(large_bytecode.clone(), small_bytecode.clone())
159+
.await;
160+
assert_matches!(result, Err(ChainClientError::LocalNodeError(_)));
161+
let result = publisher
162+
.publish_bytecode(small_bytecode, large_bytecode)
163+
.await;
164+
assert_matches!(result, Err(ChainClientError::LocalNodeError(_)));
165+
151166
Ok(())
152167
}
153168

linera-core/src/worker.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use std::{
1414
use linera_base::crypto::PublicKey;
1515
use linera_base::{
1616
crypto::{CryptoHash, KeyPair},
17-
data_types::{ArithmeticError, Blob, BlockHeight, Round, UserApplicationDescription},
17+
data_types::{
18+
ArithmeticError, Blob, BlockHeight, DecompressionError, Round, UserApplicationDescription,
19+
},
1820
doc_scalar,
1921
identifiers::{BlobId, ChainId, Owner, UserApplicationId},
2022
time::timer::{sleep, timeout},
@@ -212,6 +214,12 @@ pub enum WorkerError {
212214
FullChainWorkerCache,
213215
#[error("Failed to join spawned worker task")]
214216
JoinError,
217+
#[error("Blob exceeds size limit")]
218+
BlobTooLarge,
219+
#[error("Bytecode exceeds size limit")]
220+
BytecodeTooLarge,
221+
#[error(transparent)]
222+
Decompression(#[from] DecompressionError),
215223
}
216224

217225
impl From<linera_chain::ChainError> for WorkerError {

0 commit comments

Comments (0)