Introduction

Our team often deals with sparse matrices. In recommendation systems, the user/item rating matrix, which shows the extent of how much a user likes an item (e.g., a movie), is typically very sparse. Rather than a user/movie rating matrix, we often look at user/host login matrices to find out which logins are unusual.

sparse host login matrix

The sparse host login matrix

 

The plot above shows a typical user/host login matrix. Each login from a user to a host is shown. Interesting patterns emerge; for example you can see that some user accounts visit almost all 3500 hosts so they might be scripted to do a manual job. The visualization can be misleading but the sparsity of the matrix is about 99.7%!

The problem

A common task in algorithms for recommendation engines is the calculation the matrix product of two sparse matrices. Let us call the operands of the multiplication left matrix and right matrix. The product matrix itself will be sparse as well. When these matrices are big, we cannot possibly afford storing any of them in a dense way. They probably would not even fit into the memory, causing a lot of headache.

For one of our anomaly detector algorithm, we needed to implement a scalable sparse matrix-matrix multiplication in Apache Spark. We first turned to the documentation of Spark MLlib to learn what is implemented in Spark already. You can first read the summary of our findings. Then we present a solution for sparse matrix multiplication that worked better for us than the currently implemented functions in Spark.

Disclaimer: in the following part of this post, the cited source and API documentation reflect the state of Apache Spark as of November 2016, i.e., v2.0.2. As Spark rapidly evolves, the summary and findings might become outdated in the future.

Matrix multiplication implementations in Spark

In Spark MLlib, there are two main types of matrices: local matrices are stored on a single machine, while distributed matrices are stored on multiple machines. As of now, there are two implementations of local matrices: DenseMatrix and SparseMatrix, and four implementations of distributed matrices: CoordinateMatrix, IndexedRowMatrix, RowMatrix and BlockMatrix. We wanted to store our matrix distributively so we looked at those possibilities.

According to the documentation, a CoordinateMatrix should be used to store very sparse matrices with huge dimensions, which seemed to be our use case. A CoordinateMatrix is built up from MatrixEntry instances (actually from an RDD[MatrixEntry]), each MatrixEntry describing a non-zero element in the matrix with its coordinates and value. Unfortunately for us, matrix multiplication is not yet a supported operation of the CoordinateMatrix class.

You can think of IndexedRowMatrix as a collection of IndexedRows, where an IndexedRow is a tuple of a row index and a Vector. The Vector can be a sparse vector. Thus, an IndexedRowMatrix can also work as a sparse representation of a matrix. RowMatrix is similar to IndexedRowMatrix but an entry in such a matrix is only a Vector without the row index. A RowMatrix can be multiplied with another matrix, and a CoordinateMatrix can be coverted into a RowMatrix with the toRowMatrix method. You might think that this way we can solve sparse matrix multiplication. But only until you find in the source of RowMatrix.scala that the right matrix should be a DenseMatrix, so both matrices in the operation cannot be sparse:

def multiply(B: Matrix): RowMatrix = {
  val n = numCols().toInt
  val k = B.numCols
  require(n == B.numRows, s"Dimension mismatch: $n vs ${B.numRows}")
  
  require(B.isInstanceOf[DenseMatrix],
    s"Only support dense matrix at this time but found ${B.getClass.getName}.")
  
  ...
}

(excerpt from the multiply method of RowMatrix)

The case of BlockMatrix is similar to the previous. It has a multiply method; however, from its Scala API docs it turns out that if the right matrix is SparseMatrix, it will be converted to a DenseMatrix. We tried it but worked miserably for us as the dense matrix could not fit into memory.

We have seen that distributed sparse matrix multiplication is not yet solved in Spark. To give you a full picture let us mention that in local matrix multiplication, again, the right matrix should be dense.

A naive solution

After being disappointed for a minute, it occurred to us that a naive implementation of the multiplication of two sparse matrices should not be hard to realize. We recalled having studied how to multiply matrices in the MapReduce way in the book Mining of Massive Datasets by Leskovec, Rajaraman and Ullman. In the following we build on the ideas and notations from Chapter 2.3.9 of said book; it is worth taking a look there.

Let us call the left matrix \(M\). It consists of \(i\) rows, \(j\) columns and has entries \(m_{ij}\). Likewise, the right matrix is \(N\) with \(j\) rows, \(k\) columns and entries \(n_{jk}\). The product matrix \(P\) has entries \(p_{ik}\) where \(p_{ik} = \sum_{j} m_{ij}n_{jk} \).

Our CoordinateMatrix implementation of the algorithm works like this:

def coordinateMatrixMultiply(leftMatrix: CoordinateMatrix, rightMatrix: CoordinateMatrix): CoordinateMatrix = {
  val M_ = leftMatrix.entries.map({ case MatrixEntry(i, j, v) => (j, (i, v)) })
  val N_ = rightMatrix.entries.map({ case MatrixEntry(j, k, w) => (j, (k, w)) })

  val productEntries = M_
    .join(N_)
    .map({ case (_, ((i, v), (k, w))) => ((i, k), (v * w)) })
    .reduceByKey(_ + _)
    .map({ case ((i, k), sum) => MatrixEntry(i, k, sum) })

  new CoordinateMatrix(productEntries)
}

First we arrange the entries of both matrices into pairs to form key/value RDDs. For the left matrix, the key will be the column index, for the right matrix it will be the row index, both denoted by \(j\). The value of the key/value pair is a tuple itself comprising the other coordinate and the value of the current matrix entry.

The key of the join is \(j\), so the entries with the same \(j\) will stick together (that is, entries of matrix \(M\) from column \(j\) and entries of matrix \(N\) from row \(j\)). The elements of the same \(j\) will be multiplied together (see the formula of the matrix product and the first map after the join), then the products with the same coordinates in \(P\) will be summed together (reduceByKey).

In the end, we just map what we calculated to obtain MatrixEntry elements so that we can make a CoordinateMatrix from them.

This solution might be naive but so far it works for our use case. At least none of the matrices are converted into their dense representations, making the multiplication more scalable than what the API currently provides. Obviously its limits should be further analyzed. If you have any insights about this, we would be curious to hear it.

Conclusion

During building a recommedation system, we faced the problem of large sparse matrix multiplication with Apahce Spark. Since Spark MLlib did not contain a suitable solution for us, we fabricated our own implementation. Hopefully, this blog post will help you on your way with dealing with large sparse matrix using Spark.