How MySQL Indexes Improve Query Performance
An effective way to improve MySQL query performance is to create indexes. Below is a test of how several kinds of indexes affect the performance of the same query.
Creating the simulated data

The data is a customer-product-trait relation table. It simulates a given customer's preferences across several attributes of a product (for example, whether it is power-saving, cheap, good-looking, and so on). The table is created as follows.
```sql
DROP TABLE IF EXISTS customer_product_info;
CREATE TABLE customer_product_info (
    cid INT,
    pid INT,
    is_a TINYINT(1) NOT NULL DEFAULT 0,
    is_b TINYINT(1) NOT NULL DEFAULT 0,
    is_c TINYINT(1) NOT NULL DEFAULT 0,
    is_d TINYINT(1) NOT NULL DEFAULT 0,
    is_e TINYINT(1) NOT NULL DEFAULT 0,
    is_f TINYINT(1) NOT NULL DEFAULT 0,
    is_g TINYINT(1) NOT NULL DEFAULT 0,
    is_h TINYINT(1) NOT NULL DEFAULT 0,
    ctime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    utime TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
```

A sample of the data is shown below:
| cid | pid | is_a | is_b | is_c | is_d | is_e | is_f | is_g | is_h |
|---|---|---|---|---|---|---|---|---|---|
| 2268 | 991 | false | true | false | true | true | true | false | false |
| 9695 | 9159 | true | false | true | true | true | false | false | false |
| 6767 | 3688 | false | true | true | true | false | false | false | false |
| 8775 | 8006 | true | true | true | false | false | false | false | false |
| 7479 | 6374 | true | true | false | false | false | false | false | true |
| 2358 | 8114 | true | false | false | false | false | false | true | false |
| 9724 | 4457 | false | false | false | false | false | true | false | false |
| 7553 | 2762 | false | false | false | false | true | false | false | false |
A program was used to generate a little over 500,000 rows in total. Every test below runs the same aggregation SQL statement:
```sql
SELECT cid, SUM(is_a) as a_count, SUM(is_b) as b_count, SUM(is_c) as c_count,
       SUM(is_d) as d_count, SUM(is_e) as e_count, SUM(is_f) as f_count,
       SUM(is_g) as g_count, SUM(is_h) as h_count
FROM customer_product_info
GROUP BY cid;
```

Without an index
Average query time: 320 ms.
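The original write-up does not show the execution plan, but it can be inspected with EXPLAIN before any index exists. A minimal sketch against the table above (not part of the original measurements):

```sql
-- Inspect the baseline plan; shown here only as an illustration.
EXPLAIN
SELECT cid, SUM(is_a) as a_count, SUM(is_b) as b_count, SUM(is_c) as c_count,
       SUM(is_d) as d_count, SUM(is_e) as e_count, SUM(is_f) as f_count,
       SUM(is_g) as g_count, SUM(is_h) as h_count
FROM customer_product_info
GROUP BY cid;
-- Expected without any index: type = ALL (full table scan) and, in most
-- MySQL versions, "Using temporary" in the Extra column for the GROUP BY.
```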
With separate indexes per column
Average query time: 330 ms. In this case the query is not faster at all; if anything, it is slightly slower.
The indexes were created as follows:
```sql
CREATE INDEX idx_is_a ON customer_product_info (is_a);
CREATE INDEX idx_is_b ON customer_product_info (is_b);
CREATE INDEX idx_is_c ON customer_product_info (is_c);
CREATE INDEX idx_is_d ON customer_product_info (is_d);
CREATE INDEX idx_is_e ON customer_product_info (is_e);
CREATE INDEX idx_is_f ON customer_product_info (is_f);
CREATE INDEX idx_is_g ON customer_product_info (is_g);
CREATE INDEX idx_is_h ON customer_product_info (is_h);
CREATE INDEX idx_cid_is_a ON customer_product_info (cid, is_a);
CREATE INDEX idx_cid_is_b ON customer_product_info (cid, is_b);
CREATE INDEX idx_cid_is_c ON customer_product_info (cid, is_c);
CREATE INDEX idx_cid_is_d ON customer_product_info (cid, is_d);
CREATE INDEX idx_cid_is_e ON customer_product_info (cid, is_e);
CREATE INDEX idx_cid_is_f ON customer_product_info (cid, is_f);
CREATE INDEX idx_cid_is_g ON customer_product_info (cid, is_g);
CREATE INDEX idx_cid_is_h ON customer_product_info (cid, is_h);
```
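The write-up does not say whether these indexes were removed before the next test. If the goal is to measure the composite index on its own, they can be dropped first; a minimal sketch, assuming the index names created above:

```sql
-- Optional cleanup before the composite-index test; not part of the
-- original test description.
ALTER TABLE customer_product_info
    DROP INDEX idx_is_a, DROP INDEX idx_is_b, DROP INDEX idx_is_c,
    DROP INDEX idx_is_d, DROP INDEX idx_is_e, DROP INDEX idx_is_f,
    DROP INDEX idx_is_g, DROP INDEX idx_is_h,
    DROP INDEX idx_cid_is_a, DROP INDEX idx_cid_is_b, DROP INDEX idx_cid_is_c,
    DROP INDEX idx_cid_is_d, DROP INDEX idx_cid_is_e, DROP INDEX idx_cid_is_f,
    DROP INDEX idx_cid_is_g, DROP INDEX idx_cid_is_h;
```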
With a composite index

Average query time: 140 ms. This is a clear improvement: the query now takes less than half of the original time.
The index was created as follows:
```sql
CREATE INDEX idx_sum_all ON customer_product_info
    (cid, is_a, is_b, is_c, is_d, is_e, is_f, is_g, is_h);
```

Analysis:
This index turns the full table scan into a scan of a covering index. The index is much smaller than the table and can usually be read from memory, and because it is ordered by its leading column cid, the GROUP BY can be aggregated without a temporary table, which is why the query is faster.
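As a way to verify this claim (not part of the original write-up), EXPLAIN should now pick idx_sum_all and report "Using index" in the Extra column, meaning the rows are read from the index alone; the approximate size of the table data versus its indexes can also be compared through information_schema. A sketch, assuming the schema above:

```sql
-- Verify the covering index is used: key should be idx_sum_all and Extra
-- should contain "Using index" (no access to the clustered table rows).
EXPLAIN
SELECT cid, SUM(is_a) as a_count, SUM(is_b) as b_count, SUM(is_c) as c_count,
       SUM(is_d) as d_count, SUM(is_e) as e_count, SUM(is_f) as f_count,
       SUM(is_g) as g_count, SUM(is_h) as h_count
FROM customer_product_info
GROUP BY cid;

-- Rough size comparison of table data vs. all indexes (InnoDB statistics,
-- so the numbers are approximate).
SELECT table_name,
       data_length / 1024 / 1024  AS data_mb,
       index_length / 1024 / 1024 AS index_mb
FROM information_schema.tables
WHERE table_schema = DATABASE()
  AND table_name = 'customer_product_info';
```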
Data generation and test code:
```python
import random
import time
import pymysql
from pymysql.cursors import DictCursor
DB_HOST = '127.0.0.1'
DB_PORT = 3306
DB_NAME = 'db_dec'
DB_USER = 'user001'
DB_USER_PASSWD = 'xxxx'
DB_CONFIG = {
    'host': DB_HOST,
    'port': DB_PORT,
    'database': DB_NAME,
    'user': DB_USER,
    'password': DB_USER_PASSWD,
    'charset': 'utf8mb4',
    'cursorclass': DictCursor
}
def get_connection():
    return pymysql.connect(**DB_CONFIG)

def execute_sql_result(conn, sql):
    # Run a query and return all rows as dicts (DictCursor).
    with conn.cursor() as cursor:
        cursor.execute(sql)
        conn.commit()
        results = cursor.fetchall()
        return results

def execute_sql(conn, sql):
    # Run a statement that returns no result set (e.g. INSERT).
    with conn.cursor() as cursor:
        cursor.execute(sql)
        conn.commit()

def generate_customer_product_data(customers, products):
    customer_ids = range(1, customers + 1)
    product_ids = list(range(1, products + 1))
    all_relations = []
    print("Generating customer-product relations...")
    for cid in customer_ids:
        # Each customer gets 10 to 100 distinct random products.
        count = random.randint(10, 100)
        selected_products = random.sample(product_ids, count)
        for pid in selected_products:
            all_relations.append((cid, pid))
    print(f"Generated {len(all_relations)} records in total. Shuffling all rows...")
    random.shuffle(all_relations)
    return all_relations

def generate_is_abc(leng):
    # Each flag is 1 with roughly 33% probability.
    return [1 if random.random() < 0.33 else 0 for _ in range(leng)]
def insert_data(conn):
    data = generate_customer_product_data(10000, 10000)
    is_abc = generate_is_abc(10000)
    header = "INSERT INTO customer_product_info (cid, pid, is_a, is_b, is_c, is_d, is_e, is_f, is_g, is_h) VALUES\n"
    insert_sql = header
    i = 0
    for cid, pid in data:
        # Consume 8 random flags per row from the pre-generated pool.
        values = f"({cid}, {pid}, {is_abc[i * 8]}, {is_abc[i * 8 + 1]}, {is_abc[i * 8 + 2]}, {is_abc[i * 8 + 3]}, {is_abc[i * 8 + 4]}, {is_abc[i * 8 + 5]}, {is_abc[i * 8 + 6]}, {is_abc[i * 8 + 7]})"
        insert_sql += values
        if i % 1000 != 999:
            insert_sql += ",\n"
            i += 1
        else:
            # Flush a batch of 1000 rows, then start a new statement
            # with a fresh pool of random flags.
            insert_sql += ";\n"
            execute_sql(conn, insert_sql)
            insert_sql = header
            is_abc = generate_is_abc(10000)
            i = 0
    # Flush the last, possibly partial, batch.
    if insert_sql != header:
        insert_sql = insert_sql.rstrip(",\n") + ";\n"
        execute_sql(conn, insert_sql)
def check_query_performance(conn):
    sql = """SELECT cid, SUM(is_a) as a_count, SUM(is_b) as b_count, SUM(is_c) as c_count,
             SUM(is_d) as d_count, SUM(is_e) as e_count, SUM(is_f) as f_count,
             SUM(is_g) as g_count, SUM(is_h) as h_count
             FROM customer_product_info
             GROUP BY cid;"""
    start_time = time.time()
    execute_sql_result(conn, sql)
    end_time = time.time()
    return end_time - start_time

if __name__ == "__main__":
    CONN = get_connection()
    # insert_data(CONN)  # run once to populate the table before timing
    total_time = 0
    total_runs = 100
    for i in range(total_runs):
        total_time += check_query_performance(CONN)
    print(f"Average query time: {total_time / total_runs:.2f} seconds.")
    CONN.close()
```