找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 4700|回复: 0

在 C++ 中使用 PPL 进行异步编程

[复制链接]
发表于 2012-3-19 22:40:53 | 显示全部楼层 |阅读模式
好莱坞星探通常要拒绝那些有抱负的演员时,都会轻蔑地说:“别联系我们,我们会联系你的。”然而对于开发人员来说,那句话却道出了许多软件框架工作的秘密,与其让程序员驱动整个应用程序的控制流,不如让框架控制环境并调用程序员提供的回调或事件处理程序。在异步系统中,本范例让你将异步操作的开始与完成进行分离。程序员启动操作,然后注册回调,并在结果可用时调用回调。不必等待完成意味着你可以在操作运行期间执行有用的工作,例如,处理消息循环或启动其他异步操作。如果你对所有潜在阻止的操作严格遵循此模式,则“毛玻璃窗口”、“旋转的同心圆”以及其他此类现象都将成为历史。正如你曾听到的那样,你的应用程序将变得快而流畅。
在 Windows 8 中,异步操作很普遍,并且 WinRT 提供了一个新编程模型,以一致方式对异步进行处理。
图 1 演示了处理异步操作的基本模式。在这段代码中,C++ 函数从文件读取字符串。
图 1 从文件进行读取


  •           template<typename Callback>
  • void ReadString(String^ fileName, Callback func)
  • {
  •   StorageFolder^ item = KnownFolders::PicturesLibrary;

  •   auto getFileOp = item->GetFileAsync(fileName);
  •   getFileOp->Completed = ref new AsyncOperationCompletedHandler<StorageFile^>
  •     ([=](IAsyncOperation<StorageFile^>^ operation, AsyncStatus status)
  •   {
  •     auto storageFile = operation->GetResults();
  •     auto openOp = storageFile->OpenAsync(FileAccessMode::Read);
  •     openOp->Completed =
  •       ref new AsyncOperationCompletedHandler <IRandomAccessStream^>
  •       ([=](IAsyncOperation<IRandomAccessStream^>^ operation, AsyncStatus status)
  •     {
  •       auto istream = operation->GetResults();
  •       auto reader = ref new DataReader(istream);
  •       auto loadOp = reader->LoadAsync(istream->Size);
  •       loadOp->Completed = ref new AsyncOperationCompletedHandler<UINT>
  •         ([=](IAsyncOperation<UINT>^ operation, AsyncStatus status)
  •       {
  •         auto bytesRead = operation->GetResults();
  •         auto str = reader->ReadString(bytesRead);
  •         func(str);
  •       });
  •     });
  •   });
  • }


要注意的第一件事情是 ReadString 的返回类型为 void。没错:该函数不返回值;相反,它使用用户提供的回调,并在结果可用时调用回调。欢迎来到异步编程的世界:“别联系我们,我们会联系你的!”
WinRT 异步操作的分析WinRT 中异步的核心是在 Windows::Foundation 命名空间中定义的四个接口:IAsyncOperation、IAsyncAction、IAsyncOperationWithProgress 和 IAsyncActionWithProgress。WinRT 中所有潜在阻止或长期运行的操作都被定义为异步。按照惯例,方法的名称都以“Async”结尾,而返回类型则为四个接口中的一个。例如图 1 所示示例中的方法 GetFileAsync,它返回 IAsyncOperation<StorageFile^>。许多异步操作不返回值,且它们的类型为 IAsyncAction。可以报告进度的操作将通过 IAsync­OperationWithProgress 和 IAsyncActionWithProgress 公开。
要为异步操作指定完成回调,可以设置 Completed 属性。该属性是一个接收异步接口和完成状态的委托。尽管该委托可以使用函数指针进行实例化,但你通常使用 lambda(我希望到现在为此,你已经熟悉这部分的 C++11)。
要获得操作的值,需要对接口调用 GetResults 方法。请注意,尽管这是从 GetFileAsync 调用返回给你的同样接口,但是当你位于完成处理程序中时,你只能对它调用 GetResults。
完成委托的第二个参数是 AsyncStatus,它返回操作的状态。在实际的应用程序中,你将先检查它的值再调用 GetResults。在图 1 中,为了简单起见而省略了这部分。
你经常会发现,自己同时使用多个异步操作。在我的示例中,我首先获取 StorageFile 的实例(通过调用 GetFileAsync),然后使用 OpenAsync 打开它,再获取 IInputStream。接下来,我加载数据 (LoadAsync) 并使用 DataReader 进行读取。最后,获取字符串并调用用户提供的回调函数。
组合将操作的启动和完成分离对于消除阻止调用非常重要。问题是撰写多个基于回调的异步操作非常困难,并且得到的代码很难研究和调试。必须采取措施控制随之发生的“回调乱局”。
让我们看一个具体的示例。我想使用之前示例中的 ReadString 函数按顺序在两个文件中进行读取,然后将结果连接成一个字符串。我打算再次将它实现为采用回调的函数:

  •           template<typename Callback>
  • void ConcatFiles1(String^ file1, String^ file2, Callback func)
  • {
  •   ReadString(file1, [func](String^ str1) {
  •     ReadString(file2, [func](String^ str2) {
  •       func(str1+str2);
  •     });
  •   });
  • }


效果还不错吧?
如果你看不出这个解决方案存在的瑕疵,那么请考虑下这个问题:什么时候开始从 file2 进行读取?你真的需要先读完第一个文件,再开始读第二个文件吗?当然不是!积极启动多个异步操作并在数据传入时进行处理,效果要好得多。
我们来试一试。首先,因为我并发启动了两个操作,并在操作完成前从函数返回,所以我需要一个特殊的堆分配对象存放中间结果。我将它命名为 ResultHolder:

  •           ref struct ResultHolder
  • {
  •   String^ str;
  • };


图 2 所示,接下来的第一个操作是设置 results->str 成员。要完成的第二个操作将用它构成最终的结果。
图 2 并发从两个文件进行读取


  •           template<typename Callback>
  • void ConcatFiles(String^ file1, String^ file2, Callback func)
  • {
  •   auto results = ref new ResultHolder();

  •   ReadString(file1, [=](String^ str) {
  •     if(results->str != nullptr) { // Beware of the race condition!
  •           func(str + results->str);
  •     }
  •     else{
  •       results->str = str;
  •     }
  •   });

  •   ReadString(file2, [=](String^ str) {
  •     if(results->str != nullptr) { // Beware of the race condition!
  •           func(results->str + str);
  •     }
  •     else{
  •       results->str = str;
  •     }
  •   });
  • }


大多数时候这种做法都是奏效的。该代码有很明显的争用条件,并且它不处理错误,因此我们仍然有很多工作要做。对于结合两个操作这么简单的事情,却用了这么多的代码,难免会出错。
并行模式库中的任务Visual Studio 并行模式库 (PPL) 旨在让 C++ 中异步并行程序的编写变得简单而高效。PPL 用户可以使用诸如任务、并行算法(例如 parallel_for 和 parallel_sort)等更高级的抽象和并发友好型容器(例如 concurrent_vector),来取代在线程和线程池级运行。
PPL 任务类是下一版 Visual Studio 中的新增功能,它使你可以简洁地表示要异步执行的单个工作单元。使用该功能可以按照独立(或互相独立)任务表达程序逻辑,然后让运行时以最佳方式安排这些任务。
任务之所以这么有用,是因为它们的可组合性。在最简单的形式中,对于两个任务,可以将一个任务声明为另一个任务的延续来按顺序编写。这看起来非常简单的结构却允许你以有趣的方式组合多个任务。诸如联接和选项(我稍后再进行介绍)的许多更高级 PPL 构造都是通过这个概念自我建构的。任务延续还可用于以更简洁方式表示异步操作的完成。让我们重新看看图 1 中的示例,现在使用 PPL 任务编写它,如图 3 所示。
图 3 使用嵌套的 PPL 任务从文件进行读取


  •           task<String^> ReadStringTask(String^ fileName)
  • {
  •   StorageFolder^ item = KnownFolders::PicturesLibrary;
  •   task<StorageFile^> getFileTask(item->GetFileAsync(fileName));
  •   return getFileTask.then([](StorageFile^ storageFile) {
  •     task<IRandomAccessStream^> openTask(storageFile->OpenAsync(
  •       FileAccessMode::Read));
  •     return openTask.then([](IRandomAccessStream^ istream) {
  •       auto reader = ref new DataReader(istream);
  •       task<UINT> loadTask(reader->LoadAsync(istream->Size));
  •       return loadTask.then([reader](UINT bytesRead) {
  •         return reader->ReadString(bytesRead);
  •       });
  •     });
  •   });
  • }


因为我现在使用任务而不是回调表示异步,所以用户提供的回调消失了。该函数实际改为返回任务。
在实现过程中,我从 GetFileAsync 返回的异步操作创建了 getFileTask 任务,然后将该操作的完成设置为任务的延续(使用 then 方法)。
then 方法值得仔细研究一下。该方法的参数是 lambda 表达式。实际上,参数还可以是函数指针、函数对象或 std::function 的实例,但是因为 lambda 表达式在 PPL 中十分普遍(实际上在现代的 C++ 中也一样),从这里开始我将只说“lambda”,用来表示所有类型的可调用对象。
then 方法的返回类型是某类型 T 的任务。这种类型 T 由传递给 then 的 lambda 返回类型决定。在最基本的形式下,当 lambda 返回类型 T 的表达式时,then 方法返回 task<T>。例如,下面延续中的 lambda 返回了 int;因此,生成类型为 task<int>:

  •           task<int> myTask = someOtherTask.then([]() { return 42; });


图 3 中使用的延续类型稍有不同。它返回一个任务并执行该任务的异步展开,所以生成类型不是 task<task<int>>,而是 task<int>:

  •           task<int> myTask = someOtherTask.then([]() {
  •   task<int> innerTask([]() {
  •     return 42;
  •   });
  •   return innerTask;
  • });


如果所有这些让你觉得有点头大,不要紧,继续往下看。我保证在几个具有代表意义的示例之后,立即就会豁然开朗起来的。
任务组合根据上面部分讲述的内容,继续在文件读取示例的基础上进行构建。
前面曾提到,C++ 中函数和 lambda 的所有本地变量在返回时均已丢失。要保持该状态,你必须手动将变量复制到堆或其他某个生存期较长的存储。这就是为什么我之前就创建了储存器类。在异步运行的 lambda 中,请务必小心不要通过指针或引用捕获外围函数的任何状态;否则,当函数完成时,你将随指针终止于一个无效的内存位置。
我要强调的是,then 方法对异步接口执行了展开操作,我以更简洁的形式重写了示例,然而成本只不过是引入了另一个储存器结构,如图 4 所示。
图 4 链接多个任务


  •           ref struct Holder
  • {
  •   IDataReader^ Reader;
  • };
  • task<String^> ReadStringTask(String^ fileName)
  • {
  •   StorageFolder^ item = KnownFolders::PicturesLibrary;

  •   auto holder = ref new Holder();

  •   task<StorageFile^> getFileTask(item->GetFileAsync(fileName));
  •   return getFileTask.then([](StorageFile^ storageFile) {
  •     return storageFile->OpenAsync(FileAccessMode::Read);
  •   }).then([holder](IRandomAccessStream^ istream) {
  •     holder->Reader = ref new DataReader(istream);
  •     return holder->Reader->LoadAsync(istream->Size);
  •   }).then([holder](UINT bytesRead) {
  •     return holder->Reader->ReadString(bytesRead);
  •   });
  • }


图 3 中的示例相比,这段代码更易于阅读,因为它呈现的是按顺序的步骤,而不是“楼梯式”的嵌套操作。
除了 then 方法,PPL 还具有一些其他组合构造。其中一个是联接操作,由 when_all 方法实现。when_all 方法采用一系列任务然后返回生成任务,生成任务将构成任务的所有输出收集到 std::vector 中。对于两个参数的一般情况,PPL 具有一个简便的表达方法:运算符 &&。
这就是我如何使用联接运算符重新实现文件串联方法:

  •           task<String^> ConcatFiles(String^ file1, String^ file2)
  • {
  •   auto strings_task = ReadStringTask(file1) && ReadStringTask(file2);
  •   return strings_task.then([](std::vector<String^> strings) {
  •     return strings[0] + strings[1];
  •   });
  • }


选项操作也很有用。如果有一系列的任务,选项(通过 when_any 方法实现)在序列中第一个任务完成时完成。像联接一样,选项也具有一个双参数的简便表达方法,使用运算符 ||。
选项在冗余或推测执行的情况下比较方便;你启动多个任务,由要完成的第一个任务提供所需的结果。你还可以对操作添加超时设置 - 启动一个返回任务的操作,然后将它与休眠指定时间量的任务相组合。如果休眠任务先完成,就表示你的操作超时,因此被放弃或取消。
PPL 具有另一个有助于任务可组合性的构造 (task_completion_event),你可以将它用于任务与非 PPL 代码的交互操作。task_completion_event 可以传递给线程或期望最后设置的 IO 完成回调。从 task_completion_event 创建的任务在设置 task_completion_event 之后即完成。
使用 PPL 编写异步操作无论何时你需要发挥硬件的最大性能,C++ 语言都是你的明智之选。其他语言在 Windows 8 中发挥各自的作用:JavaScript/HTML5 组合很适合编写 GUI;C# 提供高效的开发人员体验;等等。要编写 Metro 样式的应用程序,请使用你擅长的方法和你了解的方式。实际上,你可以在同一个应用程序中使用多种语言。
你经常会发现,编写应用程序前端时使用 JavaScript 或 C# 等语言,而编写后端组件时则使用 C++ 语言,以获得最大性能。如果 C++ 组件导出的操作受计算限制或受 I/O 限制,最好将该操作定义为异步操作。
为实现之前介绍的四种 WinRT 异步接口(IAsyncOperation、IAsyncAction、IAsyncOperation­WithProgress 和 IAsyncActionWithProgress),PPL 在并发命名空间中同时定义了 create_async 方法和 progress_reporter 类。
在最简单的形式中,create_async 采用返回值的 lambda 或函数指针。lambda 的类型决定从 create_async 返回的接口的类型。
如果某个无参数 lambda 返回非 void 类型 T,则 create_async 返回 IAsyncOperation<T> 的实现。对于返回 void 的 lambda,生成接口为 IAsyncAction。
lambda 可以采用 progress_reporter<P> 类型的参数。该类型的实例用于将类型 P 的进度报告发布回调用方。例如,采用 progress_reporter<int> 的 lambda 可以使用整数值报告完成百分比。这种情况下,lambda 的返回类型决定生成接口是 IAsyncOperationWithProgress<T,P> 还是 IAsyncAction<P>。参见图 5
图 5 在 PPL 中编写异步操作


  •           IAsyncOperation<float>^ operation = create_async([]() {
  •   return 42.0f;
  • });

  • IAsyncAction^ action = create_async([]() {
  •     // Do something, return nothing
  • });

  • IAsyncOperationWithProgress<float,int>^ operation_with_progress =
  •   create_async([](progress_reporter<int> reporter) {
  •     for(int percent=0; percent<100; percent++) {
  •       reporter.report(percent);
  •     }
  •     return 42.0f;
  •   });

  • IAsyncActionWithProgress<int>^ action_with_progress =
  •   create_async([](progress_reporter<int> reporter) {
  •     for(int percent=0; percent<100; percent++) {
  •       reporter.report(percent);
  •     }
  •   });


要向其他 WinRT 语言公开异步操作,请在你的 C++ 组件中定义一个公共 ref 类,并定义一个返回四个异步接口之一的函数。你可以在 PPL 示例包中找到有关混合 C++/JavaScript 应用程序的具体示例(要获得该示例包,请联机搜索“Asynchrony with PPL”)。以下代码段以带进度的异步操作公开图像转换例程:

  •           public ref class ImageTransformer sealed
  • {
  • public:
  •   //
  •   // Expose image transformation as an asynchronous action with progress
  •   //
  •   IAsyncActionWithProgress<int>^ GetTransformImageAsync(String^ inFile, String^ outFile);
  • }


图 6 所示,应用程序的客户端部分在 JavaScript 中使用 promise 对象实现。
图 6 在 JavaScript 中使用图像转换例程

  •           var transformer = new ImageCartoonizerBackend.ImageTransformer();
  • ...
  •           transformer.getTransformImageAsync(copiedFile.path, dstImgPath).then(
  • function () {
  • // Handle completion…
  • },
  • function (error) {
  • // Handle error…
  • },
  • function (progressPercent) {
  • // Handle progress:
  • UpdateProgress(progressPercent);
  • }
  • );


错误处理和取消留心的读者可能已经注意到,这种异步处理到目前为止几乎完全不涉及任何错误处理和取消。下面就立即开始讨论这个主题!
文件读取例程总会不可避免地遇到不存在的文件或因众多原因而无法打开的文件。字典查询功能将遇到不认识的字词。图像转换无法尽快生成结果,而被用户取消。在这些场景中,操作在执行完预期的工作之前已经永远终止。
在现代的 C++ 中,异常用于指示错误或其他异常条件。异常在单线程中运行非常好:当引发异常时,堆栈随即展开,一直展开到调用堆栈下的适当 catch 块。加入并发后,事情就变得杂乱了,因为从一个线程生成的异常不容易被另一个线程捕获。
考虑任务和延续任务发生了什么:当任务的主体引发了异常时,其执行流即被中断,并且无法生成值。如果没有值可以传递给延续任务,则延续任务不会运行。即使是不生成值的 void 任务,你也需要能够告诉它之前的任务是否已成功完成。
这就是为什么存在延续任务的另一种形式:对于类型 T 的任务,错误处理延续任务的 lambda 采用 task<T>。要获得之前任务生成的值,必须对参数任务调用 get 方法。如果之前的任务已成功完成,则 get 也成功完成。否则,get 方法将引发异常。
在此我想要强调一个重点。对于 PPL 中的所有任务,包括从异步操作创建的任务,对其调用 get 函数在语法上是有效的。然而,在结果可用之前,get 方法必须阻止调用线程,当然,这与我们“快而流畅”的口号是矛盾的。因此,一般不鼓励对任务调用 get 方法,并且在 STA 中禁止调用该方法(运行时将引发“无效操作”异常)。仅当你将任务作为延续任务的参数,才能调用 get。图 7 显示了一个示例。
图 7 错误处理延续任务


  •           task<image> take_picture([]() {
  •   if (!init_camera())
  •     throw std::exception("can’t init camera");
  •   return get_image();
  • });

  • take_picture.then([](task<image> antecedent) {
  •   try
  •   {
  •     image img = antecedent.get();
  •   }
  •   catch (std::exception ex)
  •   {
  •     // Handle exception here
  •   }
  • });
  • var transformer = new ImageCartoonizerBackend.ImageTransformer();
  • ...
  •           transformer.getTransformImageAsync(copiedFile.path, dstImgPath).then(
  •   function () {
  •     // Handle completion…
  •   },
  •   function (error) {
  •     // Handle error…
  •   },
  •   function (progressPercent) {
  •     // Handle progress:
  •     UpdateProgress(progressPercent);
  •   }
  • );


你程序中的每个延续任务都可能是错误处理延续任务,你可以选择处理所有延续任务中的异常。然而,在由多个任务组成的程序中,处理所有延续任务中的异常可能会造成过度负载。幸运的是,这种情况不一定发生。与未处理的异常相似,沿着调用堆栈向下处理,直到找到捕获它们的框架,由任务引发的异常可以“慢慢流向”链中的下一个延续任务(直到到达最后处理它们的位置)。并且必须对他们进行处理,如果某个异常保持未处理状态超过了任务本可以对它完成处理的生存期,则运行时将引发“未观察到的异常”异常。
现在让我们回到文件读取示例,并针对它讨论错误处理。由 WinRT 引发的所有异常都属于类型 Platform::Exception,因此这也是我要在最后的延续任务中捕获的内容,如图 8 所示。
图 8 使用错误处理从文件读取字符串


  •           task<String^> ReadStringTaskWithErrorHandling(String^ fileName)
  • {
  •   StorageFolder^ item = KnownFolders::PicturesLibrary;

  •   auto holder = ref new Holder();

  •   task<StorageFile^> getFileTask(item->GetFileAsync(fileName));
  •   return getFileTask.then([](StorageFile^ storageFile) {
  •     return storageFile->OpenAsync(FileAccessMode::Read);
  •   }).then([holder](IRandomAccessStream^ istream) {
  •     holder->Reader = ref new DataReader(istream);
  •     return holder->Reader->LoadAsync(istream->Size);
  •   }).then([holder](task<UINT> bytesReadTask) {
  •     try
  •     {
  •       UINT bytesRead = bytesReadTask.get();
  •       return holder->Reader->ReadString(bytesRead);
  •     }
  •     catch (Exception^ ex)
  •     {
  •       String^ result = ""; // return empty string
  •       return result;
  •     }
  •   });
  • }


延续任务捕获到异常后,将视异常为“已处理”,而延续任务则返回成功完成的任务。所以,在图 8 中,ReadStringWithErrorHandling 的调用方将无法得知文件读取是否已成功完成。我在这里要说的是太早处理异常并不总是好事。
取消是过早终止任务的另一种形式。与 PPL 一样,在 WinRT 中进行取消需要双方的协作,即操作的客户端和操作本身。它们的作用不同:客户端请求取消,而操作确认或拒绝请求。由于客户端和操作之间的自然竞争,因此取消请求并不保证一定成功。
在 PPL 中,这两种作用分别由两个类型表示:cancellation_token_source 和 cancellation_token。前一个类型的实例用于通过调用 cancel 方法来请求取消。后一个类型的实例则从 cancellation_token_source 进行实例化,并作为最后一个参数传递给任务的构造函数(then 方法)或 create_async 方法的 lambda。
在任务的主体内部,实现可以通过调用 is_task_cancellation_requested 方法轮询取消请求,并通过调用 cancel_current_task 方法确认请求。由于 cancel_current_task 方法在封面下引发异常,因此可以在调用 cancel_current_task 之前进行一些资源清理。图 9 显示了一个示例。
图 9 任务中取消以及对取消请求的反应


  •           cancellation_token_source ct;

  • task<int> my_task([]() {
  •   // Do some work
  •   // Check if cancellation has been requested
  •   if(is_task_cancellation_requested())
  •   {
  •         // Clean up resources:
  •         // ...
  •           // Cancel task:
  •         cancel_current_task();
  •   }
  •   // Do some more work
  •   return 1;
  • }, ct.get_token());
  • ...
  •           ct.cancel(); // attempt to cancel


请注意,许多任务都可以通过相同的 cancellation_token_source 取消。这对于处理任务链和任务图形时非常方便。你可以取消指定的 cancellation_­token_source 管理的所有任务,而无需单独地取消每一个任务。当然,不保证所有任务都能实际响应取消请求。此类任务将完成,但是它们正常(基于值)的延续任务不会运行。错误处理延续任务将运行,但在尝试从之前任务获取值时将引发 task_canceled 异常。
最后,让我们看一下对生产方使用取消令牌。create_async 方法的 lambda 可以采用 cancellation_token 参数,使用 is_canceled 方法对该参数进行轮询,并在响应取消请求时取消该操作:

  •           IAsyncAction^ action = create_async( [](cancellation_token ct) {
  •   while (!ct.is_canceled()); // spin until canceled
  •   cancel_current_task();
  • });
  • ...
  •           action->Cancel();


请注意,在任务延续的情况下,由 then 方法接收取消令牌,而对于 create_async,取消令牌则传递到 lambda。在后一种情况下,通过对生成的异步接口调用 cancel 方法启动取消,然后由 PPL 通过取消令牌直接将它插入取消请求。
总结如同 Tony Hoare 曾经嘲笑的一样,我们需要教育我们的程序“等待快一点”。然而,不等待的异步编程仍然很难掌控,并且其优势也不是非常明显,因此开发人员不使用它。
在 Windows 8 中,所有阻止操作都是异步的。如果你是一名 C++ 程序员,PPL 可以使异步编程非常愉快。拥抱异步世界吧,告诉你的程序等待再快一点!
作者:yincheng01 发表于2012-3-19 18:39:44 原文链接

您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-22 16:58 , Processed in 0.020215 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表