---
title: "3. Promises - Shiny and Plumber"
vignette: >
  %\VignetteIndexEntry{3. Promises - Shiny and Plumber}
  %\VignetteEngine{litedown::vignette}
  %\VignetteEncoding{UTF-8}
---

### Table of Contents

1. [Event-driven Promises](#event-driven-promises)
2. [The One Million Promises Challenge](#the-one-million-promises-challenge)
3. [Shiny ExtendedTask: Introduction](#shiny-extendedtask-introduction)
4. [Shiny ExtendedTask: Cancellation](#shiny-extendedtask-cancellation)
5. [Shiny ExtendedTask: Generative Art](#shiny-extendedtask-generative-art)
6. [Shiny ExtendedTask: mirai map](#shiny-extendedtask-mirai-map)
7. [Shiny Async: Coin Flips](#shiny-async-coin-flips)
8. [Shiny Async: Progress Bar](#shiny-async-progress-bar)
9. [Plumber GET Endpoint](#plumber-get-endpoint)
10. [Plumber POST Endpoint](#plumber-post-endpoint)

### Event-driven promises

`mirai` supplies its own `as.promise()` method, allowing it to be used as a promise from the [`promises`](https://rstudio.github.io/promises/) package.

These are next-generation, event-driven promises, developed in collaboration with Joe Cheng (creator of Shiny).

- Does not require each promise to be polled for completion in a background loop like other promises.
- Instead, promise actions are automatically triggered as soon as each 'mirai' resolves (asynchronously).
- Allows for much higher responsiveness (zero latency) and massive scalability (thousands or even millions of promises).

A 'mirai' may be piped directly using the promise pipe `%...>%`, which implicitly calls `as.promise()` on the 'mirai'. Similarly all promise-aware functions such as `promises::then()` or `shiny::ExtendedTask$new()` which take a promise can also take a 'mirai' (using `promises` >= 1.3.0).

Alternatively, a 'mirai' may be explicitly converted into a promise by `as.promise()`, which then allows using the methods `$then()`, `$finally()` etc. directly.

The following example outputs "hello" to the console after one second when the 'mirai' resolves.
``` r
library(mirai)
library(promises)

p <- mirai({Sys.sleep(1); "hello"}) %...>% cat()
p
#>
```

It is possible to both access a 'mirai' value at `$data` and to use a promise for enacting a side effect (assigning the value to an environment in the example below).

``` r
env <- new.env()

m <- mirai({
  Sys.sleep(1)
  "hello"
})

promises::then(m, function(x) env$res <- x)

m[]
#> [1] "hello"
```

After returning to the top level prompt:

```r
env$res
#> [1] "hello"
```

A `mirai_map` also has an `as.promise()` method. It will resolve when the entire map operation completes or at least one mirai in the map is rejected.

[« Back to ToC](#table-of-contents)

### The One Million Promises Challenge

The code below is taken from the challenge to launch and collect one million promises. For illustration, the example is scaled down to ten thousand.

``` r
library(mirai)
daemons(8, dispatcher = FALSE)
#> [1] 8

r <- 0
start <- Sys.time()
m <- mirai_map(1:10000, \(x) x, .promise = \(x) r <<- r + x)
Sys.time() - start
#> Time difference of 2.327623 secs

later::run_now()
r
#> [1] 50005000

daemons(0)
#> [1] 0
```

The one million promises challenge took 6 mins 25 secs to complete using an Intel i7 11th gen mobile processor with 16GB RAM.

[« Back to ToC](#table-of-contents)

### Shiny ExtendedTask: Introduction

mirai is an asynchronous backend to scale [Shiny](https://shiny.posit.co/) applications.

Depending on the options supplied to `daemons()`, mirai tasks may be distributed across local background processes or networked servers in an efficient and performant manner.

Shiny ExtendedTask allows the creation of scalable Shiny apps, which remain responsive intra-session for each user, as well as inter-session for multiple concurrent users.

In the example below, the app remains responsive, with the clock continuing to tick whilst the simulated expensive computation is running asynchronously in a parallel process. Also the button is disabled and the plot greyed out until the computation is complete.
> The call to `daemons()` is made at the top level, and `onStop()` may be used to automatically shut them down when the app exits.

``` r
library(shiny)
library(bslib)
library(mirai)

ui <- page_fluid(
  p("The time is ", textOutput("current_time", inline = TRUE)),
  hr(),
  numericInput("n", "Sample size (n)", 100),
  numericInput("delay", "Seconds to take for plot", 5),
  input_task_button("btn", "Plot uniform distribution"),
  plotOutput("plot")
)

server <- function(input, output, session) {
  output$current_time <- renderText({
    invalidateLater(1000)
    format(Sys.time(), "%H:%M:%S %p")
  })

  task <- ExtendedTask$new(
    function(...) mirai({Sys.sleep(y); runif(x)}, ...)
  ) |> bind_task_button("btn")

  observeEvent(input$btn, task$invoke(x = input$n, y = input$delay))

  output$plot <- renderPlot(hist(task$result()))
}

# run app using 1 local daemon
daemons(1)

# automatically shutdown daemons when app exits
onStop(function() daemons(0))

shinyApp(ui = ui, server = server)
```

*Thanks to Joe Cheng for providing examples on which the above is based.*

The key components to using ExtendedTask are:

1. In the UI, use `bslib::input_task_button()`. This is a button which is disabled during computation to prevent additional clicks.

``` r
input_task_button("btn", "Plot uniform distribution")
```

2. In the server, create an ExtendedTask object by calling `ExtendedTask$new()` on an anonymous function passing `...` arguments to `mirai()`, and bind it to the button created in (1).

``` r
task <- ExtendedTask$new(
  function(...) mirai({Sys.sleep(y); runif(x)}, ...)
) |> bind_task_button("btn")
```

3. In the server, create an observer on the input button, which invokes the ExtendedTask, passing in named arguments to the anonymous function (and hence the mirai) above.

``` r
observeEvent(input$btn, task$invoke(x = input$n, y = input$delay))
```

4. In the server, create a render function for the output, which consumes the result of the ExtendedTask.
``` r
output$plot <- renderPlot(hist(task$result()))
```

[« Back to ToC](#table-of-contents)

### Shiny ExtendedTask: Cancellation

The app below is a demonstration of the cancellation capability added in mirai v2.

It builds on the introductory app by adding a button that sends an infinite sleep ExtendedTask. This will block execution as we are using a single daemon - any new ExtendedTasks will be queued behind this never-ending task.

There is also a button to cancel that blocking task and allow any queued plots to continue processing. It works by assigning a reference to the mirai created in the `ExtendedTask$new()` method, which can then be passed to `stop_mirai()`.

``` r
library(shiny)
library(bslib)
library(mirai)

ui <- page_fluid(
  p("The time is ", textOutput("current_time", inline = TRUE)),
  hr(),
  numericInput("n", "Sample size (n)", 100),
  numericInput("delay", "Seconds to take for plot", 5),
  input_task_button("btn", "Plot uniform distribution"),
  hr(),
  p("Click 'block' to suspend execution, and 'cancel' to resume"),
  input_task_button("block", "Block"),
  actionButton("cancel", "Cancel block"),
  hr(),
  plotOutput("plot")
)

server <- function(input, output, session) {
  output$current_time <- renderText({
    invalidateLater(1000)
    format(Sys.time(), "%H:%M:%S %p")
  })

  task <- ExtendedTask$new(
    function(...) mirai({Sys.sleep(y); runif(x)}, ...)
  ) |> bind_task_button("btn")

  # reference to the blocking mirai, so that it can be cancelled later
  m <- NULL
  block <- ExtendedTask$new(
    function() m <<- mirai(Sys.sleep(Inf))
  ) |> bind_task_button("block")

  observeEvent(input$btn, task$invoke(x = input$n, y = input$delay))
  observeEvent(input$block, block$invoke())
  observeEvent(input$cancel, stop_mirai(m))

  # the cancel button is only enabled whilst the blocking task is running
  observe({
    updateActionButton(session, "cancel", disabled = block$status() != "running")
  })

  output$plot <- renderPlot(hist(task$result()))
}

# run app using 1 local daemon
daemons(1)

# automatically shutdown daemons when app exits
onStop(function() daemons(0))

shinyApp(ui = ui, server = server)
```

*Thanks to Joe Cheng for providing examples on which the above is based.*

[« Back to ToC](#table-of-contents)

### Shiny ExtendedTask: Generative Art

The following app produces pretty spiral patterns.

The user can add multiple plots, making use of Shiny modules, each having a different calculation time.

The plots are generated asynchronously, and it is easy to see the practical limitations of the number of daemons set. For example, if updating 4 plots, and there are only 3 daemons, the 4th plot will not start to be generated until one of the other plots has finished.

By wrapping the `runApp()` call in `with(daemons(...), ...)` the daemons are set up for the duration of the app, exiting automatically when the app is stopped.
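The `with(daemons(...), ...)` pattern is not specific to Shiny. As a minimal sketch outside any app, and assuming the `with()` method returns the value of the wrapped expression, the snippet below sets up two daemons only for the duration of a single task. The variable name `pid` is purely illustrative.

``` r
library(mirai)

# daemons exist only while the wrapped expression is being evaluated
pid <- with(daemons(2), {
  m <- mirai(Sys.getpid())  # evaluated on one of the two daemons
  m[]                       # wait for and collect the value
})

# the task ran in a separate process, so this is expected to differ
# from the process ID of the calling session
pid != Sys.getpid()
```

The full app, which wraps `runApp()` in the same way, follows.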
``` r
library(shiny)
library(mirai)
library(bslib)
library(ggplot2)
library(aRtsy)

# function definitions

run_task <- function(calc_time) {
  Sys.sleep(calc_time)
  list(
    colors = aRtsy::colorPalette(name = "random", n = 3),
    angle = runif(n = 1, min = - 2 * pi, max = 2 * pi),
    size = 1,
    p = 1
  )
}

plot_result <- function(result) {
  do.call(what = canvas_phyllotaxis, args = result)
}

# modules for individual plots

plotUI <- function(id, calc_time) {
  ns <- NS(id)
  card(
    strong(paste0("Plot (calc time = ", calc_time, " secs)")),
    input_task_button(ns("resample"), "Resample"),
    plotOutput(ns("plot"), height="400px", width="400px")
  )
}

plotServer <- function(id, calc_time) {
  force(id)
  force(calc_time)
  moduleServer(
    id,
    function(input, output, session) {
      task <- ExtendedTask$new(
        function(time, run) mirai(run(time), environment())
      ) |> bind_task_button("resample")

      observeEvent(input$resample, task$invoke(calc_time, run_task))

      output$plot <- renderPlot(plot_result(task$result()))
    }
  )
}

# ui and server

ui <- page_sidebar(fillable = FALSE,
  sidebar = sidebar(
    numericInput("calc_time", "Calculation time (secs)", 5),
    actionButton("add", "Add", class="btn-primary"),
  ),
  layout_column_wrap(id = "results", width = "400px", fillable = FALSE)
)

server <- function(input, output, session) {
  observeEvent(input$add, {
    id <- nanonext::random(4)
    insertUI("#results", where = "beforeEnd", ui = plotUI(id, input$calc_time))
    plotServer(id, input$calc_time)
  })
}

app <- shinyApp(ui, server)

# run app using 3 local daemons
with(daemons(3), runApp(app))
```

*The above example builds on original code by Joe Cheng, Daniel Woodie and William Landau.*

The above uses `environment()` instead of `...` as an alternative and equivalent way of passing variables present in the calling environment to the mirai.

The key components to using this ExtendedTask example are:

1. In the UI, use `bslib::input_task_button()`. This is a button which is disabled during computation to prevent additional clicks.
``` r
input_task_button(ns("resample"), "Resample")
```

2. In the server, create an ExtendedTask object by calling `ExtendedTask$new()` on an anonymous function passing _named_ arguments to `mirai()`, and bind it to the button created in (1). These are passed through to the mirai by the use of `environment()`.

``` r
task <- ExtendedTask$new(
  function(time, run) mirai(run(time), environment())
) |> bind_task_button("resample")
```

3. In the server, create an observer on the input button, which invokes the ExtendedTask, supplying the arguments to the anonymous function above.

``` r
observeEvent(input$resample, task$invoke(calc_time, run_task))
```

4. In the server, create a render function for the output, which consumes the result of the ExtendedTask.

``` r
output$plot <- renderPlot(plot_result(task$result()))
```

### Shiny ExtendedTask: mirai map

A `mirai_map` also has an `as.promise()` method, which allows it to be used directly in a Shiny ExtendedTask. It will resolve when the entire map operation completes or at least one mirai in the map is rejected.

This example uses `mirai_map()` to perform multiple calculations simultaneously in multiple daemons, returning the results asynchronously.
```r
library(shiny)
library(bslib)
library(mirai)

ui <- page_fluid(
  titlePanel("ExtendedTask Map Demo"),
  hr(),
  p("The time is ", textOutput("current_time", inline = TRUE)),
  p("Perform 4 calculations that each take between 1 and 4 secs to complete:"),
  input_task_button("calculate", "Calculate"),
  p(textOutput("result")),
  tags$style(type="text/css", "#result {white-space: pre-wrap;}")
)

server <- function(input, output) {
  task <- ExtendedTask$new(function() {
    mirai_map(1:4, function(i) {
      # simulated long calculation
      Sys.sleep(i)
      sprintf(
        "Calc %d | PID %d | Finished at %s.",
        i, Sys.getpid(), format(Sys.time())
      )
    })
  }) |> bind_task_button("calculate")

  observeEvent(input$calculate, {
    task$invoke()
  })

  output$result <- renderText({
    # result of mirai_map() is a list
    as.character(task$result())
  }, sep = "\n")

  output$current_time <- renderText({
    invalidateLater(1000)
    format(Sys.time(), "%H:%M:%S %p")
  })
}

app <- shinyApp(ui, server)

with(daemons(4), runApp(app))
```

[« Back to ToC](#table-of-contents)

### Shiny Async: Coin Flips

The example below demonstrates how to integrate a `mirai_map()` operation into a Shiny app in an observer, without using ExtendedTask.

Specifying the '.promise' argument registers a promise action against each mapped operation. These can then be used to update reactive values or otherwise interact with the Shiny app.

``` r
library(shiny)
library(mirai)

flip_coin <- function(...) {
  Sys.sleep(0.1)
  rbinom(n = 1, size = 1, prob = 0.501)
}

ui <- fluidPage(
  div("Is the coin fair?"),
  actionButton("task", "Flip 1000 coins"),
  textOutput("status"),
  textOutput("outcomes")
)

server <- function(input, output, session) {

  # Keep running totals of heads, tails, and flips submitted
  flips <- reactiveValues(heads = 0, tails = 0, flips = 0)

  # Button to submit a batch of coin flips
  observeEvent(input$task, {
    mirai_map(
      1:1000,
      flip_coin,
      .promise = \(x) {
        if (x) flips$heads <- flips$heads + 1 else flips$tails <- flips$tails + 1
      }
    )
    # Ensure there is something after mirai_map() in the observer, as it is
    # convertible to a promise, and will otherwise be waited for before returning
    flips$flips <- flips$flips + 1000
  })

  # Print time and task status
  output$status <- renderText({
    invalidateLater(millis = 1000)
    time <- format(Sys.time(), "%H:%M:%S")
    sprintf("%s | %s flips submitted", time, flips$flips)
  })

  # Print number of heads and tails
  output$outcomes <- renderText(
    sprintf("%s heads %s tails", flips$heads, flips$tails)
  )

}

app <- shinyApp(ui = ui, server = server)

# run app using 8 local non-dispatcher daemons (tasks are the same length)
with(daemons(8, dispatcher = FALSE), {
  # pre-load flip_coin function on all daemons for efficiency
  everywhere({}, flip_coin = flip_coin)
  runApp(app)
})
```

*This is an adaptation of an original example provided by Will Landau for use of `crew` with Shiny. Please see the `crew` package documentation.*

[« Back to ToC](#table-of-contents)

### Shiny Async: Progress Bar

The example below uses a `mirai_map()` operation in an observer to update a Shiny progress bar with custom messages, and also to update a reactive value once the entire map operation has completed (asynchronously).
```r
library(shiny)
library(mirai)
library(promises)

slow_squared <- function(x) {
  Sys.sleep(runif(1))
  x^2
}

ui <- fluidPage(
  titlePanel("Asynchronous Squares Calculator"),
  p("The time is ", textOutput("current_time", inline = TRUE)),
  hr(),
  actionButton("start", "Start Calculation"),
  br(), br(),
  uiOutput("progress_ui"),
  verbatimTextOutput("result")
)

server <- function(input, output, session) {
  x <- 1:100
  y <- reactiveVal()

  observeEvent(input$start, {
    progress <- Progress$new(session, min = 0, max = length(x))
    progress$set(message = "Parallel calculation in progress", detail = "Starting...")
    completed <- reactiveVal(0)

    mirai_map(
      x,
      slow_squared,
      slow_squared = slow_squared,
      .promise = function(result) {
        new_val <- completed() + 1
        completed(new_val) # Increment completed counter
        progress$inc(1, detail = paste("Completed", new_val)) # Update progress
      }
    ) %...>% {
      y(unlist(.))
      progress$close()
    }

    # Ensure there is something after mirai_map() in the observer, as otherwise
    # the created promise will be waited for before returning
    y(0)
  })

  output$current_time <- renderText({
    invalidateLater(1000)
    format(Sys.time(), "%H:%M:%S %p")
  })

  output$result <- renderPrint({
    cat("Sum of squares calculated: ", sum(y()), "\n")
  })
}

app <- shinyApp(ui, server)

with(daemons(8), runApp(app))
```

*This example adapts a contribution from Davide Magno.*

[« Back to ToC](#table-of-contents)

### Plumber GET Endpoint

mirai may be used as an asynchronous backend for [`plumber`](https://www.rplumber.io/) pipelines.

In this example, the plumber router code is run in a daemon process itself so that it does not block the interactive process.

The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and simply returns the 'msg' request header together with a timestamp and the process ID of the process it is run on.
``` r
library(mirai)

# supply SIGINT so the plumber server is interrupted and exits cleanly when finished
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # more efficient not to use dispatcher if all requests are similar length
  daemons(4L, dispatcher = FALSE) # handles 4 requests simultaneously

  pr() |>
    pr_get(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L)
            list(
              status = 200L,
              body = list(
                time = format(Sys.time()), msg = msg, pid = Sys.getpid()
              )
            )
          },
          msg = req[["HEADERS"]][["msg"]]
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8985)
})
```

The API can be queried using an async HTTP client such as `nanonext::ncurl_aio()`.

Here, all 8 requests are submitted at once, but the responses have differing timestamps as only 4 can be processed at any one time (limited by the number of daemons set).

``` r
library(nanonext)
res <- lapply(
  1:8,
  function(i) ncurl_aio(
    "http://127.0.0.1:8985/echo",
    headers = c(msg = as.character(i))
  )
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2025-05-22 11:28:55\"],\"msg\":[\"1\"],\"pid\":[24581]}"
#>
#> [[2]]
#> [1] "{\"time\":[\"2025-05-22 11:28:55\"],\"msg\":[\"2\"],\"pid\":[24583]}"
#>
#> [[3]]
#> [1] "{\"time\":[\"2025-05-22 11:28:55\"],\"msg\":[\"3\"],\"pid\":[24593]}"
#>
#> [[4]]
#> [1] "{\"time\":[\"2025-05-22 11:28:56\"],\"msg\":[\"4\"],\"pid\":[24583]}"
#>
#> [[5]]
#> [1] "{\"time\":[\"2025-05-22 11:28:55\"],\"msg\":[\"5\"],\"pid\":[24599]}"
#>
#> [[6]]
#> [1] "{\"time\":[\"2025-05-22 11:28:56\"],\"msg\":[\"6\"],\"pid\":[24593]}"
#>
#> [[7]]
#> [1] "{\"time\":[\"2025-05-22 11:28:56\"],\"msg\":[\"7\"],\"pid\":[24581]}"
#>
#> [[8]]
#> [1] "{\"time\":[\"2025-05-22 11:28:56\"],\"msg\":[\"8\"],\"pid\":[24599]}"

daemons(0)
#> [1] 0
```

[« Back to ToC](#table-of-contents)

### Plumber POST Endpoint

This is the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.

Note that `req$postBody` should always be accessed in the router process and passed in as an argument to the 'mirai', as this is retrieved using a connection that is not serializable.

``` r
library(mirai)

# supply SIGINT so the plumber server is interrupted and exits cleanly when finished
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # uses dispatcher - suitable when requests take differing times to complete
  daemons(4L) # handles 4 requests simultaneously

  pr() |>
    pr_post(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L) # simulate expensive computation
            list(
              status = 200L,
              body = list(
                time = format(Sys.time()),
                msg = jsonlite::fromJSON(data)[["msg"]],
                pid = Sys.getpid()
              )
            )
          },
          data = req$postBody
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8986)
})
```

Querying the endpoint produces the same set of outputs as the previous example.
``` r
library(nanonext)
res <- lapply(
  1:8,
  function(i) ncurl_aio(
    "http://127.0.0.1:8986/echo",
    method = "POST",
    data = sprintf('{"msg":"%d"}', i)
  )
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2025-05-22 11:28:59\"],\"msg\":[\"1\"],\"pid\":[24656]}"
#>
#> [[2]]
#> [1] "{\"time\":[\"2025-05-22 11:28:59\"],\"msg\":[\"2\"],\"pid\":[24658]}"
#>
#> [[3]]
#> [1] "{\"time\":[\"2025-05-22 11:29:00\"],\"msg\":[\"3\"],\"pid\":[24656]}"
#>
#> [[4]]
#> [1] "{\"time\":[\"2025-05-22 11:28:59\"],\"msg\":[\"4\"],\"pid\":[24664]}"
#>
#> [[5]]
#> [1] "{\"time\":[\"2025-05-22 11:29:00\"],\"msg\":[\"5\"],\"pid\":[24664]}"
#>
#> [[6]]
#> [1] "{\"time\":[\"2025-05-22 11:29:00\"],\"msg\":[\"6\"],\"pid\":[24674]}"
#>
#> [[7]]
#> [1] "{\"time\":[\"2025-05-22 11:28:59\"],\"msg\":[\"7\"],\"pid\":[24674]}"
#>
#> [[8]]
#> [1] "{\"time\":[\"2025-05-22 11:29:00\"],\"msg\":[\"8\"],\"pid\":[24658]}"

daemons(0)
#> [1] 0
```

[« Back to ToC](#table-of-contents)
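
Returning to the note on `req$postBody` in this section, the same idea can be tried in isolation, without running a plumber server. The sketch below is only an illustration: `post_body` is a hypothetical stand-in for the string that `req$postBody` would hold, and it assumes the `jsonlite` package is installed where the daemon runs. The point is that a plain character string is extracted first and passed to the 'mirai' as a named argument, rather than the request object itself.

``` r
library(mirai)

daemons(1)

# hypothetical stand-in for req$postBody: a plain character string,
# already read in the calling (router) process
post_body <- '{"msg":"hello"}'

# pass the string itself as a named argument; it is parsed on the daemon
m <- mirai(jsonlite::fromJSON(data)[["msg"]], data = post_body)
res <- m[]  # wait for and collect the parsed 'msg' field

daemons(0)
```

Because only a character string crosses the process boundary, nothing that depends on the router's connection needs to be serialized.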