0x00 摘要
在这个系列中,咱们介绍了 HugeCTR,这是一个面向行业的引荐体系练习结构,针对具有模型并行嵌入和数据并行密布网络的大规模 CTR 模型进行了优化。本文介绍 DistributedSlotSparseEmbeddingHash 的后向操作。
其间学习了HugeCTR源码阅读 这篇大作,特此感谢。
本系列其他文章如下:
[源码解析] NVIDIA HugeCTR,GPU版别参数服务器 –(1)
[源码解析] NVIDIA HugeCTR,GPU版别参数服务器— (2)
[源码解析] NVIDIA HugeCTR,GPU版别参数服务器—(3)
[源码解析] NVIDIA HugeCTR,GPU版别参数服务器— (4)
[源码解析] NVIDIA HugeCTR,GPU版别参数服务器— (5) 嵌入式hash表
[源码解析] NVIDIA HugeCTR,GPU版别参数服务器— (6) — Distributed hash表
[源码解析] NVIDIA HugeCTR,GPU 版别参数服务器—(7) —Distributed Hash之前向传达
0x01 回忆
前文咱们介绍了Distributed Hash之前向传达过程,其逻辑流程如下:
本文咱们来看看怎么进行后向传达。
0x02 总述
反向传达是求各种权重的变化对终究的差错能造成什么样的影响,或许说是各种权重怎么调整能让预估差错尽或许小,其实便是给各种权重找到梯度下降最快的方向,让损失函数快速地大局达到一个最优点。
2.1 注释
咱们从注释之中能够看到总共有如下思路,关于后向传达来说,便是核算梯度,然后更新嵌入表。咱们后续就依照这个思路来剖析代码。
/**
* All the CUDA kernel functions used by embedding layer are defined in this file, including
* forward propagation, backward propagation. The functions are defined by propagation type
* and combiner type(sum or mean) as below:
* 1) forward
* sum: calling forward_sum_kernel()
* mean: calling foward_sum_kernel() + forward_scale_kernel()
* 2) backward:
* calculating wgrad:
* sum: calling backward_sum_kernel()
* mean: calling backward_mean_kernel()
* update embedding table: including several steps as below,
* step1: expand sample IDs, calling sample_id_expand_kernel()
* step2: get value_index by key (will call hash_table->get_insert() in nv_hashtable lib)
* step3: sort by value_index (will call cub::DeviceRadixSort::SortPairs in cub lib)
* step4: count the number for each unduplicated value_index, calling value_count_kernel()
* step5: use optimizer method to compute deltaw, and record corresponding, including three
* types of optimizer: Adam: caling opt_adam_kernel() Momentum sgd: calling
* opt_momentum_sgd_kernel() Nesterov: calling opt_nesterov_kernel() step6: update embedding table
* by deltaw, calling update_kernel()
*/
2.2 代码
在 session::train() 之中有如下代码,这些就对应了整体思路。
- backward 进行反向传达核算。
- exchange_wgrad 进行交流梯度。
- update_params 来更新参数。
// Embedding backward
for (const auto& one_embedding : embeddings_) {
one_embedding->backward();
}
// Exchange wgrad and update params
if (networks_.size() > 1) {
#pragma omp parallel num_threads(networks_.size())
{
size_t id = omp_get_thread_num();
exchange_wgrad(id);
networks_[id]->update_params();
}
} else if (resource_manager_->get_global_gpu_count() > 1) {
exchange_wgrad(0);
networks_[0]->update_params();
}
for (const auto& one_embedding : embeddings_) {
one_embedding->update_params();
}
0x03 输入
咱们首要看看怎么获取反向传达的输入。由于从嵌入层比较难以查找,咱们换个思路,从 reshape 层来看看。
3.1 界说
能够看到,其主要成员变量便是输入 in_tensors_ 和输出 out_tensors_。
/**
* Layer which reshapes a 3D/2D input tensor to 2D output tensor,
* e.g., (batch_size, n_slots, vector_size) to (batch_size, n_slots * vector_size),
* e.g., (batch_size * n_slots, vector_size) to (batch_size, n_slots * vector_size),
* If the input tensor is 3D, you can choose which slots participate by calling the different Ctor
*/
template <typename T>
class ReshapeLayerCPU : public LayerCPU {
/*
* stores the weight tensors of this layer.
*/
Tensors2<T> weights_;
/*
* stores the weight gradient tensors of this layer.
*/
Tensors2<T> wgrad_;
/*
* stores the references to the input tensors of this layer.
*/
Tensors2<T> in_tensors_;
/*
* stores the references to the output tensors of this layer.
*/
Tensors2<T> out_tensors_;
bool in_place_;
int batch_size_;
int n_slot_;
int vector_length_;
size_t n_active_slot_;
Tensor2<int> selected_tensor_;
std::vector<int> selected_;
}
3.2 切换
从代码能够知道,在练习时分便是反复利用了这两个成员变量 in_tensor 和 out_tensor 来做切换。
- 前向传达时分,fprop是把数据从in_tensor拷贝到out_tensor。
- 后向传达时分,bprop 是把数据从out_tensor拷贝到in_tensor。
所以,前向传达的输入变量,在反向传达时分被用来作为输入变量。因而咱们能够知道嵌入层也是这个套路。
template <typename T>
void ReshapeLayer<T>::fprop(bool is_train) {
prop_common(true, is_train, get_gpu().get_stream());
}
template <typename T>
void ReshapeLayer<T>::bprop() {
prop_common(false, true, get_gpu().get_stream());
}
template <typename T>
void ReshapeLayer<T>::prop_common(bool forward, bool is_train, cudaStream_t stream) {
CudaDeviceContext context(get_device_id());
Tensor2<T>& in_tensor = get_in_tensors(is_train)[0];
Tensor2<T>& out_tensor = out_tensors_[0];
if (in_place_) {
if (forward) { // 前向传达
CK_CUDA_THROW_(cudaMemcpyAsync(out_tensor.get_ptr(), in_tensor.get_ptr(),
in_tensor.get_size_in_bytes(), cudaMemcpyDeviceToDevice,
stream));
} else { // 反向传达
CK_CUDA_THROW_(cudaMemcpyAsync(in_tensor.get_ptr(), out_tensor.get_ptr(),
out_tensor.get_size_in_bytes(), cudaMemcpyDeviceToDevice,
stream));
}
} else {
int block_size = 128;
int n_block = get_gpu().get_sm_count() * 16;
T* in = in_tensor.get_ptr();
T* out = out_tensor.get_ptr();
reshape_kernel<<<n_block, block_size>>>(in, out, batch_size_, n_slot_, vector_length_,
selected_tensor_.get_ptr(), n_active_slot_, forward);
}
#ifndef NDEBUG
CK_CUDA_THROW_(cudaDeviceSynchronize());
CK_CUDA_THROW_(cudaGetLastError());
#endif
}
0x04 backward
4.1 整体代码
由之前剖析咱们能够知道,反向传达时分,输入的梯度便是存储在embedding_data_.get_output_tensors(true)
之中。整体代码分为两部分,榜首步是运用all-gather 操作来在每个GPU之上都收集到一切样本的悉数梯度。第二步是调用 functors_.backward
进行核算。
/**
* The first stage of backward propagation of embedding layer,
* which only computes the wgrad by the dgrad from the top layer.
*/
void backward() override {
// Read dgrad from output_tensors -> compute wgrad
// do all-gather to collect the top_grad
size_t send_count = embedding_data_.get_batch_size_per_gpu(true) *
embedding_data_.embedding_params_.slot_num *
embedding_data_.embedding_params_.embedding_vec_size;
functors_.all_gather(send_count, embedding_data_.get_output_tensors(true),
embedding_feature_tensors_, embedding_data_.get_resource_manager());
// do backward
functors_.backward(embedding_data_.embedding_params_.get_batch_size(true),
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size,
embedding_data_.embedding_params_.combiner, row_offset_allreduce_tensors_,
embedding_feature_tensors_, wgrad_tensors_,
embedding_data_.get_resource_manager());
return;
}
4.2 AllGather
反向传达的榜首步是运用 all-gather 操作来在每个 GPU 之上都收集到的一切样本的悉数梯度,这样后续能够进行核算而且更新每个 GPU 之上的参数。
4.2.1 原理
首要咱们看 AllGather 原理。在履行 AllGather 操作时,K个处理器之中,每个处理器都会将来自每个处理器的N个值聚集成一个维度为K*N的输出。输出是按rank索引排序的。AllGather操作会受到不同rank或设备映射的影响,由于rank决议了数据布局。
留意:履行ReduceScatter + AllGather,就等同于AllReduce操作。
4.2.2 代码
调用代码如下,能够看到其会把梯度从反向传达的输入 embedding_data_.get_output_tensors(true)
拷贝到 embedding_feature_tensors_
。因而,embedding_feature_tensors_ 将会具有一切的梯度。
functors_.all_gather(send_count, embedding_data_.get_output_tensors(true),
embedding_feature_tensors_, embedding_data_.get_resource_manager());
算子如下:
/**
* collection communication: all_gather.
* @param send_count the count of elements will be sent.
* @param send_tensors the send tensors of multi GPUs.
* @param recv_tensors the recv tensors of multi GPUs.
* @param device_resources all gpus device resources.
* @param context gpu device context, for switching device.
*/
template <typename Type>
void SparseEmbeddingFunctors::all_gather(size_t send_count, const Tensors2<Type> &send_tensors,
Tensors2<Type> &recv_tensors,
const ResourceManager &resource_manager) {
size_t local_gpu_count = resource_manager.get_local_gpu_count();
size_t total_gpu_count = resource_manager.get_global_gpu_count();
// need to know the Type
ncclDataType_t type;
switch (sizeof(Type)) {
case 2:
type = ncclHalf;
break;
case 4:
type = ncclFloat;
break;
default:
CK_THROW_(Error_t::WrongInput, "Error: Type not support by now");
}
// for multi GPUs, use NCCL to do All-Gather
if (total_gpu_count > 1) {
CK_NCCL_THROW_(ncclGroupStart());
for (size_t id = 0; id < local_gpu_count; id++) {
const auto &local_gpu = resource_manager.get_local_gpu(id);
CK_NCCL_THROW_(ncclAllGather(send_tensors[id].get_ptr(), // send buff
recv_tensors[id].get_ptr(), // recv buff
send_count, type, local_gpu->get_nccl(),
local_gpu->get_stream()));
}
CK_NCCL_THROW_(ncclGroupEnd());
}
// for single GPU, just do memcpyD2D
else { // total_gpu_count == 1
const auto &local_gpu = resource_manager.get_local_gpu(0);
CudaDeviceContext context(local_gpu->get_device_id());
CK_CUDA_THROW_(cudaMemcpyAsync(recv_tensors[0].get_ptr(), send_tensors[0].get_ptr(),
send_count * sizeof(Type), cudaMemcpyDeviceToDevice,
local_gpu->get_stream()));
}
return;
}
4.3 backward
这部分完结如下功用:核算本地每个gpu上的梯度。此函数完结之后,wgrad_tensors_ 成员变量便是本GPU核算发生的新梯度。
// do backward
functors_.backward(embedding_data_.embedding_params_.get_batch_size(true),
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size,
embedding_data_.embedding_params_.combiner, row_offset_allreduce_tensors_,
embedding_feature_tensors_, wgrad_tensors_,
embedding_data_.get_resource_manager());
calculating wgrad,会挑选如下两种之一:
- sum: calling backward_sum_kernel() ;
- mean: calling backward_mean_kernel();
详细backward代码如下:
template <typename TypeHashKey, typename TypeEmbeddingComp>
void SparseEmbeddingFunctors::backward(size_t batch_size,
const std::vector<size_t> &slot_num_per_gpu,
size_t embedding_vec_size, int combiner,
const Tensors2<TypeHashKey> &row_offset_allreduce_tensors,
const Tensors2<TypeEmbeddingComp> &embedding_feature_tensors,
Tensors2<TypeEmbeddingComp> &wgrad_tensors,
const ResourceManager &resource_manager) {
size_t local_gpu_count = resource_manager.get_local_gpu_count();
CudaDeviceContext context;
for (size_t id = 0; id < local_gpu_count; id++) { // 遍历本地GPU
if (slot_num_per_gpu[id] == 0) {
continue;
}
const auto &local_gpu = resource_manager.get_local_gpu(id);
context.set_device(local_gpu->get_device_id());
// 拿到某一个GPU对应的梯度和offset信息
const TypeEmbeddingComp *top_grad = embedding_feature_tensors[id].get_ptr();
const TypeHashKey *row_offset = row_offset_allreduce_tensors[id].get_ptr();
TypeEmbeddingComp *wgrad = wgrad_tensors[id].get_ptr();
// 核算更新本地梯度
if (combiner == 0) // sum
{
backward_sum(batch_size, slot_num_per_gpu[id], embedding_vec_size, top_grad, wgrad,
local_gpu->get_stream());
} else if (combiner == 1) // mean
{
backward_mean(batch_size, slot_num_per_gpu[id], embedding_vec_size, row_offset, top_grad,
wgrad, local_gpu->get_stream());
} else {
CK_THROW_(Error_t::WrongInput, "Invalid combiner type ");
}
}
return;
}
咱们以backward_sum 为例,这儿采用了GPU多线程更新以加快速度。
template <typename TypeEmbeddingComp>
void backward_sum(size_t batch_size, size_t slot_num, size_t embedding_vec_size,
const TypeEmbeddingComp *top_grad, TypeEmbeddingComp *wgrad,
cudaStream_t stream) {
const size_t grid_size = batch_size; // each block corresponds to a sample
const size_t block_size = embedding_vec_size;
backward_sum_kernel<<<grid_size, block_size, 0, stream>>>(batch_size, slot_num,
embedding_vec_size, top_grad, wgrad);
}
// backward kernel function: for combiner=sum
template <typename TypeEmbeddingComp>
__global__ void backward_sum_kernel(int batch_size, int slot_num, int embedding_vec_size,
const TypeEmbeddingComp *top_grad, TypeEmbeddingComp *wgrad) {
int tid = threadIdx.x;
int bid = blockIdx.x;
if (bid < batch_size && tid < embedding_vec_size) {
for (int i = 0; i < slot_num; i++) {
// 先找到某一个稠密张量的方位,再加上tid就得到了张量之中某一个元素(本tid对应的元素)的方位
size_t feature_index = (size_t)(bid * slot_num + i) * embedding_vec_size + tid;
// 更新梯度数值
wgrad[feature_index] = top_grad[feature_index];
}
}
}
作为对比,贴出backward_mean_kernel,大家能够比对学习。
// backward kernel function: for combiner=mean
template <typename TypeKey, typename TypeEmbeddingComp>
__global__ void backward_mean_kernel(int batch_size, int slot_num, int embedding_vec_size,
const TypeKey *row_offset, const TypeEmbeddingComp *top_grad,
TypeEmbeddingComp *wgrad) {
int bid = blockIdx.x;
int tid = threadIdx.x;
if (bid < batch_size && tid < embedding_vec_size) {
for (int i = 0; i < slot_num; i++) {
size_t feature_row_index = bid * slot_num + i;
int value_num = row_offset[feature_row_index + 1] - row_offset[feature_row_index];
float scaler = 1.0f;
if (value_num > 1) {
scaler = 1.0f / value_num; // partial derivatice of MEAN
}
size_t feature_index = feature_row_index * embedding_vec_size + tid;
float g = TypeConvertFunc<float, TypeEmbeddingComp>::convert(top_grad[feature_index]);
g *= scaler;
wgrad[feature_index] = TypeConvertFunc<TypeEmbeddingComp, float>::convert(g);
}
}
}
现在,wgrad_tensors_ 之中已经是本地 GPU 发生的梯度了,需求根据这个来更新嵌入层权重,便是更新 hash_table_value 的内容。
0x05 ExchangeWgrad
session.train 接下来会交流梯度和更新网络参数。
// Exchange wgrad and update params
if (networks_.size() > 1) {
#pragma omp parallel num_threads(networks_.size())
{
size_t id = omp_get_thread_num();
exchange_wgrad(id);
networks_[id]->update_params();
}
} else if (resource_manager_->get_global_gpu_count() > 1) {
exchange_wgrad(0);
networks_[0]->update_params();
}
详细代码如下:
void Session::exchange_wgrad(size_t device_id) {
auto& gpu_resource = resource_manager_->get_local_gpu(device_id);
CudaCPUDeviceContext context(gpu_resource->get_device_id());
PROFILE_RECORD("exchange_wgrad.start", gpu_resource->get_stream(), false);
exchange_wgrad_->allreduce(device_id, gpu_resource->get_stream());
PROFILE_RECORD("exchange_wgrad.stop", gpu_resource->get_stream(), false);
}
5.1 界说
从界说能够看到,ExchangeWgrad 的功用便是简略封装底层资源。
class ExchangeWgrad {
public:
virtual void allocate() = 0;
virtual void update_embed_wgrad_size(size_t size) = 0;
virtual void allreduce(size_t device_id, cudaStream_t stream) = 0;
};
template <typename TypeFP>
class NetworkExchangeWgrad : public ExchangeWgrad {
public:
const BuffPtrs<TypeFP>& get_network_wgrad_buffs() const { return network_wgrad_buffs_; }
const BuffPtrs<TypeFP>& get_embed_wgrad_buffs() const { return null_wgrad_buffs_; }
void allocate() final;
void update_embed_wgrad_size(size_t size) final;
void allreduce(size_t device_id, cudaStream_t stream);
NetworkExchangeWgrad(const std::shared_ptr<ResourceManager>& resource_manager);
~NetworkExchangeWgrad() = default;
private:
BuffPtrs<TypeFP> network_wgrad_buffs_;
BuffPtrs<TypeFP> null_wgrad_buffs_;
std::vector<std::shared_ptr<GeneralBuffer2<CudaAllocator>>> bufs_;
std::shared_ptr<ResourceManager> resource_manager_;
AllReduceInPlaceComm::Handle ar_handle_;
size_t network_wgrad_size_ = 0;
size_t num_gpus_ = 0;
};
template <typename TypeFP>
class GroupedExchangeWgrad : public ExchangeWgrad {
public:
const BuffPtrs<TypeFP>& get_network_wgrad_buffs() const { return network_wgrad_buffs_; }
const BuffPtrs<TypeFP>& get_embed_wgrad_buffs() const { return embed_wgrad_buffs_; }
void allocate() final;
void update_embed_wgrad_size(size_t size) final;
void allreduce(size_t device_id, cudaStream_t stream);
GroupedExchangeWgrad(const std::shared_ptr<ResourceManager>& resource_manager);
~GroupedExchangeWgrad() = default;
private:
BuffPtrs<TypeFP> network_wgrad_buffs_;
BuffPtrs<TypeFP> embed_wgrad_buffs_;
std::vector<std::shared_ptr<GeneralBuffer2<CudaAllocator>>> bufs_;
std::shared_ptr<ResourceManager> resource_manager_;
AllReduceInPlaceComm::Handle ar_handle_;
size_t network_wgrad_size_ = 0;
size_t embed_wgrad_size_ = 0;
size_t num_gpus_ = 0;
};
5.2 功用
交流功用主要是运用底层 all_reduce 来完结操作。
template <typename T>
void NetworkExchangeWgrad<T>::allreduce(size_t device_id, cudaStream_t stream) {
auto ar_comm = resource_manager_->get_ar_comm();
ar_comm->all_reduce(ar_handle_, stream, device_id);
}
template <typename T>
void GroupedExchangeWgrad<T>::allreduce(size_t device_id, cudaStream_t stream) {
auto ar_comm = resource_manager_->get_ar_comm();
ar_comm->all_reduce(ar_handle_, stream, device_id);
}
0x06 更新参数
Session.train 接下来会让嵌入层来更新参数,详细是运用优化器进行更新。
for (const auto& one_embedding : embeddings_) {
one_embedding->update_params();
}
详细代码如下,其主要逻辑便是在优化器和backward()发生的wgrad协作下,更新hash table。
/**
* The second stage of backward propagation of embedding layer, which
* updates the hash table by wgrad(from backward()) and optimizer.
*/
void update_params() override {
// accumulate times for adam optimizer
embedding_data_.embedding_params_.opt_params.hyperparams.adam.times++;
#pragma omp parallel num_threads(embedding_data_.get_resource_manager().get_local_gpu_count())
{
size_t id = omp_get_thread_num();
CudaDeviceContext context(embedding_data_.get_local_gpu(id).get_device_id());
// do update params operation
embedding_optimizers_[id].update(
embedding_data_.embedding_params_.get_batch_size(true),
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size, max_vocabulary_size_per_gpu_,
*embedding_data_.get_nnz_array(true)[id],
embedding_data_.get_row_offsets_tensors(true)[id], hash_value_index_tensors_[id],
wgrad_tensors_[id], hash_table_value_tensors_[id],
embedding_data_.get_local_gpu(id).get_sm_count(),
embedding_data_.get_local_gpu(id).get_stream());
}
return;
}
这部分是反向操作的难点。现在的问题是,wgrad_tensors_ 之中已经是梯度了,需求根据这个来更新嵌入层权重,便是 hash_table_value。可是怎么更新呢?比方怎样利用GPU多线程更新?是否需求更新 hash_value_index_index?咱们接下来一步一步剖析。
6.1 问题和思路
假定batch_size=2,slot_num=2,给出一个CSR比方格局如下(两个样本):
* 40,50,10,20 // 样本1,slot 1
* 30,50,10 // 样本1,slot 2
* 30,20 // 样本2,slot 1
* 10 // 样本2,slot 2
* Will be convert to the form of:
* row offset: 0,4,7,9,10
* value: 40,50,10,20,30,50,10,30,20,10
6.1.1 前向传达
下图是前向传达的embedding look示例,终究生成的 embedding_feature 之中,embedding vector个数是:batch_size x slot_num,针对咱们的比方:40,50,10,20,30,50,10,30,20,10,分红slot便是:[40,50,10,20],[30,50,10],[30,20],[10]。别离对应embedding_feature矩阵中的四行。
注:终究输出的是 train_output_tensors_
,中心变量为 embedding_feature,embedding_feature 经过了几回GPU之间的通信变化之后演化成了train_output_tensors_ ,两者维度相同,所以咱们就运用 embedding_feature。下面图之中数字是结构出来,只供演示运用。
咱们给出 embedding_feature 之中第三条向量的核算过程,他对应了第二个样本的榜首个slot,便是 “30,20”。所以便是从 hash_table_value 选出了第2行,第3行,对应方位元素相加,即图中给出的核算过程。
6.1.2 后向传达
咱们再考虑后向传达。
后向传达时分用梯度来更新权重,g31,g32,g33,g34 这一行就应该更新 hash_table_value 的第2行,第3行。别的,如果假定第二个样本的榜首个slot 是 “30,20,20,20”,那么其实就应该用梯度更新hash_table_value 的第2行三次,第3行一次。其实也能够看出来,这种更新不要知道 train_value的数值终究是什么。
6.1.3 思路
咱们先用惯例思路来梳理一下上面比方:
- sample_id 列表对应的是40,50,10,20,…..,20 是一个key,它在低维嵌入表 hash_table_value 之中对应一个稠密向量(第2行 10,20,30,40),里边是权重。
- 嵌入层输出是embedding_feature。
- embedding vector个数是:batch_size x slot_num,也便是说,CSR 有几行,这儿就有几个向量。
- 其间第三条向量对应了第二个样本的榜首个slot,便是 “30,20”。所以便是从 hash_table_value 选出了第2行,第3行,对应方位元素相加: 10,220,330,440,550 = (10+100),(20+200),(30+300),(40+400),(50+500)。
- 如果有了梯度稠密向量,其是被 hash table value 若干稠密向量做pooling得到的成果。
- 比方,梯度矩阵第三条向量 g31,g32,g33,g34 对应的便是 embedding_feature 第三条向量 10,220,330,440,550,如果梯度更新权重,就应该更新hash_table_value 的第2行,第3行。
- 如果样本slot之中有多个相同数值,比方第二个样本的榜首个slot 是 “30,20,20,20”,那么其实就应该用更新hash_table_value 的第 2 行三次,第 3 行一次。
咱们接着从CUDA视点来看怎么更新,其意图是让每一个block 更新一个低维矩阵 hash_table_value 的一行,所以有几个问题:
-
怎么根据本GPU线程的 block id 找到其在低维矩稠密向量阵之中的row offset,假定是第二行。
-
怎么知道本 block 应该更新第二行几回。
-
更新这几回,别离用哪一个梯度来更新。
-
比方第1个梯度或许更新第二行,第三个梯度也或许更新第二行。针对咱们的比方:40,50,10,20,30,50,10,30,20,10,分红slot便是:[40,50,10,20],[30,50,10],[30,20],[10]。别离对应梯度矩阵中的四行,所以需求从梯度矩阵之中1,2,4行的梯度来更新 10 对应的 hash_table_value。
-
详细拜见下图,这儿 train_value 到 gradient 仅仅示意,便是逻辑上一一对应。
-
这儿有一个疑问,为什么不像前向传达那样操作,而是要别的重起炉灶呢?这是由于咱们不需求知道样本数值就能够更新权重,不需求把40,50,10,20,…..,等等从头走一遍操作哈希表的流程。所以,接下来就看看HugeCTR怎么解决这几个问题,这儿代码比较烧脑。
6.2 嵌入层更新
咱们首要看看嵌入层的整体代码和注释里边提到的思路。
6.2.1 注释
注释里边关于更新的部分有5步,咱们能够看到其大致思路:
-
step1: expand sample IDs, calling sample_id_expand_kernel();
-
step2: get value_index by key (will call hash_table->get_insert() in nv_hashtable lib);
-
step3: sort by value_index (will call cub::DeviceRadixSort::SortPairs in cub lib);
-
step4: count the number for each unduplicated value_index, calling value_count_kernel();
-
step5: use optimizer method to compute deltaw, and record corresponding;
/**
* All the CUDA kernel functions used by embedding layer are defined in this file, including
* forward propagation, backward propagation. The functions are defined by propagation type
* and combiner type(sum or mean) as below:
* 1) forward
* sum: calling forward_sum_kernel()
* mean: calling foward_sum_kernel() + forward_scale_kernel()
* 2) backward:
* calculating wgrad:
* sum: calling backward_sum_kernel()
* mean: calling backward_mean_kernel()
* update embedding table: including several steps as below,
* step1: expand sample IDs, calling sample_id_expand_kernel()
* step2: get value_index by key (will call hash_table->get_insert() in nv_hashtable lib)
* step3: sort by value_index (will call cub::DeviceRadixSort::SortPairs in cub lib)
* step4: count the number for each unduplicated value_index, calling value_count_kernel()
* step5: use optimizer method to compute deltaw, and record corresponding, including three
* types of optimizer: Adam: caling opt_adam_kernel() Momentum sgd: calling
* opt_momentum_sgd_kernel() Nesterov: calling opt_nesterov_kernel() step6: update embedding table
* by deltaw, calling update_kernel()
*/
6.2.2 update代码
咱们摘录 EmbeddingOptimizer::update 的代码如下,这儿仅仅挑选了Optimizer_t::AdaGrad相关部分,其经过 opt_adagrad_kernel 进行更新。这儿能够清楚看到注释中的各个过程,咱们接下来就会逐一剖析。
template <typename TypeHashKey, typename TypeEmbeddingComp>
void EmbeddingOptimizer<TypeHashKey, TypeEmbeddingComp>::update(
size_t batch_size, size_t slot_num, size_t embedding_vec_size,
size_t max_vocabulary_size_per_gpu, size_t nnz, const Tensor2<TypeHashKey> &row_offset,
Tensor2<size_t> &hash_value_index, const Tensor2<TypeEmbeddingComp> &wgrad,
Tensor2<float> &hash_table_value, size_t sm_count, cudaStream_t stream) {
OptimizerTensor<TypeEmbeddingComp> &opt_tensor = opt_tensors_;
OptParams &opt_params = param.opt_params;
Tensor2<TypeHashKey> &sample_id = sample_id_tensors_;
Tensor2<TypeHashKey> &sample_id_sort = sample_id_sort_tensors_;
Tensor2<size_t> &hash_value_index_sort = hash_value_index_sort_tensors_;
Tensor2<uint32_t> &hash_value_index_count_offset = hash_value_index_count_offset_tensors_;
Tensor2<uint32_t> &new_hash_value_flag = new_hash_value_flag_tensors_;
Tensor2<uint32_t> &hash_value_flag_sumed = hash_value_flag_sumed_tensors_;
Tensor2<uint32_t> &hash_value_index_count_counter = hash_value_index_count_counter_tensors_;
Tensor2<void> &temp_storage_sort = temp_storage_sort_tensors_;
Tensor2<void> &temp_storage_scan = temp_storage_scan_tensors_;
size_t block_size, grid_size;
try {
// step1: expand sample IDs
block_size = 64;
grid_size = (batch_size * slot_num - 1) / block_size + 1;
sample_id_expand_kernel<<<grid_size, block_size, 0, stream>>>(
batch_size, slot_num, row_offset.get_ptr(), sample_id.get_ptr());
if (opt_params.optimizer == Optimizer_t::SGD &&
opt_params.hyperparams.sgd.atomic_update) { // for SGD, do atomic update
const size_t block_size = embedding_vec_size;
const size_t grid_size = min(max(1ul, nnz), sm_count * 32);
float lr_scale = opt_params.lr / opt_params.scaler;
opt_sgd_atomic_kernel<<<grid_size, block_size, 0, stream>>>(
nnz, embedding_vec_size, lr_scale, hash_value_index.get_ptr(), sample_id.get_ptr(),
wgrad.get_ptr(), hash_table_value.get_ptr());
} else {
// step3: sort by hash_value_index
int end_bit = static_cast<int>(log2(static_cast<float>(max_vocabulary_size_per_gpu))) + 1;
size_t temp_storage_sort_size = temp_storage_sort.get_size_in_bytes();
CK_CUDA_THROW_(cub::DeviceRadixSort::SortPairs(
temp_storage_sort.get_ptr(), temp_storage_sort_size, hash_value_index.get_ptr(),
hash_value_index_sort.get_ptr(), sample_id.get_ptr(), sample_id_sort.get_ptr(), nnz, 0,
end_bit, stream, false));
// step4: count the number for each unduplicated hash_value_index
CK_CUDA_THROW_(
cudaMemsetAsync(hash_value_index_count_counter.get_ptr(), 0, sizeof(uint32_t), stream));
constexpr size_t max_grid_size = 384;
block_size = 256;
grid_size = min(max_grid_size, (nnz - 1) / block_size + 1);
value_count_kernel_1<<<grid_size, block_size, 0, stream>>>(
nnz, hash_value_index_sort.get_ptr(), new_hash_value_flag.get_ptr());
// a pinned memroy
CK_CUDA_THROW_(cudaMemcpyAsync(&hash_hash_value_index_count_num,
hash_value_index_count_counter.get_ptr(), sizeof(uint32_t),
cudaMemcpyDeviceToHost, stream));
// step5: use optimizer method to compute deltaw and update the parameters
block_size = embedding_vec_size;
grid_size = max(1, hash_hash_value_index_count_num);
switch (opt_params.update_type) {
case Update_t::Global: {
switch (opt_params.optimizer) {
case Optimizer_t::Adam: {
}
case Optimizer_t::AdaGrad: {
opt_adagrad_kernel<<<grid_size, block_size, 0, stream>>>(
hash_hash_value_index_count_num, embedding_vec_size, opt_params.lr,
opt_params.hyperparams.adagrad, opt_tensor.opt_accm_tensors_.get_ptr(),
sample_id_sort.get_ptr(), hash_value_index_sort.get_ptr(),
hash_value_index_count_offset.get_ptr(), wgrad.get_ptr(),
hash_table_value.get_ptr(), opt_params.scaler);
break;
}
case Optimizer_t::MomentumSGD:
case Optimizer_t::Nesterov:
case Optimizer_t::SGD:
default:
CK_THROW_(Error_t::WrongInput, "Error: Invalid opitimizer type");
} // switch (optimizer)
break;
}
case Update_t::Local: {
switch (opt_params.optimizer) {
case Optimizer_t::Adam: {
}
case Optimizer_t::AdaGrad: {
opt_adagrad_kernel<<<grid_size, block_size, 0, stream>>>(
hash_hash_value_index_count_num, embedding_vec_size, opt_params.lr,
opt_params.hyperparams.adagrad, opt_tensor.opt_accm_tensors_.get_ptr(),
sample_id_sort.get_ptr(), hash_value_index_sort.get_ptr(),
hash_value_index_count_offset.get_ptr(), wgrad.get_ptr(),
hash_table_value.get_ptr(), opt_params.scaler);
break;
}
case Optimizer_t::MomentumSGD:
case Optimizer_t::Nesterov:
case Optimizer_t::SGD:
default:
CK_THROW_(Error_t::WrongInput, "Error: Invalid opitimizer type");
} // switch (optimizer)
break;
}
case Update_t::LazyGlobal: {
switch (opt_params.optimizer) {
case Optimizer_t::Adam: {
}
case Optimizer_t::AdaGrad:
case Optimizer_t::MomentumSGD:
case Optimizer_t::Nesterov:
case Optimizer_t::SGD: {
CK_THROW_(Error_t::WrongInput,
"Error: lazy global update is only implemented for Adam");
break;
}
default:
CK_THROW_(Error_t::WrongInput, "Error: Invalid opitimizer type");
}
break;
}
default:
CK_THROW_(Error_t::WrongInput, "Error: Invalid update type");
} // switch (update type)
}
#ifndef NDEBUG
cudaDeviceSynchronize();
CK_CUDA_THROW_(cudaGetLastError());
#endif
} catch (const std::runtime_error &rt_err) {
std::cerr << rt_err.what() << std::endl;
throw;
}
return;
}
首要要阐明,这儿nnz(non-zero feature number per batch)来自如下,便是本样本之中非零key个数。
std::vector<std::shared_ptr<size_t>>& get_nnz_array(bool is_train) {
if (is_train) {
return train_nnz_array_;
} else {
return evaluate_nnz_array_;
}
}
咱们接下来逐一看看这些过程。
6.3 拓宽sample id
这儿对应了榜首步,在后续代码之中,每个key对应了一个sample ID。整体思路便是找到每个 key(sample ID) 和梯度矩阵,或许说和embedding_feature
之中哪一行相对应,咱们后续就直接以 embedding_feature
来看,暂时不考虑梯度矩阵 。能够大致理解为把样本id扩展为key id的列表。
step1: expand sample IDs, calling sample_id_expand_kernel()
便是调用 sample_id_expand_kernel 来拓宽sample id。这儿 sample_id 是成员变量 sample_id_tensors_的引用,这样就能够直接修正成员变量。
Tensor2<TypeHashKey> sample_id_tensors_; /**< The temp memory to store the sample ids of hash table value in update_params(). */
详细代码如下:
Tensor2<TypeHashKey> &sample_id = sample_id_tensors_;
// step1: expand sample IDs
block_size = 64;
grid_size = (batch_size * slot_num - 1) / block_size + 1;
sample_id_expand_kernel<<<grid_size, block_size, 0, stream>>>(
batch_size, slot_num, row_offset.get_ptr(), sample_id.get_ptr());
经过前面剖析咱们知道,embedding vector个数是:batch_size x slot_num,也便是说,CSR 有几行,这儿就有几个向量。所以这儿就直接读取CSR行信息即可。即, sample_id_expand_kernel 会把 sample_id_tensors_ 设置为 CSR row offset(expand sample id by row_offset),便是找到 CSR row offset 之中的index。
CSR row_offset = [0,4,7,9,10],样本之中key的数值是40,50,10,20,30,50,10,30,20,10,那么 40,50,10,20对应了 0,30,50,10对应了1,30,20对应了 2,10对应了3。因而,sample_id 数值是 [0,0,0,0,1,1,1,2,2,3],便是记载了该 batch 在 embedding_feature_tensors_ 之中的 row index。
sample_id_expand_kernel 代码如下,这儿几个重点:
- gid 是grid ID,表示本线程对应了embedding_feature_tensors_ 哪个元素。
- blockIdx 表示一个样本。
- (batch_size * slot_num) 代表 本batch在 嵌入层输出 train_output_tensors_ 之中对应了多少行,或许说是在 embedding_feature_tensors_ 之中占有了多少行,其实 embedding_feature_tensors_ 也就这么大。
- sample_id[offset + i] = gid; 意图便是记载该样本某key在 embedding_feature_tensors_ 之中的 row index(对应哪一行)。embedding_feature_tensors_ 这个稠密向量是由 hash_table_value 之中”CSR 本行的元素数目”个稠密向量做pooling得到的成果。
// expand sample id by row_offset
template <typename TypeKey>
__global__ void sample_id_expand_kernel(int batch_size, int slot_num, const TypeKey *row_offset, TypeKey *sample_id) {
// 本线程对应的grid id,其实对应的便是global thread id
int gid = blockIdx.x * blockDim.x + threadIdx.x;
if (gid < (batch_size * slot_num)) { // 假定batch_size=2,slot_num=2,取值为 gid < 4
// 并不是每个GPU线程都会走到这儿,对应咱们的假定,则只会取出gid = 0~3 这样的线程才会进行下面装备操作
// 比方,假定gid取值规模8,那么只要gid=0,gid=1,gid=2,gid=3 这几个线程会进入if,履行操作,其他线程不会进入,比方grid=4就不会进入
TypeKey offset = row_offset[gid]; // 拿到对应的个数,比方 row_offset[0],row_offset[1],row_offset[2]的数值
int value_num = row_offset[gid + 1] - offset; // 拿到CSR 本行的元素数目
for (int i = 0; i < value_num; i++) {
sample_id[offset + i] = gid; // 记载该样本某key在 embedding_feature_tensors_ 之中的 row index
}
}
}
咱们把现在触及的变量整理如下,这儿假定从CSR数值到hash_value_index_tensors_ 行的映射是取十位数,比方50就映射到第5行。
称号 | 数值 | 含义 |
---|---|---|
CSR row offset | 0,4,7,9,10 | 两个样本,两个slot,所以分红四行 |
CSR value | 40,50,10,20,30,50,10,30,20,10 | 样本内容 |
hash_value_index_tensors_ | 4,5,1,2,3,5,1,3,2,1 | 低维嵌入表的index,样本每个key对应一个,比方50对应了 hash_table_value 第5行 |
hash_table_value | 5 x 8 的矩阵 | 低维嵌入表,假定稠密向量长度是8,由于总共只要5个不同数字,所以只要5行 |
embedding_feature_tensors_ | 4 x 8 的矩阵 | 嵌入层输出的稠密向量。形状是(batch_size * slot_num) * embedding_vec_len |
sample_id | 0,0,0,0,1,1,1,2,2,3 | 每个样本的每个key 对应了embedding_feature_tensors_ 中的 row index。比方CSR榜首行是40,50,10,20,它们都为 embedding_feature_tensors_ 的榜首行做出了奉献。 |
6.4 从key得到value_index
下面咱们看看第二步,根据key获取到 hash table value index。
step2: get value_index by key (will call hash_table->get_insert() in nv_hashtable lib)
这部分仅仅在 test/utest/embedding/sparse_embedding_hash_cpu.hpp 之中有,由于是测验代码,所以此时哈希表没有数据,需求设置,练习代码不需求这一步。
对应代码便是:
// step2: do hash table get() value_index by key
int nnz = row_offset_[batchsize_ * slot_num_];
hash_table_->get(hash_key_.get(), hash_value_index_.get(), nnz);
HashTableCpu 的get方法如下:
void get(const KeyType* keys, ValType* vals, size_t len) const {
if (len == 0) {
return;
}
for (size_t i = 0; i < len; i++) {
auto it = table_->find(keys[i]);
assert(it != table_->end() && "error: can't find key");
vals[i] = it->second;
}
}
6.5 排序
这部分对应第三步:
step3: sort by value_index (will call cub::DeviceRadixSort::SortPairs in cub lib)
现在得到了:sample_id 数值是 [0,0,0,0,1,1,1,2,2,3],便是记载了该 batch 在 embedding_feature_tensors_ 之中的 row index。
便是把 sample_id 依照 hash_value_index 来排序,终究排序成果放入 hash_value_index_sort 和 sample_id_sort。在咱们比方之中,得到成果如下:hash_value_index_sort 是 [1,1,1,2,2,3,3,4,5,5]。sample_id_sort 是 [0,1,3,0,2,1,2,0,0,1 ]。
咱们还是用表格记载:
称号 | 数值 | 含义 |
---|---|---|
CSR row offset | 0,4,7,9,10 | 两个样本,两个slot,所以分红四行 |
CSR value | 40,50,10,20,30,50,10,30,20,10 | 样本内容 |
hash_value_index_tensors_ | 4,5,1,2,3,5,1,3,2,1 | 低维嵌入表的index,样本每个key对应一个,比方50对应了 hash_table_value 第5行 |
hash_table_value | 5 x 8 的矩阵 | 低维嵌入表,假定稠密向量长度是8,由于总共只要5个不同数字,所以只要5行 |
embedding_feature_tensors_ | 4 x 8 的矩阵 | 嵌入层输出的稠密向量。形状是(batch_size * slot_num) * embedding_vec_len |
sample_id | 0,0,0,0,1,1,1,2,2,3 | 每个样本的每个key 对应了embedding_feature_tensors_ 中的 row index。比方CSR榜首行是40,50,10,20,它们都为 embedding_feature_tensors_ 的榜首行做出了奉献。 |
sample_id_sort | [0,1,3,0,2,1,2,0,0,1 ] | 和 hash_value_index_sort 对应,便是 hash_value_index_sort 前三个 1 别离对应了embedding_feature 的第1行,第2行,第4行(从0开端的序列) |
hash_value_index_sort | [1,1,1,2,2,3,3,4,5,5] | 排序之后的成果,举例来说,111 意思是本batch之中,总共有3个key对终究embedding_feature榜首行做出了奉献 |
详细代码如下:
// step3: sort by hash_value_index
int end_bit = static_cast<int>(log2(static_cast<float>(max_vocabulary_size_per_gpu))) + 1;
size_t temp_storage_sort_size = temp_storage_sort.get_size_in_bytes();
CK_CUDA_THROW_(cub::DeviceRadixSort::SortPairs(
temp_storage_sort.get_ptr(), temp_storage_sort_size, hash_value_index.get_ptr(),
hash_value_index_sort.get_ptr(), sample_id.get_ptr(), sample_id_sort.get_ptr(), nnz, 0,
end_bit, stream, false));
6.5.1 SortPairs
这儿依然用到了CUB的方法,详细能够拜见:nvlabs.github.io/cub/structc…
方法声明如下:
template<typename KeyT , typename ValueT >
static CUB_RUNTIME_FUNCTION cudaError_t cub::DeviceRadixSort::SortPairs (
void * d_temp_storage,
size_t & temp_storage_bytes,
const KeyT * d_keys_in,
KeyT * d_keys_out,
const ValueT * d_values_in,
ValueT * d_values_out,
int num_items,
int begin_bit = 0,
int end_bit = sizeof(KeyT) * 8,
cudaStream_t stream = 0,
bool debug_synchronous = false
)
详细运用方法如下:
6.6 核算value_index对应的数目
现在知道了 hash_value_index_sort 是 [1,1,1,2,2,3,3,4,5,5],sample_id_sort 是 [0,1,3,0,2,1,2,0,0,1 ]。
- hash_value_index_sort 是hash_value_index排序之后的成果,举例来说,111 意思是本batch之中,总共有3个key对终究embedding_feature榜首行做出了奉献
- sample_id_sort 和 hash_value_index_sort 对应,便是 hash_value_index_sort 前三个 1 别离对应了embedding_feature 的第1行,第2行,第4行(从0开端的序列)
接下来需求知道 embedding_feature_tensors_ 每行的来历是多少个 hash_table_value 行,比方第0行有4个,第1行有3个……。embedding_feature_tensors_ 之中的一个行 是被同一个slot的多个 hash_table_value 行的稠密向量做pooling完结的。
这部分对应了如下:
step4: count the number for each unduplicated value_index, calling value_count_kernel()
便是对 hash_value_index_sort 进行处理,这儿是 embedding 表 hash_table_value 的 row index。
// step4: count the number for each unduplicated hash_value_index
CK_CUDA_THROW_(
cudaMemsetAsync(hash_value_index_count_counter.get_ptr(), 0, sizeof(uint32_t), stream));
constexpr size_t max_grid_size = 384;
block_size = 256;
grid_size = min(max_grid_size, (nnz - 1) / block_size + 1);
// 意图是找到新的group,便是新的 row index。意图是为了核算每个row index对应的sample id个数
value_count_kernel_1<<<grid_size, block_size, 0, stream>>>(
nnz, hash_value_index_sort.get_ptr(), new_hash_value_flag.get_ptr());
// prefix_sum
size_t temp_storage_scan_size = temp_storage_scan.get_size_in_bytes();
CK_CUDA_THROW_(cub::DeviceScan::InclusiveSum(
temp_storage_scan.get_ptr(), temp_storage_scan_size, new_hash_value_flag.get_ptr(),
hash_value_flag_sumed.get_ptr(), nnz, stream));
value_count_kernel_2<<<grid_size, block_size, 0, stream>>>(
nnz, new_hash_value_flag.get_ptr(), hash_value_flag_sumed.get_ptr(),
hash_value_index_count_offset.get_ptr(), hash_value_index_count_counter.get_ptr());
uint32_t hash_hash_value_index_count_num = 0;
// this async memcpy will not perform as a async operation because the host memory is not
// a pinned memroy
CK_CUDA_THROW_(cudaMemcpyAsync(&hash_hash_value_index_count_num,
hash_value_index_count_counter.get_ptr(), sizeof(uint32_t),
cudaMemcpyDeviceToHost, stream));
咱们接下来一点点剖析。
6.6.1 value_count_kernel_1
value_count_kernel_1意图是找到新的group,便是新的 row index。意图是为了核算每个row index对应的sample id 个数。便是找到哪些点是新行起始点。咱们拓宽表格如下。
称号 | 数值 | 含义 |
---|---|---|
CSR row offset | 0,4,7,9,10 | 两个样本,两个slot,所以分红四行 |
CSR value | 40,50,10,20,30,50,10,30,20,10 | 样本内容 |
hash_value_index_tensors_ | 4,5,1,2,3,5,1,3,2,1 | 低维嵌入表的index,样本每个key对应一个,比方50对应了 hash_table_value 第5行 |
sample_id | 0,0,0,0,1,1,1,2,2,3 | 每个样本的每个key 对应了embedding_feature_tensors_ 中的 row index。比方CSR榜首行是40,50,10,20,它们都为 embedding_feature_tensors_ 的榜首行做出了奉献。 |
sample_id_sort | [0,1,3,0,2,1,2,0,0,1 ] | 和 hash_value_index_sort 对应,便是 hash_value_index_sort 前三个 1 别离对应了 embedding_feature 的第1行,第2行,第4行(从0开端的序列) |
hash_value_index_sort | [1,1,1,2,2,3,3,4,5,5] | 排序之后的成果,举例来说,1,1,1 意思是本batch之中,总共有3个key对终究embedding_feature榜首行做出了奉献 |
new_hash_value_flag | [1,0,0,1,0,1,0,1,1,0] | 为了核算每个row index对应的sample id 个数。便是找到哪些点是新行起始点 |
详细代码如下:
__global__ void value_count_kernel_1(int nnz, const size_t *hash_value_index_sort,
uint32_t *new_hash_value_flag) {
for (int gid = blockIdx.x * blockDim.x + threadIdx.x; gid < nnz; gid += blockDim.x * gridDim.x) {
size_t cur_value = hash_value_index_sort[gid];
if (gid > 0) {
size_t former_value = hash_value_index_sort[gid - 1];
// decide if this is the start of a group(the elements in this group have the same
// hash_value_index_sort)
if (cur_value != former_value) {
new_hash_value_flag[gid] = 1;
} else {
new_hash_value_flag[gid] = 0;
}
} else { // gid == 0
new_hash_value_flag[gid] = 1;
}
}
}
6.6.2 prefix_sum
对 new_hash_value_flag 排序,意图是得到每个group(row index)内部包括多少元素,放入 hash_value_flag_sumed 之中。
// prefix_sum
size_t temp_storage_scan_size = temp_storage_scan.get_size_in_bytes();
CK_CUDA_THROW_(cub::DeviceScan::InclusiveSum(
temp_storage_scan.get_ptr(), temp_storage_scan_size, new_hash_value_flag.get_ptr(),
hash_value_flag_sumed.get_ptr(), nnz, stream));
这儿运用了 cub::DeviceScan::InclusiveSum,如果想深入研究,能够拜见 nvlabs.github.io/cub/structc… 。
以下是函数阐明。
以下是运用方法。
咱们拓宽表格如下。
称号 | 数值 | 含义 |
---|---|---|
CSR row offset | 0,4,7,9,10 | 两个样本,两个slot,所以分红四行 |
CSR value | 40,50,10,20,30,50,10,30,20,10 | 样本内容 |
hash_value_index_tensors_ | [4,5,1,2,3,5,1,3,2,1] | 低维嵌入表的index,样本每个key对应一个,比方50对应了 hash_table_value 第5行 |
sample_id | [0,0,0,0,1,1,1,2,2,3] | 每个样本的每个key 对应了embedding_feature_tensors_ 中的 row index。比方CSR榜首行是40,50,10,20,它们都为 embedding_feature_tensors_ 的榜首行做出了奉献。 |
sample_id_sort | [0,1,3,0,2,1,2,0,0,1] | 和 hash_value_index_sort 对应,便是 hash_value_index_sort 前三个 1 别离对应了 embedding_feature 的第1行,第2行,第4行(从0开端的序列) |
hash_value_index_sort | [1,1,1,2,2,3,3,4,5,5] | 排序之后的成果,举例来说,1,1,1 意思是本batch之中,总共有3个key对终究embedding_feature榜首行做出了奉献 |
new_hash_value_flag | [1,0,0,1,0,1,0,1,1,0] | 为了核算每个row index对应的sample id 个数。便是找到哪些点是新行起始点 |
hash_value_flag_sumed | [1,1,1,2,2,3,3,4,5,5] | 对 new_hash_value_flag 兼并,意图是得到每个group(row index)内部包括多少元素。 |
hash_table_value | 5 x 8 的矩阵 | 低维嵌入表,假定稠密向量长度是8,由于总共只要5个不同数字,所以只要5行 |
6.6.3 value_count_kernel_2
这个代码作用便是得到终究每行元素个数。
value_count_kernel_2<<<grid_size, block_size, 0, stream>>>(
nnz, new_hash_value_flag.get_ptr(), hash_value_flag_sumed.get_ptr(),
hash_value_index_count_offset.get_ptr(), hash_value_index_count_counter.get_ptr());
uint32_t hash_hash_value_index_count_num = 0;
// this async memcpy will not perform as a async operation because the host memory is not
// a pinned memroy
CK_CUDA_THROW_(cudaMemcpyAsync(&hash_hash_value_index_count_num,
hash_value_index_count_counter.get_ptr(), sizeof(uint32_t),
cudaMemcpyDeviceToHost, stream));
hash_hash_value_index_count_num 是index总数,便是总共实在有几行,其对应了nnz。
* @param nnz non-zero feature number per batch
现在知道了 hash_value_index_sort 是 [1,1,1,2,2,3,3,4,5,5],sample_id_sort 是 [0,1,3,0,2,1,2,0,0,1 ],new_hash_value_flag 是 [1,0,0,1,0,1,0,1,1,0],里边放置了本行是不是新行。hash_value_flag_sumed 是[ 1,1,1,2,2,3,3,4,5,5 ]。
咱们剖析一下代码。整体思维是:在 hash_value_index_index(对应传进来的参数是 hash_value_index_count_offset)设定 “依照数目核算的,对应的 embedding 表 index(便是对应的 embedding 表行号)”。由于embedding_feature 最多只要5行(nnz个数),所以这儿取前五个即可。
比方,每个block要处理低维稠密矩阵一行。如 bid = 1,它希望更新低维稠密矩阵第2行,可是想知道更新几回。所以先从 hash_value_index_count_offset[1] 得到了数值 3,然后找到 hash_value_index_sort[3] 来进行处理。
详细是:遍历grid,可是需求小于nnz(该batch的非零key数目),其实便是 hash_table_value 的行数。比方说nnz这儿等于10,gid 取值便是0~9。grid为0,3,5,7,8 时分new_hash_value_flag[gid] 为 1。hash_value_flag_sumed[gid]别离为:1,2,3,4,5。所以 hash_value_index_count_offset 是 [0, 3, 5, 7, 8, 0, 0, 0, 0, 0],这些是 hash_value_index_sort 之中的offset。
__global__ void value_count_kernel_2(int nnz, const uint32_t *new_hash_value_flag,
const uint32_t *hash_value_flag_sumed,
uint32_t *hash_value_index_index, uint32_t *counter)
{
// 遍历grid,可是需求小于该batch的非零key数目,其实便是 hash_table_value 的行数
for (int gid = blockIdx.x * blockDim.x + threadIdx.x; gid < nnz; gid += blockDim.x * gridDim.x) {
uint32_t flag = new_hash_value_flag[gid];
if (flag == 1) {
// 设定
hash_value_index_index[hash_value_flag_sumed[gid] - 1] = gid;
}
}
if (blockIdx.x * blockDim.x + threadIdx.x == 0) {
*counter = hash_value_flag_sumed[nnz - 1];
hash_value_index_index[*counter] = nnz;
}
}
到现在为止,一切变量如下:
称号 | 数值 | 含义 |
---|---|---|
CSR row offset | 0,4,7,9,10 | 两个样本,两个slot,所以分红四行 |
CSR value | 40,50,10,20,30,50,10,30,20,10 | 样本内容 |
hash_table_value | 5 x 8 的矩阵 | 低维嵌入表,假定稠密向量长度是8,由于总共只要5个不同数字(nnz),所以只要5行 |
embedding_feature_tensors_ | 4 x 8 的矩阵 | 嵌入层输出的稠密向量。形状是(batch_size * slot_num) * embedding_vec_len |
hash_value_index_tensors_ | [4,5,1,2,3,5,1,3,2,1] | 低维嵌入表的index,样本每个key对应一个,比方50对应了 hash_table_value 第5行 |
sample_id | [0,0,0,0,1,1,1,2,2,3] | 每个样本的每个key 对应了embedding_feature_tensors_ 中的 row index。比方CSR榜首行是40,50,10,20,它们都为 embedding_feature_tensors_ 的榜首行做出了奉献。 |
sample_id_sort | [0,1,3,0,2,1,2,0,0,1] | 和 hash_value_index_sort 对应,便是 hash_value_index_sort 前三个 1 别离对应了 embedding_feature 的第1行,第2行,第4行(从0开端的序列) |
hash_value_index_sort | [1,1,1,2,2,3,3,4,5,5] | 排序之后的成果,举例来说,1,1,1 意思是本batch之中,总共有3个key对终究embedding_feature榜首行做出了奉献 |
new_hash_value_flag | [1,0,0,1,0,1,0,1,1,0] | 为了核算每个row index对应的sample id 个数。便是找到哪些点是新行起始点 |
hash_value_flag_sumed | [1,1,1,2,2,3,3,4,5,5] | 对 new_hash_value_flag 兼并,意图是得到每个group(row index)内部包括多少元素。 |
hash_value_index_count_offset | [0, 3, 5, 7, 8, 0, 0, 0, 0, 0] | 每个block要处理低维稠密矩阵一行。如 bid = 1,它希望更新低维稠密矩阵第2行,但想知道更新几回。所以先从 hash_value_index_count_offset[1] 得到了数值 3,然后找到 hash_value_index_sort[3]。由于embedding_feature 最多只要5行(nnz个数),所以这儿取前五个即可 |
终究思路如下:
-
每个block要处理低维稠密矩阵一行。假定bid=0 想更新低维矩阵榜首行,便是要更新10对应的低维矩阵稠密向量。
-
bid对应了key(的梯度),比方 40,50,10,20,30,50,10,30,20,10 这些,其key便是10~50这个5个。
-
hash_value_index_count_offset :本bid关于低维稠密矩阵该行要更新几回。sum_num = hash_value_index_count_offset[1] – hash_value_index_count_offset[0] = 3 – 0 = 3个,所以更新3次。
-
hash_value_index_sort :在 [1,1,1,2,2,3,3,4,5,5] 这儿找到 1,1,1,表示本batch之中总共有3个key对终究embedding_feature榜首行做出了奉献。
-
所以 bid = 0 ,便是hash_table_value[0]这一行 有三个1,应该更新3次。
-
sample_id_sort :更新便是累积,便是这3次更新别离去输入梯度哪一行去找?3个10别离在梯度的0,1,3这几行。
6.7 更新权重
这是终究一步,对应了如下:
step5: use optimizer method to compute deltaw and update the parameters
调用代码如下:
留意,这儿传递的是 sample_id_sort [0,1,3,0,2,1,2,0,0,1],对应的 hash_value_index_sort 是 [1,1,1,2,2,3,3,4,5,5],hash_value_index_count_offset 是 [0, 3, 5, 7, 8, 0, 0, 0, 0, 0]。
case Optimizer_t::AdaGrad: {
opt_adagrad_kernel<<<grid_size, block_size, 0, stream>>>(
hash_hash_value_index_count_num, embedding_vec_size, opt_params.lr,
opt_params.hyperparams.adagrad, opt_tensor.opt_accm_tensors_.get_ptr(),
sample_id_sort.get_ptr(), hash_value_index_sort.get_ptr(),
hash_value_index_count_offset.get_ptr(), wgrad.get_ptr(),
hash_table_value.get_ptr(), opt_params.scaler);
break;
}
很明显能够看到,其便是运用权重更新 hash_table_value。
// Local update for the Adagrad optimizer: compute the gradients and update the accumulators and the
// weights
template <typename TypeKey, typename TypeEmbeddingComp>
__global__ void opt_adagrad_kernel(uint32_t hash_value_index_count_num, int embedding_vec_size,
float lr, const AdaGradParams adagrad,
TypeEmbeddingComp *accum_ptr, const TypeKey *sample_id,
const size_t *hash_value_index_sort,
const uint32_t *hash_value_index_count_offset,
const TypeEmbeddingComp *wgrad, float *hash_table_value,
float scaler) {
int bid = blockIdx.x; // 一个block对应一个样本之中的一个key,比方比方之中的30
int tid = threadIdx.x; // 本线程
if (tid < embedding_vec_size && bid < hash_value_index_count_num) {
// 找到本线程样本在 hash_value_index_sort 的偏移
uint32_t offset = hash_value_index_count_offset[bid]; // [0, 3, 5, 7, 8, 0, 0, 0, 0, 0]
// 累积得出梯度
float gi = accumulate_gradients(embedding_vec_size, sample_id, hash_value_index_count_offset,
wgrad, scaler, offset, bid, tid);
// 找到本样本在低维矩阵之中的row index
size_t row_index = hash_value_index_sort[offset]; // [1,1,1,2,2,3,3,4,5,5]
// 留意,hash_table_value 是元素级别,比方稠密向量长度是8,那么在 hash_table_value 里边就有8个元素
// feature_index 便是得到本线程对应的 embedding vector 之中的哪个元素
size_t feature_index = row_index * embedding_vec_size + tid;
float accum = //accum_ptr 来自优化器
TypeConvertFunc<float, TypeEmbeddingComp>::convert(accum_ptr[feature_index]) + gi * gi;
accum_ptr[feature_index] = TypeConvertFunc<TypeEmbeddingComp, float>::convert(accum);
float weight_diff = -lr * gi / (sqrtf(accum) + adagrad.epsilon);
// 更新梯度
hash_table_value[feature_index] += weight_diff;
}
}
accumulate_gradients 的逻辑是:
// Helper function to accumulate the weight gradients for a thread's embedding vector
template <typename TypeKey, typename TypeEmbeddingComp>
__device__ __forceinline__ float accumulate_gradients(int embedding_vec_size,
const TypeKey *sample_id,
const uint32_t *hash_value_index_count_offset,
const TypeEmbeddingComp *wgrad, float scaler,
uint32_t offset, int bid, int tid) {
// 哪一行更新几回
// 如果bid=0,则sum_num = hash_value_index_count_offset[1] - hash_value_index_count_offset[0] = 3 - 0 = 3个。bid对应了key,比方 40,50,10,20,30,50,10,30,20,10 这些key,其key便是10~50这个5个。所以 bid = 0 便是要更新10对应的低维矩阵稠密向量,便是hash_table_value[0]这一行,有三个1,应该更新3次。
uint32_t sample_num = hash_value_index_count_offset[bid + 1] - hash_value_index_count_offset[bid];
// 核算梯度
float gi = 0.0f;
// sample_id_sort [0,1,3,0,2,1,2,0,0,1] ---- 第几行,恰恰和 wgrad 对上了
for (int i = 0; i < sample_num; i++) { // offset 便是0, 3, 5, 7, 8,比方关于第1行,需求更新3次
// sample_id 是[0,1,3,0,2,1,2,0,0,1],对应了低维矩阵第1,2,4,...,行,便是3个10别离在输出稠密向量的哪一行
// 更新这几回,便是一个累积,这个累积用哪些梯度来累积。
int sample_index = sample_id[offset + i]; // 找到本样本梯度
gi += TypeConvertFunc<float, TypeEmbeddingComp>::convert(
wgrad[sample_index * embedding_vec_size + tid]); // 本线程梯度,而且累积
}
return gi / scaler;
}
终究详细如下图:
至此,咱们关于 DistributedSlotSparseEmbeddingHash 剖析悉数完结,下一篇介绍 LocalSlotSparseEmbeddingHash。
0xEE 个人信息
★★★★★★关于生活和技能的思考★★★★★★
微信公众账号:罗西的思考
0xFF 参阅
nvlabs.github.io/cub/annotat…
developer.nvidia.com/blog/introd…
developer.nvidia.com/blog/announ…
developer.nvidia.com/blog/accele…
HugeCTR源码阅读
embedding层怎么反向传达
web.eecs.umich.edu/~justincj/t…
稀少矩阵存储格局总结+存储功率对比:COO,CSR,DIA,ELL,HYB
惹是生非:论引荐算法中的Embedding思维
tf.nn.embedding_lookup函数原理
求浅显讲解下tensorflow的embedding_lookup接口的意思?
【技能干货】聊聊在大厂引荐场景中embedding都是怎么做的
ctr预估算法关于序列特征embedding可否做拼接,输入MLP?与pooling
引荐体系中的深度匹配模型
土法编造:Embedding 层是怎么实现的?
不等距双杆模型_搜索中的深度匹配模型(下)
深度特征 快牛战略关于高低层特征交融
[深度学习] DeepFM 介绍与Pytorch代码解说
deepFM in pytorch
引荐算法之7——DeepFM模型
DeepFM 参数理解(二)
引荐体系遇上深度学习(三)–DeepFM模型理论和实践
[深度学习] DeepFM 介绍与Pytorch代码解说
docs.nvidia.com/deeplearnin…
带你知道大模型练习关键算法:分布式练习Allreduce算法