-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodels.py
64 lines (47 loc) · 1.53 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import attr
from motor.motor_asyncio import AsyncIOMotorClient
# Module-wide Motor client. No arguments are passed, so the driver's default
# connection settings apply (presumably a local MongoDB instance -- confirm
# against the deployment).
motor_client = AsyncIOMotorClient()
# Handle to the database named "proxy" on that client.
proxy_db = motor_client.proxy
# Handle to the "proxy_pool" collection; every coroutine in this module
# reads from or writes to this collection.
proxy_coll = proxy_db.proxy_pool
@attr.s
class ProxyItem(object):
    """A proxy endpoint together with its verification weight.

    The fields are declared with ``attr.ib`` so that attrs generates the
    initializer and the other boilerplate methods for the class.
    """

    # Host part of the proxy address; stored exactly as passed in.
    ip = attr.ib()
    # Port of the proxy; coerced to int on construction.
    # ``converter=`` replaces the deprecated ``convert=`` keyword, which
    # newer attrs releases no longer accept.
    port = attr.ib(converter=int)
    # Presumably the proxy's location label -- confirm with the producers of
    # these items. Empty string when not supplied.
    position = attr.ib(default='')
    # Weight; coerced to int and defaults to 1 when not supplied.
    # NOTE(review): the original comment said the initial value stored in the
    # database is 0, which does not match the default of 1 used here --
    # confirm which one is intended.
    weight = attr.ib(default=1, converter=int)

    def __str__(self):
        """Return the proxy formatted as ``"ip:port"``."""
        return "{}:{}".format(self.ip, self.port)

    def pass_verify(self):
        """Increase the weight by one."""
        self.weight += 1

    def fail_verify(self):
        """Decrease the weight by one."""
        self.weight -= 1
async def bulk_upsert_proxy(proxy_list):
    """Upsert every proxy in *proxy_list* into ``proxy_coll`` as one batch.

    Each item is converted to a dict with ``attr.asdict`` and matched on its
    ``ip`` and ``port``; all of its fields are then written with ``$set``,
    inserting a new document when no match exists.

    Args:
        proxy_list: Iterable of attrs instances exposing ``ip`` and ``port``
            fields (``ProxyItem`` objects).

    Returns:
        Whatever the bulk operation's ``execute()`` returns, or ``None`` when
        *proxy_list* yields no items.
    """
    bulk = proxy_coll.initialize_ordered_bulk_op()
    queued = 0
    for proxy_item in proxy_list:
        proxy_dict = attr.asdict(proxy_item)
        bulk.find(
            {"ip": proxy_dict['ip'], 'port': proxy_dict['port']}
        ).upsert().update({"$set": proxy_dict})
        queued += 1
    if not queued:
        # Executing a bulk operation with nothing queued raises
        # InvalidOperation in the driver; an empty batch is simply a no-op.
        return None
    return await bulk.execute()
async def delete_proxy(proxy_item):
    """Delete one document from ``proxy_coll`` matching the item's ip and port.

    Args:
        proxy_item: Object exposing ``ip`` and ``port`` attributes.
    """
    selector = {
        "ip": proxy_item.ip,
        "port": proxy_item.port,
    }
    # The deleted document returned by the driver is intentionally discarded.
    await proxy_coll.find_one_and_delete(selector)
async def update_proxy(proxy_item):
    """Write the item's current field values to its stored document.

    The document is located by the item's ``ip`` and ``port`` and every field
    from ``attr.asdict(proxy_item)`` is applied with ``$set``. No upsert is
    requested, so nothing is written when no document matches.

    Args:
        proxy_item: attrs instance exposing ``ip`` and ``port`` fields.
    """
    # ``update_one`` replaces the deprecated ``Collection.update``; the
    # original call relied on the legacy default of touching a single
    # document, which is exactly what ``update_one`` does.
    await proxy_coll.update_one(
        {"ip": proxy_item.ip, "port": proxy_item.port},
        {"$set": attr.asdict(proxy_item)},
    )
async def fetch_all_proxy():
    """Load every document in ``proxy_coll`` and return them as ``ProxyItem`` objects.

    Returns:
        A list of ``ProxyItem`` instances built from each document's ``ip``,
        ``port``, ``position`` and ``weight`` keys.
    """
    # ``to_list(None)`` is called with no length limit.
    documents = await proxy_coll.find().to_list(None)
    proxies = []
    for doc in documents:
        proxies.append(
            ProxyItem(
                ip=doc['ip'],
                port=doc['port'],
                position=doc['position'],
                weight=doc['weight'],
            )
        )
    return proxies