0%

twisted框架

twisted 入门

twisted框架
twisted是一个python编写的事件驱动网络框架
安装

1
2
3
4
pip install twisted
>>> import twisted
>>> twisted.__version__
'20.3.0'

基础
下列例子与twisted官方文档介绍有所出入,仅用于自己学习参考
1.一个简单的服务器应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor
from time import ctime


class TServer(Protocol):
def connectionMade(self):
self.transport.write("hello stranger!\t\n".encode())


def dataReceived(self, data):
if str(data.decode()) == 'q': # 输入q关闭连接
self.transport.write("\nconnection closed...\t\n".encode())
self.transport.loseConnection()
else:
self.transport.write(" you have sent {0} at {1}\r\n".format(data.decode(), ctime()).encode())


factory = Factory()
factory.protocol = TServer
reactor.listenTCP(22222, factory)
reactor.run()

分析:

使用telnet来连接创建的tcp服务,每输入一个字符就按特定的格式输出,输入q则关闭连接

1
telnet localhost 22222

但使用telnet连接我们创建的服务器应用时发现了一个问题,每敲击一个字符就把数据发送到了服务端,没有办法实现字符串的发送,因此需要编写相应的客户端应用来解决这个问题。
2.一个简单的客户端应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet import reactor

class TClient(Protocol):
def sendData(self):
data = input(">")
if data:
self.transport.write(data.encode())

def dataReceived(self, data):
print(data.decode())
self.sendData()

class TClientFactory(ClientFactory):
protocol = TClient
clientConnectionLost = clientConnectionFailed = lambda self, connector, reason: reactor.stop()

# class TClientFactory(ClientFactory):
# protocol = TClient
# def clientConnectionFailed(self, connector, reason):
# reactor.stop()
# def clientConnectionFailed(self, connector, reason):
# reactor.stop()


reactor.connectTCP('localhost', 22222, TClientFactory())
reactor.run()

具体的分析过程与构建服务端时类似,就不赘述了,效果如下:

*3.封装到协议工厂类的例子(《twisted网络编程要领》)

1
2
3
4
5
6
7
8
9
10
11
12
13
# Server的例子,Client类似
from twisted.internet import protocol, reactor

class Echo(protocol.Protocol):
def dataReceived(self, data):
self.transport.write(data)

class EchoFactory(protocol.Factory):
def buildProtocol(self, addr):
return Echo()

reactor.listenTCP(8000, EchoFactory())
reactor.run()

事件驱动
这里不多说了,就类似于python多线程时cpu资源分配(GIL控制),在A作业I/O时把资源让给其他作业,放张《twisted网络编程要领》上的图

核心部件
Reactor 反应器
反应器是twisted实现事件驱动的核心,作用是提供网络api,线程,分配事件等。
温馨提示,配合源码食用更佳

Protocol 协议
协议类(BaseProtocol)中定义了一些交互式需要用到的方法
makeConnection 创建连接时调用
connectionMade 连接建立后调用

Protocol:
dataReceived 接收数据时调用
connectionLost 关闭连接时调用
在基础部分的例子里我们继承了Protocol类,重写了connectionMade和dataReceived方法来达到自定义协议的目的。

Protocol Factories 协议工厂
封装定义的协议类,为每个连接实例化,当连接终止时删除。协议定义了客户端服务端交互的方式,而协议工厂类中可以存放一些持续性的配置信息(如当前建立的有效连接数,在自定义的Factory类中定义一个numconnections变量,并在自定义的协议类中的connectionMode方法中将其+1)。
因为每建立一次连接都会重新创建协议的新的实例,所以持续性信息需要定义在协议工厂类而不是协议类中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor

class ChatProtocol(LineReceiver):
def __init__(self, factory):
self.factory = factory
self.name = None
self.state = "REGISTER"

def connectionMade(self):
self.sendLine("What's your name?".encode())

def connectionLost(self, reason):
if self.name in self.factory.users:
del self.factory.users[self.name]
self.broadcastMessage("%s has left the channel." % (self.name,).encode())

def lineReceived(self, line):
if self.state == "REGISTER":
self.handle_REGISTER(line)
else:
self.handle_CHAT(line)

def handle_REGISTER(self, name):
if name in self.factory.users:
self.sendLine("Name taken, please choose another.".encode())
return
self.sendLine("Welcome, {}!".format(name).encode())
self.broadcastMessage("%s has joined the channel." % (name,))
self.name = name
self.factory.users[name] = self
self.state = "CHAT"

def handle_CHAT(self, message):
message = "<%s> %s" % (self.name, message)
self.broadcastMessage(message)

def broadcastMessage(self, message):
for name, protocol in self.factory.users.items():
if protocol != self:
protocol.sendLine(message.encode())

class ChatFactory(Factory):
def __init__(self):
self.users = {}

def buildProtocol(self, addr):
return ChatProtocol(self)

reactor.listenTCP(8123, ChatFactory())
reactor.run()

协议和协议工厂的源码比较好读,就不画调用流程图了

延迟与回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from twisted.internet import reactor, defer
class HeadlineRetriever(object):
def processHeadline(self, headline):
if len(headline) > 50:
self.d.errback(
"The headline ``%s'' is too long!" % (headline,))
else:
self.d.callback(headline)
def _toHTML(self, result):
return "<h1>%s</h1>" % (result,)
def getHeadline(self, input):
self.d = defer.Deferred()
reactor.callLater(1, self.processHeadline, input)
self.d.addCallback(self._toHTML)
return self.d

def printData(result):
print(result)
reactor.stop()


def printError(failure):
print(failure)
reactor.stop()

h = HeadlineRetriever()
d = h.getHeadline("Breaking News: Twisted Takes Us to the Moon!")
d.addCallbacks(printData, printError)
reactor.run()

Deferred为实现事件驱动异步编程提供了基础,当在执行逻辑代码时需要多线程来避免阻塞主线程的情况发生。
在不使用异步的情况下,如果要多次访问执行耗时操作的服务端网页时,所需的时间是串行的;在使用异步后,由于I/O操作并行,计算操作串行,假定这里的耗时操作是I/O耗时的情况下,所需时间并行。
使用Deferred的非阻塞例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from twisted.internet import reactor
from twisted.internet.task import deferLater
from twisted.web.resource import Resource
from twisted.web.server import Site, NOT_DONE_YET
import time
class BusyPage(Resource):
def _delayedRender(self, request):
request.write("Finally done, at %s" % (time.asctime(),))
request.finish()
def render_GET(self, request):
d = deferLater(reactor, 5, lambda: request)
d.addCallback(self._delayedRender)
return NOT_DONE_YET
factory = Site(BusyPage())
reactor.listenTCP(8124, factory)
reactor.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def _runCallbacks(self):
if self._runningCallbacks:
# 防止递归调用
return
while current.callbacks:
item = current.callbacks.pop(0)
callback, args, kw = item[
isinstance(current.result, failure.Failure)]
# 从栈中弹出当前处理的回调,依据结果分配callback 或 errback
try:
current._runningCallbacks = True
try:
current.result = callback(current.result, *args, **kw)
# 回调函数执行
finally:
current._runningCallbacks = False
except:
current.result = failure.Failure(captureVars=self.debug)
# 异常视为失败

WEB
简单web应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from twisted.web.server import Site, GzipEncoderFactory
from twisted.web.resource import Resource, EncodingResourceWrapper
from twisted.internet import reactor

class Simple(Resource):
isLeaf = True
def render_GET(self, request):
return "<html>Hello, world!</html>".encode()

resource = Simple()
wrapped = EncodingResourceWrapper(resource, [GzipEncoderFactory()])
site = Site(wrapped)
reactor.listenTCP(8080, site)
reactor.run()

动态web应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from twisted.internet import reactor
from twisted.web.resource import Resource
from twisted.web.server import Site
import time

class ClockPage(Resource):
isLeaf = True
def render_GET(self, request):
rtn = "The local time is %s" % (time.ctime(),)
return rtn.encode()

resource = ClockPage()
factory = Site(resource)
reactor.listenTCP(8000, factory)
reactor.run()

post请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from twisted.internet import reactor
from twisted.web.resource import Resource
from twisted.web.server import Site
import html
class FormPage(Resource):
isLeaf = True
def render_GET(self, request):
return """
<html>
<body>
<form method="POST">
<input name="form-field" type="text" />
<input type="submit" />
</form>
</body>
</html>
""".encode()
def render_POST(self, request):
rtn = """
<html>
<body>You submitted: %s</body>
</html>
""" % (html.escape(request.args.get(b'form-field')[0].decode()),)
return rtn.encode()

factory = Site(FormPage())
reactor.listenTCP(8000, factory)
reactor.run()

Twisted Network Programming Essentials
https://www.cnblogs.com/mumuxinfei/p/4528910.html
twistedmatrix.com/documents/current/core/howto/defer.html
https://twistedmatrix.com/documents/current/core/howto/index.html