Python&Mysql
# Python&Mysql
# 简单使用
import mysql.connector
## Connection settings passed to mysql.connector.connect() as keyword arguments.
config = {
    'host': 'localhost',  ## default localhost
    'user': 'root',
    'password': 'lv123',
    ## 'port': 3306,  ## 3306 is already the default
    'database': 'test',  ## there is no default database
    'charset': 'utf8',  ## noted by the author as already being the default
    'buffered': True,  ## added because of the "Unread result found" error
    "auth_plugin": 'mysql_native_password',  ## added because of an error with MySQL 8
}

try:
    conn = mysql.connector.connect(**config)
except mysql.connector.Error as e:
    print('connect fail %s' % e)
    ## Nothing below can run without a connection; re-raise so the script
    ## stops with the real error instead of a NameError on `conn`.
    raise

try:
    cursor = conn.cursor()
    try:
        ## show tables
        cursor.execute('SHOW TABLES')
        ## Use the cursor to fetch the rows
        res = cursor.fetchall()

        ## Run an insert statement, equivalent to:
        ## insert into vendors (vend_name, vend_address) values('test123', 'testad')
        cursor.execute(
            'insert into vendors (vend_name, vend_address) values'
            ' (%s, %s)',
            ['test123', 'testad'],
        )
        print(cursor.rowcount)
        ## Commit the transaction
        conn.commit()
    finally:
        ## Close the cursor even if one of the statements above failed
        cursor.close()

    ## Open a new cursor for the query
    cursor = conn.cursor()
    try:
        ## A one-element tuple needs the trailing comma
        cursor.execute('select * from vendors where vend_name = %s', ('test123',))
        res = cursor.fetchall()
        print(res)
    finally:
        cursor.close()
finally:
    ## Always release the connection, whether or not the statements succeeded
    conn.close()
- 理解游标(cursor)的概念
- 中间遇到了 `Unread result found` 和 `mysql.connector.errors.InternalError: Unread result found`(上面配置中的 `'buffered': True` 即为此而加),参考 https://blog.csdn.net/weixin_42124607/article/details/112041788 和 https://www.jianshu.com/p/f0192a0afe99
- dict unpack:`**dict`
- 元组只有一个元素时记得加 `,`,例如 `('test123',)`(列表写成 `['test123',]` 也可以)
# SQLAlchemy
from sqlalchemy import Column, create_engine, INTEGER,String
from sqlalchemy.orm import sessionmaker, declarative_base
## from sqlalchemy.ext.declarative import declarative_base  -- this API is too old; the import was updated
## Create the base class for the mapped objects
Base = declarative_base()
## Define the Vendor object
class Vendor(Base):
    """Declarative mapping for rows of the ``vendors`` table."""

    ## Table name
    __tablename__ = 'vendors'

    ## NOTE(review): presumably set so the class can be declared again
    ## (e.g. when re-running the snippet) without a "table already defined"
    ## error -- confirm.
    __table_args__ = dict(extend_existing=True)

    ## Table structure
    vend_id = Column(
        INTEGER,
        autoincrement=True,
        primary_key=True,
    )
    vend_name = Column(String(50))
    vend_address = Column(String(50))
## Same setting as "auth_plugin": 'mysql_native_password', passed here as a
## query parameter of the connection URL.
engine = create_engine('mysql+mysqlconnector://root:lv123@localhost:3306/test?auth_plugin=mysql_native_password')
DBSession = sessionmaker(bind=engine)

## Create a session object; the `with` block closes it even if add/commit raises.
with DBSession() as session:
    ## Create a new object to insert into the table
    new_vendor = Vendor(vend_name='vtest', vend_address='addtest')
    session.add(new_vendor)
    session.commit()
    ## Read the generated primary key while the session is still open:
    ## commit() expires loaded attributes by default, and they cannot be
    ## reloaded once the object is detached from a closed session.
    new_vendor_id = new_vendor.vend_id

## Query the new_vendor that was just inserted.
## Look it up by primary key rather than by name: every run of this script
## inserts another 'vtest' row, so filtering on vend_name with .one() raises
## MultipleResultsFound from the second run onwards.
with DBSession() as session:
    res = session.query(Vendor).filter(Vendor.vend_id == new_vendor_id).one()
    print('type:', type(res))
    print('res:', res.vend_id, res.vend_name, res.vend_address)