如何加快稀疏矩阵向量乘法的代码?

计算科学 稀疏矩阵 矩阵 本征
2021-11-28 10:17:16

我编写了一个 C++ 函数,它将稀疏矩阵(以 CSR 格式存储)乘以密集向量。这是代码:

VectorXd csrMult(VectorXd x, vector<double> Adata, vector<int> Aindices, vector<int> Aindptr, int numRowsA) 
{
    VectorXd Ax = VectorXd::Zero(numRowsA); 
    for (int i = 0; i < numRowsA; i++)
    {
        for (int dataIdx = Aindptr[i]; dataIdx < Aindptr[i + 1]; dataIdx++)
        {
            Ax[i] += Adata[dataIdx] * x[Aindices[dataIdx]];
        }
    }

    return Ax;
}

这里 VectorXd 是 Eigen3 线性代数库提供的数据类型。输入 Adata、Aindices 和 Aindptr 描述了 CSR 格式的矩阵 A。

准确地说:Adata 是 A 的非零条目列表(按行主要顺序存储),Aindices[i] 告诉我们非零条目 Adata[i] 属于 A 的哪一列,Aindptr[i+1] - Aindptr [i] 是 A 的第 i 行中非零条目的数量。

我观察到 Eigen3 的稀疏矩阵向量乘法运算比我的 csrMult 函数快大约 5 倍,即使在禁用 openMP 时也是如此。但是,当我查看源代码SparseDenseProduct.h 时,我相信 Eigen3 正在使用它来计算这个矩阵向量积,我不清楚 Eigen3 是如何更快的。

问题:您对提高我的代码速度有什么建议吗?你能解释一下为什么 Eigen3 更快吗?


编辑 3:我注意到关于正在发生的事情的重要线索。当我通过将 A 的前 N ​​个条目以外的所有条目设置为 0 来缩小问题时(这样 A 变得更加稀疏),我的实现与 Eigen3 相比更好。事实上,如果我将 A 的前 10,000 个非零条目以外的所有条目都设置为 0,我的实现比 Eigen3 高出大约 3 倍。但是,对于我感兴趣的非常大的问题规模,Eigen3 比我的实现高出一个倍数2. 对于大问题,我仍然很想与 Eigen3 配合使用。


编辑2:正如@TylerOlsen 建议的那样,我将输入更改为通过引用传递,现在Eigen3 的速度仅是我的代码的两倍(禁用openMP)。所以这是一个显着的改进。这是我的代码的最新版本。我仍然需要弄清楚如何使我的代码速度提高两倍才能与 Eigen3 保持一致。

void csrMult_v3(VectorXd& Ax, VectorXd& x, vector<double>& Adata, vector<int>& Aindices, vector<int>& Aindptr)
{
    // This code assumes that the size of Ax is numRowsA.
    for (int i = 0; i < Ax.size(); i++)
    {
        double Ax_i = 0.0;
        for (int dataIdx = Aindptr[i]; dataIdx < Aindptr[i + 1]; dataIdx++)
        {
            Ax_i += Adata[dataIdx] * x[Aindices[dataIdx]];
        }

        Ax[i] = Ax_i;
    }       
}

编辑 1:我还尝试了以下代码,其中值 Ax[i] 累积在一个临时变量中,但对运行时的影响可以忽略不计。我仍在观察 Eigen3 大约快 5 倍(未启用 openMP)。

VectorXd csrMult_v2(VectorXd x, vector<double> Adata, vector<int> Aindices, vector<int> Aindptr, int numRowsA)
{
    VectorXd Ax = VectorXd::Zero(numRowsA);
    for (int i = 0; i < numRowsA; i++)
    {
        double Ax_i = 0.0;
        for (int dataIdx = Aindptr[i]; dataIdx < Aindptr[i + 1]; dataIdx++)
        {
            Ax_i += Adata[dataIdx] * x[Aindices[dataIdx]];
        }

        Ax[i] = Ax_i;
    }

    return Ax;
}
1个回答

第二次编辑后,您已经在这里:

for (int i = 0; i < Ax.size(); i++)
{
  double Ax_i = 0.0;
  for (int dataIdx = Aindptr[i]; dataIdx < Aindptr[i + 1]; dataIdx++)
  {
     Ax_i += Adata[dataIdx] * x[Aindices[dataIdx]];
  }

  Ax[i] = Ax_i;
}

但是请注意您如何在行i+1中初始化已经dataIdx具有的值,因为当它具有该值时您停止了行中的循环。因此,您可以将其转换为以下内容:Aindptr[i+1]i

int dataIdx = Aindptr[0];
for (int i = 0; i < Ax.size(); i++)
{
  double Ax_i = 0.0;
  for (; dataIdx < Aindptr[i + 1]; dataIdx++)
  {
     Ax_i += Adata[dataIdx] * x[Aindices[dataIdx]];
  }

  Ax[i] = Ax_i;
}

您需要知道的下一条信息是您正在线性地遍历Aindices数组,从左到右,因为您的索引dataIdx是逐个递增的。因此,您可以为自己保存下标并执行以下操作:

int dataIdx = Aindptr[0];
int *Aindex = &Aindices[dataIdx];
for (int i = 0; i < Ax.size(); i++)
{
  double Ax_i = 0.0;
  for (; dataIdx < Aindptr[i + 1]; ++dataIdx,++Aindex)
  {
     Ax_i += Adata[dataIdx] * x[*Aindex];
  }

  Ax[i] = Ax_i;
}

试试这个。