Skip to content

Commit 4613359

Browse files
authored
Merge branch 'master' into sheinbergon/source-tiktok-marketing-new-streams
2 parents 4e3a609 + 238aaf2 commit 4613359

File tree

38 files changed

+3058
-425
lines changed

38 files changed

+3058
-425
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt

+57 -59
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,7 @@ import io.micronaut.context.annotation.Value
4646
import jakarta.inject.Singleton
4747
import java.math.BigInteger
4848
import java.time.OffsetDateTime
49+
import java.util.SequencedMap
4950
import java.util.UUID
5051

5152
/**
@@ -148,8 +149,8 @@ data class Meta(
148149
TimestampWithTimezoneValue(
149150
OffsetDateTime.parse(
150151
value,
151-
AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER
152-
)
152+
AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER,
153+
),
153154
)
154155
}
155156
}
@@ -158,7 +159,7 @@ data class Meta(
158159
COLUMN_NAME_DATA -> toObjectValue(value.deserializeToNode())
159160
else ->
160161
throw NotImplementedError(
161-
"Column name $metaColumnName is not yet supported. This is probably a bug."
162+
"Column name $metaColumnName is not yet supported. This is probably a bug.",
162163
)
163164
}
164165
}
@@ -196,9 +197,9 @@ data class DestinationRecord(
196197
message.record.emittedAt,
197198
Meta(
198199
message.record.meta?.changes?.map { Meta.Change(it.field, it.change, it.reason) }
199-
?: emptyList()
200+
?: emptyList(),
200201
),
201-
serialized.length.toLong()
202+
serialized.length.toLong(),
202203
)
203204
}
204205
fun asDestinationRecordRaw(): DestinationRecordRaw {
@@ -223,8 +224,8 @@ data class DestinationRecordAirbyteValue(
223224

224225
data class EnrichedDestinationRecordAirbyteValue(
225226
val stream: DestinationStream,
226-
val declaredFields: Map<String, EnrichedAirbyteValue>,
227-
val undeclaredFields: Map<String, JsonNode>,
227+
val declaredFields: LinkedHashMap<String, EnrichedAirbyteValue>,
228+
val undeclaredFields: LinkedHashMap<String, JsonNode>,
228229
val emittedAtMs: Long,
229230
/**
230231
* The airbyte_meta field as received by the destination connector. Note that this field is NOT
@@ -305,9 +306,9 @@ data class DestinationRecordRaw(
305306
rawData.record.emittedAt,
306307
Meta(
307308
rawData.record.meta?.changes?.map { Meta.Change(it.field, it.change, it.reason) }
308-
?: emptyList()
309+
?: emptyList(),
309310
),
310-
serialized.length.toLong()
311+
serialized.length.toLong(),
311312
)
312313
}
313314

@@ -321,45 +322,42 @@ data class DestinationRecordRaw(
321322
fun asEnrichedDestinationRecordAirbyteValue(): EnrichedDestinationRecordAirbyteValue {
322323
val rawJson = asRawJson()
323324

324-
// Get the set of field names defined in the schema
325-
val schemaFields =
325+
// Get the fields from the schema
326+
val schemaFields: SequencedMap<String, FieldType> =
326327
when (schema) {
327-
is ObjectType -> schema.properties.keys
328-
else -> emptySet()
328+
is ObjectType -> schema.properties
329+
else -> linkedMapOf()
329330
}
330331

331-
val declaredFields = mutableMapOf<String, EnrichedAirbyteValue>()
332-
val undeclaredFields = mutableMapOf<String, JsonNode>()
332+
val declaredFields = LinkedHashMap<String, EnrichedAirbyteValue>()
333+
val undeclaredFields = LinkedHashMap<String, JsonNode>()
333334

334-
// Process fields from the raw JSON
335-
rawJson.fields().forEach { (fieldName, fieldValue) ->
336-
when {
337-
schemaFields.contains(fieldName) -> {
338-
// Declared field (exists in schema)
339-
val fieldType =
340-
(schema as ObjectType).properties[fieldName]?.type
341-
?: throw IllegalStateException(
342-
"Field '$fieldName' exists in schema keys but not in properties"
343-
)
344-
345-
val enrichedValue =
346-
EnrichedAirbyteValue(
347-
abValue = NullValue,
348-
type = fieldType,
349-
name = fieldName,
350-
airbyteMetaField = null,
351-
)
352-
AirbyteValueCoercer.coerce(fieldValue.toAirbyteValue(), fieldType)?.let {
353-
enrichedValue.abValue = it
354-
}
355-
?: enrichedValue.nullify(Reason.DESTINATION_SERIALIZATION_ERROR)
335+
// Process fields from the raw JSON.
336+
// First, get the declared fields, in the order defined by the catalog
337+
schemaFields.forEach { (fieldName, fieldType) ->
338+
if (!rawJson.has(fieldName)) {
339+
return@forEach
340+
}
356341

357-
declaredFields[fieldName] = enrichedValue
358-
}
359-
else -> {
360-
// Undeclared field (not in schema)
361-
undeclaredFields[fieldName] = fieldValue
362-
}
342+
val fieldValue = rawJson[fieldName]
343+
val enrichedValue =
344+
EnrichedAirbyteValue(
345+
abValue = NullValue,
346+
type = fieldType.type,
347+
name = fieldName,
348+
airbyteMetaField = null,
349+
)
350+
AirbyteValueCoercer.coerce(fieldValue.toAirbyteValue(), fieldType.type)?.let {
351+
enrichedValue.abValue = it
352+
}
353+
?: enrichedValue.nullify(Reason.DESTINATION_SERIALIZATION_ERROR)
354+
355+
declaredFields[fieldName] = enrichedValue
356+
}
357+
// Then, get the undeclared fields
358+
rawJson.fields().forEach { (fieldName, fieldValue) ->
359+
if (!schemaFields.contains(fieldName)) {
360+
undeclaredFields[fieldName] = fieldValue
363361
}
364362
}
365363

@@ -407,7 +405,7 @@ data class DestinationFile(
407405
bytes = null,
408406
fileRelativePath = null,
409407
modified = null,
410-
sourceFileUrl = null
408+
sourceFileUrl = null,
411409
)
412410

413411
@get:JsonProperty("file_url")
@@ -453,7 +451,7 @@ data class DestinationFile(
453451
.withStream(stream.descriptor.name)
454452
.withNamespace(stream.descriptor.namespace)
455453
.withEmittedAt(emittedAtMs)
456-
.withAdditionalProperty("file", file)
454+
.withAdditionalProperty("file", file),
457455
)
458456
}
459457
}
@@ -472,8 +470,8 @@ private fun statusToProtocolMessage(
472470
.withStreamStatus(
473471
AirbyteStreamStatusTraceMessage()
474472
.withStreamDescriptor(stream.asProtocolObject())
475-
.withStatus(status)
476-
)
473+
.withStatus(status),
474+
),
477475
)
478476

479477
data class DestinationRecordStreamComplete(
@@ -558,7 +556,7 @@ data class StreamCheckpoint(
558556
) : this(
559557
Checkpoint(
560558
DestinationStream.Descriptor(streamNamespace, streamName),
561-
state = blob.deserializeToNode()
559+
state = blob.deserializeToNode(),
562560
),
563561
Stats(sourceRecordCount),
564562
destinationRecordCount?.let { Stats(it) },
@@ -604,7 +602,7 @@ data class GlobalCheckpoint(
604602
.withGlobal(
605603
AirbyteGlobalState()
606604
.withSharedState(state)
607-
.withStreamStates(checkpoints.map { it.asProtocolObject() })
605+
.withStreamStates(checkpoints.map { it.asProtocolObject() }),
608606
)
609607
decorateStateMessage(stateMessage)
610608
return AirbyteMessage().withType(AirbyteMessage.Type.STATE).withState(stateMessage)
@@ -617,7 +615,7 @@ data object Undefined : DestinationMessage {
617615
// Arguably we could accept the raw message in the constructor?
618616
// But that seems weird - when would we ever want to reemit that message?
619617
throw NotImplementedError(
620-
"Unrecognized messages cannot be safely converted back to a protocol object."
618+
"Unrecognized messages cannot be safely converted back to a protocol object.",
621619
)
622620
}
623621
}
@@ -641,7 +639,7 @@ class DestinationMessageFactory(
641639
is Long -> it
642640
else ->
643641
throw IllegalArgumentException(
644-
"Unexpected value for $name: $it (${it::class.qualifiedName})"
642+
"Unexpected value for $name: $it (${it::class.qualifiedName})",
645643
)
646644
}
647645
}
@@ -671,12 +669,12 @@ class DestinationMessageFactory(
671669
fileRelativePath = fileMessage["file_relative_path"] as String?,
672670
modified =
673671
toLong(fileMessage["modified"], "message.record.modified"),
674-
sourceFileUrl = fileMessage["source_file_url"] as String?
675-
)
672+
sourceFileUrl = fileMessage["source_file_url"] as String?,
673+
),
676674
)
677675
} catch (e: Exception) {
678676
throw IllegalArgumentException(
679-
"Failed to construct file message: ${e.message}"
677+
"Failed to construct file message: ${e.message}",
680678
)
681679
}
682680
} else {
@@ -699,24 +697,24 @@ class DestinationMessageFactory(
699697
if (fileTransferEnabled) {
700698
DestinationFileStreamComplete(
701699
stream,
702-
message.trace.emittedAt?.toLong() ?: 0L
700+
message.trace.emittedAt?.toLong() ?: 0L,
703701
)
704702
} else {
705703
DestinationRecordStreamComplete(
706704
stream,
707-
message.trace.emittedAt?.toLong() ?: 0L
705+
message.trace.emittedAt?.toLong() ?: 0L,
708706
)
709707
}
710708
AirbyteStreamStatus.INCOMPLETE ->
711709
if (fileTransferEnabled) {
712710
DestinationFileStreamIncomplete(
713711
stream,
714-
message.trace.emittedAt?.toLong() ?: 0L
712+
message.trace.emittedAt?.toLong() ?: 0L,
715713
)
716714
} else {
717715
DestinationRecordStreamIncomplete(
718716
stream,
719-
message.trace.emittedAt?.toLong() ?: 0L
717+
message.trace.emittedAt?.toLong() ?: 0L,
720718
)
721719
}
722720
else -> Undefined
@@ -763,7 +761,7 @@ class DestinationMessageFactory(
763761
val descriptor = streamState.streamDescriptor
764762
return Checkpoint(
765763
stream = DestinationStream.Descriptor(descriptor.namespace, descriptor.name),
766-
state = runCatching { streamState.streamState }.getOrNull()
764+
state = runCatching { streamState.streamState }.getOrNull(),
767765
)
768766
}
769767
}

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/EnrichedDestinationRecordAirbyteValueTest.kt

+10 -10
Original file line number | Diff line number | Diff line change
@@ -32,8 +32,8 @@ class EnrichedDestinationRecordAirbyteValueTest {
3232
val record =
3333
EnrichedDestinationRecordAirbyteValue(
3434
stream = destinationStream,
35-
declaredFields = emptyMap(),
36-
undeclaredFields = emptyMap(),
35+
declaredFields = linkedMapOf(),
36+
undeclaredFields = linkedMapOf(),
3737
emittedAtMs = emittedAtMs,
3838
meta = null
3939
)
@@ -79,7 +79,7 @@ class EnrichedDestinationRecordAirbyteValueTest {
7979
@Test
8080
fun `test allTypedFields property`() {
8181
val declaredFields =
82-
mapOf(
82+
linkedMapOf(
8383
"field1" to
8484
EnrichedAirbyteValue(
8585
StringValue("value1"),
@@ -100,7 +100,7 @@ class EnrichedDestinationRecordAirbyteValueTest {
100100
EnrichedDestinationRecordAirbyteValue(
101101
stream = destinationStream,
102102
declaredFields = declaredFields,
103-
undeclaredFields = emptyMap(),
103+
undeclaredFields = linkedMapOf(),
104104
emittedAtMs = emittedAtMs,
105105
meta = null
106106
)
@@ -144,7 +144,7 @@ class EnrichedDestinationRecordAirbyteValueTest {
144144
)
145145
field2.nullify(Reason.DESTINATION_FIELD_SIZE_LIMITATION)
146146

147-
val declaredFields = mapOf("field1" to field1, "field2" to field2)
147+
val declaredFields = linkedMapOf("field1" to field1, "field2" to field2)
148148

149149
// Create meta with its own changes
150150
val meta =
@@ -163,7 +163,7 @@ class EnrichedDestinationRecordAirbyteValueTest {
163163
EnrichedDestinationRecordAirbyteValue(
164164
stream = destinationStream,
165165
declaredFields = declaredFields,
166-
undeclaredFields = emptyMap(),
166+
undeclaredFields = linkedMapOf(),
167167
emittedAtMs = emittedAtMs,
168168
meta = meta
169169
)
@@ -215,17 +215,17 @@ class EnrichedDestinationRecordAirbyteValueTest {
215215
val record1 =
216216
EnrichedDestinationRecordAirbyteValue(
217217
stream = destinationStream,
218-
declaredFields = emptyMap(),
219-
undeclaredFields = emptyMap(),
218+
declaredFields = linkedMapOf(),
219+
undeclaredFields = linkedMapOf(),
220220
emittedAtMs = emittedAtMs,
221221
meta = null
222222
)
223223

224224
val record2 =
225225
EnrichedDestinationRecordAirbyteValue(
226226
stream = destinationStream,
227-
declaredFields = emptyMap(),
228-
undeclaredFields = emptyMap(),
227+
declaredFields = linkedMapOf(),
228+
undeclaredFields = linkedMapOf(),
229229
emittedAtMs = emittedAtMs,
230230
meta = null
231231
)

0 commit comments

Comments
 (0)