Commit 63ec11c

Update docs.
1 parent 22da84c commit 63ec11c

4 files changed, 210 additions and 8 deletions.

s2/README.md

Lines changed: 88 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -685,7 +685,8 @@ The 10 byte 'stream identifier' of the second stream can optionally be stripped,
685685

686686
Blocks can be concatenated using the `ConcatBlocks` function.
687687

688-
Snappy blocks/streams can safely be concatenated with S2 blocks and streams.
688+
Snappy blocks/streams can safely be concatenated with S2 blocks and streams.
689+
Streams with indexes (see below) will currently not work on concatenated streams.
689690

690691
# Stream Seek Index
691692

@@ -701,9 +702,27 @@ so the output remains compatible with other decoders.
701702
To automatically add an index to a stream, add the `WriterAddIndex()` option to your writer.
702703
Then the index will be added to the stream when `Close()` is called.
703704

705+
```
706+
// Add Index to stream...
707+
enc := s2.NewWriter(w, s2.WriterAddIndex())
708+
io.Copy(enc, r)
709+
enc.Close()
710+
```
711+
704712
If you want to store the index separately, you can use `CloseIndex()` instead of the regular `Close()`.
705713
This will return the index. Note that `CloseIndex()` should only be called once, and you shouldn't call `Close()`.
706714

715+
```
716+
// Get index for separate storage...
717+
enc := s2.NewWriter(w)
718+
io.Copy(enc, r)
719+
index, err := enc.CloseIndex()
720+
```
721+
722+
The `index` can then be used without needing to read it from the stream.
723+
This means the index can be used without needing to seek to the end of the stream
724+
or for manually forwarding streams. See below.
725+
707726
## Using Indexes
708727

709728
To use indexes there is a `ReadSeeker(random bool, index []byte) (*ReadSeeker, error)` function available.
@@ -713,15 +732,83 @@ Calling ReadSeeker will return an [io.ReadSeeker](https://pkg.go.dev/io#ReadSeek
713732
If 'random' is specified the returned io.Seeker can be used for random seeking, otherwise only forward seeking is supported.
714733
Enabling random seeking requires the original input to support the [io.Seeker](https://pkg.go.dev/io#Seeker) interface.
715734

735+
```
736+
dec := s2.NewReader(r)
737+
rs, err := dec.ReadSeeker(false, nil)
738+
rs.Seek(wantOffset, io.SeekStart)
739+
```
740+
741+
Get a seeker to seek forward. Since no index is provided, the index is read from the stream.
742+
This requires that an index was added and that `r` supports the [io.Seeker](https://pkg.go.dev/io#Seeker) interface.
743+
716744
A custom index can be specified which will be used if supplied.
717745
When using a custom index, it will not be read from the input stream.
718746

747+
```
748+
dec := s2.NewReader(r)
749+
rs, err := dec.ReadSeeker(false, index)
750+
rs.Seek(wantOffset, io.SeekStart)
751+
```
752+
753+
This will read the index from `index`. Since we specify non-random (forward only) seeking, `r` does not have to be an io.Seeker.
754+
755+
```
756+
dec := s2.NewReader(r)
757+
rs, err := dec.ReadSeeker(true, index)
758+
rs.Seek(wantOffset, io.SeekStart)
759+
```
760+
761+
Finally, since we specify that we want to do random seeking, `r` must be an io.Seeker.
762+
719763
The returned [ReadSeeker](https://pkg.go.dev/github.com/klauspost/compress/s2#ReadSeeker) contains a shallow reference to the existing Reader,
720764
meaning changes performed to one are reflected in the other.
721765

766+
## Manually Forwarding Streams
767+
722768
Indexes can also be read outside the decoder using the [Index](https://pkg.go.dev/github.com/klauspost/compress/s2#Index) type.
723769
This can be used for parsing indexes, either separate or in streams.
724770

771+
In some cases it may not be possible to serve a seekable stream.
772+
This can for instance be an HTTP stream, where the Range request
773+
is sent at the start of the stream.
774+
775+
With a little bit of extra code it is still possible to forward such a stream to a specific offset using the index.
776+
777+
It is possible to load the index manually like this:
778+
```
779+
var index s2.Index
780+
_, err = index.Load(idxBytes)
781+
```
782+
783+
This can be used to figure out how much to offset the compressed stream:
784+
785+
```
786+
compressedOffset, uncompressedOffset, err := index.Find(wantOffset)
787+
```
788+
789+
The `compressedOffset` is the number of bytes that should be skipped
790+
from the beginning of the compressed file.
791+
792+
The `uncompressedOffset` will then be the offset of the uncompressed bytes returned
793+
when decoding from that position. This will always be <= wantOffset.
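
To illustrate why `uncompressedOffset` is always <= `wantOffset`, here is a minimal sketch of a block-offset lookup. It is not the s2 implementation: the `entry` type, the `find` function and the sample offsets are hypothetical and only assume that an index stores one pair of offsets per indexed block, sorted by uncompressed offset.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// entry is a hypothetical index record: the block that begins at
// uncompressed offset `uncompressed` starts at byte `compressed`
// of the compressed stream. This is not the s2 index layout.
type entry struct {
	compressed   int64
	uncompressed int64
}

// find returns the last entry whose uncompressed offset is <= want.
// entries must be sorted by uncompressed offset.
func find(entries []entry, want int64) (entry, error) {
	if len(entries) == 0 || want < 0 {
		return entry{}, errors.New("offset not covered by index")
	}
	// Locate the first entry that starts beyond want;
	// the entry just before it is the block containing want.
	i := sort.Search(len(entries), func(i int) bool {
		return entries[i].uncompressed > want
	})
	if i == 0 {
		return entry{}, errors.New("offset not covered by index")
	}
	return entries[i-1], nil
}

func main() {
	// Made-up offsets for three blocks of 100 KiB uncompressed data.
	idx := []entry{{0, 0}, {4100, 102400}, {8300, 204800}}
	const wantOffset = 150000

	e, err := find(idx, wantOffset)
	if err != nil {
		panic(err)
	}
	// Decoding can only start at a block boundary, so the remaining
	// distance has to be skipped in the decoded output.
	fmt.Println(e.compressed, e.uncompressed, wantOffset-e.uncompressed)
}
```

Because decoding can only begin at a block boundary, the lookup rounds down to the nearest indexed block, and the remainder is handled after decoding.
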
794+
795+
When creating a decoder it must be specified that it should *not* expect a frame header
796+
at the beginning of the stream. Assuming the io.Reader `r` has been forwarded to `compressedOffset`
797+
we create the decoder like this:
798+
799+
```
800+
dec := s2.NewReader(r, s2.ReaderIgnoreFrameHeader())
801+
```
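
For the HTTP case mentioned earlier, one way `r` could be forwarded to `compressedOffset` is an open-ended Range request. The sketch below uses only the standard library; the `newRangeRequest` helper and the URL are hypothetical, and it assumes the server honours Range requests (the caller should check for a `206 Partial Content` response before handing the body to the decoder).

```go
package main

import (
	"fmt"
	"net/http"
)

// newRangeRequest is a hypothetical helper that builds a GET request
// whose response body starts at compressedOffset of the remote object.
func newRangeRequest(url string, compressedOffset int64) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	if compressedOffset > 0 {
		// Open-ended range: everything from compressedOffset to the end.
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-", compressedOffset))
	}
	return req, nil
}

func main() {
	req, err := newRangeRequest("https://example.com/data.s2", 4100)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Header.Get("Range"))
}
```
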
802+
803+
We are not completely done. We still need to skip the uncompressed bytes we didn't want.
804+
This is done using the regular "Skip" function:
805+
806+
```
807+
err = dec.Skip(wantOffset - uncompressedOffset)
808+
```
809+
810+
This will ensure that we are at exactly the offset we want, and reading from `dec` will start at the requested offset.
811+
725812
## Index Format:
726813

727814
Each block is structured as a snappy skippable block, with the chunk ID 0x99.

s2/decode.go

Lines changed: 18 additions & 6 deletions
@@ -100,6 +100,7 @@ func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
100100
} else {
101101
nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
102102
}
103+
nr.readHeader = nr.ignoreFrameHeader
103104
nr.paramsOK = true
104105
return &nr
105106
}
@@ -143,6 +144,16 @@ func ReaderAllocBlock(blockSize int) ReaderOption {
143144
}
144145
}
145146

147+
// ReaderIgnoreFrameHeader will make the reader not expect a
148+
// frame header at the beginning of the stream.
149+
// This can be used when serving a stream that has been forwarded to a specific point.
150+
func ReaderIgnoreFrameHeader() ReaderOption {
151+
return func(r *Reader) error {
152+
r.ignoreFrameHeader = true
153+
return nil
154+
}
155+
}
156+
146157
// ReaderSkippableCB will register a callback for chunks with the specified ID.
147158
// ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
148159
// For each chunk with the ID, the callback is called with the content.
@@ -166,6 +177,7 @@ type Reader struct {
166177
buf []byte
167178
skippableCB [0x80]func(r io.Reader) error
168179
blockStart int64 // Uncompressed offset at start of current.
180+
index *Index
169181

170182
// decoded[i:j] contains decoded bytes that have not yet been passed on.
171183
i, j int
@@ -174,11 +186,11 @@ type Reader struct {
174186
// maximum expected buffer size.
175187
maxBufSize int
176188
// alloc a buffer this size if > 0.
177-
lazyBuf int
178-
readHeader bool
179-
paramsOK bool
180-
snappyFrame bool
181-
index *Index
189+
lazyBuf int
190+
readHeader bool
191+
paramsOK bool
192+
snappyFrame bool
193+
ignoreFrameHeader bool
182194
}
183195

184196
// ensureBufferSize will ensure that the buffer can take at least n bytes.
@@ -208,7 +220,7 @@ func (r *Reader) Reset(reader io.Reader) {
208220
r.err = nil
209221
r.i = 0
210222
r.j = 0
211-
r.readHeader = false
223+
r.readHeader = r.ignoreFrameHeader
212224
}
213225

214226
func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
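
The decode.go hunks above combine a functional option with a change to `Reset`, so that the "no frame header expected" setting survives reuse of the reader. A stripped-down sketch of that pattern follows. All names here (`reader`, `option`, `newReader`, `reset`) are hypothetical stand-ins for illustration and do not mirror the real s2 signatures.

```go
package main

import "fmt"

// reader is a hypothetical, minimal stand-in for a stream decoder.
type reader struct {
	// readHeader is true once a frame header has been seen,
	// or when none is expected at all.
	readHeader bool
	// ignoreFrameHeader records the option so reset can reapply it.
	ignoreFrameHeader bool
}

// option configures a reader at construction time.
type option func(*reader) error

// ignoreFrameHeader makes the reader not expect a frame header.
func ignoreFrameHeader() option {
	return func(r *reader) error {
		r.ignoreFrameHeader = true
		return nil
	}
}

// newReader applies all options and then initialises per-stream state.
func newReader(opts ...option) (*reader, error) {
	r := &reader{}
	for _, opt := range opts {
		if err := opt(r); err != nil {
			return nil, err
		}
	}
	r.reset()
	return r, nil
}

// reset prepares the reader for a new stream. Deriving readHeader from
// the stored option, instead of hard-coding false, keeps the setting
// in effect after every reset.
func (r *reader) reset() {
	r.readHeader = r.ignoreFrameHeader
}

func main() {
	plain, _ := newReader()
	forwarded, _ := newReader(ignoreFrameHeader())
	forwarded.reset()
	fmt.Println(plain.readHeader, forwarded.readHeader)
}
```

Storing the option in its own field, rather than only setting `readHeader` once, is what lets a reset reader keep accepting streams that start mid-way.
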

s2/encode.go

Lines changed: 1 addition & 1 deletion
@@ -1095,7 +1095,7 @@ func (w *Writer) Close() error {
10951095
// CloseIndex calls Close and returns an index on first call.
10961096
// This is not required if you are only adding index to a stream.
10971097
func (w *Writer) CloseIndex() ([]byte, error) {
1098-
return w.closeIndex(false)
1098+
return w.closeIndex(true)
10991099
}
11001100

11011101
func (w *Writer) closeIndex(idx bool) ([]byte, error) {

s2/index_test.go

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
1+
package s2_test
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
"io/ioutil"
8+
"math/rand"
9+
"sync"
10+
11+
"github.com/klauspost/compress/s2"
12+
)
13+
14+
func ExampleIndex_Load() {
15+
fatalErr := func(err error) {
16+
if err != nil {
17+
panic(err)
18+
}
19+
}
20+
21+
// Create a test corpus
22+
tmp := make([]byte, 5<<20)
23+
rng := rand.New(rand.NewSource(0xbeefcafe))
24+
rng.Read(tmp)
25+
// Make it compressible...
26+
for i, v := range tmp {
27+
tmp[i] = '0' + v&3
28+
}
29+
// Compress it...
30+
var buf bytes.Buffer
31+
// We use smaller blocks just for the example...
32+
enc := s2.NewWriter(&buf, s2.WriterBlockSize(100<<10), s2.WriterAddIndex())
33+
err := enc.EncodeBuffer(tmp)
34+
fatalErr(err)
35+
36+
// Close and get index...
37+
idxBytes, err := enc.CloseIndex()
38+
fatalErr(err)
39+
40+
// This is our compressed stream...
41+
compressed := buf.Bytes()
42+
43+
var once sync.Once
44+
for wantOffset := int64(0); wantOffset < int64(len(tmp)); wantOffset += 555555 {
45+
// Let's assume we want to read from uncompressed offset 'wantOffset'
46+
// and we cannot seek in input, but we have the index.
47+
want := tmp[wantOffset:]
48+
49+
// Load the index.
50+
var index s2.Index
51+
_, err = index.Load(idxBytes)
52+
fatalErr(err)
53+
54+
// Find offset in file:
55+
compressedOffset, uncompressedOffset, err := index.Find(wantOffset)
56+
fatalErr(err)
57+
58+
// Offset the input to the compressed offset.
59+
// Notice how we do not provide any bytes before the offset.
60+
input := io.Reader(bytes.NewBuffer(compressed[compressedOffset:]))
61+
if _, ok := input.(io.Seeker); !ok {
62+
// Notice how the input is not seekable...
63+
once.Do(func() {
64+
fmt.Println("Input does not support seeking...")
65+
})
66+
} else {
67+
panic("did you implement seeking on bytes.Buffer?")
68+
}
69+
70+
// When creating the decoder we must specify that it should not
71+
// expect a frame header at the beginning of the stream.
72+
dec := s2.NewReader(input, s2.ReaderIgnoreFrameHeader())
73+
74+
// The input is not seekable, so ReadSeeker with random seeking
75+
// cannot be used here. We forward the decoder manually instead.
76+
// We now have a reader, but it will start outputting at uncompressedOffset,
77+
// and not the actual offset we want, so skip forward to that.
78+
toSkip := wantOffset - uncompressedOffset
79+
err = dec.Skip(toSkip)
80+
fatalErr(err)
81+
82+
// Read the rest of the stream...
83+
got, err := ioutil.ReadAll(dec)
84+
fatalErr(err)
85+
if bytes.Equal(got, want) {
86+
fmt.Println("Successfully skipped forward to", wantOffset)
87+
} else {
88+
fmt.Println("Failed to skip forward to", wantOffset)
89+
}
90+
}
91+
// OUTPUT:
92+
//Input does not support seeking...
93+
//Successfully skipped forward to 0
94+
//Successfully skipped forward to 555555
95+
//Successfully skipped forward to 1111110
96+
//Successfully skipped forward to 1666665
97+
//Successfully skipped forward to 2222220
98+
//Successfully skipped forward to 2777775
99+
//Successfully skipped forward to 3333330
100+
//Successfully skipped forward to 3888885
101+
//Successfully skipped forward to 4444440
102+
//Successfully skipped forward to 4999995
103+
}
