概述
NVIDIA数据加载库(DALI)是一个高度优化的构建模块集合和执行引擎,用于加速计算机视觉和音频深度学习应用的数据管道。
由深度学习框架提供的输入和增强管道通常可以分为以下两类:
- 快速但不灵活 - 以C++编写,它们作为单个单片Python对象暴露,具有非常特定的操作集和顺序
- 慢但灵活 - 以C++或Python编写的一组构建模块,可用于组合任意数据管道,最终变得缓慢。这种类型数据管道的最大开销之一是Python中的全局解释器锁(GIL)。这迫使开发人员使用多进程,使设计高效输入管道变得复杂。
DALI通过提供性能和灵活性来脱颖而出,加速不同数据管道。它通过公开经过优化的构建模块来实现这一点,这些模块使用简单高效的引擎执行,并且可以将操作卸载到GPU(从而实现对多GPU系统的扩展)。
它是一个单独的库,可以轻松集成到不同的深度学习训练和推理应用程序中。
DALI通过直接框架插件、多种输入数据格式和可配置图形,在启用GPU的系统中提供易用性和灵活性。DALI可以帮助加速由于CPU周期限制而受到I/O管道限制的深度学习工作流。通常,具有高GPU到CPU比率的系统(例如Amazon EC2 P3.16xlarge、NVIDIA DGX1-V或NVIDIA DGX-2)在主机CPU上受限,因此未充分利用可用的GPU计算能力。DALI显着加速了这种密集GPU配置上的输入处理,以实现整体吞吐量。
管道
在使用DALI进行数据处理的核心是数据处理管道的概念。它由多个操作连接而成,呈现为一个有向图,并包含在nvidia.dali.Pipeline类的对象中。该类提供了定义、构建和运行数据处理管道所需的函数。
from nvidia.dali.pipeline import Pipeline
定义管道
让我们从为分类任务定义一个非常简单的管道开始,该任务确定一张图片中是否包含狗还是小猫。我们准备了一个包含狗和小猫图片的目录结构在我们的代码库中。
我们的简单管道将从这个目录中读取图片,解码它们,并返回(图片,标签)对。
创建管道最简单的方法是使用pipeline_def装饰器。在simple_pipeline函数中,我们定义要执行的操作以及它们之间的计算流程。
- 使用fn.readers.file来从硬盘读取jpeg(编码图像)和标签。
- 使用fn.decoders.image操作将图像从jpeg解码为RGB。
- 指定应将哪些中间变量作为管道的输出返回。
有关pipeline_def的更多信息,请查看文档。
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as typesimage_dir = "data/images"
max_batch_size = 8@pipeline_def
def simple_pipeline():jpegs, labels = fn.readers.file(file_root=image_dir)images = fn.decoders.image(jpegs, device="cpu")return images, labels
构建管道
为了使用simple_pipeline定义的管道,我们需要创建并构建它。这可以通过调用simple_pipeline来实现,该函数将创建管道的一个实例。然后我们调用build方法来构建这个新创建的实例:
请注意,使用pipeline_def装饰一个函数会向其添加新的命名参数。它们可用于控制管道的各个方面,比如:
pipe = simple_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0)
pipe.build()
最大批处理大小, 用于在CPU上执行计算的线程数, 要使用的GPU设备(使用simple_pipeline创建的管道尚未用于计算GPU), 随机数生成的种子。
有关管道参数的更多信息,您可以查看管道文档(Pipeline — NVIDIA DALI 1.36.0 documentation)。
运行管道
在构建管道之后,我们可以运行它以获得一批结果。
pipe_out = pipe.run()
print(pipe_out)
管道的输出保存在pipe_out变量中,是一个包含2个元素的元组(正如预期 - 在simple_pipeline函数中我们指定了2个输出)。这两个元素都是TensorListCPU对象 - 每个对象包含一个CPU张量列表。
为了显示结果(仅用于调试目的 - 在实际训练过程中,我们不会执行这一步,因为这会导致我们的图像批量在GPU和CPU之间来回传输),我们可以将DALI的张量数据转换为NumPy数组。不过,并非每个TensorList都可以通过这种方式访问 - TensorList比NumPy数组更通用,可以容纳具有不同形状的张量。为了检查是否可以直接将它发送到NumPy,我们可以调用TensorList的is_dense_tensor函数。
images, labels = pipe_out
print("Images is_dense_tensor: " + str(images.is_dense_tensor()))
print("Labels is_dense_tensor: " + str(labels.is_dense_tensor()))
Images is_dense_tensor: False
Labels is_dense_tensor: True
事实证明,包含标签的TensorList可以用张量表示,而包含图像的TensorList则不行。
print(labels)
TensorListCPU([[0][0][0][0][0][0][0][0]],dtype=DALIDataType.INT32,num_samples=8,shape=[(1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,)])
让我们看看返回的标签的形状和内容。 为了查看图像,我们需要遍历TensorList中包含的所有张量,并使用其at方法访问。
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt%matplotlib inlinedef show_images(image_batch):columns = 4rows = (max_batch_size + 1) // (columns)fig = plt.figure(figsize=(24, (24 // columns) * rows))gs = gridspec.GridSpec(rows, columns)for j in range(rows * columns):plt.subplot(gs[j])plt.axis("off")plt.imshow(image_batch.at(j))
show_images(images)
添加增强功能
随机洗牌
从上面的示例中可以看出,我们的管道返回的第一个图像批次仅包含狗。这是因为我们没有对数据集进行洗牌,所以fn.readers.file按字典顺序返回图像。
让我们创建一个新的管道来改变这种情况。
@pipeline_def
def shuffled_pipeline():jpegs, labels = fn.readers.file(file_root=image_dir, random_shuffle=True, initial_fill=21)images = fn.decoders.image(jpegs, device="cpu")return images, labels
为了获得shuffled_pipeline,我们对simple_pipeline进行了两处更改 - 我们向fn.readers.file操作添加了两个参数:
- random_shuffle启用阅读器中图像的洗牌。洗牌是通过使用从磁盘读取的图像缓冲区执行的。当阅读器被要求提供下一个图像时,它会从缓冲区中随机选择一个图像,输出它,并立即用新读取的图像替换该位置。
- initial_fill设置缓冲区的容量。该参数的默认值(1000)非常适合包含数千个示例的数据集,但对于我们只包含21张图像的非常小的数据集来说太大了。这可能导致返回的批次中经常出现重复的图像。这就是为什么在本示例中,我们将其设置为数据集的大小。
让我们测试这一修改的结果。
pipe = shuffled_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234)
pipe.build()
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)
现在管道返回的图像已经被正确洗牌。
增强功能
DALI不仅可以从磁盘读取图像并将它们批量处理成张量,还可以对这些图像执行各种增强操作,以提高深度学习训练结果。
旋转是这类增强操作的一个例子。让我们创建一个新的管道,在输出图像之前对其进行旋转。 为此,我们向我们的管道添加了一个新操作:fn.rotate。
@pipeline_def
def rotated_pipeline():jpegs, labels = fn.readers.file(file_root=image_dir, random_shuffle=True, initial_fill=21)images = fn.decoders.image(jpegs, device="cpu")rotated_images = fn.rotate(images, angle=10.0, fill_value=0)return rotated_images, labels
正如我们在文档中看到的,rotate可以接受多个参数,但除了输入之外,只有一个参数是必需的 - angle告诉操作符应该旋转图像多少度。我们还指定了fill_value以更好地可视化结果。
让我们测试一下新创建的管道:
pipe = rotated_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234)
pipe.build()
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)
将张量作为参数和随机数生成
将每个图像旋转10度并不那么有趣。为了进行有意义的增强,我们希望有一个操作符,可以将我们的图像按照给定范围内的随机角度旋转。
Rotate的angle参数可以接受float或float张量类型的值。第二个选项,即float张量,使我们能够通过由其他操作产生的张量为每个图像提供不同的旋转角度,以此来实现随机旋转。
随机数生成器是可以与DALI一起使用的操作的示例。让我们使用fn.random.uniform来创建一个管道,通过随机角度旋转图像。
注意:
请记住,每当您将DALI操作的输出作为命名关键字参数传递给另一个操作时,该数据必须放置在CPU上。在下面的示例中,我们将random.uniform的输出(默认device='cpu')用作旋转的angle关键字参数。
DALI中的这种参数称为“argument inputs”。关于它们的更多信息可以在管道文档部分找到。
常规输入(非命名、位置参数)没有这样的约束,可以使用CPU或GPU数据,如下所示。
@pipeline_def
def random_rotated_pipeline():jpegs, labels = fn.readers.file(file_root=image_dir, random_shuffle=True, initial_fill=21)images = fn.decoders.image(jpegs, device="cpu")angle = fn.random.uniform(range=(-10.0, 10.0))rotated_images = fn.rotate(images, angle=angle, fill_value=0)return rotated_images, labels
这一次,我们不是为角度参数提供固定值,而是将其设置为fn.random.uniform操作符的输出。
让我们来检查一下结果:
pipe = random_rotated_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234)
pipe.build()
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)
这一次,旋转角度是从一个数值范围中随机选择的。
添加GPU加速
DALI提供了访问GPU加速操作符的功能,可以提高输入和增强管道的速度,并使其可以扩展到多GPU系统。
将张量复制到GPU
让我们修改之前随机旋转管道的示例,以便在旋转时使用GPU。
@pipeline_def
def random_rotated_gpu_pipeline():jpegs, labels = fn.readers.file(file_root=image_dir, random_shuffle=True, initial_fill=21)images = fn.decoders.image(jpegs, device="cpu")angle = fn.random.uniform(range=(-10.0, 10.0))rotated_images = fn.rotate(images.gpu(), angle=angle, fill_value=0)return rotated_images, labels
为了告诉DALI我们想要使用GPU,我们只需要对管道进行一个变更。我们将rotate操作中的输入从images(CPU上的张量)改为images.gpu(),这样可以将其复制到GPU上。
pipe = random_rotated_gpu_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234)
pipe.build()
pipe_out = pipe.run()
print(pipe_out)
pipe_out仍然包含2个TensorLists,但是这一次,rotate操作的第一个输出是在GPU上。我们无法直接从CPU访问TensorListGPU的内容,因此为了可视化结果,我们需要使用as_cpu方法将其复制到CPU上。
images, labels = pipe_out
show_images(images.as_cpu())
重要提示
DALI不支持在管道内将数据从GPU移动到CPU。这就是为什么CPU操作不能跟在GPU操作后面的原因。
混合解码
有时,特别是对于分辨率较高的图像,解码存储在JPEG格式中的图像可能成为瓶颈。为了解决这个问题,开发了nvJPEG和nvJPEG2000库。它们将解码过程在CPU和GPU之间拆分,显著减少了解码时间。
在fn.decoders.image中指定“mixed”设备参数可以启用nvJPEG和nvJPEG2000支持。其他文件格式仍然在CPU上解码。
@pipeline_def
def hybrid_pipeline():jpegs, labels = fn.readers.file(file_root=image_dir, random_shuffle=True, initial_fill=21)images = fn.decoders.image(jpegs, device="mixed")return images, labels
具有device=mixed的fn.decoders.image使用了一种同时利用CPU和GPU的混合计算方法。这意味着它接受CPU输入,但返回GPU输出。这就是为什么管道返回的images对象是TensorListGPU类型的原因。
pipe = hybrid_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234)
pipe.build()
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images.as_cpu())
让我们通过测量使用4个CPU线程的shuffled_pipeline和hybrid_pipeline的速度来比较fn.decoders.image在'cpu'和'mixed'后端的速度。
from timeit import default_timer as timertest_batch_size = 64def speedtest(pipeline, batch, n_threads):pipe = pipeline(batch_size=batch, num_threads=n_threads, device_id=0)pipe.build()# warmupfor i in range(5):pipe.run()# testn_test = 20t_start = timer()for i in range(n_test):pipe.run()t = timer() - t_startprint("Speed: {} imgs/s".format((n_test * batch) / t))
speedtest(shuffled_pipeline, test_batch_size, 4)
Speed: 2597.527149961429 imgs/s
speedtest(hybrid_pipeline, test_batch_size, 4)
Speed: 5828.851662794091 imgs/s
正如我们所看到的,使用GPU加速解码显著提高了速度。