twisted 入门
twisted框架 twisted是一个python编写的事件驱动网络框架安装
1 2 3 4 pip install twisted >>> import twisted >>> twisted.__version__ '20.3.0'
基础 下列例子与twisted官方文档介绍有所出入,仅用于自己学习参考 1.一个简单的服务器应用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from twisted.internet.protocol import Factory, Protocol from twisted.internet import reactor from time import ctime class TServer(Protocol): def connectionMade(self): self.transport.write("hello stranger!\t\n".encode()) def dataReceived(self, data): if str(data.decode()) == 'q': # 输入q关闭连接 self.transport.write("\nconnection closed...\t\n".encode()) self.transport.loseConnection() else: self.transport.write(" you have sent {0} at {1}\r\n".format(data.decode(), ctime()).encode()) factory = Factory() factory.protocol = TServer reactor.listenTCP(22222, factory) reactor.run()
分析:
使用telnet来连接创建的tcp服务,每输入一个字符就按特定的格式输出,输入q则关闭连接
但使用telnet连接我们创建的服务器应用时发现了一个问题,每敲击一个字符就把数据发送到了服务端,没有办法实现字符串的发送,因此需要编写相应的客户端应用来解决这个问题。 2.一个简单的客户端应用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from twisted.internet.protocol import Protocol, ClientFactory from twisted.internet import reactor class TClient(Protocol): def sendData(self): data = input(">") if data: self.transport.write(data.encode()) def dataReceived(self, data): print(data.decode()) self.sendData() class TClientFactory(ClientFactory): protocol = TClient clientConnectionLost = clientConnectionFailed = lambda self, connector, reason: reactor.stop() # class TClientFactory(ClientFactory): # protocol = TClient # def clientConnectionFailed(self, connector, reason): # reactor.stop() # def clientConnectionFailed(self, connector, reason): # reactor.stop() reactor.connectTCP('localhost', 22222, TClientFactory()) reactor.run()
具体的分析过程与构建服务端时类似,就不赘述了,效果如下:
*3.封装到协议工厂类的例子(《twisted网络编程要领》)
1 2 3 4 5 6 7 8 9 10 11 12 13 # Server的例子,Client类似 from twisted.internet import protocol, reactor class Echo(protocol.Protocol): def dataReceived(self, data): self.transport.write(data) class EchoFactory(protocol.Factory): def buildProtocol(self, addr): return Echo() reactor.listenTCP(8000, EchoFactory()) reactor.run()
事件驱动 这里不多说了,就类似于python多线程时cpu资源分配(GIL控制),在A作业I/O时把资源让给其他作业,放张《twisted网络编程要领》上的图
核心部件 Reactor 反应器 反应器是twisted实现事件驱动的核心,作用是提供网络api,线程,分配事件等。 温馨提示,配合源码食用更佳
Protocol 协议 协议类(BaseProtocol)中定义了一些交互式需要用到的方法 makeConnection 创建连接时调用 connectionMade 连接建立后调用
Protocol: dataReceived 接收数据时调用 connectionLost 关闭连接时调用 在基础部分的例子里我们继承了Protocol类,重写了connectionMade和dataReceived方法来达到自定义协议的目的。
Protocol Factories 协议工厂 封装定义的协议类,为每个连接实例化,当连接终止时删除。协议定义了客户端服务端交互的方式,而协议工厂类中可以存放一些持续性的配置信息(如当前建立的有效连接数,在自定义的Factory类中定义一个numconnections变量,并在自定义的协议类中的connectionMode方法中将其+1)。 因为每建立一次连接都会重新创建协议的新的实例,所以持续性信息需要定义在协议工厂类而不是协议类中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 from twisted.internet.protocol import Factory from twisted.protocols.basic import LineReceiver from twisted.internet import reactor class ChatProtocol(LineReceiver): def __init__(self, factory): self.factory = factory self.name = None self.state = "REGISTER" def connectionMade(self): self.sendLine("What's your name?".encode()) def connectionLost(self, reason): if self.name in self.factory.users: del self.factory.users[self.name] self.broadcastMessage("%s has left the channel." % (self.name,).encode()) def lineReceived(self, line): if self.state == "REGISTER": self.handle_REGISTER(line) else: self.handle_CHAT(line) def handle_REGISTER(self, name): if name in self.factory.users: self.sendLine("Name taken, please choose another.".encode()) return self.sendLine("Welcome, {}!".format(name).encode()) self.broadcastMessage("%s has joined the channel." % (name,)) self.name = name self.factory.users[name] = self self.state = "CHAT" def handle_CHAT(self, message): message = "<%s> %s" % (self.name, message) self.broadcastMessage(message) def broadcastMessage(self, message): for name, protocol in self.factory.users.items(): if protocol != self: protocol.sendLine(message.encode()) class ChatFactory(Factory): def __init__(self): self.users = {} def buildProtocol(self, addr): return ChatProtocol(self) reactor.listenTCP(8123, ChatFactory()) reactor.run()
协议和协议工厂的源码比较好读,就不画调用流程图了
延迟与回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from twisted.internet import reactor, defer class HeadlineRetriever(object): def processHeadline(self, headline): if len(headline) > 50: self.d.errback( "The headline ``%s'' is too long!" % (headline,)) else: self.d.callback(headline) def _toHTML(self, result): return "<h1>%s</h1>" % (result,) def getHeadline(self, input): self.d = defer.Deferred() reactor.callLater(1, self.processHeadline, input) self.d.addCallback(self._toHTML) return self.d def printData(result): print(result) reactor.stop() def printError(failure): print(failure) reactor.stop() h = HeadlineRetriever() d = h.getHeadline("Breaking News: Twisted Takes Us to the Moon!") d.addCallbacks(printData, printError) reactor.run()
Deferred为实现事件驱动异步编程提供了基础,当在执行逻辑代码时需要多线程来避免阻塞主线程的情况发生。 在不使用异步的情况下,如果要多次访问执行耗时操作的服务端网页时,所需的时间是串行的;在使用异步后,由于I/O操作并行,计算操作串行,假定这里的耗时操作是I/O耗时的情况下,所需时间并行。 使用Deferred的非阻塞例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from twisted.internet import reactor from twisted.internet.task import deferLater from twisted.web.resource import Resource from twisted.web.server import Site, NOT_DONE_YET import time class BusyPage(Resource): def _delayedRender(self, request): request.write("Finally done, at %s" % (time.asctime(),)) request.finish() def render_GET(self, request): d = deferLater(reactor, 5, lambda: request) d.addCallback(self._delayedRender) return NOT_DONE_YET factory = Site(BusyPage()) reactor.listenTCP(8124, factory) reactor.run()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def _runCallbacks(self): if self._runningCallbacks: # 防止递归调用 return while current.callbacks: item = current.callbacks.pop(0) callback, args, kw = item[ isinstance(current.result, failure.Failure)] # 从栈中弹出当前处理的回调,依据结果分配callback 或 errback try: current._runningCallbacks = True try: current.result = callback(current.result, *args, **kw) # 回调函数执行 finally: current._runningCallbacks = False except: current.result = failure.Failure(captureVars=self.debug) # 异常视为失败
WEB 简单web应用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from twisted.web.server import Site, GzipEncoderFactory from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.internet import reactor class Simple(Resource): isLeaf = True def render_GET(self, request): return "<html>Hello, world!</html>".encode() resource = Simple() wrapped = EncodingResourceWrapper(resource, [GzipEncoderFactory()]) site = Site(wrapped) reactor.listenTCP(8080, site) reactor.run()
动态web应用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from twisted.internet import reactor from twisted.web.resource import Resource from twisted.web.server import Site import time class ClockPage(Resource): isLeaf = True def render_GET(self, request): rtn = "The local time is %s" % (time.ctime(),) return rtn.encode() resource = ClockPage() factory = Site(resource) reactor.listenTCP(8000, factory) reactor.run()
post请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from twisted.internet import reactor from twisted.web.resource import Resource from twisted.web.server import Site import html class FormPage(Resource): isLeaf = True def render_GET(self, request): return """ <html> <body> <form method="POST"> <input name="form-field" type="text" /> <input type="submit" /> </form> </body> </html> """.encode() def render_POST(self, request): rtn = """ <html> <body>You submitted: %s</body> </html> """ % (html.escape(request.args.get(b'form-field')[0].decode()),) return rtn.encode() factory = Site(FormPage()) reactor.listenTCP(8000, factory) reactor.run()
Twisted Network Programming Essentialshttps://www.cnblogs.com/mumuxinfei/p/4528910.html twistedmatrix.com/documents/current/core/howto/defer.htmlhttps://twistedmatrix.com/documents/current/core/howto/index.html