--- title: "mirai - Databases and Arrow" vignette: > %\VignetteIndexEntry{mirai - Databases and Arrow} %\VignetteEngine{knitr::knitr} %\VignetteEncoding{UTF-8} --- ### Database Hosting - The Basics `mirai` supports the hosting of multiple database connections across processes on the local machine or a remote server. `everywhere()` easily sets up identical database connections in each daemon process. The following represents a simple example, which sets up 2 local daemons, and then opens a connection to the same SQLite file database in each daemon. ``` r file <- tempfile() library(mirai) daemons(2) #> [1] 2 everywhere({ library(DBI) con <<- dbConnect(RSQLite::SQLite(), file) }, file = file) ``` `mirai()` calls may then be used to write to or query the database, and may be executed on either daemon. ``` r m <- mirai(dbWriteTable(con, "iris", iris)) m[] #> [1] TRUE m <- mirai(dbListTables(con)) m[] #> [1] "iris" m <- mirai(dbGetQuery(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6')) m[] #> Sepal.Length Sepal.Width Petal.Length Petal.Width Species #> 1 4.4 2.9 1.4 0.2 setosa #> 2 4.3 3.0 1.1 0.1 setosa #> 3 4.4 3.0 1.3 0.2 setosa #> 4 4.5 2.3 1.3 0.3 setosa #> 5 4.4 3.2 1.3 0.2 setosa ``` `everywhere()` can be used again to cleanly tear down the databases, before resetting daemons. ``` r everywhere(dbDisconnect(con)) daemons(0) #> [1] 0 ``` ### Database Hosting - Using Arrow Database Connectivity It is possible using the `DBI` interface to access and manipulate data in the Apache Arrow data format efficiently through ABDC (Arrow Database Connectivity). The example below creates an in-memory SQLite connection using the `adbcsqlite` backend. Serialization is set up with the relevant serialization and deserialization functions from the `arrow` package. Note that the format class is 'nanoarrow_array_stream' as `nanoarrow` is the backend for all queries made by the DBI `db*Arrow()` functions. ``` r library(mirai) daemons(1) #> [1] 1 everywhere({ library(DBI) # `adbi` and `adbcsqlite` packages must also be installed con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = ":memory:") }) serialization( list( arrow::write_to_raw, function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE) ), class = "nanoarrow_array_stream" ) ``` `mirai()` calls may then be used to write to or query the database all in the Arrow format. ``` r m <- mirai(dbWriteTableArrow(con, "iris", iris)) m[] #> [1] TRUE m <- mirai(dbReadTableArrow(con, "iris")) m[] #> Table #> 150 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species m <- mirai(dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6')) m[] #> Table #> 5 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species ``` Due to the tight integration of the `mirai` serialization mechanism with R's 'refhook' system, we can easily return complex / nested objects containing multiple queries in the Arrow format: ``` r m <- mirai({ a <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6') b <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Width" < 2.6') x <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Petal.Length" < 1.5') y <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Petal.Width" < 0.2') list(sepal = list(length = a, width = b), petal = list(length = x, width = y)) }) m[] #> $sepal #> $sepal$length #> Table #> 5 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species #> #> $sepal$width #> Table #> 19 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species #> #> #> $petal #> $petal$length #> Table #> 24 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species #> #> $petal$width #> Table #> 5 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species ``` As before, `everywhere()` can be used again to cleanly tear down the databases, before resetting daemons. ``` r everywhere(dbDisconnect(con)) daemons(0) #> [1] 0 ``` ### Shiny / mirai / DBI / ADBC Integrated Example The following is an example of how database connections hosted in mirai daemons may be used to power a Shiny app. The one-time `serialization()` setup ensures seamless transport of Apache Arrow data, and occurs in the global environment outside of `server()`. A new database connection is created in a new daemon process for every new Shiny session. The resources are freed when a sesssion ends. This logic is all defined within `server()`. A unique ID is used to identify each session, and is specified as the 'compute profile' for daemons. Non-dispatcher daemons are created as scheduling is not required (all queries expected to take roughly the same time, and in this case each session uses only one daemon anyway). Shiny ExtendedTask is then used to perform each query via a `mirai()` call, using the session-specific compute profile. ``` r library(mirai) library(secretbase) library(shiny) library(bslib) # write 'iris' dataset to temp database file (for this demonstration) file <- tempfile() con <- DBI::dbConnect(adbi::adbi("adbcsqlite"), uri = file) DBI::dbWriteTableArrow(con, "iris", iris) DBI::dbDisconnect(con) # common input parameters slmin <- min(iris$Sepal.Length) slmax <- max(iris$Sepal.Length) ui <- page_fluid( p("The time is ", textOutput("current_time", inline = TRUE)), hr(), h3("Shiny / mirai / DBI / ADBC demonstration"), p("New daemon-hosted database connection is created for every Shiny session"), sliderInput( "sl", "Query iris dataset based on Sepal Length", min = slmin, max = slmax, value = c(slmin, slmax), width = "75%" ), input_task_button("btn", "Return query"), tableOutput("table") ) # uses Shiny ExtendedTask with mirai server <- function(input, output, session) { # create unique session id by hashing current time with a random key id <- secretbase::siphash13(Sys.time(), key = nanonext::random(4L)) # create new daemon for each session (set dispatcher = FALSE) daemons(1L, dispatcher = FALSE, .compute = id) # tear down daemon when session ends session$onEnded(function() daemons(0L, .compute = id)) # everywhere() loads DBI and creates ADBC connection in each daemon everywhere( { library(DBI) # `adbi` and `adbcsqlite` packages must also be installed con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = file) }, file = file, .compute = id ) output$current_time <- renderText({ invalidateLater(1000) format(Sys.time(), "%H:%M:%S %p") }) task <- ExtendedTask$new( function(...) mirai( dbGetQueryArrow( con, sprintf( "SELECT * FROM iris WHERE \"Sepal.Length\" BETWEEN %.2f AND %.2f", sl[1L], sl[2L] ) ), ..., .compute = id ) ) |> bind_task_button("btn") observeEvent(input$btn, task$invoke(sl = input$sl)) output$table <- renderTable(task$result()) } # serialization() specifies the native Arrow serialization functions serialization( list(arrow::write_to_raw, nanoarrow::read_nanoarrow), class = "nanoarrow_array_stream" ) # run Shiny app shinyApp(ui = ui, server = server) # deletes temp database file (for this demonstration) unlink(file) ```