Skip to content

Commit b61bd94

Browse files
authored
Support for non-multipart put uploads (#20)
* Support for non-multipart put uploads * rename put_file to put * Add put test
1 parent 2c59b64 commit b61bd94

File tree

12 files changed

+196
-75
lines changed

12 files changed

+196
-75
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ import object_store_rs as obs
5252

5353
store = obs.store.MemoryStore()
5454

55-
obs.put_file(store, "file.txt", b"hello world!")
55+
obs.put(store, "file.txt", b"hello world!")
5656
response = obs.get(store, "file.txt")
5757
response.meta
5858
# {'size': 12,
@@ -77,7 +77,7 @@ import object_store_rs as obs
7777

7878
store = obs.store.MemoryStore()
7979

80-
await obs.put_file_async(store, "file.txt", b"hello world!")
80+
await obs.put_async(store, "file.txt", b"hello world!")
8181
response = await obs.get_async(store, "file.txt")
8282
response.meta
8383
# {

docs/api/put.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
# Put
22

3-
::: object_store_rs.put_file
4-
::: object_store_rs.put_file_async
3+
::: object_store_rs.put
4+
::: object_store_rs.put_async

object-store-rs/python/object_store_rs/_object_store_rs.pyi

+2-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
from ._copy import copy as copy
22
from ._copy import copy_async as copy_async
3-
from ._copy import copy_if_not_exists as copy_if_not_exists
4-
from ._copy import copy_if_not_exists_async as copy_if_not_exists_async
53
from ._delete import delete as delete
64
from ._delete import delete_async as delete_async
75
from ._get import GetOptions as GetOptions
@@ -20,12 +18,10 @@ from ._list import list as list
2018
from ._list import list_async as list_async
2119
from ._list import list_with_delimiter as list_with_delimiter
2220
from ._list import list_with_delimiter_async as list_with_delimiter_async
23-
from ._put import put_file as put_file
24-
from ._put import put_file_async as put_file_async
21+
from ._put import put as put
22+
from ._put import put_async as put_async
2523
from ._rename import rename as rename
2624
from ._rename import rename_async as rename_async
27-
from ._rename import rename_if_not_exists as rename_if_not_exists
28-
from ._rename import rename_if_not_exists_async as rename_if_not_exists_async
2925
from ._sign import HTTP_METHOD as HTTP_METHOD
3026
from ._sign import SignCapableStore as SignCapableStore
3127
from ._sign import sign_url as sign_url
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,60 @@
11
from pathlib import Path
2-
from typing import IO
2+
from typing import IO, TypedDict
33

44
from .store import ObjectStore
55

6-
def put_file(
6+
class PutResult(TypedDict):
7+
"""
8+
Result for a put request.
9+
"""
10+
11+
e_tag: str | None
12+
"""
13+
The unique identifier for the newly created object
14+
15+
<https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
16+
"""
17+
18+
version: str | None
19+
"""A version indicator for the newly created object."""
20+
21+
def put(
722
store: ObjectStore,
823
location: str,
924
file: IO[bytes] | Path | bytes,
1025
*,
11-
chunk_size: int = 5 * 1024,
26+
use_multipart: bool | None = None,
27+
chunk_size: int = 5 * 1024 * 1024,
1228
max_concurrency: int = 12,
13-
) -> None:
29+
) -> PutResult:
1430
"""Save the provided bytes to the specified location
1531
1632
The operation is guaranteed to be atomic, it will either successfully write the
1733
entirety of `file` to `location`, or fail. No clients should be able to observe a
1834
partially written object.
1935
20-
This will use a multipart upload under the hood.
21-
2236
Args:
2337
store: The ObjectStore instance to use.
2438
location: The path within ObjectStore for where to save the file.
2539
file: The object to upload. Can either be file-like, a `Path` to a local file,
2640
or a `bytes` object.
41+
42+
Keyword args:
43+
use_multipart: Whether to use a multipart upload under the hood. Defaults using a multipart upload if the length of the file is greater than `chunk_size`.
2744
chunk_size: The size of chunks to use within each part of the multipart upload. Defaults to 5 MB.
2845
max_concurrency: The maximum number of chunks to upload concurrently. Defaults to 12.
2946
"""
3047

31-
async def put_file_async(
48+
async def put_async(
3249
store: ObjectStore,
3350
location: str,
3451
file: IO[bytes] | Path | bytes,
3552
*,
36-
chunk_size: int = 5 * 1024,
53+
use_multipart: bool | None = None,
54+
chunk_size: int = 5 * 1024 * 1024,
3755
max_concurrency: int = 12,
38-
) -> None:
39-
"""Call `put_file` asynchronously.
56+
) -> PutResult:
57+
"""Call `put` asynchronously.
4058
41-
Refer to the documentation for [put_file][object_store_rs.put_file].
59+
Refer to the documentation for [put][object_store_rs.put].
4260
"""

object-store-rs/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ fn _object_store_rs(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
4040
m.add_wrapped(wrap_pyfunction!(list::list_with_delimiter_async))?;
4141
m.add_wrapped(wrap_pyfunction!(list::list_with_delimiter))?;
4242
m.add_wrapped(wrap_pyfunction!(list::list))?;
43-
m.add_wrapped(wrap_pyfunction!(put::put_file_async))?;
44-
m.add_wrapped(wrap_pyfunction!(put::put_file))?;
43+
m.add_wrapped(wrap_pyfunction!(put::put_async))?;
44+
m.add_wrapped(wrap_pyfunction!(put::put))?;
4545
m.add_wrapped(wrap_pyfunction!(rename::rename_async))?;
4646
m.add_wrapped(wrap_pyfunction!(rename::rename))?;
4747
m.add_wrapped(wrap_pyfunction!(signer::sign_url_async))?;

object-store-rs/src/list.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ impl PyObjectMeta {
2121

2222
impl IntoPy<PyObject> for PyObjectMeta {
2323
fn into_py(self, py: Python<'_>) -> PyObject {
24-
let mut dict = HashMap::new();
24+
let mut dict = HashMap::with_capacity(5);
2525
dict.insert("location", self.0.location.as_ref().into_py(py));
2626
dict.insert("last_modified", self.0.last_modified.into_py(py));
2727
dict.insert("size", self.0.size.into_py(py));
@@ -35,7 +35,7 @@ pub(crate) struct PyListResult(ListResult);
3535

3636
impl IntoPy<PyObject> for PyListResult {
3737
fn into_py(self, py: Python<'_>) -> PyObject {
38-
let mut dict = HashMap::new();
38+
let mut dict = HashMap::with_capacity(2);
3939
dict.insert(
4040
"common_prefixes",
4141
self.0

object-store-rs/src/put.rs

+97-31
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
use std::collections::HashMap;
12
use std::fs::File;
2-
use std::io::{BufReader, Cursor, Read};
3+
use std::io::{BufReader, Cursor, Read, Seek, SeekFrom};
34
use std::path::PathBuf;
45
use std::sync::Arc;
56

67
use object_store::path::Path;
7-
use object_store::{ObjectStore, WriteMultipart};
8-
use pyo3::exceptions::PyIOError;
8+
use object_store::{ObjectStore, PutPayload, PutResult, WriteMultipart};
99
use pyo3::prelude::*;
1010
use pyo3::pybacked::PyBackedBytes;
1111
use pyo3_file::PyFileLikeObject;
@@ -22,6 +22,21 @@ pub(crate) enum MultipartPutInput {
2222
Buffer(Cursor<PyBackedBytes>),
2323
}
2424

25+
impl MultipartPutInput {
26+
/// Number of bytes in the file-like object
27+
fn nbytes(&mut self) -> PyObjectStoreResult<usize> {
28+
let origin_pos = self.stream_position()?;
29+
let size = self.seek(SeekFrom::End(0))?;
30+
self.seek(SeekFrom::Start(origin_pos))?;
31+
Ok(size.try_into().unwrap())
32+
}
33+
34+
/// Whether to use multipart uploads.
35+
fn use_multipart(&mut self, chunk_size: usize) -> PyObjectStoreResult<bool> {
36+
Ok(self.nbytes()? > chunk_size)
37+
}
38+
}
39+
2540
impl<'py> FromPyObject<'py> for MultipartPutInput {
2641
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
2742
let py = ob.py();
@@ -51,69 +66,120 @@ impl Read for MultipartPutInput {
5166
}
5267
}
5368

69+
impl Seek for MultipartPutInput {
70+
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
71+
match self {
72+
Self::File(f) => f.seek(pos),
73+
Self::FileLike(f) => f.seek(pos),
74+
Self::Buffer(f) => f.seek(pos),
75+
}
76+
}
77+
}
78+
79+
pub(crate) struct PyPutResult(PutResult);
80+
81+
impl IntoPy<PyObject> for PyPutResult {
82+
fn into_py(self, py: Python<'_>) -> PyObject {
83+
let mut dict = HashMap::with_capacity(2);
84+
dict.insert("e_tag", self.0.e_tag.into_py(py));
85+
dict.insert("version", self.0.version.into_py(py));
86+
dict.into_py(py)
87+
}
88+
}
89+
5490
#[pyfunction]
55-
#[pyo3(signature = (store, location, file, *, chunk_size = 5120, max_concurrency = 12))]
56-
pub(crate) fn put_file(
91+
#[pyo3(signature = (store, location, file, *, use_multipart = None, chunk_size = 5242880, max_concurrency = 12))]
92+
pub(crate) fn put(
5793
py: Python,
5894
store: PyObjectStore,
5995
location: String,
60-
file: MultipartPutInput,
96+
mut file: MultipartPutInput,
97+
use_multipart: Option<bool>,
6198
chunk_size: usize,
6299
max_concurrency: usize,
63-
) -> PyObjectStoreResult<()> {
100+
) -> PyObjectStoreResult<PyPutResult> {
101+
let use_multipart = if let Some(use_multipart) = use_multipart {
102+
use_multipart
103+
} else {
104+
file.use_multipart(chunk_size)?
105+
};
64106
let runtime = get_runtime(py)?;
65-
runtime.block_on(put_multipart_inner(
66-
store.into_inner(),
67-
&location.into(),
68-
file,
69-
chunk_size,
70-
max_concurrency,
71-
))
107+
if use_multipart {
108+
runtime.block_on(put_multipart_inner(
109+
store.into_inner(),
110+
&location.into(),
111+
file,
112+
chunk_size,
113+
max_concurrency,
114+
))
115+
} else {
116+
runtime.block_on(put_inner(store.into_inner(), &location.into(), file))
117+
}
72118
}
73119

74120
#[pyfunction]
75-
#[pyo3(signature = (store, location, file, *, chunk_size = 5120, max_concurrency = 12))]
76-
pub(crate) fn put_file_async(
121+
#[pyo3(signature = (store, location, file, *, use_multipart = None, chunk_size = 5242880, max_concurrency = 12))]
122+
pub(crate) fn put_async(
77123
py: Python,
78124
store: PyObjectStore,
79125
location: String,
80-
file: MultipartPutInput,
126+
mut file: MultipartPutInput,
127+
use_multipart: Option<bool>,
81128
chunk_size: usize,
82129
max_concurrency: usize,
83130
) -> PyResult<Bound<PyAny>> {
131+
let use_multipart = if let Some(use_multipart) = use_multipart {
132+
use_multipart
133+
} else {
134+
file.use_multipart(chunk_size)?
135+
};
84136
pyo3_async_runtimes::tokio::future_into_py(py, async move {
85-
Ok(put_multipart_inner(
86-
store.into_inner(),
87-
&location.into(),
88-
file,
89-
chunk_size,
90-
max_concurrency,
91-
)
92-
.await?)
137+
let result = if use_multipart {
138+
put_multipart_inner(
139+
store.into_inner(),
140+
&location.into(),
141+
file,
142+
chunk_size,
143+
max_concurrency,
144+
)
145+
.await?
146+
} else {
147+
put_inner(store.into_inner(), &location.into(), file).await?
148+
};
149+
Ok(result)
93150
})
94151
}
95152

153+
async fn put_inner(
154+
store: Arc<dyn ObjectStore>,
155+
location: &Path,
156+
mut reader: MultipartPutInput,
157+
) -> PyObjectStoreResult<PyPutResult> {
158+
let nbytes = reader.nbytes()?;
159+
let mut buffer = Vec::with_capacity(nbytes);
160+
reader.read_to_end(&mut buffer)?;
161+
let payload = PutPayload::from_bytes(buffer.into());
162+
Ok(PyPutResult(store.put(location, payload).await?))
163+
}
164+
96165
async fn put_multipart_inner<R: Read>(
97166
store: Arc<dyn ObjectStore>,
98167
location: &Path,
99168
mut reader: R,
100169
chunk_size: usize,
101170
max_concurrency: usize,
102-
) -> PyObjectStoreResult<()> {
171+
) -> PyObjectStoreResult<PyPutResult> {
103172
let upload = store.put_multipart(location).await?;
104173
let mut write = WriteMultipart::new(upload);
105174
let mut scratch_buffer = vec![0; chunk_size];
106175
loop {
107-
let read_size = reader
108-
.read(&mut scratch_buffer)
109-
.map_err(|err| PyIOError::new_err(err.to_string()))?;
176+
let read_size = reader.read(&mut scratch_buffer)?;
110177
if read_size == 0 {
111178
break;
112179
} else {
113180
write.wait_for_capacity(max_concurrency).await?;
114181
write.write(&scratch_buffer[0..read_size]);
115182
}
116183
}
117-
write.finish().await?;
118-
Ok(())
184+
Ok(PyPutResult(write.finish().await?))
119185
}

pyo3-object_store/src/error.rs

+17-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
//! Contains the [`PyObjectStoreError`], the Error returned by most fallible functions in this
22
//! crate.
33
4-
use pyo3::exceptions::{PyException, PyValueError};
4+
use pyo3::exceptions::{
5+
PyException, PyFileNotFoundError, PyIOError, PyNotImplementedError, PyValueError,
6+
};
57
use pyo3::prelude::*;
68
use pyo3::DowncastError;
79
use thiserror::Error;
@@ -17,13 +19,26 @@ pub enum PyObjectStoreError {
1719
/// A wrapped [PyErr]
1820
#[error(transparent)]
1921
PyErr(#[from] PyErr),
22+
23+
/// A wrapped [std::io::Error]
24+
#[error(transparent)]
25+
IOError(#[from] std::io::Error),
2026
}
2127

2228
impl From<PyObjectStoreError> for PyErr {
2329
fn from(error: PyObjectStoreError) -> Self {
2430
match error {
2531
PyObjectStoreError::PyErr(err) => err,
26-
PyObjectStoreError::ObjectStoreError(err) => PyException::new_err(err.to_string()),
32+
PyObjectStoreError::ObjectStoreError(ref err) => match err {
33+
object_store::Error::NotFound { path: _, source: _ } => {
34+
PyFileNotFoundError::new_err(err.to_string())
35+
}
36+
object_store::Error::NotImplemented => {
37+
PyNotImplementedError::new_err(err.to_string())
38+
}
39+
_ => PyException::new_err(err.to_string()),
40+
},
41+
PyObjectStoreError::IOError(err) => PyIOError::new_err(err.to_string()),
2742
}
2843
}
2944
}

0 commit comments

Comments
 (0)