Некоторое количество заметок назад я рассказывал о шаблоне многозадачного программирования Data Flow и демонстрировал пример его реализации на C++ с помощью библиотеки Asynchronous Agents Library (AAL). Пришла пора ещё раз вспомнить об этом шаблоне: дело в том, что с недавних пор его реализация стала доступной для платформы .NET.
Задача
Вспомним прошлую задачу и постараемся переписать её на C#. Нужно написать простейший командный интерпретатор, который принимает текстовый поток команд, разделённых символом перевода строк, распознаёт каждую команду, выполняет её и отправляет результат выполнения на консоль или в выходной файл. При этом должны выполняться 3 условия:
Решение
Все необходимые классы, реализующие шаблон Data Flow, располагаются в пространстве имён System.Threading.Tasks.Dataflow и являются частью библиотеки TPL Dataflow Library. Но не трудитесь искать сборку с этой библиотекой в числе стандартных модулей .NET. Её там нет. Она распространяется в виде отдельного NuGet пакета. Поэтому, прежде чем приступать к экспериментам, откройте в вашем проекте консоль менеджера пакетов NuGet и выполните в нём вот такую команду:
Как и в прошлый раз начнём с самих команд и определим интерфейс и несколько реализаций – GetTime, Echo и BadCommand для внутреннего использования:
Теперь пришло время заняться самим конвейером. Концепция и структура библиотеки Tasks.Dataflow очень похожа на то, что мы уже видели в Asynchronous Agents Library (AAL), даже названия некоторых элементов в обеих библиотеках созвучны. В случае AAL мы строили конвейер из блоков сообщений (message blocks), теперь же мы будем оперировать блоками потоков данных (dataflow blocks). Описание всех доступных блоков потоков данных можно найти в официальной документации, для наших же целей нам понадобятся два типа блоков: TransformBlock<TInput, TOutput> и ActionBlock<TInput>. Первый тип блоков предназначен для преобразования данных: с помощью делегата, переданного экземпляру блока при его создании, он трансформирует каждый принимаемый элемент данных из типа TInput в тип TOutput. С помощью этого класса мы реализуем блок распознавания команды (parser) и блок исполнения команды (executor):
Второй тип блоков гораздо проще: для каждого принимаемого элемента данных он вызывает делегат, переданный экземпляру блока при его создании. Этот класс мы используем для реализации блока вывода результата (printer):
Теперь можно связать все блоки в один конвейер:
И добавить условия остановки:
Обратите внимание, как блоки передают друг другу либо ошибку, либо сигнал остановки. В случае передачи блоку ошибки он тут же прекращает любую активность и завершается. При этом все вызовы Wait функции для задачи (Task), возвращённой через свойство Completion данного блока, завершаться выбросом исключения. Вызов метода Complete, в отличие от передачи ошибки, не приводит к моментальной остановке блока. Он сигнализирует, что поступление данных больше не ожидается, – блок должен обработать все переданные ему элементы данных до конца и завершиться. Все потоки, ожидающие завершения блока с помощью вызова block.Completion.Wait() завершат ожидание и продолжат своё исполнение.
Исключением является лишь последний блок в конвейере: в случае ошибки он возбуждает сигнал отмены с помощью canceletion token`а. Вы могли обратить внимание, что он использовался и для инициализации всех блоков в конвейере. Таким нехитрым способом мы сможем остановить весь конвейер целиком, не зависимо от того, на каком именно этапе произошла ошибка.
Осталось только открыть файл с командами и запустить конвейер:
Обратите внимание на последний нюанс на сегодня. Последней строчкой идёт ожидание завершения конвейера, при этом жду я сразу 2 задачи: первая – задача, связанная с последним блоком конвейера, вторая – её continuation. Собственно признаком завершения конвейера является завершение второй задачи, но если мы будем ждать только её, мы рискуем не получить исключения, произошедшие в самом конвейере. Чтобы этого не произошло, мы должны либо явно проверять их наличие через свойство printer.Completion.Exception, либо добавить задачу, связанную с последним блоком конвейера к списку ожидания – в этом случае исключение автоматически пробросится в контекст потока.
Ну и как водится, ссылка на исходный код всего проекта здесь (но на этот раз для Visual Studio 2012).
Задача
Вспомним прошлую задачу и постараемся переписать её на C#. Нужно написать простейший командный интерпретатор, который принимает текстовый поток команд, разделённых символом перевода строк, распознаёт каждую команду, выполняет её и отправляет результат выполнения на консоль или в выходной файл. При этом должны выполняться 3 условия:
- интерпретатор должен быть многозадачным
- ответы должны поступать на выход в том же порядке, в котором команды поступали на вход
- мы не должны использовать синхронизацию данных на выходе, чтобы устранить Lock Convoy.
Решение
Все необходимые классы, реализующие шаблон Data Flow, располагаются в пространстве имён System.Threading.Tasks.Dataflow и являются частью библиотеки TPL Dataflow Library. Но не трудитесь искать сборку с этой библиотекой в числе стандартных модулей .NET. Её там нет. Она распространяется в виде отдельного NuGet пакета. Поэтому, прежде чем приступать к экспериментам, откройте в вашем проекте консоль менеджера пакетов NuGet и выполните в нём вот такую команду:
Install-Package Microsoft.Tpl.Dataflow
Эта команда скачает все необходимые сборки из репозитория NuGet пакетов и добавит их к вашему проекту.Как и в прошлый раз начнём с самих команд и определим интерфейс и несколько реализаций – GetTime, Echo и BadCommand для внутреннего использования:
internal interface ICommand
{
string Execute();
string Arguments { get; set; }
}
// Usage: gettime [gmt]
internal class GetTime : ICommand
{
// ...
}
// Usage: echo <text>
internal class Echo : ICommand
{
// ...
}
// Representation of a bad command
internal class BadCommand : ICommand
{
// ...
}
// ...
// Fill list of supported commands
Dictionary<string, Type> commands = new Dictionary<string, Type>();
commands.Add(GetTime.Name, typeof(GetTime));
commands.Add(Echo.Name, typeof(Echo));
Детали реализаций самих команд не так интересны, поэтому тут я их опустил, но вы можете найти их в полном исходном тексте проекта по ссылке ниже.{
string Execute();
string Arguments { get; set; }
}
// Usage: gettime [gmt]
internal class GetTime : ICommand
{
// ...
}
// Usage: echo <text>
internal class Echo : ICommand
{
// ...
}
// Representation of a bad command
internal class BadCommand : ICommand
{
// ...
}
// ...
// Fill list of supported commands
Dictionary<string, Type> commands = new Dictionary<string, Type>();
commands.Add(GetTime.Name, typeof(GetTime));
commands.Add(Echo.Name, typeof(Echo));
Теперь пришло время заняться самим конвейером. Концепция и структура библиотеки Tasks.Dataflow очень похожа на то, что мы уже видели в Asynchronous Agents Library (AAL), даже названия некоторых элементов в обеих библиотеках созвучны. В случае AAL мы строили конвейер из блоков сообщений (message blocks), теперь же мы будем оперировать блоками потоков данных (dataflow blocks). Описание всех доступных блоков потоков данных можно найти в официальной документации, для наших же целей нам понадобятся два типа блоков: TransformBlock<TInput, TOutput> и ActionBlock<TInput>. Первый тип блоков предназначен для преобразования данных: с помощью делегата, переданного экземпляру блока при его создании, он трансформирует каждый принимаемый элемент данных из типа TInput в тип TOutput. С помощью этого класса мы реализуем блок распознавания команды (parser) и блок исполнения команды (executor):
// create "parser" block
TransformBlock<string, ICommand> parser =
new TransformBlock<string, ICommand>(
line =>
{
if (string.IsNullOrEmpty(line))
{
ICommand bad = new BadCommand();
bad.Arguments = "<EMPTY>";
return bad;
}
string [] tokens = line.Split(
new char[] { ' ', '\t' }, 2);
Type commandType;
if (!commands.TryGetValue(
tokens[0], out commandType))
{
ICommand bad = new BadCommand();
bad.Arguments = line;
return bad;
}
ICommand command = (ICommand)
Activator.CreateInstance(commandType);
command.Arguments =
tokens.Length == 2 ? tokens[1] : null;
return command;
}, new ExecutionDataflowBlockOptions {
CancellationToken = toCancel.Token });
// create "executor" block
TransformBlock<ICommand, string> executor =
new TransformBlock<ICommand, string>(
command => command.Execute(),
new ExecutionDataflowBlockOptions {
CancellationToken = toCancel.Token });
TransformBlock<string, ICommand> parser =
new TransformBlock<string, ICommand>(
line =>
{
if (string.IsNullOrEmpty(line))
{
ICommand bad = new BadCommand();
bad.Arguments = "<EMPTY>";
return bad;
}
string [] tokens = line.Split(
new char[] { ' ', '\t' }, 2);
Type commandType;
if (!commands.TryGetValue(
tokens[0], out commandType))
{
ICommand bad = new BadCommand();
bad.Arguments = line;
return bad;
}
ICommand command = (ICommand)
Activator.CreateInstance(commandType);
command.Arguments =
tokens.Length == 2 ? tokens[1] : null;
return command;
}, new ExecutionDataflowBlockOptions {
CancellationToken = toCancel.Token });
// create "executor" block
TransformBlock<ICommand, string> executor =
new TransformBlock<ICommand, string>(
command => command.Execute(),
new ExecutionDataflowBlockOptions {
CancellationToken = toCancel.Token });
Второй тип блоков гораздо проще: для каждого принимаемого элемента данных он вызывает делегат, переданный экземпляру блока при его создании. Этот класс мы используем для реализации блока вывода результата (printer):
// create "printer" block
ActionBlock<string> printer =
new ActionBlock<string>(
result => System.Console.WriteLine(result),
new ExecutionDataflowBlockOptions {
CancellationToken = toCancel.Token });
ActionBlock<string> printer =
new ActionBlock<string>(
result => System.Console.WriteLine(result),
new ExecutionDataflowBlockOptions {
CancellationToken = toCancel.Token });
Теперь можно связать все блоки в один конвейер:
// link blocks together
parser.LinkTo(executor);
executor.LinkTo(printer);
parser.LinkTo(executor);
executor.LinkTo(printer);
И добавить условия остановки:
// prepare the cancelation
CancellationTokenSource toCancel = new CancellationTokenSource();
// ...
parser.Completion.ContinueWith(
finishedTask =>
{
if (finishedTask.IsFaulted)
{
// pass the fault reason to the next block
((IDataflowBlock)executor).Fault(
finishedTask.Exception);
}
else
{
// tranfer "complete" command to the next block
executor.Complete();
}
});
executor.Completion.ContinueWith(
finishedTask =>
{
if (finishedTask.IsFaulted)
{
// pass the fault reason to the next block
((IDataflowBlock)printer).Fault(
finishedTask.Exception);
}
else
{
// tranfer "complete" command to the next block
printer.Complete();
}
});
Task printerFinish = printer.Completion.ContinueWith(
finishedTask =>
{
if (finishedTask.IsFaulted)
{
// cancel all in case of error
toCancel.Cancel();
}
});
CancellationTokenSource toCancel = new CancellationTokenSource();
// ...
parser.Completion.ContinueWith(
finishedTask =>
{
if (finishedTask.IsFaulted)
{
// pass the fault reason to the next block
((IDataflowBlock)executor).Fault(
finishedTask.Exception);
}
else
{
// tranfer "complete" command to the next block
executor.Complete();
}
});
executor.Completion.ContinueWith(
finishedTask =>
{
if (finishedTask.IsFaulted)
{
// pass the fault reason to the next block
((IDataflowBlock)printer).Fault(
finishedTask.Exception);
}
else
{
// tranfer "complete" command to the next block
printer.Complete();
}
});
Task printerFinish = printer.Completion.ContinueWith(
finishedTask =>
{
if (finishedTask.IsFaulted)
{
// cancel all in case of error
toCancel.Cancel();
}
});
Обратите внимание, как блоки передают друг другу либо ошибку, либо сигнал остановки. В случае передачи блоку ошибки он тут же прекращает любую активность и завершается. При этом все вызовы Wait функции для задачи (Task), возвращённой через свойство Completion данного блока, завершаться выбросом исключения. Вызов метода Complete, в отличие от передачи ошибки, не приводит к моментальной остановке блока. Он сигнализирует, что поступление данных больше не ожидается, – блок должен обработать все переданные ему элементы данных до конца и завершиться. Все потоки, ожидающие завершения блока с помощью вызова block.Completion.Wait() завершат ожидание и продолжат своё исполнение.
Исключением является лишь последний блок в конвейере: в случае ошибки он возбуждает сигнал отмены с помощью canceletion token`а. Вы могли обратить внимание, что он использовался и для инициализации всех блоков в конвейере. Таким нехитрым способом мы сможем остановить весь конвейер целиком, не зависимо от того, на каком именно этапе произошла ошибка.
Осталось только открыть файл с командами и запустить конвейер:
using(StreamReader input = File.OpenText(fileName))
{
CancellationToken cancel = toCancel.Token;
// read lines
for (string line = input.ReadLine();
line != null && !cancel.IsCancellationRequested;
line = input.ReadLine())
{
// transfer next line to the parser
parser.Post(line);
}
// mark block as completed
parser.Complete();
}
// wait both: printer block and its continuation
Task.WaitAll(printer.Completion, printerFinish);
{
CancellationToken cancel = toCancel.Token;
// read lines
for (string line = input.ReadLine();
line != null && !cancel.IsCancellationRequested;
line = input.ReadLine())
{
// transfer next line to the parser
parser.Post(line);
}
// mark block as completed
parser.Complete();
}
// wait both: printer block and its continuation
Task.WaitAll(printer.Completion, printerFinish);
Обратите внимание на последний нюанс на сегодня. Последней строчкой идёт ожидание завершения конвейера, при этом жду я сразу 2 задачи: первая – задача, связанная с последним блоком конвейера, вторая – её continuation. Собственно признаком завершения конвейера является завершение второй задачи, но если мы будем ждать только её, мы рискуем не получить исключения, произошедшие в самом конвейере. Чтобы этого не произошло, мы должны либо явно проверять их наличие через свойство printer.Completion.Exception, либо добавить задачу, связанную с последним блоком конвейера к списку ожидания – в этом случае исключение автоматически пробросится в контекст потока.
Ну и как водится, ссылка на исходный код всего проекта здесь (но на этот раз для Visual Studio 2012).
2 комментария:
Задача слишком примитивна для того, чтобы использовать TPL Dataflow Library: граф потоков данных не изменяется, состоит из 4х узлов, каждый узел имеет только один вход. Проще и эффективнее это решается с помощью 4х потоков (threads), соединенных очередями. Было бы интереснее посмотреть действительно сложную задачу, с динамически создаваемыми узлами, некоторые с несколькими входами (представлены с помощью JoinBlock).
по следам двух последних постов, примерчик бы как Data Flow из poweshell вызывать, причем спрятать все обертки от того, кто реализует блоки и flow.
Этот кусок кода ContinueWith уж слишком громоздко выглядит. его бы как то спрятать
Отправить комментарий