Skip to content

Commit 5873845

Browse files
authored
Bug fix: Fix case where a post join condition could cause an early exit of the loop (#711)
* Bug fix: Fix case where a post join condition could cause an early exit of the loop * change test to use modulo to future proof that it wont be able to use an index and only post join filter
1 parent 2fd22b5 commit 5873845

File tree

5 files changed

+95
-4
lines changed

5 files changed

+95
-4
lines changed

src/FlowtideDotNet.Core/Operators/Join/MergeJoin/ColumnStoreMergeJoin.cs

+3-4
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,7 @@ private async IAsyncEnumerable<StreamEventBatch> OnRecieveLeft(StreamEventBatch
230230
{
231231
if (!_postCondition(columnReference.referenceBatch, columnReference.RowIndex, pageKeyStorage._data, k))
232232
{
233-
_searchRightComparer.end = int.MinValue;
234-
break;
233+
continue;
235234
}
236235
}
237236
pageUpdated |= RecieveLeftHandleElement(
@@ -436,8 +435,7 @@ private async IAsyncEnumerable<StreamEventBatch> OnRecieveRight(StreamEventBatch
436435
{
437436
if (!_postCondition(pageKeyStorage._data, k, columnReference.referenceBatch, columnReference.RowIndex))
438437
{
439-
_searchLeftComparer.end = int.MinValue;
440-
break;
438+
continue;
441439
}
442440
}
443441
pageUpdated |= RecieveRightHandleElement(
@@ -503,6 +501,7 @@ private async IAsyncEnumerable<StreamEventBatch> OnRecieveRight(StreamEventBatch
503501
{
504502
await page.SavePage(false);
505503
}
504+
// Checks if we end before the last element, if we do, we dont have to fetch the next page
506505
if (_searchLeftComparer.end < (page.Keys.Count - 1))
507506
{
508507
break;

tests/FlowtideDotNet.AcceptanceTests/FlowtideAcceptanceBase.cs

+10
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,16 @@ public void AddOrUpdateOrder(Order order)
120120
flowtideTestStream.AddOrUpdateOrder(order);
121121
}
122122

123+
public void AddOrUpdateProject(Project project)
124+
{
125+
flowtideTestStream.AddOrUpdateProject(project);
126+
}
127+
128+
public void AddOrUpdateProjectMember(ProjectMember projectMember)
129+
{
130+
flowtideTestStream.AddOrUpdateProjectMember(projectMember);
131+
}
132+
123133
public void DeleteUser(User user)
124134
{
125135
flowtideTestStream.DeleteUser(user);

tests/FlowtideDotNet.AcceptanceTests/Internal/DatasetGenerator.cs

+32
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,38 @@ public void AddOrUpdateOrder(Order order)
8585
mockTable.AddOrUpdate(new List<Order>() { order });
8686
}
8787

88+
public void AddOrUpdateProject(Project project)
89+
{
90+
var index = Projects.FindIndex(x => x.ProjectKey == project.ProjectKey);
91+
92+
if (index >= 0)
93+
{
94+
Projects[index] = project;
95+
}
96+
else
97+
{
98+
Projects.Add(project);
99+
}
100+
var mockTable = mockDatabase.GetOrCreateTable<Project>("projects");
101+
mockTable.AddOrUpdate(new List<Project>() { project });
102+
}
103+
104+
public void AddOrUpdateProjectMember(ProjectMember projectMember)
105+
{
106+
var index = ProjectMembers.FindIndex(x => x.ProjectMemberKey == projectMember.ProjectMemberKey);
107+
108+
if (index >= 0)
109+
{
110+
ProjectMembers[index] = projectMember;
111+
}
112+
else
113+
{
114+
ProjectMembers.Add(projectMember);
115+
}
116+
var mockTable = mockDatabase.GetOrCreateTable<ProjectMember>("projectmembers");
117+
mockTable.AddOrUpdate(new List<ProjectMember>() { projectMember });
118+
}
119+
88120
public void AddOrUpdateCompany(Company company)
89121
{
90122
var index = Companies.FindIndex(x => x.CompanyId == company.CompanyId);

tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs

+10
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,16 @@ public void AddOrUpdateOrder(Order order)
155155
generator.AddOrUpdateOrder(order);
156156
}
157157

158+
public void AddOrUpdateProject(Project project)
159+
{
160+
generator.AddOrUpdateProject(project);
161+
}
162+
163+
public void AddOrUpdateProjectMember(ProjectMember projectMember)
164+
{
165+
generator.AddOrUpdateProjectMember(projectMember);
166+
}
167+
158168

159169
[MemberNotNull(nameof(_connectorManager))]
160170
public void SetupConnectorManager()

tests/FlowtideDotNet.AcceptanceTests/JoinTests.cs

+40
Original file line numberDiff line numberDiff line change
@@ -1402,5 +1402,45 @@ from subuser in gj.DefaultIfEmpty()
14021402

14031403
AssertCurrentDataEqual(fullOuterJoin);
14041404
}
1405+
1406+
[Fact]
1407+
public async Task PostJoinConditionShouldCheckAllRows()
1408+
{
1409+
// Add members first to make sure all the data is inside the tree
1410+
AddOrUpdateProjectMember(new ProjectMember()
1411+
{
1412+
CompanyId = "1",
1413+
ProjectNumber = "123",
1414+
UserKey = 0,
1415+
ProjectMemberKey = 1
1416+
});
1417+
AddOrUpdateProjectMember(new ProjectMember()
1418+
{
1419+
CompanyId = "1",
1420+
ProjectNumber = "123",
1421+
UserKey = 1,
1422+
ProjectMemberKey = 2
1423+
});
1424+
await StartStream(@"
1425+
INSERT INTO output
1426+
SELECT
1427+
pm.userkey
1428+
FROM projects p
1429+
LEFT JOIN projectmembers pm
1430+
ON p.projectnumber = pm.projectnumber AND p.companyid = pm.companyid AND pm.userkey % ProjectKey = 1", ignoreSameDataCheck: false);
1431+
await WaitForUpdate();
1432+
// add the project to do the join
1433+
AddOrUpdateProject(new Project()
1434+
{
1435+
ProjectKey = 2,
1436+
CompanyId = "1",
1437+
ProjectNumber = "123",
1438+
Name = "Project 1"
1439+
});
1440+
await WaitForUpdate();
1441+
1442+
var actualData = GetActualRows();
1443+
AssertCurrentDataEqual(new[] { new { val = 1 } });
1444+
}
14051445
}
14061446
}

0 commit comments

Comments
 (0)