PySpark-Programmierung | Komentor

Python und Apache Spark sind die heißesten Schlagworte in der Analytics-Branche. Apache Spark ist ein beliebtes Open-Source-Framework, das eine blitzschnelle Datenverarbeitung gewährleistet und verschiedene Sprachen wie Scala, Python, Java und R unterstützt. Es läuft dann auf Ihre Sprachpräferenz und Ihren Arbeitsumfang hinaus. In diesem Artikel zur PySpark-Programmierung würde ich über Spark mit Python sprechen, um zu demonstrieren, wie Python die Funktionen von Apache Spark nutzt.

PySpark ist die Zusammenarbeit von Apache Spark und Python.

Apache Spark ist ein Open-Source-Cluster-Computing-Framework, das auf Geschwindigkeit, Benutzerfreundlichkeit und Streaming-Analysen basiert Python ist eine universelle, höhere Programmiersprache. Es bietet eine große Auswahl an Bibliotheken und wird hauptsächlich für maschinelles Lernen und Echtzeit-Streaming-Analysen verwendet.

Mit anderen Worten, es ist eine Python-API für Spark, mit der Sie die Einfachheit von Python und die Leistungsfähigkeit von Apache Spark nutzen können, um Big Data zu zähmen.

PySpark.png

Sie fragen sich vielleicht, warum ich mich für Python entschieden habe, um mit Spark zu arbeiten, wenn andere Sprachen verfügbar sind. Um dies zu beantworten, habe ich einige der Vorteile aufgelistet, die Sie mit Python genießen werden:

  • Python ist sehr einfach zu erlernen und zu implementieren.
  • Es bietet eine einfache und umfassende API.
  • Mit Python ist die Lesbarkeit von Code, Wartung und Vertrautheit weitaus besser.
  • Es bietet verschiedene Möglichkeiten zur Datenvisualisierung, was mit Scala oder Java schwierig ist.
  • Python wird mit einer großen Auswahl an Bibliotheken wie numpy, pandas, scikit-learn, seaborn, matplotlib usw. geliefert.
  • Es wird von einer riesigen und aktiven Community unterstützt.

Nachdem Sie nun die Vorteile der PySpark-Programmierung kennen, lassen Sie uns einfach in die Grundlagen von PySpark eintauchen.

Resilient Distributed Datasets (RDDs)

RDDs sind die Bausteine ​​jeder Spark-Anwendung. RDD steht für:

  • Robust: Es ist fehlertolerant und kann Daten bei einem Ausfall wiederherstellen.
  • Verteilt: Daten werden auf mehrere Knoten in einem Cluster verteilt.
  • Datensatz: Sammlung partitionierter Daten mit Werten.

Es ist eine Schicht abstrahierter Daten über der verteilten Sammlung. Es ist unveränderlicher Natur und folgt faule Verwandlung.

Mit RDDs können Sie zwei Arten von Operationen durchführen:

  1. Transformationen: Diese Operationen werden angewendet, um ein neues RDD zu erstellen.
  2. Aktionen: Diese Operationen werden auf ein RDD angewendet, um Apache Spark anzuweisen, Berechnungen anzuwenden und das Ergebnis an den Treiber zurückzugeben.

Datenrahmen

Datenrahmen in PySpark ist die verteilte Sammlung von strukturierten oder halbstrukturierten Daten. Diese Daten in Dataframe werden in Zeilen unter benannten Spalten gespeichert, ähnlich den relationalen Datenbanktabellen oder Excel-Tabellen.

Es teilt auch einige gemeinsame Attribute mit RDD, wie z. B. Unveränderlich in der Natur, folgt faulen Auswertungen und ist in der Natur verteilt. Es unterstützt eine Vielzahl von Formaten wie JSON, CSV, TXT und viele mehr. Sie können es auch aus den vorhandenen RDDs laden oder das Schema programmgesteuert angeben.

PySpark-SQL

PySpark SQL ist ein übergeordnetes Abstraktionsmodul über dem PySpark Core. Es wird hauptsächlich für die Verarbeitung strukturierter und halbstrukturierter Datensätze verwendet. Es bietet auch eine optimierte API, die die Daten aus verschiedenen Datenquellen mit unterschiedlichen Dateiformaten lesen kann. Mit PySpark können Sie die Daten also sowohl mit SQL als auch mit HiveQL verarbeiten. Aufgrund dieser Funktion gewinnt PySparkSQL langsam an Popularität unter Datenbankprogrammierern und Apache Hive-Benutzern.

PySpark-Streaming

PySpark Streaming ist ein skalierbares, fehlertolerantes System, das dem RDD-Batch-Paradigma folgt. Es wird grundsätzlich in Mini-Batches oder Batch-Intervallen betrieben, die von 500ms bis zu größeren Intervallfenstern reichen können.

Dabei erhält Spark Streaming einen kontinuierlichen Eingangsdatenstrom von Quellen wie Apache Flume, Kinesis, Kafka, TCP-Sockets usw. Diese gestreamten Daten werden dann intern in mehrere kleinere Batches auf der Grundlage von aufgeteilt Batch-Intervall und an die Spark Engine weitergeleitet. Spark Engine verarbeitet diese Datenstapel mithilfe komplexer Algorithmen, die mit High-Level-Funktionen wie Map, Reduce, Join und Window ausgedrückt werden. Sobald die Verarbeitung abgeschlossen ist, werden die verarbeiteten Stapel an Datenbanken, Dateisysteme und Live-Dashboards übertragen.

Pyspark-streaming.png

Die Schlüsselabstraktion für Spark Streaming ist Discretized Stream (DStream). DStreams bauen auf RDDs auf, was es den Spark-Entwicklern erleichtert, im selben Kontext von RDDs und Batches zu arbeiten, um die Streaming-Probleme zu lösen. Darüber hinaus lässt sich Spark Streaming auch in MLlib, SQL, DataFrames und GraphX ​​integrieren, was Ihren Funktionshorizont erweitert. Als High-Level-API bietet Spark Streaming Fehlertoleranz „genau einmal“ Semantik für zustandsbehaftete Operationen.

HINWEIS : „exactly-once“-Semantik bedeutet, dass Ereignisse „genau einmal“ von allen Operatoren in der Stream-Anwendung verarbeitet werden, selbst wenn ein Fehler auftritt.

Das folgende Diagramm stellt die grundlegenden Komponenten von Spark Streaming dar.

Spark-Streaming-Komponenten.png

Wie Sie sehen können, werden Daten aus verschiedenen Quellen wie Kafka, Flume, Twitter, ZeroMQ, Kinesis oder TCP-Sockets und vielen mehr in den Spark Stream aufgenommen. Darüber hinaus werden diese Daten mit komplexen Algorithmen verarbeitet, die mit High-Level-Funktionen wie Map, Reduce, Join und Window ausgedrückt werden. Schließlich werden diese verarbeiteten Daten zur weiteren Verwendung an verschiedene Dateisysteme, Datenbanken und Live-Dashboards übertragen.

Ich hoffe, dies hat Ihnen ein klares Bild davon gegeben, wie PySpark Streaming funktioniert. Kommen wir nun zum letzten, aber verlockendsten Thema dieses Artikels zur PySpark-Programmierung, dem maschinellen Lernen.

Maschinelles Lernen

Wie Sie bereits wissen, ist Python eine ausgereifte Sprache, die seit Ewigkeiten stark für Data Science und maschinelles Lernen verwendet wird. In PySpark wird maschinelles Lernen durch eine Python-Bibliothek namens MLlib (Machine Learning Library) erleichtert. Es ist nichts anderes als ein Wrapper über PySpark Core, der Datenanalysen mit maschinellen Lernalgorithmen wie Klassifizierung, Clustering, linearer Regression und einigen mehr durchführt.

Eine der verlockenden Eigenschaften des maschinellen Lernens mit PySpark ist, dass es auf verteilten Systemen funktioniert und hochgradig skalierbar ist.

MLlib stellt mit PySpark drei Kernfunktionen des maschinellen Lernens bereit:

  1. Datenaufbereitung: Es bietet verschiedene Funktionen wie Extraktion, Transformation, Auswahl, Hashing usw.
  2. Algorithmen für maschinelles Lernen: Es nutzt einige beliebte und fortschrittliche Regressions-, Klassifizierungs- und Clustering-Algorithmen für maschinelles Lernen.
  3. Dienstprogramme: Es verfügt über statistische Methoden wie Chi-Quadrat-Tests, deskriptive Statistik, lineare Algebra und Modellbewertungsmethoden.

Lassen Sie mich Ihnen zeigen, wie Sie maschinelles Lernen implementieren Einstufung durch logistische Regression.

Hier werde ich eine einfache prädiktive Analyse von Lebensmittelkontrolldaten von Chicago City durchführen.

##Importing the required libraries
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import * ##creating a RDD by importing and parsing the input data
def csvParse(s):
import csv
from StringIO import StringIO
sio = StringIO(s)
value = csv.reader(sio).next()
sio.close()
return value food_inspections = sc.textFile('file:////home/edureka/Downloads/Food_Inspections_Chicago_data.csv')\
.map(csvParse) ##Display data format
food_inspections.take(1)

Ausgabe-1-1.png

#Structuring the data
schema = StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), False),
StructField("results", StringType(), False),
StructField("violations", StringType(), True)])
#creating a dataframe and a temporary table (Results) required for the predictive analysis. ##sqlContext is used to perform transformations on structured data
ins_df = spark.createDataFrame(food_inspections.map(lambda l: (int(l[0]), l[1], l[12], l[13])) , schema)
ins_df.registerTempTable('Count_Results')
ins_df.show()

Ausgabe-2.png

##Let's now understand our dataset
#show the distinct values in the results column
result_data = ins_df.select('results').distinct().show()

Ausgabe-3.png

##converting the existing dataframe into a new dataframe ###each inspection is represented as a label-violations pair. ####Here 0.0 represents a failure, 1.0 represents a success, and -1.0 represents some results besides those two
def label_Results(s):
if s == 'Fail':
return 0.0
elif s == 'Pass with Conditions' or s == 'Pass':
return 1.0
else:
return -1.0
ins_label = UserDefinedFunction(label_Results, DoubleType())
labeled_Data = ins_df.select(ins_label(ins_df.results).alias('label'), ins_df.violations).where('label >= 0')
labeled_Data.take(1)

Ausgabe-4.png

##Creating a logistic regression model from the input dataframe
tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(labeled_Data)
## Evaluating with Test Data test_Data = sc.textFile('file:////home/edureka/Downloads/Food_Inspections_test.csv')\
.map(csvParse) \
.map(lambda l: (int(l[0]), l[1], l[12], l[13]))
test_df = spark.createDataFrame(test_Data, schema).where("results="Fail" OR results="Pass" OR results="Pass with Conditions"")
predict_Df = model.transform(test_df)
predict_Df.registerTempTable('Predictions')
predict_Df.columns

Ausgabe-5.png

## Printing 1st row
predict_Df.take(1)

Ausgabe-6.png

## Predicting the final result
numOfSuccess = predict_Df.where("""(prediction = 0 AND results="Fail") OR
(prediction = 1 AND (results="Pass" OR
results="Pass with Conditions"))""").count()
numOfInspections = predict_Df.count()
print "There were", numOfInspections, "inspections and there were", numOfSuccess, "successful predictions"
print "This is a", str((float(numOfSuccess) / float(numOfInspections)) * 100) + "%", "success rate"

Ausgabe-7.png

Damit kommen wir zum Ende dieses Blogs zur PySpark-Programmierung. Ich hoffe, es hat dazu beigetragen, Ihr Wissen etwas aufzuwerten.

Wenn Sie diesen Blog zur PySpark-Programmierung interessant fanden, können Sie fortfahren und lesen Sie ähnliche Blogs hier.

Haben Sie eine Frage an uns? Bitte erwähnen Sie es im Kommentarbereich und wir werden uns bei Ihnen melden.

Similar Posts

Leave a Reply

Your email address will not be published.