Skip to content

Commit 60dc472

Browse files
authored
Add blob size limits. (#2705) (#2720)
* Add blob size limits. (#2705) * Add a blob size limit. * Add a bytecode size limit. * Add unit tests for limits. * Don't enforce the limit for already published bytecode. * Simplify LimitedWriter; add unit test. * Add decompressed_size_at_most. * Update and copy comment about #2710. * Extract MEBIBYTE constant.
1 parent f667fb4 commit 60dc472

File tree

9 files changed

+174
-36
lines changed

9 files changed

+174
-36
lines changed

linera-base/src/data_types.rs

+43-29
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::{
2929
ApplicationId, BlobId, BlobType, BytecodeId, Destination, GenericApplicationId, MessageId,
3030
UserApplicationId,
3131
},
32+
limited_writer::{LimitedWriter, LimitedWriterError},
3233
time::{Duration, SystemTime},
3334
};
3435

@@ -842,14 +843,8 @@ impl fmt::Debug for Bytecode {
842843
#[derive(Error, Debug)]
843844
pub enum DecompressionError {
844845
/// Compressed bytecode is invalid, and could not be decompressed.
845-
#[cfg(not(target_arch = "wasm32"))]
846-
#[error("Bytecode could not be decompressed")]
847-
InvalidCompressedBytecode(#[source] io::Error),
848-
849-
/// Compressed bytecode is invalid, and could not be decompressed.
850-
#[cfg(target_arch = "wasm32")]
851-
#[error("Bytecode could not be decompressed")]
852-
InvalidCompressedBytecode(#[from] ruzstd::frame_decoder::FrameDecoderError),
846+
#[error("Bytecode could not be decompressed: {0}")]
847+
InvalidCompressedBytecode(#[from] io::Error),
853848
}
854849

855850
/// A compressed WebAssembly module's bytecode.
@@ -878,30 +873,57 @@ impl From<Bytecode> for CompressedBytecode {
878873
}
879874

880875
#[cfg(not(target_arch = "wasm32"))]
881-
impl TryFrom<&CompressedBytecode> for Bytecode {
882-
type Error = DecompressionError;
883-
884-
fn try_from(compressed_bytecode: &CompressedBytecode) -> Result<Self, Self::Error> {
885-
let bytes = zstd::stream::decode_all(&*compressed_bytecode.compressed_bytes)
886-
.map_err(DecompressionError::InvalidCompressedBytecode)?;
876+
impl CompressedBytecode {
877+
/// Returns `true` if the decompressed size does not exceed the limit.
878+
pub fn decompressed_size_at_most(&self, limit: u64) -> Result<bool, DecompressionError> {
879+
let mut decoder = zstd::stream::Decoder::new(&*self.compressed_bytes)?;
880+
let limit = usize::try_from(limit).unwrap_or(usize::MAX);
881+
let mut writer = LimitedWriter::new(io::sink(), limit);
882+
match io::copy(&mut decoder, &mut writer) {
883+
Ok(_) => Ok(true),
884+
Err(error) => {
885+
error.downcast::<LimitedWriterError>()?;
886+
Ok(false)
887+
}
888+
}
889+
}
887890

891+
/// Decompresses a [`CompressedBytecode`] into a [`Bytecode`].
892+
pub fn decompress(&self) -> Result<Bytecode, DecompressionError> {
893+
let bytes = zstd::stream::decode_all(&*self.compressed_bytes)?;
888894
Ok(Bytecode { bytes })
889895
}
890896
}
891897

892898
#[cfg(target_arch = "wasm32")]
893-
impl TryFrom<&CompressedBytecode> for Bytecode {
894-
type Error = DecompressionError;
899+
impl CompressedBytecode {
900+
/// Returns `true` if the decompressed size does not exceed the limit.
901+
pub fn decompressed_size_at_most(&self, limit: u64) -> Result<bool, DecompressionError> {
902+
let compressed_bytes = &*self.compressed_bytes;
903+
let limit = usize::try_from(limit).unwrap_or(usize::MAX);
904+
let mut writer = LimitedWriter::new(io::sink(), limit);
905+
let mut decoder = ruzstd::streaming_decoder::StreamingDecoder::new(compressed_bytes)
906+
.map_err(io::Error::other)?;
907+
908+
// TODO(#2710): Decode multiple frames, if present
909+
match io::copy(&mut decoder, &mut writer) {
910+
Ok(_) => Ok(true),
911+
Err(error) => {
912+
error.downcast::<LimitedWriterError>()?;
913+
Ok(false)
914+
}
915+
}
916+
}
895917

896-
fn try_from(compressed_bytecode: &CompressedBytecode) -> Result<Self, Self::Error> {
918+
/// Decompresses a [`CompressedBytecode`] into a [`Bytecode`].
919+
pub fn decompress(&self) -> Result<Bytecode, DecompressionError> {
897920
use ruzstd::{io::Read, streaming_decoder::StreamingDecoder};
898921

899-
let compressed_bytes = &*compressed_bytecode.compressed_bytes;
922+
let compressed_bytes = &*self.compressed_bytes;
900923
let mut bytes = Vec::new();
901-
let mut decoder = StreamingDecoder::new(compressed_bytes)?;
924+
let mut decoder = StreamingDecoder::new(compressed_bytes).map_err(io::Error::other)?;
902925

903-
// Decode multiple frames, if present
904-
// (https://github.com/KillingSpark/zstd-rs/issues/57)
926+
// TODO(#2710): Decode multiple frames, if present
905927
while !decoder.get_ref().is_empty() {
906928
decoder
907929
.read_to_end(&mut bytes)
@@ -912,14 +934,6 @@ impl TryFrom<&CompressedBytecode> for Bytecode {
912934
}
913935
}
914936

915-
impl TryFrom<CompressedBytecode> for Bytecode {
916-
type Error = DecompressionError;
917-
918-
fn try_from(compressed_bytecode: CompressedBytecode) -> Result<Self, Self::Error> {
919-
Bytecode::try_from(&compressed_bytecode)
920-
}
921-
}
922-
923937
impl fmt::Debug for CompressedBytecode {
924938
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
925939
f.debug_struct("CompressedBytecode").finish_non_exhaustive()

linera-base/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub mod crypto;
1818
pub mod data_types;
1919
mod graphql;
2020
pub mod identifiers;
21+
mod limited_writer;
2122
pub mod ownership;
2223
#[cfg(with_metrics)]
2324
pub mod prometheus_util;

linera-base/src/limited_writer.rs

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (c) Zefchain Labs, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::io::{self, Write};
5+
6+
use thiserror::Error;
7+
8+
use crate::ensure;
9+
10+
#[derive(Error, Debug)]
11+
#[error("Writer limit exceeded")]
12+
pub struct LimitedWriterError;
13+
14+
/// Custom writer that enforces a byte limit.
15+
pub struct LimitedWriter<W: Write> {
16+
inner: W,
17+
limit: usize,
18+
written: usize,
19+
}
20+
21+
impl<W: Write> LimitedWriter<W> {
22+
pub fn new(inner: W, limit: usize) -> Self {
23+
Self {
24+
inner,
25+
limit,
26+
written: 0,
27+
}
28+
}
29+
}
30+
31+
impl<W: Write> Write for LimitedWriter<W> {
32+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
33+
// Calculate the number of bytes we can write without exceeding the limit.
34+
// Fail if the buffer doesn't fit.
35+
ensure!(
36+
self.limit
37+
.checked_sub(self.written)
38+
.is_some_and(|remaining| buf.len() <= remaining),
39+
io::Error::other(LimitedWriterError)
40+
);
41+
// Forward to the inner writer.
42+
let n = self.inner.write(buf)?;
43+
self.written += n;
44+
Ok(n)
45+
}
46+
47+
fn flush(&mut self) -> io::Result<()> {
48+
self.inner.flush()
49+
}
50+
}
51+
52+
#[cfg(test)]
53+
mod tests {
54+
use super::*;
55+
56+
#[test]
57+
fn test_limited_writer() {
58+
let mut out_buffer = Vec::new();
59+
let mut writer = LimitedWriter::new(&mut out_buffer, 5);
60+
assert_eq!(writer.write(b"foo").unwrap(), 3);
61+
assert_eq!(writer.write(b"ba").unwrap(), 2);
62+
assert!(writer
63+
.write(b"r")
64+
.unwrap_err()
65+
.downcast::<LimitedWriterError>()
66+
.is_ok());
67+
}
68+
}

linera-core/src/chain_worker/state/temporary_changes.rs

+21-2
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
use std::borrow::Cow;
77

88
use linera_base::{
9-
data_types::{ArithmeticError, Timestamp, UserApplicationDescription},
9+
data_types::{ArithmeticError, CompressedBytecode, Timestamp, UserApplicationDescription},
1010
ensure,
11-
identifiers::{GenericApplicationId, UserApplicationId},
11+
identifiers::{BlobType, GenericApplicationId, UserApplicationId},
1212
};
1313
use linera_chain::{
1414
data_types::{
@@ -28,6 +28,7 @@ use {
2828

2929
use super::{check_block_epoch, ChainWorkerState};
3030
use crate::{
31+
client::{MAXIMUM_BLOB_SIZE, MAXIMUM_BYTECODE_SIZE},
3132
data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
3233
worker::WorkerError,
3334
};
@@ -213,6 +214,24 @@ where
213214
for blob in blobs {
214215
self.0.cache_recent_blob(Cow::Borrowed(blob)).await;
215216
}
217+
for blob in self.0.get_blobs(block.published_blob_ids()).await? {
218+
match blob.id().blob_type {
219+
BlobType::Data => {}
220+
BlobType::ContractBytecode | BlobType::ServiceBytecode => {
221+
ensure!(
222+
CompressedBytecode::from(blob.content().clone())
223+
.decompressed_size_at_most(MAXIMUM_BYTECODE_SIZE)?,
224+
WorkerError::BytecodeTooLarge
225+
);
226+
}
227+
}
228+
ensure!(
229+
u64::try_from(blob.content().bytes.len())
230+
.ok()
231+
.is_some_and(|size| size <= MAXIMUM_BLOB_SIZE),
232+
WorkerError::BlobTooLarge
233+
)
234+
}
216235

217236
let local_time = self.0.storage.clock().current_time();
218237
ensure!(

linera-core/src/client/mod.rs

+7
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ mod chain_state;
8383
#[path = "../unit_tests/client_tests.rs"]
8484
mod client_tests;
8585

86+
const MEBIBYTE: u64 = 1024 * 1024;
87+
88+
/// The maximum size of a data or bytecode blob, in bytes.
89+
pub(crate) const MAXIMUM_BLOB_SIZE: u64 = 3 * MEBIBYTE;
90+
/// The maximum size of decompressed bytecode, in bytes.
91+
pub(crate) const MAXIMUM_BYTECODE_SIZE: u64 = 30 * MEBIBYTE;
92+
8693
#[cfg(with_metrics)]
8794
mod metrics {
8895
use std::sync::LazyLock;

linera-core/src/unit_tests/client_tests.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use crate::test_utils::ServiceStorageBuilder;
4141
use crate::{
4242
client::{
4343
BlanketMessagePolicy, ChainClient, ChainClientError, ClientOutcome, MessageAction,
44-
MessagePolicy,
44+
MessagePolicy, MAXIMUM_BLOB_SIZE,
4545
},
4646
local_node::LocalNodeError,
4747
node::{
@@ -2560,5 +2560,11 @@ where
25602560
assert_eq!(executed_block.block.incoming_bundles.len(), 1);
25612561
assert_eq!(executed_block.required_blob_ids().len(), 1);
25622562

2563+
let large_blob_bytes = vec![0; MAXIMUM_BLOB_SIZE as usize + 1];
2564+
let result = client1
2565+
.publish_data_blob(BlobContent::new(large_blob_bytes))
2566+
.await;
2567+
assert_matches!(result, Err(ChainClientError::LocalNodeError(_)));
2568+
25632569
Ok(())
25642570
}

linera-core/src/unit_tests/wasm_client_tests.rs

+16-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ use crate::client::client_tests::RocksDbStorageBuilder;
4141
use crate::client::client_tests::ScyllaDbStorageBuilder;
4242
#[cfg(feature = "storage-service")]
4343
use crate::client::client_tests::ServiceStorageBuilder;
44-
use crate::client::client_tests::{MemoryStorageBuilder, StorageBuilder, TestBuilder};
44+
use crate::client::{
45+
client_tests::{MemoryStorageBuilder, StorageBuilder, TestBuilder},
46+
ChainClientError, MAXIMUM_BYTECODE_SIZE,
47+
};
4548

4649
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
4750
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
@@ -195,6 +198,18 @@ where
195198
let balance_after_init = creator.local_balance().await?;
196199
assert!(balance_after_init < balance_after_messaging);
197200

201+
let large_bytecode = Bytecode::new(vec![0; MAXIMUM_BYTECODE_SIZE as usize + 1]);
202+
let small_bytecode = Bytecode::new(vec![]);
203+
// Publishing bytecode that exceeds the limit fails.
204+
let result = publisher
205+
.publish_bytecode(large_bytecode.clone(), small_bytecode.clone())
206+
.await;
207+
assert_matches!(result, Err(ChainClientError::LocalNodeError(_)));
208+
let result = publisher
209+
.publish_bytecode(small_bytecode, large_bytecode)
210+
.await;
211+
assert_matches!(result, Err(ChainClientError::LocalNodeError(_)));
212+
198213
Ok(())
199214
}
200215

linera-core/src/worker.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use std::{
1414
use linera_base::crypto::PublicKey;
1515
use linera_base::{
1616
crypto::{CryptoHash, KeyPair},
17-
data_types::{ArithmeticError, Blob, BlockHeight, Round, UserApplicationDescription},
17+
data_types::{
18+
ArithmeticError, Blob, BlockHeight, DecompressionError, Round, UserApplicationDescription,
19+
},
1820
doc_scalar,
1921
identifiers::{BlobId, ChainId, Owner, UserApplicationId},
2022
};
@@ -215,6 +217,12 @@ pub enum WorkerError {
215217
FullChainWorkerCache,
216218
#[error("Failed to join spawned worker task")]
217219
JoinError,
220+
#[error("Blob exceeds size limit")]
221+
BlobTooLarge,
222+
#[error("Bytecode exceeds size limit")]
223+
BytecodeTooLarge,
224+
#[error(transparent)]
225+
Decompression(#[from] DecompressionError),
218226
}
219227

220228
impl From<linera_chain::ChainError> for WorkerError {

linera-storage/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ pub trait Storage: Sized {
296296
contract_blob
297297
.into_inner_contract_bytecode()
298298
.expect("Contract Bytecode Blob is of the wrong Blob type!")
299-
.try_into()?,
299+
.decompress()?,
300300
wasm_runtime,
301301
)
302302
.await?,
@@ -335,7 +335,7 @@ pub trait Storage: Sized {
335335
service_blob
336336
.into_inner_service_bytecode()
337337
.expect("Service Bytecode Blob is of the wrong Blob type!")
338-
.try_into()?,
338+
.decompress()?,
339339
wasm_runtime,
340340
)
341341
.await?,

0 commit comments

Comments
 (0)