I have trained a classification model using pyspark.ml.classification.RandomForestClassifier and applied it to a new dataset for prediction. I am removing the customer_id column before feeding the dataset to the model, but I am not sure how to map the customer_id back after prediction. Since Spark DataFrames are inherently unordered, I have no way to identify which row belongs to which customer.
Best Answer
Here is a nice Spark doc example of classification using the Pipeline technique, where the original schema is preserved and only the selected columns are used as input features to the learning algorithm (I replaced the classifier in the doc example with a random forest).
reference => https://spark.apache.org/docs/latest/ml-pipeline.html
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and rf.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, rf])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)

# schema is preserved
prediction.printSchema()

root
 |-- id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

# sample row
for i in prediction.take(1):
    print(i)

Row(id=4, text='spark i j k', words=['spark', 'i', 'j', 'k'], features=SparseVector(262144, {20197: 1.0, 24417: 1.0, 227520: 1.0, 234657: 1.0}), rawPrediction=DenseVector([5.0857, 4.9143]), probability=DenseVector([0.5086, 0.4914]), prediction=0.0)
Here is a nice Spark doc example of the VectorAssembler class, where multiple columns are combined into a single vector column that serves as the input features for the learning algorithm.
reference => https://spark.apache.org/docs/latest/ml-features.html#vectorassembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+