diff --git a/README.md b/README.md index 1f3d18f..d8d68a3 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Welcome! `AshSqlite` is the SQLite data layer for [Ash Framework](https://hexdoc ### Resources +- [Aggregates](documentation/topics/resources/aggregates.md) - [References](documentation/topics/resources/references.md) - [Polymorphic Resources](documentation/topics/resources/polymorphic-resources.md) diff --git a/documentation/topics/about-ash-sqlite/what-is-ash-sqlite.md b/documentation/topics/about-ash-sqlite/what-is-ash-sqlite.md index 08abe09..394b8c1 100644 --- a/documentation/topics/about-ash-sqlite/what-is-ash-sqlite.md +++ b/documentation/topics/about-ash-sqlite/what-is-ash-sqlite.md @@ -6,7 +6,7 @@ SPDX-License-Identifier: MIT # What is AshSqlite? -AshSqlite is the SQLite `Ash.DataLayer` for [Ash Framework](https://hexdocs.pm/ash). This doesn't have all of the features of [AshPostgres](https://hexdocs.pm/ash_postgres), but it does support most of the features of Ash data layers. The main feature missing is Aggregate support. +AshSqlite is the SQLite `Ash.DataLayer` for [Ash Framework](https://hexdocs.pm/ash). This doesn't have all of the features of [AshPostgres](https://hexdocs.pm/ash_postgres), but it does support most of the features of Ash data layers. AshSqlite supports related aggregates, filters, sorts, and expression calculations for common SQLite-backed applications. See the [AshSqlite aggregates guide](../resources/aggregates.md) for supported aggregate cases and SQLite-specific limitations. Use this to persist records in a SQLite table. For example, the resource below would be persisted in a table called `tweets`: diff --git a/documentation/topics/resources/aggregates.md b/documentation/topics/resources/aggregates.md new file mode 100644 index 0000000..94fb1e6 --- /dev/null +++ b/documentation/topics/resources/aggregates.md @@ -0,0 +1,225 @@ + + +# Aggregates + +AshSqlite supports resource aggregates that can be loaded, filtered, sorted, and used in expression calculations. For general Ash aggregate usage, see the [Ash aggregates guide](https://hexdocs.pm/ash/aggregates.html). + +## Supported Aggregates + +AshSqlite supports related `count`, `sum`, `avg`, `min`, `max`, `exists`, `first`, `list`, and `custom` aggregates over normal relationship paths. + +```elixir +aggregates do + count :total_tickets, :tickets + exists :has_open_tickets, :tickets do + filter expr(status == :open) + end + + first :first_ticket_subject, :tickets, :subject do + sort subject: :asc_nils_last + end + + list :ticket_subjects, :tickets, :subject do + sort subject: :asc_nils_last + end +end +``` + +Aggregates are translated to SQL and can be used in queries. + +```elixir +require Ash.Query + +Helpdesk.Support.Representative +|> Ash.Query.filter(total_tickets > 2) +|> Ash.Query.sort(total_tickets: :desc) +|> Ash.Query.load([:total_tickets, :first_ticket_subject]) +|> Ash.read!() +``` + +Aggregates can also be loaded on records that have already been read. + +```elixir +representatives = Helpdesk.Support.read!(Helpdesk.Support.Representative) + +Ash.load!(representatives, [:total_tickets, :ticket_subjects]) +``` + +## Calculations + +Expression calculations can reference aggregates and be pushed down to SQLite. + +```elixir +aggregates do + count :total_tickets, :tickets + + count :open_tickets, :tickets do + filter expr(status == :open) + end +end + +calculations do + calculate :percent_open, :float, expr(open_tickets / total_tickets) +end +``` + +Calculations that reference aggregates can be loaded, filtered, and sorted in the same way. + +```elixir +require Ash.Query + +Helpdesk.Support.Representative +|> Ash.Query.filter(percent_open > 0.25) +|> Ash.Query.sort(:percent_open) +|> Ash.Query.load(:percent_open) +|> Ash.read!() +``` + +## Relationship Paths + +Aggregates are supported over normal relationship paths, including multi-hop paths. + +```elixir +aggregates do + count :comment_count, [:posts, :comments] + sum :paid_total, [:orders, :payments], :amount +end +``` + +One-hop many-to-many relationship aggregates are supported. Scalar aggregates are also supported when a multi-hop path ends in a many-to-many relationship. + +```elixir +aggregates do + count :linked_post_count, :linked_posts + count :linked_post_count_through_posts, [:posts, :linked_posts] + + first :first_linked_post_title, :linked_posts, :title do + sort title: :asc_nils_last + end +end +``` + +Parent-independent unrelated aggregates are supported when the aggregate query does not need values from the parent row. + +```elixir +aggregates do + count :published_post_count, Post do + filter expr(published == true) + end +end +``` + +## Aggregate Filters + +Aggregate filters and aggregate `join_filter`s are supported for normal paths and one-hop many-to-many paths when they do not depend on parent row values. + +```elixir +aggregates do + count :open_ticket_count, :tickets do + filter expr(status == :open) + end + + count :matching_ticket_count, :tickets do + join_filter :tickets, expr(priority == :high) + end +end +``` + +For many-to-many aggregates, a `join_filter` on the many-to-many relationship applies to the destination resource side of the aggregate. Put through-resource filtering on the relationship's configured join relationship/filter. + +For filters that need to test a to-many relationship without multiplying the aggregate rows, prefer `exists/2`. + +```elixir +aggregates do + sum :liked_comment_total, :comments, :likes do + filter expr(exists(ratings, score > 5)) + end +end +``` + +Multi-hop aggregates use each relationship's configured read action. If an intermediate hop needs scoped rows, define the read action on that relationship rather than trying to override it per aggregate. + +## SQLite Requirements + +`first` and `list` aggregates require SQLite 3.30.0 or later with JSON functions enabled. Window functions were added in SQLite 3.25.0, but AshSqlite's generated SQL also uses aggregate `FILTER` clauses and explicit `NULLS FIRST`/`NULLS LAST` ordering, which require SQLite 3.30.0 or later. + +- window functions +- aggregate `FILTER` +- JSON aggregation +- explicit null ordering + +JSON functions are built into SQLite by default as of SQLite 3.38.0. Older SQLite builds need the JSON1 extension enabled. Check the SQLite library used by your application, which may not be the same binary as the `sqlite3` command: + +```elixir +MyApp.Repo.query!("select sqlite_version()") +MyApp.Repo.query!("select json_group_array(1)") +``` + +`list` aggregates return lists through SQLite JSON aggregation. `custom` aggregates require a SQLite-compatible aggregate expression or function. + +## Custom Aggregates + +Custom aggregates should use both `Ash.Resource.Aggregate.CustomAggregate` and `AshSqlite.CustomAggregate`. + +```elixir +defmodule MyApp.StringAgg do + use Ash.Resource.Aggregate.CustomAggregate + use AshSqlite.CustomAggregate + + require Ecto.Query + + def dynamic(opts, binding) do + Ecto.Query.dynamic( + [], + fragment("group_concat(?, ?)", field(as(^binding), ^opts[:field]), ^opts[:delimiter]) + ) + end +end +``` + +Then use that implementation from a resource aggregate. + +```elixir +aggregates do + custom :ticket_subjects_joined, :tickets, :string do + implementation {MyApp.StringAgg, field: :subject, delimiter: ", "} + end +end +``` + +`AshSqlite.CustomAggregate` only defines the `dynamic/2` contract. It does not install SQLite extensions or register user-defined functions. If your custom aggregate uses a function that is not built into SQLite, register it with the SQLite connection yourself and make sure it is available in every environment. + +## Performance + +AshSqlite builds aggregate queries as grouped subqueries or windowed subqueries and joins those results back to the parent query. Add indexes for the relationship keys used by those subqueries. + +Useful indexes usually include: + +- child foreign keys, like `tickets.representative_id` +- many-to-many join resource key pairs +- fields used by aggregate filters +- fields used by `first` and `list` aggregate sorts + +## Unsupported Cases + +Full aggregate parity with [AshPostgres](https://hexdocs.pm/ash_postgres) is not available. Unsupported cases include: + +- inline query-level `list` and `custom` aggregate expressions +- unrelated aggregates that reference the parent row +- manual relationships +- `no_attributes?` relationships +- multi-hop paths that include many-to-many relationships before the final hop +- non-scalar aggregates over multi-hop paths that include many-to-many relationships +- parent-dependent relationship filters +- parent-dependent aggregate filters +- parent-dependent `join_filter`s +- aggregate filters that reference other aggregates +- expression sorts on `first` and `list` aggregates +- `uniq` list aggregates sorted by fields other than the listed field +- fanout-prone `sum`, `avg`, `list`, `custom`, or field-based `count` aggregate filters over to-many relationship references + +A fanout-prone aggregate filter is one where filtering joins another to-many relationship and can duplicate the rows being aggregated. For example, a `sum` of comment likes filtered by `popular_ratings.id` could count the same comment once per matching rating. AshSqlite rejects these shapes instead of returning an over-counted result. Use `exists/2` when you only need to test that related rows exist. diff --git a/documentation/tutorials/getting-started-with-ash-sqlite.md b/documentation/tutorials/getting-started-with-ash-sqlite.md index 7007b93..c4c6e30 100644 --- a/documentation/tutorials/getting-started-with-ash-sqlite.md +++ b/documentation/tutorials/getting-started-with-ash-sqlite.md @@ -326,11 +326,71 @@ Helpdesk.Support.Ticket ### Aggregates -As stated in [what-is-ash-sqlite](https://hexdocs.pm/ash_sqlite/getting-started-with-ash-sqlite.html#steps), -**The main feature missing is Aggregate support.**. +Aggregates include grouped data about relationships. You can read more about them in the [Ash aggregates guide](https://hexdocs.pm/ash/aggregates.html) and the [AshSqlite aggregates guide](../topics/resources/aggregates.md). -In order to use these consider using [ash_postgres](https://github.com/ash-project/ash_postgres) or -provide a patch. +Lets add aggregates to the representative resource so we can query how many tickets are assigned to a representative, how many are open, and the first ticket subject. + +```elixir +# in lib/helpdesk/support/resources/representative.ex + + aggregates do + count :total_tickets, :tickets + + count :open_tickets, :tickets do + filter expr(status == :open) + end + + exists :has_closed_tickets, :tickets do + filter expr(status == :closed) + end + + first :first_ticket_subject, :tickets, :subject do + sort subject: :asc_nils_last + end + end +``` + +Aggregates are translated to SQL and can be used in filters and sorts. + +```elixir +require Ash.Query + +Helpdesk.Support.Representative +|> Ash.Query.filter(open_tickets > 0) +|> Ash.Query.sort(total_tickets: :desc) +|> Ash.Query.load([:total_tickets, :open_tickets, :first_ticket_subject]) +|> Ash.read!() +``` + +You can also load individual aggregates after records have already been read. + +```elixir +representatives = Helpdesk.Support.read!(Helpdesk.Support.Representative) + +Ash.load!(representatives, [:open_tickets, :has_closed_tickets]) +``` + +Calculations can refer to aggregates, and those calculations can also be filtered, sorted, and loaded. + +```elixir +# in lib/helpdesk/support/resources/representative.ex + + calculations do + calculate :percent_open, :float, expr(open_tickets / total_tickets) + end +``` + +```elixir +require Ash.Query + +Helpdesk.Support.Representative +|> Ash.Query.filter(percent_open > 0.25) +|> Ash.Query.sort(:percent_open) +|> Ash.Query.load(:percent_open) +|> Ash.read!() +``` + +AshSqlite supports related `count`, `sum`, `avg`, `min`, `max`, `exists`, `first`, `list`, and `custom` aggregates over normal relationship paths, one-hop many-to-many relationship aggregates, scalar aggregates over multi-hop paths that end in a many-to-many relationship, and parent-independent unrelated aggregates. `first` and `list` aggregates require SQLite 3.30.0 or later with JSON functions enabled. ### Rich Configuration Options diff --git a/lib/aggregate.ex b/lib/aggregate.ex new file mode 100644 index 0000000..12a7bb1 --- /dev/null +++ b/lib/aggregate.ex @@ -0,0 +1,1617 @@ +# SPDX-FileCopyrightText: 2023 ash_sqlite contributors +# +# SPDX-License-Identifier: MIT + +defmodule AshSqlite.Aggregate do + @moduledoc false + + import Ecto.Query, only: [from: 2] + + @scalar_aggregate_kinds [:count, :sum, :avg, :max, :min, :exists] + @window_aggregate_kinds [:first, :list] + @supported_aggregate_kinds @scalar_aggregate_kinds ++ @window_aggregate_kinds ++ [:custom] + @window_value_field :__ash_sqlite_aggregate_value__ + @window_row_number_field :__ash_sqlite_aggregate_row_number__ + @window_count_field :__ash_sqlite_aggregate_count__ + @unrelated_join_field :__ash_sqlite_unrelated_aggregate_join__ + + def add_aggregates(query, aggregates, resource, opts \\ []) do + select? = Keyword.get(opts, :select?, true) + + do_add_aggregates(query, aggregates, resource, select?) + end + + def add_sort_aggregates(query, sort, _resource) when sort in [nil, []], do: {:ok, query} + + def add_sort_aggregates(query, sort, resource) do + with {:ok, aggregates} <- aggregates_from_sort(query, sort, resource) do + add_aggregates(query, aggregates, resource, select?: false) + end + end + + def relationship_filter_uses_parent?(%{filter: nil}), do: false + + def relationship_filter_uses_parent?(%{filter: filter}) do + filter_uses_parent?(filter) + end + + defp do_add_aggregates(query, [], _resource, _select?), do: {:ok, query} + + defp do_add_aggregates(query, aggregates, resource, select?) do + primary_key = Ash.Resource.Info.primary_key(resource) + + cond do + primary_key == [] -> + {:error, "AshSqlite cannot load aggregates on resources with no primary key"} + + Enum.any?(aggregates, &(not supported?(&1))) -> + {:error, + "AshSqlite only supports loading related count, sum, avg, min, max, exists, first, list and custom aggregates"} + + true -> + {already_added, remaining} = + aggregates + |> Enum.uniq_by(& &1.name) + |> Enum.split_with(&already_added?(&1, query.__ash_bindings__)) + + already_added_dynamics = + if select? do + Enum.map(already_added, &existing_aggregate_dynamic(&1, query.__ash_bindings__)) + else + [] + end + + remaining + |> Enum.group_by(&aggregate_group_key/1) + |> Enum.reduce_while({:ok, query, already_added_dynamics}, fn {relationship_path, + aggregates}, + {:ok, query, dynamics} -> + case add_aggregate_group( + query, + resource, + aggregate_relationship_path(relationship_path), + aggregates + ) do + {:ok, query, new_dynamics} -> + {:cont, {:ok, query, new_dynamics ++ dynamics}} + + {:error, error} -> + {:halt, {:error, error}} + end + end) + |> case do + {:ok, query, dynamics} -> + if select? do + {:ok, select_aggregates(query, dynamics)} + else + {:ok, query} + end + + {:error, error} -> + {:error, error} + end + end + end + + defp supported?(%{name: name}) when not is_atom(name), do: false + + defp supported?(%{kind: kind, related?: false}) when kind in @supported_aggregate_kinds do + true + end + + defp supported?(%{kind: kind, related?: related?, relationship_path: path}) + when kind in @supported_aggregate_kinds do + related? != false && match?([_ | _], path) + end + + defp supported?(_), do: false + + defp aggregate_group_key(aggregate) do + read_action = (aggregate.query.action && aggregate.query.action.name) || aggregate.read_action + + relationship_key = + case aggregate do + %{related?: false, query: %{resource: resource}} -> {:unrelated, resource} + %{relationship_path: relationship_path} -> {:related, relationship_path} + end + + {relationship_key, read_action, aggregate.join_filters || %{}, + aggregate_filter_group_key(aggregate), aggregate_kind_group_key(aggregate)} + end + + defp aggregate_relationship_path( + {{:related, relationship_path}, _read_action, _join_filters, _aggregate_filter_group, + _kind_group} + ) do + relationship_path + end + + defp aggregate_relationship_path( + {{:unrelated, _resource}, _read_action, _join_filters, _aggregate_filter_group, + _kind_group} + ) do + [] + end + + defp aggregate_kind_group_key(%{kind: kind, name: name}) when kind in @window_aggregate_kinds do + {kind, name} + end + + defp aggregate_kind_group_key(_aggregate), do: :shared + + defp aggregate_filter_group_key(aggregate) do + if aggregate_filter_uses_relationships?(aggregate) do + {:filter, aggregate.name} + else + :shared + end + end + + defp already_added?(aggregate, bindings) do + Enum.any?(bindings.bindings, fn + {_binding, %{type: :aggregate, aggregates: aggregates}} -> + aggregate.name in Enum.map(aggregates, & &1.name) + + _binding -> + false + end) + end + + defp existing_aggregate_dynamic(aggregate, bindings) do + {binding, _aggregate_binding} = + Enum.find(bindings.bindings, fn + {_binding, %{type: :aggregate, aggregates: aggregates}} -> + aggregate.name in Enum.map(aggregates, & &1.name) + + _binding -> + false + end) + + {aggregate.load, aggregate.name, loaded_aggregate_dynamic(aggregate, binding)} + end + + defp aggregates_from_sort(query, sort, resource) do + sort + |> List.wrap() + |> Enum.reduce_while({:ok, []}, fn sort, {:ok, aggregates} -> + case sort_aggregates(query, sort, resource) do + {:ok, new_aggregates} -> + {:cont, {:ok, new_aggregates ++ aggregates}} + + {:error, error} -> + {:halt, {:error, error}} + end + end) + |> case do + {:ok, aggregates} -> {:ok, Enum.uniq(aggregates)} + {:error, error} -> {:error, error} + end + end + + defp sort_aggregates(query, {sort, _order}, resource) do + sort_key_aggregates(query, sort, resource) + end + + defp sort_aggregates(query, sort, resource) do + sort_key_aggregates(query, sort, resource) + end + + defp sort_key_aggregates(_query, %Ash.Query.Aggregate{} = aggregate, _resource) do + {:ok, [aggregate]} + end + + defp sort_key_aggregates(query, %Ash.Query.Calculation{} = calculation, resource) do + calculation_aggregates(query, calculation, resource) + end + + defp sort_key_aggregates(query, sort, resource) when is_atom(sort) do + case Ash.Resource.Info.field(resource, sort) do + %Ash.Resource.Aggregate{} = aggregate -> + query_aggregate(resource, aggregate) + + %Ash.Resource.Calculation{} = calculation -> + calculation_aggregates(query, calculation, resource) + + _ -> + {:ok, []} + end + end + + defp sort_key_aggregates(_query, _sort, _resource), do: {:ok, []} + + defp calculation_aggregates(query, %Ash.Resource.Calculation{} = calculation, resource) do + {module, opts} = calculation.calculation + + with {:ok, calculation} <- + Ash.Query.Calculation.new( + calculation.name, + module, + opts, + calculation.type, + calculation.constraints + ) do + calculation = + Ash.Actions.Read.add_calc_context( + calculation, + query.__ash_bindings__.context[:private][:actor], + query.__ash_bindings__.context[:private][:authorize?], + query.__ash_bindings__.context[:private][:tenant], + query.__ash_bindings__.context[:private][:tracer], + query.__ash_bindings__.context[:private][:domain], + query.__ash_bindings__.context[:private][:resource], + parent_stack: query.__ash_bindings__[:parent_resources] || [] + ) + + calculation_aggregates(query, calculation, resource) + end + end + + defp calculation_aggregates(query, %Ash.Query.Calculation{} = calculation, resource) do + calculation.opts + |> calculation.module.expression(calculation.context) + |> Ash.Filter.hydrate_refs(%{ + resource: resource, + aggregates: %{}, + parent_stack: query.__ash_bindings__[:parent_resources] || [], + calculations: %{}, + public?: false + }) + |> case do + {:ok, expression} -> + {:ok, Ash.Filter.used_aggregates(expression)} + + {:error, error} -> + {:error, error} + end + end + + defp query_aggregate(resource, aggregate) do + related = Ash.Resource.Info.related(resource, aggregate.relationship_path) + + read_action = + aggregate.read_action || + Ash.Resource.Info.primary_action!(related, :read).name + + with %{valid?: true} = aggregate_query <- Ash.Query.for_read(related, read_action), + %{valid?: true} = aggregate_query <- + Ash.Query.build(aggregate_query, + filter: aggregate.filter, + sort: aggregate.sort + ), + {:ok, aggregate} <- + Ash.Query.Aggregate.new( + resource, + aggregate.name, + aggregate.kind, + path: aggregate.relationship_path, + query: aggregate_query, + field: aggregate.field, + default: aggregate.default, + filterable?: aggregate.filterable?, + type: aggregate.type, + sortable?: aggregate.sortable?, + include_nil?: aggregate.include_nil?, + constraints: aggregate.constraints, + implementation: aggregate.implementation, + uniq?: aggregate.uniq?, + read_action: read_action, + authorize?: aggregate.authorize?, + join_filters: aggregate.join_filters + ) do + {:ok, [aggregate]} + else + %{errors: errors} -> + {:error, errors} + + {:error, error} -> + {:error, error} + end + end + + defp add_aggregate_group(query, _resource, [], aggregates) do + if Enum.all?(aggregates, &(&1.related? == false)) do + do_add_unrelated_aggregate_group(query, aggregates) + else + {:error, "AshSqlite only supports loading unrelated aggregates with no relationship path"} + end + end + + defp add_aggregate_group(query, resource, relationship_path, aggregates) do + with {:ok, relationships} <- relationships(resource, relationship_path), + :ok <- validate_relationships(resource, relationship_path, relationships, aggregates) do + do_add_aggregate_group(query, relationships, aggregates) + end + end + + defp relationships(resource, relationship_path) do + relationship_path + |> Enum.reduce_while({:ok, resource, []}, fn relationship_name, {:ok, resource, acc} -> + case Ash.Resource.Info.relationship(resource, relationship_name) do + nil -> + {:halt, {:error, "No such relationship #{inspect(resource)}.#{relationship_name}"}} + + relationship -> + {:cont, {:ok, relationship.destination, [relationship | acc]}} + end + end) + |> case do + {:ok, _resource, relationships} -> {:ok, Enum.reverse(relationships)} + {:error, error} -> {:error, error} + end + end + + defp validate_relationships(resource, relationship_path, relationships, aggregates) do + cond do + Enum.any?(relationships, &match?(%{manual: {_, _}}, &1)) -> + {:error, "AshSqlite does not support loading aggregates over manual relationships"} + + Enum.any?(relationships, &Map.get(&1, :no_attributes?, false)) -> + {:error, + "AshSqlite does not support loading aggregates over no_attributes? relationships"} + + Enum.any?(relationships, &relationship_filter_uses_parent?/1) -> + {:error, + "AshSqlite does not support loading aggregates over relationships with parent-dependent filters"} + + Enum.any?(relationships, &join_relationship_filter_uses_parent?/1) -> + {:error, + "AshSqlite does not support loading aggregates over many_to_many relationships with parent-dependent join filters"} + + unsupported_multi_hop_many_to_many?(relationships, aggregates) -> + {:error, + "AshSqlite does not support loading aggregates over multi-hop paths that include many_to_many relationships"} + + Enum.empty?(relationships) -> + {:error, + "AshSqlite only supports loading aggregates over a relationship path from #{inspect(resource)}, got: #{inspect(relationship_path)}"} + + true -> + :ok + end + end + + defp unsupported_multi_hop_many_to_many?(relationships, aggregates) do + length(relationships) > 1 && + Enum.any?(relationships, &(&1.type == :many_to_many)) && + !supported_multi_hop_many_to_many?(relationships, aggregates) + end + + defp supported_multi_hop_many_to_many?(relationships, aggregates) do + List.last(relationships).type == :many_to_many && + Enum.count(relationships, &(&1.type == :many_to_many)) == 1 && + Enum.all?(aggregates, &(&1.kind in @scalar_aggregate_kinds)) + end + + defp do_add_unrelated_aggregate_group(query, aggregates) do + binding = query.__ash_bindings__.current + + with :ok <- validate_aggregate_filters(aggregates), + {:ok, aggregate_query} <- unrelated_aggregate_query(query, aggregates, binding) do + aggregate_query = Ecto.Query.subquery(aggregate_query) + + query = + from(_row in query, + left_join: aggregate in ^aggregate_query, + as: ^binding, + on: true + ) + + query = + AshSql.Bindings.add_binding(query, %{ + type: :aggregate, + path: [], + aggregates: aggregates + }) + + dynamics = + Enum.map(aggregates, fn aggregate -> + {aggregate.load, aggregate.name, loaded_aggregate_dynamic(aggregate, binding)} + end) + + {:ok, query, dynamics} + end + end + + defp do_add_aggregate_group(query, [first_relationship | _] = relationships, aggregates) do + binding = query.__ash_bindings__.current + + with :ok <- validate_aggregate_filters(aggregates), + {:ok, aggregate_query} <- + aggregate_query(query, relationships, aggregates, binding) do + aggregate_query = Ecto.Query.subquery(aggregate_query) + root_binding = query.__ash_bindings__.root_binding + + query = + from(_row in query, + left_join: aggregate in ^aggregate_query, + as: ^binding, + on: + field(as(^root_binding), ^first_relationship.source_attribute) == + field(aggregate, ^aggregate_join_attribute(first_relationship)) + ) + + query = + AshSql.Bindings.add_binding(query, %{ + type: :aggregate, + path: [], + aggregates: aggregates + }) + + dynamics = + Enum.map(aggregates, fn aggregate -> + {aggregate.load, aggregate.name, loaded_aggregate_dynamic(aggregate, binding)} + end) + + {:ok, query, dynamics} + end + end + + defp aggregate_query(parent_query, [relationship], [%{kind: kind} = aggregate], binding) + when kind in @window_aggregate_kinds do + case relationship do + %{type: :many_to_many} -> + many_to_many_window_aggregate_query(parent_query, relationship, aggregate, binding) + + relationship -> + related_window_aggregate_query(parent_query, relationship, aggregate, binding) + end + end + + defp aggregate_query( + parent_query, + [_ | _] = relationships, + [%{kind: kind} = aggregate], + binding + ) + when kind in @window_aggregate_kinds do + multi_hop_window_aggregate_query(parent_query, relationships, aggregate, binding) + end + + defp aggregate_query(parent_query, [relationship], aggregates, binding) do + case relationship do + %{type: :many_to_many} -> + many_to_many_aggregate_query(parent_query, relationship, aggregates, binding) + + relationship -> + related_aggregate_query(parent_query, relationship, aggregates, binding) + end + end + + defp aggregate_query(parent_query, relationships, aggregates, binding) do + case List.last(relationships) do + %{type: :many_to_many} -> + multi_hop_many_to_many_aggregate_query(parent_query, relationships, aggregates, binding) + + _relationship -> + multi_hop_aggregate_query(parent_query, relationships, aggregates, binding) + end + end + + defp unrelated_aggregate_query(parent_query, [%{kind: kind} = aggregate], binding) + when kind in @window_aggregate_kinds do + unrelated_window_aggregate_query(parent_query, aggregate, binding) + end + + defp unrelated_aggregate_query(parent_query, aggregates, binding) do + with {:ok, query} <- unrelated_query(parent_query, hd(aggregates), binding, filter?: false) do + root_binding = query.__ash_bindings__.root_binding + relationship = %{destination: hd(aggregates).query.resource} + + query = from(row in query, select: %{}) + + Enum.reduce_while(aggregates, {:ok, query}, fn aggregate, {:ok, query} -> + case aggregate_dynamic(query, relationship, aggregate, root_binding) do + {:ok, query, dynamic} -> + {:cont, {:ok, Ecto.Query.select_merge(query, ^%{aggregate.name => dynamic})}} + + {:error, error} -> + {:halt, {:error, error}} + end + end) + end + end + + defp unrelated_window_aggregate_query(parent_query, aggregate, binding) do + with {:ok, query} <- unrelated_query(parent_query, aggregate, binding, filter?: true) do + root_binding = query.__ash_bindings__.root_binding + + window_aggregate_query( + query, + aggregate, + @unrelated_join_field, + nil, + root_binding, + %{sort: []} + ) + end + end + + defp related_window_aggregate_query(parent_query, relationship, aggregate, binding) do + with {:ok, query} <- + related_window_query(parent_query, relationship, aggregate, binding, [ + relationship.name + ]) do + root_binding = query.__ash_bindings__.root_binding + + window_aggregate_query( + query, + aggregate, + relationship.destination_attribute, + root_binding, + root_binding, + relationship + ) + end + end + + defp many_to_many_window_aggregate_query(parent_query, relationship, aggregate, binding) do + with {:ok, query} <- + related_window_query(parent_query, relationship, aggregate, binding, [ + relationship.name + ]) do + through_binding = query.__ash_bindings__.current + + with {:ok, through_query} <- through_query(parent_query, relationship, through_binding) do + root_binding = query.__ash_bindings__.root_binding + through_query = Ecto.Query.subquery(through_query) + + query = + from(row in query, + join: through in ^through_query, + as: ^through_binding, + on: + field(through, ^relationship.destination_attribute_on_join_resource) == + field(as(^root_binding), ^relationship.destination_attribute) + ) + |> AshSql.Bindings.add_binding(%{ + type: :through, + relationship: relationship + }) + + window_aggregate_query( + query, + aggregate, + relationship.source_attribute_on_join_resource, + through_binding, + root_binding, + relationship + ) + end + end + end + + defp multi_hop_window_aggregate_query(parent_query, relationships, aggregate, binding) do + final_relationship = List.last(relationships) + relationship_path = Enum.map(relationships, & &1.name) + + with {:ok, query} <- + related_window_query( + parent_query, + final_relationship, + aggregate, + binding, + relationship_path + ), + {:ok, query, first_related_binding} <- + join_intermediate_relationships(parent_query, query, relationships, aggregate) do + first_relationship = hd(relationships) + root_binding = query.__ash_bindings__.root_binding + + window_aggregate_query( + query, + aggregate, + first_relationship.destination_attribute, + first_related_binding, + root_binding, + final_relationship + ) + end + end + + defp related_aggregate_query(parent_query, relationship, aggregates, binding) do + with {:ok, query} <- + related_query(parent_query, relationship, hd(aggregates), binding, [relationship.name]) do + root_binding = query.__ash_bindings__.root_binding + + query = + from(row in query, + group_by: field(as(^root_binding), ^relationship.destination_attribute), + select: %{ + ^relationship.destination_attribute => + field(as(^root_binding), ^relationship.destination_attribute) + } + ) + + Enum.reduce_while(aggregates, {:ok, query}, fn aggregate, {:ok, query} -> + case aggregate_dynamic(query, relationship, aggregate, root_binding) do + {:ok, query, dynamic} -> + {:cont, {:ok, Ecto.Query.select_merge(query, ^%{aggregate.name => dynamic})}} + + {:error, error} -> + {:halt, {:error, error}} + end + end) + end + end + + defp many_to_many_aggregate_query(parent_query, relationship, aggregates, binding) do + with {:ok, query} <- + related_query(parent_query, relationship, hd(aggregates), binding, [relationship.name]) do + through_binding = query.__ash_bindings__.current + + with {:ok, through_query} <- through_query(parent_query, relationship, through_binding) do + root_binding = query.__ash_bindings__.root_binding + ash_bindings = query.__ash_bindings__ + through_query = Ecto.Query.subquery(through_query) + + query = + from(row in query, + join: through in ^through_query, + as: ^through_binding, + on: + field(through, ^relationship.destination_attribute_on_join_resource) == + field(as(^root_binding), ^relationship.destination_attribute), + group_by: field(through, ^relationship.source_attribute_on_join_resource), + select: %{ + ^relationship.source_attribute_on_join_resource => + field(through, ^relationship.source_attribute_on_join_resource) + } + ) + |> Map.put(:__ash_bindings__, ash_bindings) + |> AshSql.Bindings.add_binding(%{ + type: :through, + relationship: relationship + }) + + Enum.reduce_while(aggregates, {:ok, query}, fn aggregate, {:ok, query} -> + case aggregate_dynamic(query, relationship, aggregate, root_binding) do + {:ok, query, dynamic} -> + {:cont, {:ok, Ecto.Query.select_merge(query, ^%{aggregate.name => dynamic})}} + + {:error, error} -> + {:halt, {:error, error}} + end + end) + end + end + end + + defp multi_hop_many_to_many_aggregate_query(parent_query, relationships, aggregates, binding) do + final_relationship = List.last(relationships) + relationship_path = Enum.map(relationships, & &1.name) + + with {:ok, query} <- + related_query( + parent_query, + final_relationship, + hd(aggregates), + binding, + relationship_path + ) do + through_binding = query.__ash_bindings__.current + + with {:ok, through_query} <- + through_query(parent_query, final_relationship, through_binding) do + root_binding = query.__ash_bindings__.root_binding + through_query = Ecto.Query.subquery(through_query) + + query = + from(row in query, + join: through in ^through_query, + as: ^through_binding, + on: + field(through, ^final_relationship.destination_attribute_on_join_resource) == + field(as(^root_binding), ^final_relationship.destination_attribute) + ) + |> AshSql.Bindings.add_binding(%{ + type: :through, + relationship: final_relationship + }) + + with {:ok, query, first_related_binding} <- + join_intermediate_relationships(parent_query, query, relationships, hd(aggregates), + current_binding: through_binding + ) do + first_relationship = hd(relationships) + + query = + from(row in query, + group_by: + field(as(^first_related_binding), ^first_relationship.destination_attribute), + select: %{ + ^first_relationship.destination_attribute => + field(as(^first_related_binding), ^first_relationship.destination_attribute) + } + ) + + root_binding = query.__ash_bindings__.root_binding + + Enum.reduce_while(aggregates, {:ok, query}, fn aggregate, {:ok, query} -> + case aggregate_dynamic(query, final_relationship, aggregate, root_binding) do + {:ok, query, dynamic} -> + {:cont, {:ok, Ecto.Query.select_merge(query, ^%{aggregate.name => dynamic})}} + + {:error, error} -> + {:halt, {:error, error}} + end + end) + end + end + end + end + + defp multi_hop_aggregate_query(parent_query, relationships, aggregates, binding) do + final_relationship = List.last(relationships) + relationship_path = Enum.map(relationships, & &1.name) + + with {:ok, query} <- + related_query( + parent_query, + final_relationship, + hd(aggregates), + binding, + relationship_path + ), + {:ok, query, first_related_binding} <- + join_intermediate_relationships(parent_query, query, relationships, hd(aggregates)) do + first_relationship = hd(relationships) + + query = + from(row in query, + group_by: field(as(^first_related_binding), ^first_relationship.destination_attribute), + select: %{ + ^first_relationship.destination_attribute => + field(as(^first_related_binding), ^first_relationship.destination_attribute) + } + ) + + root_binding = query.__ash_bindings__.root_binding + + Enum.reduce_while(aggregates, {:ok, query}, fn aggregate, {:ok, query} -> + case aggregate_dynamic(query, final_relationship, aggregate, root_binding) do + {:ok, query, dynamic} -> + {:cont, {:ok, Ecto.Query.select_merge(query, ^%{aggregate.name => dynamic})}} + + {:error, error} -> + {:halt, {:error, error}} + end + end) + end + end + + defp join_intermediate_relationships(parent_query, query, relationships, aggregate, opts \\ []) do + current_binding = Keyword.get(opts, :current_binding, query.__ash_bindings__.root_binding) + + relationships + |> Enum.zip(tl(relationships)) + |> Enum.with_index() + |> Enum.reverse() + |> Enum.reduce_while( + {:ok, query, current_binding, query.__ash_bindings__.current, nil}, + fn {{relationship, next_relationship}, index}, + {:ok, query, current_binding, next_binding, _first_related_binding} -> + path = + relationships + |> Enum.take(index + 1) + |> Enum.map(& &1.name) + + case intermediate_query(parent_query, relationship, next_binding, aggregate, path) do + {:ok, related_query} -> + related_query = Ecto.Query.subquery(related_query) + + on = intermediate_join_on(next_relationship, next_binding, current_binding) + + query = + from(row in query, + join: related in ^related_query, + as: ^next_binding, + on: ^on + ) + + {:cont, {:ok, query, next_binding, next_binding + 1, next_binding}} + + {:error, error} -> + {:halt, {:error, error}} + end + end + ) + |> case do + {:ok, query, _current_binding, _next_binding, first_related_binding} + when not is_nil(first_related_binding) -> + {:ok, query, first_related_binding} + + {:ok, _query, _current_binding, _next_binding, nil} -> + {:error, "AshSqlite could not build multi-hop aggregate joins"} + + {:error, error} -> + {:error, error} + end + end + + defp intermediate_join_on( + %{type: :many_to_many} = next_relationship, + related_binding, + current_binding + ) do + Ecto.Query.dynamic( + field(as(^related_binding), ^next_relationship.source_attribute) == + field(as(^current_binding), ^next_relationship.source_attribute_on_join_resource) + ) + end + + defp intermediate_join_on(next_relationship, related_binding, current_binding) do + Ecto.Query.dynamic( + field(as(^related_binding), ^next_relationship.source_attribute) == + field(as(^current_binding), ^next_relationship.destination_attribute) + ) + end + + defp related_query(parent_query, relationship, aggregate, binding, relationship_path) do + aggregate.query + |> Ash.Query.unset([:filter, :sort, :distinct, :select, :limit, :offset]) + |> Ash.Query.set_context(relationship.context) + |> Ash.Query.do_filter(relationship.filter, parent_stack: [relationship.source]) + |> Ash.Query.do_filter(join_filter(aggregate, relationship_path)) + |> Ash.Query.set_context(%{ + data_layer: %{ + start_bindings_at: binding, + parent_bindings: parent_query.__ash_bindings__ + } + }) + |> Ash.Query.data_layer_query(run_return_query?: false) + |> case do + {:ok, query} -> + {:ok, + query + |> Ecto.Query.exclude(:select) + |> Ecto.Query.exclude(:order_by)} + + {:error, error} -> + {:error, error} + end + end + + defp related_window_query(parent_query, relationship, aggregate, binding, relationship_path) do + aggregate.query + |> Ash.Query.unset([:sort, :distinct, :select, :limit, :offset]) + |> Ash.Query.set_context(relationship.context) + |> Ash.Query.do_filter(relationship.filter, parent_stack: [relationship.source]) + |> Ash.Query.do_filter(join_filter(aggregate, relationship_path)) + |> Ash.Query.set_context(%{ + data_layer: %{ + start_bindings_at: binding, + parent_bindings: parent_query.__ash_bindings__ + } + }) + |> Ash.Query.data_layer_query(run_return_query?: false) + |> case do + {:ok, query} -> + {:ok, + query + |> Ecto.Query.exclude(:select) + |> Ecto.Query.exclude(:order_by)} + + {:error, error} -> + {:error, error} + end + end + + defp unrelated_query(parent_query, aggregate, binding, opts) do + unset = + if Keyword.fetch!(opts, :filter?) do + [:sort, :distinct, :select, :limit, :offset] + else + [:filter, :sort, :distinct, :select, :limit, :offset] + end + + aggregate.query + |> Ash.Query.unset(unset) + |> Ash.Query.set_context(%{ + data_layer: %{ + start_bindings_at: binding, + parent_bindings: parent_query.__ash_bindings__ + } + }) + |> Ash.Query.data_layer_query(run_return_query?: false) + |> case do + {:ok, query} -> + {:ok, + query + |> Ecto.Query.exclude(:select) + |> Ecto.Query.exclude(:order_by)} + + {:error, error} -> + {:error, error} + end + end + + defp intermediate_query(parent_query, relationship, binding, aggregate, relationship_path) do + read_action = + relationship.read_action || + Ash.Resource.Info.primary_action!(relationship.destination, :read).name + + relationship.destination + |> Ash.Query.for_read(read_action) + |> Ash.Query.unset([:sort, :distinct, :select, :limit, :offset]) + |> Ash.Query.set_context(relationship.context) + |> Ash.Query.do_filter(relationship.filter, parent_stack: [relationship.source]) + |> Ash.Query.do_filter(join_filter(aggregate, relationship_path)) + |> Ash.Query.set_context(%{ + data_layer: %{ + start_bindings_at: binding, + parent_bindings: parent_query.__ash_bindings__ + } + }) + |> Ash.Query.data_layer_query(run_return_query?: false) + |> case do + {:ok, query} -> + {:ok, + query + |> Ecto.Query.exclude(:select) + |> Ecto.Query.exclude(:order_by)} + + {:error, error} -> + {:error, error} + end + end + + defp through_query(parent_query, relationship, binding) do + join_relationship = + Ash.Resource.Info.relationship(relationship.source, relationship.join_relationship) + + relationship.through + |> Ash.Query.new() + |> Ash.Query.set_context(%{ + data_layer: %{ + start_bindings_at: binding, + parent_bindings: parent_query.__ash_bindings__ + } + }) + |> Ash.Query.set_context(join_relationship.context) + |> Ash.Query.do_filter(join_relationship.filter) + |> Ash.Query.data_layer_query(run_return_query?: false) + |> case do + {:ok, query} -> + {:ok, + query + |> Ecto.Query.exclude(:select) + |> Ecto.Query.exclude(:order_by)} + + {:error, error} -> + {:error, error} + end + end + + defp aggregate_join_attribute(%{type: :many_to_many} = relationship) do + relationship.source_attribute_on_join_resource + end + + defp aggregate_join_attribute(relationship), do: relationship.destination_attribute + + defp join_relationship_filter_uses_parent?(%{type: :many_to_many} = relationship) do + relationship.source + |> Ash.Resource.Info.relationship(relationship.join_relationship) + |> relationship_filter_uses_parent?() + end + + defp join_relationship_filter_uses_parent?(_relationship), do: false + + defp join_filter(%{join_filters: join_filters}, relationship_path) + when is_map(join_filters) do + Map.get(join_filters, relationship_path) + end + + defp join_filter(_aggregate, _relationship_path), do: nil + + defp validate_aggregate_filters(aggregates) do + cond do + Enum.any?(aggregates, &aggregate_filter_uses_parent?/1) -> + {:error, + "AshSqlite does not support loading aggregates with parent-dependent aggregate filters"} + + Enum.any?(aggregates, &aggregate_filter_uses_parent_dependent_relationship?/1) -> + {:error, + "AshSqlite does not support loading aggregates with filters that reference relationships with parent-dependent filters"} + + Enum.any?(aggregates, &aggregate_filter_uses_aggregates?/1) -> + {:error, + "AshSqlite does not support loading aggregates with aggregate filters that reference other aggregates"} + + Enum.any?(aggregates, &unsupported_to_many_aggregate_filter?/1) -> + {:error, + "AshSqlite does not support loading sum, avg, list, custom, or field-based count aggregates with filters that reference to-many relationships"} + + Enum.any?(aggregates, &join_filters_use_parent?/1) -> + {:error, + "AshSqlite does not support loading aggregates with parent-dependent join filters"} + + true -> + :ok + end + end + + defp aggregate_filter_uses_parent?(%{query: %{filter: filter}}) do + filter_uses_parent?(filter) + end + + defp aggregate_filter_uses_parent_dependent_relationship?(%{ + query: %{filter: filter, resource: resource} + }) do + filter + |> aggregate_filter_relationship_paths() + |> Enum.any?(&parent_dependent_relationship_path?(resource, &1)) + end + + defp aggregate_filter_uses_parent_dependent_relationship?(_aggregate), do: false + + defp aggregate_filter_uses_relationships?(%{query: %{filter: filter}}) do + filter + |> aggregate_filter_relationship_paths() + |> Enum.any?() + end + + defp aggregate_filter_uses_relationships?(_aggregate), do: false + + defp aggregate_filter_uses_aggregates?(%{query: %{filter: filter}}) when not is_nil(filter) do + filter + |> Ash.Filter.used_aggregates([]) + |> Enum.any?() + end + + defp aggregate_filter_uses_aggregates?(_aggregate), do: false + + defp unsupported_to_many_aggregate_filter?(%{kind: :count, field: field} = aggregate) + when not is_nil(field) do + aggregate_filter_references_to_many_relationship?(aggregate) && !aggregate.uniq? + end + + defp unsupported_to_many_aggregate_filter?(%{kind: kind} = aggregate) + when kind in [:sum, :avg, :list, :custom] do + aggregate_filter_references_to_many_relationship?(aggregate) + end + + defp unsupported_to_many_aggregate_filter?(_aggregate), do: false + + defp aggregate_filter_references_to_many_relationship?(%{ + query: %{filter: filter, resource: resource} + }) do + filter + |> aggregate_filter_relationship_paths() + |> Enum.any?(&to_many_relationship_path?(resource, &1)) + end + + defp aggregate_filter_references_to_many_relationship?(_aggregate), do: false + + defp aggregate_filter_relationship_paths(nil), do: [] + + defp aggregate_filter_relationship_paths(%{expression: nil}), do: [] + + defp aggregate_filter_relationship_paths(filter) do + Ash.Filter.relationship_paths(filter) + end + + defp parent_dependent_relationship_path?(_resource, []), do: false + + defp parent_dependent_relationship_path?(resource, [relationship_name | rest]) do + case Ash.Resource.Info.relationship(resource, relationship_name) do + nil -> + false + + relationship -> + relationship_filter_uses_parent?(relationship) || + parent_dependent_relationship_path?(relationship.destination, rest) + end + end + + defp to_many_relationship_path?(_resource, []), do: false + + defp to_many_relationship_path?(resource, [relationship_name | rest]) do + case Ash.Resource.Info.relationship(resource, relationship_name) do + %{cardinality: :many} -> + true + + nil -> + false + + relationship -> + to_many_relationship_path?(relationship.destination, rest) + end + end + + defp join_filters_use_parent?(%{join_filters: join_filters}) when is_map(join_filters) do + Enum.any?(join_filters, fn {_path, filter} -> filter_uses_parent?(filter) end) + end + + defp join_filters_use_parent?(_aggregate), do: false + + defp filter_uses_parent?(nil), do: false + + defp filter_uses_parent?(%{expression: nil}), do: false + + defp filter_uses_parent?(filter) do + Ash.Filter.find( + filter, + fn + %Ash.Query.Parent{} -> true + %Ash.Query.Call{name: :parent} -> true + _ -> false + end, + true, + true, + true + ) + |> case do + nil -> false + _ -> true + end + end + + defp window_aggregate_query( + query, + aggregate, + join_attribute, + partition_binding, + value_binding, + relationship + ) do + with :ok <- validate_window_aggregate(aggregate), + {:ok, sort} <- window_aggregate_sort(aggregate, relationship), + :ok <- validate_window_aggregate_sort(aggregate, sort) do + query = + query + |> maybe_filter_window_nil_values(aggregate, value_binding) + |> window_source_query(aggregate, join_attribute, partition_binding, value_binding, sort) + |> Ecto.Query.subquery() + |> window_result_query(aggregate, join_attribute, sort) + + {:ok, query} + end + end + + defp validate_window_aggregate(%{field: field, kind: kind}) + when kind in @window_aggregate_kinds and is_atom(field) and not is_nil(field) do + :ok + end + + defp validate_window_aggregate(%{name: name, field: field}) do + {:error, + "AshSqlite cannot load first or list aggregate #{inspect(name)} with field #{inspect(field)}"} + end + + defp validate_window_aggregate_sort(%{kind: :list, uniq?: true, field: field}, sort) do + if Enum.all?(sort, fn {sort_field, _order} -> sort_field == field end) do + :ok + else + {:error, + "AshSqlite only supports uniq list aggregates when sorting by the list aggregate field"} + end + end + + defp validate_window_aggregate_sort(_aggregate, _sort), do: :ok + + defp maybe_filter_window_nil_values(query, %{include_nil?: true}, _binding), do: query + + defp maybe_filter_window_nil_values(query, aggregate, binding) do + from(row in query, where: not is_nil(field(as(^binding), ^aggregate.field))) + end + + defp window_source_query( + query, + aggregate, + join_attribute, + partition_binding, + value_binding, + sort + ) do + sort_selects = + sort + |> Enum.with_index() + |> Map.new(fn {{field, _order}, index} -> + {window_sort_field(index), Ecto.Query.dynamic(field(as(^value_binding), ^field))} + end) + + select = + Map.merge( + %{ + join_attribute => window_join_field(partition_binding, join_attribute), + @window_value_field => Ecto.Query.dynamic(field(as(^value_binding), ^aggregate.field)) + }, + sort_selects + ) + + query = + if aggregate.kind == :list && aggregate.uniq? do + # This relies on validate_window_aggregate_sort/2 requiring uniq lists to + # sort by the listed field, so distinct applies to {parent, value}. + from(row in query, distinct: true) + else + query + end + + from(row in query, select: ^select) + end + + defp window_result_query(source_query, aggregate, join_attribute, sort) do + order_by = + sort + |> Enum.with_index() + |> Enum.map(fn {{_field, order}, index} -> + {ecto_sort_order(order), Ecto.Query.dynamic([row], field(row, ^window_sort_field(index)))} + end) + + partition_by = Ecto.Query.dynamic([row], field(row, ^join_attribute)) + aggregate_value = window_aggregate_value(aggregate) + + query = + from(row in source_query, + windows: [ + ash_sqlite_aggregate_window: [ + partition_by: ^partition_by, + order_by: ^order_by + ], + ash_sqlite_aggregate_partition_window: [ + partition_by: ^partition_by + ] + ], + select: %{ + ^join_attribute => field(row, ^join_attribute), + @window_row_number_field => over(row_number(), :ash_sqlite_aggregate_window), + @window_count_field => over(count(), :ash_sqlite_aggregate_partition_window) + } + ) + |> Ecto.Query.select_merge(^%{aggregate.name => aggregate_value}) + + row_filter = window_row_filter(aggregate) + + from(row in Ecto.Query.subquery(query), + where: ^row_filter, + select: %{ + ^join_attribute => field(row, ^join_attribute), + ^aggregate.name => field(row, ^aggregate.name) + } + ) + end + + defp window_row_filter(%{kind: :list}) do + row_number_field = @window_row_number_field + count_field = @window_count_field + + Ecto.Query.dynamic( + [row], + field(row, ^row_number_field) == field(row, ^count_field) + ) + end + + defp window_row_filter(_aggregate) do + row_number_field = @window_row_number_field + + Ecto.Query.dynamic([row], field(row, ^row_number_field) == 1) + end + + defp window_aggregate_value(%{kind: :first, type: type}) do + value_field = @window_value_field + + value = + Ecto.Query.dynamic( + [row], + over(first_value(field(row, ^value_field)), :ash_sqlite_aggregate_window) + ) + + maybe_type_dynamic(value, type) + end + + defp window_aggregate_value(%{kind: :list, include_nil?: true, type: type}) do + value_field = @window_value_field + + value = + Ecto.Query.dynamic( + [row], + over( + fragment("json_group_array(?)", field(row, ^value_field)), + :ash_sqlite_aggregate_window + ) + ) + + maybe_type_dynamic(value, type) + end + + defp window_aggregate_value(%{kind: :list, type: type}) do + value_field = @window_value_field + + value = + Ecto.Query.dynamic( + [row], + over( + fragment( + "json_group_array(?) FILTER (WHERE ? IS NOT NULL)", + field(row, ^value_field), + field(row, ^value_field) + ), + :ash_sqlite_aggregate_window + ) + ) + + maybe_type_dynamic(value, type) + end + + defp maybe_type_dynamic(dynamic, nil), do: dynamic + + defp maybe_type_dynamic(dynamic, type) do + case sqlite_aggregate_type(type) do + nil -> dynamic + type -> AshSqlite.SqlImplementation.type_expr(dynamic, type) + end + end + + defp sqlite_aggregate_type(type) do + AshSqlite.SqlImplementation.parameterized_type(type, []) + end + + defp window_aggregate_sort(%{query: %{sort: sort}} = aggregate, relationship) do + sort = + cond do + sort not in [nil, []] -> + List.wrap(sort) + + relationship.sort not in [nil, []] -> + List.wrap(relationship.sort) + + true -> + [{aggregate.field, :asc}] + end + + sort + |> Enum.reduce_while({:ok, []}, fn + {field, order}, {:ok, acc} when is_atom(field) and is_atom(order) -> + {:cont, {:ok, [{field, order} | acc]}} + + field, {:ok, acc} when is_atom(field) -> + {:cont, {:ok, [{field, :asc} | acc]}} + + sort, _acc -> + {:halt, + {:error, + "AshSqlite only supports first and list aggregate sorting by related fields, got: #{inspect(sort)}"}} + end) + |> case do + {:ok, sort} -> {:ok, Enum.reverse(sort)} + {:error, error} -> {:error, error} + end + end + + defp window_sort_field(index) do + :"__ash_sqlite_aggregate_sort_#{index}__" + end + + defp window_join_field(nil, _join_attribute) do + Ecto.Query.dynamic(fragment("1")) + end + + defp window_join_field(partition_binding, join_attribute) do + Ecto.Query.dynamic(field(as(^partition_binding), ^join_attribute)) + end + + defp ecto_sort_order(:asc), do: :asc + defp ecto_sort_order(:desc), do: :desc + defp ecto_sort_order(:asc_nils_first), do: :asc_nulls_first + defp ecto_sort_order(:asc_nils_last), do: :asc_nulls_last + defp ecto_sort_order(:desc_nils_first), do: :desc_nulls_first + defp ecto_sort_order(:desc_nils_last), do: :desc_nulls_last + defp ecto_sort_order(other), do: other + + defp aggregate_dynamic(query, relationship, %{kind: :exists} = aggregate, binding) do + count_dynamic = count_dynamic(relationship, aggregate, binding) + + with {:ok, query, count_dynamic} <- + maybe_filter_aggregate(query, aggregate, count_dynamic) do + {:ok, query, Ecto.Query.dynamic(^count_dynamic > 0)} + end + end + + defp aggregate_dynamic(query, relationship, %{kind: :count} = aggregate, binding) do + dynamic = count_dynamic(relationship, aggregate, binding) + + with {:ok, query, dynamic} <- maybe_filter_aggregate(query, aggregate, dynamic) do + {:ok, query, maybe_default_aggregate(dynamic, aggregate)} + end + end + + defp aggregate_dynamic(query, _relationship, aggregate, binding) + when aggregate.kind in [:sum, :avg, :max, :min] and is_atom(aggregate.field) do + field = Ecto.Query.dynamic(field(as(^binding), ^aggregate.field)) + + dynamic = + case aggregate.kind do + :sum -> Ecto.Query.dynamic(sum(^field)) + :avg -> Ecto.Query.dynamic(avg(^field)) + :max -> Ecto.Query.dynamic(max(^field)) + :min -> Ecto.Query.dynamic(min(^field)) + end + + with {:ok, query, dynamic} <- maybe_filter_aggregate(query, aggregate, dynamic) do + {:ok, query, maybe_default_aggregate(dynamic, aggregate)} + end + end + + defp aggregate_dynamic(query, _relationship, %{kind: :custom} = aggregate, binding) do + {module, opts} = aggregate.implementation + dynamic = module.dynamic(opts, binding) + + with {:ok, query, dynamic} <- maybe_filter_aggregate(query, aggregate, dynamic) do + {:ok, query, maybe_default_aggregate(dynamic, aggregate)} + end + end + + defp aggregate_dynamic(_query, _relationship, aggregate, _binding) do + {:error, + "AshSqlite cannot load aggregate #{inspect(aggregate.name)} with field #{inspect(aggregate.field)}"} + end + + defp count_dynamic(relationship, %{field: nil} = aggregate, binding) do + if count_distinct?(aggregate) do + count_field = count_field(relationship, aggregate) + + Ecto.Query.dynamic(count(field(as(^binding), ^count_field), :distinct)) + else + Ecto.Query.dynamic(count()) + end + end + + defp count_dynamic(relationship, aggregate, binding) do + count_field = count_field(relationship, aggregate) + + if count_distinct?(aggregate) do + Ecto.Query.dynamic(count(field(as(^binding), ^count_field), :distinct)) + else + Ecto.Query.dynamic(count(field(as(^binding), ^count_field))) + end + end + + defp count_field(_relationship, %{field: field}) when is_atom(field) and not is_nil(field) do + field + end + + defp count_field(relationship, _aggregate) do + relationship.destination + |> Ash.Resource.Info.primary_key() + |> List.first() + |> case do + nil -> relationship.destination_attribute + field -> field + end + end + + defp count_distinct?(%{uniq?: true}), do: true + + defp count_distinct?(%{field: nil} = aggregate) do + aggregate_filter_references_to_many_relationship?(aggregate) + end + + defp count_distinct?(_aggregate), do: false + + defp maybe_filter_aggregate(query, aggregate, dynamic) do + case aggregate.query.filter do + nil -> + {:ok, query, dynamic} + + %{expression: nil} -> + {:ok, query, dynamic} + + filter -> + with {:ok, query} <- + AshSql.Join.join_all_relationships( + query, + filter, + [], + nil, + [], + nil, + true, + nil, + nil, + true + ) do + {filter_dynamic, acc} = + AshSql.Expr.dynamic_expr( + query, + filter, + Map.put(query.__ash_bindings__, :location, :aggregate), + false + ) + + {:ok, AshSql.Bindings.merge_expr_accumulator(query, acc), + Ecto.Query.dynamic(filter(^dynamic, ^filter_dynamic))} + end + end + end + + defp maybe_default_aggregate(dynamic, %{kind: :list, default_value: nil, type: type}) + when not is_nil(type) do + case sqlite_aggregate_type(type) do + nil -> + dynamic + + type -> + default = + Ecto.Query.dynamic(^"[]") + |> AshSqlite.SqlImplementation.type_expr(type) + + Ecto.Query.dynamic(coalesce(^dynamic, ^default)) + |> AshSqlite.SqlImplementation.type_expr(type) + end + end + + defp maybe_default_aggregate(dynamic, %{default_value: nil}), do: dynamic + + defp maybe_default_aggregate(dynamic, aggregate) do + Ecto.Query.dynamic(coalesce(^dynamic, ^aggregate.default_value)) + end + + defp loaded_aggregate_dynamic(%{kind: :exists, default_value: nil} = aggregate, binding) do + aggregate + |> loaded_aggregate_field(binding) + |> then(&Ecto.Query.dynamic(coalesce(^&1, false))) + end + + defp loaded_aggregate_dynamic(%{kind: :list} = aggregate, binding) do + type = sqlite_aggregate_type(aggregate.type) + default_value = aggregate.default_value || [] + + default_value = + if is_list(default_value), do: Jason.encode!(default_value), else: default_value + + aggregate + |> loaded_aggregate_field(binding) + |> then(fn field -> + if type do + default = + Ecto.Query.dynamic(^default_value) + |> AshSqlite.SqlImplementation.type_expr(type) + + Ecto.Query.dynamic(coalesce(^field, ^default)) + |> AshSqlite.SqlImplementation.type_expr(type) + else + Ecto.Query.dynamic(coalesce(^field, ^default_value)) + end + end) + end + + defp loaded_aggregate_dynamic(aggregate, binding) do + aggregate + |> loaded_aggregate_field(binding) + |> maybe_default_aggregate(aggregate) + end + + defp loaded_aggregate_field(aggregate, binding) do + Ecto.Query.dynamic(field(as(^binding), ^aggregate.name)) + end + + defp select_aggregates(query, dynamics) do + {in_aggregates, in_body} = + Enum.split_with(dynamics, fn {load, _name, _dynamic} -> is_nil(load) end) + + aggregates = + in_body + |> Map.new(fn {load, _name, dynamic} -> {load, dynamic} end) + + aggregates = + if Enum.empty?(in_aggregates) do + aggregates + else + Map.put( + aggregates, + :aggregates, + Map.new(in_aggregates, fn {_load, name, dynamic} -> {name, dynamic} end) + ) + end + + query = + if query.select do + query + else + from(row in query, select: %{}) + end + + Ecto.Query.select_merge(query, ^aggregates) + end +end diff --git a/lib/custom_aggregate.ex b/lib/custom_aggregate.ex new file mode 100644 index 0000000..7a92114 --- /dev/null +++ b/lib/custom_aggregate.ex @@ -0,0 +1,30 @@ +# SPDX-FileCopyrightText: 2023 ash_sqlite contributors +# +# SPDX-License-Identifier: MIT + +defmodule AshSqlite.CustomAggregate do + @moduledoc """ + A custom aggregate implementation for Ecto queries against SQLite. + """ + + @doc """ + The dynamic expression to create the aggregate. + + The binding refers to the resource being aggregated. Use `as(^binding)` to + reference it. + + For example: + + Ecto.Query.dynamic( + [], + fragment("group_concat(?, ?)", field(as(^binding), ^opts[:field]), ^opts[:delimiter]) + ) + """ + @callback dynamic(opts :: Keyword.t(), binding :: integer) :: Ecto.Query.dynamic_expr() + + defmacro __using__(_) do + quote do + @behaviour AshSqlite.CustomAggregate + end + end +end diff --git a/lib/data_layer.ex b/lib/data_layer.ex index e3c2b6f..98e6fbb 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -467,12 +467,17 @@ defmodule AshSqlite.DataLayer do false end + def can?(_, {:aggregate, :unrelated}), do: true + def can?(_, {:exists, :unrelated}), do: true + def can?(_, :boolean_filter), do: true - def can?(_, {:aggregate, _type}), do: false + def can?(_, {:aggregate, type}) + when type in [:count, :sum, :avg, :max, :min, :exists, :first, :list, :custom], + do: true - def can?(_, :aggregate_filter), do: false - def can?(_, :aggregate_sort), do: false + def can?(_, :aggregate_filter), do: true + def can?(_, :aggregate_sort), do: true def can?(_, :expression_calculation), do: true def can?(_, :expression_calculation_sort), do: true def can?(_, :create), do: true @@ -496,7 +501,30 @@ defmodule AshSqlite.DataLayer do def can?(_, {:filter_relationship, _}), do: true - def can?(_, {:aggregate_relationship, _}), do: false + def can?(_, {:aggregate_relationship, %{manual: {_, _}}}), do: false + + def can?(_, {:aggregate_relationship, %{type: :many_to_many} = relationship}) do + join_relationship = + Ash.Resource.Info.relationship(relationship.source, relationship.join_relationship) + + not is_nil(join_relationship) && + not AshSqlite.Aggregate.relationship_filter_uses_parent?(relationship) && + not AshSqlite.Aggregate.relationship_filter_uses_parent?(join_relationship) && + can?(relationship.source, {:join, relationship.through}) && + can?(relationship.through, {:join, relationship.destination}) + end + + def can?(_, {:aggregate_relationship, %{no_attributes?: true}}), do: false + + def can?(_, {:aggregate_relationship, relationship}) + when not is_nil(relationship.filter) do + not AshSqlite.Aggregate.relationship_filter_uses_parent?(relationship) && + can?(relationship.source, {:join, relationship.destination}) + end + + def can?(resource, {:aggregate_relationship, relationship}) do + can?(resource, {:join, relationship.destination}) + end def can?(_, :timeout), do: true def can?(_, {:filter_expr, %Ash.Query.Function.StringJoin{}}), do: false @@ -560,6 +588,31 @@ defmodule AshSqlite.DataLayer do {:ok, from(row in query, offset: ^offset)} end + @impl true + def return_query(query, resource) do + # AshSql.Query.return_query/2 also normalizes bindings. Do it here first so + # aggregate prebinding can inspect sort/load metadata before return_query + # consumes it. + query = + query + |> AshSql.Bindings.default_bindings(resource, AshSqlite.SqlImplementation) + + load_aggregates = query.__ash_bindings__[:load_aggregates] || [] + + query_without_aggregates = + Map.update!(query, :__ash_bindings__, &Map.put(&1, :load_aggregates, [])) + + with {:ok, query_without_aggregates} <- + AshSqlite.Aggregate.add_sort_aggregates( + query_without_aggregates, + query_without_aggregates.__ash_bindings__[:sort], + resource + ), + {:ok, query} <- AshSql.Query.return_query(query_without_aggregates, resource) do + AshSqlite.Aggregate.add_aggregates(query, load_aggregates, resource) + end + end + @impl true def run_aggregate_query(query, aggregates, resource) do AshSql.AggregateQuery.run_aggregate_query( @@ -576,7 +629,14 @@ defmodule AshSqlite.DataLayer do if query.__ash_bindings__[:sort_applied?] do {:ok, query} else - AshSql.Sort.apply_sort(query, query.__ash_bindings__[:sort], resource) + with {:ok, query} <- + AshSqlite.Aggregate.add_sort_aggregates( + query, + query.__ash_bindings__[:sort], + resource + ) do + AshSql.Sort.apply_sort(query, query.__ash_bindings__[:sort], resource) + end end case with_sort_applied do @@ -609,7 +669,8 @@ defmodule AshSqlite.DataLayer do repo.all( query, opts - )} + ) + |> AshSql.Query.remap_mapped_fields(query)} end end rescue @@ -1981,20 +2042,56 @@ defmodule AshSqlite.DataLayer do @impl true def filter(query, filter, _resource, opts \\ []) do + used_aggregates = Ash.Filter.used_aggregates(filter, []) + query |> AshSql.Join.join_all_relationships(filter, opts) |> case do {:ok, query} -> - {:ok, AshSql.Filter.add_filter_expression(query, filter)} + query + |> AshSqlite.Aggregate.add_aggregates( + used_aggregates, + query.__ash_bindings__.resource, + select?: false + ) + |> case do + {:ok, query} -> + {:ok, AshSql.Filter.add_filter_expression(query, filter)} + + {:error, error} -> + {:error, error} + end {:error, error} -> {:error, error} end end + @impl true + def add_aggregates(query, aggregates, _resource) do + {:ok, + Map.update!(query, :__ash_bindings__, fn bindings -> + Map.put(bindings, :load_aggregates, aggregates) + end)} + end + @impl true def add_calculations(query, calculations, resource) do - AshSql.Calculation.add_calculations(query, calculations, resource, 0, true) + aggregates = + calculations + |> Enum.flat_map(fn {calculation, expression} -> + expression + |> Ash.Filter.used_aggregates([]) + |> Enum.map(&Map.put(&1, :context, calculation.context)) + end) + # Preserve context before deduping: identical calculation contexts share + # one aggregate binding, different contexts stay isolated. + |> Enum.uniq() + + with {:ok, query} <- + AshSqlite.Aggregate.add_aggregates(query, aggregates, resource, select?: false) do + AshSql.Calculation.add_calculations(query, calculations, resource, 0, true) + end end @doc false diff --git a/mix.exs b/mix.exs index 5ba5752..569b916 100644 --- a/mix.exs +++ b/mix.exs @@ -88,6 +88,7 @@ defmodule AshSqlite.MixProject do "documentation/tutorials/getting-started-with-ash-sqlite.md", "documentation/topics/about-ash-sqlite/what-is-ash-sqlite.md", "documentation/topics/about-ash-sqlite/transactions.md", + "documentation/topics/resources/aggregates.md", "documentation/topics/resources/references.md", "documentation/topics/resources/polymorphic-resources.md", "documentation/topics/development/migrations-and-tasks.md", @@ -131,6 +132,9 @@ defmodule AshSqlite.MixProject do Types: [ AshSqlite.Type ], + "Custom Aggregates": [ + AshSqlite.CustomAggregate + ], Expressions: [ AshSqlite.Functions.Fragment, AshSqlite.Functions.Like diff --git a/test/aggregate_test.exs b/test/aggregate_test.exs index 6593479..8739a09 100644 --- a/test/aggregate_test.exs +++ b/test/aggregate_test.exs @@ -6,7 +6,7 @@ defmodule AshSqlite.AggregatesTest do use AshSqlite.RepoCase, async: false require Ash.Query - alias AshSqlite.Test.Post + alias AshSqlite.Test.{Author, Comment, Post, PostLink, Profile, Rating} test "a count with a filter returns the appropriate value" do Ash.Seed.seed!(%Post{title: "foo"}) @@ -31,4 +31,1069 @@ defmodule AshSqlite.AggregatesTest do |> Ash.Query.for_read(:paginated) |> Ash.read!() end + + test "paginated reads can count and load scalar aggregates" do + create_post!("paged aggregate a") + page_post = create_post!("paged aggregate b") + create_post!("paged aggregate c") + + create_comment!(page_post, "first", 1) + create_comment!(page_post, "second", 1) + + assert %Ash.Page.Offset{ + count: 3, + limit: 1, + offset: 1, + results: [ + %Post{title: "paged aggregate b", count_of_comments: 2} + ] + } = + Post + |> Ash.Query.for_read(:paginated) + |> Ash.Query.load(:count_of_comments) + |> Ash.Query.sort(:title) + |> Ash.Query.page(offset: 1, limit: 1, count: true) + |> Ash.read!() + end + + test "related scalar aggregates can be loaded" do + post = create_post!("loaded") + empty_post = create_post!("empty") + + create_comment!(post, "match", 1) + create_comment!(post, "other", 4) + create_comment!(post, "other", 10) + + loaded_post = + post + |> Ash.load!([ + :count_of_comments, + :count_of_popular_comments, + :count_of_comments_called_match, + :sum_of_comment_likes, + :sum_of_comment_likes_called_match, + :avg_comment_likes, + :min_comment_likes, + :max_comment_likes, + :has_comment_called_match + ]) + + assert loaded_post.count_of_comments == 3 + assert loaded_post.count_of_popular_comments == 0 + assert loaded_post.count_of_comments_called_match == 1 + assert loaded_post.sum_of_comment_likes == 15 + assert loaded_post.sum_of_comment_likes_called_match == 1 + assert loaded_post.avg_comment_likes == 5.0 + assert loaded_post.min_comment_likes == 1 + assert loaded_post.max_comment_likes == 10 + assert loaded_post.has_comment_called_match == true + + empty_post = + empty_post + |> Ash.load!([ + :count_of_comments, + :sum_of_comment_likes, + :avg_comment_likes, + :has_comment_called_match + ]) + + assert empty_post.count_of_comments == 0 + assert empty_post.sum_of_comment_likes == nil + assert empty_post.avg_comment_likes == nil + assert empty_post.has_comment_called_match == false + + assert [ + %Post{title: "empty", count_of_comments: 0}, + %Post{title: "loaded", count_of_comments: 3} + ] = + Post + |> Ash.Query.load(:count_of_comments) + |> Ash.Query.sort(:title) + |> Ash.read!() + end + + test "fieldless count aggregates use SQL count star" do + {:ok, query} = + Post + |> Ash.Query.load(:count_of_comments) + |> Ash.Query.data_layer_query() + + {sql, _params} = Ecto.Adapters.SQL.to_sql(:all, AshSqlite.TestRepo, query) + + assert sql =~ "count(*)" + end + + test "relationship filters are applied to loaded aggregates" do + post = create_post!("relationship filter") + + create_comment!(post, "quiet", 1) + create_comment!(post, "popular", 11) + + assert %{count_of_popular_comments: 1} = + Ash.load!(post, :count_of_popular_comments) + end + + test "resource queries can sort by related aggregates" do + one_comment = create_post!("one comment") + two_comments = create_post!("two comments") + no_comments = create_post!("no comments") + + create_comment!(one_comment, "only", 1) + create_comment!(two_comments, "first", 1) + create_comment!(two_comments, "second", 1) + + assert [ + %Post{id: two_comments_id, count_of_comments: 2}, + %Post{id: one_comment_id, count_of_comments: 1}, + %Post{id: no_comments_id, count_of_comments: 0} + ] = + Post + |> Ash.Query.load(:count_of_comments) + |> Ash.Query.sort(count_of_comments: :desc) + |> Ash.read!() + + assert two_comments_id == two_comments.id + assert one_comment_id == one_comment.id + assert no_comments_id == no_comments.id + end + + test "aggregate sorting works with pagination and aggregate filters" do + one_comment = create_post!("one comment") + two_comments = create_post!("two comments") + three_comments = create_post!("three comments") + create_post!("no comments") + + create_comment!(one_comment, "only", 1) + create_comment!(two_comments, "first", 1) + create_comment!(two_comments, "second", 1) + create_comment!(three_comments, "first", 1) + create_comment!(three_comments, "second", 1) + create_comment!(three_comments, "third", 1) + + assert [%Post{id: two_comments_id, count_of_comments: 2}] = + Post + |> Ash.Query.load(:count_of_comments) + |> Ash.Query.filter(count_of_comments > 0) + |> Ash.Query.sort(count_of_comments: :desc) + |> Ash.Query.limit(1) + |> Ash.Query.offset(1) + |> Ash.read!() + + assert two_comments_id == two_comments.id + end + + test "resource queries can filter on related aggregates" do + post = create_post!("with comments") + create_comment!(post, "match", 1) + create_comment!(post, "other", 1) + + create_post!("without comments") + + assert [%Post{id: post_id, count_of_comments: 2}] = + Post + |> Ash.Query.load(:count_of_comments) + |> Ash.Query.filter(count_of_comments > 1) + |> Ash.read!() + + assert post_id == post.id + end + + test "resource queries can filter and sort on related aggregates without loading them" do + one_comment = create_post!("one unloaded comment") + two_comments = create_post!("two unloaded comments") + create_post!("no unloaded comments") + + create_comment!(one_comment, "only", 1) + create_comment!(two_comments, "first", 1) + create_comment!(two_comments, "second", 1) + + assert [%Post{id: two_comments_id}, %Post{id: one_comment_id}] = + Post + |> Ash.Query.filter(count_of_comments > 0) + |> Ash.Query.sort(count_of_comments: :desc) + |> Ash.read!() + + assert two_comments_id == two_comments.id + assert one_comment_id == one_comment.id + end + + test "list loads related aggregates" do + post = create_post!("list load") + empty_post = create_post!("list load empty") + + create_comment!(post, "first", 1) + create_comment!(post, "second", 1) + + assert [ + %Post{id: post_id, count_of_comments: 2}, + %Post{id: empty_post_id, count_of_comments: 0} + ] = Ash.load!([post, empty_post], :count_of_comments) + + assert post_id == post.id + assert empty_post_id == empty_post.id + end + + test "aggregate join filters are applied on one-hop relationships" do + post = create_post!("join filter") + + create_comment!(post, "match", 1) + create_comment!(post, "other", 1) + + assert %{count_of_comments_with_join_filter: 1} = + Ash.load!(post, :count_of_comments_with_join_filter) + end + + test "same-path aggregates can use different read action filters" do + post = create_post!("read action aggregate") + + create_comment!(post, "low", 1) + create_comment!(post, "high", 10) + + assert %{count_of_comments: 2, count_of_liked_comments: 1} = + Ash.load!(post, [:count_of_comments, :count_of_liked_comments]) + end + + test "aggregate filters can reference relationships" do + post = create_post!("related aggregate filter") + + create_comment!(post, "first", 1) + create_comment!(post, "second", 1) + + assert %{count_of_comments_with_related_filter: 2} = + Ash.load!(post, :count_of_comments_with_related_filter) + end + + test "aggregate filters can reference related exists expressions" do + post = create_post!("related aggregate exists filter") + + create_comment!(post, "first", 1) + create_comment!(post, "second", 1) + + assert %{count_of_comments_with_related_exists_filter: 2} = + Ash.load!(post, :count_of_comments_with_related_exists_filter) + end + + test "aggregate filters over filtered to-many relationship refs do not corrupt siblings" do + post = create_post!("filtered related aggregate filter") + popular_comment = create_comment!(post, "popular", 1) + unpopular_comment = create_comment!(post, "unpopular", 1) + + create_comment_rating!(popular_comment, 10) + create_comment_rating!(popular_comment, 11) + create_comment_rating!(unpopular_comment, 1) + + assert %{ + count_of_comments: 2, + sum_of_comment_likes: 2, + count_of_comments_with_popular_ratings: 1 + } = + Ash.load!(post, [ + :count_of_comments, + :sum_of_comment_likes, + :count_of_comments_with_popular_ratings + ]) + end + + test "fieldless count filters over to-many refs count distinct aggregate rows" do + post = create_post!("distinct related aggregate filter") + popular_comment = create_comment!(post, "popular", 1) + unpopular_comment = create_comment!(post, "unpopular", 1) + + create_comment_rating!(popular_comment, 10) + create_comment_rating!(popular_comment, 11) + create_comment_rating!(unpopular_comment, 1) + + assert %{count_of_comments_with_popular_ratings: 1} = + Ash.load!(post, :count_of_comments_with_popular_ratings) + end + + test "exists filters avoid to-many fanout for sum aggregates" do + post = create_post!("exists fanout aggregate filter") + popular_comment = create_comment!(post, "popular", 4) + unpopular_comment = create_comment!(post, "unpopular", 6) + + create_comment_rating!(popular_comment, 10) + create_comment_rating!(popular_comment, 11) + create_comment_rating!(unpopular_comment, 1) + + assert %{sum_of_comment_likes_with_popular_ratings_exists: 4} = + Ash.load!(post, :sum_of_comment_likes_with_popular_ratings_exists) + end + + test "fanout-prone aggregate filters return stable unsupported errors" do + post = create_post!("fanout aggregate filter") + comment = create_comment!(post, "popular", 1) + create_comment_rating!(comment, 10) + create_comment_rating!(comment, 11) + + assert_raise Ash.Error.Unknown, ~r/sum, avg, list, custom, or field-based count/, fn -> + Ash.load!(post, :sum_of_comment_likes_with_popular_ratings) + end + + assert_raise Ash.Error.Unknown, ~r/sum, avg, list, custom, or field-based count/, fn -> + Ash.load!(post, :avg_comment_likes_with_popular_ratings) + end + + assert_raise Ash.Error.Unknown, ~r/list, custom, or field-based count aggregates/, fn -> + Ash.load!(post, :comment_titles_with_popular_ratings) + end + + assert_raise Ash.Error.Unknown, ~r/list, custom, or field-based count aggregates/, fn -> + Ash.load!(post, :comment_titles_joined_with_popular_ratings) + end + + assert_raise Ash.Error.Unknown, ~r/list, custom, or field-based count aggregates/, fn -> + Ash.load!(post, :count_comment_titles_with_popular_ratings) + end + end + + test "aggregate filters using parent expressions return a stable unsupported error" do + post = create_post!("same") + create_comment!(post, "same", 1) + + assert_raise Ash.Error.Unknown, ~r/parent-dependent aggregate filters/, fn -> + Ash.load!(post, :count_of_comments_matching_post_title) + end + end + + test "parent-dependent aggregate join filters return a stable unsupported error" do + post = create_post!("parent join") + create_comment!(post, "parent join", 1) + + assert_raise Ash.Error.Unknown, ~r/parent-dependent join filters/, fn -> + Ash.load!(post, :count_of_comments_with_parent_join_filter) + end + end + + test "aggregate filters that reference aggregates return a stable unsupported error" do + post = create_post!("aggregate filter") + create_comment!(post, "comment", 1) + + assert_raise Ash.Error.Unknown, ~r/filters that reference other aggregates/, fn -> + Ash.load!(post, :count_of_comments_with_aggregate_filter) + end + end + + test "multi-hop aggregate relationships can be loaded through normal paths" do + post = create_post!("post multi-hop") + comment = create_comment!(post, "comment", 1) + create_comment_rating!(comment, 7) + + assert %{count_of_comment_ratings: 1} = + Ash.load!(post, :count_of_comment_ratings) + end + + test "first aggregates can be loaded" do + post = create_post!("first aggregate") + empty_post = create_post!("first aggregate empty") + + create_comment!(post, nil, 1) + create_comment!(post, "bbb", 1) + create_comment!(post, "aaa", 1) + create_comment!(post, "stuff", 1) + + loaded_post = + Ash.load!(post, [ + :first_comment, + :first_comment_nils_first, + :first_comment_nils_first_called_stuff, + :first_comment_nils_first_include_nil + ]) + + assert loaded_post.first_comment == "aaa" + assert loaded_post.first_comment_nils_first == "aaa" + assert loaded_post.first_comment_nils_first_called_stuff == "stuff" + assert loaded_post.first_comment_nils_first_include_nil == nil + + assert %{first_comment: nil} = Ash.load!(empty_post, :first_comment) + end + + test "first aggregates can be sorted and used over belongs_to and multi-hop paths" do + author = create_author!("Belongs", "To") + author_post = create_post_for_author!(author, "belongs to first") + + low = create_post!("low first") + high = create_post!("high first") + + create_comment!(low, "aaa", 1) + create_comment!(high, "zzz", 1) + + assert [ + %Post{id: low_id, first_comment: "aaa"}, + %Post{id: high_id, first_comment: "zzz"} + ] = + Post + |> Ash.Query.load(:first_comment) + |> Ash.Query.filter(count_of_comments > 0) + |> Ash.Query.sort(first_comment: :asc) + |> Ash.read!() + + assert low_id == low.id + assert high_id == high.id + + assert %{author_first_name: "Belongs"} = Ash.load!(author_post, :author_first_name) + + comment = create_comment!(high, "rated", 1) + create_comment_rating!(comment, 3) + create_comment_rating!(comment, 10) + + assert %{highest_rating: 10} = Ash.load!(high, :highest_rating) + end + + test "list aggregates can be loaded" do + post = create_post!("list aggregate") + empty_post = create_post!("list aggregate empty") + + first = create_comment!(post, "bbb", 1) + create_comment!(post, nil, 1) + create_comment!(post, "aaa", 7) + create_comment!(post, "aaa", 9) + + loaded_post = + Ash.load!(post, [ + :comment_titles, + :comment_titles_with_nils, + :uniq_comment_titles, + :comment_titles_with_5_likes, + :comment_ids + ]) + + assert loaded_post.comment_titles == ["aaa", "aaa", "bbb"] + assert loaded_post.comment_titles_with_nils == ["aaa", "aaa", "bbb", nil] + assert loaded_post.uniq_comment_titles == ["aaa", "bbb"] + assert loaded_post.comment_titles_with_5_likes == ["aaa", "aaa"] + assert first.id in loaded_post.comment_ids + + assert %{comment_titles: []} = Ash.load!(empty_post, :comment_titles) + end + + test "custom aggregates can use sqlite-specific implementations" do + post = create_post!("custom aggregate") + + create_comment!(post, "aaa", 2) + create_comment!(post, "bbb", 3) + + assert %{comment_titles_joined: joined, total_comment_likes_custom: total} = + Ash.load!(post, [:comment_titles_joined, :total_comment_likes_custom]) + + assert joined |> String.split(",") |> Enum.sort() == ["aaa", "bbb"] + assert total == 5.0 + assert is_float(total) + end + + test "unrelated aggregates without parent filters can be loaded" do + first_author = create_author!("first", "author") + second_author = create_author!("second", "author") + + create_profile!("bbb") + create_profile!("aaa") + create_profile!(nil) + + create_post!("scored one", %{score: 2}) + create_post!("scored two", %{score: 3}) + + loaded_authors = + [first_author, second_author] + |> Ash.load!([ + :total_profiles, + :total_profiles_plus_one, + :total_post_score, + :avg_post_score, + :min_post_score, + :max_post_score, + :has_any_profile, + :first_profile_description, + :profile_descriptions, + :post_titles_joined + ]) + + assert [ + %Author{ + id: first_author_id, + total_profiles: 3, + total_profiles_plus_one: 4, + total_post_score: 5, + avg_post_score: 2.5, + min_post_score: 2, + max_post_score: 3, + has_any_profile: true, + first_profile_description: "aaa", + profile_descriptions: ["aaa", "bbb"] + } = loaded_first_author, + %Author{ + id: second_author_id, + total_profiles: 3, + total_profiles_plus_one: 4, + total_post_score: 5, + avg_post_score: 2.5, + min_post_score: 2, + max_post_score: 3, + has_any_profile: true, + first_profile_description: "aaa", + profile_descriptions: ["aaa", "bbb"] + } = loaded_second_author + ] = loaded_authors + + assert first_author_id == first_author.id + assert second_author_id == second_author.id + + assert loaded_first_author.post_titles_joined |> String.split(",") |> Enum.sort() == [ + "scored one", + "scored two" + ] + + assert loaded_second_author.post_titles_joined |> String.split(",") |> Enum.sort() == [ + "scored one", + "scored two" + ] + end + + test "unsupported aggregate relationship shapes return stable errors" do + manual_relationship = Ash.Resource.Info.relationship(Post, :comments_containing_title) + no_attributes_relationship = Ash.Resource.Info.relationship(Post, :posts_with_matching_title) + + parent_filter_relationship = + Ash.Resource.Info.relationship(Post, :comments_matching_post_title) + + refute AshSqlite.DataLayer.can?(Post, {:aggregate_relationship, manual_relationship}) + refute AshSqlite.DataLayer.can?(Post, {:aggregate_relationship, no_attributes_relationship}) + refute AshSqlite.DataLayer.can?(Post, {:aggregate_relationship, parent_filter_relationship}) + end + + test "parent-dependent unrelated aggregate filters return a stable unsupported error" do + author = create_author!("parent", "unrelated") + create_profile!("parent") + + assert_raise Ash.Error.Unknown, ~r/parent-dependent aggregate filters/, fn -> + Ash.load!(author, :profiles_matching_first_name) + end + end + + test "calculations can reference related aggregates" do + post = create_post!("with aggregate calculation", %{score: 3}) + empty_post = create_post!("without aggregate calculation", %{score: 7}) + + create_comment!(post, "first", 4) + create_comment!(post, "second", 6) + + assert [ + %Post{ + id: post_id, + has_comments: true, + comment_likes_with_score: 13 + }, + %Post{ + id: empty_post_id, + has_comments: false, + comment_likes_with_score: 7 + } + ] = + Post + |> Ash.Query.load([:has_comments, :comment_likes_with_score]) + |> Ash.Query.sort(comment_likes_with_score: :desc) + |> Ash.read!() + + assert post_id == post.id + assert empty_post_id == empty_post.id + end + + test "many_to_many scalar aggregates can be loaded" do + source = create_post!("source", %{score: 5}) + match = create_post!("match", %{score: 2}) + other = create_post!("other", %{score: 6}) + archived = create_post!("archived", %{score: 20}) + empty = create_post!("empty", %{score: 1}) + + link_posts!(source, [match, other]) + create_post_link!(source, archived, :archived) + + loaded_source = + Ash.load!(source, [ + :count_of_linked_posts, + :sum_of_linked_post_scores, + :avg_linked_post_score, + :min_linked_post_score, + :max_linked_post_score, + :has_linked_post_called_match + ]) + + assert loaded_source.count_of_linked_posts == 2 + assert loaded_source.sum_of_linked_post_scores == 8 + assert loaded_source.avg_linked_post_score == 4.0 + assert loaded_source.min_linked_post_score == 2 + assert loaded_source.max_linked_post_score == 6 + assert loaded_source.has_linked_post_called_match == true + + loaded_empty = + Ash.load!(empty, [ + :count_of_linked_posts, + :sum_of_linked_post_scores, + :avg_linked_post_score, + :has_linked_post_called_match + ]) + + assert loaded_empty.count_of_linked_posts == 0 + assert loaded_empty.sum_of_linked_post_scores == nil + assert loaded_empty.avg_linked_post_score == nil + assert loaded_empty.has_linked_post_called_match == false + end + + test "many_to_many aggregates with filters that require joins can be loaded" do + source = create_post!("source") + author = create_author!("John", "Doe") + linked = create_post_for_author!(author, "linked") + + link_posts!(source, [linked]) + + assert %{count_of_linked_posts_with_author: 1} = + Ash.load!(source, :count_of_linked_posts_with_author) + end + + test "many_to_many aggregate filters that require joins work in parent queries" do + first_source = create_post!("first source") + second_source = create_post!("second source") + create_post!("no links") + + author = create_author!("Jane", "Doe") + linked_with_author = create_post_for_author!(author, "linked with author") + linked_without_author = create_post!("linked without author") + + link_posts!(first_source, [linked_with_author, linked_without_author]) + link_posts!(second_source, [linked_with_author]) + + assert [ + %Post{id: first_source_id, count_of_linked_posts_with_author: 1}, + %Post{id: second_source_id, count_of_linked_posts_with_author: 1} + ] = + Post + |> Ash.Query.load(:count_of_linked_posts_with_author) + |> Ash.Query.filter(count_of_linked_posts_with_author > 0) + |> Ash.Query.sort(title: :asc) + |> Ash.read!() + + assert first_source_id == first_source.id + assert second_source_id == second_source.id + end + + test "many_to_many first and list aggregates can be loaded" do + source = create_post!("m2m window source") + empty = create_post!("m2m window empty") + first = create_post!("bbb") + second = create_post!("ccc") + archived = create_post!("aaa") + + link_posts!(source, [second, first]) + create_post_link!(source, archived, :archived) + + assert %{ + first_linked_post_title: "bbb", + linked_post_titles: ["bbb", "ccc"] + } = + Ash.load!(source, [ + :first_linked_post_title, + :linked_post_titles + ]) + + assert %{ + first_linked_post_title: nil, + linked_post_titles: [] + } = + Ash.load!(empty, [ + :first_linked_post_title, + :linked_post_titles + ]) + end + + test "many_to_many first and list aggregates with joined filters can be loaded" do + source = create_post!("m2m joined window source") + author = create_author!("Window", "Author") + without_author = create_post!("aaa") + with_author = create_post_for_author!(author, "bbb") + with_author_later = create_post_for_author!(author, "ccc") + + link_posts!(source, [without_author, with_author_later, with_author]) + + assert %{ + first_linked_post_title_with_author: "bbb", + linked_post_titles_with_author: ["bbb", "ccc"], + first_linked_post_title_with_author_join_filter: "bbb", + linked_post_titles_with_author_join_filter: ["bbb", "ccc"] + } = + Ash.load!(source, [ + :first_linked_post_title_with_author, + :linked_post_titles_with_author, + :first_linked_post_title_with_author_join_filter, + :linked_post_titles_with_author_join_filter + ]) + end + + test "many_to_many custom aggregates can be loaded" do + source = create_post!("m2m custom source") + empty = create_post!("m2m custom empty") + first = create_post!("aaa") + second = create_post!("bbb") + archived = create_post!("ccc") + + link_posts!(source, [second, first]) + create_post_link!(source, archived, :archived) + + assert %{linked_post_titles_joined: joined} = + Ash.load!(source, :linked_post_titles_joined) + + assert joined |> String.split(",") |> Enum.sort() == ["aaa", "bbb"] + + assert %{linked_post_titles_joined: nil} = + Ash.load!(empty, :linked_post_titles_joined) + end + + test "many_to_many aggregates can be filtered, sorted and used in calculations" do + one_link = create_post!("one link", %{score: 1}) + two_links = create_post!("two links", %{score: 2}) + no_links = create_post!("no links", %{score: 3}) + + linked_a = create_post!("linked a", %{score: 4}) + linked_b = create_post!("linked b", %{score: 5}) + + link_posts!(one_link, [linked_a]) + link_posts!(two_links, [linked_a, linked_b]) + + assert [ + %Post{ + id: two_links_id, + count_of_linked_posts: 2, + linked_post_score_with_score: 11 + }, + %Post{ + id: one_link_id, + count_of_linked_posts: 1, + linked_post_score_with_score: 5 + } + ] = + Post + |> Ash.Query.load([ + :count_of_linked_posts, + :linked_post_score_with_score + ]) + |> Ash.Query.filter(count_of_linked_posts > 0) + |> Ash.Query.sort(count_of_linked_posts: :desc) + |> Ash.read!() + + assert two_links_id == two_links.id + assert one_link_id == one_link.id + + assert %{linked_post_score_with_score: 3} = + Ash.load!(no_links, :linked_post_score_with_score) + end + + test "aggregate join filters are applied on many_to_many relationships" do + source = create_post!("m2m join filter source") + match = create_post!("match") + other = create_post!("other") + + link_posts!(source, [match, other]) + + assert %{count_of_linked_posts_with_join_filter: 1} = + Ash.load!(source, :count_of_linked_posts_with_join_filter) + end + + test "multi-hop scalar aggregates can be loaded" do + author = create_author!("multi", "hop") + empty_author = create_author!("empty", "author") + + first_post = create_post_for_author!(author, "first post") + second_post = create_post_for_author!(author, "second post") + + create_comment!(first_post, "match", 1) + create_comment!(first_post, "other", 4) + create_comment!(second_post, "other", 10) + + loaded_author = + Ash.load!(author, [ + :count_of_comments_through_posts, + :sum_of_comment_likes_through_posts, + :avg_comment_likes_through_posts, + :min_comment_likes_through_posts, + :max_comment_likes_through_posts, + :has_comment_called_match_through_posts + ]) + + assert loaded_author.count_of_comments_through_posts == 3 + assert loaded_author.sum_of_comment_likes_through_posts == 15 + assert loaded_author.avg_comment_likes_through_posts == 5.0 + assert loaded_author.min_comment_likes_through_posts == 1 + assert loaded_author.max_comment_likes_through_posts == 10 + assert loaded_author.has_comment_called_match_through_posts == true + + loaded_empty = + Ash.load!(empty_author, [ + :count_of_comments_through_posts, + :sum_of_comment_likes_through_posts, + :avg_comment_likes_through_posts, + :has_comment_called_match_through_posts + ]) + + assert loaded_empty.count_of_comments_through_posts == 0 + assert loaded_empty.sum_of_comment_likes_through_posts == nil + assert loaded_empty.avg_comment_likes_through_posts == nil + assert loaded_empty.has_comment_called_match_through_posts == false + end + + test "multi-hop list and custom aggregates can be loaded" do + author = create_author!("multi", "list") + empty_author = create_author!("multi", "list empty") + + first_post = create_post_for_author!(author, "first post") + second_post = create_post_for_author!(author, "second post") + + create_comment!(first_post, "bbb", 1) + create_comment!(second_post, "aaa", 1) + + assert %{ + comment_titles_through_posts: ["aaa", "bbb"], + comment_titles_joined_through_posts: joined + } = + Ash.load!(author, [ + :comment_titles_through_posts, + :comment_titles_joined_through_posts + ]) + + assert joined |> String.split(",") |> Enum.sort() == ["aaa", "bbb"] + + assert %{ + comment_titles_through_posts: [], + comment_titles_joined_through_posts: nil + } = + Ash.load!(empty_author, [ + :comment_titles_through_posts, + :comment_titles_joined_through_posts + ]) + end + + test "multi-hop aggregates can be filtered, sorted and used in calculations" do + one_comment = create_author!("one", "comment") + two_comments = create_author!("two", "comments") + no_comments = create_author!("no", "comments") + + one_post = create_post_for_author!(one_comment, "one post") + two_post = create_post_for_author!(two_comments, "two post") + + create_comment!(one_post, "only", 4) + create_comment!(two_post, "first", 5) + create_comment!(two_post, "second", 6) + + assert [ + %Author{ + id: two_comments_id, + count_of_comments_through_posts: 2, + comment_likes_through_posts_plus_one: 12 + }, + %Author{ + id: one_comment_id, + count_of_comments_through_posts: 1, + comment_likes_through_posts_plus_one: 5 + } + ] = + Author + |> Ash.Query.load([ + :count_of_comments_through_posts, + :comment_likes_through_posts_plus_one + ]) + |> Ash.Query.filter(count_of_comments_through_posts > 0) + |> Ash.Query.sort(count_of_comments_through_posts: :desc) + |> Ash.read!() + + assert two_comments_id == two_comments.id + assert one_comment_id == one_comment.id + + assert %{comment_likes_through_posts_plus_one: 1} = + Ash.load!(no_comments, :comment_likes_through_posts_plus_one) + end + + test "aggregate join filters are applied on multi-hop relationships" do + author = create_author!("multi", "join filter") + public_post = create_post_for_author!(author, "public post", %{public: true}) + private_post = create_post_for_author!(author, "private post", %{public: false}) + + create_comment!(public_post, "match", 1) + create_comment!(public_post, "other", 1) + create_comment!(private_post, "match", 1) + + loaded_author = + Ash.load!(author, [ + :count_of_comments_on_public_posts, + :count_of_comments_called_match_with_join_filter + ]) + + assert loaded_author.count_of_comments_on_public_posts == 2 + assert loaded_author.count_of_comments_called_match_with_join_filter == 2 + end + + test "intermediate read action filters are applied on multi-hop aggregates" do + author = create_author!("multi", "read action") + public_post = create_post_for_author!(author, "public action post", %{public: true}) + private_post = create_post_for_author!(author, "private action post", %{public: false}) + + create_comment!(public_post, "public", 1) + create_comment!(private_post, "private", 1) + + assert %{count_of_comments_through_public_posts: 1} = + Ash.load!(author, :count_of_comments_through_public_posts) + end + + test "multi-hop scalar aggregates ending in many_to_many relationships can be loaded" do + author = create_author!("multi", "m2m") + empty_author = create_author!("empty", "m2m") + + public_post = create_post_for_author!(author, "public post", %{public: true}) + private_post = create_post_for_author!(author, "private post", %{public: false}) + + match = create_post!("match", %{score: 2}) + other = create_post!("other", %{score: 6}) + private = create_post!("private", %{score: 10}) + archived = create_post!("archived", %{score: 20}) + + link_posts!(public_post, [match, other]) + link_posts!(private_post, [private]) + create_post_link!(private_post, archived, :archived) + + loaded_author = + Ash.load!(author, [ + :count_of_linked_posts_through_posts, + :sum_of_linked_post_scores_through_posts, + :avg_linked_post_score_through_posts, + :min_linked_post_score_through_posts, + :max_linked_post_score_through_posts, + :has_linked_post_called_match_through_posts + ]) + + assert loaded_author.count_of_linked_posts_through_posts == 3 + assert loaded_author.sum_of_linked_post_scores_through_posts == 18 + assert loaded_author.avg_linked_post_score_through_posts == 6.0 + assert loaded_author.min_linked_post_score_through_posts == 2 + assert loaded_author.max_linked_post_score_through_posts == 10 + assert loaded_author.has_linked_post_called_match_through_posts == true + + loaded_empty = + Ash.load!(empty_author, [ + :count_of_linked_posts_through_posts, + :sum_of_linked_post_scores_through_posts, + :avg_linked_post_score_through_posts, + :has_linked_post_called_match_through_posts + ]) + + assert loaded_empty.count_of_linked_posts_through_posts == 0 + assert loaded_empty.sum_of_linked_post_scores_through_posts == nil + assert loaded_empty.avg_linked_post_score_through_posts == nil + assert loaded_empty.has_linked_post_called_match_through_posts == false + end + + test "multi-hop many_to_many scalar aggregates work in parent queries" do + one_link = create_author!("one", "m2m") + two_links = create_author!("two", "m2m") + create_author!("none", "m2m") + + one_post = create_post_for_author!(one_link, "one post") + two_post = create_post_for_author!(two_links, "two post") + + linked_a = create_post!("linked a", %{score: 4}) + linked_b = create_post!("linked b", %{score: 5}) + + link_posts!(one_post, [linked_a]) + link_posts!(two_post, [linked_a, linked_b]) + + assert [ + %Author{ + id: two_links_id, + count_of_linked_posts_through_posts: 2, + linked_post_score_through_posts_plus_one: 10 + }, + %Author{ + id: one_link_id, + count_of_linked_posts_through_posts: 1, + linked_post_score_through_posts_plus_one: 5 + } + ] = + Author + |> Ash.Query.load([ + :count_of_linked_posts_through_posts, + :linked_post_score_through_posts_plus_one + ]) + |> Ash.Query.filter(count_of_linked_posts_through_posts > 0) + |> Ash.Query.sort(count_of_linked_posts_through_posts: :desc) + |> Ash.read!() + + assert two_links_id == two_links.id + assert one_link_id == one_link.id + end + + test "unsupported multi-hop many_to_many aggregate shapes return stable errors" do + author = create_author!("multi", "m2m unsupported") + post = create_post_for_author!(author, "post") + linked_post = create_post!("linked") + + link_posts!(post, [linked_post]) + + assert_raise Ash.Error.Unknown, ~r/multi-hop paths that include many_to_many/, fn -> + Ash.load!(post, :count_of_comments_through_linked_posts) + end + + assert_raise Ash.Error.Unknown, ~r/multi-hop paths that include many_to_many/, fn -> + Ash.load!(author, :linked_post_titles_through_posts) + end + end + + defp create_post!(title, attrs \\ %{}) do + Post + |> Ash.Changeset.for_create(:create, Map.put(attrs, :title, title)) + |> Ash.create!() + end + + defp create_author!(first_name, last_name) do + Author + |> Ash.Changeset.for_create(:create, %{first_name: first_name, last_name: last_name}) + |> Ash.create!() + end + + defp create_post_for_author!(author, title, attrs \\ %{}) do + Post + |> Ash.Changeset.for_create(:create, Map.put(attrs, :title, title)) + |> Ash.Changeset.manage_relationship(:author, author, type: :append_and_remove) + |> Ash.create!() + end + + defp create_profile!(description) do + Profile + |> Ash.Changeset.for_create(:create, %{description: description}) + |> Ash.create!() + end + + defp create_comment!(post, title, likes, attrs \\ %{}) do + Comment + |> Ash.Changeset.for_create(:create, Map.merge(attrs, %{title: title, likes: likes})) + |> Ash.Changeset.manage_relationship(:post, post, type: :append_and_remove) + |> Ash.create!() + end + + defp create_comment_rating!(comment, score) do + Rating + |> Ash.Changeset.for_create(:create, %{score: score, resource_id: comment.id}) + |> Ash.Changeset.set_context(%{data_layer: %{table: "comment_ratings"}}) + |> Ash.create!() + end + + defp link_posts!(source, destinations) do + source + |> Ash.Changeset.new() + |> Ash.Changeset.manage_relationship(:linked_posts, destinations, type: :append_and_remove) + |> Ash.update!() + end + + defp create_post_link!(source, destination, state) do + PostLink + |> Ash.Changeset.new() + |> Ash.Changeset.change_attribute(:state, state) + |> Ash.Changeset.manage_relationship(:source_post, source, type: :append) + |> Ash.Changeset.manage_relationship(:destination_post, destination, type: :append) + |> Ash.create!() + end end diff --git a/test/calculation_test.exs b/test/calculation_test.exs index 48d6a06..c08dfc8 100644 --- a/test/calculation_test.exs +++ b/test/calculation_test.exs @@ -357,7 +357,7 @@ defmodule AshSqlite.CalculationTest do |> Ash.create!() end) - assert_raise Ash.Error.Invalid, ~r/does not support using aggregates/, fn -> + assert_raise Ash.Error.Unknown, ~r/only supports loading related/, fn -> Ash.load!(author, :post_titles) end end diff --git a/test/support/resources/author.ex b/test/support/resources/author.ex index fc83280..ad6c30c 100644 --- a/test/support/resources/author.ex +++ b/test/support/resources/author.ex @@ -29,6 +29,74 @@ defmodule AshSqlite.Test.Author do relationships do has_one(:profile, AshSqlite.Test.Profile, public?: true) has_many(:posts, AshSqlite.Test.Post, public?: true) + has_many(:public_posts, AshSqlite.Test.Post, public?: true, read_action: :public) + end + + aggregates do + count(:count_of_comments_through_posts, [:posts, :comments]) + count(:count_of_comments_through_public_posts, [:public_posts, :comments]) + count(:count_of_linked_posts_through_posts, [:posts, :linked_posts]) + count(:total_profiles, AshSqlite.Test.Profile) + sum(:total_post_score, AshSqlite.Test.Post, :score) + avg(:avg_post_score, AshSqlite.Test.Post, :score) + min(:min_post_score, AshSqlite.Test.Post, :score) + max(:max_post_score, AshSqlite.Test.Post, :score) + sum(:sum_of_comment_likes_through_posts, [:posts, :comments], :likes) + avg(:avg_comment_likes_through_posts, [:posts, :comments], :likes) + min(:min_comment_likes_through_posts, [:posts, :comments], :likes) + max(:max_comment_likes_through_posts, [:posts, :comments], :likes) + sum(:sum_of_linked_post_scores_through_posts, [:posts, :linked_posts], :score) + avg(:avg_linked_post_score_through_posts, [:posts, :linked_posts], :score) + min(:min_linked_post_score_through_posts, [:posts, :linked_posts], :score) + max(:max_linked_post_score_through_posts, [:posts, :linked_posts], :score) + + count :count_of_comments_on_public_posts, [:posts, :comments] do + join_filter(:posts, expr(public == true)) + end + + count :count_of_comments_called_match_with_join_filter, [:posts, :comments] do + join_filter([:posts, :comments], expr(title == "match")) + end + + exists :has_comment_called_match_through_posts, [:posts, :comments] do + filter(expr(title == "match")) + end + + exists :has_linked_post_called_match_through_posts, [:posts, :linked_posts] do + filter(expr(title == "match")) + end + + exists :has_any_profile, AshSqlite.Test.Profile do + filter(expr(not is_nil(description))) + end + + count :profiles_matching_first_name, AshSqlite.Test.Profile do + filter(expr(description == parent(first_name))) + end + + first :first_profile_description, AshSqlite.Test.Profile, :description do + sort(description: :asc_nils_last) + end + + list :profile_descriptions, AshSqlite.Test.Profile, :description do + sort(description: :asc_nils_last) + end + + list :comment_titles_through_posts, [:posts, :comments], :title do + sort(title: :asc_nils_last) + end + + list :linked_post_titles_through_posts, [:posts, :linked_posts], :title do + sort(title: :asc_nils_last) + end + + custom(:post_titles_joined, AshSqlite.Test.Post, :string) do + implementation({AshSqlite.Test.StringAgg, field: :title, delimiter: ","}) + end + + custom(:comment_titles_joined_through_posts, [:posts, :comments], :string) do + implementation({AshSqlite.Test.StringAgg, field: :title, delimiter: ","}) + end end calculations do @@ -76,5 +144,19 @@ defmodule AshSqlite.Test.Author do end calculate(:post_titles, {:array, :string}, expr(list(posts, field: :title))) + + calculate( + :comment_likes_through_posts_plus_one, + :integer, + expr((sum_of_comment_likes_through_posts || 0) + 1) + ) + + calculate( + :linked_post_score_through_posts_plus_one, + :integer, + expr((sum_of_linked_post_scores_through_posts || 0) + 1) + ) + + calculate(:total_profiles_plus_one, :integer, expr(total_profiles + 1)) end end diff --git a/test/support/resources/comment.ex b/test/support/resources/comment.ex index ca2dc6b..25374ae 100644 --- a/test/support/resources/comment.ex +++ b/test/support/resources/comment.ex @@ -31,6 +31,12 @@ defmodule AshSqlite.Test.Comment do default_accept(:*) defaults([:read, :update, :destroy]) + read :liked do + filter(expr(likes > 5)) + end + + read(:public) + create :create do primary?(true) argument(:rating, :map) @@ -64,4 +70,8 @@ defmodule AshSqlite.Test.Comment do filter: expr(score > 5) ) end + + aggregates do + count(:count_of_ratings, :ratings) + end end diff --git a/test/support/resources/post.ex b/test/support/resources/post.ex index 6b283cb..a70c4bb 100644 --- a/test/support/resources/post.ex +++ b/test/support/resources/post.ex @@ -43,6 +43,10 @@ defmodule AshSqlite.Test.Post do pagination(offset?: true, required?: true) end + read :public do + filter(expr(public == true)) + end + create :create do primary?(true) argument(:rating, :map) @@ -172,9 +176,211 @@ defmodule AshSqlite.Test.Post do validate(attribute_does_not_equal(:title, "not allowed")) end + aggregates do + count(:count_of_comments, :comments) + count(:count_of_popular_comments, :popular_comments) + count(:count_of_linked_posts, :linked_posts) + count(:count_of_comments_through_linked_posts, [:linked_posts, :comments]) + count(:count_of_liked_comments, :comments, read_action: :liked) + count(:count_of_comment_ratings, [:comments, :ratings]) + sum(:sum_of_comment_likes, :comments, :likes) + sum(:sum_of_comment_likes_called_match, :comments, :likes, filter: expr(title == "match")) + + sum(:sum_of_comment_likes_with_popular_ratings, :comments, :likes) do + filter(expr(not is_nil(popular_ratings.id))) + end + + sum(:sum_of_comment_likes_with_popular_ratings_exists, :comments, :likes) do + filter(expr(exists(popular_ratings, score > 5))) + end + + sum(:sum_of_linked_post_scores, :linked_posts, :score) + avg(:avg_comment_likes, :comments, :likes) + + avg(:avg_comment_likes_with_popular_ratings, :comments, :likes) do + filter(expr(not is_nil(popular_ratings.id))) + end + + avg(:avg_linked_post_score, :linked_posts, :score) + min(:min_comment_likes, :comments, :likes) + min(:min_linked_post_score, :linked_posts, :score) + max(:max_comment_likes, :comments, :likes) + max(:max_linked_post_score, :linked_posts, :score) + + first :first_comment, :comments, :title do + sort(title: :asc_nils_last) + end + + first :first_comment_nils_first, :comments, :title do + sort(title: :asc_nils_first) + end + + first :first_comment_nils_first_called_stuff, :comments, :title do + sort(title: :asc_nils_first) + filter(expr(title == "stuff")) + end + + first :first_comment_nils_first_include_nil, :comments, :title do + include_nil?(true) + sort(title: :asc_nils_first) + end + + first :last_comment, :comments, :title do + sort(title: :desc) + end + + first :latest_comment_created_at, :comments, :created_at do + sort(created_at: :desc) + end + + first :highest_rating, [:comments, :ratings], :score do + sort(score: :desc) + end + + first(:author_first_name, :author, :first_name) + + first :first_linked_post_title, :linked_posts, :title do + sort(title: :asc_nils_last) + end + + first :first_linked_post_title_with_author, :linked_posts, :title do + sort(title: :asc_nils_last) + filter(expr(not is_nil(author.id))) + end + + first :first_linked_post_title_with_author_join_filter, :linked_posts, :title do + sort(title: :asc_nils_last) + join_filter(:linked_posts, expr(not is_nil(author.id))) + end + + list :comment_titles, :comments, :title do + sort(title: :asc_nils_last) + end + + list :comment_titles_with_nils, :comments, :title do + sort(title: :asc_nils_last) + include_nil?(true) + end + + list :uniq_comment_titles, :comments, :title do + uniq?(true) + sort(title: :asc_nils_last) + end + + list :comment_titles_with_5_likes, :comments, :title do + sort(title: :asc_nils_last) + filter(expr(likes >= 5)) + end + + list :comment_titles_with_popular_ratings, :comments, :title do + sort(title: :asc_nils_last) + filter(expr(not is_nil(popular_ratings.id))) + end + + list(:comment_ids, :comments, :id) + + list :linked_post_titles, :linked_posts, :title do + sort(title: :asc_nils_last) + end + + list :linked_post_titles_with_author, :linked_posts, :title do + sort(title: :asc_nils_last) + filter(expr(not is_nil(author.id))) + end + + list :linked_post_titles_with_author_join_filter, :linked_posts, :title do + sort(title: :asc_nils_last) + join_filter(:linked_posts, expr(not is_nil(author.id))) + end + + custom(:comment_titles_joined, :comments, :string) do + implementation({AshSqlite.Test.StringAgg, field: :title, delimiter: ","}) + end + + custom(:total_comment_likes_custom, :comments, :float) do + implementation({AshSqlite.Test.TotalAgg, field: :likes}) + end + + custom(:comment_titles_joined_with_popular_ratings, :comments, :string) do + filter(expr(not is_nil(popular_ratings.id))) + implementation({AshSqlite.Test.StringAgg, field: :title, delimiter: ","}) + end + + custom(:linked_post_titles_joined, :linked_posts, :string) do + implementation({AshSqlite.Test.StringAgg, field: :title, delimiter: ","}) + end + + count :count_of_comments_called_match, :comments do + filter(expr(title == "match")) + end + + count :count_of_comments_with_join_filter, :comments do + join_filter(:comments, expr(title == "match")) + end + + count :count_of_comments_with_related_filter, :comments do + filter(expr(not is_nil(post.id))) + end + + count :count_of_comments_with_related_exists_filter, :comments do + filter(expr(exists(post, not is_nil(id)))) + end + + count :count_of_comments_with_popular_ratings, :comments do + filter(expr(not is_nil(popular_ratings.id))) + end + + count :count_comment_titles_with_popular_ratings, :comments do + field(:title) + filter(expr(not is_nil(popular_ratings.id))) + end + + count :count_of_comments_with_aggregate_filter, :comments do + filter(expr(count_of_ratings > 0)) + end + + count :count_of_comments_matching_post_title, :comments do + filter(expr(title == parent(title))) + end + + count :count_of_comments_with_parent_join_filter, :comments do + join_filter(:comments, expr(title == parent(title))) + end + + exists :has_comment_called_match, :comments do + filter(expr(title == "match")) + end + + exists :has_linked_post_called_match, :linked_posts do + filter(expr(title == "match")) + end + + count :count_of_linked_posts_with_join_filter, :linked_posts do + join_filter(:linked_posts, expr(title == "match")) + end + + count :count_of_linked_posts_with_author, :linked_posts do + filter(expr(not is_nil(author.id))) + end + end + calculations do calculate(:score_after_winning, :integer, expr((score || 0) + 1)) calculate(:negative_score, :integer, expr(-score)) + calculate(:has_comments, :boolean, expr(count_of_comments > 0)) + + calculate( + :comment_likes_with_score, + :integer, + expr((sum_of_comment_likes || 0) + (score || 0)) + ) + + calculate( + :linked_post_score_with_score, + :integer, + expr((sum_of_linked_post_scores || 0) + (score || 0)) + ) + calculate(:category_label, :string, expr("(" <> category <> ")")) calculate(:score_with_score, :string, expr(score <> score)) calculate(:foo_bar_from_stuff, :string, expr(stuff[:foo][:bar])) diff --git a/test/support/string_agg.ex b/test/support/string_agg.ex new file mode 100644 index 0000000..9cc6519 --- /dev/null +++ b/test/support/string_agg.ex @@ -0,0 +1,19 @@ +# SPDX-FileCopyrightText: 2023 ash_sqlite contributors +# +# SPDX-License-Identifier: MIT + +defmodule AshSqlite.Test.StringAgg do + @moduledoc false + + use Ash.Resource.Aggregate.CustomAggregate + use AshSqlite.CustomAggregate + + import Ecto.Query + + def dynamic(opts, binding) do + field = Keyword.fetch!(opts, :field) + delimiter = Keyword.get(opts, :delimiter, ",") + + dynamic(fragment("group_concat(?, ?)", field(as(^binding), ^field), ^delimiter)) + end +end diff --git a/test/support/total_agg.ex b/test/support/total_agg.ex new file mode 100644 index 0000000..af3133d --- /dev/null +++ b/test/support/total_agg.ex @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: 2023 ash_sqlite contributors +# +# SPDX-License-Identifier: MIT + +defmodule AshSqlite.Test.TotalAgg do + @moduledoc false + + use Ash.Resource.Aggregate.CustomAggregate + use AshSqlite.CustomAggregate + + import Ecto.Query + + def dynamic(opts, binding) do + field = Keyword.fetch!(opts, :field) + + dynamic(fragment("total(?)", field(as(^binding), ^field))) + end +end