@@ -17,29 +17,20 @@ limitations under the License.
17
17
package kafka
18
18
19
19
import (
20
- "bytes"
21
20
"crypto/tls"
22
21
"crypto/x509"
23
- "encoding/binary"
24
- "encoding/json"
25
- "fmt"
26
- "math"
27
- "regexp"
28
- "strconv"
29
22
"strings"
30
23
"time"
31
24
32
25
"knative.dev/eventing/pkg/adapter"
26
+ "knative.dev/eventing/pkg/kncloudevents"
33
27
"knative.dev/pkg/source"
34
28
35
- "knative.dev/eventing-contrib/kafka/common/pkg/kafka"
36
- sourcesv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
37
-
38
29
"context"
39
30
31
+ "knative.dev/eventing-contrib/kafka/common/pkg/kafka"
32
+
40
33
"github.com/Shopify/sarama"
41
- cloudevents "github.com/cloudevents/sdk-go/legacy"
42
- "github.com/cloudevents/sdk-go/legacy/pkg/cloudevents/client"
43
34
"go.uber.org/zap"
44
35
"knative.dev/pkg/logging"
45
36
)
@@ -82,23 +73,23 @@ func NewEnvConfig() adapter.EnvConfigAccessor {
82
73
}
83
74
84
75
type Adapter struct {
85
- config * adapterConfig
86
- ceClient client. Client
87
- reporter source.StatsReporter
88
- logger * zap.Logger
89
- keyTypeMapper func ([]byte ) interface {}
76
+ config * adapterConfig
77
+ httpBindingsSender * kncloudevents. HttpBindingSender
78
+ reporter source.StatsReporter
79
+ logger * zap.Logger
80
+ keyTypeMapper func ([]byte ) interface {}
90
81
}
91
82
92
- func NewAdapter (ctx context.Context , processed adapter.EnvConfigAccessor , ceClient client. Client , reporter source.StatsReporter ) adapter.Adapter {
83
+ func NewAdapter (ctx context.Context , processed adapter.EnvConfigAccessor , httpBindingsSender * kncloudevents. HttpBindingSender , reporter source.StatsReporter ) adapter.Adapter {
93
84
logger := logging .FromContext (ctx ).Desugar ()
94
85
config := processed .(* adapterConfig )
95
86
96
87
return & Adapter {
97
- config : config ,
98
- ceClient : ceClient ,
99
- reporter : reporter ,
100
- logger : logger ,
101
- keyTypeMapper : getKeyTypeMapper (config .KeyType ),
88
+ config : config ,
89
+ httpBindingsSender : httpBindingsSender ,
90
+ reporter : reporter ,
91
+ logger : logger ,
92
+ keyTypeMapper : getKeyTypeMapper (config .KeyType ),
102
93
}
103
94
}
104
95
@@ -166,39 +157,17 @@ func (a *Adapter) Start(stopCh <-chan struct{}) error {
166
157
// --------------------------------------------------------------------
167
158
168
159
func (a * Adapter ) Handle (ctx context.Context , msg * sarama.ConsumerMessage ) (bool , error ) {
169
- var err error
170
- event := cloudevents .NewEvent (cloudevents .VersionV1 )
171
-
172
- if strings .Contains (getContentType (msg ), "application/cloudevents+json" ) {
173
- err = json .Unmarshal (msg .Value , & event )
174
- } else {
175
- // Check if payload is a valid json
176
- if ! json .Valid (msg .Value ) {
177
- return true , nil // Message is malformed, commit the offset so it won't be reprocessed
178
- }
179
-
180
- event .SetID (makeEventId (msg .Partition , msg .Offset ))
181
- event .SetTime (msg .Timestamp )
182
- event .SetType (sourcesv1alpha1 .KafkaEventType )
183
- event .SetSource (sourcesv1alpha1 .KafkaEventSource (a .config .Namespace , a .config .Name , msg .Topic ))
184
- event .SetSubject (makeEventSubject (msg .Partition , msg .Offset ))
185
- event .SetDataContentType (cloudevents .ApplicationJSON )
186
-
187
- dumpKafkaMetaToEvent (& event , a .keyTypeMapper , msg )
188
-
189
- err = event .SetData (msg .Value )
190
- }
191
-
160
+ req , err := a .httpBindingsSender .NewCloudEventRequest ()
192
161
if err != nil {
193
- return true , err // Message is malformed, commit the offset so it won't be reprocessed
162
+ return false , err
194
163
}
195
164
196
- // Check before writing log since event.String() allocates and uses a lot of time
197
- if ce := a . logger . Check ( zap . DebugLevel , "debugging" ); ce != nil {
198
- a . logger . Debug ( "Sending cloud event" , zap . String ( "event" , event . String ()))
165
+ err = a . ConsumerMessageToHttpRequest ( ctx , msg , req )
166
+ if err != nil {
167
+ return true , err
199
168
}
200
169
201
- rctx , _ , err := a .ceClient .Send (ctx , event )
170
+ res , err := a .httpBindingsSender .Send (ctx , req )
202
171
203
172
if err != nil {
204
173
a .logger .Debug ("Error while sending the message" , zap .Error (err ))
@@ -207,13 +176,11 @@ func (a *Adapter) Handle(ctx context.Context, msg *sarama.ConsumerMessage) (bool
207
176
208
177
reportArgs := & source.ReportArgs {
209
178
Namespace : a .config .Namespace ,
210
- EventSource : event .Source (),
211
- EventType : event .Type (),
212
179
Name : a .config .Name ,
213
180
ResourceGroup : resourceGroup ,
214
181
}
215
182
216
- _ = a .reporter .ReportEventCount (reportArgs , cloudevents . HTTPTransportContextFrom ( rctx ) .StatusCode )
183
+ _ = a .reporter .ReportEventCount (reportArgs , res .StatusCode )
217
184
return true , nil
218
185
}
219
186
@@ -252,89 +219,6 @@ func newTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
252
219
return config , nil
253
220
}
254
221
255
- func makeEventId (partition int32 , offset int64 ) string {
256
- return fmt .Sprintf ("partition:%s/offset:%s" , strconv .Itoa (int (partition )), strconv .FormatInt (offset , 10 ))
257
- }
258
-
259
- // KafkaEventSubject returns the Kafka CloudEvent subject of the message.
260
- func makeEventSubject (partition int32 , offset int64 ) string {
261
- return fmt .Sprintf ("partition:%d#%d" , partition , offset )
262
- }
263
-
264
- func getContentType (msg * sarama.ConsumerMessage ) string {
265
- for _ , h := range msg .Headers {
266
- if bytes .Equal (h .Key , []byte ("content-type" )) {
267
- return string (h .Value )
268
- }
269
- }
270
- return ""
271
- }
272
-
273
- var replaceBadCharacters = regexp .MustCompile (`[^a-zA-Z0-9]` ).ReplaceAllString
274
-
275
- func dumpKafkaMetaToEvent (event * cloudevents.Event , keyTypeMapper func ([]byte ) interface {}, msg * sarama.ConsumerMessage ) {
276
- if msg .Key != nil {
277
- event .SetExtension ("key" , keyTypeMapper (msg .Key ))
278
- }
279
- for _ , h := range msg .Headers {
280
- event .SetExtension ("kafkaheader" + replaceBadCharacters (string (h .Key ), "" ), string (h .Value ))
281
- }
282
- }
283
-
284
- func getKeyTypeMapper (keyType string ) func ([]byte ) interface {} {
285
- var keyTypeMapper func ([]byte ) interface {}
286
- switch keyType {
287
- case "int" :
288
- keyTypeMapper = func (by []byte ) interface {} {
289
- // Took from https://github.com/axbaretto/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
290
- if len (by ) == 4 {
291
- var res int32
292
- for _ , b := range by {
293
- res <<= 8
294
- res |= int32 (b & 0xFF )
295
- }
296
- return res
297
- } else if len (by ) == 8 {
298
- var res int64
299
- for _ , b := range by {
300
- res <<= 8
301
- res |= int64 (b & 0xFF )
302
- }
303
- return res
304
- } else {
305
- // Fallback to byte array
306
- return by
307
- }
308
- }
309
- case "float" :
310
- keyTypeMapper = func (by []byte ) interface {} {
311
- // BigEndian is specified in https://kafka.apache.org/protocol#protocol_types
312
- // Number is converted to string because
313
- if len (by ) == 4 {
314
- intermediate := binary .BigEndian .Uint32 (by )
315
- fl := math .Float32frombits (intermediate )
316
- return strconv .FormatFloat (float64 (fl ), 'f' , - 1 , 64 )
317
- } else if len (by ) == 8 {
318
- intermediate := binary .BigEndian .Uint64 (by )
319
- fl := math .Float64frombits (intermediate )
320
- return strconv .FormatFloat (fl , 'f' , - 1 , 64 )
321
- } else {
322
- // Fallback to byte array
323
- return by
324
- }
325
- }
326
- case "byte-array" :
327
- keyTypeMapper = func (bytes []byte ) interface {} {
328
- return bytes
329
- }
330
- default :
331
- keyTypeMapper = func (bytes []byte ) interface {} {
332
- return string (bytes )
333
- }
334
- }
335
- return keyTypeMapper
336
- }
337
-
338
222
// verifyCertSkipHostname verifies certificates in the same way that the
339
223
// default TLS handshake does, except it skips hostname verification. It must
340
224
// be used with InsecureSkipVerify.
0 commit comments