【干货系列】5种方法教你使用Python突破SQL性能限制 您所在的位置:网站首页 漫威里面的超能力有哪些 【干货系列】5种方法教你使用Python突破SQL性能限制

【干货系列】5种方法教你使用Python突破SQL性能限制

#【干货系列】5种方法教你使用Python突破SQL性能限制| 来源: 网络整理| 查看: 265

Python 和 SQL:它们在一起走得更远

尽管有很多关于数据库查询性能优化的内容,但我发现,有时,提高 SQL 能力的最佳方法是将其与脚本语言(如 Python)结合使用。众所周知,虽然 SQL 是一种用于读取、操作和写入数据库的强大方法,但它缺乏脚本语言的灵活性和实用性,这使得某些操作(如循环)几乎不可能进行。 此外,底层数据库的限制会降低性能,也有可能阻止执行消耗资源的查询。 例如,我曾经遇到过一个持续存在的过度的元数据读取错误,我将在下面详细说明。像 Python 这样的脚本语言可以为上述缺陷提供了一种解决方法,他能做的不仅仅是取代SQL 工作。 结合使用 Python 和 SQL 可以生成更强大、高效和清晰的脚本。

1. 遍历多个 SQL 表

Python 可以帮助提升 SQL 的最直白的方法之一是将查询字符串合并到 Python 的循环结构中以连续迭代多个查询。使用在 Python 中定义的变量,你可以创建一个基本查询并使用 SQL 文本和 Python 变量进行操作。 例如,假设我们正在尝试获取一个列表,它按大小列出GCP 项目中包含的所有数据集和表。 在纯 SQL 中,你的查询语句应该是:

SELECT * FROM `my_project.dataset_1.INFORMATION_SCHEMA` UNION ALL SELECT * FROM `my_project.dataset_2.INFORMATION_SCHEMA` UNION ALL SELECT * FROM `my_project.dataset_3.INFORMATION_SCHEMA`

使用Python实现的话,我们就不需要手动地查询一个又一个的数据集:

from google.cloud import bigquery datasets = ['dataset_1', 'dataset_2', 'dataset_3'] bq_client = bigquery.Client() for dataset in datasets: get_datasets = bq_client.query("SELECT dataset_id, table_id, size_bytes, ROUND(size_bytes / 10000000000), 2) AS gb_size FROM `"+dataset.dataset_id+"`.__TABLES__ GROUP BY 1, 2, 3") tables = get_datasets.result() for table in tables: dataset_id = table.dataset table_id = table.table_id size = table.size_bytes gb_size = table.gb_size print(dataset_id, table_id, size, gb_size)

尽管有两个循环,这可能看起来有些复杂,但我们所做的只是循环遍历数据集列表。在程序里我们要更改的只是我们引用的数据集,本质上,创建的是与 UNION 相同数目的查询,但重复性手动工作少很多。

""" SELECT dataset_id, table_id, size_bytes, ROUND(size_bytes / 10000000000), 2) AS gb_size FROM `my_project.dataset_1.`__TABLES__ GROUP BY 1, 2, 3 """ """ SELECT dataset_id, table_id, size_bytes, ROUND(size_bytes / 10000000000), 2) AS gb_size FROM `my_project.dataset_2.`__TABLES__ GROUP BY 1, 2, 3 """ """ SELECT dataset_id, table_id, size_bytes, ROUND(size_bytes / 10000000000), 2) AS gb_size FROM `my_project.dataset_3.`__TABLES__ GROUP BY 1, 2, 3 """

如果你的实际工作中需要查询这么多的数据,可以使用Python的 bq_client.list_datasets() 函数生成数据集列表。需要注意的是,在 SQL 中是可以进行循环的,但通常必须采取额外的步骤,例如定义变量和创建 UDF 来完成所需的操作。可以参考: SQL WHILE loop with simple examples www.sqlshack.com/sql-while-loop-with-simple-examples/

2. 自动化模式定义

BigQuery Python Client允许开发人员将模式(Schema)定义为列表,这个列表稍后可以导入加载函数(Loading Functions)。在脚本中定义 BigQuery 结构而不是默认静态创建,这样的做法会更可靠点,以免类型出错。 如果您手动创建了一个BigQuery 结构,它可能如下所示:

schema = [ bigquery.SchemaField("first_name", "STRING"), bigquery.SchemaField("last_name", "STRING"), bigquery.SchemaField("age", "INTEGER") ]

这样创建没有什么大问题,但是,当您处理需要 100 列或更多列的数据时,这就...很没劲了。我的解决方案是一个相对简单的 Python 函数,它以类似于我之前描述的循环的方式自动填充这些字段。

def create_schema(field_list: list, type_list: list): schema_list = [] for fields, types in zip(field_list, type_list): schema = bigquery.SchemaField(fields, types) schema_list.append(schema) return schema_list

该函数的输出将与上面定义的模式完全相同,但如果我不指定字段是否为 NULLABLE,它将默认为 NULLABLE。

3. 在 1 行 Python 中转换为Data Frame

有一种优雅而简单的方法可以创建从 SQL 查询派生的DataFrame。它只需要一行 Python,尤其是当你查询存储在配置文件中的数据时。

query = """ SELECT * FROM `my_project.dataset.table` """ query_job = bq_client.query(cfg.query).to_dataframe()

你甚至可以在同一行代码里直接到数据结果存储到一个CSV文件中。

query_job = bq_client.query(cfg.query).to_dataframe().to_csv('query_output.csv')

至少对我来说,这是一个比必须从 SQL 引擎的 UI 导出或下载报告更简化的过程。

4. 解决 SQL 环境限制

在我曾经的一个自动化审计项目中,需要删除我们数据仓库中未使用的表。可以想象,此过程涉及大量元数据。您可能不知道,BigQuery 对每次实例中允许的元读取数量施加了限制。 我不断遇到warning和error,告诉我无法运行查询,因为它尝试了太多元读取。起初我试图将我的工作分成两个单独的 CTE,但因为它们在同一个查询中运行,所以我仍然会遇到同样的错误。然后一位资深工程师建议我应该在 Python 中以Chunk的形式运行这个东西,并使用 Pandas 进行连接。 这种方法非常有效,以至于我最终将整个脚本转换为 Pandas,只将查询保留为原始数据源。如果您运行的查询过于消耗资源,请考虑将其拆分为多个部分,在 Python 中运行并使用 Pandas 重新加入。因为我引用的 CTE 有数百行长并且涉及 50 多个元读取,所以下面只是其很小的代码片段:

query_1 = """ SELECT * FROM a_resource_consuming_cte_1 """ query_2 = """ SELECT * FROM a_resource_consuming_cte_2 """ query_1_df = bq_client.query(query_1).to_dataframe() query_2_df = bq_client.query(query_2).to_dataframe() final_df = pd.concat([query_1_df, query_2_df] 5. 添加/截断 (Append/Truncate)

对 BigQuery 的一个主要抱怨是它不支持 APPEND/TRUNCATE 操作。我指我们可以将一条记录添加到表中或覆盖它们。目前,BigQuery 确实包含一个允许开发人员指定这两个操作的参数。因此,如果你只想覆盖一段特定时间范围内的 SQL 表,就需要有点创意了。 值得庆幸的是,结合 Python 和 SQL 将使我们能够进行简单进行这两个操作。 在开始编写代码之前,让我们先谈谈为什么日常需要覆盖行。假设有一个每天更新几次的电子表格,并且在每天结束时,我们想要上传今天日期的条目结果。由于工作表一天可以编辑多次,因此简单地附加数据会产生重复的行,从而导致数据混乱。所以我们会希望在加载数据时消除任何重复项。 最简单的方法是将 CRUD 语句与 Python/Pandas 代码配对,这将创建我们要覆盖的数据子集。

crud_statement = """ DELETE FROM table WHERE date = CURRENT_DATE() """ bq_client.query(crud_statement) df = df[(df['date'] == date.today())] bq_client.load_table_from_dataframe(df, job_config)

这将使我们的数据绝对相等并能够反映每次运行脚本时的实时变化。

总结

将 Python 等脚本语言与 SQL 相结合,可以为仅使用 SQL 无法实现的操作创造更多可能性。由于许多数据作业不仅需要 SQL 知识,而且至少还需要 Python 等脚本语言的知识,因此掌握 SQL 和 Python 结合的强大功能是很必要的。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有