Reactive Extensions

Содержание

Слайд 2

О Вашем инструкторе

Сергей Тепляков
Visual C# MVP, RSDN Team member
[email protected]
SergeyTeplyakov.blogspot.com

1-

О Вашем инструкторе Сергей Тепляков Visual C# MVP, RSDN Team member Sergey.Teplyakov@gmail.com SergeyTeplyakov.blogspot.com 1-

Слайд 3

Цели курса…

Философию библиотеки Reactive Extensions
Основы библиотеки TPL
Использование библиотеки TPL
Новые возможности C# 5

Слушатели

Цели курса… Философию библиотеки Reactive Extensions Основы библиотеки TPL Использование библиотеки TPL
изучат:

1-

Слайд 4

Необходимая подготовка

Быть знакомы с основами языка C# и платформы .Net
Обладать базовыми знаниями

Необходимая подготовка Быть знакомы с основами языка C# и платформы .Net Обладать
многопоточности
Быть знакомы с LINQ (Language Integrated Query)

Слушатели должны:

1-

Слайд 5

Roadmap

Введение в реактивные расширения
Дуализм интерфейсов
Основы Rx
Observable sequences
Events и Observables
Observables и асинхронные операции
Concurrency
Новости

Roadmap Введение в реактивные расширения Дуализм интерфейсов Основы Rx Observable sequences Events
из Редмонта

1-

Слайд 6

Интерактивная и реактивная модель

Интерактивная и реактивная модель

Слайд 7

Реактивное программирование

Парадигма программирования, ориентированная на потоки данных и распространение изменений. Это означает,

Реактивное программирование Парадигма программирования, ориентированная на потоки данных и распространение изменений. Это
что должна существовать возможность легко выражать статические и динамические потоки данных, а также то, что выполняемая модель должна автоматически распространять изменения сквозь поток данных.
http://ru.wikipedia.org/
/wiki/Реактивное_программирование

Слайд 8

Реактивная модель

"Принцип Голливуда" – "Не звоните нам, мы сами вам позвоним"
(Hollywood

Реактивная модель "Принцип Голливуда" – "Не звоните нам, мы сами вам позвоним"
Principle - Don't call us, we'll call you)

Слайд 9

Rx - это

1. набор классов, представляющих собой асинхронный поток данных
2. набор операторов,

Rx - это 1. набор классов, представляющих собой асинхронный поток данных 2.
предназначенных для манипуляции этими данными
3. набор классов для управления многопоточностью
Rx = Observables + LINQ + Schedulers

Слайд 10

Зачем все это нужно?

Social
media

Stock tickers

RSS feeds

GPS

Server management

UI events

Зачем все это нужно? Social media Stock tickers RSS feeds GPS Server management UI events

Слайд 11

Интерфейс IEnumerable

Pull-based последовательности представлены интерфейсом IEnumerable. Это -
коллекции в памяти (списки,

Интерфейс IEnumerable Pull-based последовательности представлены интерфейсом IEnumerable. Это - коллекции в памяти
вектора и т.д.)
бесконечные последовательности (генераторы)
xml-данные
результаты запроса к БД

Слайд 12

Интерфейс IEnumerable

public interface IEnumerable {     IEnumerator GetEnumerator(); }
public interface IEnumerator : IDisposable {     bool MoveNext();     T Current { get; }     void Reset(); }

Ковариантность из .NET 4.0

Блокирующая операция

Интерфейс IEnumerable public interface IEnumerable { IEnumerator GetEnumerator(); } public interface IEnumerator

Слайд 13

Упрощение интерфейса IEnumerator

public interface IEnumerator : IDisposable {     (T | void | Exception) GetNext(); }

T – следующий элемент
void – окончание

Упрощение интерфейса IEnumerator public interface IEnumerator : IDisposable { (T | void
последовательности
Exception – ошибка

Слайд 14

Интерфейс IObservable

public interface IObservable {     IDisposable Subscribe(IObserver observer); } public interface IObserver {     void OnNext(T value);     void OnError(Exception error);     void OnCompleted(); }

Интерфейс IObservable public interface IObservable { IDisposable Subscribe(IObserver observer); } public interface

Слайд 15

Pull vs Push

Pull vs Push

Слайд 16

Pull vs Push

Pull vs Push

Слайд 17

Простые последовательности
Observable.Empty();
Observable.Return(42);
Observable.Throw(ex);
Observable.Never();

OnComplete

OnNext

OnError
new int[] {};
new int[] {42};

Итератор, генерирующий исключение

Итератор, который не возвращает управление

Простые последовательности Observable.Empty (); Observable.Return(42); Observable.Throw (ex); Observable.Never (); OnComplete OnNext OnError

Слайд 18

Методы Range

Observable.Range(0, 3);

OnNext(0)

OnNext(1)

OnNext(2)

Enumerable.Range(0, 3);

yield 0

yield 1

yield 2

Методы Range Observable.Range(0, 3); OnNext(0) OnNext(1) OnNext(2) Enumerable.Range(0, 3); yield 0 yield 1 yield 2

Слайд 19

Циклы for

var xs = Observable.Generate(             0,             x => x < 10,             x => x + 1,             x => x);
xs.Subscribe(x => {         Console.WriteLine(x);                   });

var xs = new IEnumerable  {     for(int i = 0;          i < 10;          i++)       yield return i; };
foreach(var x in xs) {     Console.WriteLine(x); }

Предположим, у нас есть синтаксис анонимных итераторов

Циклы for var xs = Observable.Generate( 0, x => x x +

Слайд 20

Контракт «реактивных» последовательностей

Grammar: OnNext* [OnCompleted | OnError]
Методы наблюдателя вызываются последовательно

Контракт «реактивных» последовательностей Grammar: OnNext* [OnCompleted | OnError] Методы наблюдателя вызываются последовательно

Слайд 21

Упрощение работы с интерфейсом IObserver

var xs = Observable.Range(0, 10); 1. Только OnNext:
xs.Subscribe(/*OnNext(int)*/x => Console.WriteLine(x)); 2. OnNext и OnCompleted
xs.Subscribe(     /*OnNext(int)*/x => Console.WriteLine(x),     /*OnComplete*/() => Console.WriteLine("Completed"));
3. OnNext и

Упрощение работы с интерфейсом IObserver var xs = Observable.Range(0, 10); 1. Только
OnError
xs.Subscribe(     /*OnNext(int)*/x => Console.WriteLine(x),     /*OnError(Exception)*/e => Console.WriteLine("Error: {0}", e));
4. OnNext, OnError и OnCompleted
xs.Subscribe(     /*OnNext(int)*/x => Console.WriteLine(x),     /*OnError(Exception)*/e => Console.WriteLine("Error: {0}", e),     /*OnCompleted*/() => Console.WriteLine("Completed"));

Слайд 23

События в .NET

Объявление
event Action E;
Публикация
E(42);
Подписка
E += x => Console.WriteLine(x);

События в .NET Объявление event Action E; Публикация E(42); Подписка E += x => Console.WriteLine(x);

Слайд 24

Rx

Объявление
ISubject S = new Subject();
Публикация
S.OnNext(42);
Подписка
S.Subscribe(x => Console.WriteLine(x));

Rx Объявление ISubject S = new Subject (); Публикация S.OnNext(42); Подписка S.Subscribe(x => Console.WriteLine(x));

Слайд 25

class Program {
event Action E;
static void Main() {
var p

class Program { event Action E; static void Main() { var p
= new Program();
p.E += x => Console.WriteLine(x);
p.E(1);
p.E(2);
p.E(3);
}
}

Events vs Observables

class Program {
ISubject S = new Subject();
static void Main() {
var p = new Program();
p.S.Subscribe(x => Console.WriteLine(x));
p.S.OnNext(1);
p.S.OnNext(2);
p.S.OnNext(3);
}
}

Слайд 26

Отписка от событий

Events
static event Action E;
//E += x => Console.WriteLine(x);         Action action = x => Console.WriteLine(x); E += action; E -= action;
Observables
static ISubject S = new Subject();
var token = S.Subscribe(x => Console.WriteLine(x)); token.Dispose();

Отписка от событий Events static event Action E; //E += x =>

Слайд 27

«События» первого класса

Объект называют «объектом первого класса» когда он:
может быть сохранен в

«События» первого класса Объект называют «объектом первого класса» когда он: может быть
переменной
может быть передан в функцию как параметр
может быть возвращен из функции как результат
может быть создан во время выполнения программы
внутренне самоидентифицируем (независим от именования)
http://ru.wikipedia.org/wiki/Объект_первого_класса

Слайд 28

«События» первого класса

// Сохранение в переменной
IObservable textChanged = …;
// Передача в качестве

«События» первого класса // Сохранение в переменной IObservable textChanged = …; //
параметра
void ProcessRequests(IObservable input) {…}
// Возвращение в качестве результата
IObservable QueryServer() {…}

Слайд 29

Возможности наблюдаемых последовательностей
IObservable mouseMoves =     from e in Observable.FromEventPattern(frm, "MouseMove")     select e.EventArgs.Location;
var filtered = mouseMoves.Where(p => p.X == p.Y);
var subscription =      filtered.Subscribe(p => Console.WriteLine("X, Y = {0}", p.X));
subscription.Dispose();

Преобразование события в поток объектов Point

Фильтрация «событий»

Обработка «событий»

Отписка от «событий»

Возможности наблюдаемых последовательностей IObservable mouseMoves = from e in Observable.FromEventPattern (frm, "MouseMove")

Слайд 31

Асинхронные операции Classical Async Pattern

FileStream fs = File.OpenRead("data.txt");
byte[] buffer = new byte[1024];
fs.BeginRead(buffer, 0, buffer.Length,                 ar =>                     {                         int bytesRead = fs.EndRead(ar);                         // Обработка данных в массиве buffer                     },                 null);

Невозможность использования привычных языковых конструкций (using, try/finally etc)

«Вывернутый» поток

Асинхронные операции Classical Async Pattern FileStream fs = File.OpenRead("data.txt"); byte[] buffer =
исполнения

Сложность обработки ошибок, а также чтения и сопровождения кода

Слайд 32

FromAsyncPattern

static int LongRunningFunc(string s) {     Thread.Sleep(TimeSpan.FromSeconds(5));     return s.Length; }
Func longRunningFunc = LongRunningFunc; Func> funcHelper =     Observable.FromAsyncPattern(
longRunningFunc.BeginInvoke, longRunningFunc.EndInvoke); IObservable xs = funcHelper("Hello, String"); xs.Subscribe(x => Console.WriteLine("Length is " + x));

FromAsyncPattern преобразует возвращаемое значение метода в IObservervable

FromAsyncPattern static int LongRunningFunc(string s) { Thread.Sleep(TimeSpan.FromSeconds(5)); return s.Length; } Func longRunningFunc

Слайд 33

Tasks vs FromAsyncPattern

Задачи (Tasks) – это унифицированный способ представления Single value asynchrony

Tasks vs FromAsyncPattern Задачи (Tasks) – это унифицированный способ представления Single value
в .Net Framework
Observables – Multi value asynchrony
Существует простой способ преобразования Tasks -> Observables

Слайд 34

Task -> ToObservable

static class FuncExtensions {     internal static Task ToTask(this Func func, string s)     {         return Task.Factory.FromAsync(             func.BeginInvoke, func.EndInvoke, s, null);     } }
Func longRunningFunc = LongRunningFunc;
string s = "Hello, String"; IObservable xs = longRunningFunc.ToTask(s).ToObservable();              xs.Subscribe(x => Console.WriteLine("Length is " + x),     () => Console.WriteLine("Task is finished"));         }     }

Метод расширения можно написать один раз, а использовать повсюду, а

Task -> ToObservable static class FuncExtensions { internal static Task ToTask(this Func
не только с Rx

Это более предпочтительный способ, поскольку метод FromAsyncPattern скоро будет устаревшим

Слайд 35

Чем Rx не является

Rx не заменяет существующей «асинхронности»:
.NET все еще незаменимы
Асинхронные методы

Чем Rx не является Rx не заменяет существующей «асинхронности»: .NET все еще
все так же применимы в библиотеках
Задачи представляют собой single value asynchrony
Существуют и другие асинхронные источники, как SSIS, PowerShell, etc
Но rx…
Унифицирует работу
Предоставляет возможность композиции
Предоставляет обобщенные операторы
Так что rx – это …
Мост!

Слайд 37

Управление многопоточность

Управление многопоточностью осуществляется с помощью интерфейса IScheduler
var xs = Observable.Range(0, 10, Scheduler.ThreadPool);
xs.Subscribe(x => Console.WriteLine(x));

Будет исполняться в контексте планировщика

Параметризация

Управление многопоточность Управление многопоточностью осуществляется с помощью интерфейса IScheduler var xs =
с помощью IScheduler доступна в большинстве операций

Слайд 38

Синхронизация

Обновление пользовательского интерфейса
Label lbl = new Label(); Form frm = new Form() {Controls = {lbl}}; var xs = Observable.Return(42, Scheduler.ThreadPool); xs.Subscribe(x => {     Thread.Sleep(1000);     lbl.Text = "Result is " + x; });
Использование ObserveOn
xs.ObserveOn(frm).Subscribe(x => {     Thread.Sleep(1000);     lbl.Text = "Result is " + x; });

IScheduler поддерживает разные контексты синхронизации

Синхронизация Обновление пользовательского интерфейса Label lbl = new Label(); Form frm =

Слайд 40

Что мы изучили?

Введение в реактивные расширения
Дуализм интерфейсов
Основы Rx
Observable sequences
Events и Observables
Observables и

Что мы изучили? Введение в реактивные расширения Дуализм интерфейсов Основы Rx Observable
асинхронные операции
Concurrency

1-

Слайд 41

Experimental vs Stable

Две версии библиотеки Reactive Extensions:
Stable
Experimental
Interactive Extensions

Experimental vs Stable Две версии библиотеки Reactive Extensions: Stable Experimental Interactive Extensions

Слайд 42

Где взять?

NuGet
Web - http://msdn.microsoft.com/en-us/data/gg577610
Can’t find? Use google☺

Где взять? NuGet Web - http://msdn.microsoft.com/en-us/data/gg577610 Can’t find? Use google☺

Слайд 43

Дополнительные ссылки

The Reactive Extensions (Rx)... (Data Developer Center) - http://msdn.microsoft.com/en-us/data/gg577609
Reactive Extensions (MSDN)

Дополнительные ссылки The Reactive Extensions (Rx)... (Data Developer Center) - http://msdn.microsoft.com/en-us/data/gg577609 Reactive
- http://msdn.microsoft.com/en-us/library/hh242985(v=vs.103).aspx
Rx Team Blog - http://blogs.msdn.com/b/rxteam/
Реактивные расширения и асинхронные операции - http://sergeyteplyakov.blogspot.com/2010/11/blog-post.html

Слайд 44

Roslyn CTP is awailable!
http://msdn.microsoft.com/ru-ru/roslyn

Roslyn CTP is awailable! http://msdn.microsoft.com/ru-ru/roslyn

Слайд 45

Вопросы?

Вопросы?

Слайд 46

Горячие и холодные последовательности

Горячие и холодные последовательности
Имя файла: Reactive-Extensions.pptx
Количество просмотров: 147
Количество скачиваний: 0