Notes on streaming downloads with progress in Elixir


Context

We want the server to download data, i.e. to send an HTTP GET request to some endpoint. The response should be consumed as a stream, and we want to append the chunks to a file as they arrive. We also want to display the download progress.

We will use the Req library, which is built on top of Finch, which is itself built on top of Mint.

A few reasons for Req:

it enables streaming of the response body,
it seems to be able to reconnect when the connection fails or times out (not tested here),
it takes care of redirections.
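
The retry and redirection behaviour mentioned in the list above is driven by request options. Here is a minimal sketch, assuming the option names :retry, :max_retries and :max_redirects as documented for Req 0.4 (check them against the version you install). The URL is a placeholder and no request is sent:

```elixir
# Same dependencies as the setup used in the Code section, so repeating the
# call in the same session should be harmless.
Mix.install([
  {:finch, "~> 0.18.0"},
  {:req, "~> 0.4.14"}
])

# Build a request struct without sending it.
# "https://example.com/file.bin" is a placeholder URL (assumption).
req =
  Req.new(
    url: "https://example.com/file.bin",
    # retry on transient errors (assumed value, see the Req docs)
    retry: :transient,
    max_retries: 3,
    # upper bound on the number of redirections followed
    max_redirects: 5
  )

IO.inspect(Map.take(req.options, [:retry, :max_retries, :max_redirects]))
```

The struct can then be passed to Req.get!/2 together with the :into option used in the rest of these notes.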

[Front-end note]: in the browser, the fetch API can consume a response body as a stream, so download progress can be computed by counting the bytes read. fetch does not report upload progress, however. For that you still need XMLHttpRequest, which is why LiveView uses it in its uploader snippet.

Code

The code below is “livebookable” 😁 or can be run in an iex session.

========================================================

HTTP streaming with progress using Finch and Req

Mix.install([
  {:finch, "~> 0.18.0"},
  {:req, "~> 0.4.14"}
])

Finch.start_link(name: ExStream.Finch)

Test endpoints

We use two endpoints for testing. The last one includes a redirection.

vid = "https://sample-videos.com/video321/mp4/720/big_buck_bunny_720p_1mb.mp4"
img = "https://source.unsplash.com/QT-l619id6w"

Stream & progress with Req

We start with the Req library.

The first version streams the download straight into a file. The code is pretty compact: we pass a File.stream!/2 to the :into option. It implements the Collectable protocol, so Req appends each chunk to the file as it arrives.

defmodule ReqWriteStream do
  def download(url, file_path) do
    Req.get!(url, raw: true, into: File.stream!(file_path, [:write]))
  end
end

We test it:

ReqWriteStream.download(img, "image2.jpg")
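
To see what happens to the chunks once they reach the file stream, the same mechanism can be tried without any HTTP call. This is a minimal sketch: the chunks and the file name are made up, and Enum.into/2 stands in for what an HTTP client does when it collects the body chunks.

```elixir
# A File.Stream implements the Collectable protocol, so any enumerable of
# binaries can be collected into it. The chunks below are hypothetical data
# standing in for the body chunks of an HTTP response.
path = Path.join(System.tmp_dir!(), "collectable_demo.bin")
chunks = ["first-", "second-", "third"]

# Each chunk is written to the file in order.
Enum.into(chunks, File.stream!(path, [:write]))

contents = File.read!(path)
IO.inspect(contents, label: "file contents")
```

The whole body is never held in memory at once, which is the point of streaming a large download to disk.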

We now write a module that displays the progress. We grab the “content-length” with a HEAD request. The body is again streamed via a callback declared with the :into option. This callback writes the chunks into a file, and stores the progress state into the :private key of the struct %Req.Response{}.

defmodule ReqProgressStream do
  def download(url, file_path) do
    # HEAD request to read the total size; defaults to 0 when the header is missing
    [size] = Map.get(Req.head!(url: url).headers, "content-length", ["0"])
    size = String.to_integer(size)
    file_pid = File.open!(file_path, [:write, :binary])

    func = fn {:data, data}, {req, res} ->
      IO.binwrite(file_pid, data)
      chunk_size = byte_size(data)
      # running total of received bytes, kept in the :private map of the response
      res = Req.Response.update_private(res, :progress, chunk_size, &(&1 + chunk_size))

      # print {percentage, size of this chunk, total size} when the size is known
      if size > 0 do
        {Req.Response.get_private(res, :progress) * 100 / size, chunk_size, size} |> dbg()
      end

      {:cont, {req, res}}
    end

    Req.get!(url: url, raw: true, into: func)

    File.close(file_pid)
  end
end
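
The progress bookkeeping can also be tried in isolation, without a network call. The sketch below uses a hypothetical ProgressSketch module and made-up chunks. It folds over the chunks the way the :into callback does, with the running byte count as the accumulator:

```elixir
defmodule ProgressSketch do
  # Hypothetical helper, not part of Req. Returns the total number of bytes
  # seen and the percentage reported after each chunk.
  def run(chunks, total_size) do
    {processed, percents} =
      Enum.reduce(chunks, {0, []}, fn chunk, {processed, percents} ->
        processed = processed + byte_size(chunk)

        # same formula as in the download callback; nil when the size is unknown
        percent =
          if total_size > 0, do: Float.round(processed * 100 / total_size, 1), else: nil

        {processed, [percent | percents]}
      end)

    {processed, Enum.reverse(percents)}
  end
end

# three made-up chunks of 50, 50 and 100 bytes
chunks = [:binary.copy(<<0>>, 50), :binary.copy(<<0>>, 50), :binary.copy(<<0>>, 100)]

{processed, percents} = ProgressSketch.run(chunks, 200)
IO.inspect({processed, percents})
```

When the server sends no "content-length" header, the total is 0 and no percentage can be computed, which is why the download code only prints when size > 0.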

We test concurrent HTTP calls with Task.async_stream because we use the same function with different arguments.

Task.async_stream(
  [[vid, "video.mp4"], [img, "image.jpg"]],
  &apply(ReqProgressStream, :download, &1),
  timeout: :infinity
)
|> Stream.run()
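
The Task.async_stream pattern itself can be checked with a stand-in function. The sketch below uses a hypothetical FakeDownloader that sleeps instead of fetching anything, and placeholder URLs:

```elixir
defmodule FakeDownloader do
  # Hypothetical stand-in for a download function: sleeps instead of fetching.
  def download(url, file_path) do
    Process.sleep(50)
    {:downloaded, url, file_path}
  end
end

results =
  [["https://example.com/a", "a.bin"], ["https://example.com/b", "b.bin"]]
  # each inner list is the argument list of one call, run in its own task
  |> Task.async_stream(&apply(FakeDownloader, :download, &1), timeout: :infinity)
  |> Enum.to_list()

IO.inspect(results)
```

Stream.run/1 discards the results, which is fine when only the side effects matter. Enum.to_list/1 keeps them, each wrapped in an {:ok, result} tuple and in the order of the input list.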

Stream & progress with Finch

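We now turn to the Finch library. The code below is adapted from the example that comes with Finch.stream_while/5.

We expose a function that takes a URL and a path. Finch does not follow redirections, so we handle them ourselves. As written, the code does not retry, so it may be fragile to connection errors.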

defmodule FinchStream do
  @redirects [301, 302, 303, 307, 308]

  def download(url, file_path) do
    IO.puts("Starting to process #{inspect(file_path)}...")

    # Open the file to which the binary chunks are appended.
    # On a redirection it is closed, then reopened (and truncated) by the recursive call.
    file_pid = File.open!(file_path, [:write, :binary])

    # the HTTP stream request
    result =
      Finch.build(:get, url)
      |> Finch.stream_while(ExStream.Finch, nil, fn
        # we put the status in the "acc" to handle redirections
        {:status, status}, _acc ->
          {:cont, status}

        # - on a redirect status, we halt with the "location" header in the "acc"
        # - on a 200, we put the processed byte count and the "content-length" in the "acc"
        {:headers, headers}, acc ->
          handle_headers(headers, acc)

        # we write the chunk into the file and print out the current progress
        {:data, data}, acc ->
          handle_data(data, acc, file_path, file_pid)
      end)

    :ok = File.close(file_pid)

    case result do
      # follow the redirection with a new request (assumes an absolute "location" URL)
      {:ok, {:redirect, location}} -> download(location, file_path)
      {:ok, {processed, _size}} when is_integer(processed) -> {:ok, file_path}
      {:ok, other} -> {:error, other}
      error -> error
    end
  end

  def handle_headers(headers, status) when status in @redirects do
    IO.puts("REDIR: #{status}")

    case List.keyfind(headers, "location", 0) do
      {"location", location} -> {:halt, {:redirect, location}}
      nil -> {:halt, :missing_location}
    end
  end

  def handle_headers(headers, 200) do
    # the size stays at 0 when the server sends no "content-length" header
    case List.keyfind(headers, "content-length", 0) do
      {"content-length", size} -> {:cont, {0, String.to_integer(size)}}
      nil -> {:cont, {0, 0}}
    end
  end

  def handle_headers(_, status) do
    dbg(status)
    {:halt, :bad_status}
  end

  def handle_data(data, {processed, size}, file_path, file_pid) do
    case IO.binwrite(file_pid, data) do
      :ok ->
        processed = processed + byte_size(data)

        if size > 0 do
          IO.inspect(Float.round(processed * 100 / size, 1),
            label: "Processed #{inspect(file_path)} %: "
          )
        end

        {:cont, {processed, size}}

      {:error, reason} ->
        # the callback must return {:cont, acc} or {:halt, acc}
        {:halt, {:error, reason}}
    end
  end
end

We test this:

Task.async_stream(
  [[vid, "video.mp4"], [img, "image.jpg"]],
  &apply(FinchStream, :download, &1),
  timeout: :infinity
)
|> Stream.run()
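
The way the accumulator moves through the :status, :headers and :data events can be replayed without Finch. The sketch below is a simulation only. The ReducerSketch module and the event lists are hypothetical, and Enum.reduce_while/3 stands in for Finch.stream_while/5, which uses the same {:cont, acc} and {:halt, acc} return values. In this sketch a redirection halts as soon as the headers arrive.

```elixir
defmodule ReducerSketch do
  @redirects [301, 302, 303, 307, 308]

  # Hypothetical module: replays a hand-written event list through a reducer
  # shaped like the one given to Finch.stream_while/5. No HTTP request is made.
  def run(events) do
    Enum.reduce_while(events, nil, fn
      # the status becomes the accumulator
      {:status, status}, _acc ->
        {:cont, status}

      # redirect: stop early and keep the target URL
      {:headers, headers}, status when status in @redirects ->
        {_name, location} = List.keyfind(headers, "location", 0, {"location", nil})
        {:halt, {:redirect, location}}

      # success: the accumulator becomes {bytes processed, total size}
      {:headers, headers}, 200 ->
        {_name, size} = List.keyfind(headers, "content-length", 0, {"content-length", "0"})
        {:cont, {0, String.to_integer(size)}}

      {:headers, _headers}, _status ->
        {:halt, :bad_status}

      {:data, data}, {processed, size} ->
        {:cont, {processed + byte_size(data), size}}
    end)
  end
end

ok_events = [
  {:status, 200},
  {:headers, [{"content-length", "10"}]},
  {:data, "hello"},
  {:data, "world"}
]

redirect_events = [
  {:status, 302},
  {:headers, [{"location", "https://example.com/new"}]},
  {:data, "ignored"}
]

IO.inspect(ReducerSketch.run(ok_events), label: "ok")
IO.inspect(ReducerSketch.run(redirect_events), label: "redirect")
```

Halting on the headers means the redirection is followed even when the redirect response carries no body, a case in which no :data event would ever arrive.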
