---
title: "mirai - Plumber Integration"
vignette: >
  %\VignetteIndexEntry{mirai - Plumber Integration}
  %\VignetteEngine{knitr::knitr}
  %\VignetteEncoding{UTF-8}
---

### Plumber Integration

`mirai` may be used as an asynchronous / distributed backend for [`plumber`](https://www.rplumber.io/) pipelines.

Example usage is provided below for different types of endpoint.

#### Example GET Endpoint

The plumber router code is itself run in a daemon process so that it does not block the interactive process.

The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and returns the 'msg' request header together with a timestamp and the ID of the process it ran on.

``` r
library(mirai)

# important to supply SIGINT so the plumber server is interrupted and exits
# cleanly when torn down
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # does not use dispatcher (suitable when all requests require similar compute)
  daemons(4L, dispatcher = FALSE) # handles 4 requests simultaneously

  pr() |>
    pr_get(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L)
            list(
              status = 200L,
              body = list(time = format(Sys.time()), msg = msg, pid = Sys.getpid())
            )
          },
          msg = req[["HEADERS"]][["msg"]]
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8985)
})
```

The API can be queried using an async HTTP client such as `nanonext::ncurl_aio()`.

Here, all 8 requests are submitted at once, but the responses have differing timestamps, as only 4 can be processed at any one time (limited by the number of daemons set).

``` r
library(nanonext)
res <- lapply(
  1:8,
  function(i) ncurl_aio("http://127.0.0.1:8985/echo", headers = c(msg = as.character(i)))
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-06-10 10:33:41\"],\"msg\":[\"1\"],\"pid\":[39397]}"
#>
#> [[2]]
#> [1] "{\"time\":[\"2024-06-10 10:33:41\"],\"msg\":[\"2\"],\"pid\":[39442]}"
#>
#> [[3]]
#> [1] "{\"time\":[\"2024-06-10 10:33:41\"],\"msg\":[\"3\"],\"pid\":[39487]}"
#>
#> [[4]]
#> [1] "{\"time\":[\"2024-06-10 10:33:41\"],\"msg\":[\"4\"],\"pid\":[39532]}"
#>
#> [[5]]
#> [1] "{\"time\":[\"2024-06-10 10:33:42\"],\"msg\":[\"5\"],\"pid\":[39442]}"
#>
#> [[6]]
#> [1] "{\"time\":[\"2024-06-10 10:33:42\"],\"msg\":[\"6\"],\"pid\":[39532]}"
#>
#> [[7]]
#> [1] "{\"time\":[\"2024-06-10 10:33:42\"],\"msg\":[\"7\"],\"pid\":[39487]}"
#>
#> [[8]]
#> [1] "{\"time\":[\"2024-06-10 10:33:42\"],\"msg\":[\"8\"],\"pid\":[39397]}"

daemons(0)
#> [1] 0
```

#### Example POST Endpoint

Below is a demonstration of the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.

Note that `req$postBody` should always be accessed in the router process and passed in as an argument to the 'mirai', as this is retrieved using a connection that is not serializable.

``` r
library(mirai)

# important to supply SIGINT so the plumber server is interrupted and exits cleanly when torn down
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # uses dispatcher (suitable for requests with differing compute lengths)
  daemons(4L, dispatcher = TRUE) # handles 4 requests simultaneously

  pr() |>
    pr_post(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L) # simulate expensive computation
            list(
              status = 200L,
              body = list(
                time = format(Sys.time()),
                msg = jsonlite::fromJSON(data)[["msg"]],
                pid = Sys.getpid()
              )
            )
          },
          data = req$postBody
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8986)
})
```

Querying the endpoint produces the same set of outputs as the previous example.

``` r
library(nanonext)
res <- lapply(
  1:8,
  function(i) ncurl_aio("http://127.0.0.1:8986/echo", method = "POST", data = sprintf('{"msg":"%d"}', i))
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-06-10 10:33:45\"],\"msg\":[\"1\"],\"pid\":[39674]}"
#>
#> [[2]]
#> [1] "{\"time\":[\"2024-06-10 10:33:45\"],\"msg\":[\"2\"],\"pid\":[39679]}"
#>
#> [[3]]
#> [1] "{\"time\":[\"2024-06-10 10:33:45\"],\"msg\":[\"3\"],\"pid\":[39684]}"
#>
#> [[4]]
#> [1] "{\"time\":[\"2024-06-10 10:33:45\"],\"msg\":[\"4\"],\"pid\":[39677]}"
#>
#> [[5]]
#> [1] "{\"time\":[\"2024-06-10 10:33:46\"],\"msg\":[\"5\"],\"pid\":[39674]}"
#>
#> [[6]]
#> [1] "{\"time\":[\"2024-06-10 10:33:46\"],\"msg\":[\"6\"],\"pid\":[39684]}"
#>
#> [[7]]
#> [1] "{\"time\":[\"2024-06-10 10:33:46\"],\"msg\":[\"7\"],\"pid\":[39679]}"
#>
#> [[8]]
#> [1] "{\"time\":[\"2024-06-10 10:33:46\"],\"msg\":[\"8\"],\"pid\":[39677]}"

daemons(0)
#> [1] 0
```
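
The batching described above (4 requests served per second across 4 daemon processes) can be checked by parsing the returned JSON strings. The following is a minimal sketch, not part of the original example: it assumes `jsonlite` is installed, and the `bodies` and `parsed` names are illustrative. The strings are copied verbatim from the POST output shown above; in a live session they could instead be taken from `collect_aio(res)`.

``` r
library(jsonlite)

# Response bodies copied verbatim from the POST example output
bodies <- c(
  '{"time":["2024-06-10 10:33:45"],"msg":["1"],"pid":[39674]}',
  '{"time":["2024-06-10 10:33:45"],"msg":["2"],"pid":[39679]}',
  '{"time":["2024-06-10 10:33:45"],"msg":["3"],"pid":[39684]}',
  '{"time":["2024-06-10 10:33:45"],"msg":["4"],"pid":[39677]}',
  '{"time":["2024-06-10 10:33:46"],"msg":["5"],"pid":[39674]}',
  '{"time":["2024-06-10 10:33:46"],"msg":["6"],"pid":[39684]}',
  '{"time":["2024-06-10 10:33:46"],"msg":["7"],"pid":[39679]}',
  '{"time":["2024-06-10 10:33:46"],"msg":["8"],"pid":[39677]}'
)

# Parse each JSON string into a one-row data frame and stack the rows
parsed <- do.call(
  rbind,
  lapply(bodies, function(x) as.data.frame(fromJSON(x)))
)

# Tabulate responses by completion timestamp and by daemon process ID
table(parsed$time)
table(parsed$pid)
```

With this data, each timestamp accounts for four responses and each process ID for two, consistent with four daemons each handling one request per second.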