One of the main features of our NEURO product is that it stores all information that it processes (aircraft, overlays, etc.) in a database indefinitely and provides different ways for a user to access this information. This is a powerful feature, but storing and retrieving the information is only half of the problem. Now that we have all of this stored information, we want users to be able to ask questions about it, such as "How congested is my airspace?" or "What is the average distance between my UAVs and other aircraft within my airspace?"

So many tracks

Storage Design

NEURO stores all of its data in a Mongo database. Some data, such as user-created overlays, changes infrequently, while other data, such as aircraft and UAVs (we call them "tracks"), is constantly changing. For infrequently changing data, we take an initial "snapshot" upon startup and from that point on store only the updates. This is very space efficient but makes retrieval a little more complicated (and slower). For frequently changing data, we store a "snapshot" of the complete state of the data in a single document (per data type) every couple of seconds. This gives us the best retrieval performance, but at the cost of increased storage size.

What is an "Analysis"?

Before I jump into the details of how we used Apache Spark to answer questions about our data, we should talk about how users are going to ask those questions. The user asks questions by creating an "Analysis" with the following information:

  • Analysis type. This is the question that the user is asking. Can be something simple like: "average track density (count)" or something a little more complicated such as: "min, max, and average horizontal and vertical distance between tracks (proximity)".
  • Time period over which to perform the analysis. For example: Monday through Friday 0900 to 1600. The user can also choose to perform a "live" analysis, in which case no time period is required. When a time period is provided, we call this a "historical" Analysis.
  • Filter limiting the set of objects that are looked at. For example: Tracks within "Airspace Alpha".
  • Data series (groups). For example, analyze tracks with a Mode3 of 1200 separately from tracks within 2NM of a UAV.

Historical analyses have one additional attribute: the "resolution" of each analysis result. This attribute is calculated automatically for the user in order to limit the number of results generated by an Analysis. For example, if the user chooses to perform an Analysis over a 1 year time span, we probably don't want or need a result for every 2 second time period, even if we have data every 2 seconds. In this case, a resolution of 60 seconds (1 minute) would generate about 525,949 results over the course of a year (the number of minutes in one year according to Google), and each result would be aggregated from 30 source documents (since in this example we have historical data every 2 seconds).
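
To make the arithmetic concrete, here is a quick sketch of that example (illustrative values only, not NEURO's actual resolution-selection code):

//A 1 year Analysis at a 60 second resolution, with source snapshots every 2 seconds
double secondsPerYear = 365.2425 * 24 * 60 * 60;      //Gregorian year, per Google
int resolutionSeconds = 60;                           //chosen result resolution
int snapshotIntervalSeconds = 2;                      //source snapshot interval

long resultCount = (long)( secondsPerYear / resolutionSeconds );       //~525,949 results
int snapshotsPerResult = resolutionSeconds / snapshotIntervalSeconds;  //30 source documents per result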

The time period over which an analysis is performed can be defined as a simple start/end date range, such as March 3rd through April 30th, or a more complicated time period such as Monday, Wednesday, Friday from 0900 to 1600, March 3rd through April 30th. For a "live" analysis, the time period is not defined.

Performing an Analysis

At a high level, the steps required to perform an Analysis can be broken down as follows:

  • Create a PairRDD containing the data for the desired time period using the Mongo Hadoop Connector
  • Filter the PairRDD by calling mapValues (if the user has defined a filter)
  • Create a "Result" for each value by performing the desired analysis on each value, and map each Result to a "resolution index" by calling mapToPair. Many Results may have the same "resolution index" if the desired resolution is less than the resolution of the source data set.
  • If the desired resolution is less than the resolution of the source data, then you will have multiple Results per "resolution index". Reduce these Results into a single Result by calling reduceByKey.
  • Save the Results to the database
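
In Spark terms, the whole pipeline looks roughly like this (a compressed sketch with hypothetical function and variable names; each step is covered in detail below):

rdd.mapValues( filterFunction )       //remove unwanted items from each snapshot
   .mapToPair( analyzeFunction )      //one Result per snapshot, keyed by "resolution index"
   .reduceByKey( combineFunction )    //combine Results that share a "resolution index"
   .saveAsNewAPIHadoopFile( "file:///bogus", Object.class, Object.class,
                            MongoOutputFormat.class, outputConfig );  //write Results to Mongo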

NEURO adds some additional complexity by attempting to reuse existing Results from similar completed Analyses in order to improve performance and decrease storage requirements. But for the sake of simplicity I'm going to save that topic for another post!

Create Initial RDD

Unless you're operating on the complete contents of a MongoDB collection, you will want to define "split keys" when creating an RDD using the Mongo Hadoop API to restrict your Spark operations to a subset of the Mongo collection. Once you have an RDD you can further filter it, but choosing good split keys will make a huge impact on performance. See the MongoDB documentation on choosing a shard key for best practices; the same considerations apply to split keys.

The "snapshot" collection in our database contains a compound index on two fields: systemId and startFrame. The startFrame index is defined as descending since in most cases we are retrieving data closest to the current frame (i.e. closest to "now") and a descending key provides us with better performance in this case. The value of startFrame starts at 0 when we starting taking track snapshots and increments by 1 indefinitely until we stop taking snapshots for the associated system. Given that we know the time period over which we were taking snapshots for a given system, the frame ID can easily be calculated from a timestamp.

Since the user defined time period over which to perform the analysis can contain gaps in coverage, we create a single RDD for each discrete time period within the requested time period. For example, if the user requested time period is Monday, Wednesday, and Friday from 0900 to 1400, then we will have 3 discrete time periods and therefore 3 separate RDDs with different split keys. Additional gaps in coverage could be caused by time periods during which the system was not running, requiring more RDDs.

For each discrete time period, we then calculate the start and end frames covered by that period (since we know when we started taking snapshots and we know how often we take them, this is easy) and add them to the org.apache.hadoop.conf.Configuration object that we use to create the RDD that points to the data for this discrete time period:

//inputUri and sparkContext (a JavaSparkContext) are assumed to be defined elsewhere
Configuration config = new Configuration();
config.set( "mongo.input.uri", inputUri );
config.set( "mongo.input.split.split_key_pattern", "{systemId: 1, startFrame: -1}" );
config.set( "mongo.input.split.split_key_min", "{systemId: ObjectId('544a8d525ed257170ea1edd8'), startFrame: 13022}" );
config.set( "mongo.input.split.split_key_max", "{systemId: ObjectId('544a8d525ed257170ea1edd8'), startFrame: 973}" );

JavaPairRDD<Object,BSONObject> rdd =
  sparkContext.newAPIHadoopRDD( config,
     com.mongodb.hadoop.MongoInputFormat.class, Object.class, BSONObject.class );

Notice that since we have a descending split key, the order of the split keys is reversed. The key with the larger startFrame is set as split_key_min and the key with the smaller startFrame is set as split_key_max. We found the Mongo command splitVector to be useful when troubleshooting our split keys:

db.runCommand({splitVector: "neuro.snapshots", keyPattern: {systemId: 1, startFrame: -1}, force: false, maxChunkSize: 8, min: {systemId: ObjectId("544a8d525ed257170ea1edd8"), startFrame: 13022}, max: {systemId: ObjectId("544a8d525ed257170ea1edd8"), startFrame: 973}});  

We also ran into an issue with the Mongo Hadoop Connector when using descending split keys where the last split was unbounded, causing the RDD to point to pretty much all of the documents in the collection. We've submitted a pull request to address this issue. If your split key is not descending then you should have no issues.

Another thing to keep in mind when defining split keys: by default, the Mongo $min and $max operators will be used when creating the split. The $min operator is inclusive while the $max operator is exclusive. If you want documents in the range of 50 to 100 in an RDD, your min should be 50 and max should be 101.

Once you have your individual RDDs pointing to each discrete time interval, you can merge them into a single RDD by calling SparkContext.union.
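
For example, if the per-period RDDs are collected in a list, merging them might look something like this (hypothetical variable names; chaining JavaPairRDD.union is functionally equivalent to a single SparkContext.union call):

//rddsPerPeriod is a List<JavaPairRDD<Object,BSONObject>> built as described above
JavaPairRDD<Object,BSONObject> snapshots = rddsPerPeriod.get( 0 );
for( int i = 1; i < rddsPerPeriod.size(); i++ )
{
  snapshots = snapshots.union( rddsPerPeriod.get( i ) );
}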

Filter Unwanted Items

Now that you have a single RDD pointing to documents in the desired time periods, you will want to remove the documents that you don't want to analyze, as defined by the filter created by the user. If no filter was provided then you can skip this step. In our case, since a single "snapshot" document contains many items, our "filter" process does not remove any documents from the RDD, but instead removes items from each document in the RDD. This means that the size of our RDD is not changing; we're just mapping one snapshot document into a snapshot document with (potentially) fewer items.

snapshots = snapshots.mapValues( new Function<BSONObject, BSONObject>() {
  public BSONObject call( BSONObject obj ) {
    //Each snapshot document contains a "data" list of items (tracks).
    //Remove the items that do NOT satisfy the user-defined filter so that
    //only the items we want to analyze remain in the snapshot.
    List data = (List)obj.get("data");
    for( Iterator it = data.iterator(); it.hasNext(); )
    {
      Object item = it.next();
      if( !filter.satisfies( item ) )
      {
        it.remove();
      }
    }
    return obj;
  }
});

Since Spark Functions are Serializable, your filter will need to be Serializable as well. Some of our filters depend on other items in the database, such as a filter that checks if a track is within an airspace. In normal operation, this filter would just retrieve the airspace from the database and check the track against it. This requires the filter to have a reference to the database (usually in the form of a DAO), which is probably not Serializable. Even if it is, you probably don't want your Spark Functions accessing your database if you want your Analysis to finish at some point in the near future. For this case, we "compile" our filter before passing it to the Spark Function for use. The "compile" process just creates a separate Serializable version of the filter containing all of the data that it will need in order to perform the filter. In the example above, the compiled airspace filter contains the airspace geometry in earth centered/earth fixed coordinates whereas the original filter just contained a reference to the airspace in the database.
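
As a rough illustration of the "compile" idea, here is a hypothetical compiled filter, simplified to a lat/lon bounding box rather than the ECEF airspace geometry described above, and assuming each item carries "lat" and "lon" fields:

import java.io.Serializable;
import org.bson.BSONObject;

public class CompiledBoundsFilter implements Serializable
{
  //Everything the filter needs is captured as plain fields at "compile" time,
  //so no database access happens inside the Spark Function
  private final double minLat, maxLat, minLon, maxLon;

  public CompiledBoundsFilter( double minLat, double maxLat, double minLon, double maxLon )
  {
    this.minLat = minLat;
    this.maxLat = maxLat;
    this.minLon = minLon;
    this.maxLon = maxLon;
  }

  public boolean satisfies( Object item )
  {
    BSONObject track = (BSONObject)item;
    double lat = ((Number)track.get( "lat" )).doubleValue();
    double lon = ((Number)track.get( "lon" )).doubleValue();
    return lat >= minLat && lat <= maxLat && lon >= minLon && lon <= maxLon;
  }
}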

Analyze Each Item

Now that we have an RDD pointing to our filtered set of data, we can actually perform the analysis. For that we're going to map each "snapshot" document into a "result" document containing the results of the analysis for that point in time. Also, since the resolution of our source snapshots will probably differ from the desired resolution of the results, we will map multiple results to a single key so that we can combine them into a single result in a later step. This is done by calling mapToPair on our snapshot RDD. This is where you actually do the "analysis" of your data. As an example, I'll simply count the number of items that were present in the snapshot.

JavaPairRDD<Integer,BSONObject> groupedByResolution = snapshots.mapToPair( new PairFunction<Tuple2<Object,BSONObject>,Integer,BSONObject>() {
  public Tuple2<Integer, BSONObject> call( Tuple2<Object,BSONObject> tup )
    throws Exception
  {
    BSONObject snapshot = tup._2;
    int startFrame = (Integer)snapshot.get("startFrame");
    int count = ((List)snapshot.get("data")).size();

    BasicBSONObject result = new BasicBSONObject();
    result.put("total", count);
    result.put("numberOfResults", 1);

    //Calculate which "resolution frame" this result should be part of
    //using the "startFrame" of the snapshot.  Every result at the same
    //"resolution frame" will be combined into a single result at a later step.
    //firstFrame and snapshotsPerResult are captured from the enclosing scope.
    int resolutionFrame = (startFrame - firstFrame) / snapshotsPerResult;

    return new Tuple2<Integer,BSONObject>( resolutionFrame, result );
  }
});

If the resolution of your source data set is "2 seconds" and the desired resolution of the results is "10 seconds", then you will have 5 source items for every result which will eventually need to be combined into a single result. The code above does the grouping of the results by calculating a "resolutionFrame", but the reducing of the results into a single result happens later.

Reduce Results

At this point we have one Result per source snapshot. But as discussed, we want Analysis results at a lower resolution than the source data. Since we've already grouped results by a "resolution frame", we can do this by reducing the groupedByResolution PairRDD. For our simple count example, we want the "average" count across all samples for a single resolution. This will actually be a two-step process. First, we'll reduce all results into a single result containing a sum value and a total number of results. Then we'll map that result into a different result by calculating the average.

groupedByResolution = groupedByResolution.reduceByKey(
  new Function2<BSONObject, BSONObject, BSONObject>() {
    public BSONObject call( BSONObject obj1, BSONObject obj2 )
      throws Exception
    {
      int sum1 = (Integer)obj1.get("total");
      int numberOfResults1 = (Integer)obj1.get("numberOfResults");
      int sum2 = (Integer)obj2.get("total");
      int numberOfResults2 = (Integer)obj2.get("numberOfResults");

      //Combine the two partial results into a single running total
      BasicBSONObject comb = new BasicBSONObject();
      comb.put( "total", sum1+sum2 );
      comb.put( "numberOfResults", numberOfResults1+numberOfResults2 );
      return comb;
    }
  });

At this point the groupedByResolution PairRDD is a one-to-one mapping between resolution index (the key of the PairRDD) and Result (the value). Now we need to calculate the average.

groupedByResolution = groupedByResolution.mapValues(  
  new Function<BSONObject,BSONObject>() {
    public BSONObject call( BSONObject result ) {
      int total = (Integer)result.get( "total" );
      int numberOfResults = (Integer)result.get( "numberOfResults" );
      double avg = (double)total / (double)numberOfResults;

      result.put( "average", avg );
      return result;
    }
  });

If instead of the average number of objects per resolution index, we wanted the "max" or "min" number of objects counted during each resolution index, then we would not need the second step.
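
For example, keeping the maximum count per resolution index could be done with a single reduce applied to the grouped PairRDD from the mapToPair step (a sketch reusing the "total" field from above):

JavaPairRDD<Integer,BSONObject> maxPerResolution = groupedByResolution.reduceByKey(
  new Function2<BSONObject, BSONObject, BSONObject>() {
    public BSONObject call( BSONObject obj1, BSONObject obj2 )
    {
      //Keep whichever partial result has the larger count
      int count1 = (Integer)obj1.get( "total" );
      int count2 = (Integer)obj2.get( "total" );
      return count1 >= count2 ? obj1 : obj2;
    }
  });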

Save the Results!

Finally, once you have your final RDD, you can save it back to Mongo by doing something like this:

Configuration config = new Configuration();
config.set( "mongo.output.uri", "mongodb://127.0.0.1:27017/neuro.results");

//The path passed to saveAsNewAPIHadoopFile is unused by MongoOutputFormat;
//the results are written to the collection named in mongo.output.uri
groupedByResolution.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config );

This is where all of the actual work is performed. Spark transformations are lazy, so every call to mapValues, mapToPair, or reduceByKey up to this point returns immediately; nothing is computed until the results are saved (an action).

And finally, here is one of the views of a completed Analysis in NEURO:

Things to Keep in Mind

All Functions you pass to Spark must be Serializable - This is especially an issue when you define Spark functions as anonymous or non-static inner classes, since inner classes in Java retain a reference to their enclosing class. Even if the enclosing class is also Serializable, you will probably be serializing more data than you want.

Good Split Keys are Important - Split keys determine the data that will be contained in the database-backed RDD. It's a good idea to manually run the splitVector command on your database to make sure that you are generating the split keys you think you are.

Running Spark without a Cluster - You can create a SparkContext without having a running Spark cluster by passing local[n] as the cluster URL to connect to. The value of n is the number of cores you want Spark to use.
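
For example, a local context using 4 cores might be created like this (the application name is illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf sparkConf = new SparkConf()
  .setMaster( "local[4]" )        //run Spark in-process using 4 cores
  .setAppName( "neuro-analysis" );
JavaSparkContext sparkContext = new JavaSparkContext( sparkConf );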

Running a Spark Cluster - A Spark Cluster consists of a master and at least one worker. You must run at least one worker in order for Spark to respond to jobs. On Windows you have to manually start the Spark master (at least in the version of Spark that we're using) with a command similar to:

bin/spark-class org.apache.spark.deploy.master.Master --host 127.0.0.1 --port 7077 --webui-port 8080

To start a worker, do something like this:

bin/spark-class org.apache.spark.deploy.worker.Worker spark://127.0.0.1:7077



Adam Stull