-
Notifications
You must be signed in to change notification settings - Fork 438
Expand file tree
/
Copy pathpg.py
More file actions
75 lines (62 loc) · 2 KB
/
pg.py
File metadata and controls
75 lines (62 loc) · 2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import asyncio
import asyncpg
# Process-wide cache of connection pools, keyed by database name.
# NOTE(review): the key is the database name only, so two PG objects that
# target same-named databases on different hosts would share one pool.
DB = {}


class PG:
    """Small convenience wrapper around an asyncpg connection pool.

    Typical usage selects a table with ``pg["table"]`` and then drives a
    coroutine through :meth:`run`, which also tears the pool down afterwards.

    Args:
        host: PostgreSQL server host.
        port: PostgreSQL server port.
        user: Login user name.
        pwd: Login password.
        dbName: Name of the database; also the key used in the ``DB`` cache.
    """

    def __init__(self, host, port, user, pwd, dbName):
        self.user = user
        self.pwd = pwd
        self.host = host
        self.port = port
        self.dbName = dbName
        # Pool currently attached to this object (None until db_pool() runs).
        self.db = None
        # Table name chosen via ``pg["table"]``; required by check().
        self.table = None

    def __getitem__(self, tb):
        """Remember *tb* as the current table and return ``self`` for chaining."""
        self.table = tb
        return self

    async def db_pool(self):
        """Attach the cached pool for ``dbName``, creating it on first use.

        Returns:
            ``self``, so the call can be chained.
        """
        pool = DB.get(self.dbName)
        if pool is None:
            pool = await asyncpg.create_pool(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.pwd,
                database=self.dbName,
            )
            DB[self.dbName] = pool
        self.db = pool
        print("DB", self.dbName, self.db)
        return self

    def terminate_pool(self):
        """Terminate the attached pool and drop it from the ``DB`` cache.

        Original author's note (translated): if the calling program does not
        invoke this, switching databases leads to tables not being visible
        in pgAdmin.
        """
        print("will terminate", self.db)
        if self.db is not None:
            self.db.terminate()
            DB.pop(self.dbName, None)
            # Forget the dead pool so the next check() builds a fresh one
            # instead of handing out a terminated pool.
            self.db = None

    async def check(self):
        """Validate configuration and make sure a pool is attached.

        Raises:
            ValueError: if no database name or no table name has been set.
        """
        if self.dbName is None:
            raise ValueError("no db name")
        if self.table is None:
            raise ValueError("no table name")
        if self.db is None:
            await self.db_pool()

    async def execute(self, sql, *args):
        """Run a single statement on a pooled connection.

        Args:
            sql: SQL text; may contain ``$1``-style placeholders.
            *args: Optional values for the placeholders, forwarded to
                asyncpg so callers do not have to build SQL by string
                concatenation.
        """
        print("sql===", sql)
        await self.check()
        async with self.db.acquire() as conn:
            await conn.execute(sql, *args)

    async def trans(self, sqls):
        """Run every statement in *sqls*, in order, inside one transaction."""
        await self.check()
        async with self.db.acquire() as conn:
            async with conn.transaction():
                for sql in sqls:
                    print("sql===", sql)
                    await conn.execute(sql)

    async def select(self, sql, *args):
        """Run a query and return its rows as a list of plain dicts.

        Args:
            sql: SQL text; may contain ``$1``-style placeholders.
            *args: Optional values for the placeholders.
        """
        await self.check()
        async with self.db.acquire() as conn:
            rows = await conn.fetch(sql, *args)
            return [dict(row) for row in rows]

    def run(self, f):
        """Drive the coroutine function *f* to completion, then drop the pool.

        Args:
            f: Coroutine function called as ``f(self)``.

        The pool is terminated even when ``f`` raises; the exception then
        propagates to the caller.
        """
        print(self.dbName, self.table)
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current loop (newer Python versions, non-main threads, or
            # after asyncio.run() cleared it): create and install one.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(f(self))
        finally:
            self.terminate_pool()