Recently I encountered the problem of range minimum query (RMQ). There are many ways to solve it, depending on the trade-off between preprocessing complexity and per-query runtime complexity. Block-partitioning and a sparse table are clever techniques. A hybrid of both needs some more code but brings great performance.

For reference, this is how we can do a RMQ with different algorithms:

import math
import random
import sys

def minargmin(arr, L, R):
    """Locate the smallest element of arr between indices L and R inclusive.

    Returns a (value, index) pair; ties resolve to the smallest index.
    An empty range yields the sentinel (float("inf"), -1).
    """
    candidates = [(val, pos) for pos, val in enumerate(arr[L:R+1], L)]
    if not candidates:
        return (float("inf"), -1)
    return min(candidates)

def naive(arr, queries):
    """Complexity: < O(1), O(n) >

    Search the array directly on every query: O(1) preprocessing as none
    is done, O(n) time per query. Each answer is a (value, index) pair,
    with (float("inf"), -1) for an empty range.
    """
    answers = []
    for lo, hi in queries:
        try:
            answers.append(min((v, i) for i, v in enumerate(arr[lo:hi+1], lo)))
        except ValueError:
            # empty slice: no minimum exists
            answers.append((float("inf"), -1))
    return answers

def lookup(arr, queries):
    """Complexity < O(n^2), O(1) >

    Build a lookup table of O(n^2) size covering all possible queries,
    then answer each query with a single table access.
    """
    n = len(arr)
    # table[i][j] holds the index of the minimum of arr[i:j+1]
    table = [[0] * n for _ in range(n)]
    for start in range(n):
        best_idx, best_val = start, arr[start]
        table[start][start] = start
        for end in range(start + 1, n):
            # strict '<' keeps the earliest index on ties
            if arr[end] < best_val:
                best_idx, best_val = end, arr[end]
            table[start][end] = best_idx
    # answer every query via the precomputed index
    return [(arr[table[lo][hi]], table[lo][hi]) for lo, hi in queries]

def blockpartition(arr, queries):
    """Complexity < O(n), O(sqrt(n)) >

    Build a block-partition table of minima and combine a direct array
    scan with the table to answer queries. The corner case where the
    query range covers no complete block is handled separately.
    """
    def scan(lo, hi):
        # (value, index) of the minimum of arr[lo:hi+1]; empty -> sentinel
        try:
            return min((v, i) for i, v in enumerate(arr[lo:hi+1], lo))
        except ValueError:
            return (float("inf"), -1)

    n = len(arr)
    width = int(math.sqrt(n))
    # one precomputed (value, index) minimum per block of `width` items
    block = [scan(start, start + width - 1) for start in range(0, n, width)]

    def blockmin(first, last):
        # minimum over the complete blocks first .. last-1
        chunk = block[first:last]
        return min(chunk) if chunk else (float("inf"), -1)

    def ceildiv(a, b):
        # ceiling division via floor division of negated operands
        return -(a // -b)

    answers = []
    for lo, hi in queries:
        first_block = ceildiv(lo, width)
        last_block = hi // width
        if first_block >= last_block:
            # range too small to cover any whole block; scan directly
            answers.append(scan(lo, hi))
        else:
            # prefix scan + whole-block minima + suffix scan
            prefix_end = first_block * width
            suffix_begin = last_block * width
            answers.append(min([
                scan(lo, prefix_end - 1),
                blockmin(first_block, last_block),
                scan(suffix_begin, hi),
            ]))
    return answers

def sparsetable(arr, queries):
    """Complexity < O(n log n), O(1) >

    Build a sparse table for lookup. The table only supports ranges of
    length 2^k, but they may start anywhere; any range (L,R) is covered
    by the union of at most two such ranges.
    """
    n = len(arr)
    levels = n.bit_length()
    # table[i][j] = (value, index) minimum of arr[i:i+2^j];
    # level 0 is each element by itself, remaining cells start as 0
    table = [[(val, pos)] + [0] * (levels - 1) for pos, val in enumerate(arr)]
    for level in range(1, levels):
        span = 1 << level
        half = span >> 1
        # only starts where the whole 2^level window fits in the array
        for start in range(n - span + 1):
            table[start][level] = min(table[start][level - 1],
                                      table[start + half][level - 1])
    answers = []
    for lo, hi in queries:
        level = (hi - lo + 1).bit_length() - 1
        mirror = hi - (1 << level) + 1
        # (lo,hi) is the union of (lo, lo+2^level-1) and (mirror, hi)
        answers.append(min(table[lo][level], table[mirror][level]))
    return answers

def test(N, Q):
    """Cross-check all four RMQ implementations on random input.

    Generates a random array of N integers and Q inclusive (L,R) query
    ranges, prints each solver's answers, and asserts that every solver
    agrees with the naive reference.
    """
    arr = [random.randint(0, 5000) for _ in range(N)]
    queries = [sorted([random.randint(0, N-1), random.randint(0, N-1)]) for _ in range(Q)]
    print(arr)
    print(f"Queries: {queries}")
    # naive search is the reference answer
    reference = naive(arr, queries)
    print(", ".join(f"arr[{idx}]={val}" for val, idx in reference))
    for solver in (lookup, blockpartition, sparsetable):
        result = solver(arr, queries)
        print(", ".join(f"arr[{idx}]={val}" for val, idx in result))
        assert result == reference

if __name__ == "__main__":
    # Command-line arguments: array size N and query count Q, with
    # defaults when an argument is missing or malformed. A bare
    # `except:` would also swallow SystemExit/KeyboardInterrupt, so we
    # catch only the errors that int(sys.argv[i]) can raise here.
    try:
        N = int(sys.argv[1])
    except (IndexError, ValueError):
        N = 100
    try:
        Q = int(sys.argv[2])
    except (IndexError, ValueError):
        Q = 30
    test(N, Q)


There is the Fischer–Heun structure too, but it would be more code to implement and harder to explain.

Now here’s Arpa’s trick. First we use a disjoint set union (DSU) over the elements of an unsorted array arr. A DSU is merely a tree structure stored as an array, zero-indexed with length N, in which each element is an integer from 0 to N−1 that points to another element in the array. An element may point to itself, in which case it is the root of a tree. Every element in the array is an arc in a digraph, and no cycle should be formed. It is called a DSU because all elements in the same tree are considered to be in the same set. The tree is not necessarily binary. In fact, for optimal use, it should be a very flat tree with high fan-out at the root and depth 1.

What the DSU does here is to tell, for each element, where the next number smaller than itself is. It is used like this (the original was in C++; I converted the code into Python):

import numpy as np

# input array, 10 values per row
arr = [3928,   53, 3093, 4657, 2209, 1823, 3613, 1018,  129,   32,  # 10
       3585,  903, 1538, 2462, 2092, 2093, 2230, 3209, 2800, 1689,  # 20
       4938, 3443,  386, 2725, 3363, 2351, 2696, 1641, 3931, 1073,  # 30
       3121, 2160, 1132, 2829, 2447, 2411,  381, 3528, 3309, 1496,  # 40
       4439, 4848, 4050, 2572,  158, 1076, 4222,  662, 3294, 4084,  # 50
       4312, 2752, 4420,  210, 4073, 1403,  800,  766, 2433, 1255,  # 60
       4260, 1391,  215, 1826,  488, 4379, 2582, 4896, 1245, 1328,  # 70
       1093, 2146, 1081,   48, 4918, 1037, 2653, 2201, 2080,  656,  # 80
       1124, 2575, 2037,  183, 2912, 2952, 2409, 1323, 1764, 2647,  # 90
       2035, 1950, 4997,  844, 2437, 2825, 4001, 3263, 3897, 2227]  # 100
# variables
N = len(arr)                 # size of input
dsu = list(range(N))         # DSU: initially every node is its own tree
stack = []                   # monotonic stack of indices
# algorithm: sweep left to right; every index popped off the stack has
# the current index as its next smaller element
for cur in range(N):
    while stack and arr[stack[-1]] >= arr[cur]:
        dsu[stack.pop()] = cur
    stack.append(cur)
# numpy used purely for pretty-printing the result as a 10x10 grid
with np.printoptions(precision=2, linewidth=80, suppress=True, threshold=1000):
    print("Array")
    print(np.array(arr).reshape(10, -1))
    print("DSU")
    print(np.array(dsu).reshape(10, -1))
    print("Stack")
    print(stack)


The use of numpy above is just for aesthetic display. Initially the DSU makes every node a tree by itself. The for loop moves the cursor i from index 0 to the end of the array arr. All indices will be pushed onto the stack. But at each index, we check the elements at the top of the stack and rewrite their DSU entries to the current index if their array elements are greater than or equal to the current one. This for loop essentially makes the DSU tell, for each element, where the next smaller element is.

The above code runs to produce the following:

Array
[[3928   53 3093 4657 2209 1823 3613 1018  129   32]
[3585  903 1538 2462 2092 2093 2230 3209 2800 1689]
[4938 3443  386 2725 3363 2351 2696 1641 3931 1073]
[3121 2160 1132 2829 2447 2411  381 3528 3309 1496]
[4439 4848 4050 2572  158 1076 4222  662 3294 4084]
[4312 2752 4420  210 4073 1403  800  766 2433 1255]
[4260 1391  215 1826  488 4379 2582 4896 1245 1328]
[1093 2146 1081   48 4918 1037 2653 2201 2080  656]
[1124 2575 2037  183 2912 2952 2409 1323 1764 2647]
[2035 1950 4997  844 2437 2825 4001 3263 3897 2227]]
DSU
[[ 1  9  4  4  5  7  7  8  9  9]
[11 22 22 14 19 19 19 18 19 22]
[21 22 36 25 25 27 27 29 29 36]
[31 32 36 34 35 36 44 38 39 44]
[42 42 43 44 73 47 47 53 51 51]
[51 53 53 73 55 56 57 62 59 62]
[61 62 73 64 73 66 68 68 70 70]
[72 72 73 73 75 79 77 78 79 83]
[83 82 83 83 86 86 87 93 93 90]
[91 93 93 93 99 99 97 99 99 99]]
Stack
[9, 73, 83, 93, 99]


We can see that dsu[0]==1 as the next element in arr that is smaller than arr[0] is arr[1]. Similarly dsu[8]==9. But dsu[9]==9 because it is itself the smallest element in the array. We also have dsu[73]==73 because arr[73]==48 is the second smallest element in the array. We have dsu[73] != 9 because we always have dsu[i] >= i, as it points to the next smaller element. Upon finishing, the stack holds all the elements that have dsu[i]==i, i.e., the roots of the trees. The DSU partitions nodes into index ranges 0 to $$k_1$$, then $$(k_1+1)$$ to $$k_2$$, etc., with final partition $$(k_{n-1}+1)$$ to $$k_n=N-1$$. Each partition is one set, such that if we navigate from index i according to the index pointed to by dsu[i], we eventually reach and stay at the root $$k_m$$ of the partition that i started in. For example, let’s pick i==23. We see this:

dsu[23] == 25
dsu[25] == 27
dsu[27] == 29
dsu[29] == 36
dsu[36] == 44
dsu[44] == 73
dsu[73] == 73


and we will never jump below 23 or above 73. Now we can see that the line dsu[stack.pop()] = i merges an existing set into a new root in the DSU.

This is a powerful structure because for any range $$(L,R)$$ with $$R$$ at the end of the array (or DSU), we just need to start at index $$L$$ of the DSU and navigate until we meet the root; the root is then the index of the minimum of the range. In fact, the DSU we built in the above loop is incremental. When the loop index is at i, we have processed the input array arr up to index i, and the DSU works as described in the above paragraph for ranges ending at index i too.

Here is how we can use this DSU trick to do the RMQ. Consider the queries and additional code below:

# queries, (L,R) inclusive
queries = [[61, 78], [53, 74], [14, 26], [15, 96], [63, 80],
           [ 3, 62], [ 1, 49], [ 2, 57], [ 9, 33], [16, 83],
           [69, 80], [62, 84], [25, 58], [29, 75], [28, 55],
           [12, 53], [52, 97], [11, 96], [66, 98], [ 9, 27],
           [39, 86], [23, 88], [22, 96], [66, 68], [56, 83],
           [ 3,  7], [31, 44], [ 9, 88], [ 5, 60], [18, 71]]
Q = len(queries)             # number of queries
qu = [[] for _ in range(N)]  # query ids grouped by their R endpoint
ans = [None] * Q             # answers, filled as the sweep proceeds
# group query ids by right endpoint
for qid, (L, R) in enumerate(queries):
    qu[R].append(qid)

def root(k):
    """Find the root of k's tree, compressing the path along the way."""
    if dsu[k] != k:
        dsu[k] = root(dsu[k])
    return dsu[k]

# sweep: extend the DSU to index i, then answer every query ending at i
for i in range(N):
    while stack and arr[stack[-1]] >= arr[i]:
        dsu[stack.pop()] = i
    stack.append(i)
    for qid in qu[i]:
        idx = root(queries[qid][0])
        ans[qid] = [idx, arr[idx]]


The function root(k) gets the root of the tree that element k belongs to in the DSU. When invoked, it may mutate the DSU to make it more efficient by attaching nodes directly to the root (a.k.a. path compression). Hence after we call root(23), we will see dsu[23]==73 as well as dsu[25]==73, etc., due to the recursive nature of the function. This is allowed because the ultimate use of the DSU is to find the root, i.e., the minimum element found so far starting from index i, rather than the next smaller element.

Range queries are presented as a pair $$(L,R)$$ which, in Python notation, is to find min(arr[L:R+1]). We assume $$L\le R$$. We use a kind of radix sort to organize the queries by the $$R$$ term, and we answer those queries right after we have updated the DSU up to index $$R$$. We can’t wait until the DSU is fully processed for the entire array; otherwise the DSU would reflect the minimum in ranges of $$(L,N-1)$$ instead.

The following is the complete code, with the model answer to verify the correctness of the code:

# Complete offline RMQ via Arpa's trick (DSU + monotonic stack).
# Note: the original snippet imported numpy but never used it, and never
# actually compared against the model answer; both are fixed here.

# input array, 10 values per row
arr = [3928,   53, 3093, 4657, 2209, 1823, 3613, 1018,  129,   32,  # 10
       3585,  903, 1538, 2462, 2092, 2093, 2230, 3209, 2800, 1689,  # 20
       4938, 3443,  386, 2725, 3363, 2351, 2696, 1641, 3931, 1073,  # 30
       3121, 2160, 1132, 2829, 2447, 2411,  381, 3528, 3309, 1496,  # 40
       4439, 4848, 4050, 2572,  158, 1076, 4222,  662, 3294, 4084,  # 50
       4312, 2752, 4420,  210, 4073, 1403,  800,  766, 2433, 1255,  # 60
       4260, 1391,  215, 1826,  488, 4379, 2582, 4896, 1245, 1328,  # 70
       1093, 2146, 1081,   48, 4918, 1037, 2653, 2201, 2080,  656,  # 80
       1124, 2575, 2037,  183, 2912, 2952, 2409, 1323, 1764, 2647,  # 90
       2035, 1950, 4997,  844, 2437, 2825, 4001, 3263, 3897, 2227]  # 100
# queries, (L,R) inclusive
queries = [[61, 78], [53, 74], [14, 26], [15, 96], [63, 80],
           [ 3, 62], [ 1, 49], [ 2, 57], [ 9, 33], [16, 83],
           [69, 80], [62, 84], [25, 58], [29, 75], [28, 55],
           [12, 53], [52, 97], [11, 96], [66, 98], [ 9, 27],
           [39, 86], [23, 88], [22, 96], [66, 68], [56, 83],
           [ 3,  7], [31, 44], [ 9, 88], [ 5, 60], [18, 71]]
# expected [index, value] minimum for each query, used for verification
model_answer = [[73,  48], [73,  48], [22, 386], [73,  48], [73,  48],
                [ 9,  32], [ 9,  32], [ 9,  32], [ 9,  32], [73,  48],
                [73,  48], [73,  48], [44, 158], [73,  48], [44, 158],
                [44, 158], [73,  48], [73,  48], [73,  48], [ 9,  32],
                [73,  48], [73,  48], [73,  48], [68,1245], [73,  48],
                [ 7,1018], [44, 158], [ 9,  32], [ 9,  32], [44, 158]]
# variables
N = len(arr)                 # size of input
Q = len(queries)             # number of queries
stack = []                   # monotonic stack of indices
dsu = list(range(N))         # DSU: dsu[i] points toward the next smaller element
ans = [None] * Q             # answers, as [index, value] pairs
qu = [[] for _ in range(N)]  # query ids grouped by their R endpoint
# group queries by right endpoint so each is answered exactly when the
# DSU covers arr[0:R+1]
for i, (L, R) in enumerate(queries):
    qu[R].append(i)

# helper function on DSU
def root(k):
    """Return the root of k's tree: the index of the minimum of
    arr[k:i+1] at the current sweep position i. Applies path
    compression; recursion depth is bounded by the tree depth."""
    if dsu[k] == k:
        return k
    dsu[k] = root(dsu[k])
    return dsu[k]

# algorithm: sweep left to right, maintaining the DSU incrementally
for i in range(N):
    # indices whose value is >= arr[i] get i as their next smaller element
    while stack and arr[stack[-1]] >= arr[i]:
        dsu[stack.pop()] = i
    stack.append(i)
    # answer every query whose right endpoint is i
    for j in qu[i]:
        idx = root(queries[j][0])
        ans[j] = [idx, arr[idx]]

# verify the computed answers against the model answer
assert ans == model_answer, "RMQ answers disagree with the model answer"

This code processes the input array sequentially. While the DSU is built, the while-loop runs in time proportional to the number of elements popped off the stack, which is amortized O(1) per element. Similarly, in the inner for-loop, each DSU root query runs in time proportional to the depth of a tree ($$O(\log n)$$). With path compression, a DSU query runs in $$O(\alpha(n))$$ amortized, for $$\alpha(n)$$ the inverse Ackermann function, which is very close to $$O(1)$$. Hence the total overall time complexity (as well as space complexity) should be close to $$O(N+Q)$$.