Skip to content

Commit 5778a6b

Browse files
bborehamowen-d
authored andcommitted
chunks: decode varints directly from byte buffer; stop panicing on some corrupt inputs (grafana#7264)
**What this PR does / why we need it**: This is much faster, since we don't pay for byte-at-a-time reads from the buffer. For best performance with gzip and lz4, we still want a bufio.Reader. We don't need the pool of buffered readers now. Other compression formats have their own internal buffers so don't need extra buffering. **Checklist** - [x] Reviewed the `CONTRIBUTING.md` guide - NA Documentation added - NA Tests updated - [x] `CHANGELOG.md` updated - NA Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` (using changes from grafana#7246, ignoring lines that didn't change much) ``` name old time/op new time/op delta Write/ordered-lz4-256k-4 24.1ms ± 7% 22.1ms ± 2% -8.52% (p=0.008 n=5+5) Write/unordered-lz4-4 54.6ms ±29% 49.1ms ± 1% -10.04% (p=0.016 n=5+4) Read/none_66_kB-4 4.16ms ± 6% 2.54ms ± 2% -38.87% (p=0.008 n=5+5) Read/gzip_66_kB-4 177ms ± 4% 161ms ± 4% -9.09% (p=0.008 n=5+5) Read/lz4-64k_66_kB-4 59.5ms ± 2% 49.5ms ± 3% -16.80% (p=0.008 n=5+5) Read/lz4-256k_66_kB-4 62.9ms ± 0% 51.2ms ± 1% -18.58% (p=0.016 n=4+5) Read/lz4-1M_66_kB-4 62.9ms ± 2% 51.5ms ± 1% -18.03% (p=0.008 n=5+5) Read/lz4_66_kB-4 63.7ms ± 4% 51.2ms ± 1% -19.68% (p=0.008 n=5+5) Read/snappy_66_kB-4 51.1ms ± 2% 41.9ms ± 1% -17.95% (p=0.016 n=5+4) Read/flate_66_kB-4 168ms ± 4% 150ms ± 2% -10.49% (p=0.008 n=5+5) Read/zstd_66_kB-4 334ms ±16% 272ms ± 6% -18.52% (p=0.008 n=5+5) Read/none_262_kB-4 3.93ms ± 1% 2.48ms ± 2% -36.91% (p=0.008 n=5+5) Read/gzip_262_kB-4 159ms ± 5% 143ms ± 2% -10.12% (p=0.008 n=5+5) Read/lz4-64k_262_kB-4 53.5ms ± 1% 43.2ms ± 3% -19.35% (p=0.008 n=5+5) Read/lz4-256k_262_kB-4 55.8ms ± 1% 45.3ms ± 2% -18.75% (p=0.008 n=5+5) Read/lz4-1M_262_kB-4 57.5ms ± 2% 45.5ms ± 2% -20.91% (p=0.008 n=5+5) Read/lz4_262_kB-4 56.8ms ± 2% 45.3ms ± 2% -20.26% (p=0.008 n=5+5) Read/snappy_262_kB-4 46.0ms ± 2% 37.5ms ± 1% -18.57% (p=0.008 n=5+5) Read/flate_262_kB-4 153ms ± 3% 134ms ± 2% -12.33% (p=0.008 n=5+5) Read/none_524_kB-4 3.85ms ± 2% 2.46ms ± 2% -36.04% (p=0.008 n=5+5) Read/gzip_524_kB-4 128ms ± 2% 119ms ± 2% -7.64% (p=0.008 n=5+5) Read/lz4-64k_524_kB-4 44.6ms ± 3% 34.7ms ± 1% -22.19% (p=0.008 n=5+5) Read/lz4-256k_524_kB-4 45.4ms ± 1% 36.7ms ± 5% -19.07% (p=0.008 n=5+5) Read/lz4-1M_524_kB-4 47.3ms ± 1% 38.6ms ± 2% -18.36% (p=0.008 n=5+5) Read/lz4_524_kB-4 47.4ms ± 1% 38.0ms ± 1% -19.70% (p=0.008 n=5+5) Read/snappy_524_kB-4 37.3ms ± 1% 31.1ms ± 3% -16.47% (p=0.008 n=5+5) Read/flate_524_kB-4 122ms ± 1% 111ms ± 3% -9.04% (p=0.008 n=5+5) Read/sample_none_66_kB-4 7.81ms ± 1% 6.62ms ± 6% -15.24% (p=0.008 n=5+5) Read/sample_gzip_66_kB-4 237ms ± 2% 219ms ± 1% -7.57% (p=0.008 n=5+5) Read/sample_lz4-64k_66_kB-4 105ms ± 2% 92ms ± 1% -12.03% (p=0.008 n=5+5) Read/sample_lz4-256k_66_kB-4 109ms ± 1% 98ms ± 2% -9.65% (p=0.008 n=5+5) Read/sample_lz4-1M_66_kB-4 113ms ± 8% 98ms ± 2% -13.66% (p=0.008 n=5+5) Read/sample_lz4_66_kB-4 109ms ± 1% 99ms ± 2% -9.29% (p=0.008 n=5+5) Read/sample_snappy_66_kB-4 86.6ms ± 1% 77.2ms ± 6% -10.92% (p=0.008 n=5+5) Read/sample_flate_66_kB-4 236ms ± 8% 214ms ± 3% -9.11% (p=0.008 n=5+5) Read/sample_none_262_kB-4 7.70ms ± 2% 6.34ms ± 2% -17.67% (p=0.008 n=5+5) Read/sample_lz4-64k_262_kB-4 94.7ms ± 1% 84.4ms ± 2% -10.90% (p=0.008 n=5+5) Read/sample_lz4-256k_262_kB-4 98.4ms ± 1% 87.7ms ± 2% -10.93% (p=0.008 n=5+5) Read/sample_lz4-1M_262_kB-4 101ms ± 2% 90ms ± 2% -11.19% (p=0.008 n=5+5) Read/sample_lz4_262_kB-4 100ms ± 2% 90ms ± 3% -10.78% (p=0.008 n=5+5) Read/sample_snappy_262_kB-4 77.9ms ± 2% 68.7ms ± 2% -11.84% (p=0.008 n=5+5) Read/sample_flate_262_kB-4 211ms ± 2% 190ms ± 2% -9.86% (p=0.008 n=5+5) Read/sample_none_524_kB-4 7.82ms ± 2% 6.43ms ± 2% -17.80% (p=0.008 n=5+5) Read/sample_gzip_524_kB-4 176ms ± 1% 164ms ± 2% -6.80% (p=0.008 n=5+5) Read/sample_lz4-64k_524_kB-4 77.7ms ± 1% 69.8ms ± 4% -10.10% (p=0.008 n=5+5) Read/sample_lz4-256k_524_kB-4 81.7ms ± 2% 72.1ms ± 3% -11.79% (p=0.008 n=5+5) Read/sample_lz4-1M_524_kB-4 83.8ms ± 1% 74.4ms ± 1% -11.21% (p=0.008 n=5+5) Read/sample_lz4_524_kB-4 83.2ms ± 0% 76.1ms ± 8% -8.53% (p=0.016 n=4+5) Read/sample_snappy_524_kB-4 63.6ms ± 2% 56.5ms ± 2% -11.23% (p=0.008 n=5+5) Read/sample_flate_524_kB-4 172ms ± 2% 157ms ± 1% -8.88% (p=0.008 n=5+5) name old speed new speed delta Write/ordered-lz4-256k-4 687MB/s ± 6% 749MB/s ± 2% +9.09% (p=0.008 n=5+5) Write/unordered-lz4-4 313MB/s ±24% 341MB/s ± 1% +9.23% (p=0.016 n=5+4) Read/none_66_kB-4 1.82GB/s ± 6% 2.97GB/s ± 2% +63.42% (p=0.008 n=5+5) Read/gzip_66_kB-4 669MB/s ± 4% 736MB/s ± 4% +10.00% (p=0.008 n=5+5) Read/lz4-64k_66_kB-4 1.42GB/s ± 2% 1.71GB/s ± 3% +20.21% (p=0.008 n=5+5) Read/lz4-256k_66_kB-4 1.43GB/s ± 0% 1.76GB/s ± 1% +22.83% (p=0.016 n=4+5) Read/lz4-1M_66_kB-4 1.43GB/s ± 2% 1.74GB/s ± 1% +21.99% (p=0.008 n=5+5) Read/lz4_66_kB-4 1.41GB/s ± 4% 1.76GB/s ± 1% +24.44% (p=0.008 n=5+5) Read/snappy_66_kB-4 1.29GB/s ± 2% 1.57GB/s ± 1% +21.86% (p=0.016 n=5+4) Read/flate_66_kB-4 710MB/s ± 4% 793MB/s ± 2% +11.70% (p=0.008 n=5+5) Read/zstd_66_kB-4 425MB/s ±14% 517MB/s ± 6% +21.74% (p=0.008 n=5+5) Read/none_262_kB-4 1.93GB/s ± 1% 3.06GB/s ± 2% +58.54% (p=0.008 n=5+5) Read/gzip_262_kB-4 701MB/s ± 5% 779MB/s ± 2% +11.20% (p=0.008 n=5+5) Read/lz4-64k_262_kB-4 1.47GB/s ± 1% 1.82GB/s ± 3% +24.03% (p=0.008 n=5+5) Read/lz4-256k_262_kB-4 1.48GB/s ± 1% 1.82GB/s ± 2% +23.09% (p=0.008 n=5+5) Read/lz4-1M_262_kB-4 1.46GB/s ± 2% 1.84GB/s ± 2% +26.44% (p=0.008 n=5+5) Read/lz4_262_kB-4 1.48GB/s ± 2% 1.85GB/s ± 2% +25.42% (p=0.008 n=5+5) Read/snappy_262_kB-4 1.31GB/s ± 2% 1.61GB/s ± 1% +22.80% (p=0.008 n=5+5) Read/flate_262_kB-4 729MB/s ± 3% 831MB/s ± 2% +14.05% (p=0.008 n=5+5) Read/none_524_kB-4 1.97GB/s ± 2% 3.08GB/s ± 2% +56.33% (p=0.008 n=5+5) Read/gzip_524_kB-4 714MB/s ± 2% 773MB/s ± 2% +8.27% (p=0.008 n=5+5) Read/lz4-64k_524_kB-4 1.47GB/s ± 3% 1.88GB/s ± 1% +28.48% (p=0.008 n=5+5) Read/lz4-256k_524_kB-4 1.50GB/s ± 1% 1.86GB/s ± 5% +23.63% (p=0.008 n=5+5) Read/lz4-1M_524_kB-4 1.49GB/s ± 1% 1.83GB/s ± 2% +22.51% (p=0.008 n=5+5) Read/lz4_524_kB-4 1.49GB/s ± 1% 1.86GB/s ± 1% +24.53% (p=0.008 n=5+5) Read/snappy_524_kB-4 1.34GB/s ± 1% 1.60GB/s ± 3% +19.75% (p=0.008 n=5+5) Read/flate_524_kB-4 750MB/s ± 1% 824MB/s ± 3% +9.97% (p=0.008 n=5+5) Read/sample_none_66_kB-4 967MB/s ± 1% 1142MB/s ± 6% +18.13% (p=0.008 n=5+5) Read/sample_gzip_66_kB-4 501MB/s ± 2% 542MB/s ± 1% +8.19% (p=0.008 n=5+5) Read/sample_lz4-64k_66_kB-4 811MB/s ± 2% 922MB/s ± 1% +13.66% (p=0.008 n=5+5) Read/sample_lz4-256k_66_kB-4 825MB/s ± 1% 913MB/s ± 2% +10.68% (p=0.008 n=5+5) Read/sample_lz4-1M_66_kB-4 795MB/s ± 7% 920MB/s ± 1% +15.67% (p=0.008 n=5+5) Read/sample_lz4_66_kB-4 824MB/s ± 1% 909MB/s ± 2% +10.25% (p=0.008 n=5+5) Read/sample_snappy_66_kB-4 759MB/s ± 1% 852MB/s ± 6% +12.37% (p=0.008 n=5+5) Read/sample_flate_66_kB-4 506MB/s ± 7% 556MB/s ± 4% +9.88% (p=0.008 n=5+5) Read/sample_none_262_kB-4 983MB/s ± 2% 1194MB/s ± 2% +21.47% (p=0.008 n=5+5) Read/sample_lz4-64k_262_kB-4 830MB/s ± 1% 932MB/s ± 2% +12.24% (p=0.008 n=5+5) Read/sample_lz4-256k_262_kB-4 839MB/s ± 1% 942MB/s ± 2% +12.28% (p=0.008 n=5+5) Read/sample_lz4-1M_262_kB-4 829MB/s ± 2% 934MB/s ± 2% +12.60% (p=0.008 n=5+5) Read/sample_lz4_262_kB-4 836MB/s ± 2% 937MB/s ± 3% +12.11% (p=0.008 n=5+5) Read/sample_snappy_262_kB-4 774MB/s ± 2% 878MB/s ± 2% +13.44% (p=0.008 n=5+5) Read/sample_flate_262_kB-4 529MB/s ± 2% 587MB/s ± 2% +10.94% (p=0.008 n=5+5) Read/sample_none_524_kB-4 970MB/s ± 2% 1180MB/s ± 2% +21.64% (p=0.008 n=5+5) Read/sample_gzip_524_kB-4 521MB/s ± 1% 559MB/s ± 2% +7.31% (p=0.008 n=5+5) Read/sample_lz4-64k_524_kB-4 842MB/s ± 1% 937MB/s ± 4% +11.30% (p=0.008 n=5+5) Read/sample_lz4-256k_524_kB-4 834MB/s ± 2% 945MB/s ± 3% +13.37% (p=0.008 n=5+5) Read/sample_lz4-1M_524_kB-4 842MB/s ± 1% 949MB/s ± 1% +12.63% (p=0.008 n=5+5) Read/sample_lz4_524_kB-4 849MB/s ± 0% 929MB/s ± 7% +9.53% (p=0.016 n=4+5) Read/sample_snappy_524_kB-4 783MB/s ± 2% 882MB/s ± 2% +12.65% (p=0.008 n=5+5) Read/sample_flate_524_kB-4 534MB/s ± 2% 585MB/s ± 1% +9.73% (p=0.008 n=5+5) name old alloc/op new alloc/op delta Write/unordered-lz4-4 18.8MB ± 4% 17.2MB ± 7% -8.28% (p=0.008 n=5+5) Read/none_66_kB-4 41.9kB ± 0% 43.7kB ± 0% +4.41% (p=0.008 n=5+5) Read/gzip_66_kB-4 713kB ± 0% 741kB ± 0% +3.85% (p=0.016 n=5+4) Read/lz4-64k_66_kB-4 730kB ± 1% 748kB ± 0% +2.53% (p=0.016 n=5+4) Read/lz4-256k_66_kB-4 770kB ± 0% 806kB ± 3% +4.70% (p=0.016 n=4+5) Read/lz4-1M_66_kB-4 770kB ± 0% 812kB ± 4% +5.43% (p=0.016 n=4+5) Read/lz4_66_kB-4 770kB ± 0% 792kB ± 0% +2.83% (p=0.029 n=4+4) Read/snappy_66_kB-4 361kB ± 0% 376kB ± 0% +4.02% (p=0.008 n=5+5) Read/flate_66_kB-4 715kB ± 0% 743kB ± 0% +3.88% (p=0.016 n=5+4) Read/none_262_kB-4 11.2kB ± 0% 11.7kB ± 0% +4.12% (p=0.016 n=5+4) Read/gzip_262_kB-4 198kB ± 0% 203kB ± 0% +2.93% (p=0.016 n=4+5) Read/lz4-64k_262_kB-4 169kB ± 0% 174kB ± 0% +2.75% (p=0.008 n=5+5) Read/lz4-256k_262_kB-4 177kB ± 0% 182kB ± 0% +2.79% (p=0.008 n=5+5) Read/lz4-1M_262_kB-4 181kB ± 0% 186kB ± 0% +2.77% (p=0.008 n=5+5) Read/lz4_262_kB-4 181kB ± 0% 186kB ± 0% +2.73% (p=0.016 n=5+4) Read/snappy_262_kB-4 87.8kB ± 0% 90.1kB ± 0% +2.58% (p=0.008 n=5+5) Read/flate_262_kB-4 197kB ± 0% 203kB ± 0% +2.63% (p=0.016 n=5+4) Read/zstd_262_kB-4 553MB ± 0% 552MB ± 0% -0.15% (p=0.032 n=5+5) Read/none_524_kB-4 5.94kB ± 0% 6.16kB ± 0% +3.76% (p=0.016 n=5+4) Read/gzip_524_kB-4 96.2kB ± 0% 98.3kB ± 0% +2.16% (p=0.016 n=4+5) Read/lz4-64k_524_kB-4 70.9kB ± 0% 72.8kB ± 0% +2.71% (p=0.016 n=4+5) Read/lz4-256k_524_kB-4 73.8kB ± 0% 75.8kB ± 0% +2.68% (p=0.008 n=5+5) Read/lz4-1M_524_kB-4 76.5kB ± 0% 78.7kB ± 0% +2.77% (p=0.016 n=4+5) Read/lz4_524_kB-4 76.5kB ± 0% 78.6kB ± 0% +2.74% (p=0.016 n=5+4) Read/snappy_524_kB-4 39.1kB ± 0% 39.5kB ± 0% +1.19% (p=0.008 n=5+5) Read/flate_524_kB-4 95.4kB ± 0% 97.3kB ± 0% +2.06% (p=0.008 n=5+5) Read/sample_none_66_kB-4 39.9kB ± 0% 41.7kB ± 0% +4.62% (p=0.008 n=5+5) Read/sample_gzip_66_kB-4 686kB ± 0% 715kB ± 0% +4.23% (p=0.016 n=4+5) Read/sample_lz4-64k_66_kB-4 707kB ± 0% 728kB ± 0% +2.88% (p=0.016 n=5+4) Read/sample_lz4-256k_66_kB-4 749kB ± 0% 783kB ± 4% +4.65% (p=0.008 n=5+5) Read/sample_lz4_66_kB-4 748kB ± 0% 770kB ± 0% +2.91% (p=0.016 n=5+4) Read/sample_snappy_66_kB-4 350kB ± 0% 364kB ± 0% +4.04% (p=0.029 n=4+4) Read/sample_flate_66_kB-4 688kB ± 0% 716kB ± 0% +4.08% (p=0.029 n=4+4) Read/sample_none_262_kB-4 10.6kB ± 0% 11.0kB ± 0% +4.23% (p=0.016 n=5+4) Read/sample_gzip_262_kB-4 194kB ± 0% 199kB ± 0% +2.48% (p=0.016 n=4+5) Read/sample_lz4-64k_262_kB-4 165kB ± 0% 169kB ± 0% +2.86% (p=0.016 n=4+5) Read/sample_lz4-256k_262_kB-4 172kB ± 0% 177kB ± 0% +2.92% (p=0.016 n=4+5) Read/sample_lz4-1M_262_kB-4 176kB ± 0% 181kB ± 0% +2.91% (p=0.029 n=4+4) Read/sample_lz4_262_kB-4 176kB ± 0% 181kB ± 0% +2.81% (p=0.016 n=5+4) Read/sample_snappy_262_kB-4 88.1kB ± 0% 90.8kB ± 0% +3.11% (p=0.029 n=4+4) Read/sample_flate_262_kB-4 194kB ± 0% 198kB ± 0% +2.13% (p=0.029 n=4+4) Read/sample_none_524_kB-4 5.56kB ± 0% 5.76kB ± 0% +3.65% (p=0.016 n=5+4) Read/sample_gzip_524_kB-4 95.4kB ± 0% 96.9kB ± 0% +1.56% (p=0.008 n=5+5) Read/sample_lz4-64k_524_kB-4 69.0kB ± 0% 71.0kB ± 0% +2.87% (p=0.008 n=5+5) Read/sample_lz4-256k_524_kB-4 71.8kB ± 0% 73.8kB ± 0% +2.79% (p=0.016 n=4+5) Read/sample_lz4-1M_524_kB-4 74.5kB ± 0% 76.6kB ± 0% +2.75% (p=0.008 n=5+5) Read/sample_lz4_524_kB-4 74.5kB ± 0% 188.4kB ±89% +152.98% (p=0.008 n=5+5) Read/sample_snappy_524_kB-4 40.7kB ± 1% 41.3kB ± 1% +1.58% (p=0.008 n=5+5) Read/sample_flate_524_kB-4 95.2kB ± 0% 96.2kB ± 0% +0.95% (p=0.008 n=5+5) name old allocs/op new allocs/op delta Read/gzip_66_kB-4 13.2k ± 0% 13.2k ± 0% -0.04% (p=0.000 n=5+4) Read/zstd_66_kB-4 18.0k ± 1% 17.8k ± 1% -1.06% (p=0.032 n=5+5) Read/gzip_262_kB-4 4.61k ± 0% 4.61k ± 0% -0.11% (p=0.029 n=4+4) Read/flate_262_kB-4 4.61k ± 0% 4.60k ± 0% -0.11% (p=0.008 n=5+5) Read/zstd_262_kB-4 7.10k ± 2% 6.88k ± 3% -3.01% (p=0.008 n=5+5) Read/gzip_524_kB-4 2.62k ± 0% 2.62k ± 0% -0.06% (p=0.000 n=4+5) Read/zstd_524_kB-4 6.13k ± 2% 5.94k ± 2% -3.03% (p=0.024 n=5+5) Read/sample_flate_66_kB-4 13.2k ± 0% 13.2k ± 0% -0.01% (p=0.029 n=4+4) Read/sample_flate_262_kB-4 4.62k ± 0% 4.61k ± 0% -0.22% (p=0.029 n=4+4) Read/sample_zstd_262_kB-4 7.12k ± 1% 6.97k ± 1% -2.11% (p=0.008 n=5+5) Read/sample_gzip_524_kB-4 2.63k ± 0% 2.63k ± 0% -0.11% (p=0.008 n=5+5) Read/sample_flate_524_kB-4 2.63k ± 0% 2.62k ± 0% -0.15% (p=0.008 n=5+5) Read/sample_zstd_524_kB-4 6.11k ± 2% 5.89k ± 1% -3.68% (p=0.016 n=5+4) ``` Co-authored-by: Owen Diehl <[email protected]>
1 parent 51d5d1a commit 5778a6b

File tree

6 files changed

+136
-91
lines changed

6 files changed

+136
-91
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* [6179](https://github.com/grafana/loki/pull/6179) **chaudum**: Add new HTTP endpoint to delete ingester ring token file and shutdown process gracefully
2121
* [5997](https://github.com/grafana/loki/pull/5997) **simonswine**: Querier: parallize label queries to both stores.
2222
* [5406](https://github.com/grafana/loki/pull/5406) **ctovena**: Revise the configuration parameters that configure the usage report to grafana.com.
23+
* [7264](https://github.com/grafana/loki/pull/7264) **bboreham**: Chunks: decode varints directly from byte buffer, for speed.
2324
* [7263](https://github.com/grafana/loki/pull/7263) **bboreham**: Dependencies: klauspost/compress package to v1.15.11; improves performance.
2425

2526
##### Fixes

pkg/chunkenc/memchunk.go

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package chunkenc
22

33
import (
4-
"bufio"
54
"bytes"
65
"context"
76
"encoding/binary"
@@ -1114,12 +1113,14 @@ type bufferedIterator struct {
11141113
origBytes []byte
11151114
stats *stats.Context
11161115

1117-
bufReader *bufio.Reader
1118-
reader io.Reader
1119-
pool ReaderPool
1116+
reader io.Reader
1117+
pool ReaderPool
11201118

11211119
err error
11221120

1121+
readBuf [20]byte // Enough bytes to store two varints.
1122+
readBufValid int // How many bytes are left in readBuf from previous read.
1123+
11231124
buf []byte // The buffer for a single entry.
11241125
currLine []byte // the current line, this is the same as the buffer but sliced the the line size.
11251126
currTs int64
@@ -1134,7 +1135,6 @@ func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *buffer
11341135
stats: stats,
11351136
origBytes: b,
11361137
reader: nil, // will be initialized later
1137-
bufReader: nil, // will be initialized later
11381138
pool: pool,
11391139
}
11401140
}
@@ -1146,8 +1146,12 @@ func (si *bufferedIterator) Next() bool {
11461146

11471147
if !si.closed && si.reader == nil {
11481148
// initialize reader now, hopefully reusing one of the previous readers
1149-
si.reader = si.pool.GetReader(bytes.NewBuffer(si.origBytes))
1150-
si.bufReader = BufReaderPool.Get(si.reader)
1149+
var err error
1150+
si.reader, err = si.pool.GetReader(bytes.NewBuffer(si.origBytes))
1151+
if err != nil {
1152+
si.err = err
1153+
return false
1154+
}
11511155
}
11521156

11531157
ts, line, ok := si.moveNext()
@@ -1166,22 +1170,30 @@ func (si *bufferedIterator) Next() bool {
11661170

11671171
// moveNext moves the buffer to the next entry
11681172
func (si *bufferedIterator) moveNext() (int64, []byte, bool) {
1169-
ts, err := binary.ReadVarint(si.bufReader)
1170-
if err != nil {
1171-
if err != io.EOF {
1172-
si.err = err
1173-
}
1174-
return 0, nil, false
1175-
}
1176-
1177-
l, err := binary.ReadUvarint(si.bufReader)
1178-
if err != nil {
1179-
if err != io.EOF {
1180-
si.err = err
1181-
return 0, nil, false
1173+
var ts int64
1174+
var tWidth, lWidth, lineSize, lastAttempt int
1175+
for lWidth == 0 { // Read until both varints have enough bytes.
1176+
n, err := si.reader.Read(si.readBuf[si.readBufValid:])
1177+
si.readBufValid += n
1178+
if err != nil {
1179+
if err != io.EOF {
1180+
si.err = err
1181+
return 0, nil, false
1182+
}
1183+
if si.readBufValid == 0 { // Got EOF and no data in the buffer.
1184+
return 0, nil, false
1185+
}
1186+
if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time.
1187+
si.err = fmt.Errorf("invalid data in chunk")
1188+
return 0, nil, false
1189+
}
11821190
}
1191+
var l uint64
1192+
ts, tWidth = binary.Varint(si.readBuf[:si.readBufValid])
1193+
l, lWidth = binary.Uvarint(si.readBuf[tWidth:si.readBufValid])
1194+
lineSize = int(l)
1195+
lastAttempt = si.readBufValid
11831196
}
1184-
lineSize := int(l)
11851197

11861198
if lineSize >= maxLineLength {
11871199
si.err = fmt.Errorf("line too long %d, maximum %d", lineSize, maxLineLength)
@@ -1199,19 +1211,25 @@ func (si *bufferedIterator) moveNext() (int64, []byte, bool) {
11991211
return 0, nil, false
12001212
}
12011213
}
1214+
si.buf = si.buf[:lineSize]
1215+
// Take however many bytes are left in the read buffer.
1216+
n := copy(si.buf, si.readBuf[tWidth+lWidth:si.readBufValid])
1217+
// Shift down what is still left in the fixed-size read buffer, if any.
1218+
si.readBufValid = copy(si.readBuf[:], si.readBuf[tWidth+lWidth+n:si.readBufValid])
1219+
12021220
// Then process reading the line.
1203-
n, err := si.bufReader.Read(si.buf[:lineSize])
1204-
if err != nil && err != io.EOF {
1205-
si.err = err
1206-
return 0, nil, false
1207-
}
12081221
for n < lineSize {
1209-
r, err := si.bufReader.Read(si.buf[n:lineSize])
1210-
if err != nil && err != io.EOF {
1222+
r, err := si.reader.Read(si.buf[n:lineSize])
1223+
n += r
1224+
if err != nil {
1225+
// We might get EOF after reading enough bytes to fill the buffer, which is OK.
1226+
// EOF and zero bytes read when the buffer isn't full is an error.
1227+
if err == io.EOF && r != 0 {
1228+
continue
1229+
}
12111230
si.err = err
12121231
return 0, nil, false
12131232
}
1214-
n += r
12151233
}
12161234
return ts, si.buf[:lineSize], true
12171235
}
@@ -1231,10 +1249,6 @@ func (si *bufferedIterator) close() {
12311249
si.pool.PutReader(si.reader)
12321250
si.reader = nil
12331251
}
1234-
if si.bufReader != nil {
1235-
BufReaderPool.Put(si.bufReader)
1236-
si.bufReader = nil
1237-
}
12381252

12391253
if si.buf != nil {
12401254
BytesBufferPool.Put(si.buf)

pkg/chunkenc/memchunk_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,38 @@ func TestBlock(t *testing.T) {
178178
}
179179
}
180180

181+
func TestCorruptChunk(t *testing.T) {
182+
for _, enc := range testEncoding {
183+
t.Run(enc.String(), func(t *testing.T) {
184+
t.Parallel()
185+
186+
chk := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
187+
cases := []struct {
188+
data []byte
189+
}{
190+
// Data that should not decode as lines from a chunk in any encoding.
191+
{data: []byte{0}},
192+
{data: []byte{1}},
193+
{data: []byte("asdfasdfasdfqwyteqwtyeq")},
194+
}
195+
196+
ctx, start, end := context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64)
197+
for i, c := range cases {
198+
chk.blocks = []block{{b: c.data}}
199+
it, err := chk.Iterator(ctx, start, end, logproto.FORWARD, noopStreamPipeline)
200+
require.NoError(t, err, "case %d", i)
201+
202+
idx := 0
203+
for it.Next() {
204+
idx++
205+
}
206+
require.Error(t, it.Error(), "case %d", i)
207+
require.NoError(t, it.Close())
208+
}
209+
})
210+
}
211+
}
212+
181213
func TestReadFormatV1(t *testing.T) {
182214
t.Parallel()
183215

pkg/chunkenc/pool.go

Lines changed: 50 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type WriterPool interface {
2626

2727
// ReaderPool similar to WriterPool but for reading chunks.
2828
type ReaderPool interface {
29-
GetReader(io.Reader) io.Reader
29+
GetReader(io.Reader) (io.Reader, error)
3030
PutReader(io.Reader)
3131
}
3232

@@ -44,13 +44,6 @@ var (
4444
// Noop is the no compression pool
4545
Noop NoopPool
4646

47-
// BufReaderPool is bufio.Reader pool
48-
BufReaderPool = &BufioReaderPool{
49-
pool: sync.Pool{
50-
New: func() interface{} { return bufio.NewReader(nil) },
51-
},
52-
}
53-
5447
// BytesBufferPool is a bytes buffer used for lines decompressed.
5548
// Buckets [0.5KB,1KB,2KB,4KB,8KB]
5649
BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })
@@ -117,21 +110,32 @@ type GzipPool struct {
117110
level int
118111
}
119112

113+
// Gzip needs buffering to read efficiently.
114+
// We need to be able to see the underlying gzip.Reader to Reset it.
115+
type gzipBufferedReader struct {
116+
*bufio.Reader
117+
gzipReader *gzip.Reader
118+
}
119+
120120
// GetReader gets or creates a new CompressionReader and reset it to read from src
121-
func (pool *GzipPool) GetReader(src io.Reader) io.Reader {
121+
func (pool *GzipPool) GetReader(src io.Reader) (io.Reader, error) {
122122
if r := pool.readers.Get(); r != nil {
123-
reader := r.(*gzip.Reader)
124-
err := reader.Reset(src)
123+
reader := r.(*gzipBufferedReader)
124+
err := reader.gzipReader.Reset(src)
125125
if err != nil {
126-
panic(err)
126+
return nil, err
127127
}
128-
return reader
128+
reader.Reader.Reset(reader.gzipReader)
129+
return reader, nil
129130
}
130-
reader, err := gzip.NewReader(src)
131+
gzipReader, err := gzip.NewReader(src)
131132
if err != nil {
132-
panic(err)
133+
return nil, err
133134
}
134-
return reader
135+
return &gzipBufferedReader{
136+
gzipReader: gzipReader,
137+
Reader: bufio.NewReaderSize(gzipReader, 4*1024),
138+
}, nil
135139
}
136140

137141
// PutReader places back in the pool a CompressionReader
@@ -171,16 +175,16 @@ type FlatePool struct {
171175
}
172176

173177
// GetReader gets or creates a new CompressionReader and reset it to read from src
174-
func (pool *FlatePool) GetReader(src io.Reader) io.Reader {
178+
func (pool *FlatePool) GetReader(src io.Reader) (io.Reader, error) {
175179
if r := pool.readers.Get(); r != nil {
176180
reader := r.(flate.Resetter)
177181
err := reader.Reset(src, nil)
178182
if err != nil {
179183
panic(err)
180184
}
181-
return reader.(io.Reader)
185+
return reader.(io.Reader), nil
182186
}
183-
return flate.NewReader(src)
187+
return flate.NewReader(src), nil
184188
}
185189

186190
// PutReader places back in the pool a CompressionReader
@@ -219,21 +223,21 @@ type ZstdPool struct {
219223
}
220224

221225
// GetReader gets or creates a new CompressionReader and reset it to read from src
222-
func (pool *ZstdPool) GetReader(src io.Reader) io.Reader {
226+
func (pool *ZstdPool) GetReader(src io.Reader) (io.Reader, error) {
223227
if r := pool.readers.Get(); r != nil {
224228
reader := r.(*zstd.Decoder)
225229
err := reader.Reset(src)
226230
if err != nil {
227-
panic(err)
231+
return nil, err
228232
}
229-
return reader
233+
return reader, nil
230234
}
231235
reader, err := zstd.NewReader(src)
232236
if err != nil {
233-
panic(err)
237+
return nil, err
234238
}
235239
runtime.SetFinalizer(reader, (*zstd.Decoder).Close)
236-
return reader
240+
return reader, nil
237241
}
238242

239243
// PutReader places back in the pool a CompressionReader
@@ -267,16 +271,27 @@ type LZ4Pool struct {
267271
bufferSize uint32 // available values: 1<<16 (64k), 1<<18 (256k), 1<<20 (1M), 1<<22 (4M). Defaults to 4MB, if not set.
268272
}
269273

274+
// We need to be able to see the underlying lz4.Reader to Reset it.
275+
type lz4BufferedReader struct {
276+
*bufio.Reader
277+
lz4Reader *lz4.Reader
278+
}
279+
270280
// GetReader gets or creates a new CompressionReader and reset it to read from src
271-
func (pool *LZ4Pool) GetReader(src io.Reader) io.Reader {
272-
var r *lz4.Reader
281+
func (pool *LZ4Pool) GetReader(src io.Reader) (io.Reader, error) {
282+
var r *lz4BufferedReader
273283
if pooled := pool.readers.Get(); pooled != nil {
274-
r = pooled.(*lz4.Reader)
275-
r.Reset(src)
284+
r = pooled.(*lz4BufferedReader)
285+
r.lz4Reader.Reset(src)
286+
r.Reader.Reset(r.lz4Reader)
276287
} else {
277-
r = lz4.NewReader(src)
288+
lz4Reader := lz4.NewReader(src)
289+
r = &lz4BufferedReader{
290+
lz4Reader: lz4Reader,
291+
Reader: bufio.NewReaderSize(lz4Reader, 4*1024),
292+
}
278293
}
279-
return r
294+
return r, nil
280295
}
281296

282297
// PutReader places back in the pool a CompressionReader
@@ -315,13 +330,13 @@ type SnappyPool struct {
315330
}
316331

317332
// GetReader gets or creates a new CompressionReader and reset it to read from src
318-
func (pool *SnappyPool) GetReader(src io.Reader) io.Reader {
333+
func (pool *SnappyPool) GetReader(src io.Reader) (io.Reader, error) {
319334
if r := pool.readers.Get(); r != nil {
320335
reader := r.(*snappy.Reader)
321336
reader.Reset(src)
322-
return reader
337+
return reader, nil
323338
}
324-
return snappy.NewReader(src)
339+
return snappy.NewReader(src), nil
325340
}
326341

327342
// PutReader places back in the pool a CompressionReader
@@ -347,8 +362,8 @@ func (pool *SnappyPool) PutWriter(writer io.WriteCloser) {
347362
type NoopPool struct{}
348363

349364
// GetReader gets or creates a new CompressionReader and reset it to read from src
350-
func (pool *NoopPool) GetReader(src io.Reader) io.Reader {
351-
return src
365+
func (pool *NoopPool) GetReader(src io.Reader) (io.Reader, error) {
366+
return src, nil
352367
}
353368

354369
// PutReader places back in the pool a CompressionReader
@@ -367,23 +382,3 @@ func (pool *NoopPool) GetWriter(dst io.Writer) io.WriteCloser {
367382

368383
// PutWriter places back in the pool a CompressionWriter
369384
func (pool *NoopPool) PutWriter(writer io.WriteCloser) {}
370-
371-
// BufioReaderPool is a bufio reader that uses sync.Pool.
372-
type BufioReaderPool struct {
373-
pool sync.Pool
374-
}
375-
376-
// Get returns a bufio.Reader which reads from r. The buffer size is that of the pool.
377-
func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader {
378-
buf := bufPool.pool.Get().(*bufio.Reader)
379-
if buf == nil {
380-
return bufio.NewReaderSize(r, 4*1024)
381-
}
382-
buf.Reset(r)
383-
return buf
384-
}
385-
386-
// Put puts the bufio.Reader back into the pool.
387-
func (bufPool *BufioReaderPool) Put(b *bufio.Reader) {
388-
bufPool.pool.Put(b)
389-
}

pkg/chunkenc/pool_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ func TestPool(t *testing.T) {
3636
require.NoError(t, w.Close())
3737

3838
require.True(t, buf.Len() != 0, enc)
39-
r := rpool.GetReader(bytes.NewBuffer(buf.Bytes()))
39+
r, err := rpool.GetReader(bytes.NewBuffer(buf.Bytes()))
40+
require.NoError(t, err)
4041
defer rpool.PutReader(r)
4142
n, err := r.Read(res)
4243
if err != nil {

0 commit comments

Comments
 (0)