Skip to content

Commit 9bc209f

Browse files
authored
Add from_json function to convert json into flowtide objects (#733)
* Add to_json function to convert an object to a json string * add check that to_json must have 1 arg * Add from_json function to convert json into flowtide objects * add docs * fix comment * Add so binaryvalue can also be used in from_json
1 parent 70fd548 commit 9bc209f

File tree

9 files changed

+278
-6
lines changed

9 files changed

+278
-6
lines changed

docs/docs/expressions/scalarfunctions/string.md

+13-1
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,18 @@ Converts an object into json stored as a string.
249249

250250
### SQL Usage
251251

252-
```
252+
```sql
253253
SELECT to_json(column1) ...
254+
```
255+
256+
## From Json
257+
258+
This function does not have a substrait definition.
259+
260+
Converts a JSON string to flowtide data objects. It is also possible to use a binary value in utf8 encoding as the input.
261+
262+
### SQL Usage
263+
264+
```sql
265+
SELECT from_json(myjsoncolumn) ...
254266
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License")
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
using FlowtideDotNet.Core.ColumnStore.DataValues;
14+
using System.Text.Json;
15+
16+
namespace FlowtideDotNet.Core.ColumnStore.Json
17+
{
18+
internal static class DataValueJsonReader
19+
{
20+
public static IDataValue Read(ref Utf8JsonReader reader)
21+
{
22+
while (reader.TokenType == JsonTokenType.Comment || reader.TokenType == JsonTokenType.None)
23+
{
24+
if (!reader.Read())
25+
{
26+
throw new JsonException("Unexpected json");
27+
}
28+
continue;
29+
}
30+
switch (reader.TokenType)
31+
{
32+
case JsonTokenType.String:
33+
var strVal = reader.GetString();
34+
if (strVal == null)
35+
{
36+
return NullValue.Instance;
37+
}
38+
return new StringValue(strVal);
39+
case JsonTokenType.False:
40+
return new BoolValue(false);
41+
case JsonTokenType.True:
42+
return new BoolValue(true);
43+
case JsonTokenType.Number:
44+
if (reader.TryGetInt64(out var intVal))
45+
{
46+
return new Int64Value(intVal);
47+
}
48+
if (reader.TryGetDouble(out var doubleVal))
49+
{
50+
return new DoubleValue(doubleVal);
51+
}
52+
throw new JsonException();
53+
case JsonTokenType.Null:
54+
return NullValue.Instance;
55+
case JsonTokenType.StartObject:
56+
return ParseObject(ref reader);
57+
case JsonTokenType.StartArray:
58+
return ParseArray(ref reader);
59+
}
60+
throw new JsonException($"Unknown json token type {reader.TokenType}");
61+
}
62+
63+
private static IDataValue ParseArray(ref Utf8JsonReader reader)
64+
{
65+
if (!reader.Read())
66+
{
67+
throw new JsonException("Unexpected json");
68+
}
69+
70+
List<IDataValue> values = new List<IDataValue>();
71+
while (true)
72+
{
73+
if (reader.TokenType == JsonTokenType.EndArray)
74+
{
75+
break;
76+
}
77+
78+
values.Add(Read(ref reader));
79+
80+
if (!reader.Read())
81+
{
82+
throw new JsonException("Unexpected json");
83+
}
84+
}
85+
86+
return new ListValue(values);
87+
}
88+
89+
private static IDataValue ParseObject(ref Utf8JsonReader reader)
90+
{
91+
if (!reader.Read())
92+
{
93+
throw new JsonException("Unexpected json");
94+
}
95+
96+
List<KeyValuePair<IDataValue, IDataValue>> properties = new List<KeyValuePair<IDataValue, IDataValue>>();
97+
while (true)
98+
{
99+
if (reader.TokenType == JsonTokenType.EndObject)
100+
{
101+
break;
102+
}
103+
if (reader.TokenType != JsonTokenType.PropertyName)
104+
{
105+
throw new JsonException();
106+
}
107+
108+
var propertyName = reader.GetString();
109+
110+
if (propertyName == null)
111+
{
112+
throw new JsonException("Unexpected json");
113+
}
114+
115+
if (!reader.Read())
116+
{
117+
throw new JsonException("Unexpected json");
118+
}
119+
120+
var value = Read(ref reader);
121+
122+
if (!reader.Read())
123+
{
124+
throw new JsonException("Unexpected json");
125+
}
126+
127+
properties.Add(new KeyValuePair<IDataValue, IDataValue>(new StringValue(propertyName), value));
128+
}
129+
130+
return new MapValue(properties);
131+
}
132+
}
133+
}

src/FlowtideDotNet.Core/Compute/Columnar/Functions/BuiltInStringFunctions.cs

+63
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
using System.Linq.Expressions;
2424
using System.Reflection;
2525
using System.Text;
26+
using System.Text.Json;
2627
using System.Text.RegularExpressions;
2728
using static SqlParser.Ast.MatchRecognizeSymbol;
2829
using static SqlParser.Ast.Partition;
@@ -202,6 +203,8 @@ public static void RegisterFunctions(IFunctionsRegister functionsRegister)
202203
var resultContainer = Expression.Constant(new DataValueContainer());
203204
return System.Linq.Expressions.Expression.Call(genericMethod, expr, resultContainer, jsonWriterConstant);
204205
});
206+
207+
functionsRegister.RegisterScalarMethod(FunctionsString.Uri, FunctionsString.FromJson, typeof(BuiltInStringFunctions), nameof(FromJsonImplementation));
205208
}
206209

207210
private static bool SubstringTryGetParameters<T1, T2, T3>(
@@ -804,5 +807,65 @@ private static DataValueContainer ToJsonImplementation<T1>(in T1 val, DataValueC
804807
result._stringValue = new StringValue(writer.WrittenMemory);
805808
return result;
806809
}
810+
811+
private static IDataValue FromJsonImplementation<T1>(in T1 val)
812+
where T1 : IDataValue
813+
{
814+
if (val.Type != ArrowTypeId.String && val.Type != ArrowTypeId.Binary)
815+
{
816+
return NullValue.Instance;
817+
}
818+
819+
if (val.Type == ArrowTypeId.String)
820+
{
821+
Utf8JsonReader reader = new Utf8JsonReader(val.AsString.Span);
822+
try
823+
{
824+
return DataValueJsonReader.Read(ref reader);
825+
}
826+
catch (JsonException)
827+
{
828+
return NullValue.Instance;
829+
}
830+
}
831+
else
832+
{
833+
Utf8JsonReader reader = new Utf8JsonReader(val.AsBinary);
834+
try
835+
{
836+
return DataValueJsonReader.Read(ref reader);
837+
}
838+
catch (JsonException)
839+
{
840+
return NullValue.Instance;
841+
}
842+
}
843+
844+
845+
}
846+
847+
private static IDataValue FromJsonImplementation_error_handling__ERROR<T1>(in T1 val)
848+
where T1 : IDataValue
849+
{
850+
if (val.Type == ArrowTypeId.Null)
851+
{
852+
return NullValue.Instance;
853+
}
854+
if (val.Type != ArrowTypeId.String && val.Type != ArrowTypeId.Binary)
855+
{
856+
throw new ArgumentException("FromJson function must have a string or binary argument.");
857+
}
858+
859+
if (val.Type == ArrowTypeId.String)
860+
{
861+
Utf8JsonReader reader = new Utf8JsonReader(val.AsString.Span);
862+
return DataValueJsonReader.Read(ref reader);
863+
}
864+
else
865+
{
866+
Utf8JsonReader reader = new Utf8JsonReader(val.AsBinary);
867+
return DataValueJsonReader.Read(ref reader);
868+
}
869+
}
807870
}
808871
}

src/FlowtideDotNet.Substrait/FunctionExtensions/FunctionsString.cs

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public static class FunctionsString
3333
public const string StringSplit = "string_split";
3434
public const string RegexStringSplit = "regexp_string_split";
3535
public const string ToJson = "to_json";
36+
public const string FromJson = "from_json";
3637

3738
public const string StringAgg = "string_agg";
3839
}

src/FlowtideDotNet.Substrait/Sql/Internal/BuiltInSqlFunctions.cs

+1
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,7 @@ public static void AddBuiltInFunctions(SqlFunctionRegister sqlFunctionRegister)
755755
RegisterTwoVariableScalarFunction(sqlFunctionRegister, "regexp_string_split", FunctionsString.Uri, FunctionsString.RegexStringSplit);
756756

757757
RegisterOneVariableScalarFunction(sqlFunctionRegister, "to_json", FunctionsString.Uri, FunctionsString.ToJson);
758+
RegisterOneVariableScalarFunction(sqlFunctionRegister, "from_json", FunctionsString.Uri, FunctionsString.FromJson);
758759

759760
// Table functions
760761
UnnestSqlFunction.AddUnnest(sqlFunctionRegister);

tests/FlowtideDotNet.AcceptanceTests/StringFunctionTests.cs

+11
Original file line numberDiff line numberDiff line change
@@ -229,5 +229,16 @@ INSERT INTO output
229229
await WaitForUpdate();
230230
AssertCurrentDataEqual(Users.Select(x => new { json = JsonSerializer.Serialize(new {firstName = x.FirstName, lastName = x.LastName}) }));
231231
}
232+
233+
[Fact]
234+
public async Task FromJsonWithMap()
235+
{
236+
GenerateData();
237+
await StartStream(@"
238+
INSERT INTO output
239+
SELECT from_json(to_json(map('firstName', firstName, 'lastName', lastName))) as json FROM users");
240+
await WaitForUpdate();
241+
AssertCurrentDataEqual(Users.Select(x => new { json = new { firstName = x.FirstName, lastName = x.LastName } }));
242+
}
232243
}
233244
}

tests/FlowtideDotNet.ComputeTests/Internal/Framework/SubstraitTestRunner.cs

+21-4
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,32 @@ private ValueTask<TimeSpan> InvokeScalarTest(SubstraitTestRunnerContext ctxt, ob
127127
}
128128
Column resultColumn = Column.Create(GlobalMemoryManager.Instance);
129129
// Run once to try and reduce IL compile time for output
130-
compiledMethod(new EventBatchData(columns), 0, resultColumn);
130+
if (parsedTest.Expected.ExpectError)
131+
{
132+
Assert.ThrowsAny<Exception>(() => compiledMethod(new EventBatchData(columns), 0, resultColumn));
133+
}
134+
else
135+
{
136+
compiledMethod(new EventBatchData(columns), 0, resultColumn);
137+
}
131138
resultColumn.Clear();
132139
Stopwatch sw = new();
133140
sw.Start();
134-
compiledMethod(new EventBatchData(columns), 0, resultColumn);
141+
if (parsedTest.Expected.ExpectError)
142+
{
143+
Assert.ThrowsAny<Exception>(() => compiledMethod(new EventBatchData(columns), 0, resultColumn));
144+
}
145+
else
146+
{
147+
compiledMethod(new EventBatchData(columns), 0, resultColumn);
148+
}
135149
sw.Stop();
136150

137-
var actual = resultColumn.GetValueAt(0, default);
138-
Assert.Equal(parsedTest.Expected.ExpectedValue, actual, (x, y) => DataValueComparer.CompareTo(x!, y!) == 0);
151+
if (!parsedTest.Expected.ExpectError)
152+
{
153+
var actual = resultColumn.GetValueAt(0, default);
154+
Assert.Equal(parsedTest.Expected.ExpectedValue, actual, (x, y) => DataValueComparer.CompareTo(x!, y!) == 0);
155+
}
139156
return new(sw.Elapsed);
140157
}
141158

tests/FlowtideDotNet.ComputeTests/Internal/Parser/Tests/ArgumentVisitor.cs

+6-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
using FlowtideDotNet.Core.ColumnStore;
1616
using FlowtideDotNet.Core.ColumnStore.DataValues;
1717
using System.Globalization;
18+
using System.Text;
1819

1920
namespace FlowtideDotNet.ComputeTests.Internal.Tests
2021
{
@@ -49,7 +50,11 @@ public override IDataValue VisitIntegerLiteral([NotNull] FuncTestCaseParser.Inte
4950

5051
public override IDataValue VisitFixedBinaryArg([NotNull] FuncTestCaseParser.FixedBinaryArgContext context)
5152
{
52-
throw new NotImplementedException();
53+
var text = context.GetText();
54+
55+
text = text.Trim('\'');
56+
57+
return new BinaryValue(Encoding.UTF8.GetBytes(text));
5358
}
5459

5560
public override IDataValue VisitStringArg([NotNull] FuncTestCaseParser.StringArgContext context)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
### SUBSTRAIT_SCALAR_TEST: v1.0
2+
### SUBSTRAIT_INCLUDE: '/extensions/functions_string.yaml'
3+
4+
# basic: Basic examples without any special cases
5+
from_json('"abcdef"'::str) = 'abcdef'::str
6+
from_json('["a", "b"]'::str) = ['a', 'b']::LIST<str>
7+
from_json('123'::str) = 123::i64
8+
from_json('123.45'::str) = 123.45::fp64
9+
from_json('true'::str) = true::bool
10+
from_json('false'::str) = false::bool
11+
from_json('null'::str) = null::str
12+
13+
# Binary input: example with binary input
14+
from_json('"abcdef"'::FIXEDBINARY<7>) = 'abcdef'::str
15+
from_json('["a", "b"]'::FIXEDBINARY<10>) = ['a', 'b']::LIST<str>
16+
17+
# Errors: error handling for invalid json
18+
from_json('{"a": 1'::str) = null::str
19+
from_json(123::i64) = null::str
20+
from_json('{"a": 1'::FIXEDBINARY<7>) = null::str
21+
from_json('{"a": 1'::str) [error_handling:ERROR] = <!ERROR>
22+
from_json(123::i64) [error_handling:ERROR] = <!ERROR>
23+
from_json('{"a": 1'::FIXEDBINARY<7>) [error_handling:ERROR] = <!ERROR>
24+
25+
# Errors: Error handling enabled with correct json
26+
from_json('"abcdef"'::str) [error_handling:ERROR] = 'abcdef'::str
27+
from_json('["a", "b"]'::str) [error_handling:ERROR] = ['a', 'b']::LIST<str>
28+
from_json('"abcdef"'::FIXEDBINARY<7>) [error_handling:ERROR] = 'abcdef'::str
29+
from_json('["a", "b"]'::FIXEDBINARY<10>) [error_handling:ERROR] = ['a', 'b']::LIST<str>

0 commit comments

Comments
 (0)