Skip to content

Commit da8c5e0

Browse files
authored
Fix coder corruption in prism while handling flatten (#34582)
* Replace coder substitution with pcoll substitution in handling flatten. * Re-enable some tests for flatten. * Revert "Re-enable some tests for flatten." This reverts commit 581bb0e.
1 parent 7c4eb70 commit da8c5e0

File tree

1 file changed

+5
-4
lines changed

1 file changed

+5
-4
lines changed

sdks/go/pkg/beam/runners/prism/internal/handlerunner.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
3333
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
3434
"google.golang.org/protobuf/encoding/prototext"
35+
"google.golang.org/protobuf/proto"
3536
)
3637

3738
// This file retains the logic for the pardo handler
@@ -107,12 +108,12 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C
107108
// they're written out to the runner in the same fashion.
108109
// This may stop being necessary once Flatten Unzipping happens in the optimizer.
109110
outPCol := comps.GetPcollections()[outColID]
110-
outCoder := comps.GetCoders()[outPCol.GetCoderId()]
111-
coderSubs := map[string]*pipepb.Coder{}
111+
pcollSubs := map[string]*pipepb.PCollection{}
112112
for _, p := range t.GetInputs() {
113113
inPCol := comps.GetPcollections()[p]
114114
if inPCol.CoderId != outPCol.CoderId {
115-
coderSubs[inPCol.CoderId] = outCoder
115+
pcollSubs[p] = proto.Clone(inPCol).(*pipepb.PCollection)
116+
pcollSubs[p].CoderId = outPCol.CoderId
116117
}
117118
}
118119

@@ -123,7 +124,7 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C
123124
Transforms: map[string]*pipepb.PTransform{
124125
tid: t,
125126
},
126-
Coders: coderSubs,
127+
Pcollections: pcollSubs,
127128
},
128129
RemovedLeaves: nil,
129130
ForcedRoots: forcedRoots,

0 commit comments

Comments
 (0)