From 3b6989f5963802861dfc3a8e2821ad2fd469ab92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Wed, 28 Feb 2024 13:58:56 +0100 Subject: [PATCH 01/17] Add Membrane.AWS.S3.Source --- ...sues-in-membranefranework-membrane_core.md | 12 -- .github/workflows/fetch_changes.yml | 54 -------- .github/workflows/on_issue_opened.yaml | 20 --- .github/workflows/on_pr_opened.yaml | 19 --- README.md | 22 ++-- lib/membrane_template.ex | 2 - lib/s3/s3_source.ex | 65 +++++++++ mix.exs | 39 ++++-- mix.lock | 22 ++++ test/s3_source_test.exs | 124 ++++++++++++++++++ test/support/bypass_helpers.ex | 17 +++ 11 files changed, 271 insertions(+), 125 deletions(-) delete mode 100644 .github/ISSUE_TEMPLATE/please--open-new-issues-in-membranefranework-membrane_core.md delete mode 100644 .github/workflows/fetch_changes.yml delete mode 100644 .github/workflows/on_issue_opened.yaml delete mode 100644 .github/workflows/on_pr_opened.yaml delete mode 100644 lib/membrane_template.ex create mode 100644 lib/s3/s3_source.ex create mode 100644 test/s3_source_test.exs create mode 100644 test/support/bypass_helpers.ex diff --git a/.github/ISSUE_TEMPLATE/please--open-new-issues-in-membranefranework-membrane_core.md b/.github/ISSUE_TEMPLATE/please--open-new-issues-in-membranefranework-membrane_core.md deleted file mode 100644 index 7dc05eb..0000000 --- a/.github/ISSUE_TEMPLATE/please--open-new-issues-in-membranefranework-membrane_core.md +++ /dev/null @@ -1,12 +0,0 @@ ---- -name: Please, open new issues in membranefranework/membrane_core -about: New issues related to this repo should be opened there -title: "[DO NOT OPEN]" -labels: '' -assignees: '' - ---- - -Please, do not open this issue here. Open it in the [membrane_core](https://github.com/membraneframework/membrane_core) repository instead. - -Thanks for helping us grow :) diff --git a/.github/workflows/fetch_changes.yml b/.github/workflows/fetch_changes.yml deleted file mode 100644 index 7c9c0e9..0000000 --- a/.github/workflows/fetch_changes.yml +++ /dev/null @@ -1,54 +0,0 @@ -# This is a basic workflow to help you get started with Actions - -name: Fetch changes - -# Controls when the workflow will run -on: - # Trigger thrice a day - schedule: - - cron: '0 4,8,12 * * *' - - # Allows you to run this workflow manually from the Actions tab - workflow_dispatch: - -jobs: - # This workflow contains a single job called "build" - build: - # The type of runner that the job will run on - runs-on: ubuntu-latest - - # Steps represent a sequence of tasks that will be executed as part of the job - steps: - # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it - - uses: actions/checkout@v3 - with: - fetch-depth: '0' - - - name: webfactory/ssh-agent - uses: webfactory/ssh-agent@v0.5.4 - with: - ssh-private-key: ${{ secrets.SSH_PRIVATE_KEY }} - - # Runs a set of commands using the runners shell - - name: Add remote - run: | - git remote add source git@github.com:membraneframework/membrane_template_plugin.git - git remote update - - echo "CURRENT_BRANCH=$(git branch --show-current)" >> $GITHUB_ENV - - - name: Check changes - run: | - echo ${{env.CURRENT_BRANCH}} - echo "LOG_SIZE=$(git log origin/${{ env.CURRENT_BRANCH }}..source/${{ env.CURRENT_BRANCH }} | wc -l)" - - echo "LOG_SIZE=$(git log origin/${{ env.CURRENT_BRANCH }}..source/${{ env.CURRENT_BRANCH }} | wc -l)" >> $GITHUB_ENV - - - if: ${{ env.LOG_SIZE != '0'}} - name: Merge changes - run: | - git config --global user.email "admin@membraneframework.com" - git config --global user.name "MembraneFramework" - - git merge source/master - git push origin master diff --git a/.github/workflows/on_issue_opened.yaml b/.github/workflows/on_issue_opened.yaml deleted file mode 100644 index 23208d1..0000000 --- a/.github/workflows/on_issue_opened.yaml +++ /dev/null @@ -1,20 +0,0 @@ -name: 'Close issue when opened' -on: - issues: - types: - - opened -jobs: - close: - runs-on: ubuntu-latest - steps: - - name: Checkout membrane_core - uses: actions/checkout@v3 - with: - repository: membraneframework/membrane_core - - name: Close issue - uses: ./.github/actions/close_issue - with: - GITHUB_TOKEN: ${{ secrets.MEMBRANEFRAMEWORKADMIN_TOKEN }} - ISSUE_URL: ${{ github.event.issue.html_url }} - ISSUE_NUMBER: ${{ github.event.issue.number }} - REPOSITORY: ${{ github.repository }} diff --git a/.github/workflows/on_pr_opened.yaml b/.github/workflows/on_pr_opened.yaml deleted file mode 100644 index 248caba..0000000 --- a/.github/workflows/on_pr_opened.yaml +++ /dev/null @@ -1,19 +0,0 @@ -name: Add PR to Smackore project board, if the author is from outside Membrane Team -on: - pull_request_target: - types: - - opened -jobs: - maybe_add_to_project_board: - runs-on: ubuntu-latest - steps: - - name: Checkout membrane_core - uses: actions/checkout@v3 - with: - repository: membraneframework/membrane_core - - name: Puts PR in "New PRs by community" column in the Smackore project, if the author is from outside Membrane Team - uses: ./.github/actions/add_pr_to_smackore_board - with: - GITHUB_TOKEN: ${{ secrets.MEMBRANEFRAMEWORKADMIN_TOKEN }} - AUTHOR_LOGIN: ${{ github.event.pull_request.user.login }} - PR_URL: ${{ github.event.pull_request.html_url }} diff --git a/README.md b/README.md index e057e4c..81a6109 100644 --- a/README.md +++ b/README.md @@ -1,23 +1,25 @@ -# Membrane Template Plugin +# Membrane AWS Plugin -[![Hex.pm](https://img.shields.io/hexpm/v/membrane_template_plugin.svg)](https://hex.pm/packages/membrane_template_plugin) -[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/membrane_template_plugin) -[![CircleCI](https://circleci.com/gh/membraneframework/membrane_template_plugin.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_template_plugin) +[![Hex.pm](https://img.shields.io/hexpm/v/membrane_aws_plugin.svg)](https://hex.pm/packages/membrane_aws_plugin) +[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/membrane_aws_plugin) +[![CircleCI](https://circleci.com/gh/jellyfish-dev/membrane_aws_plugin.svg?style=svg)](https://circleci.com/gh/jellyfish-dev/membrane_aws_plugin) +[![codecov](https://codecov.io/gh/jellyfish-dev/membrane_aws_plugin/branch/main/graph/badge.svg?token=ANWFKV2EDP)](https://codecov.io/gh/jellyfish-dev/membrane_aws_plugin) -This repository contains a template for new plugins. +This repository contains Membrane element that interacts with AWS. +Currently implemented are: +- `Membrane.AWS.S3.Source` -Check out different branches for other flavors of this template. It's a part of the [Membrane Framework](https://membrane.stream). ## Installation -The package can be installed by adding `membrane_template_plugin` to your list of dependencies in `mix.exs`: +The package can be installed by adding `membrane_aws_plugin` to your list of dependencies in `mix.exs`: ```elixir def deps do [ - {:membrane_template_plugin, "~> 0.1.0"} + {:membrane_aws_plugin, "~> 0.1.0"} ] end ``` @@ -28,8 +30,8 @@ TODO ## Copyright and License -Copyright 2020, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_template_plugin) +Copyright 2024, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_aws_plugin) -[![Software Mansion](https://logo.swmansion.com/logo?color=white&variant=desktop&width=200&tag=membrane-github)](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_template_plugin) +[![Software Mansion](https://logo.swmansion.com/logo?color=white&variant=desktop&width=200&tag=membrane-github)](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_aws_plugin) Licensed under the [Apache License, Version 2.0](LICENSE) diff --git a/lib/membrane_template.ex b/lib/membrane_template.ex deleted file mode 100644 index c6882fb..0000000 --- a/lib/membrane_template.ex +++ /dev/null @@ -1,2 +0,0 @@ -defmodule Membrane.Template do -end diff --git a/lib/s3/s3_source.ex b/lib/s3/s3_source.ex new file mode 100644 index 0000000..0a47930 --- /dev/null +++ b/lib/s3/s3_source.ex @@ -0,0 +1,65 @@ +defmodule Membrane.AWS.S3.Source do + @moduledoc """ + Element that reads file from a S3 bucket and sends it through the output pad. + """ + use Membrane.Source + + alias Membrane.{Buffer, RemoteStream} + + def_options aws_credentials: [ + spec: Keyword.t(), + description: "Credentials to AWS", + default: [] + ], + bucket: [ + spec: binary(), + description: "Name of bucket" + ], + path: [ + spec: binary(), + description: "Path to file in bucket" + ], + opts: [ + spec: [ + max_concurrency: pos_integer(), + chunk_size: pos_integer(), + timeout: pos_integer() + ], + description: "File download opts", + default: [] + ] + + def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}, flow_control: :manual + + @impl true + def handle_init(_context, opts) do + {[], Map.from_struct(opts)} + end + + @impl true + def handle_playing(_ctx, state) do + ExAws + + file_stream = + state.bucket + |> ExAws.S3.download_file(state.path, :memory, state.opts) + |> ExAws.stream!(state.aws_credentials) + + # Uncomment if you need to gunzip (and add dependency :stream_gzip) + # |> StreamGzip.gunzip() + + {[stream_format: {:output, %RemoteStream{type: :bytestream}}], file_stream} + end + + @impl true + def handle_demand(_pad, _size, _unit, _ctx, file_stream) do + case Enum.take(file_stream, 1) do + [] -> + {[end_of_stream: :output], file_stream} + + [payload] -> + file_stream = Stream.drop(file_stream, 1) + {[buffer: {:output, %Buffer{payload: payload}}] ++ [redemand: :output], file_stream} + end + end +end diff --git a/mix.exs b/mix.exs index 3d9466b..03db127 100644 --- a/mix.exs +++ b/mix.exs @@ -1,12 +1,12 @@ -defmodule Membrane.Template.Mixfile do +defmodule Membrane.AWS.Mixfile do use Mix.Project @version "0.1.0" - @github_url "https://github.com/membraneframework/membrane_template_plugin" + @github_url "https://github.com/jellyfish-dev/membrane_aws_plugin" def project do [ - app: :membrane_template_plugin, + app: :membrane_aws_plugin, version: @version, elixir: "~> 1.13", elixirc_paths: elixirc_paths(Mix.env()), @@ -15,13 +15,25 @@ defmodule Membrane.Template.Mixfile do dialyzer: dialyzer(), # hex - description: "Template Plugin for Membrane Framework", + description: "AWS Plugin for Membrane Framework", package: package(), # docs - name: "Membrane Template plugin", + name: "Membrane AWS plugin", source_url: @github_url, - docs: docs() + docs: docs(), + + # test coverage + test_coverage: [tool: ExCoveralls], + preferred_cli_env: [ + coveralls: :test, + "coveralls.detail": :test, + "coveralls.post": :test, + "coveralls.html": :test, + "coveralls.json": :test, + "test.cluster": :test, + "test.cluster.ci": :test + ] ] end @@ -37,9 +49,20 @@ defmodule Membrane.Template.Mixfile do defp deps do [ {:membrane_core, "~> 1.0"}, + + # aws deps + {:ex_aws, "~> 2.1"}, + {:ex_aws_s3, "~> 2.0"}, + {:sweet_xml, "~> 0.6"}, + + # Dialyzer and credo {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:dialyxir, ">= 0.0.0", only: :dev, runtime: false}, - {:credo, ">= 0.0.0", only: :dev, runtime: false} + {:credo, ">= 0.0.0", only: :dev, runtime: false}, + + # Test dependency + {:bypass, "~> 2.1", only: :test}, + {:hackney, ">= 0.0.0", only: :test} ] end @@ -73,7 +96,7 @@ defmodule Membrane.Template.Mixfile do extras: ["README.md", "LICENSE"], formatters: ["html"], source_ref: "v#{@version}", - nest_modules_by_prefix: [Membrane.Template] + nest_modules_by_prefix: [Membrane.AWS] ] end end diff --git a/mix.lock b/mix.lock index 8d15140..edc08e7 100644 --- a/mix.lock +++ b/mix.lock @@ -1,21 +1,43 @@ %{ "bunch": {:hex, :bunch, "1.6.0", "4775f8cdf5e801c06beed3913b0bd53fceec9d63380cdcccbda6be125a6cfd54", [:mix], [], "hexpm", "ef4e9abf83f0299d599daed3764d19e8eac5d27a5237e5e4d5e2c129cfeb9a22"}, "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, + "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, + "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, "coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"}, + "cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"}, + "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, + "cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"}, "credo": {:hex, :credo, "1.7.0", "6119bee47272e85995598ee04f2ebbed3e947678dee048d10b5feca139435f75", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "6839fcf63d1f0d1c0f450abc8564a57c43d644077ab96f2934563e68b8a769d7"}, "dialyxir": {:hex, :dialyxir, "1.4.1", "a22ed1e7bd3a3e3f197b68d806ef66acb61ee8f57b3ac85fc5d57354c5482a93", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "84b795d6d7796297cca5a3118444b80c7d94f7ce247d49886e7c291e1ae49801"}, "earmark_parser": {:hex, :earmark_parser, "1.4.35", "437773ca9384edf69830e26e9e7b2e0d22d2596c4a6b17094a3b29f01ea65bb8", [:mix], [], "hexpm", "8652ba3cb85608d0d7aa2d21b45c6fad4ddc9a1f9a1f1b30ca3a246f0acc33f6"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, + "ex_aws": {:hex, :ex_aws, "2.5.1", "7418917974ea42e9e84b25e88b9f3d21a861d5f953ad453e212f48e593d8d39f", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1b95431f70c446fa1871f0eb9b183043c5a625f75f9948a42d25f43ae2eff12b"}, + "ex_aws_s3": {:hex, :ex_aws_s3, "2.5.3", "422468e5c3e1a4da5298e66c3468b465cfd354b842e512cb1f6fbbe4e2f5bdaf", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "4f09dd372cc386550e484808c5ac5027766c8d0cd8271ccc578b82ee6ef4f3b8"}, "ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, + "httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"}, + "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, "membrane_core": {:hex, :membrane_core, "1.0.0", "1b543aefd952283be1f2a215a1db213aa4d91222722ba03cd35280622f1905ee", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "352c90fd0a29942143c4bf7a727cc05c632e323f50a1a4e99321b1e8982f1533"}, + "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, + "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, + "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"}, "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, + "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, + "plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"}, + "plug_cowboy": {:hex, :plug_cowboy, "2.7.0", "3ae9369c60641084363b08fe90267cbdd316df57e3557ea522114b30b63256ea", [:mix], [{:cowboy, "~> 2.7.0 or ~> 2.8.0 or ~> 2.9.0 or ~> 2.10.0", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "d85444fb8aa1f2fc62eabe83bbe387d81510d773886774ebdcb429b3da3c1a4a"}, + "plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, + "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, "ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, + "sweet_xml": {:hex, :sweet_xml, "0.7.4", "a8b7e1ce7ecd775c7e8a65d501bc2cd933bff3a9c41ab763f5105688ef485d08", [:mix], [], "hexpm", "e7c4b0bdbf460c928234951def54fe87edf1a170f6896675443279e2dbeba167"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, } diff --git a/test/s3_source_test.exs b/test/s3_source_test.exs new file mode 100644 index 0000000..57770c3 --- /dev/null +++ b/test/s3_source_test.exs @@ -0,0 +1,124 @@ +defmodule Membrane.AWS.S3.SourceTest do + use ExUnit.Case, async: true + + import Membrane.Testing.Assertions + import Membrane.ChildrenSpec + import Support.BypassHelpers + + alias Membrane.AWS.S3.Source + alias Membrane.Testing.{Pipeline, Sink} + + describe "file pass through pipeline" do + setup [:start_bypass] + + test "whole file in one chunk", %{bypass: bypass} do + data = for i <- 0..10_000, do: <>, into: <<>> + + setup_multipart_download_backend( + bypass, + "bucket", + "test.txt", + data + ) + + assert pipeline = + Pipeline.start_link_supervised!( + spec: + child(:s3_source, %Source{ + bucket: "bucket", + path: "test.txt", + aws_credentials: exaws_config_for_bypass(bypass) + }) + |> child(:sink, Sink), + test_process: self() + ) + + assert_sink_buffer( + pipeline, + :sink, + %Membrane.Buffer{ + payload: ^data + } + ) + + assert_end_of_stream(pipeline, :sink) + end + + test "file splitted to 16-bytes chunks", %{bypass: bypass} do + chunk_size = 128 + + data = for i <- 0..10_000, do: <>, into: <<>> + + splitted_binary = + for <>, do: <> + + setup_multipart_download_backend( + bypass, + "bucket", + "test2.txt", + data + ) + + assert pipeline = + Pipeline.start_link_supervised!( + spec: + child(:s3_source, %Source{ + bucket: "bucket", + path: "test2.txt", + aws_credentials: exaws_config_for_bypass(bypass), + opts: [chunk_size: chunk_size] + }) + |> child(:sink, Sink), + test_process: self() + ) + + Enum.each(splitted_binary, fn expected_payload -> + assert_sink_buffer( + pipeline, + :sink, + %Membrane.Buffer{ + payload: ^expected_payload + } + ) + end) + + assert_end_of_stream(pipeline, :sink) + end + end + + defp setup_multipart_download_backend( + bypass, + bucket_name, + path, + file_body + ) do + request_path = "/#{bucket_name}/#{path}" + + Bypass.expect(bypass, fn conn -> + case conn do + %{method: "HEAD", request_path: ^request_path} -> + conn + |> Plug.Conn.put_resp_header("Content-Length", file_body |> byte_size |> to_string) + |> Plug.Conn.send_resp(200, "") + + %{method: "GET", request_path: ^request_path, req_headers: headers} -> + {_key, "bytes=" <> range} = Enum.find(headers, fn {key, _val} -> key == "range" end) + + [first, second | _] = String.split(range, "-") + + first = String.to_integer(first) + second = String.to_integer(second) + + # IO.inspect({first, second, second - first + 1}, label: :difference) + + <<_head::binary-size(first), payload::binary-size(second - first + 1), _rest::binary>> = + file_body + + # IO.inspect(byte_size(payload), label: :WTF_PAYLOAD) + + conn + |> Plug.Conn.send_resp(200, payload) + end + end) + end +end diff --git a/test/support/bypass_helpers.ex b/test/support/bypass_helpers.ex new file mode 100644 index 0000000..884d4aa --- /dev/null +++ b/test/support/bypass_helpers.ex @@ -0,0 +1,17 @@ +defmodule Support.BypassHelpers do + def start_bypass(_) do + bypass = Bypass.open() + [bypass: bypass] + end + + def exaws_config_for_bypass(bypass) do + ExAws.Config.new(:s3, + access_key_id: "AKIAIOSFODNN7EXAMPLE", + secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + host: "localhost", + port: bypass.port, + scheme: "http://", + region: "us-east-1" + ) + end +end From 812a386be2e16a390622f4c14560e6412e5420c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Wed, 28 Feb 2024 14:19:16 +0100 Subject: [PATCH 02/17] Modify tests --- mix.lock | 2 -- test/s3_source_test.exs | 8 +++++--- test/support/bypass_helpers.ex | 6 +++++- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/mix.lock b/mix.lock index edc08e7..5a481dc 100644 --- a/mix.lock +++ b/mix.lock @@ -16,7 +16,6 @@ "ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, - "httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, @@ -26,7 +25,6 @@ "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, - "mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"}, "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, diff --git a/test/s3_source_test.exs b/test/s3_source_test.exs index 57770c3..7b4ada6 100644 --- a/test/s3_source_test.exs +++ b/test/s3_source_test.exs @@ -45,9 +45,9 @@ defmodule Membrane.AWS.S3.SourceTest do end test "file splitted to 16-bytes chunks", %{bypass: bypass} do - chunk_size = 128 + chunk_size = 16 - data = for i <- 0..10_000, do: <>, into: <<>> + data = for i <- 0..(16 ** 2), do: <>, into: <<>> splitted_binary = for <>, do: <> @@ -102,7 +102,9 @@ defmodule Membrane.AWS.S3.SourceTest do |> Plug.Conn.send_resp(200, "") %{method: "GET", request_path: ^request_path, req_headers: headers} -> - {_key, "bytes=" <> range} = Enum.find(headers, fn {key, _val} -> key == "range" end) + headers = Map.new(headers) + + "bytes=" <> range = Map.fetch!(headers, "range") [first, second | _] = String.split(range, "-") diff --git a/test/support/bypass_helpers.ex b/test/support/bypass_helpers.ex index 884d4aa..9e17218 100644 --- a/test/support/bypass_helpers.ex +++ b/test/support/bypass_helpers.ex @@ -1,9 +1,13 @@ defmodule Support.BypassHelpers do - def start_bypass(_) do + @moduledoc false + + @spec start_bypass(Keyword.t()) :: [bypass: Bypass.t()] + def start_bypass(_opts) do bypass = Bypass.open() [bypass: bypass] end + @spec exaws_config_for_bypass(Bypass.t()) :: ExAws.Config.t() def exaws_config_for_bypass(bypass) do ExAws.Config.new(:s3, access_key_id: "AKIAIOSFODNN7EXAMPLE", From f3cf859b423f686d9fa1edf4bcd75c90b782f286 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Thu, 29 Feb 2024 08:46:38 +0100 Subject: [PATCH 03/17] Working tests --- lib/s3/s3_source.ex | 3 -- mix.exs | 5 +- mix.lock | 2 + test/s3_source_test.exs | 85 ++++++++++++++++------------------ test/support/bypass_helpers.ex | 21 --------- test/test_helper.exs | 3 ++ 6 files changed, 47 insertions(+), 72 deletions(-) delete mode 100644 test/support/bypass_helpers.ex diff --git a/lib/s3/s3_source.ex b/lib/s3/s3_source.ex index 0a47930..fc60af6 100644 --- a/lib/s3/s3_source.ex +++ b/lib/s3/s3_source.ex @@ -45,9 +45,6 @@ defmodule Membrane.AWS.S3.Source do |> ExAws.S3.download_file(state.path, :memory, state.opts) |> ExAws.stream!(state.aws_credentials) - # Uncomment if you need to gunzip (and add dependency :stream_gzip) - # |> StreamGzip.gunzip() - {[stream_format: {:output, %RemoteStream{type: :bytestream}}], file_stream} end diff --git a/mix.exs b/mix.exs index 03db127..2e208e7 100644 --- a/mix.exs +++ b/mix.exs @@ -39,7 +39,7 @@ defmodule Membrane.AWS.Mixfile do def application do [ - extra_applications: [] + extra_applications: [:logger] ] end @@ -62,7 +62,8 @@ defmodule Membrane.AWS.Mixfile do # Test dependency {:bypass, "~> 2.1", only: :test}, - {:hackney, ">= 0.0.0", only: :test} + {:httpoison, "~> 2.0", only: :test}, + {:mox, "~> 1.0", only: :test} ] end diff --git a/mix.lock b/mix.lock index 5a481dc..edc08e7 100644 --- a/mix.lock +++ b/mix.lock @@ -16,6 +16,7 @@ "ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, + "httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, @@ -25,6 +26,7 @@ "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"}, "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, diff --git a/test/s3_source_test.exs b/test/s3_source_test.exs index 7b4ada6..b14f8c6 100644 --- a/test/s3_source_test.exs +++ b/test/s3_source_test.exs @@ -1,33 +1,35 @@ defmodule Membrane.AWS.S3.SourceTest do - use ExUnit.Case, async: true + use ExUnit.Case import Membrane.Testing.Assertions import Membrane.ChildrenSpec - import Support.BypassHelpers + import Mox alias Membrane.AWS.S3.Source alias Membrane.Testing.{Pipeline, Sink} + @bucket_name "bucket" + describe "file pass through pipeline" do - setup [:start_bypass] + setup :verify_on_exit! + setup :set_mox_from_context - test "whole file in one chunk", %{bypass: bypass} do + test "whole file in one chunk" do data = for i <- 0..10_000, do: <>, into: <<>> + file_name = "test.txt" - setup_multipart_download_backend( - bypass, - "bucket", - "test.txt", - data - ) + setup_multipart_download_backend(@bucket_name, file_name, data) assert pipeline = Pipeline.start_link_supervised!( spec: child(:s3_source, %Source{ - bucket: "bucket", - path: "test.txt", - aws_credentials: exaws_config_for_bypass(bypass) + bucket: @bucket_name, + path: file_name, + aws_credentials: [ + access_key_id: "dummy", + secret_access_key: "dummy" + ] }) |> child(:sink, Sink), test_process: self() @@ -44,29 +46,28 @@ defmodule Membrane.AWS.S3.SourceTest do assert_end_of_stream(pipeline, :sink) end - test "file splitted to 16-bytes chunks", %{bypass: bypass} do + test "file splitted to 16-bytes chunks" do chunk_size = 16 + file_name = "test2.txt" data = for i <- 0..(16 ** 2), do: <>, into: <<>> splitted_binary = for <>, do: <> - setup_multipart_download_backend( - bypass, - "bucket", - "test2.txt", - data - ) + setup_multipart_download_backend(@bucket_name, file_name, data) assert pipeline = Pipeline.start_link_supervised!( spec: child(:s3_source, %Source{ - bucket: "bucket", - path: "test2.txt", - aws_credentials: exaws_config_for_bypass(bypass), - opts: [chunk_size: chunk_size] + bucket: @bucket_name, + path: file_name, + opts: [chunk_size: chunk_size], + aws_credentials: [ + access_key_id: "dummy", + secret_access_key: "dummy" + ] }) |> child(:sink, Sink), test_process: self() @@ -87,40 +88,32 @@ defmodule Membrane.AWS.S3.SourceTest do end defp setup_multipart_download_backend( - bypass, bucket_name, path, file_body ) do - request_path = "/#{bucket_name}/#{path}" - - Bypass.expect(bypass, fn conn -> - case conn do - %{method: "HEAD", request_path: ^request_path} -> - conn - |> Plug.Conn.put_resp_header("Content-Length", file_body |> byte_size |> to_string) - |> Plug.Conn.send_resp(200, "") + request_path = "https://s3.amazonaws.com/#{bucket_name}/#{path}" - %{method: "GET", request_path: ^request_path, req_headers: headers} -> - headers = Map.new(headers) + stub(ExAws.Request.HttpMock, :request, fn + :head, ^request_path, _req_body, _headers, _http_opts -> + content_length = file_body |> byte_size |> to_string - "bytes=" <> range = Map.fetch!(headers, "range") + {:ok, %{status_code: 200, headers: %{"Content-Length" => content_length}}} - [first, second | _] = String.split(range, "-") + :get, ^request_path, _req_body, headers, _http_opts -> + headers = Map.new(headers) - first = String.to_integer(first) - second = String.to_integer(second) + "bytes=" <> range = Map.fetch!(headers, "range") - # IO.inspect({first, second, second - first + 1}, label: :difference) + [first, second | _] = String.split(range, "-") - <<_head::binary-size(first), payload::binary-size(second - first + 1), _rest::binary>> = - file_body + first = String.to_integer(first) + second = String.to_integer(second) - # IO.inspect(byte_size(payload), label: :WTF_PAYLOAD) + <<_head::binary-size(first), payload::binary-size(second - first + 1), _rest::binary>> = + file_body - conn - |> Plug.Conn.send_resp(200, payload) - end + {:ok, %{status_code: 200, body: payload}} end) end end diff --git a/test/support/bypass_helpers.ex b/test/support/bypass_helpers.ex deleted file mode 100644 index 9e17218..0000000 --- a/test/support/bypass_helpers.ex +++ /dev/null @@ -1,21 +0,0 @@ -defmodule Support.BypassHelpers do - @moduledoc false - - @spec start_bypass(Keyword.t()) :: [bypass: Bypass.t()] - def start_bypass(_opts) do - bypass = Bypass.open() - [bypass: bypass] - end - - @spec exaws_config_for_bypass(Bypass.t()) :: ExAws.Config.t() - def exaws_config_for_bypass(bypass) do - ExAws.Config.new(:s3, - access_key_id: "AKIAIOSFODNN7EXAMPLE", - secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", - host: "localhost", - port: bypass.port, - scheme: "http://", - region: "us-east-1" - ) - end -end diff --git a/test/test_helper.exs b/test/test_helper.exs index 6a0af57..e0d3c60 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1 +1,4 @@ +Mox.defmock(ExAws.Request.HttpMock, for: ExAws.Request.HttpClient) +Application.put_env(:ex_aws, :http_client, ExAws.Request.HttpMock) + ExUnit.start(capture_log: true) From db7a7c8afa57b79eb895d145259a1a11227ca445 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Thu, 29 Feb 2024 13:08:29 +0100 Subject: [PATCH 04/17] Rename S3.Source option to aws_config --- lib/s3/s3_source.ex | 4 ++-- test/s3_source_test.exs | 10 ++++++---- test/test_helper.exs | 1 - 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/lib/s3/s3_source.ex b/lib/s3/s3_source.ex index fc60af6..31a2c2a 100644 --- a/lib/s3/s3_source.ex +++ b/lib/s3/s3_source.ex @@ -6,7 +6,7 @@ defmodule Membrane.AWS.S3.Source do alias Membrane.{Buffer, RemoteStream} - def_options aws_credentials: [ + def_options aws_config: [ spec: Keyword.t(), description: "Credentials to AWS", default: [] @@ -43,7 +43,7 @@ defmodule Membrane.AWS.S3.Source do file_stream = state.bucket |> ExAws.S3.download_file(state.path, :memory, state.opts) - |> ExAws.stream!(state.aws_credentials) + |> ExAws.stream!(state.aws_config) {[stream_format: {:output, %RemoteStream{type: :bytestream}}], file_stream} end diff --git a/test/s3_source_test.exs b/test/s3_source_test.exs index b14f8c6..a6742a8 100644 --- a/test/s3_source_test.exs +++ b/test/s3_source_test.exs @@ -26,9 +26,10 @@ defmodule Membrane.AWS.S3.SourceTest do child(:s3_source, %Source{ bucket: @bucket_name, path: file_name, - aws_credentials: [ + aws_config: [ access_key_id: "dummy", - secret_access_key: "dummy" + secret_access_key: "dummy", + http_client: ExAws.Request.HttpMock ] }) |> child(:sink, Sink), @@ -64,9 +65,10 @@ defmodule Membrane.AWS.S3.SourceTest do bucket: @bucket_name, path: file_name, opts: [chunk_size: chunk_size], - aws_credentials: [ + aws_config: [ access_key_id: "dummy", - secret_access_key: "dummy" + secret_access_key: "dummy", + http_client: ExAws.Request.HttpMock ] }) |> child(:sink, Sink), diff --git a/test/test_helper.exs b/test/test_helper.exs index e0d3c60..dadc2c8 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,4 +1,3 @@ Mox.defmock(ExAws.Request.HttpMock, for: ExAws.Request.HttpClient) -Application.put_env(:ex_aws, :http_client, ExAws.Request.HttpMock) ExUnit.start(capture_log: true) From 8ac1a2acce24e97c2cb3040246919b3e1828944a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Thu, 29 Feb 2024 14:23:32 +0100 Subject: [PATCH 05/17] Add example of usage --- .gitignore | 2 ++ example/source_example.exs | 48 ++++++++++++++++++++++++++++++++++++++ mix.exs | 2 +- mix.lock | 9 ------- 4 files changed, 51 insertions(+), 10 deletions(-) create mode 100644 example/source_example.exs diff --git a/.gitignore b/.gitignore index 324f6fa..ae03bf3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ + +.env compile_commands.json .gdb_history bundlex.sh diff --git a/example/source_example.exs b/example/source_example.exs new file mode 100644 index 0000000..b4d018e --- /dev/null +++ b/example/source_example.exs @@ -0,0 +1,48 @@ +Mix.install([ + {:membrane_aws_plugin, path: __DIR__ |> Path.join("..") |> Path.expand()}, + :membrane_core, + :membrane_file_plugin, + :hackney +]) + +# In this example, the pipeline will download file from S3 and save it to a local file + +defmodule Example do + use Membrane.Pipeline + + @impl true + def handle_init(_ctx, options) do + structure = [ + child(:s3_source, %Membrane.AWS.S3.Source{ + bucket: System.fetch_env!("BUCKET"), + path: System.fetch_env!("FILE_PATH"), + aws_config: [ + access_key_id: System.fetch_env!("ACCESS_KEY_ID"), + secret_access_key: System.fetch_env!("SECRET_ACCESS_KEY"), + region: System.fetch_env!("REGION") + ] + }) + |> child(:file_sink, %Membrane.File.Sink{location: System.fetch_env!("OUTPUT_FILE")}) + ] + + {[spec: structure], %{}} + end + + # Next two functions are only a logic for terminating a pipeline when it's done, you don't need to worry + @impl true + def handle_element_end_of_stream(:file_sink, _pad, _ctx, state) do + {[terminate: :normal], state} + end + + def handle_element_end_of_stream(_element, _pad, _ctx, state) do + {[], state} + end +end + +{:ok, _supervisor, pipeline} = Membrane.Pipeline.start_link(Example) +monitor_ref = Process.monitor(pipeline) + +receive do + {:DOWN, ^monitor_ref, :process, _pid, _reason} -> + :ok +end diff --git a/mix.exs b/mix.exs index 2e208e7..a7ad6f6 100644 --- a/mix.exs +++ b/mix.exs @@ -62,7 +62,7 @@ defmodule Membrane.AWS.Mixfile do # Test dependency {:bypass, "~> 2.1", only: :test}, - {:httpoison, "~> 2.0", only: :test}, + # {:httpoison, "~> 2.0", only: :test}, {:mox, "~> 1.0", only: :test} ] end diff --git a/mix.lock b/mix.lock index edc08e7..d45d680 100644 --- a/mix.lock +++ b/mix.lock @@ -2,7 +2,6 @@ "bunch": {:hex, :bunch, "1.6.0", "4775f8cdf5e801c06beed3913b0bd53fceec9d63380cdcccbda6be125a6cfd54", [:mix], [], "hexpm", "ef4e9abf83f0299d599daed3764d19e8eac5d27a5237e5e4d5e2c129cfeb9a22"}, "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, - "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, "coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"}, "cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, @@ -15,29 +14,21 @@ "ex_aws_s3": {:hex, :ex_aws_s3, "2.5.3", "422468e5c3e1a4da5298e66c3468b465cfd354b842e512cb1f6fbbe4e2f5bdaf", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "4f09dd372cc386550e484808c5ac5027766c8d0cd8271ccc578b82ee6ef4f3b8"}, "ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, - "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, - "httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"}, - "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, "membrane_core": {:hex, :membrane_core, "1.0.0", "1b543aefd952283be1f2a215a1db213aa4d91222722ba03cd35280622f1905ee", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "352c90fd0a29942143c4bf7a727cc05c632e323f50a1a4e99321b1e8982f1533"}, - "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, - "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"}, "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, - "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, "plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"}, "plug_cowboy": {:hex, :plug_cowboy, "2.7.0", "3ae9369c60641084363b08fe90267cbdd316df57e3557ea522114b30b63256ea", [:mix], [{:cowboy, "~> 2.7.0 or ~> 2.8.0 or ~> 2.9.0 or ~> 2.10.0", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "d85444fb8aa1f2fc62eabe83bbe387d81510d773886774ebdcb429b3da3c1a4a"}, "plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, "ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"}, - "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "sweet_xml": {:hex, :sweet_xml, "0.7.4", "a8b7e1ce7ecd775c7e8a65d501bc2cd933bff3a9c41ab763f5105688ef485d08", [:mix], [], "hexpm", "e7c4b0bdbf460c928234951def54fe87edf1a170f6896675443279e2dbeba167"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, - "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, } From 737f513e0e08721392f972c8ce4be2baa486f63d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= <56085570+Rados13@users.noreply.github.com> Date: Thu, 29 Feb 2024 14:23:50 +0100 Subject: [PATCH 06/17] Update lib/s3/s3_source.ex Co-authored-by: Karol Konkol <56369082+Karolk99@users.noreply.github.com> --- lib/s3/s3_source.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/s3/s3_source.ex b/lib/s3/s3_source.ex index 31a2c2a..e977bbd 100644 --- a/lib/s3/s3_source.ex +++ b/lib/s3/s3_source.ex @@ -38,7 +38,6 @@ defmodule Membrane.AWS.S3.Source do @impl true def handle_playing(_ctx, state) do - ExAws file_stream = state.bucket From 16e59feb81cf8ec7dd70ceb7ffa2bd87c2221d84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= <56085570+Rados13@users.noreply.github.com> Date: Thu, 29 Feb 2024 14:23:56 +0100 Subject: [PATCH 07/17] Update mix.exs Co-authored-by: Karol Konkol <56369082+Karolk99@users.noreply.github.com> --- mix.exs | 1 - 1 file changed, 1 deletion(-) diff --git a/mix.exs b/mix.exs index a7ad6f6..43db98d 100644 --- a/mix.exs +++ b/mix.exs @@ -61,7 +61,6 @@ defmodule Membrane.AWS.Mixfile do {:credo, ">= 0.0.0", only: :dev, runtime: false}, # Test dependency - {:bypass, "~> 2.1", only: :test}, # {:httpoison, "~> 2.0", only: :test}, {:mox, "~> 1.0", only: :test} ] From f72718c759306e8527b5139727458ae0098f2203 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Thu, 29 Feb 2024 14:26:14 +0100 Subject: [PATCH 08/17] Remove unused deps --- lib/s3/s3_source.ex | 1 - mix.lock | 8 -------- 2 files changed, 9 deletions(-) diff --git a/lib/s3/s3_source.ex b/lib/s3/s3_source.ex index e977bbd..f1857bc 100644 --- a/lib/s3/s3_source.ex +++ b/lib/s3/s3_source.ex @@ -38,7 +38,6 @@ defmodule Membrane.AWS.S3.Source do @impl true def handle_playing(_ctx, state) do - file_stream = state.bucket |> ExAws.S3.download_file(state.path, :memory, state.opts) diff --git a/mix.lock b/mix.lock index d45d680..e93a53c 100644 --- a/mix.lock +++ b/mix.lock @@ -1,11 +1,7 @@ %{ "bunch": {:hex, :bunch, "1.6.0", "4775f8cdf5e801c06beed3913b0bd53fceec9d63380cdcccbda6be125a6cfd54", [:mix], [], "hexpm", "ef4e9abf83f0299d599daed3764d19e8eac5d27a5237e5e4d5e2c129cfeb9a22"}, "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, - "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, "coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"}, - "cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"}, - "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, - "cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"}, "credo": {:hex, :credo, "1.7.0", "6119bee47272e85995598ee04f2ebbed3e947678dee048d10b5feca139435f75", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "6839fcf63d1f0d1c0f450abc8564a57c43d644077ab96f2934563e68b8a769d7"}, "dialyxir": {:hex, :dialyxir, "1.4.1", "a22ed1e7bd3a3e3f197b68d806ef66acb61ee8f57b3ac85fc5d57354c5482a93", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "84b795d6d7796297cca5a3118444b80c7d94f7ce247d49886e7c291e1ae49801"}, "earmark_parser": {:hex, :earmark_parser, "1.4.35", "437773ca9384edf69830e26e9e7b2e0d22d2596c4a6b17094a3b29f01ea65bb8", [:mix], [], "hexpm", "8652ba3cb85608d0d7aa2d21b45c6fad4ddc9a1f9a1f1b30ca3a246f0acc33f6"}, @@ -23,11 +19,7 @@ "mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"}, "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, - "plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"}, - "plug_cowboy": {:hex, :plug_cowboy, "2.7.0", "3ae9369c60641084363b08fe90267cbdd316df57e3557ea522114b30b63256ea", [:mix], [{:cowboy, "~> 2.7.0 or ~> 2.8.0 or ~> 2.9.0 or ~> 2.10.0", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "d85444fb8aa1f2fc62eabe83bbe387d81510d773886774ebdcb429b3da3c1a4a"}, - "plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, - "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, "ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"}, "sweet_xml": {:hex, :sweet_xml, "0.7.4", "a8b7e1ce7ecd775c7e8a65d501bc2cd933bff3a9c41ab763f5105688ef485d08", [:mix], [], "hexpm", "e7c4b0bdbf460c928234951def54fe87edf1a170f6896675443279e2dbeba167"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, From e766a5094df39bb76f060ddbc82ecc9bf0570a13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Thu, 29 Feb 2024 14:31:06 +0100 Subject: [PATCH 09/17] Add reference to ExAWS.Config in description of aws_config option --- lib/s3/s3_source.ex | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/s3/s3_source.ex b/lib/s3/s3_source.ex index f1857bc..e00511e 100644 --- a/lib/s3/s3_source.ex +++ b/lib/s3/s3_source.ex @@ -8,7 +8,9 @@ defmodule Membrane.AWS.S3.Source do def_options aws_config: [ spec: Keyword.t(), - description: "Credentials to AWS", + description: """ + Config to ExAWS. For more information refer to [`ExAws.Config`](https://github.com/ex-aws/ex_aws/blob/main/lib/ex_aws/config.ex). + """, default: [] ], bucket: [ From e86aba2302774c805f21f4f7aa306db127501080 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Fri, 1 Mar 2024 08:19:49 +0100 Subject: [PATCH 10/17] Add usage example in README --- README.md | 9 +++++++-- example/source_example.exs | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 81a6109..9a8ff78 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,14 @@ def deps do end ``` -## Usage +## Usage example -TODO +The `example/` folder contains an example usage of `Membrane.AWS.S3.Source`. + +This demo downloads a file from S3 and saves it locally. It requires that you set these environment variables: `BUCKET`, `FILE_PATH`, `ACCESS_KEY_ID`, `SECRET_ACCESS_KEY`, `REGION`. +```bash +$ elixir examples/receive.exs +``` ## Copyright and License diff --git a/example/source_example.exs b/example/source_example.exs index b4d018e..705987e 100644 --- a/example/source_example.exs +++ b/example/source_example.exs @@ -22,7 +22,7 @@ defmodule Example do region: System.fetch_env!("REGION") ] }) - |> child(:file_sink, %Membrane.File.Sink{location: System.fetch_env!("OUTPUT_FILE")}) + |> child(:file_sink, %Membrane.File.Sink{location: "source.example"}) ] {[spec: structure], %{}} From 9683d3133ff7e9f388e41c2aef6b95311bab813b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Fri, 1 Mar 2024 12:20:36 +0100 Subject: [PATCH 11/17] Modify how chunks are downloaded --- README.md | 2 +- lib/s3/s3_source.ex | 75 +++++++++++++++++++++++++++++++++-------- test/s3_source_test.exs | 13 +++---- 3 files changed, 69 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 9a8ff78..4ac51d0 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ The `example/` folder contains an example usage of `Membrane.AWS.S3.Source`. This demo downloads a file from S3 and saves it locally. It requires that you set these environment variables: `BUCKET`, `FILE_PATH`, `ACCESS_KEY_ID`, `SECRET_ACCESS_KEY`, `REGION`. ```bash -$ elixir examples/receive.exs +$ elixir examples/source_example.exs ``` ## Copyright and License diff --git a/lib/s3/s3_source.ex b/lib/s3/s3_source.ex index e00511e..3e85414 100644 --- a/lib/s3/s3_source.ex +++ b/lib/s3/s3_source.ex @@ -29,34 +29,81 @@ defmodule Membrane.AWS.S3.Source do ], description: "File download opts", default: [] + ], + cached_chunks_number: [ + spec: non_neg_integer(), + description: "Number of chunks cached before demand", + default: 10 ] def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}, flow_control: :manual @impl true def handle_init(_context, opts) do - {[], Map.from_struct(opts)} + state = %{ + aws_config: ExAws.Config.new(:s3, opts.aws_config), + bucket: opts.bucket, + path: opts.path, + opts: opts.opts, + chunks_stream: nil, + cached_chunks_number: opts.cached_chunks_number, + chunks: :queue.new() + } + + chunks_stream = ExAws.S3.Download.build_chunk_stream(state, state.aws_config) + + {[], %{state | chunks_stream: chunks_stream}} end @impl true def handle_playing(_ctx, state) do - file_stream = - state.bucket - |> ExAws.S3.download_file(state.path, :memory, state.opts) - |> ExAws.stream!(state.aws_config) + {take_chunks, chunks_stream} = state.chunks_stream |> Enum.split(state.cached_chunks_number) - {[stream_format: {:output, %RemoteStream{type: :bytestream}}], file_stream} + downloaded_chunks = + take_chunks + |> Task.async_stream( + &download_chunk(state, &1), + max_concurrency: Keyword.get(state.opts, :max_concurrency, 8), + timeout: Keyword.get(state.opts, :timeout, 60_000) + ) + |> Enum.map(fn {:ok, chunk} -> chunk end) + |> Enum.reduce(state.chunks, fn chunk, acc -> :queue.in(chunk, acc) end) + + {[stream_format: {:output, %RemoteStream{type: :bytestream}}], + %{state | chunks_stream: chunks_stream, chunks: downloaded_chunks}} end @impl true - def handle_demand(_pad, _size, _unit, _ctx, file_stream) do - case Enum.take(file_stream, 1) do - [] -> - {[end_of_stream: :output], file_stream} - - [payload] -> - file_stream = Stream.drop(file_stream, 1) - {[buffer: {:output, %Buffer{payload: payload}}] ++ [redemand: :output], file_stream} + def handle_demand(_pad, _size, _unit, _ctx, state) do + {chunks, chunks_stream} = update_chunks(state) + + case :queue.out(chunks) do + {:empty, _chunks} -> + {[end_of_stream: :output], state} + + {{:value, payload}, chunks} -> + {[buffer: {:output, %Buffer{payload: payload}}] ++ [redemand: :output], + %{state | chunks_stream: chunks_stream, chunks: chunks}} end end + + defp update_chunks(state) do + {boundaries, chunks_stream} = Enum.split(state.chunks_stream, 1) + + chunks = + if boundaries != [] do + [boundaries] = boundaries + chunk = download_chunk(state, boundaries) + :queue.in(chunk, state.chunks) + else + state.chunks + end + + {chunks, chunks_stream} + end + + defp download_chunk(state, boundaries) do + {_start_byte, chunk} = ExAws.S3.Download.get_chunk(state, boundaries, state.aws_config) + chunk + end end diff --git a/test/s3_source_test.exs b/test/s3_source_test.exs index a6742a8..4bfd3cb 100644 --- a/test/s3_source_test.exs +++ b/test/s3_source_test.exs @@ -18,7 +18,7 @@ defmodule Membrane.AWS.S3.SourceTest do data = for i <- 0..10_000, do: <>, into: <<>> file_name = "test.txt" - setup_multipart_download_backend(@bucket_name, file_name, data) + setup_multipart_download_backend(2, @bucket_name, file_name, data) assert pipeline = Pipeline.start_link_supervised!( @@ -30,7 +30,8 @@ defmodule Membrane.AWS.S3.SourceTest do access_key_id: "dummy", secret_access_key: "dummy", http_client: ExAws.Request.HttpMock - ] + ], + opts: [max_concurrency: 1] }) |> child(:sink, Sink), test_process: self() @@ -56,7 +57,7 @@ defmodule Membrane.AWS.S3.SourceTest do splitted_binary = for <>, do: <> - setup_multipart_download_backend(@bucket_name, file_name, data) + setup_multipart_download_backend(18, @bucket_name, file_name, data) assert pipeline = Pipeline.start_link_supervised!( @@ -64,7 +65,7 @@ defmodule Membrane.AWS.S3.SourceTest do child(:s3_source, %Source{ bucket: @bucket_name, path: file_name, - opts: [chunk_size: chunk_size], + opts: [chunk_size: chunk_size, max_concurrency: 1], aws_config: [ access_key_id: "dummy", secret_access_key: "dummy", @@ -90,13 +91,14 @@ defmodule Membrane.AWS.S3.SourceTest do end defp setup_multipart_download_backend( + amount_of_request, bucket_name, path, file_body ) do request_path = "https://s3.amazonaws.com/#{bucket_name}/#{path}" - stub(ExAws.Request.HttpMock, :request, fn + expect(ExAws.Request.HttpMock, :request, amount_of_request, fn :head, ^request_path, _req_body, _headers, _http_opts -> content_length = file_body |> byte_size |> to_string @@ -104,7 +106,6 @@ defmodule Membrane.AWS.S3.SourceTest do :get, ^request_path, _req_body, headers, _http_opts -> headers = Map.new(headers) - "bytes=" <> range = Map.fetch!(headers, "range") [first, second | _] = String.split(range, "-") From 4074abb1161715a0f050f68dfa8b3a521b403e67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Fri, 1 Mar 2024 12:26:18 +0100 Subject: [PATCH 12/17] Fix dialyzer issue --- lib/s3/s3_source.ex | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/s3/s3_source.ex b/lib/s3/s3_source.ex index 3e85414..ab992b8 100644 --- a/lib/s3/s3_source.ex +++ b/lib/s3/s3_source.ex @@ -45,19 +45,19 @@ defmodule Membrane.AWS.S3.Source do bucket: opts.bucket, path: opts.path, opts: opts.opts, - chunks_stream: nil, cached_chunks_number: opts.cached_chunks_number, + chunks_stream: nil, chunks: :queue.new() } - chunks_stream = ExAws.S3.Download.build_chunk_stream(state, state.aws_config) - - {[], %{state | chunks_stream: chunks_stream}} + {[], state} end @impl true def handle_playing(_ctx, state) do - {take_chunks, chunks_stream} = state.chunks_stream |> Enum.split(state.cached_chunks_number) + chunks_stream = ExAws.S3.Download.build_chunk_stream(state, state.aws_config) + + {take_chunks, chunks_stream} = chunks_stream |> Enum.split(state.cached_chunks_number) downloaded_chunks = take_chunks From 2db55df3dacbba16b4cc424cdfa076c8c0e2c99a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Fri, 1 Mar 2024 12:33:14 +0100 Subject: [PATCH 13/17] Remove unnecessary options --- test/s3_source_test.exs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/s3_source_test.exs b/test/s3_source_test.exs index 4bfd3cb..fcef412 100644 --- a/test/s3_source_test.exs +++ b/test/s3_source_test.exs @@ -30,8 +30,7 @@ defmodule Membrane.AWS.S3.SourceTest do access_key_id: "dummy", secret_access_key: "dummy", http_client: ExAws.Request.HttpMock - ], - opts: [max_concurrency: 1] + ] }) |> child(:sink, Sink), test_process: self() @@ -65,7 +64,7 @@ defmodule Membrane.AWS.S3.SourceTest do child(:s3_source, %Source{ bucket: @bucket_name, path: file_name, - opts: [chunk_size: chunk_size, max_concurrency: 1], + opts: [chunk_size: chunk_size], aws_config: [ access_key_id: "dummy", secret_access_key: "dummy", From ab3b755a362403f11e556c6e8826e6765c2465f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= <56085570+Rados13@users.noreply.github.com> Date: Mon, 4 Mar 2024 10:05:24 +0100 Subject: [PATCH 14/17] Update mix.exs Co-authored-by: Karol Konkol <56369082+Karolk99@users.noreply.github.com> --- mix.exs | 1 - 1 file changed, 1 deletion(-) diff --git a/mix.exs b/mix.exs index 43db98d..9616ac5 100644 --- a/mix.exs +++ b/mix.exs @@ -61,7 +61,6 @@ defmodule Membrane.AWS.Mixfile do {:credo, ">= 0.0.0", only: :dev, runtime: false}, # Test dependency - # {:httpoison, "~> 2.0", only: :test}, {:mox, "~> 1.0", only: :test} ] end From 1906e42d4ad2ca96a055fda4deeb68b075e7f1f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= <56085570+Rados13@users.noreply.github.com> Date: Mon, 4 Mar 2024 10:05:38 +0100 Subject: [PATCH 15/17] Update lib/s3/s3_source.ex Co-authored-by: Karol Konkol <56369082+Karolk99@users.noreply.github.com> --- lib/s3/s3_source.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/s3/s3_source.ex b/lib/s3/s3_source.ex index ab992b8..cbdcc1d 100644 --- a/lib/s3/s3_source.ex +++ b/lib/s3/s3_source.ex @@ -57,7 +57,7 @@ defmodule Membrane.AWS.S3.Source do def handle_playing(_ctx, state) do chunks_stream = ExAws.S3.Download.build_chunk_stream(state, state.aws_config) - {take_chunks, chunks_stream} = chunks_stream |> Enum.split(state.cached_chunks_number) + {take_chunks, chunks_stream} = Enum.split(chunks_stream, state.cached_chunks_number) downloaded_chunks = take_chunks From 4cdbae048850d68671cec9c32b34fcf87778d82d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= <56085570+Rados13@users.noreply.github.com> Date: Mon, 4 Mar 2024 10:06:43 +0100 Subject: [PATCH 16/17] Update lib/s3/s3_source.ex Co-authored-by: Karol Konkol <56369082+Karolk99@users.noreply.github.com> --- lib/s3/s3_source.ex | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/s3/s3_source.ex b/lib/s3/s3_source.ex index cbdcc1d..3d10713 100644 --- a/lib/s3/s3_source.ex +++ b/lib/s3/s3_source.ex @@ -91,12 +91,12 @@ defmodule Membrane.AWS.S3.Source do {boundaries, chunks_stream} = Enum.split(state.chunks_stream, 1) chunks = - if boundaries != [] do - [boundaries] = boundaries - chunk = download_chunk(state, boundaries) - :queue.in(chunk, state.chunks) - else - state.chunks + case boundaries do + [boundaries] -> + chunk = download_chunk(state, boundaries) + :queue.in(chunk, state.chunks) + [] -> + state.chunks end {chunks, chunks_stream} From 8ef09dc6db129870539001096d91a448acf1b1ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Mon, 4 Mar 2024 10:12:09 +0100 Subject: [PATCH 17/17] Fixes after review --- lib/s3/s3_source.ex | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/lib/s3/s3_source.ex b/lib/s3/s3_source.ex index 3d10713..c064586 100644 --- a/lib/s3/s3_source.ex +++ b/lib/s3/s3_source.ex @@ -6,6 +6,9 @@ defmodule Membrane.AWS.S3.Source do alias Membrane.{Buffer, RemoteStream} + @default_max_concurrency 8 + @task_timeout_milliseconds 60_000 + def_options aws_config: [ spec: Keyword.t(), description: """ @@ -40,15 +43,12 @@ defmodule Membrane.AWS.S3.Source do @impl true def handle_init(_context, opts) do - state = %{ - aws_config: ExAws.Config.new(:s3, opts.aws_config), - bucket: opts.bucket, - path: opts.path, - opts: opts.opts, - cached_chunks_number: opts.cached_chunks_number, - chunks_stream: nil, - chunks: :queue.new() - } + state = + Map.merge(opts, %{ + aws_config: ExAws.Config.new(:s3, opts.aws_config), + chunks_stream: nil, + chunks: :queue.new() + }) {[], state} end @@ -63,8 +63,8 @@ defmodule Membrane.AWS.S3.Source do take_chunks |> Task.async_stream( &download_chunk(state, &1), - max_concurrency: Keyword.get(state.opts, :max_concurrency, 8), - timeout: Keyword.get(state.opts, :timeout, 60_000) + max_concurrency: state.opts[:max_concurrency] || @default_max_concurrency, + timeout: state.opts[:timeout] || @task_timeout_milliseconds ) |> Enum.map(fn {:ok, chunk} -> chunk end) |> Enum.reduce(state.chunks, fn chunk, acc -> :queue.in(chunk, acc) end) @@ -93,10 +93,11 @@ defmodule Membrane.AWS.S3.Source do chunks = case boundaries do [boundaries] -> - chunk = download_chunk(state, boundaries) - :queue.in(chunk, state.chunks) - [] -> - state.chunks + chunk = download_chunk(state, boundaries) + :queue.in(chunk, state.chunks) + + [] -> + state.chunks end {chunks, chunks_stream}