Skip to content
This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

Reddit patched: Pipeline optimizations #406

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

duke-cliff
Copy link

I have added the following optimizations, so far we tested the cluster-mode and the result is very stable and much faster than before. Also, the redis load is dropped significantly as well.

Let me know if you are comfortable with the gevent change, it's good to have but might not be too important to other users also it might not work properly on some platforms.

  1. Allow pipeline to read from replicas
  2. Group commands in the pipeline randomly, but the same shard command should just execute in one random node
  3. Will retry moved commands using pipeline(stacked_commands)
  4. For moved response, it will move replicas with the mater to the new slot
  5. Only execute READONLY once per connection
  6. Use gevent to execute pipeline commands to different nodes in parallel
  7. Change READ_COMMANDS to set, making the lookup a little bit faster

@duke-cliff duke-cliff force-pushed the reddit_patched_public branch from 6358cf3 to 5acdb82 Compare October 9, 2020 21:04
Copy link
Owner

@Grokzen Grokzen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments from a initial pass. There is some stuff to discuss around gevent at least, some changes is required to alter back, and some we can talk about/around.

On a side note tho, after diffing into the pipeline code a bit again, i remember back to how complex that piece of code is compared to how i would want it to be way more simpler. I am hoping that in 3.0.0 i can rewrite the node and connection handling good enough to make the pipeline code way simpler compared to today. My dream would be to handle only the commands in the pipeline and just route them to correct node and let the normal client and connection classes handle all the errors and other scenarios that can occur. Will see how that goes down the line tho.

@@ -172,7 +172,7 @@ class RedisCluster(Redis):

# Not complete, but covers the major ones
# https://redis.io/commands
READ_COMMANDS = [
READ_COMMANDS = set([
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A set is bad here as manipulating it as a user should technically be possible to manipulate that list to add in things that might not be supported or in the main code in a release yet.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, a frozenset would be better here.

@@ -257,6 +258,8 @@ def initialize(self):
node, node_name = self.make_node_obj(master_node[0], master_node[1], 'master')
nodes_cache[node_name] = node

self.slave_nodes_by_master[node_name] = set()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set? i try to avoid them if possible

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking to lookup the slaves by slave names, it turns out I don't need it eventually. So I think list is perfectly fine here. I will change it.

@@ -267,6 +270,7 @@ def initialize(self):
target_slave_node, slave_node_name = self.make_node_obj(slave_node[0], slave_node[1], 'slave')
nodes_cache[slave_node_name] = target_slave_node
tmp_slots[i].append(target_slave_node)
self.slave_nodes_by_master[node_name].add(slave_node_name)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am i guessing right that we need this as a convenience tracking data structure to determine what slaves is for each master as we do not directly has that right now?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, it's for the moved error handling. We should copy the master and the corresponding replicas to the slot.

@@ -15,6 +16,10 @@
from redis.exceptions import ConnectionError, RedisError, TimeoutError
from redis._compat import imap, unicode

from gevent import monkey; monkey.patch_all()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a super fan of this addition tho. I think we had a gevent implementation and a threaded implementation way back but we replaced it with the current version that we have in the main execution code path as it was a major overhead to use any kind of threading or parallelization librar/framework to achive some kind of speed improvements which we never saw that we actually gained unless you ran pipelines against a ton of master nodes, not number of commands but just a big number of nodes.

Basically you will have to defend this new implementation why we should add this one over the existing one.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be more efficient if we need to talk to multiple shards/nodes in the pipeline. And for now, it's only used for pipeline commands which should have no impact/overhead on the rest of the logic.

# if we have to run through it again, we only retry the commands that failed.
cmds = sorted(stack, key=lambda x: x.position)

max_redirects = 5
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Number of redirections should be configurable in as a class variable and not hard coded here. Also the main execution loop has 16 TTL rounds before failing out. This has 5 and that is possibly a discrepancy, yes if you get that many moved/ask errors then something is probably more broken then how this code handles the case :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's an initial PR which I just list out all the things. This would be read from the config for sure.

# build a list of node objects based on node names we need to
nodes, connection_by_node = self._get_commands_by_node(cmds)

# send the commands in sequence.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Old docs that is not relevant with your new solution?

# so that we can read them all in parallel as they come back.
# we dont' multiplex on the sockets as they come available, but that shouldn't make too much difference.

# duke-cliff: I think it would still be faster if we use gevent to make the command in parallel
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a comment here in the PR, not inside the code with speculation that it might be faster.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove it.

node_commands = nodes.values()
events = []
for n in node_commands:
events.append(gevent.spawn(self._execute_node_commands, n))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapping gevent around this is not really nessesary? You are still doing the same write() + read() command inside the function anyway. Please show by testing and instrumentation that this is indeed faster and by how much to actually justify adding in two new dependencies for this small use-case.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using non-blocking, it only means the thread would not be blocked by the IO, but you still need CPU time to serialize/deserialize data to/from the socket.

So in this case, if you call redis.packed_commands(....) for multiple NodeCommands these would be queued in the same thread/process. And with gevent, this would become faster if it runs on multiple cores.

I said this part is nice to have, if you don't want it in the public version, we can talk about whether to give an option to the user or remove it.

nodes = {}
proxy_node_by_master = {}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont really get what this helps with from the docs lines you written about this variable?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proxy_node will be the node to execute the NodeCommands.
My previous PR will just find a random node by get_node_by_slot(slot, True). But this would cause a lot of commands in the same shard would pick different replicas, thus they are not able to be packed efficiently.
So the new logic is:

  1. determine the shard of the slot, the shard is the master_node_name
  2. find a proxy node for the shard: which is proxy_node_by_master
  3. group all commands in the same shard to the proxy_node

cur_attempt = 0

while cur_attempt < max_redirects:

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor lint issue, no blank line after a while statement

@duke-cliff
Copy link
Author

@Grokzen I just updated this PR with these changes:

  1. make gevent optional(default: False)
  2. make max_redirects configurable(default: 5)
  3. make the pipeline smarter to decide whether to use replica node based on if the command is READ_ONLY

@Grokzen
Copy link
Owner

Grokzen commented Oct 17, 2020

@duke-cliff I will take these changes out for a test spin and come back to you. It looks good now :)

@FranGM
Copy link
Contributor

FranGM commented Apr 19, 2021

Hi, I'm curious about the status of this PR, I think we could definitely benefit from some of these changes so we'd love to see it (or some subset of it) merged.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants