Skip to content

Commit bb7c050

Browse files
committed
minor enhancements
1 parent 93d259d commit bb7c050

File tree

15 files changed

+229
-52
lines changed

15 files changed

+229
-52
lines changed

Makefile

+11
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
11
test:
22
go test ./... -v --tags=integration
33

4+
build:
5+
docker compose up -d
6+
7+
clean:
8+
docker compose down --rmi all --volumes
9+
10+
code-gen:
11+
go generate ./...
12+
13+
schema-gen:
14+
go run script/avsc2json/main.go schema/avro/expense.avsc > docker/schema/expense.json

cmd/main.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func main() {
4646
cc := events.NewKafkaClient(cfg)
4747
api := app.Group("/api")
4848
router.ExpenseRouter(api, ctx, cc, cfg)
49+
router.PaymentRouter(api, ctx, cc, cfg)
4950
// Listen from a different goroutine
5051
go func() {
5152
if err := app.Listen(":8083"); err != nil {
@@ -55,7 +56,7 @@ func main() {
5556
c := make(chan os.Signal, 1) // Create channel to signify a signal being sent
5657
signal.Notify(c, os.Interrupt, syscall.SIGTERM) // When an interrupt or termination signal is sent, notify the channel
5758

58-
_ = <-c // This blocks the main thread until an interrupt is received
59+
<-c // This blocks the main thread until an interrupt is received
5960
log.Println("Gracefully shutting down...")
6061
if err := app.Shutdown(); err != nil {
6162
log.Printf("Error during shutdown: %v", err)

docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ services:
1111
- init-topic
1212
environment:
1313
BROKERS: kafka:9092
14-
TOPICS: expense-topic
14+
TOPICS: expense-topic,payment-topic,transaction-topic
1515
SCHEMAREGISTRY: kafka:8081
1616

1717
kafka:

docker/schema/expense.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
{
2-
"schema": "{ \"type\": \"record\", \"namespace\": \"com.expense\", \"name\": \"Expense\", \"fields\": [ { \"name\": \"expense_id\", \"type\": \"string\" }, { \"name\": \"user_id\", \"type\": \"string\" }, { \"name\": \"category\", \"type\": \"string\" }, { \"name\": \"amount\", \"type\": \"double\" }, { \"name\": \"currency\", \"type\": \"string\" }, { \"name\": \"timestamp\", \"type\": \"long\", \"logicalType\": \"timestamp-millis\" }, { \"name\": \"description\", \"type\": [\"null\", \"string\"], \"default\": null }, { \"name\": \"receipt\", \"type\": [\"null\", \"string\"], \"default\": null } ]}"
3-
}
2+
"schema": "{\"name\":\"com.expense.Expense\",\"type\":\"record\",\"fields\":[{\"name\":\"expense_id\",\"type\":\"string\"},{\"name\":\"user_id\",\"type\":\"string\"},{\"name\":\"category\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"currency\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"description\",\"type\":[\"null\",\"string\"]},{\"name\":\"receipt\",\"type\":[\"null\",\"string\"]}]}"
3+
}

go.mod

+6-6
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ require (
1212
)
1313

1414
require (
15-
github.com/andybalholm/brotli v1.0.5 // indirect
16-
github.com/google/uuid v1.5.0 // indirect
15+
github.com/andybalholm/brotli v1.1.0 // indirect
16+
github.com/google/uuid v1.6.0 // indirect
1717
github.com/json-iterator/go v1.1.12 // indirect
1818
github.com/klauspost/compress v1.17.7 // indirect
1919
github.com/mattn/go-colorable v0.1.13 // indirect
@@ -22,12 +22,12 @@ require (
2222
github.com/mitchellh/mapstructure v1.5.0 // indirect
2323
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
2424
github.com/modern-go/reflect2 v1.0.2 // indirect
25-
github.com/pierrec/lz4/v4 v4.1.19 // indirect
26-
github.com/rivo/uniseg v0.2.0 // indirect
25+
github.com/pierrec/lz4/v4 v4.1.21 // indirect
26+
github.com/rivo/uniseg v0.4.7 // indirect
2727
github.com/stretchr/testify v1.7.5 // indirect
2828
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
2929
github.com/valyala/bytebufferpool v1.0.0 // indirect
30-
github.com/valyala/fasthttp v1.51.0 // indirect
30+
github.com/valyala/fasthttp v1.52.0 // indirect
3131
github.com/valyala/tcplisten v1.0.0 // indirect
32-
golang.org/x/sys v0.15.0 // indirect
32+
golang.org/x/sys v0.18.0 // indirect
3333
)

go.sum

+14-13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
2-
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
1+
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
2+
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
33
github.com/caarlos0/env/v10 v10.0.0 h1:yIHUBZGsyqCnpTkbjk8asUlx6RFhhEs+h7TOBdgdzXA=
44
github.com/caarlos0/env/v10 v10.0.0/go.mod h1:ZfulV76NvVPw3tm591U4SwL3Xx9ldzBP9aGxzeN7G18=
55
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -10,8 +10,8 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG
1010
github.com/gofiber/fiber/v2 v2.52.4 h1:P+T+4iK7VaqUsq2PALYEfBBo6bJZ4q3FP8cZ84EggTM=
1111
github.com/gofiber/fiber/v2 v2.52.4/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ=
1212
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
13-
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
14-
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
13+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
14+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
1515
github.com/hamba/avro/v2 v2.20.1 h1:3WByQiVn7wT7d27WQq6pvBRC00FVOrniP6u67FLA/2E=
1616
github.com/hamba/avro/v2 v2.20.1/go.mod h1:xHiKXbISpb3Ovc809XdzWow+XGTn+Oyf/F9aZbTLAig=
1717
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -32,12 +32,13 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
3232
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
3333
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
3434
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
35-
github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4=
36-
github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
35+
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
36+
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
3737
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3838
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
39-
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
4039
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
40+
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
41+
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
4142
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
4243
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
4344
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@@ -52,16 +53,16 @@ github.com/twmb/franz-go/pkg/sr v0.0.0-20240307025822-351e7fae879c h1:Qu83jF+b04
5253
github.com/twmb/franz-go/pkg/sr v0.0.0-20240307025822-351e7fae879c/go.mod h1:egX+kicq83hpztv3PRCXKLNO132Ol9JTAJOCRZcqUxI=
5354
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
5455
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
55-
github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA=
56-
github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g=
56+
github.com/valyala/fasthttp v1.52.0 h1:wqBQpxH71XW0e2g+Og4dzQM8pk34aFYlA1Ga8db7gU0=
57+
github.com/valyala/fasthttp v1.52.0/go.mod h1:hf5C4QnVMkNXMspnsUlfM3WitlgYflyhHYoKol/szxQ=
5758
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
5859
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
59-
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
60-
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
60+
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
61+
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
6162
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
6263
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
63-
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
64-
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
64+
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
65+
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
6566
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
6667
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
6768
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

pkg/config/kafka.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111

1212
// Config represents the application configuration.
1313
type Config struct {
14-
Brokers string `env:"BROKERS" envDefault:"localhost:9092"`
15-
Topics string `env:"TOPICS" envDefault:"expense-topic"`
16-
SchemaRegistry string `env:"SCHEMAREGISTRY" envDefault:"localhost:8081"`
14+
Brokers string `env:"BROKERS" envDefault:"localhost:9092"`
15+
Topics []string `env:"TOPICS" envDefault:"expense-topic,payment-topic,transaction-topic"`
16+
SchemaRegistry string `env:"SCHEMAREGISTRY" envDefault:"localhost:8081"`
1717
}
1818

1919
// KafkaTLS represents the configuration for Kafka TLS settings.
@@ -28,7 +28,7 @@ type KafkaTLS struct {
2828
// NewConfig creates a new Config instance by parsing environment variables.
2929
// It returns a pointer to the Config and an error if there was a problem parsing the environment variables.
3030
func NewConfig() (*Config, error) {
31-
var cfg Config
31+
cfg := Config{}
3232
if err := env.Parse(&cfg); err != nil {
3333
return nil, fmt.Errorf("error processing environment variables: %w", err)
3434
}

pkg/events/Produce.go

+15-12
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"context"
77
"fmt"
88

9-
"github.com/dipjyotimetia/event-stream/gen"
109
"github.com/dipjyotimetia/event-stream/pkg/config"
1110
"github.com/hamba/avro/v2"
1211
"github.com/twmb/franz-go/pkg/kgo"
@@ -29,7 +28,6 @@ func NewKafkaClient(cfg *config.Config) *KafkaClient {
2928
seeds := []string{cfg.Brokers}
3029
client, err := kgo.NewClient(
3130
kgo.SeedBrokers(seeds...),
32-
kgo.DefaultProduceTopic("expense-topic"),
3331
)
3432
if err != nil {
3533
fmt.Printf("error initializing Kafka producer client: %v\n", err)
@@ -53,30 +51,34 @@ func (c KafkaClient) Producer(ctx context.Context, record *kgo.Record) error {
5351
}
5452

5553
// getSchema retrieves the Avro schema for the specified subject from the schema registry.
56-
func getSchema(cfg config.Config, subject string) sr.SubjectSchema {
54+
// getSchema retrieves the Avro schema for the specified subject from the schema registry.
55+
func getSchema(cfg config.Config, subject string) (sr.SubjectSchema, error) {
5756
rcl, err := sr.NewClient(sr.URLs(cfg.SchemaRegistry))
5857
if err != nil {
59-
_ = fmt.Errorf("unable to create schema registry client")
58+
return sr.SubjectSchema{}, fmt.Errorf("unable to create schema registry client: %w", err)
6059
}
6160
schemaSubject, err := rcl.SchemaByVersion(context.Background(), subject, -1)
6261
if err != nil {
63-
_ = fmt.Errorf("unable to get schema registry client")
62+
return sr.SubjectSchema{}, fmt.Errorf("unable to get schema registry client: %w", err)
6463
}
65-
return schemaSubject
64+
return schemaSubject, nil
6665
}
6766

68-
// SetExpenseRecord encodes the provided data using Avro and creates a Kafka record with the encoded value.
69-
func (c KafkaClient) SetExpenseRecord(cfg *config.Config, ts interface{}) *kgo.Record {
70-
schemaSubject := getSchema(*cfg, "expense-topic-value")
67+
// SetRecord encodes the provided data using Avro and creates a Kafka record with the encoded value.
68+
func (c KafkaClient) SetRecord(cfg *config.Config, ts interface{}, topic string, schemaType interface{}) (*kgo.Record, error) {
69+
schemaSubject, err := getSchema(*cfg, topic+"-value")
70+
if err != nil {
71+
return nil, err
72+
}
7173
avroSchema, err := avro.Parse(schemaSubject.Schema.Schema)
7274
if err != nil {
73-
_ = fmt.Errorf("unable to parse avro schema")
75+
return nil, fmt.Errorf("unable to parse avro schema: %w", err)
7476
}
7577

7678
var serde sr.Serde
7779
serde.Register(
7880
schemaSubject.ID,
79-
gen.Expense{},
81+
schemaType,
8082
sr.EncodeFn(func(v interface{}) ([]byte, error) {
8183
return avro.Marshal(avroSchema, v)
8284
}),
@@ -87,8 +89,9 @@ func (c KafkaClient) SetExpenseRecord(cfg *config.Config, ts interface{}) *kgo.R
8789
tt := serde.MustEncode(ts)
8890
record := kgo.Record{
8991
Value: tt,
92+
Topic: topic,
9093
}
91-
return &record
94+
return &record, nil
9295
}
9396

9497
// Ptr returns a pointer to the provided value.

pkg/handler/expenseHandler.go

+6-10
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,7 @@ import (
1515

1616
// ExpenseHandler returns an HTTP handler function for creating expense records.
1717
// It takes a KafkaClient instance and a Config instance as input.
18-
// The handler function parses the JSON request body into an Expense struct,
19-
// sets the Timestamp field to the current time if it's not already set,
20-
// creates a Kafka record with the expense data, and sends it to the Kafka topic.
21-
// Finally, it returns a success response.
2218
func ExpenseHandler(ctx context.Context, client *events.KafkaClient, cfg *config.Config) fiber.Handler {
23-
// Parse the JSON request body into an Expense struct
2419
return func(c *fiber.Ctx) error {
2520
var expense gen.Expense
2621

@@ -34,15 +29,16 @@ func ExpenseHandler(ctx context.Context, client *events.KafkaClient, cfg *config
3429
expense.Timestamp = time.Now().UnixNano() / int64(time.Millisecond)
3530
}
3631

37-
// Create a Kafka record with the expense data
38-
record := client.SetExpenseRecord(cfg, expense)
32+
record, err := client.SetRecord(cfg, expense, "expense-topic", gen.Expense{})
33+
if err != nil {
34+
c.Status(http.StatusInternalServerError)
35+
}
3936

40-
// Send the Kafka record to the Kafka topic
41-
err := client.Producer(ctx, record)
37+
err = client.Producer(ctx, record)
4238
if err != nil {
4339
c.Status(http.StatusInternalServerError)
4440
}
45-
// Return a success response
41+
4642
c.SendStatus(http.StatusOK) //nolint:errcheck
4743
return c.Send([]byte("expense created successfully"))
4844
}

pkg/handler/paymentHandler.go

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package handler
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"time"
7+
8+
"github.com/dipjyotimetia/event-stream/gen"
9+
"github.com/dipjyotimetia/event-stream/pkg/config"
10+
"github.com/dipjyotimetia/event-stream/pkg/events"
11+
"github.com/gofiber/fiber/v2"
12+
)
13+
14+
func PaymentHandler(ctx context.Context, client *events.KafkaClient, cfg *config.Config) fiber.Handler {
15+
return func(c *fiber.Ctx) error {
16+
var payment gen.Payment
17+
18+
if err := c.BodyParser(&payment); err != nil {
19+
c.Status(http.StatusBadRequest)
20+
return err
21+
}
22+
23+
// Set the Timestamp field to current time if it's not already set
24+
if payment.Timestamp == 0 {
25+
payment.Timestamp = time.Now().UnixNano() / int64(time.Millisecond)
26+
}
27+
28+
record, err := client.SetRecord(cfg, payment, "payment-topic", gen.Payment{})
29+
if err != nil {
30+
c.Status(http.StatusInternalServerError)
31+
}
32+
33+
err = client.Producer(ctx, record)
34+
if err != nil {
35+
c.Status(http.StatusInternalServerError)
36+
}
37+
c.SendStatus(http.StatusOK) //nolint:errcheck
38+
return c.Send([]byte("expense created successfully"))
39+
}
40+
}

pkg/router/expenseRouter.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import (
1010
)
1111

1212
// ExpenseRouter is the Router for GoFiber App
13-
func ExpenseRouter(app fiber.Router,ctx context.Context, client *events.KafkaClient, cfg *config.Config) {
14-
app.Post("/expense", handler.ExpenseHandler(ctx,client, cfg))
13+
func ExpenseRouter(app fiber.Router, ctx context.Context, client *events.KafkaClient, cfg *config.Config) {
14+
app.Post("/expense", handler.ExpenseHandler(ctx, client, cfg))
15+
}
16+
17+
func PaymentRouter(app fiber.Router, ctx context.Context, client *events.KafkaClient, cfg *config.Config) {
18+
app.Post("/payment", handler.PaymentHandler(ctx, client, cfg))
1519
}

script/avsc2json/main.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"os"
7+
8+
"github.com/hamba/avro/v2"
9+
)
10+
11+
func main() {
12+
if len(os.Args) != 2 {
13+
fmt.Println("Usage: avsc_to_json <input.avsc>")
14+
return
15+
}
16+
17+
avscFilename := os.Args[1]
18+
19+
avscBytes, err := os.ReadFile(avscFilename)
20+
if err != nil {
21+
fmt.Println("Error reading AVSC file:", err)
22+
return
23+
}
24+
25+
// Load the schema from a file or some other source
26+
schema, err := avro.Parse(string(avscBytes))
27+
if err != nil {
28+
fmt.Println("Error parsing schema:", err)
29+
return
30+
}
31+
output := map[string]interface{}{
32+
"schema": schema.String(),
33+
}
34+
35+
jsonBytes, err := json.MarshalIndent(output, "", " ")
36+
if err != nil {
37+
fmt.Println("Error marshalling to JSON:", err)
38+
return
39+
}
40+
fmt.Println(string(jsonBytes))
41+
42+
// jsonFilename := strings.TrimSuffix(avscFilename, ".avsc") + ".json"
43+
// err = os.WriteFile(jsonFilename, jsonBytes, 0644) //nolint:gosec
44+
// if err != nil {
45+
// fmt.Println("Error writing JSON file:", err)
46+
// return
47+
// }
48+
}

script/avsc2json/readme.md

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# AVSC to JSON Converter
2+
This script is a simple utility written in Go that converts Avro schema files (.avsc) to JSON format.
3+
4+
### Usage
5+
To use this script, you need to pass the .avsc file as a command-line argument:
6+
7+
Replace <input.avsc> with the path to your Avro schema file.
8+
9+
### How It Works
10+
The script reads the Avro schema file specified as a command-line argument. It then converts the schema to JSON format and writes the output to a new file with the same name as the input file but with a .json extension.
11+
12+
For example, if you run go run main.go example.avsc, the script will create a new file named example.json with the JSON representation of the Avro schema.
13+
14+
### Error Handling
15+
If the script encounters an error while reading the Avro schema file or converting it to JSON, it will print an error message and exit.
16+
17+
### Contributing
18+
Contributions are welcome. Please submit a pull request if you have any improvements or bug fixes.

tests/expense_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//go:build integration
22
// +build integration
33

4-
package main
4+
package tests
55

66
import (
77
"bytes"

0 commit comments

Comments
 (0)