Skip to content

Move preprocessing out of adapters #2312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions lib/ecto/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ defmodule Ecto.Adapter do
@type returning :: [atom]
@type prepared :: term
@type cached :: term
@type process :: (term -> term)
@type on_conflict :: {:raise, list(), []} |
{:nothing, list(), [atom]} |
{Ecto.Query.t, list(), [atom]}
Expand Down Expand Up @@ -128,13 +127,8 @@ defmodule Ecto.Adapter do

The `meta` field is a map containing some of the fields found
in the `Ecto.Query` struct.

It receives a process function that should be invoked for each
selected field in the query result in order to convert them to the
expected Ecto type. The `process` function will be nil if no
result set is expected from the query.
"""
@callback execute(repo, query_meta, query, params :: list(), process | nil, options) :: result when
@callback execute(repo, query_meta, query, params :: list(), options) :: result when
result: {integer, [[term]] | nil} | no_return,
query: {:nocache, prepared} |
{:cached, (prepared -> :ok), cached} |
Expand Down
103 changes: 45 additions & 58 deletions lib/ecto/adapters/sql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ defmodule Ecto.Adapters.SQL do
do: {:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.delete_all(query))}}

@doc false
def execute(repo, meta, query, params, process, opts) do
Ecto.Adapters.SQL.execute(repo, meta, query, params, process, opts)
def execute(repo, meta, query, params, opts) do
Ecto.Adapters.SQL.execute(repo, meta, query, params, opts)
end

@doc false
def stream(repo, meta, query, params, process, opts) do
Ecto.Adapters.SQL.stream(repo, meta, query, params, process, opts)
def stream(repo, meta, query, params, opts) do
Ecto.Adapters.SQL.stream(repo, meta, query, params, opts)
end

@doc false
Expand Down Expand Up @@ -141,7 +141,7 @@ defmodule Ecto.Adapters.SQL do
Ecto.Adapters.SQL.lock_for_migrations(repo, query, opts, fun)
end

defoverridable [prepare: 2, execute: 6, insert: 6, update: 6, delete: 4, insert_all: 7,
defoverridable [prepare: 2, execute: 5, insert: 6, update: 6, delete: 4, insert_all: 7,
execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2,
lock_for_migrations: 4]
end
Expand Down Expand Up @@ -197,11 +197,7 @@ defmodule Ecto.Adapters.SQL do
optional(atom) => any}
| no_return
def query!(repo, sql, params \\ [], opts \\ []) do
query!(repo, sql, params, fn x -> x end, opts)
end

defp query!(repo, sql, params, mapper, opts) do
case query(repo, sql, params, mapper, opts) do
case query(repo, sql, params, opts) do
{:ok, result} -> result
{:error, err} -> raise err
end
Expand Down Expand Up @@ -246,17 +242,13 @@ defmodule Ecto.Adapters.SQL do
optional(atom) => any}}
| {:error, Exception.t}
def query(repo, sql, params \\ [], opts \\ []) do
query(repo, sql, params, fn x -> x end, opts)
end

defp query(repo, sql, params, mapper, opts) do
sql_call(repo, :execute, [sql], params, mapper, opts)
sql_call(repo, :execute, [sql], params, opts)
end

defp sql_call(repo, callback, args, params, mapper, opts) do
defp sql_call(repo, callback, args, params, opts) do
{repo_mod, pool, default_opts} = lookup_pool(repo)
conn = get_conn(pool) || pool
opts = [decode_mapper: mapper] ++ with_log(repo_mod, params, opts ++ default_opts)
opts = with_log(repo_mod, params, opts ++ default_opts)
args = args ++ [params, opts]
try do
apply(repo_mod.__sql__, callback, [conn | args])
Expand Down Expand Up @@ -406,39 +398,41 @@ defmodule Ecto.Adapters.SQL do
end

@doc false
def execute(repo, meta, prepared, params, mapper, opts) do
do_execute(repo, meta, prepared, params, mapper, put_source(opts, meta))
def execute(repo, meta, prepared, params, opts) do
%{num_rows: num, rows: rows} = do_execute(repo, meta, prepared, params, put_source(opts, meta))
{num, rows}
end

defp do_execute(repo, _meta, {:cache, update, {id, prepared}}, params, mapper, opts) do
execute_and_cache(repo, id, update, prepared, params, mapper, opts)
defp do_execute(repo, _meta, {:cache, update, {id, prepared}}, params, opts) do
execute_and_cache(repo, id, update, prepared, params, opts)
end

defp do_execute(repo, _meta, {:cached, reset, {id, cached}}, params, mapper, opts) do
execute_or_reset(repo, id, reset, cached, params, mapper, opts)
defp do_execute(repo, _meta, {:cached, reset, {id, cached}}, params, opts) do
execute_or_reset(repo, id, reset, cached, params, opts)
end

defp do_execute(repo, _meta, {:nocache, {_id, prepared}}, params, mapper, opts) do
%{rows: rows, num_rows: num} =
sql_call!(repo, :execute, [prepared], params, mapper, opts)
{num, rows}
defp do_execute(repo, _meta, {:nocache, {_id, prepared}}, params, opts) do
case sql_call(repo, :execute, [prepared], params, opts) do
{:ok, res} -> res
{:error, err} -> raise err
end
end

defp execute_and_cache(repo, id, update, prepared, params, mapper, opts) do
defp execute_and_cache(repo, id, update, prepared, params, opts) do
name = "ecto_" <> Integer.to_string(id)
case sql_call(repo, :prepare_execute, [name, prepared], params, mapper, opts) do
{:ok, query, %{num_rows: num, rows: rows}} ->
case sql_call(repo, :prepare_execute, [name, prepared], params, opts) do
{:ok, query, result} ->
update.({id, query})
{num, rows}
result
{:error, err} ->
raise err
end
end

defp execute_or_reset(repo, id, reset, cached, params, mapper, opts) do
case sql_call(repo, :execute, [cached], params, mapper, opts) do
{:ok, %{num_rows: num, rows: rows}} ->
{num, rows}
defp execute_or_reset(repo, id, reset, cached, params, opts) do
case sql_call(repo, :execute, [cached], params, opts) do
{:ok, result} ->
result
{:error, err} ->
raise err
{:reset, err} ->
Expand All @@ -447,13 +441,6 @@ defmodule Ecto.Adapters.SQL do
end
end

defp sql_call!(repo, callback, args, params, mapper, opts) do
case sql_call(repo, callback, args, params, mapper, opts) do
{:ok, res} -> res
{:error, err} -> raise err
end
end

@doc """
Returns a stream that runs a custom SQL query on given repo when reduced.

Expand Down Expand Up @@ -488,36 +475,36 @@ defmodule Ecto.Adapters.SQL do
"""
@spec stream(Ecto.Repo.t, String.t, [term], Keyword.t) :: Enum.t
def stream(repo, sql, params \\ [], opts \\ []) do
Ecto.Adapters.SQL.Stream.__build__(repo, sql, params, fn x -> x end, opts)
Ecto.Adapters.SQL.Stream.__build__(repo, sql, params, opts)
end

@doc false
def stream(repo, meta, prepared, params, mapper, opts) do
do_stream(repo, meta, prepared, params, mapper, put_source(opts, meta))
def stream(repo, meta, prepared, params, opts) do
do_stream(repo, meta, prepared, params, put_source(opts, meta))
end

def do_stream(repo, _meta, {:cache, _, {_, prepared}}, params, mapper, opts) do
prepare_stream(repo, prepared, params, mapper, opts)
def do_stream(repo, _meta, {:cache, _, {_, prepared}}, params, opts) do
prepare_stream(repo, prepared, params, opts)
end

def do_stream(repo, _, {:cached, _, {_, cached}}, params, mapper, opts) do
prepare_stream(repo, String.Chars.to_string(cached), params, mapper, opts)
def do_stream(repo, _, {:cached, _, {_, cached}}, params, opts) do
prepare_stream(repo, String.Chars.to_string(cached), params, opts)
end

def do_stream(repo, _meta, {:nocache, {_id, prepared}}, params, mapper, opts) do
prepare_stream(repo, prepared, params, mapper, opts)
def do_stream(repo, _meta, {:nocache, {_id, prepared}}, params, opts) do
prepare_stream(repo, prepared, params, opts)
end

defp prepare_stream(repo, prepared, params, mapper, opts) do
defp prepare_stream(repo, prepared, params, opts) do
repo
|> Ecto.Adapters.SQL.Stream.__build__(prepared, params, mapper, opts)
|> Ecto.Adapters.SQL.Stream.__build__(prepared, params, opts)
|> Stream.map(fn(%{num_rows: nrows, rows: rows}) -> {nrows, rows} end)
end

@doc false
def reduce(repo, statement, params, mapper, opts, acc, fun) do
def reduce(repo, statement, params, opts, acc, fun) do
{repo_mod, pool, default_opts} = lookup_pool(repo)
opts = [decode_mapper: mapper] ++ with_log(repo, params, opts ++ default_opts)
opts = with_log(repo, params, opts ++ default_opts)
case get_conn(pool) do
nil ->
raise "cannot reduce stream outside of transaction"
Expand All @@ -528,9 +515,9 @@ defmodule Ecto.Adapters.SQL do
end

@doc false
def into(repo, statement, params, mapper, opts) do
def into(repo, statement, params, opts) do
{repo_mod, pool, default_opts} = lookup_pool(repo)
opts = [decode_mapper: mapper] ++ with_log(repo_mod, params, opts ++ default_opts)
opts = with_log(repo_mod, params, opts ++ default_opts)
case get_conn(pool) do
nil ->
raise "cannot collect into stream outside of transaction"
Expand All @@ -542,7 +529,7 @@ defmodule Ecto.Adapters.SQL do

@doc false
def struct(repo, conn, sql, {operation, source, params}, values, on_conflict, returning, opts) do
case query(repo, sql, values, fn x -> x end, opts) do
case query(repo, sql, values, opts) do
{:ok, %{rows: nil, num_rows: 1}} ->
{:ok, []}
{:ok, %{rows: [values], num_rows: 1}} ->
Expand Down
15 changes: 7 additions & 8 deletions lib/ecto/adapters/sql/stream.ex
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
defmodule Ecto.Adapters.SQL.Stream do
@moduledoc false

defstruct [:repo, :statement, :params, :mapper, :opts]
defstruct [:repo, :statement, :params, :opts]

def __build__(repo, statement, params, mapper, opts) do
%__MODULE__{repo: repo, statement: statement, params: params, mapper: mapper,
opts: opts}
def __build__(repo, statement, params, opts) do
%__MODULE__{repo: repo, statement: statement, params: params, opts: opts}
end
end

Expand All @@ -18,16 +17,16 @@ defimpl Enumerable, for: Ecto.Adapters.SQL.Stream do

def reduce(stream, acc, fun) do
%Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params,
mapper: mapper, opts: opts} = stream
Ecto.Adapters.SQL.reduce(repo, statement, params, mapper, opts, acc, fun)
opts: opts} = stream
Ecto.Adapters.SQL.reduce(repo, statement, params, opts, acc, fun)
end
end

defimpl Collectable, for: Ecto.Adapters.SQL.Stream do
def into(stream) do
%Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params,
mapper: mapper, opts: opts} = stream
{state, fun} = Ecto.Adapters.SQL.into(repo, statement, params, mapper, opts)
opts: opts} = stream
{state, fun} = Ecto.Adapters.SQL.into(repo, statement, params, opts)
{state, make_into(fun, stream)}
end

Expand Down
12 changes: 6 additions & 6 deletions lib/ecto/repo/assoc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ defmodule Ecto.Repo.Assoc do
Transforms a result set based on query assocs, loading
the associations onto their parent schema.
"""
@spec query([Ecto.Schema.t], list, tuple) :: [Ecto.Schema.t]
def query(rows, assocs, sources)
@spec query([list], list, tuple, (list -> list)) :: [Ecto.Schema.t]
def query(rows, assocs, sources, fun)

def query([], _assocs, _sources), do: []
def query(rows, [], _sources), do: rows
def query([], _assocs, _sources, _fun), do: []
def query(rows, [], _sources, fun), do: Enum.map(rows, fun)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would be able to delay this map until next step (preloading) to minimize traversals. However if Ecto controls all processing perhaps we can do better.


def query(rows, assocs, sources) do
def query(rows, assocs, sources, fun) do
# Create rose tree of accumulator dicts in the same
# structure as the fields tree
accs = create_accs(0, assocs, sources, [])

# Populate tree of dicts of associated entities from the result set
{_keys, _cache, rows, sub_dicts} = Enum.reduce(rows, accs, fn row, acc ->
merge(row, acc, 0) |> elem(0)
merge(fun.(row), acc, 0) |> elem(0)
end)

# Create the reflections that will be loaded into memory.
Expand Down
12 changes: 6 additions & 6 deletions lib/ecto/repo/queryable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,17 @@ defmodule Ecto.Repo.Queryable do

case meta do
%{select: nil} ->
adapter.execute(repo, meta, prepared, params, nil, opts)
adapter.execute(repo, meta, prepared, params, opts)
%{select: select, prefix: prefix, sources: sources, preloads: preloads} ->
%{preprocess: preprocess, postprocess: postprocess, take: take, assocs: assocs} = select
all_nil? = tuple_size(sources) != 1
preprocessor = &preprocess(&1, preprocess, all_nil?, prefix, adapter)
{count, rows} = adapter.execute(repo, meta, prepared, params, preprocessor, opts)
{count, rows} = adapter.execute(repo, meta, prepared, params, opts)
postprocessor = postprocessor(postprocess, take, prefix, adapter)

{count,
rows
|> Ecto.Repo.Assoc.query(assocs, sources)
|> Ecto.Repo.Assoc.query(assocs, sources, preprocessor)
|> Ecto.Repo.Preloader.query(repo, preloads, take, postprocessor, opts)}
end
end
Expand All @@ -146,18 +146,18 @@ defmodule Ecto.Repo.Queryable do
case meta do
%{select: nil} ->
repo
|> adapter.stream(meta, prepared, params, nil, opts)
|> adapter.stream(meta, prepared, params, opts)
|> Stream.flat_map(fn {_, nil} -> [] end)
%{select: select, prefix: prefix, sources: sources, preloads: preloads} ->
%{preprocess: preprocess, postprocess: postprocess, take: take, assocs: assocs} = select
all_nil? = tuple_size(sources) != 1
preprocessor = &preprocess(&1, preprocess, all_nil?, prefix, adapter)
stream = adapter.stream(repo, meta, prepared, params, preprocessor, opts)
stream = adapter.stream(repo, meta, prepared, params, opts)
postprocessor = postprocessor(postprocess, take, prefix, adapter)

Stream.flat_map(stream, fn {_, rows} ->
rows
|> Ecto.Repo.Assoc.query(assocs, sources)
|> Ecto.Repo.Assoc.query(assocs, sources, preprocessor)
|> Ecto.Repo.Preloader.query(repo, preloads, take, postprocessor, opts)
end)
end
Expand Down
2 changes: 1 addition & 1 deletion test/ecto/repo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ defmodule Ecto.RepoTest do
test "stream emits row values lazily" do
stream = TestRepo.stream(MySchema)
refute_received :stream_execute
assert Enum.to_list(stream) == [1]
assert [%MySchema{id: 1}] = Enum.to_list(stream)
assert_received :stream_execute
assert Enum.take(stream, 0) == []
refute_received :stream_execute
Expand Down
18 changes: 12 additions & 6 deletions test/support/test_repo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,30 @@ defmodule Ecto.TestAdapter do

def prepare(operation, query), do: {:nocache, {operation, query}}

def execute(_repo, _, {:nocache, {:all, _}}, _, _, _) do
Process.get(:test_repo_all_results, {1, [[1]]})
def execute(_repo, _, {:nocache, {:all, %{select: %{fields: [_|_] = fields}}}}, _, _) do
# Pad nil values after first
values = List.duplicate(nil, length(fields) - 1)
Process.get(:test_repo_all_results, {1, [[1 | values]]})
end

def execute(_repo, _meta, {:nocache, {:delete_all, %{from: {_, SchemaMigration}}}}, [version], _, _) do
def execute(_repo, _, {:nocache, {:all, %{select: %{fields: []}}}}, _, _) do
Process.get(:test_repo_all_results, {1, [[]]})
end

def execute(_repo, _meta, {:nocache, {:delete_all, %{from: {_, SchemaMigration}}}}, [version], _) do
Process.put(:migrated_versions, List.delete(migrated_versions(), version))
{1, nil}
end

def execute(_repo, meta, {:nocache, {op, %{from: {source, _}}}}, _params, _preprocess, _opts) do
def execute(_repo, meta, {:nocache, {op, %{from: {source, _}}}}, _params, _opts) do
send test_process(), {op, {meta.prefix, source}}
{1, nil}
end

def stream(repo, meta, prepared, params, preprocess, opts) do
def stream(repo, meta, prepared, params, opts) do
Stream.map([:execute], fn(:execute) ->
send test_process(), :stream_execute
execute(repo, meta, prepared, params, preprocess, opts)
execute(repo, meta, prepared, params, opts)
end)
end

Expand Down