Context
We want the server to download data, i.e. trigger an HTTP GET request to some endpoint. The response should be streamed, and we want to append the chunks to a file. We also want to display the download progress.
We will use the Req library, which is built on top of Finch, which is itself built on top of Mint.
A few reasons for Req:
it enables response body streaming,
it seems to be able to retry when the connection fails or times out (not tested here),
it takes care of redirections.
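The retry and redirect behaviour listed above is configurable per request. A minimal sketch, assuming the Req 0.4.x option names `:retry`, `:max_retries`, `:redirect` and `:max_redirects`; the URL is a placeholder and the values are illustrative, not recommendations. The request is only built, not sent.

```elixir
# Only needed when the snippet is run on its own
Mix.install([{:req, "~> 0.4.14"}])

# Build a request without sending it; the URL is a placeholder.
req =
  Req.new(
    url: "https://example.com/file.bin",
    # retry on transient failures such as timeouts
    retry: :transient,
    max_retries: 3,
    # follow redirects, with an upper bound on the number of hops
    redirect: true,
    max_redirects: 5
  )

IO.inspect(req.options, label: "options")
```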
[Front-end note]: an HTTP request made from the browser can use the fetch API, which can consume the response body as a stream. To track upload progress, however, you still need XMLHttpRequest. This is why LiveView uses it in the uploader snippet.
Code
The code below is “livebookable” 😁 or can be run in an iex session.
========================================================
HTTP streaming with progress using Finch and Req
Mix.install([
  {:finch, "~> 0.18.0"},
  {:req, "~> 0.4.14"}
])
Finch.start_link(name: ExStream.Finch)
Test endpoints
We use two endpoints for testing. The last one includes a redirection.
img = "https://source.unsplash.com/QT-l619id6w"
Stream & progress with Req
We start with the Req library.
We begin by streaming a download into a file. The code is pretty compact: we pass a file stream, created with File.stream!, to the :into option, and Req collects the chunks into it.
# the module name ReqStream is chosen for illustration
defmodule ReqStream do
  def download(url, file_path) do
    Req.get!(url, raw: true, into: File.stream!(file_path, [:write]))
  end
end
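Why a file stream works as a target: `File.stream!/2` returns a struct that implements the `Collectable` protocol, so whatever is pushed into it is written to the file in order. A small offline sketch of that mechanism, with hard-coded chunks standing in for an HTTP body (the temporary file name is arbitrary):

```elixir
# Stand-in for the chunks an HTTP response would deliver
chunks = ["first chunk, ", "second chunk, ", "third chunk"]

path = Path.join(System.tmp_dir!(), "collectable_sketch.txt")

# Push each chunk into the file stream, one after the other
Enum.into(chunks, File.stream!(path, [:write]))

IO.puts(File.read!(path))
```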
We test it:
We now write a module that displays the progress. We grab the “content-length” with a HEAD request. The body is again streamed via a callback declared with the :into option. This callback writes the chunks into a file, and stores the progress state into the :private key of the struct %Req.Response{}.
defmodule ReqProgressStream do
  def download(url, file_path) do
    # total size from a HEAD request; "0" when the header is absent
    [size] = Map.get(Req.head!(url: url).headers, "content-length", ["0"])
    size = String.to_integer(size)
    file_pid = File.open!(file_path, [:write, :binary])

    func = fn {:data, data}, {req, res} ->
      IO.binwrite(file_pid, data)
      chunk_size = byte_size(data)
      # running byte count, kept in the response's private map
      res = Req.Response.update_private(res, :progress, chunk_size, &(&1 + chunk_size))

      if size > 0 do
        {Req.Response.get_private(res, :progress) * 100 / size, chunk_size, size} |> dbg()
      end

      {:cont, {req, res}}
    end

    Req.get!(url: url, raw: true, into: func)
    File.close(file_pid)
  end
end
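The progress bookkeeping can be tried in isolation. The sketch below replays the same accumulation over a list of in-memory chunks. `ProgressSketch` and its function names are hypothetical, and a `total` of 0 stands for a missing content-length header.

```elixir
defmodule ProgressSketch do
  # Percentage of `total` received so far; nil when the total is unknown (0).
  def percent(_received, 0), do: nil
  def percent(received, total), do: Float.round(received * 100 / total, 1)

  # Fold over the chunks and return the progress reported after each one.
  def replay(chunks, total) do
    {_received, steps} =
      Enum.reduce(chunks, {0, []}, fn chunk, {received, steps} ->
        received = received + byte_size(chunk)
        {received, [percent(received, total) | steps]}
      end)

    Enum.reverse(steps)
  end
end

# Three chunks of 4, 2 and 2 bytes against a declared total of 8 bytes
IO.inspect(ProgressSketch.replay(["aaaa", "bb", "cc"], 8))
# prints [50.0, 75.0, 100.0]
```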
We test concurrent HTTP calls with Task.async_stream because we use the same function with different arguments.
Task.async_stream(
  [[vid, "video.mp4"], [img, "image.jpg"]],
  &apply(ReqProgressStream, :download, &1),
  timeout: :infinity
)
|> Stream.run()
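`Task.async_stream/3` runs the function once per list element in separate processes and yields `{:ok, result}` tuples in input order. A network-free sketch of the same call shape, where the hypothetical `FakeDownload.download/2` sleeps instead of fetching anything and the URLs are placeholders:

```elixir
defmodule FakeDownload do
  # Pretends to download: waits a little, then reports what it was asked to do.
  def download(url, file_path) do
    Process.sleep(50)
    {url, file_path}
  end
end

results =
  [["https://example.com/a", "a.bin"], ["https://example.com/b", "b.bin"]]
  |> Task.async_stream(&apply(FakeDownload, :download, &1), timeout: :infinity)
  |> Enum.to_list()

IO.inspect(results)
```

Swapping `Stream.run/1` for `Enum.to_list/1` keeps the return values, which helps when each download reports a status.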
Stream & progress with Finch
We now turn to the Finch library. The code below is adapted from the example in the Finch.stream_while documentation.
We expose a function that takes a URL and a file path. As written, it does not handle connection errors, so it may be fragile.
defmodule FinchStream do
  def download(url, file_path) do
    IO.puts("Starting to process #{inspect(file_path)}...")
    # Open the file to which the binary chunks will be appended.
    # File.open!/2 raises if the file cannot be created.
    # On redirection, the file is closed and reopened by the recursive call.
    file_pid = File.open!(file_path, [:write, :binary])

    # the HTTP stream request
    Finch.build(:get, url)
    |> Finch.stream_while(ExStream.Finch, nil, fn
      # we put the status in the "acc" to handle redirections
      {:status, status}, _acc ->
        {:cont, status}

      # - when we receive a redirect status, we put the "location" header in the "acc"
      # - when we receive a 200, we put the processed byte count and the "content-length" in the "acc"
      {:headers, headers}, acc ->
        handle_headers(headers, acc)

      # when the "acc" holds the "location" tuple, we recurse;
      # otherwise, we write the chunk into the file and print out the current progress
      {:data, data}, acc ->
        handle_data(data, acc, file_path, file_pid)
    end)

    case File.close(file_pid) do
      :ok ->
        {:halt, {file_path, :done}}

      {:error, _reason} ->
        {:halt, :error}
    end
  end
  def handle_headers(headers, status) when status in [301, 302, 303, 307, 308] do
    IO.puts("REDIR: #{status}")
    {:cont, Enum.find(headers, &(elem(&1, 0) == "location"))}
  end

  def handle_headers(headers, 200) do
    # a missing "content-length" header means the total size is unknown (0)
    case Enum.find(headers, &(elem(&1, 0) == "content-length")) do
      nil ->
        {:cont, {0, 0}}

      {"content-length", size} ->
        {:cont, {0, String.to_integer(size)}}
    end
  end

  def handle_headers(_, status) do
    dbg(status)
    {:halt, :bad_status}
  end
  def handle_data(_data, {"location", location}, file_path, file_pid) do
    if Process.alive?(file_pid), do: :ok = File.close(file_pid)
    # recursion: download/2 returns a {:halt, _} tuple, which stops the current stream
    download(location, file_path)
  end

  def handle_data(data, {processed, size}, file_path, file_pid) do
    case IO.binwrite(file_pid, data) do
      :ok ->
        processed = processed + byte_size(data)

        # print the percentage only when the total size is known
        if is_integer(size) and size > 0 do
          IO.inspect(Float.round(processed * 100 / size, 1),
            label: "Processed #{inspect(file_path)} %: "
          )
        end

        {:cont, {processed, size}}

      {:error, reason} ->
        # the callback must return {:cont, acc} or {:halt, acc}
        {:halt, {:error, reason}}
    end
  end
end
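On a successful download, the accumulator passes through three shapes: `nil`, then the status integer, then a `{processed, size}` tuple. A sketch that replays a hand-written event list through a reducer of the same kind with `Enum.reduce_while/3`, so no request is made. `TraceSketch` and the event values are made up for illustration, and the redirect branch is left out for brevity.

```elixir
defmodule TraceSketch do
  # The status replaces the initial nil accumulator.
  def step({:status, status}, _acc), do: {:cont, status}

  # On a 200, switch to {processed, size}; size is 0 when the header is missing.
  def step({:headers, headers}, 200) do
    size =
      case List.keyfind(headers, "content-length", 0) do
        {_name, value} -> String.to_integer(value)
        nil -> 0
      end

    {:cont, {0, size}}
  end

  # Any other status stops the stream.
  def step({:headers, _headers}, _status), do: {:halt, :bad_status}

  # Each data chunk increases the processed byte count.
  def step({:data, data}, {processed, size}),
    do: {:cont, {processed + byte_size(data), size}}

  def run(events), do: Enum.reduce_while(events, nil, &step/2)
end

events = [
  {:status, 200},
  {:headers, [{"content-type", "text/plain"}, {"content-length", "6"}]},
  {:data, "abc"},
  {:data, "def"}
]

IO.inspect(TraceSketch.run(events))
# prints {6, 6}
```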
We test this:
Task.async_stream(
  [[vid, "video.mp4"], [img, "image.jpg"]],
  &apply(FinchStream, :download, &1),
  timeout: :infinity
)
|> Stream.run()