Skip to content

Commit 0f4f75f

Browse files
authored
Move preprocessing out of adapters (#2312)
Simplify execute (and stream) by removing processing argument
1 parent 48607c7 commit 0f4f75f

File tree

7 files changed

+78
-92
lines changed

7 files changed

+78
-92
lines changed

lib/ecto/adapter.ex

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ defmodule Ecto.Adapter do
1919
@type returning :: [atom]
2020
@type prepared :: term
2121
@type cached :: term
22-
@type process :: (term -> term)
2322
@type on_conflict :: {:raise, list(), []} |
2423
{:nothing, list(), [atom]} |
2524
{Ecto.Query.t, list(), [atom]}
@@ -128,13 +127,8 @@ defmodule Ecto.Adapter do
128127
129128
The `meta` field is a map containing some of the fields found
130129
in the `Ecto.Query` struct.
131-
132-
It receives a process function that should be invoked for each
133-
selected field in the query result in order to convert them to the
134-
expected Ecto type. The `process` function will be nil if no
135-
result set is expected from the query.
136130
"""
137-
@callback execute(repo, query_meta, query, params :: list(), process | nil, options) :: result when
131+
@callback execute(repo, query_meta, query, params :: list(), options) :: result when
138132
result: {integer, [[term]] | nil} | no_return,
139133
query: {:nocache, prepared} |
140134
{:cached, (prepared -> :ok), cached} |

lib/ecto/adapters/sql.ex

Lines changed: 45 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,13 @@ defmodule Ecto.Adapters.SQL do
6464
do: {:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.delete_all(query))}}
6565

6666
@doc false
67-
def execute(repo, meta, query, params, process, opts) do
68-
Ecto.Adapters.SQL.execute(repo, meta, query, params, process, opts)
67+
def execute(repo, meta, query, params, opts) do
68+
Ecto.Adapters.SQL.execute(repo, meta, query, params, opts)
6969
end
7070

7171
@doc false
72-
def stream(repo, meta, query, params, process, opts) do
73-
Ecto.Adapters.SQL.stream(repo, meta, query, params, process, opts)
72+
def stream(repo, meta, query, params, opts) do
73+
Ecto.Adapters.SQL.stream(repo, meta, query, params, opts)
7474
end
7575

7676
@doc false
@@ -141,7 +141,7 @@ defmodule Ecto.Adapters.SQL do
141141
Ecto.Adapters.SQL.lock_for_migrations(repo, query, opts, fun)
142142
end
143143

144-
defoverridable [prepare: 2, execute: 6, insert: 6, update: 6, delete: 4, insert_all: 7,
144+
defoverridable [prepare: 2, execute: 5, insert: 6, update: 6, delete: 4, insert_all: 7,
145145
execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2,
146146
lock_for_migrations: 4]
147147
end
@@ -197,11 +197,7 @@ defmodule Ecto.Adapters.SQL do
197197
optional(atom) => any}
198198
| no_return
199199
def query!(repo, sql, params \\ [], opts \\ []) do
200-
query!(repo, sql, params, fn x -> x end, opts)
201-
end
202-
203-
defp query!(repo, sql, params, mapper, opts) do
204-
case query(repo, sql, params, mapper, opts) do
200+
case query(repo, sql, params, opts) do
205201
{:ok, result} -> result
206202
{:error, err} -> raise err
207203
end
@@ -246,17 +242,13 @@ defmodule Ecto.Adapters.SQL do
246242
optional(atom) => any}}
247243
| {:error, Exception.t}
248244
def query(repo, sql, params \\ [], opts \\ []) do
249-
query(repo, sql, params, fn x -> x end, opts)
250-
end
251-
252-
defp query(repo, sql, params, mapper, opts) do
253-
sql_call(repo, :execute, [sql], params, mapper, opts)
245+
sql_call(repo, :execute, [sql], params, opts)
254246
end
255247

256-
defp sql_call(repo, callback, args, params, mapper, opts) do
248+
defp sql_call(repo, callback, args, params, opts) do
257249
{repo_mod, pool, default_opts} = lookup_pool(repo)
258250
conn = get_conn(pool) || pool
259-
opts = [decode_mapper: mapper] ++ with_log(repo_mod, params, opts ++ default_opts)
251+
opts = with_log(repo_mod, params, opts ++ default_opts)
260252
args = args ++ [params, opts]
261253
try do
262254
apply(repo_mod.__sql__, callback, [conn | args])
@@ -406,39 +398,41 @@ defmodule Ecto.Adapters.SQL do
406398
end
407399

408400
@doc false
409-
def execute(repo, meta, prepared, params, mapper, opts) do
410-
do_execute(repo, meta, prepared, params, mapper, put_source(opts, meta))
401+
def execute(repo, meta, prepared, params, opts) do
402+
%{num_rows: num, rows: rows} = do_execute(repo, meta, prepared, params, put_source(opts, meta))
403+
{num, rows}
411404
end
412405

413-
defp do_execute(repo, _meta, {:cache, update, {id, prepared}}, params, mapper, opts) do
414-
execute_and_cache(repo, id, update, prepared, params, mapper, opts)
406+
defp do_execute(repo, _meta, {:cache, update, {id, prepared}}, params, opts) do
407+
execute_and_cache(repo, id, update, prepared, params, opts)
415408
end
416409

417-
defp do_execute(repo, _meta, {:cached, reset, {id, cached}}, params, mapper, opts) do
418-
execute_or_reset(repo, id, reset, cached, params, mapper, opts)
410+
defp do_execute(repo, _meta, {:cached, reset, {id, cached}}, params, opts) do
411+
execute_or_reset(repo, id, reset, cached, params, opts)
419412
end
420413

421-
defp do_execute(repo, _meta, {:nocache, {_id, prepared}}, params, mapper, opts) do
422-
%{rows: rows, num_rows: num} =
423-
sql_call!(repo, :execute, [prepared], params, mapper, opts)
424-
{num, rows}
414+
defp do_execute(repo, _meta, {:nocache, {_id, prepared}}, params, opts) do
415+
case sql_call(repo, :execute, [prepared], params, opts) do
416+
{:ok, res} -> res
417+
{:error, err} -> raise err
418+
end
425419
end
426420

427-
defp execute_and_cache(repo, id, update, prepared, params, mapper, opts) do
421+
defp execute_and_cache(repo, id, update, prepared, params, opts) do
428422
name = "ecto_" <> Integer.to_string(id)
429-
case sql_call(repo, :prepare_execute, [name, prepared], params, mapper, opts) do
430-
{:ok, query, %{num_rows: num, rows: rows}} ->
423+
case sql_call(repo, :prepare_execute, [name, prepared], params, opts) do
424+
{:ok, query, result} ->
431425
update.({id, query})
432-
{num, rows}
426+
result
433427
{:error, err} ->
434428
raise err
435429
end
436430
end
437431

438-
defp execute_or_reset(repo, id, reset, cached, params, mapper, opts) do
439-
case sql_call(repo, :execute, [cached], params, mapper, opts) do
440-
{:ok, %{num_rows: num, rows: rows}} ->
441-
{num, rows}
432+
defp execute_or_reset(repo, id, reset, cached, params, opts) do
433+
case sql_call(repo, :execute, [cached], params, opts) do
434+
{:ok, result} ->
435+
result
442436
{:error, err} ->
443437
raise err
444438
{:reset, err} ->
@@ -447,13 +441,6 @@ defmodule Ecto.Adapters.SQL do
447441
end
448442
end
449443

450-
defp sql_call!(repo, callback, args, params, mapper, opts) do
451-
case sql_call(repo, callback, args, params, mapper, opts) do
452-
{:ok, res} -> res
453-
{:error, err} -> raise err
454-
end
455-
end
456-
457444
@doc """
458445
Returns a stream that runs a custom SQL query on given repo when reduced.
459446
@@ -488,36 +475,36 @@ defmodule Ecto.Adapters.SQL do
488475
"""
489476
@spec stream(Ecto.Repo.t, String.t, [term], Keyword.t) :: Enum.t
490477
def stream(repo, sql, params \\ [], opts \\ []) do
491-
Ecto.Adapters.SQL.Stream.__build__(repo, sql, params, fn x -> x end, opts)
478+
Ecto.Adapters.SQL.Stream.__build__(repo, sql, params, opts)
492479
end
493480

494481
@doc false
495-
def stream(repo, meta, prepared, params, mapper, opts) do
496-
do_stream(repo, meta, prepared, params, mapper, put_source(opts, meta))
482+
def stream(repo, meta, prepared, params, opts) do
483+
do_stream(repo, meta, prepared, params, put_source(opts, meta))
497484
end
498485

499-
def do_stream(repo, _meta, {:cache, _, {_, prepared}}, params, mapper, opts) do
500-
prepare_stream(repo, prepared, params, mapper, opts)
486+
def do_stream(repo, _meta, {:cache, _, {_, prepared}}, params, opts) do
487+
prepare_stream(repo, prepared, params, opts)
501488
end
502489

503-
def do_stream(repo, _, {:cached, _, {_, cached}}, params, mapper, opts) do
504-
prepare_stream(repo, String.Chars.to_string(cached), params, mapper, opts)
490+
def do_stream(repo, _, {:cached, _, {_, cached}}, params, opts) do
491+
prepare_stream(repo, String.Chars.to_string(cached), params, opts)
505492
end
506493

507-
def do_stream(repo, _meta, {:nocache, {_id, prepared}}, params, mapper, opts) do
508-
prepare_stream(repo, prepared, params, mapper, opts)
494+
def do_stream(repo, _meta, {:nocache, {_id, prepared}}, params, opts) do
495+
prepare_stream(repo, prepared, params, opts)
509496
end
510497

511-
defp prepare_stream(repo, prepared, params, mapper, opts) do
498+
defp prepare_stream(repo, prepared, params, opts) do
512499
repo
513-
|> Ecto.Adapters.SQL.Stream.__build__(prepared, params, mapper, opts)
500+
|> Ecto.Adapters.SQL.Stream.__build__(prepared, params, opts)
514501
|> Stream.map(fn(%{num_rows: nrows, rows: rows}) -> {nrows, rows} end)
515502
end
516503

517504
@doc false
518-
def reduce(repo, statement, params, mapper, opts, acc, fun) do
505+
def reduce(repo, statement, params, opts, acc, fun) do
519506
{repo_mod, pool, default_opts} = lookup_pool(repo)
520-
opts = [decode_mapper: mapper] ++ with_log(repo, params, opts ++ default_opts)
507+
opts = with_log(repo, params, opts ++ default_opts)
521508
case get_conn(pool) do
522509
nil ->
523510
raise "cannot reduce stream outside of transaction"
@@ -528,9 +515,9 @@ defmodule Ecto.Adapters.SQL do
528515
end
529516

530517
@doc false
531-
def into(repo, statement, params, mapper, opts) do
518+
def into(repo, statement, params, opts) do
532519
{repo_mod, pool, default_opts} = lookup_pool(repo)
533-
opts = [decode_mapper: mapper] ++ with_log(repo_mod, params, opts ++ default_opts)
520+
opts = with_log(repo_mod, params, opts ++ default_opts)
534521
case get_conn(pool) do
535522
nil ->
536523
raise "cannot collect into stream outside of transaction"
@@ -542,7 +529,7 @@ defmodule Ecto.Adapters.SQL do
542529

543530
@doc false
544531
def struct(repo, conn, sql, {operation, source, params}, values, on_conflict, returning, opts) do
545-
case query(repo, sql, values, fn x -> x end, opts) do
532+
case query(repo, sql, values, opts) do
546533
{:ok, %{rows: nil, num_rows: 1}} ->
547534
{:ok, []}
548535
{:ok, %{rows: [values], num_rows: 1}} ->

lib/ecto/adapters/sql/stream.ex

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
defmodule Ecto.Adapters.SQL.Stream do
22
@moduledoc false
33

4-
defstruct [:repo, :statement, :params, :mapper, :opts]
4+
defstruct [:repo, :statement, :params, :opts]
55

6-
def __build__(repo, statement, params, mapper, opts) do
7-
%__MODULE__{repo: repo, statement: statement, params: params, mapper: mapper,
8-
opts: opts}
6+
def __build__(repo, statement, params, opts) do
7+
%__MODULE__{repo: repo, statement: statement, params: params, opts: opts}
98
end
109
end
1110

@@ -18,16 +17,16 @@ defimpl Enumerable, for: Ecto.Adapters.SQL.Stream do
1817

1918
def reduce(stream, acc, fun) do
2019
%Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params,
21-
mapper: mapper, opts: opts} = stream
22-
Ecto.Adapters.SQL.reduce(repo, statement, params, mapper, opts, acc, fun)
20+
opts: opts} = stream
21+
Ecto.Adapters.SQL.reduce(repo, statement, params, opts, acc, fun)
2322
end
2423
end
2524

2625
defimpl Collectable, for: Ecto.Adapters.SQL.Stream do
2726
def into(stream) do
2827
%Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params,
29-
mapper: mapper, opts: opts} = stream
30-
{state, fun} = Ecto.Adapters.SQL.into(repo, statement, params, mapper, opts)
28+
opts: opts} = stream
29+
{state, fun} = Ecto.Adapters.SQL.into(repo, statement, params, opts)
3130
{state, make_into(fun, stream)}
3231
end
3332

lib/ecto/repo/assoc.ex

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,20 @@ defmodule Ecto.Repo.Assoc do
77
Transforms a result set based on query assocs, loading
88
the associations onto their parent schema.
99
"""
10-
@spec query([Ecto.Schema.t], list, tuple) :: [Ecto.Schema.t]
11-
def query(rows, assocs, sources)
10+
@spec query([list], list, tuple, (list -> list)) :: [Ecto.Schema.t]
11+
def query(rows, assocs, sources, fun)
1212

13-
def query([], _assocs, _sources), do: []
14-
def query(rows, [], _sources), do: rows
13+
def query([], _assocs, _sources, _fun), do: []
14+
def query(rows, [], _sources, fun), do: Enum.map(rows, fun)
1515

16-
def query(rows, assocs, sources) do
16+
def query(rows, assocs, sources, fun) do
1717
# Create rose tree of accumulator dicts in the same
1818
# structure as the fields tree
1919
accs = create_accs(0, assocs, sources, [])
2020

2121
# Populate tree of dicts of associated entities from the result set
2222
{_keys, _cache, rows, sub_dicts} = Enum.reduce(rows, accs, fn row, acc ->
23-
merge(row, acc, 0) |> elem(0)
23+
merge(fun.(row), acc, 0) |> elem(0)
2424
end)
2525

2626
# Create the reflections that will be loaded into memory.

lib/ecto/repo/queryable.ex

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,17 +125,17 @@ defmodule Ecto.Repo.Queryable do
125125

126126
case meta do
127127
%{select: nil} ->
128-
adapter.execute(repo, meta, prepared, params, nil, opts)
128+
adapter.execute(repo, meta, prepared, params, opts)
129129
%{select: select, prefix: prefix, sources: sources, preloads: preloads} ->
130130
%{preprocess: preprocess, postprocess: postprocess, take: take, assocs: assocs} = select
131131
all_nil? = tuple_size(sources) != 1
132132
preprocessor = &preprocess(&1, preprocess, all_nil?, prefix, adapter)
133-
{count, rows} = adapter.execute(repo, meta, prepared, params, preprocessor, opts)
133+
{count, rows} = adapter.execute(repo, meta, prepared, params, opts)
134134
postprocessor = postprocessor(postprocess, take, prefix, adapter)
135135

136136
{count,
137137
rows
138-
|> Ecto.Repo.Assoc.query(assocs, sources)
138+
|> Ecto.Repo.Assoc.query(assocs, sources, preprocessor)
139139
|> Ecto.Repo.Preloader.query(repo, preloads, take, postprocessor, opts)}
140140
end
141141
end
@@ -146,18 +146,18 @@ defmodule Ecto.Repo.Queryable do
146146
case meta do
147147
%{select: nil} ->
148148
repo
149-
|> adapter.stream(meta, prepared, params, nil, opts)
149+
|> adapter.stream(meta, prepared, params, opts)
150150
|> Stream.flat_map(fn {_, nil} -> [] end)
151151
%{select: select, prefix: prefix, sources: sources, preloads: preloads} ->
152152
%{preprocess: preprocess, postprocess: postprocess, take: take, assocs: assocs} = select
153153
all_nil? = tuple_size(sources) != 1
154154
preprocessor = &preprocess(&1, preprocess, all_nil?, prefix, adapter)
155-
stream = adapter.stream(repo, meta, prepared, params, preprocessor, opts)
155+
stream = adapter.stream(repo, meta, prepared, params, opts)
156156
postprocessor = postprocessor(postprocess, take, prefix, adapter)
157157

158158
Stream.flat_map(stream, fn {_, rows} ->
159159
rows
160-
|> Ecto.Repo.Assoc.query(assocs, sources)
160+
|> Ecto.Repo.Assoc.query(assocs, sources, preprocessor)
161161
|> Ecto.Repo.Preloader.query(repo, preloads, take, postprocessor, opts)
162162
end)
163163
end

test/ecto/repo_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ defmodule Ecto.RepoTest do
122122
test "stream emits row values lazily" do
123123
stream = TestRepo.stream(MySchema)
124124
refute_received :stream_execute
125-
assert Enum.to_list(stream) == [1]
125+
assert [%MySchema{id: 1}] = Enum.to_list(stream)
126126
assert_received :stream_execute
127127
assert Enum.take(stream, 0) == []
128128
refute_received :stream_execute

test/support/test_repo.exs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,24 +35,30 @@ defmodule Ecto.TestAdapter do
3535

3636
def prepare(operation, query), do: {:nocache, {operation, query}}
3737

38-
def execute(_repo, _, {:nocache, {:all, _}}, _, _, _) do
39-
Process.get(:test_repo_all_results, {1, [[1]]})
38+
def execute(_repo, _, {:nocache, {:all, %{select: %{fields: [_|_] = fields}}}}, _, _) do
39+
# Pad nil values after first
40+
values = List.duplicate(nil, length(fields) - 1)
41+
Process.get(:test_repo_all_results, {1, [[1 | values]]})
4042
end
4143

42-
def execute(_repo, _meta, {:nocache, {:delete_all, %{from: {_, SchemaMigration}}}}, [version], _, _) do
44+
def execute(_repo, _, {:nocache, {:all, %{select: %{fields: []}}}}, _, _) do
45+
Process.get(:test_repo_all_results, {1, [[]]})
46+
end
47+
48+
def execute(_repo, _meta, {:nocache, {:delete_all, %{from: {_, SchemaMigration}}}}, [version], _) do
4349
Process.put(:migrated_versions, List.delete(migrated_versions(), version))
4450
{1, nil}
4551
end
4652

47-
def execute(_repo, meta, {:nocache, {op, %{from: {source, _}}}}, _params, _preprocess, _opts) do
53+
def execute(_repo, meta, {:nocache, {op, %{from: {source, _}}}}, _params, _opts) do
4854
send test_process(), {op, {meta.prefix, source}}
4955
{1, nil}
5056
end
5157

52-
def stream(repo, meta, prepared, params, preprocess, opts) do
58+
def stream(repo, meta, prepared, params, opts) do
5359
Stream.map([:execute], fn(:execute) ->
5460
send test_process(), :stream_execute
55-
execute(repo, meta, prepared, params, preprocess, opts)
61+
execute(repo, meta, prepared, params, opts)
5662
end)
5763
end
5864

0 commit comments

Comments
 (0)