@@ -17,29 +17,22 @@ limitations under the License.
17
17
package kafka
18
18
19
19
import (
20
- "bytes"
21
20
"crypto/tls"
22
21
"crypto/x509"
23
- "encoding/binary"
24
- "encoding/json"
25
22
"fmt"
26
- "math"
27
- "regexp"
28
- "strconv"
23
+ "net/http"
29
24
"strings"
30
25
"time"
31
26
32
27
"knative.dev/eventing/pkg/adapter"
28
+ "knative.dev/eventing/pkg/kncloudevents"
33
29
"knative.dev/pkg/source"
34
30
35
- "knative.dev/eventing-contrib/kafka/common/pkg/kafka"
36
- sourcesv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
37
-
38
31
"context"
39
32
33
+ "knative.dev/eventing-contrib/kafka/common/pkg/kafka"
34
+
40
35
"github.com/Shopify/sarama"
41
- cloudevents "github.com/cloudevents/sdk-go/legacy"
42
- "github.com/cloudevents/sdk-go/legacy/pkg/cloudevents/client"
43
36
"go.uber.org/zap"
44
37
"knative.dev/pkg/logging"
45
38
)
@@ -82,23 +75,26 @@ func NewEnvConfig() adapter.EnvConfigAccessor {
82
75
}
83
76
84
77
type Adapter struct {
85
- config * adapterConfig
86
- ceClient client. Client
87
- reporter source.StatsReporter
88
- logger * zap.Logger
89
- keyTypeMapper func ([]byte ) interface {}
78
+ config * adapterConfig
79
+ httpMessageSender * kncloudevents. HttpMessageSender
80
+ reporter source.StatsReporter
81
+ logger * zap.Logger
82
+ keyTypeMapper func ([]byte ) interface {}
90
83
}
91
84
92
- func NewAdapter (ctx context.Context , processed adapter.EnvConfigAccessor , ceClient client.Client , reporter source.StatsReporter ) adapter.Adapter {
85
+ var _ adapter.MessageAdapter = (* Adapter )(nil )
86
+ var _ adapter.MessageAdapterConstructor = NewAdapter
87
+
88
+ func NewAdapter (ctx context.Context , processed adapter.EnvConfigAccessor , httpMessageSender * kncloudevents.HttpMessageSender , reporter source.StatsReporter ) adapter.MessageAdapter {
93
89
logger := logging .FromContext (ctx ).Desugar ()
94
90
config := processed .(* adapterConfig )
95
91
96
92
return & Adapter {
97
- config : config ,
98
- ceClient : ceClient ,
99
- reporter : reporter ,
100
- logger : logger ,
101
- keyTypeMapper : getKeyTypeMapper (config .KeyType ),
93
+ config : config ,
94
+ httpMessageSender : httpMessageSender ,
95
+ reporter : reporter ,
96
+ logger : logger ,
97
+ keyTypeMapper : getKeyTypeMapper (config .KeyType ),
102
98
}
103
99
}
104
100
@@ -166,54 +162,35 @@ func (a *Adapter) Start(stopCh <-chan struct{}) error {
166
162
// --------------------------------------------------------------------
167
163
168
164
func (a * Adapter ) Handle (ctx context.Context , msg * sarama.ConsumerMessage ) (bool , error ) {
169
- var err error
170
- event := cloudevents .NewEvent (cloudevents .VersionV1 )
171
-
172
- if strings .Contains (getContentType (msg ), "application/cloudevents+json" ) {
173
- err = json .Unmarshal (msg .Value , & event )
174
- } else {
175
- // Check if payload is a valid json
176
- if ! json .Valid (msg .Value ) {
177
- return true , nil // Message is malformed, commit the offset so it won't be reprocessed
178
- }
179
-
180
- event .SetID (makeEventId (msg .Partition , msg .Offset ))
181
- event .SetTime (msg .Timestamp )
182
- event .SetType (sourcesv1alpha1 .KafkaEventType )
183
- event .SetSource (sourcesv1alpha1 .KafkaEventSource (a .config .Namespace , a .config .Name , msg .Topic ))
184
- event .SetSubject (makeEventSubject (msg .Partition , msg .Offset ))
185
- event .SetDataContentType (cloudevents .ApplicationJSON )
186
-
187
- dumpKafkaMetaToEvent (& event , a .keyTypeMapper , msg )
188
-
189
- err = event .SetData (msg .Value )
190
- }
191
-
165
+ req , err := a .httpMessageSender .NewCloudEventRequest (ctx )
192
166
if err != nil {
193
- return true , err // Message is malformed, commit the offset so it won't be reprocessed
167
+ return false , err
194
168
}
195
169
196
- // Check before writing log since event.String() allocates and uses a lot of time
197
- if ce := a . logger . Check ( zap . DebugLevel , "debugging" ); ce != nil {
198
- a . logger . Debug ( "Sending cloud event" , zap . String ( "event" , event . String ()))
170
+ err = a . ConsumerMessageToHttpRequest ( ctx , msg , req , a . logger )
171
+ if err != nil {
172
+ return true , err
199
173
}
200
174
201
- rctx , _ , err := a .ceClient .Send (ctx , event )
175
+ res , err := a .httpMessageSender .Send (req )
202
176
203
177
if err != nil {
204
178
a .logger .Debug ("Error while sending the message" , zap .Error (err ))
205
179
return false , err // Error while sending, don't commit offset
206
180
}
207
181
182
+ if res .StatusCode / 100 != 2 {
183
+ a .logger .Debug ("Unexpected status code" , zap .Int ("status code" , res .StatusCode ))
184
+ return false , fmt .Errorf ("%d %s" , res .StatusCode , http .StatusText (res .StatusCode ))
185
+ }
186
+
208
187
reportArgs := & source.ReportArgs {
209
188
Namespace : a .config .Namespace ,
210
- EventSource : event .Source (),
211
- EventType : event .Type (),
212
189
Name : a .config .Name ,
213
190
ResourceGroup : resourceGroup ,
214
191
}
215
192
216
- _ = a .reporter .ReportEventCount (reportArgs , cloudevents . HTTPTransportContextFrom ( rctx ) .StatusCode )
193
+ _ = a .reporter .ReportEventCount (reportArgs , res .StatusCode )
217
194
return true , nil
218
195
}
219
196
@@ -252,89 +229,6 @@ func newTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
252
229
return config , nil
253
230
}
254
231
255
- func makeEventId (partition int32 , offset int64 ) string {
256
- return fmt .Sprintf ("partition:%s/offset:%s" , strconv .Itoa (int (partition )), strconv .FormatInt (offset , 10 ))
257
- }
258
-
259
- // KafkaEventSubject returns the Kafka CloudEvent subject of the message.
260
- func makeEventSubject (partition int32 , offset int64 ) string {
261
- return fmt .Sprintf ("partition:%d#%d" , partition , offset )
262
- }
263
-
264
- func getContentType (msg * sarama.ConsumerMessage ) string {
265
- for _ , h := range msg .Headers {
266
- if bytes .Equal (h .Key , []byte ("content-type" )) {
267
- return string (h .Value )
268
- }
269
- }
270
- return ""
271
- }
272
-
273
- var replaceBadCharacters = regexp .MustCompile (`[^a-zA-Z0-9]` ).ReplaceAllString
274
-
275
- func dumpKafkaMetaToEvent (event * cloudevents.Event , keyTypeMapper func ([]byte ) interface {}, msg * sarama.ConsumerMessage ) {
276
- if msg .Key != nil {
277
- event .SetExtension ("key" , keyTypeMapper (msg .Key ))
278
- }
279
- for _ , h := range msg .Headers {
280
- event .SetExtension ("kafkaheader" + replaceBadCharacters (string (h .Key ), "" ), string (h .Value ))
281
- }
282
- }
283
-
284
- func getKeyTypeMapper (keyType string ) func ([]byte ) interface {} {
285
- var keyTypeMapper func ([]byte ) interface {}
286
- switch keyType {
287
- case "int" :
288
- keyTypeMapper = func (by []byte ) interface {} {
289
- // Took from https://github.com/axbaretto/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
290
- if len (by ) == 4 {
291
- var res int32
292
- for _ , b := range by {
293
- res <<= 8
294
- res |= int32 (b & 0xFF )
295
- }
296
- return res
297
- } else if len (by ) == 8 {
298
- var res int64
299
- for _ , b := range by {
300
- res <<= 8
301
- res |= int64 (b & 0xFF )
302
- }
303
- return res
304
- } else {
305
- // Fallback to byte array
306
- return by
307
- }
308
- }
309
- case "float" :
310
- keyTypeMapper = func (by []byte ) interface {} {
311
- // BigEndian is specified in https://kafka.apache.org/protocol#protocol_types
312
- // Number is converted to string because
313
- if len (by ) == 4 {
314
- intermediate := binary .BigEndian .Uint32 (by )
315
- fl := math .Float32frombits (intermediate )
316
- return strconv .FormatFloat (float64 (fl ), 'f' , - 1 , 64 )
317
- } else if len (by ) == 8 {
318
- intermediate := binary .BigEndian .Uint64 (by )
319
- fl := math .Float64frombits (intermediate )
320
- return strconv .FormatFloat (fl , 'f' , - 1 , 64 )
321
- } else {
322
- // Fallback to byte array
323
- return by
324
- }
325
- }
326
- case "byte-array" :
327
- keyTypeMapper = func (bytes []byte ) interface {} {
328
- return bytes
329
- }
330
- default :
331
- keyTypeMapper = func (bytes []byte ) interface {} {
332
- return string (bytes )
333
- }
334
- }
335
- return keyTypeMapper
336
- }
337
-
338
232
// verifyCertSkipHostname verifies certificates in the same way that the
339
233
// default TLS handshake does, except it skips hostname verification. It must
340
234
// be used with InsecureSkipVerify.
0 commit comments