Tuesday, May 22, 2012

(RX) Reactive Extensions A Different Flavour


The Reactive Extensions (Rx) Library is a set of extensions for the IObservable<T> and IObserver<T> interfaces that were added in .NET 4.0. The extensions allow for the use of LINQ to query IObservable<T> collections as if they were IEnumerable collections. In addition Rx also greatly simplifies the orchestration and composition of asynchronous functions and events.

With the Reactive Framework, the paradigm of Enumeration and pulling for more results is flipped upside down. Instead of pulling for the next record, the Reactive Framework “Reacts” to events as they are pushed through the event pipeline. Instead of calling MoveNext as we do with IEnumerable, we react to OnNext events.

With Rx, you can easily compose multiple async operations together.

The IObservable and IObserver interfaces are involved and rather than enumerate over existing sequences, you write queries against sequences of future events.

When would you use the Reactive Framework?

  • Essentially, you can use it anywhere you are responding to events or work with continuous streams
  • Handling UI Events (Mouse clicks, Touch gestures)
  • Working with Asynchronous Web requests (Web services) 
  • Analysing sensor data from Manufacturing, Health care, etc. devices in real time
  • Processing data from continuous data streams (Log files, Stock feeds, etc.)

Basic Example

Firstly, fire up Visual Studio 2010 and create a new project, in my case I created a windows form application.
Use your trusted package manager "Nuget" and install the RXX nuget package.


Below I created a class called FileSystemObservable.

In this class I use a FileSystemWatcher that is used to listens to the file system change notifications and raises events when a directory, or file in a directory, changes.

I also have a CreatedFiles property this is an observable of Type FileSystemEventArgs

In the constructor I create a new instance of the FileSystemWatcher, thereafter with a bit of RX Magic I set the CreatedFiles property to the Event that I want to monitor, which happens to be when a file is created.
public class FileSystemObservable
        private readonly FileSystemWatcher _fileSystemWatcher;

        public IObservable<FileSystemEventArgs> CreatedFiles { get; private set; }

        public FileSystemObservable(string directory,string filter, bool includeSubdirectories)
            _fileSystemWatcher = new FileSystemWatcher(directory, filter)
                EnableRaisingEvents = true,
                IncludeSubdirectories = includeSubdirectories,

            CreatedFiles = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
              (h => h.Invoke,
               h => _fileSystemWatcher.Created += h,
               h => _fileSystemWatcher.Created -= h)
              .Select(x => x.EventArgs);


Below is how you would use the FileSystemObservable. Firstly as the code reads I create a new instance of the FileSystemObservable specifying that I want to monitor the entire C drive and sub directories for all file and directory changes.

I specify that I want to monitor only events when files are created, thereafter using RX I subscribe to the event. When the event gets triggered a message box will be shown with the file details and the fact that it was created.

var createdfilewatcher = new FileSystemObservable(@"c:\", "*.*", true).CreatedFiles
            .Subscribe(x =>
                MessageBox.Show(x.Name + " Created");


More info


For more info I would recommend having a look at http://rxx.codeplex.com/ and http://channel9.msdn.com/Tags/reactive+extensions

No comments:

Post a Comment