An effective way to improve MySQL query performance is to build indexes. Below is a test of how several kinds of indexes affect the performance of one aggregation query.

Creating the test data

The data is a customer-product-attribute relation table that simulates a customer's preferences for several attributes of a product (for example, whether it is power-saving, cheap, or good-looking). The table is created as follows.

DROP TABLE IF EXISTS customer_product_info;
CREATE TABLE customer_product_info (
    cid INT, 
    pid INT,
    is_a tinyint(1) NOT NULL DEFAULT 0,
    is_b tinyint(1) NOT NULL DEFAULT 0,
    is_c tinyint(1) NOT NULL DEFAULT 0,
    is_d tinyint(1) NOT NULL DEFAULT 0,
    is_e tinyint(1) NOT NULL DEFAULT 0,
    is_f tinyint(1) NOT NULL DEFAULT 0,
    is_g tinyint(1) NOT NULL DEFAULT 0,
    is_h tinyint(1) NOT NULL DEFAULT 0,
    ctime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    utime TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP 
); 

A few sample rows are shown below (the is_* flags are tinyint(1) values, displayed here as true/false):

cid pid is_a is_b is_c is_d is_e is_f is_g is_h
2268 991 false true false true true true false false
9695 9159 true false true true true false false false
6767 3688 false true true true false false false false
8775 8006 true true true false false false false false
7479 6374 true true false false false false false true
2358 8114 true false false false false false true false
9724 4457 false false false false false true false false
7553 2762 false false false false true false false false

A script generates a little over 500,000 rows in total; every test then runs the same aggregation SQL statement against the table.

SELECT cid, SUM(is_a) as a_count, SUM(is_b) as b_count, SUM(is_c) as c_count,
SUM(is_d) as d_count, SUM(is_e) as e_count, SUM(is_f) as f_count,
SUM(is_g) as g_count, SUM(is_h) as h_count
FROM customer_product_info
GROUP BY cid;
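
To see how MySQL executes this query in each configuration, the execution plan can be inspected with EXPLAIN. Without an index the plan should show a full table scan (type: ALL); with the covering index built later it should report "Using index" in the Extra column (exact output varies by MySQL version and optimizer settings):

EXPLAIN SELECT cid, SUM(is_a) as a_count, SUM(is_b) as b_count, SUM(is_c) as c_count,
    SUM(is_d) as d_count, SUM(is_e) as e_count, SUM(is_f) as f_count,
    SUM(is_g) as g_count, SUM(is_h) as h_count
FROM customer_product_info
GROUP BY cid;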

Without any index

Average query time: 320 ms
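
A quick way to confirm the table really has no secondary indexes before taking this baseline measurement:

SHOW INDEX FROM customer_product_info;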

With separate per-column indexes

Average query time: 330 ms. In this case the query is not faster at all; it is even slightly slower. None of these indexes contains every column the query reads, so the whole table still has to be scanned.

The indexes were created as follows:

CREATE INDEX idx_is_a ON customer_product_info (is_a);
CREATE INDEX idx_is_b ON customer_product_info (is_b);
CREATE INDEX idx_is_c ON customer_product_info (is_c);
CREATE INDEX idx_is_d ON customer_product_info (is_d);
CREATE INDEX idx_is_e ON customer_product_info (is_e);
CREATE INDEX idx_is_f ON customer_product_info (is_f);
CREATE INDEX idx_is_g ON customer_product_info (is_g);
CREATE INDEX idx_is_h ON customer_product_info (is_h);
CREATE INDEX idx_cid_is_a ON customer_product_info (cid, is_a);
CREATE INDEX idx_cid_is_b ON customer_product_info (cid, is_b);
CREATE INDEX idx_cid_is_c ON customer_product_info (cid, is_c);
CREATE INDEX idx_cid_is_d ON customer_product_info (cid, is_d);
CREATE INDEX idx_cid_is_e ON customer_product_info (cid, is_e);
CREATE INDEX idx_cid_is_f ON customer_product_info (cid, is_f);
CREATE INDEX idx_cid_is_g ON customer_product_info (cid, is_g);
CREATE INDEX idx_cid_is_h ON customer_product_info (cid, is_h);
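
For the composite-index test below to measure only the new index, these sixteen indexes presumably need to be dropped first; a minimal sketch, assuming the index names above:

ALTER TABLE customer_product_info
    DROP INDEX idx_is_a, DROP INDEX idx_is_b, DROP INDEX idx_is_c, DROP INDEX idx_is_d,
    DROP INDEX idx_is_e, DROP INDEX idx_is_f, DROP INDEX idx_is_g, DROP INDEX idx_is_h,
    DROP INDEX idx_cid_is_a, DROP INDEX idx_cid_is_b, DROP INDEX idx_cid_is_c, DROP INDEX idx_cid_is_d,
    DROP INDEX idx_cid_is_e, DROP INDEX idx_cid_is_f, DROP INDEX idx_cid_is_g, DROP INDEX idx_cid_is_h;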

With a composite index

Average query time: 140 ms. A clear improvement: the query now takes less than half of the original time.

The index was created as follows:

CREATE INDEX idx_sum_all ON customer_product_info
  (cid, is_a, is_b, is_c, is_d, is_e, is_f, is_g, is_h);

Analysis:

This index turns the full table scan into a scan of a covering index: the index contains every column the query needs, is smaller than the full table rows and therefore more likely to stay in memory, and because its entries are already ordered by cid the GROUP BY can be resolved without a temporary table. Hence the much better query time.
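
As a sanity check that the speedup comes from the covering index rather than, say, a warmer buffer pool on later runs, the same query can be re-run with the index explicitly ignored; the timing should then fall back to roughly the full-scan numbers. A sketch:

SELECT cid, SUM(is_a) as a_count, SUM(is_b) as b_count, SUM(is_c) as c_count,
    SUM(is_d) as d_count, SUM(is_e) as e_count, SUM(is_f) as f_count,
    SUM(is_g) as g_count, SUM(is_h) as h_count
FROM customer_product_info IGNORE INDEX (idx_sum_all)
GROUP BY cid;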

Data generation and test code:

import random
import time
import pymysql
from pymysql.cursors import DictCursor

DB_HOST = '127.0.0.1'
DB_PORT = 3306
DB_NAME = 'db_dec'
DB_USER = 'user001'
DB_USER_PASSWD = 'xxxx' 
DB_CONFIG = {
    'host': DB_HOST,
    'port': DB_PORT,    
    'database': DB_NAME,
    'user': DB_USER,
    'password': DB_USER_PASSWD,
    'charset': 'utf8mb4',
    'cursorclass': DictCursor
}

def get_connection():
    return pymysql.connect(**DB_CONFIG)

def execute_sql_result(conn, sql):
    # Execute a query and return every row (DictCursor -> list of dicts).
    with conn.cursor() as cursor:
        cursor.execute(sql)
        conn.commit()
        results = cursor.fetchall()
    return results


def execute_sql(conn, sql):
    # Execute a statement that returns no rows (DDL / INSERT) and commit it.
    with conn.cursor() as cursor:
        cursor.execute(sql)
        conn.commit()

def generate_customer_product_data(customers, products):
    # Pair every customer with a random set of 10-100 distinct products.
    customer_ids = range(1, customers + 1)
    product_ids = list(range(1, products + 1))
    all_relations = []

    print("Generating customer-product pairs...")

    for cid in customer_ids:
        count = random.randint(10, 100)
        selected_products = random.sample(product_ids, count)

        for pid in selected_products:
            all_relations.append((cid, pid))

    print(f"Done, {len(all_relations)} records in total. Shuffling...")
    random.shuffle(all_relations)

    return all_relations

def generate_is_abc(leng):
    # Each flag is 1 with probability ~0.33, 0 otherwise.
    return [1 if random.random() < 0.33 else 0 for _ in range(leng)]

def insert_data(conn):
    # Build multi-row INSERT statements and flush them in batches of 1000 rows.
    data = generate_customer_product_data(10000, 10000)
    is_abc = generate_is_abc(10000)
    header = "INSERT INTO customer_product_info (cid, pid, is_a, is_b, is_c, is_d, is_e, is_f, is_g, is_h) VALUES\n"
    insert_sql = header

    i = 0
    for cid, pid in data:
        values = f"({cid}, {pid}, {is_abc[i * 8]}, {is_abc[i * 8 + 1]}, {is_abc[i * 8 + 2]}, {is_abc[i * 8 + 3]}, {is_abc[i * 8 + 4]}, {is_abc[i * 8 + 5]}, {is_abc[i * 8 + 6]}, {is_abc[i * 8 + 7]})"
        insert_sql += values

        if i % 1000 != 999:
            insert_sql += ",\n"
            i += 1
        else:
            # Batch of 1000 rows complete: run it, then start a fresh batch.
            insert_sql += ";\n"
            execute_sql(conn, insert_sql)
            insert_sql = header
            is_abc = generate_is_abc(10000)
            i = 0

    # Flush the final, possibly partial, batch.
    if insert_sql != header:
        insert_sql = insert_sql.rstrip(",\n") + ";\n"
        execute_sql(conn, insert_sql)

def check_query_performance(conn):
    # Time a single execution of the aggregation test query.
    sql = """SELECT cid, SUM(is_a) as a_count, SUM(is_b) as b_count, SUM(is_c) as c_count,
       SUM(is_d) as d_count, SUM(is_e) as e_count, SUM(is_f) as f_count,
       SUM(is_g) as g_count, SUM(is_h) as h_count
        FROM customer_product_info
        GROUP BY cid;"""

    start_time = time.time()
    execute_sql_result(conn, sql)
    end_time = time.time() 

    return end_time - start_time


if __name__ == "__main__":
    CONN = get_connection()
    total_time = 0
    total_runs = 100
    for i in range(total_runs):
        total_time += check_query_performance(CONN)

    print(f"Average query time: {total_time / total_runs:.2f} seconds.")
    CONN.close()