воскресенье, 25 апреля 2010 г.

Parallel.ForEach vs. PLINQ

Введение



Когда Вам нужно оптимизировать программу для работы с многоядерными процессорами, для начала стоит задать вопрос, а можно ли разделить Вашу программу на части, которые смогут выполняться параллельно. Если Ваше решение использует интенсивные вычислительные действия, которые выполняются параллельно для каждого элемента из большого набора данных, то главный кандидат для использования новых возможностей параллельного программирования это .NET Framework 4: Parallel.ForEach и Parallel LINQ (PLINQ). Этот документ познакомит Вас с Parallel.ForEach и PLINQ, рассмотрит, как использовать эти технологии и объяснит специфичные сценарии, представляемые каждой технологией.


Parallel.ForEach


Метод ForEach класса Parallel – это многопоточная реализация обычного цикла foreach в C#. Напомним, что цикл foreach позволяет последовательно выполнять операции над выбранным перечислимым набором данных используя IEnumerable. Parallel.ForEach похож на foreach в том, что он выполняет операции над перечислимым набором данных. Отличием Parallel.ForEach от foreach является использование многопоточности для выполнения различных действий тела цикла. Оказывается, что благодаря этим отличиям Parallel.ForEach становится полезным механизмом для параллельного программирования.
Для того чтобы оценить параллельную работу с последовательностью, важно разобраться в том, как можно поделить действия цикла на маленькие части, которые смогут работать параллельно. Такое разделение позволяет каждому потоку выполнять тело цикла для одной части данных.
Parallel.ForEach имеет несколько overload-методов. Обычно используется следующая сигнатура:

public static ParallelLoopResult ForEach<TSource>(
  IEnumerable<TSource> source,
  Action<TSource> body)


* This source code was highlighted with Source Code Highlighter.


Параметр source определяет последовательность, над которой будет происходить перечисление, параметр body задает делегат, который будет вызываться для каждого элемента последовательности. Для простоты описания, мы не будем разбирать остальные сигнатуры метода Parallel.ForEach. Список различных overload-методов можно найти здесь.

PLINQ


Подобно Parallel.ForEach, PLINQ это модель программирования для выполнения палаллельных операций над данными. Пользователь определяет параллельные операции комбинируя различные предопределенные операторы, такие как, проекции, фильтры, агрегации, и так далее. Поскольку LINQ является декларативным языком, PLINQ может вмешиваться и обрабатывать параллельные запросы от имени пользователя. Подобно Parallel.ForEach, PLINQ добивается параллельного выполнения блягодаря разбиению входной последовательности и обработке различных элементов в разных потоках.
Хотя каждый из этих инструментов заслуживает отдельной статьи, дальнейшее описание выходит за рамки этого документа. Точнее, этот документ укажет на интересные отличия между двумя подходами к параллельной обработке данных. В частности, этот документ охватит те сценарии, в которых необходимо использовать Parallel.ForEach вместо PLINQ и наоборот.

Простая параллельная обработка данных с независимыми дейстиями



Параллельные операции над данными с независимыми действиями предполагают выполнение дорогих вычислительных действий без возврата значения.

Использование Parallel.ForEach для выполнения независимых действий


Шаблон независимого действия может быть достигнут при использовании любой перегрузки метода Parallel.ForEach, т.к. все они объединены этой возможностью. Если Вы используете упомянутую выше перегрузку, код будет выглядеть следующим образом:
public static void IndependentAction(

IEnumerable<T> source, Action<T> action)
{
  Parallel.ForEach(source, element => action(element));
}


* This source code was highlighted with Source Code Highlighter.


В этом примере, action(element) представляет собой дорогое вычислительное действие, которое нужно выполнить над каждым элементом.

Почему не PLINQ?


Хотя PLINQ имеет оператор ForAll, для данного типа сценария проще думать в терминах параллельных циклов, а не параллельных запросов. Кроме того, PLINQ может оказаться слишком тяжелым для простых независимых действий. С Parallel.ForEach Вы можете указать ParallelOptions.MaxDegreeOfParallelism, который определяет, что необходимо N потоков. Таким образом, если ресурсы ThreadPool будут недостаточны, даже если число доступных потоков будет меньше N, эти потоки начнут помогать выполнению Parallel.ForEach. Чем больше потоков становится доступными, тем больше освободившхся ресурсов будет использоваться для выполнения тела цикла. Однако, PLINQ требует точно N потоков, которые определяются extension-методом WithDegreeOfParallelism(). Другими словами, для PLINQ, значение N представляет собой число потоков, которые активно учавствуют в запросе.


Упорядоченная параллельная обработка данных



Использование PLINQ для сохранения порядка


Если Ваша операция требует на выходе сохранить порядок входного набора данных, то Вам, вероятно, будет проще использовать PLINQ чем Parallel.ForEach. В PLINQ оператор AsOrdered() автоматически обработает порядок входной последовательности, при выполнении последующих запросов, позволив Вам абстрагироваться от явного управления порядком последовательности, и в результате сохранит данные. Оператор делает это следующим образом: он читает элементы из входного набора данных, разбивает их, параллельно вычисляет результаты, и, наконец, упорядочивает результаты в правильной выходной последовательности. Например, если Вам необходимо преобразовать кадры фильма из RGB в Grayscale, то получившееся кино было бы некорректным, если порядок кадров не сохранялся. Ниже приведен пример того, как можно использовать запрос, который сохраняет порядок выводимых данных, для параллельной обработки растровых кадров фильма, и последующего прохода по ним в той последовательности, в которой они были во входном наборе кадров.

public static void GrayscaleTransformation(IEnumerable<Frame> Movie)
{
  var ProcessedMovie =
    Movie
    .AsParallel()
  .AsOrdered()
    .Select(frame => ConvertToGrayscale(frame));

    foreach (var grayscaleFrame in ProcessedMovie)
    {
      // Movie frames will be evaluated lazily
    }
}


* This source code was highlighted with Source Code Highlighter.


Этот пример кратко демонстрирует, как можно сохранить последовательность данных, используя PLINQ. Вместо того, чтобы беспокоиться о том, как реализовать Ваше решение по сохранению порядка последовательности данных, Вы просто выражаете то, что Вы хотели, используя оператор PLINQ. Вызов AsOrdered() после AsParallel() обеспечивает правильный порядок в трансформированной последовательности фильма.

Почему не Parallel.ForEach?


Как упоминалось ранее, сохранение порядка важно, когда идет речь о получении результатов, которые остаются в исходном порядке. Хотя параллельная обработка данных с сохранением порядка может быть реализована с использованием Parallel.ForEach, она требует значительного объема работы, за исключением нескольких тривиальных случаев. В качестве примера можно рассмотреть такой тривиальный случай, Вы можете использовать следующий перегруженный метод Parallel.ForEach для достижения такого же эффекта как и у оператора AsOrdered():

public static ParallelLoopResult ForEach<TSource >(
  IEnumerable<TSource> source, 
  Action<TSource, ParallelLoopState, Int64>
  body)


* This source code was highlighted with Source Code Highlighter.


Этот перегруженный метод передает индекс текущего элемента в делегат пользователя. Затем Вы можете записать результат в элемент с этим индексом в выходной последовательности, параллельно вычислить дорогостоящую операцию, и, наконец, использовать правильный порядок выходных данных. Следующий пример демонстрирует один из многих путей сохранения порядка:

public static double[] PairwiseMultiply(
  double[] v1,
  double[] v2)
{
  var length = Math.Min(v1.Length, v2.Lenth);
  double[] result = new double[length];
  Parallel.ForEach(v1,
    (element, loopstate, elementIndex) =>
      result[elementIndex] = element * v2[elementIndex]);
  return result;
}


* This source code was highlighted with Source Code Highlighter.



Заметьте, что в данном примере, входная и выходная последовательность это массивы с фиксированной длинной, которые позволили достаточно просто сохранить порядок. Однако недостатки этого подхода очевидны. Если бы исходная последовательность была IEnumerable, а не массивом, то у Вас есть 4 способа осуществить сохранение порядка:


  • Первый способ требует O(n) обращений запроса IEnumerable.Count(), т.к. он проходит по всей коллекции для возврата количества элементов. После Вы можете объявить массив с необходимым количеством элементов перед вызовом Parallel.ForEach и использовать вышеописанный подход.

  • Второй способ должен осуществить исходный порядок коллекции до ее использования. В случае если набор Ваших данных чрезмерно большой, то ни один из двух описанных способов Вам не подойдет.

  • Третий способ требует некоторой логики над Вашей выходной коллекцией. Выходная коллекция могла бы быть хэш-структурой. В этом случае количество памяти, требуемое только для сохранения выходных данных, будет по крайней мере в 2 раза больше чем размер памяти входных данных, из-за проблем с коллизиями. Важно отметить, что если размер Ваших входных данных будет большой, хэш-структура может получиться чрезмерно большой и замедлит работу системы из-за общего кэша и сборщика мусора.

  • Наконец, последний способ должен сохранять результаты в соответствии с их исходными индексами и реализовывать Ваш алгоритм сотрировки, для осуществления сортировки перед выводом результатов.



С PLINQ пользователь просто запрашивает сохранения порядка и механизм запроса обработает мельчайшие детали, для гарантии правильного порядка выходного результата. Также инфраструктура PLINQ позволяет оператору AsOrdered() обрабатывать потоковые входные данные, другими словами, PLINQ позволяет использовать во входных данных механизмы отложенной загрузки. Если Вы используете PLINQ, то осуществлять любой из выше описанных механизмов не имеет смысла, т.к. Вы можете просто использовать оператор AsOrdered().


Потоковая параллельная обработка данных



Использование PLINQ для потоковой обработки


PLINQ предоставляет возможность обработки результата запроса как поток. Эта возможность чрезвычайна ценна по нескольким причинам:

  1. Результаты не нужно помещать в массив, таким образом, не все данные будут находиться в памяти в любой момент времени.

  2. Вы можете пройтись по результату в одном потоке, т.к. он уже сформирован


Приведем пример, предположим, что Вы хотите вычислить риск каждой акции в наборе акций, выбрать только те акции, которые соответствуют определенным критериям анализа рисков, и затем выполнить некоторые вычисления над выбранными результатами. Вот так будет выглядеть код на PLINQ:

public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
  var StockRiskPortfolio =
    Stocks
    .AsParallel()
    .AsOrdered()
    .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock) })
    .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));

    foreach (var stockRisk in StockRiskPortfolio)
    {
      SomeStockComputation(stockRisk.Risk);
      // StockRiskPortfolio will be a stream of results
    }
}


* This source code was highlighted with Source Code Highlighter.



В вышеприведенном примере, элементы были разбиты на несколько частей, обработаны в нескольких потоках, и собраны вместе. Важно понимать, что эти действия будут выполняться одновременно, и как только часть результатов станут доступны, отдельный поток сможет начать вычисления в цикле foreach. По умолчанию PLINQ оптимизирует пропускную способность, а не время ожидания, и использует внутри себя буфер. Таким образом, даже если определенный результат был вычислен, он будет находиться в выходном буфере пока буфер не будет полностью сформирован. Такое поведение можно изменить, используя extension-метод WithMergeOptions, который позволяет Вам определить буферизацию выхода. WithMergeOptions принимает в качестве параметра ParallelMergeOptions, который определяет, как запрос выдает результаты, когда они используются отдельным потоком. Вы можете выбрать следующие варианты:

  • ParallelMergeOptions.NotBuffered: эта опция определяет, что каждый обработанный элемент должен быть возвращен из каждого потока, как только он сформирован.

  • Parallel.MergeOptions.AutoBuffered: эта опция определяет, что элементы собираются в буфер и элементы буфера периодически отдаются работающему с ним потоку.

  • ParallelMergeOptions.FullyBuffered: эта опция, определяет, что все элементы должны быть буферизованы прежде, чем к ним можно будет обратиться. Эта опция позволяет быстрее выполнить запрос целиком, но процесс возврата первого значения займет больше времени.



Пример как использовать WithMergeOptions доступен на MSDN.

Почему не Parallel.ForEach?


Давайте разберем недостатки использования Parallel.ForEach. В случае неупорядоченной обработки результатов вычислений как потока, с использованием Parallel.ForEach, код будет выглядеть следующим образом:

public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
  Parallel.ForEach(Stocks,
    stock => {
      var risk = ComputeRisk(stock);
      if(ExpensiveRiskAnalysis(risk)
      {
        // stream processing
        lock(myLock) { SomeStockComputation(risk) };
        // store results
      }
}


* This source code was highlighted with Source Code Highlighter.


Этот код почти идентичен примеру с PLINQ выше, за исключением явного выставления lock’а и менее изящного кода. Отметим, что в этой ситуации Parallel.ForEach ожидает от Вас сохранение результатов потокобезопасным способом, в то время как PLINQ делает это всё за Вас автоматически.
Напомним, что делегат из Parallel.ForEach имеет тип Action, который ничего не возвращает. Существует 3 варианта, чтобы сохранить результаты. Первый заключается в сохранении значений в коллекции, которая не является потокобезопасной, что потребует явново выставления lock’ов. Второй вариант заключается в сохранении результирующих значений в потокобезопасной коллекции. К счастью, вместо того чтобы реализовывать свою собственную коллекцию .NET Framework 4 предоставляет богатый набор таких коллекций в namespac’е System.Collections.Concurrent. Третий вариант заключается в сохранении результирующих данных, используя специальный перегруженный метод Parallel.ForEach, который будет описан далее. Каждый из этих вариантов предполагает явное управление побочными эффектами для простой записи результатов, которые абстрагируются посредством использования инфраструктуры PLINQ.


Обработка двух коллекций.



Использование PLINQ для обработки двух коллекций.


LINQ оператор Zip определенным образом обрабатывает выполнение вычисления, одновременно выполняя работу для более, чем двух отдельных коллекций. Поскольку он используется с другими операторами запроса, Вы можете параллельно выполнить любое число сложных операций над каждой коллекцией перед тем, как объединять коллекции. Например:

public static IEnumerable<T> Zipping<T>(
  IEnumerable<T> a,
  IEnumerable<T> b)
{
  return
    a
    .AsParallel()
    .AsOrdered()
    .Select(element => ExpensiveComputation(element))
    .Zip(
      b
      .AsParallel()
      .AsOrdered()
      .Select(element => DifferentExpensiveComputation(element)),
        (a_element, b_element) => Combine(a_element, b_element));
}


* This source code was highlighted with Source Code Highlighter.



Этот пример показывает, что каждый источник данных может быть обработан одновременно с различными вычислениями. Как только результаты становятся доступны из обоих источников, они могут быть объединены при помощи оператора Zip.

Почему не Parallel.ForEach?


Такое объединение может быть реализовано при помощи одного из перегруженных методов Parallel.ForEach, как например:

public static IEnumerable<T> Zipping<T>(
  IEnumerable<T> a,
  IEnumerable<T> b)
{
  var numElements = Math.Min(a.Count(), b.Count());
  var result = new T[numElements];
  Parallel.ForEach(a,
    (element, loopstate, index) =>
    {
      var a_element = ExpensiveComputation(element);
      var b_element = DifferentExpensiveComputation(b.ElementAt(index));
      result[index] = Combine(a_element, b_element);
    });
  return result;
}


* This source code was highlighted with Source Code Highlighter.


Однако, здесь существуют потенциальные опасности и недостатки, связанные с попыткой объединить две коллекции таким способом. Недостатки такого похода включают в себя описанное выше использование Parallel.ForEach для сохранения порядка, а также одним из подводных камней является выход за границы колекции, так как Вы явно управляете индексами.


Локальное состояние потока (Thread-Local State)



Использование PLINQ для доступа к локальному состоянию потока


Выполняя параллельные операции, привлекательным решением для уменьшения затрат синхронизации будет использование локального хранения в потоке результатов выполнения каждого потока. В конце необходимо объединить результаты, таким образом, уменьшая количество раз использования примитива синхронизации. Вариант для доступа к локальному состоянию потока при помощи LINQ заключается в использовании перегруженного LINQ оператора Aggregate(), который выглядит следующим образом:

public static TResult Aggregate<TSource, TAccumulate,TResult>( 
  this ParallelQuery<TSource> source,
  Func<TAccumulate> seedFactory,
  Func<TAccumulate,TSource,TAccumulate> updateAccumulatorFunc,
  Func<TAccumulate,TAccumulate,TAccumulate> combineAccumulatorFunc,
  Func<TAccumulate, TResult> resultSelector)


* This source code was highlighted with Source Code Highlighter.


Так можно реализовать оператор вычисления геометрического среднего с использованием этого overload’а:

public static double GeometricMean(this IEnumerable<int> source)
{
  return
    source
    .AsParallel()
    .Aggregate(
      () => new double[] { 1.0, 0.0 },
      (accumulator, element) =>
      {
        accumulator[0] *= element;
        accumulator[1]++;
        return accumulator;
      },
      (accumulator1, accumulator2) =>
      {
        accumulator1[0] *= accumulator2[0];
        accumulator1[1] += accumulator2[1];
        return accumulator1;
      },
      accumulatedValues =>
      Math.Pow(accumulatedValues[0], (1/accumulatedValues[1])));
}


* This source code was highlighted with Source Code Highlighter.


Хотя объем работы на элемент слишком мал для того чтобы использовать перимущества распараллеливания, пример приведен для демонстрации того, как использовать метод Aggregate с доступом к локальному состоянию потока.
Использование Parallel.ForEach для доступа к локальному состоянию потока
Сигнатура метода Parallel.ForEach для доступа к локальному состоянию потока выглядит следующим образом:

public static ParallelLoopResult ForEach<TSource, TLocal>(
  IEnumerable<TSource> source, 
  Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, 
  Action<TLocal> localFinally)


* This source code was highlighted with Source Code Highlighter.


Следующий пример демонстрирует, как использовать эту сигнатуру, чтобы получить геометрическое среднее из последовательности:

public static double GeometricMean(IEnumerable<int> source)
{
  double Products = 1.0;
  double count = 0.0;
  object mylock = new object();
  Parallel.ForEach(source,
    () => new double[] { 1.0, 0.0 },
    (element, loopstate, tlocal) =>
    {
      tlocal[0] *= element;
      tlocal[1]++;
      return tlocal;
    },
    (final) =>
    {
      lock (mylock)
      {
        Products *= final[0];
        count += final[1];
      }
    });
  return Math.Pow(Products, 1 / count);
}


* This source code was highlighted with Source Code Highlighter.


Этот пример иллюстрирует использование Parallel.ForEach. Локальное состояние потока может использоваться, чтобы уменьшить затраты на синхронизацию. Если использовать другой overload-метод Parallel.ForEach, то lock будет выставляться каждый раз, когда продукт расчитывается, что слишком дорого для небольшой операции. Важно понять что с Parallel.ForEach Вам необходимо явно управлять механизмом синхронизации, когда сохраняете результат, тогда как с PLINQ, детали синхронизации абстрагированы от пользователя. Также PLINQ может быть более подходящим, в случае, когда Ваша операция объединения требует сложной синхронизации. Однако, если Ваша синхронизация могла бы быть выполнена легким и простым способом, например, interlocked-операция, то выполнение запроса PLINQ могло бы занять больше времени, нежели использование стандартного механизма PLINQ. Кроме того, важно иметь в виду аспекты деталей реализаций Parallel.ForEach и PLINQ, такие как степень параллелизма во время выполения, описанная выше. В зависимости от деталей того, что Вы хотите реализовать, зависит какая технология лучше соответствует ситуации.


Выход из операций



Использование Parallel.ForEach для выхода из операций


Для случая когда управляемая логика значима, важно понимать, что выход из цикла Parallel.ForEach позволяет достичь Вам того же эфекта, как и в последовательном цикле, гарантируя корректность.Повторим, некоторые overload-методы Parallel.ForEach позволяют Вам следить за ParallelLoopState, наиболее общим является следующий:

public static ParallelLoopResult ForEach<TSource >(
  IEnumerable<TSource> source, 
  Action<TSource, ParallelLoopState> body)


* This source code was highlighted with Source Code Highlighter.



ParallelLoopState поддерживает выход из цикла при помощи двух методов, которые будут описаны в следующих двух секциях.

ParallelLoopState.Stop()


Stop() сообщает циклу, что не нужно больше выполнять итерации, и цикл прекратится как можно быстрей. Свойство ParallelLoopState.IsStopped позволяет каждой итерации определить, когда другая операция вызвала метод Stop(). Stop() обычно полезна в случае, когда цикл выполняет поиск, не зависящий от порядка, и может быть закончен как только необходимый элемент найден. Например, если Вам нужно найти объект в колекции объектов, то код будет похож на следующий:

public static boolean FindAny<T, T>(
  IEnumerable<T> TSpace,
  T match)
  where T : IEqualityComparer<T>
{
  var matchFound = false;
  Parallel.ForEach(TSpace,
    (curValue, loopstate) =>
    {
      if (curValue.Equals(match))
      {
        matchFound = true;
        loopstate.Stop();
      }
    });

  return matchFound;
}


* This source code was highlighted with Source Code Highlighter.



В то время эта же функциональность может быть реализована при помощи PLINQ, этот пример демонстрирует как использовать ParallelLoopState.Stop() для управления потоком выполенения.

ParallelLoopState.Break()


Break() сообщает циклу, что итерации принадлежащие предшествующим элементам, всё еще должны быть выполнены, а итерации принадлежащие следующим элементам выполнять не надо. Будет ипользоваться самая ранняя итерация, из которой метод Break() был вызван. Это значение можно получить из свойства ParallelLoopState.LowestBreakIteration. Break() обычно полезен в случае, когда Ваш цикл выполняет упорядоченный поиск. Другими словами, некоторые критерии поиска определяют элемент, до которого данные должны быть обработаны. Например, если у Вас есть последовательность неуникальных элементов, и Вам необходимо вернуть минимальный индекс найденного объекта, код выглядел бы так:

public static int FindLowestIndex<T, T>(
  IEnumerable<T> TSpace,
  T match)
  where T : IEqualityComparer<T>
{
  int matchedIndex = -1;
  Parallel.ForEach(TSpace,
    (curValue, loopstate, curIndex) =>
    {
      if (curValue.Equals(match))
      {
        matchedIndex = curIndex;
        loopstate.Break();
      }
    });
  return matchedIndex;
}


* This source code was highlighted with Source Code Highlighter.



Этот пример будет выполняться до тех пор, пока экземпляр класса не будет найден, после чего вызовет метод Break(), указывая, что теперь необходимо искать только в элементах с индексом меньше, чем текущий. Если цикл найдет еще один элемент, то Break() вызовется еще раз. Такое поведение будет повторяться до тех пор, пока не будет найдено ни одного элемента. Элемент, на котором был вызван последний раз метод Break(), и будет минимальным индексом искомого элемента.

Почему не PLINQ?


Не смотря на то, что PLINQ поддерживает выход из процесса выполнения запроса, отличия между механизмом выхода PLINQ и Parallel.ForEach существенны. Для выхода из запроса PLINQ Вы должны указать запросу флаг отмены(объяснено здесь). С Parallel.ForEach флаги выхода опрашиваются в каждой итерации. В PLINQ флаг отмены проверяется время от времени, поэтому Вы не можете быть уверены, что отмененный запрос остановится сразу.

Заключение



Parallel.ForEach и PLINQ – отличные инструменты для быстрого встраивания параллелизма в Ваши приложения. Используя эти инструментальные средства, имейте ввиду различия и советы описанные в этой статье, и всегда выбирайте правильный инструмент для своей задачи.


Источник - When To Use Parallel.ForEach and When to Use PLINQ

Комментариев нет:

Отправить комментарий