Строки фильтрации PySpark в DataFrame по условию

[*]
При работе с фреймами данных pyspark нам часто нужно фильтровать строки по разным критериям. В этой статье мы обсудим различные способы фильтрации строк в кадре данных pyspark.
Метод filter()
filter()
Метод при вызове в кадре данных pyspark принимает условный оператор в качестве входных данных. Условный оператор обычно использует один или несколько столбцов фрейма данных и возвращает столбец, содержащий значения True или False. filter()
Метод проверяет маску и выбирает строки, для которых маска, созданная условным оператором, имеет значение True на выходе. Остальные строки отбрасываются.
PySpark Фильтр DataFrame по значению столбца
Чтобы отфильтровать кадр данных pyspark по значению столбца, мы будем использовать filter()
метод. Здесь мы проверим значение столбца в условном операторе и передадим его в filter()
метод. После выполнения мы получим кадр данных pyspark со строками, удовлетворяющими условию. Вы можете наблюдать это на следующем примере.

import pyspark.sql as ps
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("filter_example") \
.getOrCreate()
dfs=spark.read.csv("sample_csv_file.csv",header=True)
print("The input dataframe is:")
dfs.show()
new_df=dfs.filter(dfs.Physics>80)
print("The filtered rows are:")
new_df.show()
spark.sparkContext.stop()
Выход:
The input dataframe is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
|Katrina| 49| 47| 83|
| Joel| 45| 75| 87|
| Agatha| 77| 76| 93|
| Sam| 99| 62| 95|
| Aditya| 65| 89| 71|
+-------+-----+-------+---------+
The filtered rows are:
+------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+------+-----+-------+---------+
|Aditya| 45| 89| 71|
| Chris| null| 85| 82|
|Aditya| 65| 89| 71|
+------+-----+-------+---------+
В этом примере мы сначала читаем CSV-файл в кадр данных pyspark. Затем мы воспользовались filter()
метод для фильтрации строк из фрейма данных. в filter()
метод, мы передали условие dfs.Physics>80
. Здесь, dfs
кадр данных, созданный из файла csv и Physics
имя столбца. Следовательно filter()
метод вернет кадр данных, имеющий значения больше 80 в Physics
столбец.
Фильтровать PySpark DataFrame с помощью оператора SQL
Вы также можете использовать операторы SQL для фильтрации кадра данных pyspark по значению столбца. Для этого мы можем использовать оператор SQL SELECT с предложением WHERE для проверки условия в данном имени столбца. Чтобы отфильтровать кадр данных по значению столбца с помощью SQL в PySpark, мы можем использовать следующие шаги.
- Сначала мы создадим представление кадра данных pyspark, используя
createOrReplaceTempView()
функция.createOrReplaceTempView()
Метод при вызове в кадре данных pyspark принимает имя представления в качестве входного аргумента. После выполнения он генерирует представление кадра данных с заданным именем. Мы можем выполнять операторы SQL в этом представлении для фильтрации данных. - Далее мы создадим оператор SQL для фильтрации строк, используя оператор SELECT и предложение WHERE.
- Наконец, мы будем использовать
sql()
функция для выполнения инструкции SQL.
После выполнения sql()
мы получим выходной фрейм данных с отфильтрованными строками. Вы можете наблюдать это на следующем примере.
import pyspark.sql as ps
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("filter_example") \
.getOrCreate()
dfs=spark.read.csv("sample_csv_file.csv",header=True)
print("The input dataframe is:")
dfs.show()
dfs.createOrReplaceTempView("df_sql")
new_df=spark.sql("SELECT * FROM df_sql WHERE Physics>80")
print("The filtered rows are:")
new_df.show()
spark.sparkContext.stop()
Выход:
The input dataframe is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
|Katrina| 49| 47| 83|
| Joel| 45| 75| 87|
| Agatha| 77| 76| 93|
| Sam| 99| 62| 95|
| Aditya| 65| 89| 71|
+-------+-----+-------+---------+
The filtered rows are:
+------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+------+-----+-------+---------+
|Aditya| 45| 89| 71|
| Chris| null| 85| 82|
|Aditya| 65| 89| 71|
+------+-----+-------+---------+
В приведенном выше примере мы сначала создали кадр данных из файла csv. Затем мы воспользовались createOrReplaceTempView()
метод для создания представления фрейма данных pyspark, поскольку мы не можем напрямую выполнять операторы sql в фрейме данных. Далее мы использовали sql()
функция для выполнения оператора SQL для фильтрации строк фрейма данных на основе Physics
столбец.
Вместо того, чтобы использовать описанный выше подход, вы также можете передать оператор в предложении sql WHERE непосредственно в filter()
метод, вызвав его во входном фрейме данных. После этого вы получите выходной фрейм данных с нужными строками, как показано в следующем примере.
import pyspark.sql as ps
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("filter_example") \
.getOrCreate()
dfs=spark.read.csv("sample_csv_file.csv",header=True)
print("The input dataframe is:")
dfs.show()
new_df=dfs.filter("Physics>80")
print("The filtered rows are:")
new_df.show()
spark.sparkContext.stop()
Выход:
The input dataframe is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
|Katrina| 49| 47| 83|
| Joel| 45| 75| 87|
| Agatha| 77| 76| 93|
| Sam| 99| 62| 95|
| Aditya| 65| 89| 71|
+-------+-----+-------+---------+
The filtered rows are:
+------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+------+-----+-------+---------+
|Aditya| 45| 89| 71|
| Chris| null| 85| 82|
|Aditya| 65| 89| 71|
+------+-----+-------+---------+
В этом примере мы напрямую передали подзапрос SQL, который мы использовали в предложении WHERE в предыдущем примере, в filter()
метод. Тем не менее, выход остается неизменным.
Фильтровать PySpark DataFrame по нескольким условиям
Вы также можете фильтровать кадры данных pyspark по нескольким условиям. Для этого вам нужно включить все условия внутри filter()
метод или в предложении sql WHERE с использованием условных операторов.
Например, мы можем фильтровать строки в кадре данных pyspark по нескольким условиям, используя filter()
метод, как показано ниже.
import pyspark.sql as ps
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("filter_example") \
.getOrCreate()
dfs=spark.read.csv("sample_csv_file.csv",header=True)
print("The input dataframe is:")
dfs.show()
new_df=dfs.filter((dfs.Physics>70) & (dfs.Chemistry<90))
print("The filtered rows are:")
new_df.show()
spark.sparkContext.stop()
Выход:
The input dataframe is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
|Katrina| 49| 47| 83|
| Joel| 45| 75| 87|
| Agatha| 77| 76| 93|
| Sam| 99| 62| 95|
| Aditya| 65| 89| 71|
+-------+-----+-------+---------+
The filtered rows are:
+------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+------+-----+-------+---------+
|Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
| Joel| 45| 75| 87|
|Aditya| 65| 89| 71|
+------+-----+-------+---------+
В приведенном выше примере мы передали два условия в filter()
метод с помощью оператора И (&). Следовательно, filter()
Метод фильтрует входной фрейм данных по обоим условиям и выдает результат. Здесь нужно иметь в виду, что каждое условие заключается в круглые скобки, а затем они объединяются с помощью условных операторов.
Если вы не используете круглые скобки с условными операторами в filter()
метод. Программа выдаст ошибку. Вы можете наблюдать это на следующем примере.
import pyspark.sql as ps
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("filter_example") \
.getOrCreate()
dfs=spark.read.csv("sample_csv_file.csv",header=True)
print("The input dataframe is:")
dfs.show()
new_df=dfs.filter(dfs.Physics>70 & dfs.Chemistry<90)
print("The filtered rows are:")
new_df.show()
spark.sparkContext.stop()
Выход:
Py4JError: An error occurred while calling o240.and. Trace:
py4j.Py4JException: Method and([class java.lang.Integer]) does not exist
В приведенном выше коде мы не использовали круглые скобки с условиями в filter()
метод. Следовательно, программа сталкивается с Py4JError.
PySpark Фильтр DataFrame по нескольким условиям с использованием SQL
Вместо метода фильтра вы также можете использовать предложение sql WHERE для фильтрации кадра данных pyspark по нескольким условиям. Для этого вы можете передать все условия в предложении WHERE и объединить их с помощью условных операторов. После выполнения условного оператора вы получите желаемый результат.
import pyspark.sql as ps
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("filter_example") \
.getOrCreate()
dfs=spark.read.csv("sample_csv_file.csv",header=True)
print("The input dataframe is:")
dfs.show()
dfs.createOrReplaceTempView("df_sql")
new_df=spark.sql("SELECT * FROM df_sql WHERE Physics>70 AND Chemistry<90")
print("The filtered rows are:")
new_df.show()
spark.sparkContext.stop()
Выход:
The input dataframe is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
|Katrina| 49| 47| 83|
| Joel| 45| 75| 87|
| Agatha| 77| 76| 93|
| Sam| 99| 62| 95|
| Aditya| 65| 89| 71|
+-------+-----+-------+---------+
The filtered rows are:
+------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+------+-----+-------+---------+
|Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
| Joel| 45| 75| 87|
|Aditya| 65| 89| 71|
+------+-----+-------+---------+
Вместо создания представления кадра данных и выполнения оператора SQL с использованием sql()
функцию, вы также можете передать подзапрос, используемый в предложении WHERE в операторе SQL, в filter()
метод. После этого вы получите тот же результат, что и в предыдущем примере.
import pyspark.sql as ps
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("filter_example") \
.getOrCreate()
dfs=spark.read.csv("sample_csv_file.csv",header=True)
print("The input dataframe is:")
dfs.show()
new_df=dfs.filter("Physics>70 AND Chemistry<90")
print("The filtered rows are:")
new_df.show()
spark.sparkContext.stop()
Выход:
The input dataframe is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
|Katrina| 49| 47| 83|
| Joel| 45| 75| 87|
| Agatha| 77| 76| 93|
| Sam| 99| 62| 95|
| Aditya| 65| 89| 71|
+-------+-----+-------+---------+
The filtered rows are:
+------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+------+-----+-------+---------+
|Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
| Joel| 45| 75| 87|
|Aditya| 65| 89| 71|
+------+-----+-------+---------+
Заключение
В этой статье мы обсудили несколько способов фильтрации строк в кадре данных pyspark. Чтобы узнать больше о pyspark, вы можете прочитать эту статью о том, как выбрать отдельные строки из кадра данных pyspark. Вам также может понравиться эта статья о том, как сортировать кадр данных pyspark.
Надеюсь, вам понравилось читать эту статью. Следите за информативными статьями.
Счастливого обучения!
Связанный
Рекомендуемое обучение Python
Курс: Python 3 для начинающих
Более 15 часов видеоконтента с инструкциями для начинающих. Узнайте, как создавать приложения для реального мира, и освойте основы.