Splitting Element IDs for Parallel Processing
Why split elements across jobs?
ModelArray supports within-process parallelism via the
n_cores parameter: multiple CPU cores in a single R
session work through elements in parallel. For many datasets, this is
sufficient.
However, for very large datasets (e.g., 600,000+ fixels with thousands of subjects), even a multi-core single job may take too long. In these cases, you can split the elements across multiple independent HPC jobs, each processing a different range of elements.
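For reference, the single-job baseline looks like this. This is a minimal sketch: the file names (data.h5, cohort.csv), the FDC scalar, and the covariates are taken from the examples below, and your own names will differ.

```r
library(ModelArray)

# Single job, multiple cores: one R session fans element-wise
# model fits out across n_cores CPU cores.
modelarray <- ModelArray("data.h5", scalar_types = c("FDC"))
phenotypes <- read.csv("cohort.csv")

result <- ModelArray.lm(
  FDC ~ Age + sex + motion,
  modelarray, phenotypes, "FDC",
  n_cores = 4   # within-process parallelism; no element.subset needed
)
writeResults("data.h5", df.output = result, analysis_name = "results_lm")
```

When this single job finishes in acceptable time, the splitting machinery below is unnecessary.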
Generating split ranges
Use numElementsTotal() to get the total element count,
then divide it into ranges:
library(ModelArray)
modelarray <- ModelArray("data.h5", scalar_types = c("FDC"))
n_total <- numElementsTotal(modelarray, "FDC")
# Split into chunks of 100,000 elements
chunk_size <- 100000
starts <- seq(1, n_total, by = chunk_size)
ends <- pmin(starts + chunk_size - 1, n_total)
# Each job gets one of these ranges
for (i in seq_along(starts)) {
cat(sprintf("Job %d: element.subset = %d:%d\n", i, starts[i], ends[i]))
}
Job 1: element.subset = 1:100000
Job 2: element.subset = 100001:200000
Job 3: element.subset = 200001:300000
Job 4: element.subset = 300001:400000
Job 5: element.subset = 400001:500000
Job 6: element.subset = 500001:600000
Job 7: element.subset = 600001:602229
Running each job
Each job runs the same model on its assigned range of elements. For example, an R script parameterized by job index:
# run_chunk.R
# Usage: Rscript run_chunk.R <job_index>
args <- commandArgs(trailingOnly = TRUE)
job_index <- as.integer(args[1])
library(ModelArray)
modelarray <- ModelArray("data.h5", scalar_types = c("FDC"))
phenotypes <- read.csv("cohort.csv")
chunk_size <- 100000
n_total <- numElementsTotal(modelarray, "FDC")
start <- (job_index - 1) * chunk_size + 1
end <- min(job_index * chunk_size, n_total)
result <- ModelArray.lm(
FDC ~ Age + sex + motion,
modelarray, phenotypes, "FDC",
element.subset = start:end,
n_cores = 4
)
# Save this chunk to its own HDF5 file
# (safe because each job writes to a different file)
chunk_h5 <- sprintf("result_chunk_%d.h5", job_index)
writeResults(chunk_h5, df.output = result, analysis_name = "results_lm_chunk")

On an HPC cluster with a job array (e.g., SLURM):
#!/bin/bash
#SBATCH --array=1-7
#SBATCH --cpus-per-task=4
Rscript run_chunk.R $SLURM_ARRAY_TASK_ID
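The combine step can be chained so it only starts after every chunk job succeeds. A sketch using SLURM job dependencies, assuming the array script above is saved as run_chunks.sh and the combining code from the next section as combine_chunks.R (both names are hypothetical):

```shell
# Submit the array job and capture its job ID
ARRAY_ID=$(sbatch --parsable run_chunks.sh)

# Run the combine step only once all array tasks exit successfully
sbatch --dependency=afterok:"${ARRAY_ID}" \
       --wrap="Rscript combine_chunks.R"
```

With afterok, the combine job is held (and ultimately cancelled rather than run) if any chunk fails, so a partial set of chunk files is never merged.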
Combining results and writing to HDF5
After all jobs complete, read each chunk .h5,
concatenate rows, then write one final result to
data.h5:
library(ModelArray)
library(rhdf5)
# Collect chunk HDF5 files and order by numeric chunk index
chunk_files <- Sys.glob("result_chunk_*.h5")
if (length(chunk_files) == 0) {
stop("No chunk HDF5 files found (expected files like result_chunk_1.h5).")
}
chunk_ids <- as.integer(sub("result_chunk_([0-9]+)\\.h5$", "\\1", basename(chunk_files)))
if (anyNA(chunk_ids)) {
stop("Could not parse chunk index from one or more chunk filenames.")
}
chunk_files <- chunk_files[order(chunk_ids)]
# Read each chunk's saved results matrix + column names
results_list <- lapply(chunk_files, function(fn) {
mat <- h5read(fn, "results/results_lm_chunk/results_matrix")
col_names <- h5read(fn, "results/results_lm_chunk/column_names")
df <- as.data.frame(mat)
colnames(df) <- as.character(col_names)
df
})
# Optional consistency check across chunks
if (!all(vapply(results_list[-1], function(x) identical(colnames(x), colnames(results_list[[1]])), logical(1)))) {
stop("Column names differ across chunk files; cannot concatenate safely.")
}
full_result <- do.call(rbind, results_list)
# Verify we recovered every element before writing
modelarray <- ModelArray("data.h5", scalar_types = c("FDC"))
stopifnot(nrow(full_result) == numElementsTotal(modelarray, "FDC"))
cat("Total rows:", nrow(full_result), "\n")
# Single-process write back to the main HDF5 file
writeResults("data.h5", df.output = full_result, analysis_name = "results_lm")

Important: only one process should write to the HDF5
file at a time. HDF5 does not support concurrent writes from multiple
processes — see vignette("hdf5-large-analyses") for details
on why.