Reading a large txt file and inserting it into MySQL

The file 6.9更新总库.txt is 18 GB and contains roughly 700 million records. It needs to be loaded into MySQL, and lookups against the data must be supported afterwards.

Steps

1. Create the tables

Create five tables, qq_0 through qq_4, with identical structure. Do not create any indexes yet. This step takes essentially no time. (A loop that generates the same DDL is sketched after the statements below.)

create database ku default charset utf8mb4;
CREATE TABLE qq_0 (
    id INT(11) PRIMARY KEY  AUTO_INCREMENT,
    qq_number VARCHAR(64) DEFAULT '',
    mobile VARCHAR(64) DEFAULT ''
);
CREATE TABLE qq_1 (
    id INT(11) PRIMARY KEY  AUTO_INCREMENT,
    qq_number VARCHAR(64) DEFAULT '',
    mobile VARCHAR(64) DEFAULT ''
);
CREATE TABLE qq_2 (
    id INT(11) PRIMARY KEY  AUTO_INCREMENT,
    qq_number VARCHAR(64) DEFAULT '',
    mobile VARCHAR(64) DEFAULT ''
);
CREATE TABLE qq_3 (
    id INT(11) PRIMARY KEY  AUTO_INCREMENT,
    qq_number VARCHAR(64) DEFAULT '',
    mobile VARCHAR(64) DEFAULT ''
);
CREATE TABLE qq_4 (
    id INT(11) PRIMARY KEY  AUTO_INCREMENT,
    qq_number VARCHAR(64) DEFAULT '',
    mobile VARCHAR(64) DEFAULT ''
);
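
Since the five tables differ only in their suffix, the DDL can also be generated in a loop. This is a minimal sketch, reusing the connection settings that appear in the import script in step 3; the IF NOT EXISTS clause is an addition so the sketch can be re-run safely.

import pymysql

DDL = """
CREATE TABLE IF NOT EXISTS qq_{i} (
    id INT(11) PRIMARY KEY AUTO_INCREMENT,
    qq_number VARCHAR(64) DEFAULT '',
    mobile VARCHAR(64) DEFAULT ''
)
"""

connection = pymysql.connect(host='192.168.10.224', user='root', password='root',
                             database='ku', charset='utf8mb4')
with connection:
    with connection.cursor() as cursor:
        for i in range(5):
            # Same structure as the hand-written DDL above, one table per shard.
            cursor.execute(DDL.format(i=i))
    connection.commit()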

2. Split the txt file

Split the txt file by line count into multiple smaller files so that several threads can read them in parallel. This takes about 10 minutes. (A sanity check for the split is sketched after the commands.)

mkdir out
split -l 10000000 6.9更新总库.txt out/a  
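
Optionally, the split can be verified by confirming that the chunks add up to the same number of lines as the source file. This is a small sketch, assuming out/ contains only the split chunks.

import os

def count_lines(path):
    # Read in binary mode and stream line by line so the 18 GB file is never loaded at once.
    with open(path, 'rb') as f:
        return sum(1 for _ in f)

total = sum(count_lines(os.path.join('out', name)) for name in os.listdir('out'))
print('lines in out/:', total)
print('lines in source:', count_lines('6.9更新总库.txt'))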

3. Read and insert with multiple threads

Read several of the split files at the same time; every 10,000 records, build the INSERT statements and execute them. The target table is chosen by taking the last digit of the QQ number modulo 5 (for example, a number ending in 5 goes into qq_0, and one ending in 6 goes into qq_1). This takes about 2.5 hours. A parameterized alternative to the string-built INSERTs is sketched after the script.


import datetime
import os
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

import pymysql


def _build_sql(insert_dict):
    # Build one multi-row INSERT statement per shard table from the buffered rows.
    sql_list = list()
    for table_name, insert_list in insert_dict.items():
        if not insert_list:
            continue
        sql = f'INSERT INTO {table_name} (qq_number, mobile) VALUES '
        len_insert_values = len(insert_list)
        count = 0
        for qq, mobile in insert_list:
            sql += f"('{qq}', '{mobile}')"
            count += 1
            if count != len_insert_values:
                sql += ','
            else:
                sql += ';'
        sql_list.append(sql)
    return sql_list


def to_mysql(filename):
    connection = pymysql.connect(host='192.168.10.224', user='root', password='root',
                                 database='ku', charset='utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor)
    with connection:
        all_count = 0          # rough count of rows flushed so far
        insert_dict = {
            'qq_0': [],
            'qq_1': [],
            'qq_2': [],
            'qq_3': [],
            'qq_4': [],
        }
        len_qq_all = 0         # rows buffered since the last flush
        with connection.cursor() as cursor:
            with open(filename) as f:
                for line in f:
                    # Each line looks like qq----mobile; a line may carry several QQ numbers.
                    try:
                        *qqs, mobile = line.strip().split('----')
                    except Exception:
                        print('error', line)
                        continue
                    for qq in qqs:
                        if len(mobile) > 64 or len(qq) > 64:
                            continue
                        # Route the row by the last digit of the QQ number, modulo 5.
                        table_name = 'qq_{}'.format(int(qq[-1]) % 5)
                        insert_dict[table_name].append((qq, mobile))
                        if len_qq_all >= 10000:
                            # Flush the buffered rows, one INSERT per shard table.
                            all_count += 10000
                            for sql in _build_sql(insert_dict):
                                try:
                                    cursor.execute(sql)
                                    connection.commit()
                                    print('-file_name: {}, time: {} seconds, sql: {}'.format(
                                        filename,
                                        (datetime.datetime.now() - start).total_seconds(),
                                        sql[:18]))
                                except Exception:
                                    import traceback
                                    print(traceback.format_exc())
                                    connection.rollback()
                                    continue
                            insert_dict = defaultdict(list)
                            len_qq_all = 0
                        else:
                            len_qq_all += 1
            # Flush whatever is left in the final, partial batch.
            for sql in _build_sql(insert_dict):
                try:
                    cursor.execute(sql)
                    connection.commit()
                    print('-file_name: {}, time: {} seconds'.format(
                        filename, (datetime.datetime.now() - start).total_seconds()))
                except Exception:
                    import traceback
                    print(traceback.format_exc())
                    connection.rollback()
                    continue


if __name__ == '__main__':
    files = [os.path.join('out', x) for x in os.listdir('./out')]
    print(files)
    # `start` is module-level, so the worker threads can read it for progress logging.
    start = datetime.datetime.now()
    pool = ThreadPoolExecutor(9)
    futures = []
    for file in files:
        futures.append(pool.submit(to_mysql, file))
    for x in as_completed(futures):
        print(x.result())
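
The script above splices the values directly into the SQL text, which works here because the fields are plain digit strings, but a parameterized bulk insert is the safer general pattern. This is a minimal sketch of one batch flush using pymysql's executemany; insert_batch is an illustrative helper, not part of the script above.

import pymysql

def insert_batch(connection, table_name, rows):
    # rows: list of (qq_number, mobile) tuples destined for one shard table.
    # executemany lets the driver escape the values and build the multi-row INSERT.
    sql = f'INSERT INTO {table_name} (qq_number, mobile) VALUES (%s, %s)'
    with connection.cursor() as cursor:
        cursor.executemany(sql, rows)
    connection.commit()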

4. Create the indexes

Create the indexes only after all inserts have finished. This takes about 4 hours. With the indexes in place, lookups by QQ number can be routed straight to the right shard table (see the sketch after the statements).


ALTER TABLE `qq_0` ADD INDEX qq_number (`qq_number`);
ALTER TABLE `qq_0` ADD INDEX mobile (`mobile`);
ALTER TABLE `qq_1` ADD INDEX qq_number (`qq_number`);
ALTER TABLE `qq_1` ADD INDEX mobile (`mobile`);
ALTER TABLE `qq_2` ADD INDEX qq_number (`qq_number`);
ALTER TABLE `qq_2` ADD INDEX mobile (`mobile`);
ALTER TABLE `qq_3` ADD INDEX qq_number (`qq_number`);
ALTER TABLE `qq_3` ADD INDEX mobile (`mobile`);
ALTER TABLE `qq_4` ADD INDEX qq_number (`qq_number`);
ALTER TABLE `qq_4` ADD INDEX mobile (`mobile`);
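
A lookup by QQ number uses the same last-digit-mod-5 rule as the import, so only one shard table needs to be queried. This is a minimal query sketch; find_by_qq is an illustrative helper, the example number is arbitrary, and the connection settings mirror the import script.

import pymysql

def find_by_qq(connection, qq):
    # The shard is fully determined by the last digit, so only one table is scanned.
    table_name = 'qq_{}'.format(int(qq[-1]) % 5)
    with connection.cursor() as cursor:
        cursor.execute(
            f'SELECT qq_number, mobile FROM {table_name} WHERE qq_number = %s', (qq,))
        return cursor.fetchall()

connection = pymysql.connect(host='192.168.10.224', user='root', password='root',
                             database='ku', charset='utf8mb4',
                             cursorclass=pymysql.cursors.DictCursor)
with connection:
    print(find_by_qq(connection, '10001'))

A lookup by mobile cannot be routed this way, because the tables are sharded on the QQ number; such a query has to be run against all five tables, each of which has the mobile index.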