-
Notifications
You must be signed in to change notification settings - Fork 48
Expand file tree
/
Copy pathconnection.py
More file actions
202 lines (169 loc) · 7.02 KB
/
connection.py
File metadata and controls
202 lines (169 loc) · 7.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""
wsproto/connection
~~~~~~~~~~~~~~~~~~
An implementation of a WebSocket connection.
"""
from __future__ import annotations
from collections import deque
from enum import Enum
from typing import TYPE_CHECKING
from .events import (
BytesMessage,
CloseConnection,
Event,
Message,
Ping,
Pong,
TextMessage,
)
from .frame_protocol import CloseReason, FrameProtocol, Opcode, ParseFailed
from .utilities import LocalProtocolError
if TYPE_CHECKING:
from collections.abc import Generator
from .extensions import Extension
class ConnectionState(Enum):
"""
RFC 6455, Section 4 - Opening Handshake
"""
#: The opening handshake is in progress.
CONNECTING = 0
#: The opening handshake is complete.
OPEN = 1
#: The remote WebSocket has initiated a connection close.
REMOTE_CLOSING = 2
#: The local WebSocket (i.e. this instance) has initiated a connection close.
LOCAL_CLOSING = 3
#: The closing handshake has completed.
CLOSED = 4
#: The connection was rejected during the opening handshake.
REJECTING = 5
class ConnectionType(Enum):
"""An enumeration of connection types."""
#: This connection will act as client and talk to a remote server
CLIENT = 1
#: This connection will as as server and waits for client connections
SERVER = 2
CLIENT = ConnectionType.CLIENT
SERVER = ConnectionType.SERVER
class Connection:
"""
A low-level WebSocket connection object.
This wraps two other protocol objects, an HTTP/1.1 protocol object used
to do the initial HTTP upgrade handshake and a WebSocket frame protocol
object used to exchange messages and other control frames.
"""
def __init__(
self,
connection_type: ConnectionType,
extensions: list[Extension] | None = None,
trailing_data: bytes = b"",
) -> None:
"""
Constructor
:param wsproto.connection.ConnectionType connection_type: Whether this
object is on the client- or server-side of a connection.
To initialise as a client pass ``CLIENT`` otherwise pass ``SERVER``.
:param list extensions: The proposed extensions.
:param bytes trailing_data: Data that has been received, but not yet
processed.
"""
self.client = connection_type is ConnectionType.CLIENT
self._events: deque[Event] = deque()
self._proto = FrameProtocol(self.client, extensions or [])
self._state = ConnectionState.OPEN
self.receive_data(trailing_data)
@property
def state(self) -> ConnectionState:
return self._state
def send(self, event: Event) -> bytes:
data = b""
if isinstance(event, Message) and self.state == ConnectionState.OPEN:
data += self._proto.send_data(event.data, event.message_finished)
elif isinstance(event, Ping) and self.state == ConnectionState.OPEN:
data += self._proto.ping(event.payload)
elif isinstance(event, Pong) and self.state == ConnectionState.OPEN:
data += self._proto.pong(event.payload)
elif isinstance(event, CloseConnection) and self.state in {
ConnectionState.OPEN,
ConnectionState.REMOTE_CLOSING,
}:
data += self._proto.close(event.code, event.reason)
if self.state == ConnectionState.REMOTE_CLOSING:
self._state = ConnectionState.CLOSED
else:
self._state = ConnectionState.LOCAL_CLOSING
else:
msg = f"Event {event} cannot be sent in state {self.state}."
raise LocalProtocolError(
msg,
)
return data
def receive_data(self, data: bytes | None) -> None:
"""
Pass some received data to the connection for handling.
A list of events that the remote peer triggered by sending this data can
be retrieved with :meth:`~wsproto.connection.Connection.events`.
:param data: The data received from the remote peer on the network.
:type data: ``bytes``
"""
if data is None:
# "If _The WebSocket Connection is Closed_ and no Close control
# frame was received by the endpoint (such as could occur if the
# underlying transport connection is lost), _The WebSocket
# Connection Close Code_ is considered to be 1006."
self._events.append(CloseConnection(code=CloseReason.ABNORMAL_CLOSURE))
self._state = ConnectionState.CLOSED
return
if self.state in (ConnectionState.OPEN, ConnectionState.LOCAL_CLOSING):
self._proto.receive_bytes(data)
elif self.state is ConnectionState.CLOSED:
msg = "Connection already closed."
raise LocalProtocolError(msg)
else:
pass # pragma: no cover
def events(self) -> Generator[Event, None, None]:
"""
Return a generator that provides any events that have been generated
by protocol activity.
:returns: generator of :class:`Event <wsproto.events.Event>` subclasses
"""
while self._events:
yield self._events.popleft()
try:
for frame in self._proto.received_frames():
if frame.opcode is Opcode.PING:
assert frame.frame_finished
assert frame.message_finished
assert isinstance(frame.payload, (bytes, bytearray))
yield Ping(payload=frame.payload)
elif frame.opcode is Opcode.PONG:
assert frame.frame_finished
assert frame.message_finished
assert isinstance(frame.payload, (bytes, bytearray))
yield Pong(payload=frame.payload)
elif frame.opcode is Opcode.CLOSE:
assert isinstance(frame.payload, tuple)
code, reason = frame.payload
if self.state is ConnectionState.LOCAL_CLOSING:
self._state = ConnectionState.CLOSED
else:
self._state = ConnectionState.REMOTE_CLOSING
yield CloseConnection(code=code, reason=reason)
elif frame.opcode is Opcode.TEXT:
assert isinstance(frame.payload, str)
yield TextMessage(
data=frame.payload,
frame_finished=frame.frame_finished,
message_finished=frame.message_finished,
)
elif frame.opcode is Opcode.BINARY:
assert isinstance(frame.payload, (bytes, bytearray))
yield BytesMessage(
data=frame.payload,
frame_finished=frame.frame_finished,
message_finished=frame.message_finished,
)
else:
pass # pragma: no cover
except ParseFailed as exc:
yield CloseConnection(code=exc.code, reason=str(exc))