• Petter Hultin Gustafsson

Movies on MLOps


Classifying movie ratings using the MLOps platform with PySpark and TensorFlow.




Introduction


Classifying movies is always super cool and useful. I, myself, have built at least five successful businesses around it, so I wanted to share an end-to-end example of how you can go from being an average Netflix rater to making millions of dollars on your skillset. First, we will go through the preprocessing using PySpark on the MLOps platform. Then we will train awesome models that we can deploy so that millions of users can pay for the ratings.


Note! Subscription tiers at about $29/month have worked best for me in the past.



Inspecting the data


The dataset can be found here. I’m going to go with the ratings, metadata and keywords for this classifier. The columns that I’m interested in and will work with are:


  • ratings.csv — all of them

  • metadata.csv — [id, budget, genres, popularity, runtime, revenue, original_language, production_companies, vote_count, vote_average]


  • keywords.csv — all of them


These are the columns with the fewest null values and the most information. I’m not going to do any Pandas data analysis here, because Kaggle has already done it for me. Personally, I keep all my cool datasets on S3, so importing them on the MLOps platform is easy: just add the S3 paths under “Create Datasource”. It should look something like this:


Adding datasources in the MLOps platform enables automatic tracking of data over time.



Let’s get crackin’


First things first. This is my __main__.

Importing the DataFrames might seem a bit obscure, but since I generated the code from the console, I get the UUID names of the version-tracked tables. After reading the data, we call a function that does all our transformations, and then we write the resulting DataFrame to disk, specifying format and label columns (for statistics and training). Using the MLOps read and write functions also lets you trigger this job on a schedule, and it will only process data it has not yet seen (pipeline gold).


Selecting datasources in the MLOps platform allows you to generate template code for your PySpark job.


Note! Saving this dataset in Parquet instead of CSV saves almost two orders of magnitude of disk space (roughly 64x). Initial dataset size: ~700 MB, transformed CSV: ~900 MB, transformed Parquet: 14 MB.


Oh btw, don’t forget to add some imports, it might help.


Transformations


So to summarize what I want to accomplish with the data:


  1. Get the IDs from all the nested columns [genres, keywords, production_companies], sort them numerically, and then stringify them so that, for example, a combination of genre_1 + genre_2 can be seen as a feature.

  2. Convert all string columns [original_language, plus the columns from step 1] to numerical classes.

  3. Replace all empty values (zeroes in our case) with the mean of each column.

  4. Normalize and scale all feature columns.

  5. One-hot encode my ratings (labels), since I will be using TensorFlow’s CategoricalCrossentropy as the loss function. This is optional, and if you are using SparkML or scikit-learn, you could just be satisfied with having a single column of integers instead.


Preprocessing code


So let’s start with the boring, annoying part of figuring out how to parse these poorly structured Python dicts (they could at least have bothered doing a json.dumps()). So this is my oh-my-god helper function:
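The helper itself is not reproduced here. As a minimal sketch of what such a parser could look like, assuming each cell holds a stringified list of dicts that carry an `id` key, `ast.literal_eval` can read the Python-style quoting that `json.loads` would reject. The name `extract_ids` and the sample string are hypothetical.

```python
import ast
from typing import List, Optional


def extract_ids(raw: Optional[str]) -> List[int]:
    """Return the integer ids found in a stringified list of Python dicts."""
    if not raw:
        return []
    try:
        # literal_eval only evaluates literals, so it is safe on untrusted strings.
        parsed = ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        # Malformed cells are treated as "no ids" instead of failing the job.
        return []
    if not isinstance(parsed, list):
        return []
    return [int(item["id"]) for item in parsed if isinstance(item, dict) and "id" in item]


sample = "[{'id': 35, 'name': 'Comedy'}, {'id': 16, 'name': 'Animation'}]"
print(extract_ids(sample))  # [35, 16]
```

Returning an empty list for missing or malformed cells keeps the function total, which matters once it runs as a UDF over every row.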

So now that we got that out of the way, let’s dive into the my_transformations() function. I’m going to start with the parsing. What I will do is run a UDF for each JSON column and pass each row to the above function. We will go from having a stringified Python dict to a Spark ArrayType filled with integers (the IDs). I will then sort these integers with the array_sort function so that genre_1+genre_2 is the same as genre_2+genre_1:


As you can see above I’m also using my own special “null value” handler at the end of df_kw_parsed and df_meta_parsed to remove all arrays that are empty.


Next up, I will join my three DataFrames together, repartition the data evenly onto the Spark executors and then cast a bunch of columns. You will also notice that I use the Spark built-in function array_join, which will help me stringify my arrays so that I can create integer classes out of them later.


Once that hurdle is over, I will create my first Spark Pipeline that will take all my string columns and turn each unique value into its own integer value, as well as using the Imputer to fill zero values with the mean. I will also take my ratings column and use Bucketizer. Since the ratings are a bit skewed in number of samples, I will use this function to bucket ratings into a unified class, reducing the number of classes from 10 to 4.


Alright, we have two things left to do: scaling and one-hot encoding. So let’s start with scaling, which I will do for all features. I will use the Spark StandardScaler that normalizes and then scales, because I’m lazy. And honestly, because we are doing movie ratings.


Finally, we can run our last pipeline, with the OneHotEncoder. There is just one little obstacle here: its output is a SparseVector, and we want it as columns, since we are going to save it to disk.


Notice that I use the function to_array from gist number three. Finally I will return to main and write to disk.

Now that my script is done, I want to pop it into the MLOps platform, so I click “Create Dataset” and choose 4 standard workers (that’s a lot of compute power for this little princess dataset). I will create a full subset right away and do an 80/10/10 split on train/val/test. I also won’t calculate any column metrics, since I know they are normalized and scaled, and also because we are doing movie ratings. Here is the result:



As we can see, this bad boy took about 35 minutes to execute and cost me about a dollar. Good for me that I’m 100% certain this will generate a kick-ass classifier in the next part!

If I’m interested, I can inspect the CloudWatch logs directly via the Logs button. I can also check out the PySpark code directly, which is nice if you are multiple people on a team working towards a common goal. You can then easily pick up where someone else left off.


Well that was a handful. Now let’s move on and build a model!


Training a TensorFlow model


Alright, so as you know, the heavy lifting is over, and building models is just a walk in the park once the data is tip top. So, straight from the Datasets view we can select our dataset and click “Create Model” which will look something like:



If you are a Data Scientist, and have read this far, you probably agree that using a DNN to fit a model around this dataset is a tad too much. But who has time for that kind of philosophical thinking!

I will use a 120–120–4 DNN with CategoricalCrossentropy in the Keras API. All this I will run on a GPU instance for 30 epochs. And this is all the code we need:
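The training script is not reproduced here, and the platform SDK calls are left out because their exact API is not shown. As a plain Keras sketch of a 120–120–4 network with CategoricalCrossentropy, the following assumes ReLU activations, the Adam optimizer, 10 input features and random synthetic data, none of which come from the post.

```python
import numpy as np
import tensorflow as tf

NUM_FEATURES = 10  # assumption: depends on how many feature columns were kept
NUM_CLASSES = 4    # the four rating buckets
EPOCHS = 30        # as stated in the post

model = tf.keras.Sequential(
    [
        tf.keras.Input(shape=(NUM_FEATURES,)),
        tf.keras.layers.Dense(120, activation="relu"),
        tf.keras.layers.Dense(120, activation="relu"),
        # Softmax output matches the one-hot labels expected by CategoricalCrossentropy.
        tf.keras.layers.Dense(NUM_CLASSES, activation="softmax"),
    ]
)
model.compile(
    optimizer="adam",
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=["accuracy"],
)

# Synthetic stand-in data, only here so the sketch runs on its own.
rng = np.random.default_rng(0)
x = rng.normal(size=(256, NUM_FEATURES)).astype("float32")
y = tf.keras.utils.to_categorical(rng.integers(0, NUM_CLASSES, size=256), NUM_CLASSES)

history = model.fit(x, y, epochs=EPOCHS, batch_size=32, validation_split=0.1, verbose=0)
```

If the labels were kept as a single integer column instead of one-hot columns, `SparseCategoricalCrossentropy` would be the matching loss.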


As you can see, the most important things are baked into the MLOps SDK after submitting through the console. You access all the hyperparameters through mlops.hyperparameters. The best thing is, if I wanted to run hyperparameter optimization on this dataset, the total lines of code I would have to change would be zero. This allows you to experiment at a fast pace, doing many iterations on smaller datasets, and then quickly scale it up once you are ready to build your production model. Also, using the mlops.callback class, we automatically feed all metrics back to the console so that you and your colleagues can collaborate and iterate together.



Ooooh! Fantastic results. A stunning accuracy of 25 % on a 4 class problem. But what could have gone wrong? Must have been those lazy data engineers. Oh wait…


If you can see what’s wrong — leave a comment! The first correct solution wins a movie (rating).


Click here if you want to learn more about the MLOps platform!