Tuesday 30 June 2015

CPython Integration in Weka

Continuing the interoperability in Weka that was started with R integration a few years ago, we now have integration with Python. Whilst Weka has had the ability to do Python scripting via Jython for quite some time, the latest effort adds CPython integration in the form of a "wekaPython" package that can be installed via Weka's package manager. This opens the door to all the highly optimised scientific libraries in Python - such as numpy, scipy, pandas and scikit-learn - that have components written in C or Fortran. Scikit-learn is a relatively new machine learning library that is increasing in popularity very rapidly (see the latest KDNuggets software poll).

Like the R integration in Weka, the CPython support allows for general scripting via a Knowledge Flow Python scripting step. This allows arbitrary scripts to be executed and one or more variables to be extracted from the Python runtime. Weka instances are transferred into Python as pandas data frames, and pandas data frames can be extracted from Python and converted back into instances. Furthermore, arbitrary variables can be extracted in textual form, and matplotlib graphics can be extracted as PNG images.


The package also provides a wrapper classifier and wrapper clusterer for the supervised and unsupervised learning algorithms implemented in scikit-learn. This allows the scikit-learn algorithms to be used and evaluated within Weka's framework, just like the MLRClassifier from the RPlugin package allows ML algorithms from R to be used. With both RPlugin and wekaPython installed it is quite cool to run comparisons between implementations in the different frameworks - e.g. here is a quick comparison on some UCI datasets (using Weka's Experiment environment to run a 10x10 fold cross-validation) between random forest implementations in Weka, R and scikit-learn. All default settings were used except for the number of trees, which was set to 500 for each implementation. Since scikit-learn only handles numeric input variables, both Weka's random forest and the MLRClassifier running R random forest were wrapped in the FilteredClassifier to apply unsupervised nominal to binary encoding (one hot encoding) so that all three implementations received the same input:


weka.classifiers.sklearn.ScikitLearnClassifier

This classifier wraps the majority of the supervised learning algorithms in scikit-learn. The wrapper supports retrieving the underlying model from python (as a pickled string) so that the ScikitLearnClassifier can be serialised and used for prediction at a later date.
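
To make the "pickled string" idea concrete, here is a minimal sketch using only the Python standard library. The dict standing in for a trained model, the helper names and the choice of base64 as the text encoding are all assumptions for illustration; this is not the wrapper's actual code.

```python
import base64
import pickle


def model_to_string(model):
    """Pickle a model and encode the bytes as plain ASCII text."""
    return base64.b64encode(pickle.dumps(model)).decode("ascii")


def model_from_string(text):
    """Reverse of model_to_string. Only unpickle text that you trust."""
    return pickle.loads(base64.b64decode(text.encode("ascii")))


# Hypothetical stand-in for a fitted model: just its learned parameters.
trained = {"algorithm": "majority_class", "majority_label": "yes"}

as_text = model_to_string(trained)      # safe to embed in a JSON message
restored = model_from_string(as_text)   # e.g. in a later Python session
print(restored == trained)
```

Because the model travels as text, the Java side can hold on to it as an ordinary string and hand it back to Python when predictions are needed later.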


weka.clusterers.ScikitLearnClusterer

This clusterer wraps clustering algorithms in scikit-learn. It functions in the same way as the ScikitLearnClassifier, so it can be used in any Weka UI or from Weka's command line interface.

Under the hood

The underlying integration works via a micro-server written in python that is launched by Weka automatically. Communication is done over plain sockets and messages are stored in JSON structures. Datasets are transmitted as plain CSV and image data as base64 encoded PNG.
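
A rough sketch of this style of exchange, using only the standard library, might look like the following. The command names, the message fields and the length-prefixed framing are assumptions made up for the example; the real wekaPython protocol may differ in all of these details.

```python
import base64
import csv
import io
import json
import socket
import struct


def send_message(sock, payload):
    """Send a dict as length-prefixed JSON (the framing is an assumption)."""
    body = json.dumps(payload).encode("utf-8")
    sock.sendall(struct.pack(">I", len(body)) + body)


def recv_exact(sock, n):
    """Read exactly n bytes from the socket."""
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            raise ConnectionError("socket closed early")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)


def recv_message(sock):
    """Read one length-prefixed JSON message and decode it to a dict."""
    (length,) = struct.unpack(">I", recv_exact(sock, 4))
    return json.loads(recv_exact(sock, length).decode("utf-8"))


def rows_to_csv(header, rows):
    """Render a header and data rows as plain CSV text."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)
    return buf.getvalue()


def handle_request(request):
    """Toy server side with two hypothetical commands."""
    if request["command"] == "put_instances":
        rows = list(csv.reader(io.StringIO(request["csv"])))
        return {"status": "ok", "num_rows": len(rows) - 1}  # minus header
    if request["command"] == "get_image":
        fake_png = b"\x89PNG\r\n\x1a\n"  # PNG signature only, not a real image
        return {"status": "ok",
                "image": base64.b64encode(fake_png).decode("ascii")}
    return {"status": "error", "message": "unknown command"}


# Both ends live in one process here, purely to keep the example runnable.
client, server = socket.socketpair()
send_message(client, {"command": "put_instances",
                      "csv": rows_to_csv(["a", "class"],
                                         [[1.5, "yes"], [2.0, "no"]])})
send_message(server, handle_request(recv_message(server)))
reply = recv_message(client)
print(reply)
client.close()
server.close()
```

Binary data such as images has to be base64 encoded because JSON strings cannot carry raw bytes.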

wekaPython works with both Python 2.7.x and 3.x. As it relies on a few new features in core Weka, a snapshot build of the development version (3.7) of Weka is required until Weka 3.7.13 is released. Numpy, pandas, matplotlib and scikit-learn must be installed in python for the wekaPython package to operate. Anaconda is a nice python distribution that comes with all the requirements (and lots more).

Wednesday 4 March 2015

Weka and Spark

Spark is smokin' at the moment. Every being and his/her/its four-legged canine-like companion seems to be scrambling to provide some kind of support for running their data processing platform under Spark. Hadoop/YARN is the old big data dinosaur, whereas Spark is like Zaphod Beeblebrox - so hip it has trouble seeing over its own pelvis; so cool you could keep a side of beef in it for a month :-) Certainly, Spark has advantages over first generation map-reduce when it comes to machine learning. Having data (or as much as possible) in memory can't be beat for iterative algorithms. Perhaps less so for incremental, single pass methods.

Anyhow, the coolness factor alone was sufficient to get me to have a go at seeing whether Weka's general purpose distributed processing package - distributedWekaBase - could be leveraged inside of Spark. To that end, there is now a distributedWekaSpark package which, short of a few small modifications to distributedWekaBase (mainly to support some retrieval of outputs in-memory rather than from files), proved fairly straightforward to produce. In fact, because develop/test cycles seemed so much faster in Spark than Hadoop, I prototyped Weka's distributed k-means|| implementation in Spark before coding it for Hadoop.

Internals

Internally, distributedWekaSpark handles CSV files only at present. In the future we'll look at supporting other data formats, such as Parquet, Avro and sequence files. CSV handling is the same as for distributedWekaHadoop - due to the fact that source data is split/partitioned over multiple nodes/workers it can't have a header row, and attribute names can be supplied via a "names" file or manually as a parameter to the jobs. The CSV data is loaded and parsed just once, resulting in an RDD<weka.core.Instance> distributed data structure. Thanks to Spark's mapPartitions() processing framework, processing proceeds in much the same way as it does in distributedWekaHadoop, with the exception that the overhead of re-parsing the data on each iteration when running iterative algorithms is eliminated. Processing an entire partition at a time also avoids object creation overhead when making use of distributedWekaBase's classes.
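
The parse-once, process-per-partition pattern can be illustrated without Spark at all. The sketch below is a plain-Python simulation in which lists stand in for partitions; the function names are made up for the example and none of this is the Spark or distributedWekaBase API.

```python
import csv
from functools import reduce


def parse_partition(lines):
    """Parse the raw CSV lines of one partition into rows of floats."""
    return [[float(v) for v in row] for row in csv.reader(lines)]


def partition_stats(rows):
    """Per-partition function: called once with the whole partition,
    so any set-up cost is paid per partition rather than per row."""
    return len(rows), [sum(col) for col in zip(*rows)]


def combine(a, b):
    """Merge the partial results of two partitions."""
    return a[0] + b[0], [x + y for x, y in zip(a[1], b[1])]


raw_partitions = [["1.0,2.0", "3.0,4.0"], ["5.0,6.0"]]

# Parse exactly once and keep the result, as a cached RDD would.
parsed = [parse_partition(p) for p in raw_partitions]

# Any number of later passes can reuse `parsed` without re-parsing.
count, sums = reduce(combine, (partition_stats(p) for p in parsed))
means = [s / count for s in sums]
print(means)
```

An iterative algorithm would simply loop over `parsed` again on each iteration, which is where the saving over re-reading CSV from disk comes from.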

Reduce operations are pairwise associative and commutative in Spark, and there isn't quite the analogue of a Hadoop reduce (where a single reducer instance iterates over a list of all elements with the same key value). Because of this, and to avoid lots of object creations again, many "reduce" operations were implemented via sorting and repartitioning followed by a map partitions phase. In most cases this approach works just fine. In the case of the job that randomly shuffles and stratifies the input data, the result (when there are class labels that occur less frequently than the number of requested folds) is slightly different to the Hadoop implementation. The Spark implementation results in these minority classes getting oversampled slightly.
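
As a rough illustration of the sort, repartition and map partitions approach, here is a plain-Python simulation (lists stand in for partitions; this is not Spark code, and the partitioning function is an assumption chosen for the example). Every record with the same key lands in the same partition in sorted order, so one pass over a partition sees all the values for a key together, much as a Hadoop reducer would.

```python
import zlib
from itertools import groupby
from operator import itemgetter


def repartition_by_key(pairs, num_partitions):
    """Sort by key, then route all records with the same key to one partition."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in sorted(pairs, key=itemgetter(0)):
        # crc32 gives a partition index that is stable between runs.
        index = zlib.crc32(key.encode("utf-8")) % num_partitions
        partitions[index].append((key, value))
    return partitions


def reduce_partition(partition):
    """One pass over a sorted partition, handling each key's values as a group."""
    for key, group in groupby(partition, key=itemgetter(0)):
        yield key, sum(value for _, value in group)


pairs = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
partitions = repartition_by_key(pairs, num_partitions=2)

totals = {}
for partition in partitions:
    totals.update(reduce_partition(partition))
print(sorted(totals.items()))
```

Summing is only a placeholder here; the point is that the per-key work happens inside a single partition-level function call rather than through many pairwise reduce calls.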

distributedWekaSpark is not the only Weka on Spark effort available. Kyriakos-Aris Koliopoulos has been developing one in parallel, and released a proof-of-concept he developed for his Masters thesis about five months ago:

https://github.com/ariskk/distributedWekaSpark

I've borrowed his nifty cache heuristic that uses the source file size and object overhead settings to automatically determine a Spark storage level to use for RDDs.
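
To give a feel for what such a heuristic might look like, here is a hedged sketch. The storage level names are real Spark ones, but the function, its parameters and the thresholds are assumptions invented for illustration; they are not the rule used in either package.

```python
def choose_storage_level(file_size_bytes, object_overhead, available_memory_bytes):
    """Pick a Spark storage level name from rough size estimates.

    object_overhead is a multiplier (> 1) for how much larger the data is
    expected to be as deserialized Java objects than as raw file bytes.
    The thresholds are illustrative assumptions only.
    """
    estimated_object_bytes = file_size_bytes * object_overhead
    if estimated_object_bytes <= available_memory_bytes:
        return "MEMORY_ONLY"       # deserialized objects fit in memory
    if file_size_bytes <= available_memory_bytes:
        return "MEMORY_ONLY_SER"   # only the compact serialized form fits
    return "MEMORY_AND_DISK"       # spill whatever does not fit to disk


gib = 1024 ** 3
for size in (1 * gib, 3 * gib, 10 * gib):
    print(size // gib, "GiB ->", choose_storage_level(size, 3.0, 4 * gib))
```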

Having RDDs referenceable for the duration that the Spark context is alive makes it possible to have a tighter coupling between Spark job steps in the Knowledge Flow. The success and failure connection types introduced in distributedWekaHadoop can now be used to carry data, such as the context and references to various RDD datasets that are in play. This allows Spark steps downstream from the start point in the flow to present a simpler version of their configuration UI, i.e. they can hide connection and CSV parsing details (as this is only required to be specified once, at the start step).

What's in the package?

The distributedWekaSpark package comes bundled with core Spark classes that are sufficient for running out-of-the-box in Spark's local mode and sourcing data from the local filesystem. This mode can take advantage of all the cores on your desktop machine by launching workers in separate threads. All the bundled template examples for Spark available from the Knowledge Flow's template menu use this mode of operation. It should also be sufficient to run against a standalone Spark cluster using a shared filesystem such as NFS.

To run against HDFS (and/or on YARN), it is necessary to delete all the Spark jar files in ${user.home}/wekafiles/packages/distributedWekaSpark/lib and copy in the spark-assembly-X.Y.Z-hadoopA.B.C.jar from your Spark distribution, as this will have been compiled against the version of Hadoop/HDFS that you're using.

All the same jobs that are available in distributedWekaHadoop have been implemented in distributedWekaSpark. Like in the Hadoop case, there is a full featured command line interface available. All jobs can stand alone - i.e. they will invoke other jobs (such as ARFF header creation and data shuffling) internally if necessary. As mentioned above, when running in the Knowledge Flow, individual job steps become aware of the Spark context and what datasets already exist in memory on the cluster. This allows the configuration of connection details and CSV parsing options to only have to be specified once, and downstream job steps can simplify their UI accordingly.





When referencing inputs or outputs in HDFS, the Weka Spark code handles hdfs:// URLs in a somewhat non-standard way. Like the way the hadoop fs/hdfs command operates relative to the current user's directory in HDFS (unless an absolute path is specified), hdfs:// URLs are interpreted relative to the current user's directory. So a URL like

hdfs://host:port/output/experiment1

refers to the output/experiment1 directory in the current user's home directory. To force an absolute path an extra / is needed - e.g.

hdfs://host:port//user/mhall/output/experiment1
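
The resolution rule can be sketched in a few lines of Python. This is an illustration of the behaviour described above rather than the Weka code itself; the function name is made up, and the assumption that a user's home directory is /user/<name> follows the usual HDFS convention.

```python
from urllib.parse import urlparse


def resolve_hdfs_path(url, user):
    """Return the HDFS path that a Weka-style hdfs:// URL refers to."""
    parsed = urlparse(url)
    if parsed.scheme != "hdfs":
        raise ValueError("not an hdfs:// URL: " + url)
    if parsed.path.startswith("//"):
        # Extra leading slash: the rest is an absolute path.
        return "/" + parsed.path.lstrip("/")
    # Otherwise the path is relative to the user's home directory.
    return "/user/" + user + "/" + parsed.path.lstrip("/")


relative = resolve_hdfs_path("hdfs://host:9000/output/experiment1", "mhall")
absolute = resolve_hdfs_path("hdfs://host:9000//user/mhall/output/experiment1", "mhall")
print(relative)
print(absolute)
```

With these inputs both URLs resolve to the same directory, /user/mhall/output/experiment1.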

Limitations

Aside from only handling CSV files at present, another limitation stems from the fact that only one Spark context can be active within a single JVM. This imposes some constraints on the structure of Knowledge Flow processes using Spark steps. There can only be one Spark-related start point in a given Knowledge Flow graph. This is because a start point is where the Spark context is first created; so having more than one will lead to grief.

When running on a Spark cluster managed by YARN only the "yarn-client" mode is supported. This is the mode where the driver program executes on the local machine and the YARN resource manager is simply used to provision worker nodes in the cluster for Spark to use. The reason for this is that Weka's implementation configures everything programmatically via SparkConf/JavaSparkContext, including all Weka and supporting jar files that are needed to run. This works fine for standalone Spark clusters, Mesos clusters and yarn-client mode, but not for yarn-cluster mode (where the driver program itself runs in an application master process on the cluster). In yarn-cluster mode some funky jiggery pokery (as done by the spark-submit shell script) is necessary to get everything configured for the driver to work on the cluster. It seems pretty ugly that this can't be handled by Spark seamlessly behind the scenes via the same SparkConf/SparkContext configuration as the other modes. Hopefully this will get rectified in a future release of Spark. Some discussion of this issue can be seen in the email thread at:

http://markmail.org/message/cmxswp2ffqjrkxix#query:+page:1+mid:42fug7it6f2zoxga+state:results

Another YARN-related issue is that it is necessary to have your Hadoop cluster's conf dir in the CLASSPATH. This is because Spark picks up the resource manager's address and other bits and pieces from the configuration files. Unfortunately, it is not possible to set this information programmatically. You can only get at the Hadoop Configuration object being used by Spark internally after the SparkContext has been created - by this time it's too late, as Spark is already trying to talk to the resource manager.

Anyhow, distributedWekaSpark is available from Weka's package manager today. So give it a go and pass on any feedback you might have.