Skip to content

Commit 1314d89

Browse files
feat(stream): add basic stream route
Co-authored-by: divyam234 <[email protected]>
1 parent a8aab9a commit 1314d89

File tree

10 files changed

+433
-1
lines changed

10 files changed

+433
-1
lines changed

bot/client.go

+3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"github.com/celestix/gotgproto/sessionMaker"
1111
)
1212

13+
var Bot *gotgproto.Client
14+
1315
func StartClient(log *zap.Logger) (*gotgproto.Client, error) {
1416
client, err := gotgproto.NewClient(
1517
int(config.ValueOf.ApiID),
@@ -27,5 +29,6 @@ func StartClient(log *zap.Logger) (*gotgproto.Client, error) {
2729
}
2830
commands.Load(log, client.Dispatcher)
2931
log.Info("Client started", zap.String("username", client.Self.Username))
32+
Bot = client
3033
return client, nil
3134
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ require (
2323
github.com/klauspost/compress v1.17.3 // indirect
2424
github.com/mattn/go-sqlite3 v1.14.18 // indirect
2525
github.com/pkg/errors v0.9.1 // indirect
26+
github.com/quantumsheep/range-parser v1.1.0 // indirect
2627
github.com/segmentio/asm v1.2.0 // indirect
2728
go.opentelemetry.io/otel v1.21.0 // indirect
2829
go.opentelemetry.io/otel/trace v1.21.0 // indirect

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
8383
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
8484
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
8585
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
86+
github.com/quantumsheep/range-parser v1.1.0 h1:k4f1F58f8FF54FBYc9dYBRM+8JkAxFo11gC3IeMH4rU=
87+
github.com/quantumsheep/range-parser v1.1.0/go.mod h1:acv4Vt2PvpGvRsvGju7Gk2ahKluZJsIUNR69W53J22I=
8688
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
8789
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
8890
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"go.uber.org/zap"
1515
)
1616

17-
const versionString = "v0.0.1"
17+
const versionString = "v0.0.0"
1818

1919
var startTime time.Time = time.Now()
2020

routes/stream.go

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package routes
2+
3+
import (
4+
"EverythingSuckz/fsb/bot"
5+
"EverythingSuckz/fsb/utils"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"strconv"
10+
11+
range_parser "github.com/quantumsheep/range-parser"
12+
13+
"github.com/gin-gonic/gin"
14+
)
15+
16+
func (e *allRoutes) LoadHome(r *Route) {
17+
defer e.log.Info("Loaded stream route")
18+
r.Engine.GET("/stream/:messageID", getStreamRoute)
19+
}
20+
21+
func getStreamRoute(ctx *gin.Context) {
22+
w := ctx.Writer
23+
r := ctx.Request
24+
25+
messageIDParm := ctx.Param("messageID")
26+
messageID, err := strconv.Atoi(messageIDParm)
27+
if err != nil {
28+
http.Error(w, err.Error(), http.StatusBadRequest)
29+
return
30+
}
31+
32+
ctx.Header("Accept-Ranges", "bytes")
33+
var start, end int64
34+
rangeHeader := r.Header.Get("Range")
35+
36+
messageMedia, err := utils.GetMessageMedia(ctx, bot.Bot.Client, messageID)
37+
if err != nil {
38+
http.Error(w, err.Error(), http.StatusBadRequest)
39+
return
40+
}
41+
file, err := utils.FileFromMedia(messageMedia)
42+
if err != nil {
43+
http.Error(w, err.Error(), http.StatusBadRequest)
44+
return
45+
}
46+
47+
if rangeHeader == "" {
48+
start = 0
49+
end = file.FileSize - 1
50+
w.WriteHeader(http.StatusOK)
51+
} else {
52+
ranges, err := range_parser.Parse(file.FileSize, r.Header.Get("Range"))
53+
if err != nil {
54+
http.Error(w, err.Error(), http.StatusBadRequest)
55+
return
56+
}
57+
start = ranges[0].Start
58+
end = ranges[0].End
59+
ctx.Header("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, file.FileSize))
60+
w.WriteHeader(http.StatusPartialContent)
61+
}
62+
63+
contentLength := end - start + 1
64+
mimeType := file.MimeType
65+
66+
if mimeType == "" {
67+
mimeType = "application/octet-stream"
68+
}
69+
70+
ctx.Header("Content-Type", mimeType)
71+
ctx.Header("Content-Length", strconv.FormatInt(contentLength, 10))
72+
73+
disposition := "inline"
74+
75+
if ctx.Query("d") == "true" {
76+
disposition = "attachment"
77+
}
78+
79+
ctx.Header("Content-Disposition", fmt.Sprintf("%s; filename=\"%s\"", disposition, file.FileName))
80+
81+
if r.Method != "HEAD" {
82+
parts, err := utils.GetParts(ctx, bot.Bot.Client, file)
83+
if err != nil {
84+
http.Error(w, err.Error(), http.StatusInternalServerError)
85+
return
86+
}
87+
parts = utils.RangedParts(parts, start, end)
88+
lr, _ := utils.NewLinearReader(ctx, bot.Bot.Client, parts, contentLength)
89+
io.CopyN(w, lr, contentLength)
90+
}
91+
}

types/file.go

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package types
2+
3+
import "github.com/gotd/td/tg"
4+
5+
type File struct {
6+
Location *tg.InputDocumentFileLocation
7+
FileSize int64
8+
FileName string
9+
MimeType string
10+
ID int64
11+
}

types/part.go

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package types
2+
3+
import "github.com/gotd/td/tg"
4+
5+
type Part struct {
6+
Location *tg.InputDocumentFileLocation
7+
Start int64
8+
End int64
9+
}

utils/bot_helpers.go

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package utils
2+
3+
import (
4+
"EverythingSuckz/fsb/config"
5+
"EverythingSuckz/fsb/types"
6+
"context"
7+
"errors"
8+
"fmt"
9+
10+
"github.com/gotd/td/telegram"
11+
"github.com/gotd/td/tg"
12+
)
13+
14+
func GetTGMessage(ctx context.Context, client *telegram.Client, messageID int) (*tg.Message, error) {
15+
inputMessageID := tg.InputMessageClass(&tg.InputMessageID{ID: messageID})
16+
channel, err := GetChannelById(ctx, client)
17+
if err != nil {
18+
return nil, err
19+
}
20+
messageRequest := tg.ChannelsGetMessagesRequest{Channel: channel, ID: []tg.InputMessageClass{inputMessageID}}
21+
res, err := client.API().ChannelsGetMessages(ctx, &messageRequest)
22+
if err != nil {
23+
return nil, err
24+
}
25+
messages := res.(*tg.MessagesChannelMessages)
26+
message := messages.Messages[0]
27+
if _, ok := message.(*tg.Message); ok {
28+
return message.(*tg.Message), nil
29+
} else {
30+
return nil, fmt.Errorf("this file was deleted")
31+
}
32+
}
33+
34+
func GetMessageMedia(ctx context.Context, client *telegram.Client, messageID int) (tg.MessageMediaClass, error) {
35+
message, err := GetTGMessage(ctx, client, messageID)
36+
if err != nil {
37+
return nil, err
38+
}
39+
return message.Media, nil
40+
}
41+
42+
func FileFromMedia(media tg.MessageMediaClass) (*types.File, error) {
43+
switch media := media.(type) {
44+
case *tg.MessageMediaDocument:
45+
document, ok := media.Document.AsNotEmpty()
46+
if !ok {
47+
return nil, fmt.Errorf("unexpected type %T", media)
48+
}
49+
var fileName string
50+
for _, attribute := range document.Attributes {
51+
if name, ok := attribute.(*tg.DocumentAttributeFilename); ok {
52+
fileName = name.FileName
53+
break
54+
}
55+
}
56+
return &types.File{
57+
Location: document.AsInputDocumentFileLocation(),
58+
FileSize: document.Size,
59+
FileName: fileName,
60+
MimeType: document.MimeType,
61+
ID: document.ID,
62+
}, nil
63+
// TODO: add photo support
64+
}
65+
return nil, fmt.Errorf("unexpected type %T", media)
66+
}
67+
68+
func GetChannelById(ctx context.Context, client *telegram.Client) (*tg.InputChannel, error) {
69+
channel := &tg.InputChannel{}
70+
inputChannel := &tg.InputChannel{
71+
ChannelID: config.ValueOf.LogChannelID,
72+
}
73+
channels, err := client.API().ChannelsGetChannels(ctx, []tg.InputChannelClass{inputChannel})
74+
if err != nil {
75+
return nil, err
76+
}
77+
if len(channels.GetChats()) == 0 {
78+
return nil, errors.New("no channels found")
79+
}
80+
channel = channels.GetChats()[0].(*tg.Channel).AsInput()
81+
return channel, nil
82+
}

utils/linear_reader.go

+140
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package utils
2+
3+
import (
4+
"EverythingSuckz/fsb/types"
5+
"context"
6+
"fmt"
7+
"io"
8+
"log"
9+
10+
"github.com/gotd/td/telegram"
11+
"github.com/gotd/td/tg"
12+
)
13+
14+
type linearReader struct {
15+
ctx context.Context
16+
parts []types.Part
17+
pos int
18+
client *telegram.Client
19+
next func() ([]byte, error)
20+
buffer []byte
21+
bytesread int64
22+
chunkSize int64
23+
i int64
24+
contentLength int64
25+
}
26+
27+
func (*linearReader) Close() error {
28+
return nil
29+
}
30+
31+
func NewLinearReader(ctx context.Context, client *telegram.Client, parts []types.Part, contentLength int64) (io.ReadCloser, error) {
32+
33+
r := &linearReader{
34+
ctx: ctx,
35+
parts: parts,
36+
client: client,
37+
chunkSize: int64(1024 * 1024),
38+
contentLength: contentLength,
39+
}
40+
41+
r.next = r.partStream()
42+
43+
return r, nil
44+
}
45+
46+
func (r *linearReader) Read(p []byte) (n int, err error) {
47+
48+
if r.bytesread == r.contentLength {
49+
log.Println("Linear Reader: EOF (bytesread == contentLength)")
50+
return 0, io.EOF
51+
}
52+
53+
if r.i >= int64(len(r.buffer)) {
54+
r.buffer, err = r.next()
55+
if err != nil {
56+
return 0, err
57+
}
58+
if len(r.buffer) == 0 {
59+
r.pos++
60+
if r.pos == len(r.parts) {
61+
log.Println("Linear Reader: EOF (pos==n(parts))")
62+
return 0, io.EOF
63+
} else {
64+
r.next = r.partStream()
65+
r.buffer, err = r.next()
66+
if err != nil {
67+
return 0, err
68+
}
69+
}
70+
71+
}
72+
r.i = 0
73+
}
74+
75+
n = copy(p, r.buffer[r.i:])
76+
77+
r.i += int64(n)
78+
79+
r.bytesread += int64(n)
80+
81+
return n, nil
82+
}
83+
84+
func (r *linearReader) chunk(offset int64, limit int64) ([]byte, error) {
85+
86+
req := &tg.UploadGetFileRequest{
87+
Offset: offset,
88+
Limit: int(limit),
89+
Location: r.parts[r.pos].Location,
90+
}
91+
92+
res, err := r.client.API().UploadGetFile(r.ctx, req)
93+
94+
if err != nil {
95+
return nil, err
96+
}
97+
98+
switch result := res.(type) {
99+
case *tg.UploadFile:
100+
return result.Bytes, nil
101+
default:
102+
return nil, fmt.Errorf("unexpected type %T", r)
103+
}
104+
}
105+
106+
func (r *linearReader) partStream() func() ([]byte, error) {
107+
108+
start := r.parts[r.pos].Start
109+
end := r.parts[r.pos].End
110+
offset := start - (start % r.chunkSize)
111+
112+
firstPartCut := start - offset
113+
lastPartCut := (end % r.chunkSize) + 1
114+
partCount := int((end - offset + r.chunkSize) / r.chunkSize)
115+
currentPart := 1
116+
117+
readData := func() ([]byte, error) {
118+
if currentPart > partCount {
119+
return make([]byte, 0), nil
120+
}
121+
res, err := r.chunk(offset, r.chunkSize)
122+
if err != nil {
123+
return nil, err
124+
}
125+
if len(res) == 0 {
126+
return res, nil
127+
} else if partCount == 1 {
128+
res = res[firstPartCut:lastPartCut]
129+
} else if currentPart == 1 {
130+
res = res[firstPartCut:]
131+
} else if currentPart == partCount {
132+
res = res[:lastPartCut]
133+
}
134+
135+
currentPart++
136+
offset += r.chunkSize
137+
return res, nil
138+
}
139+
return readData
140+
}

0 commit comments

Comments
 (0)