Reduce per-row overhead in session recv and putValue dispatch #195
Open
TheDistributor wants to merge 1 commit into master from mgallwey/accelerate-network-io
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -229,6 +229,15 @@ def _open_socket(self, connect_timeout, host, port, af, read_timeout): | |
| self.__sock = socket.socket(af, socket.SOCK_STREAM) | ||
| # disable Nagle's algorithm | ||
| self.__sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) | ||
| # Bigger receive buffer cuts recv_into syscalls during full-table | ||
| # scans (each batch is ~140 KB; default SO_RCVBUF is ~64 KB which | ||
| # forces 2-3 reads per batch). Kernel clamps to its sysctl maximum | ||
| # so the request is opportunistic. | ||
| try: | ||
| self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, | ||
| 1 << 20) # 1 MiB | ||
| except OSError: | ||
| pass | ||
| # separate connect and read timeout; we do not necessarily want to | ||
| # close out connection if reads block for a long time, because it could | ||
| # take a while for the server to generate data to send | ||
|
|
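The receive-buffer request in the `_open_socket` hunk above is opportunistic, so it can help to read back what the kernel actually granted. The following is a minimal standalone sketch, not part of the PR; the helper name `request_rcvbuf` is hypothetical. On Linux the reported value is typically double the requested size and is capped by `net.core.rmem_max`, so it will often differ from 1 MiB.

```python
import socket


def request_rcvbuf(sock, size=1 << 20):
    """Ask for a larger receive buffer and return the size the kernel reports.

    Hypothetical helper for illustration; the request may be clamped or
    rejected, so the return value is the only reliable indication.
    """
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
    except OSError:
        # Opportunistic request: keep the default buffer on failure.
        pass
    return sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)


if __name__ == "__main__":
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        print("effective SO_RCVBUF:", request_rcvbuf(s))
    finally:
        s.close()
```

Logging the effective value once per connection would show whether the 1 MiB request has any effect on a given host.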
@@ -481,12 +490,13 @@ def send(self, message): | |
| raise | ||
|
|
||
| def recv(self, timeout=None): | ||
| # type: (Optional[float]) -> Optional[bytes] | ||
| # type: (Optional[float]) -> Optional[bytearray] | ||
| """Pull the next message from the socket. | ||
|
|
||
| If timeout is None, wait forever (until read_timeout, if set). | ||
| If timeout is a float, then set this timeout for this recv(). | ||
| On timeout, return None but do not close the connection. | ||
| Returns a bytearray to avoid an extra copy in the caller. | ||
| """ | ||
| try: | ||
| # We only wait on timeout to read the header. Once we read | ||
|
|
@@ -506,25 +516,38 @@ def recv(self, timeout=None): | |
| raise RuntimeError("Session.recv read no data!") | ||
|
|
||
| if self.__cipherIn: | ||
| msg = self.__cipherIn.transform(msg) | ||
| # cryptography.Cipher.update accepts any buffer-like object, so | ||
| # pass the bytearray directly instead of copying via bytes(msg). | ||
| # Result is wrapped back into bytearray so callers always receive | ||
| # a bytearray regardless of cipher state. | ||
| return bytearray(self.__cipherIn.transform(msg)) | ||
|
|
||
| return msg | ||
|
|
||
| def __readFully(self, msgLength, timeout=None): | ||
| # type: (int, Optional[float]) -> Optional[bytes] | ||
| """Pull the next complete message from the socket.""" | ||
| # type: (int, Optional[float]) -> Optional[bytearray] | ||
| """Pull the next complete message from the socket. | ||
|
|
||
| Pre-allocates a bytearray of the exact required size and fills it with | ||
| recv_into(), avoiding the repeated bytearray concatenations and the | ||
| final bytes() copy that the previous implementation performed. | ||
|
Comment on lines +532 to +533
Contributor
We shouldn't justify this implementation in the method's docstring by comparing it to a previous implementation. That information is appropriate for the git commit message, but not here. |
||
| """ | ||
| if msgLength == 0: | ||
| return bytearray() | ||
| sock = self._sock | ||
| msg = bytearray() | ||
| msg = bytearray(msgLength) | ||
| mv = memoryview(msg) | ||
| old_tmout = sock.gettimeout() | ||
| while msgLength > 0: | ||
| offset = 0 | ||
| while offset < msgLength: | ||
| if timeout is not None: | ||
| # It's a little wrong that this timeout applies to each recv() | ||
| # instead of to the entire operation; however we only use this | ||
| # when reading the header which will always be read in one | ||
| # pass anyway. | ||
| sock.settimeout(timeout) | ||
| try: | ||
| received = sock.recv(msgLength) | ||
| received = sock.recv_into(mv[offset:], msgLength - offset) | ||
| except socket.timeout: | ||
| return None | ||
| except IOError as e: | ||
|
|
@@ -539,11 +562,10 @@ def __readFully(self, msgLength, timeout=None): | |
| raise SessionException( | ||
| "Session closed waiting for data: wanted length=%d," | ||
| " received length=%d" | ||
| % (msgLength, len(msg))) | ||
| msg += received | ||
| msgLength -= len(received) | ||
| % (msgLength, offset)) | ||
| offset += received | ||
|
|
||
| return bytes(msg) | ||
| return msg | ||
|
|
||
| def stream_recv(self, blocksz=4096, timeout=None): | ||
| # type: (int, Optional[float]) -> Generator[bytes, None, None] | ||
|
|
||
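For reviewers who want to exercise the pre-allocated read loop from `__readFully` in isolation, here is a minimal sketch under simplifying assumptions: it omits the timeout handling and the `SessionException` type, and the names `read_exact` and `demo` are hypothetical. It uses `socket.socketpair()` as a stand-in for a real server connection.

```python
import socket


def read_exact(sock, length):
    """Read exactly `length` bytes into a pre-allocated bytearray.

    Simplified illustration of the recv_into() pattern in the diff;
    timeouts and the session's own exception type are left out.
    """
    buf = bytearray(length)
    view = memoryview(buf)  # slicing a memoryview does not copy
    offset = 0
    while offset < length:
        received = sock.recv_into(view[offset:], length - offset)
        if received == 0:
            # Peer closed the connection before the full message arrived.
            raise ConnectionError(
                "closed after %d of %d bytes" % (offset, length))
        offset += received
    return buf


def demo():
    """Send a small payload across a socket pair and read it back."""
    a, b = socket.socketpair()
    try:
        payload = bytes(range(256)) * 16  # 4096 bytes
        a.sendall(payload)
        return read_exact(b, len(payload)) == payload
    finally:
        a.close()
        b.close()


if __name__ == "__main__":
    print(demo())
```

Because the function returns a `bytearray`, callers that require an immutable or hashable value (for example, a dict key) would need an explicit `bytes(...)` conversion.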
Ugh. How does this work? I mean I understand that we want to preserve compatibility but doesn't this mean we can't send bool types?
Does the server interpret the integer as a bool based on the type?