---
title: "4. Serialization - Arrow, ADBC, polars, torch"
vignette: >
%\VignetteIndexEntry{4. Serialization - Arrow, ADBC, polars, torch}
%\VignetteEngine{litedown::vignette}
%\VignetteEncoding{UTF-8}
---
### Table of Contents
1. [Serialization: Arrow, polars and beyond](#serialization-arrow-polars-and-beyond)
2. [Serialization: Torch](#serialization-torch)
3. [Database Hosting using Arrow Database Connectivity](#database-hosting-using-arrow-database-connectivity)
4. [Shiny / mirai / DBI / ADBC Integrated Example](#shiny-mirai-dbi-adbc-integrated-example)
### Serialization: Arrow, polars and beyond
Native R serialization is used for sending data between host and daemons.
Some R objects by their nature cannot be serialized, such as those accessed via an external pointer.
In these cases, performing 'mirai' operations on them would normally error.
Using the [`arrow`](https://arrow.apache.org/docs/r/) package as an example:
``` r
library(mirai)
library(arrow, warn.conflicts = FALSE)
daemons(1)
#> [1] 1
everywhere(library(arrow))
x <- as_arrow_table(iris)
m <- mirai(list(a = head(x), b = "some text"), x = x)
m[]
#> 'miraiError' chr Error: Invalid
, external pointer to null
daemons(0)
#> [1] 0
```
However, `serial_config()` can be used to create custom serialization configurations, specifying functions that hook into R's native serialization mechanism for reference objects ('refhooks').
This configuration may then be passed to the 'serial' argument of a `daemons()` call.
``` r
cfg <- serial_config(
"ArrowTabular",
arrow::write_to_raw,
function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)
)
daemons(1, serial = cfg)
#> [1] 1
everywhere(library(arrow))
m <- mirai(list(a = head(x), b = "some text"), x = x)
m[]
#> $a
#> Table
#> 6 rows x 5 columns
#> $Sepal.Length
#> $Sepal.Width
#> $Petal.Length
#> $Petal.Width
#> $Species >
#>
#> See $metadata for additional Schema metadata
#>
#> $b
#> [1] "some text"
daemons(0)
#> [1] 0
```
It can be seen that this time, the arrow table is seamlessly handled in the 'mirai' process.
This is the case even when the object is deeply nested inside lists or other structures.
Multiple serialization functions may be registered to handle different object classes.
As an example, we can use Arrow in combination with [`polars`](https://pola-rs.github.io/r-polars/), a 'lightning fast' dataframe library written in Rust (requires `polars` >= 0.16.4), in the following way:
``` r
daemons(
n = 1,
serial = serial_config(
c("ArrowTabular", "RPolarsDataFrame"),
list(arrow::write_to_raw, function(x) polars::as_polars_df(x)$to_raw_ipc()),
list(function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE), polars::pl$read_ipc)
)
)
#> [1] 1
x <- polars::as_polars_df(iris)
m <- mirai(list(a = head(x), b = "some text"), x = x)
m[]
#> $a
#> shape: (6, 5)
#> ┌──────────────┬─────────────┬──────────────┬─────────────┬─────────┐
#> │ Sepal.Length ┆ Sepal.Width ┆ Petal.Length ┆ Petal.Width ┆ Species │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ cat │
#> ╞══════════════╪═════════════╪══════════════╪═════════════╪═════════╡
#> │ 5.1 ┆ 3.5 ┆ 1.4 ┆ 0.2 ┆ setosa │
#> │ 4.9 ┆ 3.0 ┆ 1.4 ┆ 0.2 ┆ setosa │
#> │ 4.7 ┆ 3.2 ┆ 1.3 ┆ 0.2 ┆ setosa │
#> │ 4.6 ┆ 3.1 ┆ 1.5 ┆ 0.2 ┆ setosa │
#> │ 5.0 ┆ 3.6 ┆ 1.4 ┆ 0.2 ┆ setosa │
#> │ 5.4 ┆ 3.9 ┆ 1.7 ┆ 0.4 ┆ setosa │
#> └──────────────┴─────────────┴──────────────┴─────────────┴─────────┘
#>
#> $b
#> [1] "some text"
daemons(0)
#> [1] 0
```
[« Back to ToC](#table-of-contents)
### Serialization: Torch
Tensors from the [`torch`](https://torch.mlverse.org/) package may be used seamlessly in 'mirai' computations.
#### Setup Steps
1. Create the serialization configuration, specifying 'class' as 'torch_tensor'.
1. Set up daemons, supplying the configuration to the 'serial' argument.
1. (Optional) Use `everywhere()` to make the `torch` package available on all daemons for convenience.
``` r
library(mirai)
library(torch)
cfg <- serial_config(
class = "torch_tensor",
sfunc = torch::torch_serialize,
ufunc = torch::torch_load
)
daemons(1, serial = cfg)
#> [1] 1
everywhere(library(torch))
```
#### Example Usage
The below example creates a convolutional neural network using `torch::nn_module()`.
A set of model parameters is also specified.
The model specification and parameters are then passed to and initialized within a 'mirai'.
``` r
model <- nn_module(
initialize = function(in_size, out_size) {
self$conv1 <- nn_conv2d(in_size, out_size, 5)
self$conv2 <- nn_conv2d(in_size, out_size, 5)
},
forward = function(x) {
x <- self$conv1(x)
x <- nnf_relu(x)
x <- self$conv2(x)
x <- nnf_relu(x)
x
}
)
params <- list(in_size = 1, out_size = 20)
m <- mirai(do.call(model, params), model = model, params = params)
m[]
#> An `nn_module` containing 1,040 parameters.
#>
#> ── Modules ────────────────────────────────────────────────────────────────────────────────────────────────
#> • conv1: #520 parameters
#> • conv2: #520 parameters
```
The returned model is an object containing many tensor elements.
``` r
m$data$parameters$conv1.weight
#> torch_tensor
#> (1,1,.,.) =
#> 0.0819 0.1133 -0.1414 0.1952 -0.1873
#> -0.0891 0.0270 0.1191 -0.0464 -0.0070
#> 0.1356 0.1064 0.1864 0.1305 0.1798
#> 0.1298 0.0821 0.0562 0.0589 -0.0959
#> -0.1518 0.1985 0.1805 0.0345 -0.1971
#>
#> (2,1,.,.) =
#> -0.1264 -0.0125 -0.0202 0.1182 0.1244
#> 0.1154 -0.0031 0.1793 -0.1635 -0.0745
#> 0.0472 0.1440 -0.0050 0.1924 0.1731
#> -0.0779 0.0934 -0.1557 0.1600 -0.1897
#> 0.0471 -0.1021 -0.1683 -0.0790 -0.1609
#>
#> (3,1,.,.) =
#> 0.1492 0.0792 -0.1541 -0.1382 0.1055
#> -0.0194 0.1130 0.0035 0.0372 -0.0765
#> -0.0094 0.0892 -0.1291 -0.1263 -0.1995
#> -0.1420 0.1769 -0.1180 -0.1237 0.1703
#> -0.0844 -0.0603 -0.0840 0.1627 -0.0893
#>
#> (4,1,.,.) =
#> -0.0244 0.1084 -0.0833 0.0874 -0.0458
#> -0.1197 0.0395 0.0854 0.1207 -0.0940
#> 0.0985 0.0751 0.0837 -0.0775 0.1109
#> 0.0663 -0.0570 -0.0031 -0.1076 0.0321
#> 0.0204 -0.1927 -0.1126 0.0482 0.1241
#>
#> (5,1,.,.) =
#> -0.1691 0.1692 -0.0388 0.0829 -0.1553
#> ... [the output was truncated (use n=-1 to disable)]
#> [ CPUFloatType{20,1,5,5} ][ requires_grad = TRUE ]
```
It is usual for model parameters to then be passed to an optimiser.
This can also be initialized within a 'mirai' process.
``` r
optim <- mirai(optim_rmsprop(params = params), params = m$data$parameters)
optim[]
#>
#> Inherits from:
#> Public:
#> add_param_group: function (param_group)
#> clone: function (deep = FALSE)
#> defaults: list
#> initialize: function (params, lr = 0.01, alpha = 0.99, eps = 1e-08, weight_decay = 0,
#> load_state_dict: function (state_dict, ..., .refer_to_state_dict = FALSE)
#> param_groups: list
#> state: State, R6
#> state_dict: function ()
#> step: function (closure = NULL)
#> zero_grad: function (set_to_none = FALSE)
#> Private:
#> deep_clone: function (name, value)
#> step_helper: function (closure, loop_fun)
daemons(0)
#> [1] 0
```
Above, tensors and complex objects containing tensors were passed seamlessly between host and daemon processes, in the same way as any other R object.
The custom serialization in `mirai` leverages R's own native 'refhook' mechanism to allow such completely transparent usage. Designed to be fast and efficient, data copies are minimised and the 'official' serialization methods from the `torch` package are used directly.
[« Back to ToC](#table-of-contents)
### Database Hosting using Arrow Database Connectivity
It is possible using the `DBI` interface to access and manipulate data in the Apache Arrow data format efficiently through ABDC (Arrow Database Connectivity).
The example below creates an in-memory SQLite connection using the `adbcsqlite` backend.
Serialization is set up with the relevant serialization functions from the `arrow` package as part of the `everywhere()` call. Note that the specified class is 'nanoarrow_array_stream' as `nanoarrow` is the backend for all queries made by the DBI `db*Arrow()` functions.
``` r
library(mirai)
cfg <- serial_config(
class = "nanoarrow_array_stream",
sfunc = arrow::write_to_raw,
ufunc = function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)
)
daemons(1, serial = cfg)
#> [1] 1
everywhere(
{
library(DBI) # `adbi` and `adbcsqlite` packages must also be installed
con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = ":memory:")
}
)
```
`mirai()` calls may then be used to write to or query the database all in the Arrow format.
``` r
m <- mirai(dbWriteTableArrow(con, "iris", iris))
m[]
#> 'miraiError' chr Error in h(simpleError(msg, call)): error in evaluating the argument 'conn' in selecting a method for function 'dbWriteTableArrow': object 'con' not found
m <- mirai(dbReadTableArrow(con, "iris"))
m[]
#> 'miraiError' chr Error in h(simpleError(msg, call)): error in evaluating the argument 'conn' in selecting a method for function 'dbReadTableArrow': object 'con' not found
m <- mirai(dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6'))
m[]
#> 'miraiError' chr Error in h(simpleError(msg, call)): error in evaluating the argument 'conn' in selecting a method for function 'dbGetQueryArrow': object 'con' not found
```
Due to the tight integration of the `mirai` serialization mechanism with R's 'refhook' system, we can easily return complex / nested objects containing multiple queries in the Arrow format:
``` r
m <- mirai({
a <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6')
b <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Width" < 2.6')
x <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Petal.Length" < 1.5')
y <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Petal.Width" < 0.2')
list(sepal = list(length = a, width = b), petal = list(length = x, width = y))
})
m[]
#> 'miraiError' chr Error in h(simpleError(msg, call)): error in evaluating the argument 'conn' in selecting a method for function 'dbGetQueryArrow': object 'con' not found
```
As before, `everywhere()` can be used again to cleanly tear down the databases, before resetting daemons.
``` r
everywhere(dbDisconnect(con))
daemons(0)
#> [1] 0
```
[« Back to ToC](#table-of-contents)
### Shiny / mirai / DBI / ADBC Integrated Example
The following is an example of how database connections hosted in mirai daemons may be used to power a Shiny app.
The one-time `serialization()` setup ensures seamless transport of Apache Arrow data, and occurs in the global environment outside of `server()`.
A new database connection is created in a new daemon process for every new Shiny session. The resources are freed when a sesssion ends. This logic is all defined within `server()`. A unique ID is used to identify each session, and is specified as the 'compute profile' for daemons.
Non-dispatcher daemons are created as scheduling is not required (all queries expected to take roughly the same time, and in this case each session uses only one daemon anyway).
Shiny ExtendedTask is then used to perform each query via a `mirai()` call, using the session-specific compute profile.
``` r
library(mirai)
library(secretbase)
library(shiny)
library(bslib)
# create an Arrow serialization configuration
cfg <- serial_config(
class = "nanoarrow_array_stream",
sfunc = arrow::write_to_raw,
ufunc = nanoarrow::read_nanoarrow
)
# write 'iris' dataset to temp database file (for this demonstration)
file <- tempfile()
con <- DBI::dbConnect(adbi::adbi("adbcsqlite"), uri = file)
DBI::dbWriteTableArrow(con, "iris", iris)
DBI::dbDisconnect(con)
# common input parameters
slmin <- min(iris$Sepal.Length)
slmax <- max(iris$Sepal.Length)
ui <- page_fluid(
p("The time is ", textOutput("current_time", inline = TRUE)),
hr(),
h3("Shiny / mirai / DBI / ADBC demonstration"),
p("New daemon-hosted database connection is created for every Shiny session"),
sliderInput(
"sl", "Query iris dataset based on Sepal Length", min = slmin, max = slmax,
value = c(slmin, slmax), width = "75%"
),
input_task_button("btn", "Return query"),
tableOutput("table")
)
# uses Shiny ExtendedTask with mirai
server <- function(input, output, session) {
# create unique session id by hashing current time with a random key
id <- secretbase::siphash13(Sys.time(), key = nanonext::random(4L))
# create new daemon for each session
daemons(1L, serial = cfg, .compute = id)
# tear down daemon when session ends
session$onEnded(function() daemons(0L, .compute = id))
# everywhere() loads DBI and creates ADBC connection in each daemon
# and sets up serialization
everywhere(
{
library(DBI) # `adbi` and `adbcsqlite` packages must also be installed
con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = file)
},
file = file,
.compute = id
)
output$current_time <- renderText({
invalidateLater(1000)
format(Sys.time(), "%H:%M:%S %p")
})
task <- ExtendedTask$new(
function(...) mirai(
dbGetQueryArrow(
con,
sprintf(
"SELECT * FROM iris WHERE \"Sepal.Length\" BETWEEN %.2f AND %.2f",
sl[1L],
sl[2L]
)
),
...,
.compute = id
)
) |> bind_task_button("btn")
observeEvent(input$btn, task$invoke(sl = input$sl))
output$table <- renderTable(task$result())
}
# run Shiny app
shinyApp(ui = ui, server = server)
# deletes temp database file (for this demonstration)
unlink(file)
```
[« Back to ToC](#table-of-contents)