1. Promises - Shiny and Plumber

Table of Contents

  1. Event-driven Promises
  2. The One Million Promises Challenge
  3. Shiny ExtendedTask: Introduction
  4. Shiny ExtendedTask: Cancellation
  5. Shiny ExtendedTask: Generative Art
  6. Shiny ExtendedTask: mirai map
  7. Shiny Async: Coin Flips
  8. Shiny Async: Progress Bar
  9. Plumber GET Endpoint
  10. Plumber POST Endpoint

Event-driven Promises

mirai supplies its own as.promise() method, allowing it to be used as a promise from the promises package.

These are next-generation, event-driven promises, developed in collaboration with Joe Cheng (creator of Shiny).

A ‘mirai’ may be piped directly using the promise pipe %...>%, which implicitly calls as.promise() on the ‘mirai’. Similarly, all promise-aware functions that take a promise, such as promises::then() or shiny::ExtendedTask$new(), can also take a ‘mirai’ (requires promises >= 1.3.0).

Alternatively, a ‘mirai’ may be explicitly converted into a promise by as.promise(), which then allows using the methods $then(), $finally() etc. directly.
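
As a minimal sketch of the explicit route, the promise returned by as.promise() can be chained through its own methods. The 0.2 second sleep, the `out` environment and the callback bodies are illustrative choices, not part of the original text.

```r
library(mirai)
library(promises)

m <- mirai({Sys.sleep(0.2); "hello"})

# explicit conversion: the result is a regular promise object
p <- as.promise(m)
is.promise(p)

# 'out' is a hypothetical environment used only to record side effects
out <- new.env()

# chain using the promise's own methods instead of the promise pipe
p2 <- p$then(function(x) out$value <- x)$finally(function() out$done <- TRUE)
```

The callbacks run once the ‘mirai’ has resolved and control has returned to the top level prompt (or the later event loop is otherwise run).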

The following example outputs “hello” to the console after one second when the ‘mirai’ resolves.

library(mirai)
library(promises)

p <- mirai({Sys.sleep(1); "hello"}) %...>% cat()
p
#> <Promise [pending]>

It is possible both to access the value of a ‘mirai’ directly (collected below using m[], and also available at $data once resolved) and to use a promise for enacting a side effect (assigning the value to an environment in the example below).

env <- new.env()

m <- mirai({
  Sys.sleep(1)
  "hello"
})

promises::then(m, function(x) env$res <- x)

m[]
#> [1] "hello"

After returning to the top level prompt:

env$res
#> [1] "hello"

A mirai_map also has an as.promise() method. It will resolve when the entire map operation completes or at least one mirai in the map is rejected.
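
A small sketch of this behaviour outside of Shiny follows. The doubling function, the `env` environment and the polling loop are assumptions made for illustration; at an interactive prompt the event loop runs automatically and the loop is not needed.

```r
library(mirai)
library(promises)

daemons(2)

env <- new.env()

# the promise pipe converts the map to a promise implicitly;
# it resolves to a list with one element per input
mirai_map(1:3, function(x) x * 2) %...>% (function(res) env$doubled <- unlist(res))

# run the event loop for a bounded time until the promise has resolved
for (i in 1:100) {
  later::run_now(0.1)
  if (!is.null(env$doubled)) break
}

env$doubled
#> [1] 2 4 6

daemons(0)
```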

The One Million Promises Challenge

The code below is taken from the challenge to launch and collect one million promises. For illustration, the example is scaled down to ten thousand.

library(mirai)
daemons(8, dispatcher = FALSE)
#> [1] 8
r <- 0
start <- Sys.time()
m <- mirai_map(1:10000, \(x) x, .promise = \(x) r <<- r + x)
Sys.time() - start
#> Time difference of 2.327623 secs
later::run_now()
r
#> [1] 50005000
daemons(0)
#> [1] 0

The one million promises challenge took 6 mins 25 secs to complete using an Intel i7 11th gen mobile processor with 16GB RAM.

Shiny ExtendedTask: Introduction

mirai is an asynchronous backend to scale Shiny applications. Depending on the options supplied to daemons(), mirai tasks may be distributed across local background processes or networked servers in an efficient and performant manner.

Shiny ExtendedTask allows the creation of scalable Shiny apps, which remain responsive intra-session for each user, as well as inter-session for multiple concurrent users.

In the example below, the app remains responsive, with the clock continuing to tick whilst the simulated expensive computation runs asynchronously in a parallel process. The button is also disabled, and the plot greyed out, until the computation is complete.

The call to daemons() is made at the top level, and onStop() may be used to automatically shut them down when the app exits.

library(shiny)
library(bslib)
library(mirai)

ui <- page_fluid(
  p("The time is ", textOutput("current_time", inline = TRUE)),
  hr(),
  numericInput("n", "Sample size (n)", 100),
  numericInput("delay", "Seconds to take for plot", 5),
  input_task_button("btn", "Plot uniform distribution"),
  plotOutput("plot")
)

server <- function(input, output, session) {
  output$current_time <- renderText({
    invalidateLater(1000)
    format(Sys.time(), "%H:%M:%S %p")
  })

  task <- ExtendedTask$new(
    function(...) mirai({Sys.sleep(y); runif(x)}, ...)
  ) |> bind_task_button("btn")

  observeEvent(input$btn, task$invoke(x = input$n, y = input$delay))

  output$plot <- renderPlot(hist(task$result()))

}

# run app using 1 local daemon
daemons(1)

# automatically shutdown daemons when app exits
onStop(function() daemons(0))

shinyApp(ui = ui, server = server)

Thanks to Joe Cheng for providing examples on which the above is based.

The key components to using ExtendedTask are:

  1. In the UI, use bslib::input_task_button(). This is a button which is disabled during computation to prevent additional clicks.

     input_task_button("btn", "Plot uniform distribution")

  2. In the server, create an ExtendedTask object by calling ExtendedTask$new() on an anonymous function passing ... arguments to mirai(), and bind it to the button created in (1).

     task <- ExtendedTask$new(
       function(...) mirai({Sys.sleep(y); runif(x)}, ...)
     ) |> bind_task_button("btn")

  3. In the server, create an observer on the input button, which invokes the ExtendedTask, passing in named arguments to the anonymous function (and hence the mirai) above.

     observeEvent(input$btn, task$invoke(x = input$n, y = input$delay))

  4. In the server, create a render function for the output, which consumes the result of the ExtendedTask.

     output$plot <- renderPlot(hist(task$result()))
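
The way named arguments travel through ... into the mirai can be tried without Shiny. This is only a sketch: `sample_after_delay` is a hypothetical name standing in for the anonymous function given to ExtendedTask$new(), and the argument values are arbitrary.

```r
library(mirai)

# same shape as the function given to ExtendedTask$new():
# named arguments supplied via ... become objects visible to the expression
sample_after_delay <- function(...) mirai({Sys.sleep(y); runif(x)}, ...)

m <- sample_after_delay(x = 5, y = 0.1)

# m[] waits for the mirai to resolve and returns its value
length(m[])
#> [1] 5
```

Here x and y play the roles of input$n and input$delay in the app.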

Shiny ExtendedTask: Cancellation

The app below is a demonstration of the cancellation capability added in mirai v2.

It builds on the introductory app by adding a button that invokes an ExtendedTask which sleeps indefinitely. Because only a single daemon is used, this blocks execution: any new ExtendedTasks are queued behind the never-ending task. A second button cancels the blocking task, allowing any queued plots to continue processing.

It works by assigning a reference to the mirai created in the function passed to ExtendedTask$new(), which can then be passed to stop_mirai().

library(shiny)
library(bslib)
library(mirai)

ui <- page_fluid(
  p("The time is ", textOutput("current_time", inline = TRUE)),
  hr(),
  numericInput("n", "Sample size (n)", 100),
  numericInput("delay", "Seconds to take for plot", 5),
  input_task_button("btn", "Plot uniform distribution"),
  hr(),
  p("Click 'block' to suspend execution, and 'cancel' to resume"),
  input_task_button("block", "Block"),
  actionButton("cancel", "Cancel block"),
  hr(),
  plotOutput("plot")
)

server <- function(input, output, session) {
  output$current_time <- renderText({
    invalidateLater(1000)
    format(Sys.time(), "%H:%M:%S %p")
  })

  task <- ExtendedTask$new(
    function(...) mirai({Sys.sleep(y); runif(x)}, ...)
  ) |> bind_task_button("btn")

  m <- NULL
  block <- ExtendedTask$new(
    function() m <<- mirai(Sys.sleep(Inf))
  ) |> bind_task_button("block")

  observeEvent(input$btn, task$invoke(x = input$n, y = input$delay))
  observeEvent(input$block, block$invoke())
  observeEvent(input$cancel, stop_mirai(m))
  observe({
    updateActionButton(session, "cancel", disabled = block$status() != "running")
  })

  output$plot <- renderPlot(hist(task$result()))

}

# run app using 1 local daemon
daemons(1)

# automatically shutdown daemons when app exits
onStop(function() daemons(0))

shinyApp(ui = ui, server = server)

Thanks to Joe Cheng for providing examples on which the above is based.
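
The cancellation mechanism used by the app can be seen in isolation in the sketch below, which assumes mirai v2 or later with the default dispatcher. The names `blocker` and `queued`, and the half-second pause, are illustrative only.

```r
library(mirai)

daemons(1)

blocker <- mirai(Sys.sleep(Inf))  # occupies the only daemon
queued <- mirai("plot data")      # waits behind the blocker

Sys.sleep(0.5)                    # give the blocking task time to start

stop_mirai(blocker)               # cancel using the stored reference

is_error_value(blocker[])         # the cancelled mirai resolves to an error value
queued[]                          # the queued task is now free to run

daemons(0)
```

This mirrors the app: the reference kept in m is what allows the "Cancel block" button to call stop_mirai() on the right task.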

Shiny ExtendedTask: Generative Art

The following app produces pretty spiral patterns.

The user can add multiple plots, making use of Shiny modules, each having a different calculation time.

The plots are generated asynchronously, and it is easy to see the practical limitations of the number of daemons set. For example, if updating 4 plots, and there are only 3 daemons, the 4th plot will not start to be generated until one of the other plots has finished.

By wrapping the runApp() call in with(daemons(...), ...) the daemons are set up for the duration of the app, exiting automatically when the app is stopped.

library(shiny)
library(mirai)
library(bslib)
library(ggplot2)
library(aRtsy)

# function definitions

run_task <- function(calc_time) {
  Sys.sleep(calc_time)
  list(
    colors = aRtsy::colorPalette(name = "random", n = 3),
    angle = runif(n = 1, min = - 2 * pi, max = 2 * pi),
    size = 1,
    p = 1
  )
}

plot_result <- function(result) {
  do.call(what = canvas_phyllotaxis, args = result)
}

# modules for individual plots

plotUI <- function(id, calc_time) {
  ns <- NS(id)
  card(
    strong(paste0("Plot (calc time = ", calc_time, " secs)")),
    input_task_button(ns("resample"), "Resample"),
    plotOutput(ns("plot"), height="400px", width="400px")
  )
}

plotServer <- function(id, calc_time) {
  force(id)
  force(calc_time)
  moduleServer(
    id,
    function(input, output, session) {

      task <- ExtendedTask$new(
        function(time, run) mirai(run(time), environment())
      ) |> bind_task_button("resample")

      observeEvent(input$resample, task$invoke(calc_time, run_task))

      output$plot <- renderPlot(plot_result(task$result()))

    }
  )
}

# ui and server

ui <- page_sidebar(fillable = FALSE,
  sidebar = sidebar(
    numericInput("calc_time", "Calculation time (secs)", 5),
    actionButton("add", "Add", class="btn-primary")
  ),
  layout_column_wrap(id = "results", width = "400px", fillable = FALSE)
)

server <- function(input, output, session) {

  observeEvent(input$add, {
    id <- nanonext::random(4)
    insertUI("#results", where = "beforeEnd", ui = plotUI(id, input$calc_time))
    plotServer(id, input$calc_time)
  })
}

app <- shinyApp(ui, server)

# run app using 3 local daemons
with(daemons(3), runApp(app))

The above example builds on original code by Joe Cheng, Daniel Woodie and William Landau.

The above uses environment() instead of ... as an alternative and equivalent way of passing variables present in the calling environment to the mirai.
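
The equivalence of the two approaches can be checked with a small sketch. The helpers `add_dots` and `add_env` are hypothetical and exist only for this comparison.

```r
library(mirai)

# hypothetical helpers: both evaluate x + y in a mirai
add_dots <- function(...) mirai(x + y, ...)                 # named arguments via ...
add_env <- function(x, y) mirai(x + y, environment())       # calling environment

a <- add_dots(x = 1, y = 2)[]
b <- add_env(1, 2)[]

identical(a, b)
#> [1] TRUE
```

With environment(), the arguments of the enclosing function are passed by position and picked up by name inside the mirai, which is why task$invoke(calc_time, run_task) in the app needs no argument names.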

The key components to using this ExtendedTask example are:

  1. In the UI, use bslib::input_task_button(). This is a button which is disabled during computation to prevent additional clicks.

     input_task_button(ns("resample"), "Resample")

  2. In the server, create an ExtendedTask object by calling ExtendedTask$new() on an anonymous function passing named arguments to mirai(), and bind it to the button created in (1). These are passed through to the mirai by the use of environment().

     task <- ExtendedTask$new(
       function(time, run) mirai(run(time), environment())
     ) |> bind_task_button("resample")

  3. In the server, create an observer on the input button, which invokes the ExtendedTask, supplying the arguments to the anonymous function above.

     observeEvent(input$resample, task$invoke(calc_time, run_task))

  4. In the server, create a render function for the output, which consumes the result of the ExtendedTask.

     output$plot <- renderPlot(plot_result(task$result()))
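
The with(daemons(...), ...) wrapper used to launch the app can also be tried outside of Shiny. The sketch below uses an arbitrary squaring task; the name `squares` is illustrative.

```r
library(mirai)

# daemons exist only for the duration of the expression
squares <- with(daemons(2), {
  m <- mirai_map(1:4, function(x) x^2)
  unlist(m[])  # collect inside the block, while the daemons still exist
})

squares
#> [1]  1  4  9 16
```

Results should be collected inside the block, since the daemons are reset as soon as the expression finishes.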

Shiny ExtendedTask: mirai map

A mirai_map also has an as.promise() method, which allows it to be used directly in a Shiny ExtendedTask. It will resolve when the entire map operation completes or at least one mirai in the map is rejected.

This example uses mirai_map() to perform multiple calculations simultaneously in multiple daemons, returning the results asynchronously.

library(shiny)
library(bslib)
library(mirai)

ui <- page_fluid(
  titlePanel("ExtendedTask Map Demo"),
  hr(),
  p("The time is ", textOutput("current_time", inline = TRUE)),
  p("Perform 4 calculations that each take between 1 and 4 secs to complete:"),
  input_task_button("calculate", "Calculate"),
  p(textOutput("result")),
  tags$style(type="text/css", "#result {white-space: pre-wrap;}")
)

server <- function(input, output) {
  task <- ExtendedTask$new(function() {
    mirai_map(1:4, function(i) {
      # simulated long calculation
      Sys.sleep(i)
      sprintf(
        "Calc %d | PID %d | Finished at %s.", i, Sys.getpid(), format(Sys.time())
      )
    })
  }) |> bind_task_button("calculate")
  
  observeEvent(input$calculate, {
    task$invoke()
  })
  
  output$result <- renderText({
    # result of mirai_map() is a list
    as.character(task$result())
  }, sep = "\n")
  
  output$current_time <- renderText({
    invalidateLater(1000)
    format(Sys.time(), "%H:%M:%S %p")
  })
}

app <- shinyApp(ui, server)
with(daemons(4), runApp(app))

Shiny Async: Coin Flips

The example below demonstrates how to integrate a mirai_map() operation into a Shiny app in an observer, without using ExtendedTask.

Specifying the ‘.promise’ argument registers a promise action against each mapped operation. These actions can then be used to update reactive values or otherwise interact with the Shiny app.

library(shiny)
library(mirai)

flip_coin <- function(...) {
  Sys.sleep(0.1)
  rbinom(n = 1, size = 1, prob = 0.501)
}

ui <- fluidPage(
  div("Is the coin fair?"),
  actionButton("task", "Flip 1000 coins"),
  textOutput("status"),
  textOutput("outcomes")
)

server <- function(input, output, session) {

  # Keep running totals of heads, tails, and flips submitted
  flips <- reactiveValues(heads = 0, tails = 0, flips = 0)

  # Button to submit a batch of coin flips
  observeEvent(input$task, {
    mirai_map(
      1:1000,
      flip_coin,
      .promise = \(x) {
        if (x) flips$heads <- flips$heads + 1 else flips$tails <- flips$tails + 1
      }
    )
    # Ensure there is something after mirai_map() in the observer, as it is
    # convertible to a promise, and will otherwise be waited for before returning
    flips$flips <- flips$flips + 1000
  })

  # Print time and task status
  output$status <- renderText({
    invalidateLater(millis = 1000)
    time <- format(Sys.time(), "%H:%M:%S")
    sprintf("%s | %s flips submitted", time, flips$flips)
  })

  # Print number of heads and tails
  output$outcomes <- renderText(
    sprintf("%s heads %s tails", flips$heads, flips$tails)
  )

}

app <- shinyApp(ui = ui, server = server)

# run app using 8 local non-dispatcher daemons (tasks are the same length)
with(daemons(8, dispatcher = FALSE), {
  # pre-load flip_coin function on all daemons for efficiency
  everywhere({}, flip_coin = flip_coin)
  runApp(app)
})

This is an adaptation of an original example provided by Will Landau for use of crew with Shiny. Please see https://wlandau.github.io/crew/articles/shiny.html.

Shiny Async: Progress Bar

The example below uses a mirai_map() operation in an observer to update a Shiny progress bar with custom messages, and also to update a reactive value (asynchronously) once the entire map operation has completed.

library(shiny)
library(mirai)
library(promises)

slow_squared <- function(x) {
  Sys.sleep(runif(1))
  x^2
}

ui <- fluidPage(
  titlePanel("Asynchronous Squares Calculator"),
  p("The time is ", textOutput("current_time", inline = TRUE)),
  hr(),
  actionButton("start", "Start Calculation"),
  br(), br(),
  uiOutput("progress_ui"),
  verbatimTextOutput("result")
)

server <- function(input, output, session) {
  x <- 1:100
  y <- reactiveVal()
  
  observeEvent(input$start, {
    
    progress <- Progress$new(session, min = 0, max = length(x))
    progress$set(message = "Parallel calculation in progress", detail = "Starting...")
    completed <- reactiveVal(0)
    mirai_map(
      x,
      slow_squared,
      slow_squared = slow_squared,
      .promise = function(result) {
        new_val <- completed() + 1
        completed(new_val)  # Increment completed counter
        progress$inc(1, detail = paste("Completed", new_val))  # Update progress
      }
    ) %...>% {
      y(unlist(.))
      progress$close()
    }
    # Ensure there is something after mirai_map() in the observer, as otherwise
    # the created promise will be waited for before returning
    y(0)
  })
  output$current_time <- renderText({
    invalidateLater(1000)
    format(Sys.time(), "%H:%M:%S %p")
  })
  output$result <- renderPrint({
    cat("Sum of squares calculated: ", sum(y()), "\n")
  })
}

app <- shinyApp(ui, server)
with(daemons(8), runApp(app))

This example adapts a contribution from Davide Magno.

Plumber GET Endpoint

mirai may be used as an asynchronous backend for plumber pipelines.

In this example, the plumber router code is run in a daemon process itself so that it does not block the interactive process.

The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and simply returns the ‘msg’ request header together with a timestamp and the process ID of the process it is run on.

library(mirai)

# supply SIGINT so the plumber server is interrupted and exits cleanly when finished
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # more efficient not to use dispatcher if all requests are similar length
  daemons(4L, dispatcher = FALSE) # handles 4 requests simultaneously

  pr() |>
    pr_get(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L)
            list(
              status = 200L,
              body = list(
                time = format(Sys.time()), msg = msg, pid = Sys.getpid()
              )
            )
          },
          msg = req[["HEADERS"]][["msg"]]
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8985)
})

The API can be queried using an async HTTP client such as nanonext::ncurl_aio().

Here, all 8 requests are submitted at once, but the responses have differing timestamps, as only 4 can be processed at any one time (limited by the number of daemons set).

library(nanonext)
res <- lapply(
  1:8,
  function(i) ncurl_aio(
    "http://127.0.0.1:8985/echo",
    headers = c(msg = as.character(i))
  )
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2025-05-22 11:28:55\"],\"msg\":[\"1\"],\"pid\":[24581]}"
#> 
#> [[2]]
#> [1] "{\"time\":[\"2025-05-22 11:28:55\"],\"msg\":[\"2\"],\"pid\":[24583]}"
#> 
#> [[3]]
#> [1] "{\"time\":[\"2025-05-22 11:28:55\"],\"msg\":[\"3\"],\"pid\":[24593]}"
#> 
#> [[4]]
#> [1] "{\"time\":[\"2025-05-22 11:28:56\"],\"msg\":[\"4\"],\"pid\":[24583]}"
#> 
#> [[5]]
#> [1] "{\"time\":[\"2025-05-22 11:28:55\"],\"msg\":[\"5\"],\"pid\":[24599]}"
#> 
#> [[6]]
#> [1] "{\"time\":[\"2025-05-22 11:28:56\"],\"msg\":[\"6\"],\"pid\":[24593]}"
#> 
#> [[7]]
#> [1] "{\"time\":[\"2025-05-22 11:28:56\"],\"msg\":[\"7\"],\"pid\":[24581]}"
#> 
#> [[8]]
#> [1] "{\"time\":[\"2025-05-22 11:28:56\"],\"msg\":[\"8\"],\"pid\":[24599]}"

daemons(0)
#> [1] 0

Plumber POST Endpoint

This is the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.

Note that req$postBody should always be accessed in the router process and passed in as an argument to the ‘mirai’, as this is retrieved using a connection that is not serializable.

library(mirai)

# supply SIGINT so the plumber server is interrupted and exits cleanly when finished
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # uses dispatcher - suitable when requests take differing times to complete
  daemons(4L) # handles 4 requests simultaneously

  pr() |>
    pr_post(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L) # simulate expensive computation
            list(
              status = 200L,
              body = list(
                time = format(Sys.time()),
                msg = jsonlite::fromJSON(data)[["msg"]],
                pid = Sys.getpid()
              )
            )
          },
          data = req$postBody
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8986)
})

Querying the endpoint produces the same set of outputs as the previous example.

library(nanonext)
res <- lapply(
  1:8,
  function(i) ncurl_aio(
    "http://127.0.0.1:8986/echo",
    method = "POST",
    data = sprintf('{"msg":"%d"}', i)
  )
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2025-05-22 11:28:59\"],\"msg\":[\"1\"],\"pid\":[24656]}"
#> 
#> [[2]]
#> [1] "{\"time\":[\"2025-05-22 11:28:59\"],\"msg\":[\"2\"],\"pid\":[24658]}"
#> 
#> [[3]]
#> [1] "{\"time\":[\"2025-05-22 11:29:00\"],\"msg\":[\"3\"],\"pid\":[24656]}"
#> 
#> [[4]]
#> [1] "{\"time\":[\"2025-05-22 11:28:59\"],\"msg\":[\"4\"],\"pid\":[24664]}"
#> 
#> [[5]]
#> [1] "{\"time\":[\"2025-05-22 11:29:00\"],\"msg\":[\"5\"],\"pid\":[24664]}"
#> 
#> [[6]]
#> [1] "{\"time\":[\"2025-05-22 11:29:00\"],\"msg\":[\"6\"],\"pid\":[24674]}"
#> 
#> [[7]]
#> [1] "{\"time\":[\"2025-05-22 11:28:59\"],\"msg\":[\"7\"],\"pid\":[24674]}"
#> 
#> [[8]]
#> [1] "{\"time\":[\"2025-05-22 11:29:00\"],\"msg\":[\"8\"],\"pid\":[24658]}"

daemons(0)
#> [1] 0
