Skip to content

Commit

Permalink
per source searching
Browse files Browse the repository at this point in the history
  • Loading branch information
yujonglee committed Oct 29, 2024
1 parent 1ba619d commit 6125b19
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 66 deletions.
138 changes: 89 additions & 49 deletions core/lib/canary/index/trieve/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,17 @@ defmodule Canary.Index.Trieve.Actual do
def search(client, query, opts \\ []) do
rag? = Keyword.get(opts, :rag, false)
tags = Keyword.get(opts, :tags, nil)
source_ids = Keyword.get(opts, :source_ids, nil)

search_type = if(rag? or question?(query), do: :hybrid, else: :fulltext)
receive_timeout = if(rag? or question?(query), do: 3_000, else: 1_500)
remove_stop_words = not (rag? or question?(query))
page_size = if(rag?, do: 8, else: 32)
group_size = if(rag?, do: 5, else: 3)
page_size = 8

score_threshold =
case search_type do
:fulltext -> 1
_ -> 0.5
_ -> 0.3
end

highlight_options =
Expand All @@ -239,59 +239,99 @@ defmodule Canary.Index.Trieve.Actual do
}
end

filters = %{
must:
[
if(not is_nil(tags) and tags != [],
do: %{
field: "tag_set",
match_any: [
format_for_tagset(:empty_tags)
| Enum.map(tags, &format_for_tagset(:tag, &1))
]
filters =
if is_nil(source_ids) do
%{must: [filter_for_tags(tags)] |> Enum.reject(&is_nil/1)}
else
source_ids
|> Enum.map(fn id ->
%{must: [filter_for_tags(tags), filter_for_source_id(id)] |> Enum.reject(&is_nil/1)}
end)
end

result =
filters
|> Enum.map(fn f ->
Task.async(fn ->
client
|> run_search(%{
filters: f,
query: query,
page: 1,
page_size: page_size,
group_size: group_size,
search_type: search_type,
score_threshold: score_threshold,
remove_stop_words: remove_stop_words,
typo_options: %{
correct_typos: true,
one_typo_word_range: %{min: 3, max: 3 * 3},
two_typo_word_range: %{min: 4, max: 4 * 3}
},
else: nil
)
]
|> Enum.reject(&is_nil/1)
highlight_options:
Map.merge(
%{
highlight_results: true,
highlight_max_num: 1,
pre_tag: "<mark>",
post_tag: "</mark>"
},
highlight_options
)
})
end)
end)
|> Task.await_many(2_500)

if Enum.all?(result, &match?({:error, _}, &1)) do
{:error, result}
else
merged =
result
|> Enum.flat_map(fn
{:ok, v} -> v
_ -> []
end)
|> Enum.sort_by(
fn %{"chunks" => chunks} ->
chunks
|> Enum.max_by(& &1["score"])
|> Map.get("score")
end,
:desc
)

{:ok, merged}
end
end

defp filter_for_source_id(id) do
%{
field: "tag_set",
match_any: [format_for_tagset(:source_id, id)]
}
end

defp filter_for_tags(nil), do: nil
defp filter_for_tags([]), do: nil

defp filter_for_tags(tags) do
%{
field: "tag_set",
match_any: [
format_for_tagset(:empty_tags)
| Enum.map(tags, &format_for_tagset(:tag, &1))
]
}
end

defp run_search(client, data) do
# https://docs.trieve.ai/api-reference/chunk-group/search-over-groups
case client
|> Req.post(
receive_timeout: receive_timeout,
receive_timeout: 2_000,
url: "/chunk_group/group_oriented_search",
json: %{
query: query,
filters: filters,
page: 1,
page_size: page_size,
group_size: group_size,
search_type: search_type,
score_threshold: score_threshold,
remove_stop_words: remove_stop_words,
typo_options: %{
correct_typos: true,
one_typo_word_range: %{
min: 3,
max: 3 * 3
},
two_typo_word_range: %{
min: 4,
max: 4 * 3
}
},
highlight_options:
Map.merge(
%{
highlight_results: true,
highlight_max_num: 1,
pre_tag: "<mark>",
post_tag: "</mark>"
},
highlight_options
)
}
json: data
) do
{:ok, %{status: 200, body: %{"results" => results}}} ->
{:ok, results}
Expand Down
2 changes: 1 addition & 1 deletion core/lib/canary/interface/ask.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Canary.Interface.Ask.Default do

results =
groups
|> Enum.take(5)
|> Enum.take(6)
|> Enum.map(fn %{"chunks" => chunks, "group" => %{"tracking_id" => group_id}} ->
Task.async(fn ->
chunk_indices =
Expand Down
39 changes: 25 additions & 14 deletions core/lib/canary/interface/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,26 @@ defmodule CanaryWeb.Interface.Controller do
end

defp wrap_project_query(query, action) do
source_query =
Canary.Sources.Source
|> Ash.Query.select([:id])

billing_query =
Canary.Accounts.Billing
|> Ash.Query.select([:id])
|> Ash.Query.load(:membership)

account_query =
Canary.Accounts.Account
|> Ash.Query.select([:id])
|> Ash.Query.load(billing: billing_query)

if action == :search do
query
|> Ash.Query.load(sources: source_query)
else
billing_query =
Canary.Accounts.Billing
|> Ash.Query.select([:id])
|> Ash.Query.load(:membership)

account_query =
Canary.Accounts.Account
|> Ash.Query.select([:id])
|> Ash.Query.load(billing: billing_query)

query
|> Ash.Query.load(account: account_query)
|> Ash.Query.load(sources: source_query, account: account_query)
end
end

Expand All @@ -103,8 +108,13 @@ defmodule CanaryWeb.Interface.Controller do
def search(conn, %{"query" => %{"text" => query, "tags" => tags}} = params) do
try do
matches =
conn.assigns.project
|> Canary.Interface.Search.run!(query, tags: tags, cache: cache?())
Canary.Interface.Search.run!(
conn.assigns.project,
query,
tags: tags,
cache: cache?(),
source_ids: Enum.map(conn.assigns.project.sources, & &1.id)
)

data = %{
matches: matches,
Expand Down Expand Up @@ -169,7 +179,8 @@ defmodule CanaryWeb.Interface.Controller do
query,
&send(here, {:delta, &1}),
tags: tags,
cache: cache?()
cache: cache?(),
source_ids: Enum.map(conn.assigns.project.sources, & &1.id)
)

send(here, {:done, completion})
Expand Down
3 changes: 2 additions & 1 deletion core/lib/canary/interface/search.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule Canary.Interface.Search do
end

defp set_cache(project, query, opts, result) do
Cachex.put(:cache, key(project, query, opts), result, ttl: :timer.minutes(3))
Cachex.put(:cache, key(project, query, opts), result, ttl: :timer.minutes(1))
end

defp get_cache(project, query, opts) do
Expand All @@ -38,6 +38,7 @@ defmodule Canary.Interface.Search do
project.id
|> Kernel.<>(":" <> query)
|> Kernel.<>(":" <> Jason.encode!(opts[:tags]))
|> Kernel.<>(":" <> Jason.encode!(opts[:source_ids]))
end

defp impl(),
Expand Down
1 change: 1 addition & 0 deletions core/lib/canary/sources/document_create.ex
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ defmodule Canary.Sources.Document.Create do
url: chunk.url,
meta: chunk.meta,
source_id: source_id,
title: chunk[:title],
weight: chunk[:weight],
tags: changeset.context.remote_tags
}
Expand Down
5 changes: 4 additions & 1 deletion core/lib/canary/sources/github_fetcher.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
defmodule Canary.Sources.GithubFetcher do
require Logger

defp client() do
Canary.graphql_client(
url: "https://api.github.com/graphql",
Expand Down Expand Up @@ -35,11 +37,12 @@ defmodule Canary.Sources.GithubFetcher do
end

{:try_after_s, seconds} ->
Logger.warning("failed to fetch from github, retrying in #{seconds} seconds")
Process.sleep(seconds * 1000)
{[], cursor}

{:error, errors} ->
IO.inspect(errors)
Logger.error("failed to fetch from github: #{inspect(errors)}")
{[], :stop}
end
end)
Expand Down

0 comments on commit 6125b19

Please sign in to comment.