Skip to content

Commit

Permalink
Implement File Storage (#2212)
Browse files Browse the repository at this point in the history
  • Loading branch information
aleDsz authored Sep 25, 2023
1 parent 393915d commit 96bf5dd
Show file tree
Hide file tree
Showing 29 changed files with 837 additions and 342 deletions.
73 changes: 57 additions & 16 deletions lib/livebook/file_system/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ defmodule Livebook.FileSystem.S3 do
@type t :: %__MODULE__{
id: String.t(),
bucket_url: String.t(),
external_id: String.t(),
external_id: String.t() | nil,
region: String.t(),
access_key_id: String.t(),
secret_access_key: String.t()
secret_access_key: String.t(),
hub_id: String.t()
}

embedded_schema do
Expand All @@ -19,6 +20,7 @@ defmodule Livebook.FileSystem.S3 do
field :region, :string
field :access_key_id, :string
field :secret_access_key, :string
field :hub_id, :string
end

@doc """
Expand All @@ -32,37 +34,42 @@ defmodule Livebook.FileSystem.S3 do
* `:external_id` - the external id from Teams.
* `:prefix` - the id prefix.
* `:hub_id` - the Hub id.
* `:id` - the file system id.
"""
@spec new(String.t(), String.t(), String.t(), keyword()) :: t()
def new(bucket_url, access_key_id, secret_access_key, opts \\ []) do
opts = Keyword.validate!(opts, [:region, :external_id, :prefix])
opts = Keyword.validate!(opts, [:region, :external_id, :hub_id, :id])

bucket_url = String.trim_trailing(bucket_url, "/")
region = opts[:region] || region_from_uri(bucket_url)

hash = :crypto.hash(:sha256, bucket_url) |> Base.url_encode64(padding: false)

id =
if prefix = opts[:prefix],
do: "#{prefix}-s3-#{hash}",
else: "s3-#{hash}"
hub_id = Keyword.get(opts, :hub_id, Livebook.Hubs.Personal.id())
id = opts[:id] || id(hub_id, bucket_url)

%__MODULE__{
id: id,
bucket_url: bucket_url,
external_id: opts[:external_id],
region: region,
access_key_id: access_key_id,
secret_access_key: secret_access_key
secret_access_key: secret_access_key,
hub_id: hub_id
}
end

defp region_from_uri(uri) do
# For many services the API host is of the form *.[region].[rootdomain].com
%{host: host} = URI.parse(uri)
host |> String.split(".") |> Enum.reverse() |> Enum.at(2, "auto")
splitted_host = host |> String.split(".") |> Enum.reverse()

case Enum.at(splitted_host, 2, "auto") do
"s3" -> "us-east-1"
"r2" -> "auto"
region -> region
end
end

@doc """
Expand Down Expand Up @@ -110,10 +117,42 @@ defmodule Livebook.FileSystem.S3 do
:external_id,
:region,
:access_key_id,
:secret_access_key
:secret_access_key,
:hub_id
])
|> put_region_from_uri()
|> validate_required([:bucket_url, :access_key_id, :secret_access_key])
|> Livebook.Utils.validate_url(:bucket_url)
|> put_id()
end

defp put_region_from_uri(changeset) do
case get_field(changeset, :bucket_url) do
nil -> changeset
bucket_url -> put_change(changeset, :region, region_from_uri(bucket_url))
end
end

defp put_id(changeset) do
hub_id = get_field(changeset, :hub_id)
bucket_url = get_field(changeset, :bucket_url)

if get_field(changeset, :id) do
changeset
else
put_change(changeset, :id, id(hub_id, bucket_url))
end
end

def id(_, nil), do: nil
def id(nil, bucket_url), do: hashed_id(bucket_url)
def id(hub_id, bucket_url), do: "#{hub_id}-#{hashed_id(bucket_url)}"

defp hashed_id(bucket_url) do
hash = :crypto.hash(:sha256, bucket_url)
encrypted_hash = Base.url_encode64(hash, padding: false)

"s3-#{encrypted_hash}"
end
end

Expand Down Expand Up @@ -377,22 +416,24 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
region: fields["region"],
access_key_id: fields["access_key_id"],
secret_access_key: fields["secret_access_key"],
prefix: fields["prefix"]
id: fields["id"],
hub_id: fields["hub_id"]
})
end

def load(_file_system, fields) do
S3.new(fields.bucket_url, fields.access_key_id, fields.secret_access_key,
region: fields[:region],
external_id: fields[:external_id],
prefix: fields[:prefix]
id: fields[:id],
hub_id: fields[:hub_id]
)
end

def dump(file_system) do
file_system
|> Map.from_struct()
|> Map.take([:bucket_url, :region, :access_key_id, :secret_access_key])
|> Map.take([:id, :bucket_url, :region, :access_key_id, :secret_access_key, :hub_id])
end

def external_metadata(file_system) do
Expand Down
27 changes: 27 additions & 0 deletions lib/livebook/file_systems.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,33 @@ defmodule Livebook.FileSystems do
@spec type(FileSystem.t()) :: String.t()
def type(%FileSystem.S3{}), do: "s3"

@doc """
Updates file system with the given changes.
"""
@spec update_file_system(FileSystem.t(), map()) ::
{:ok, FileSystem.t()} | {:error, Ecto.Changeset.t()}
def update_file_system(file_system, attrs) do
file_system
|> change_file_system(attrs)
|> Ecto.Changeset.apply_action(:update)
end

@doc """
Returns an `%Ecto.Changeset{}` for tracking file system changes.
"""
@spec change_file_system(FileSystem.t()) :: Ecto.Changeset.t()
def change_file_system(file_system) do
change_file_system(file_system, %{})
end

@doc """
Returns an `%Ecto.Changeset{}` for tracking file system changes.
"""
@spec change_file_system(FileSystem.t(), map()) :: Ecto.Changeset.t()
def change_file_system(%FileSystem.S3{} = file_system, attrs) do
FileSystem.S3.change_file_system(file_system, attrs)
end

@doc """
Loads the file system from given type and dumped data.
"""
Expand Down
27 changes: 23 additions & 4 deletions lib/livebook/hubs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ defmodule Livebook.Hubs do
* `{:secret_updated, %Secret{}}`
* `{:secret_deleted, %Secret{}}`
Topic `hubs:file_systems`:
* `{:file_system_created, FileSystem.t()}`
* `{:file_system_updated, FileSystem.t()}`
* `{:file_system_deleted, FileSystem.t()}`
"""
@spec subscribe(atom() | list(atom())) :: :ok | {:error, term()}
def subscribe(topics) when is_list(topics) do
Expand Down Expand Up @@ -294,15 +300,28 @@ defmodule Livebook.Hubs do
Provider.verify_notebook_stamp(hub, notebook_source, stamp)
end

@doc """
Gets a list of file systems from all hubs.
"""
@spec get_file_systems() :: list(FileSystem.t())
def get_file_systems() do
file_systems = Enum.flat_map(get_hubs(), &Provider.get_file_systems/1)
local_file_system = Livebook.Config.local_file_system()

[local_file_system | Enum.sort_by(file_systems, & &1.id)]
end

@doc """
Gets a list of file systems for given hub.
"""
@spec get_file_systems(Provider.t()) :: list(FileSystem.t())
def get_file_systems(hub) do
@spec get_file_systems(Provider.t(), keyword()) :: list(FileSystem.t())
def get_file_systems(hub, opts \\ []) do
hub_file_systems = Provider.get_file_systems(hub)
local_file_system = Livebook.Config.local_file_system()
sorted_hub_file_systems = Enum.sort_by(hub_file_systems, & &1.id)

[local_file_system | Enum.sort_by(hub_file_systems, & &1.id)]
if opts[:hub_only],
do: sorted_hub_file_systems,
else: [Livebook.Config.local_file_system() | sorted_hub_file_systems]
end

@doc """
Expand Down
7 changes: 3 additions & 4 deletions lib/livebook/hubs/team_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,9 @@ defmodule Livebook.Hubs.TeamClient do
{:ok, decrypted_value} = Teams.decrypt(file_system.value, secret_key, sign_secret)

dumped_data =
Map.merge(Jason.decode!(decrypted_value), %{
"external_id" => file_system.id,
"prefix" => state.hub.id
})
decrypted_value
|> Jason.decode!()
|> Map.put("external_id", file_system.id)

FileSystems.load(file_system.type, dumped_data)
end
Expand Down
6 changes: 3 additions & 3 deletions lib/livebook/users/user.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ defmodule Livebook.Users.User do
@doc """
Generates a new user.
"""
@spec new() :: t()
def new() do
@spec new(String.t()) :: t()
def new(id \\ Utils.random_id()) do
%__MODULE__{
id: Utils.random_id(),
id: id,
name: nil,
email: nil,
hex_color: Livebook.EctoTypes.HexColor.random()
Expand Down
18 changes: 14 additions & 4 deletions lib/livebook_web/live/file_select_component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ defmodule LivebookWeb.FileSelectComponent do
renaming_file: nil,
renamed_name: nil,
error_message: nil,
file_systems: Livebook.Settings.file_systems()
configure_path: nil,
file_systems: []
)
|> allow_upload(:folder,
accept: :any,
Expand All @@ -70,7 +71,14 @@ defmodule LivebookWeb.FileSelectComponent do
|> assign(assigns)
|> update_file_infos(force_reload? or running_files_changed?)

{:ok, socket}
{file_systems, configure_hub_id} =
if hub = socket.assigns[:hub],
do: {Livebook.Hubs.get_file_systems(hub), hub.id},
else: {Livebook.Hubs.get_file_systems(), Livebook.Hubs.Personal.id()}

configure_path = ~p"/hub/#{configure_hub_id}/file-systems/new"

{:ok, assign(socket, file_systems: file_systems, configure_path: configure_path)}
end

@impl true
Expand All @@ -83,6 +91,7 @@ defmodule LivebookWeb.FileSelectComponent do
<.file_system_menu_button
file={@file}
file_systems={@file_systems}
configure_path={@configure_path}
file_system_select_disabled={@file_system_select_disabled}
myself={@myself}
/>
Expand Down Expand Up @@ -281,14 +290,15 @@ defmodule LivebookWeb.FileSelectComponent do
<%= for file_system <- @file_systems do %>
<%= if file_system == @file.file_system do %>
<.menu_item variant={:selected}>
<button role="menuitem">
<button id={"file-system-#{file_system.id}"} role="menuitem">
<.file_system_icon file_system={file_system} />
<span><%= file_system_label(file_system) %></span>
</button>
</.menu_item>
<% else %>
<.menu_item>
<button
id={"file-system-#{file_system.id}"}
role="menuitem"
phx-target={@myself}
phx-click="set_file_system"
Expand All @@ -301,7 +311,7 @@ defmodule LivebookWeb.FileSelectComponent do
<% end %>
<% end %>
<.menu_item>
<.link navigate={~p"/settings"} class="border-t border-gray-200" role="menuitem">
<.link navigate={@configure_path} class="border-t border-gray-200" role="menuitem">
<.remix_icon icon="settings-3-line" />
<span>Configure</span>
</.link>
Expand Down
4 changes: 2 additions & 2 deletions lib/livebook_web/live/hooks/user_hook.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ defmodule LivebookWeb.UserHook do
# attributes if the socket is connected. Otherwise uses
# `user_data` from session.
defp build_current_user(session, socket) do
user = User.new()
identity_data = Map.new(session["identity_data"], fn {k, v} -> {Atom.to_string(k), v} end)

connect_params = get_connect_params(socket) || %{}
attrs = connect_params["user_data"] || session["user_data"] || %{}

Expand All @@ -45,6 +43,8 @@ defmodule LivebookWeb.UserHook do
attrs -> attrs
end

user = User.new(attrs["id"])

case Livebook.Users.update_user(user, attrs) do
{:ok, user} -> user
{:error, _changeset} -> user
Expand Down
Loading

0 comments on commit 96bf5dd

Please sign in to comment.