测试数据
方法一:使用pymysql库
需要先在mysql内建好数据表(此处使用下图)
import pymysql def read_csv(filename): '''读取文件''' fileContent = [] with open(filename) as f: for line in f: fileContent.append(line.strip('\n').split(',')) return fileContent def execute_sql(db,sql): '''执行mysql语句 执行sql语句 ''' mydb = pymysql.Connection(host='localhost',user='root',password='123456',database='wsj',port=3306) cursor = mydb.cursor() cursor.execute('use '+db) result = cursor.execute(sql) mydb.commit() mydb.close() return result def run(): '''读取csv文件数据并存入mysql数据库 1.读取csv文件数据 2.将数据存入mysql数据库 ''' fileContent = read_csv(filename='airports.csv') # "iata","airport","city","state","country","lat","long" # 创建数据表 表数据第一行为列名 sql = 'create table if not exists airports(iata varchar(20),airport varchar(50),city varchar(50),state varchar(40),country varchar(30),lat float,long_ float)' #print(sql) execute_sql(db='wsj',sql=sql) # 方法一:分条保存到数据库 for d in fileContent[1:-1]: sql = 'insert into airports values ({})'.format(','.join(d)) print(sql) execute_sql(db='wsj',sql=sql) # 方法二:一条语句存入数据库 sql = 'insert into airports values ' for d in fileContent[1:-1]: sql += '({}),'.format(','.join(d)) sql = sql[:-1]+';' execute_sql(db='wsj', sql=sql) if __name__ == '__main__': run()
方法二:使用pandas库
使用该方法会根据列内数据自动创建数据表,如下
import pandas as pd from sqlalchemy import create_engine def run(): '''使用pandas库''' # 创建engine engine = create_engine("mysql+pymysql://root:123456@localhost:3306/wsj") # 读取数据 fileContent = pd.read_csv('airports.csv') # 将数据转为DataFrame格式(用于保存到mysql数据库) fileContent = pd.DataFrame(fileContent) print(type(fileContent)) # 保存到mysql数据库 fileContent.to_sql(name='airports2',con=engine,if_exists='replace',index=False) if __name__ == '__main__': run()