
fix: Removes orphaned topics from transient queries #6714

Merged

Conversation

@AlanConfluent (Member) commented Dec 2, 2020

Description

Fixes #4009

When a server is shut down uncleanly or killed, it may leave behind orphaned topics from transient queries. This PR attempts to clean up these topics when the server starts up.

This is accomplished by writing transient query application ids to the local node's disk. This ensures that any given topic is only ever deleted by one node, the one that created it. The data isn't durable in the sense that a node can shut down and never restart, potentially leaving state uncleaned.
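
As a rough illustration of the recording side of this idea (not the PR's actual classes; the type and method names below are hypothetical), each transient query's application id is appended to a local file before the query starts, so a crash afterwards still leaves a record for the next startup to act on:

// Minimal sketch, assuming a hypothetical TransientQueryRegistry (not the PR's real class).
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public final class TransientQueryRegistry {

  private final Path commandsFile;

  public TransientQueryRegistry(final Path commandsFile) {
    this.commandsFile = commandsFile;
  }

  // Append the application id before the query starts, so an unclean shutdown
  // still leaves a record behind for the next startup to clean up.
  public void record(final String queryApplicationId) throws IOException {
    Files.write(
        commandsFile,
        (queryApplicationId + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
  }
}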

Testing done

Describe the testing strategy. Unit and integration tests are expected for any behavior changes.

Reviewer checklist

  • Ensure docs are updated if necessary (e.g., if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@AlanConfluent requested a review from a team as a code owner on December 2, 2020 23:17
@agavra (Contributor) commented Dec 3, 2020

There are some drawbacks with this approach:

  • users have to carefully set the node.id for each of their nodes. this isn't always trivial in cloud environments (perhaps we could use k8s pod name - e.g. ksql-0?) when auto-scaling is enabled
  • this approach makes nodes no longer interchangeable. if a customer scales down their cluster, a node.id would permanently disappear (leaving us open to having orphaned topics for that orphaned node.id)
  • this opens the door to other features leveraging node.id, which is a double-edged sword (I've been arguing that we should have leader election and only have the leader write to the command topic, for example) because asymmetrical systems tend to be more difficult to operate, deploy and debug

I don't have any other alternatives off the top of my head, I'll give it some thought, but I think this merits further discussion. If we do decide to go down this path, might make sense to document this in a quick KLIP (I promise I'd review it promptly!). (EDIT: or at least get multiple eyes on this)

@agavra requested a review from rodesai on December 3, 2020 01:01
@AlanConfluent (Member, Author) commented Dec 3, 2020

  • users have to carefully set the node.id for each of their nodes. this isn't always trivial in cloud environments (perhaps we could use k8s pod name - e.g. ksql-0?) when auto-scaling is enabled

That's pretty much what I had in mind. You effectively are delegating sticky id generation (and enforcing id uniqueness) to whatever configures ksql -- in a cloud environment, it could be Kubernetes. In a more static environment, it would just be a one-time generated id.

  • this approach makes nodes no longer interchangeable. if a customer scales down their cluster, a node.id would permanently disappear (leaving us open to having orphaned topics for that orphaned node.id)

That's true. Given that this was being used for GCing, imperfect cleanup was considered acceptable, but I agree this isn't ideal. I was trying to keep this somewhat simple since it's about cleanup rather than an essential part of functionality.

  • this opens the door to other features leveraging node.id, which is a double-edged sword (I've been arguing that we should have leader election and only have the leader write to the command topic, for example) because asymmetrical systems tend to be more difficult to operate, deploy and debug

That's fair. People might not fully understand the limitations of how I had intended for this to be used and could misuse it.

I don't have any other alternatives off the top of my head, I'll give it some thought, but I think this merits further discussion. If we do decide to go down this path, might make sense to document this in a quick KLIP (I promise I'd review it promptly!). (EDIT: or at least get multiple eyes on this)

I agree, I think that's fair. Most of the alternatives I thought of for issuing/registering ids are a fair bit more complex. If you want to recreate this kind of sticky id:

  • You could use a consensus mechanism to claim an id on startup, ensuring that an id is only ever issued to a single node.
  • Similarly, you could elect a coordinator and have it issue ids.
  • We could do something similar to the way the command topic currently works (if my understanding is correct): start a transaction and then do a read followed by a write (ensuring exclusive access) to the topic, allowing a test-and-set mechanism. You can then effectively find the next id and claim it by writing to the topic.

Maybe predictable/sticky ids aren't required: generating long random ids (including across restarts) is fine, and no consensus is required for that phase. Even so, you probably need a coordinator for the cleanup phase to ensure it's only done once.

Of course, in order to clean things up, you have to decide that the owner of some id is dead or has been restarted, and a similar protocol must then exist for a coordinator to mark that id as dead, wait a bit, and then GC the topics. We already do heartbeating for pull queries and could use that.

I actually thought of using the advertised listener instead of ksql.node.id since it's effectively the same idea (a unique address for each node) and already something we support. The only issue is that not every configuration people seem to set up allows for inter-node communication. If we assumed that were the case, then at least we could avoid introducing a new config, though we're still left with some of the same drawbacks you discuss.

@agavra (Contributor) commented Dec 3, 2020

I agree, I think that's fair. Most of the alternatives I thought of for issuing/registering ids are a fair bit more complex.

Ideally, we'd have an alternative that doesn't involve ID generation at all. If we are going down that path, I think what you have here is the best option for what we need. We can always make the ID generation more flexible (i.e. the suggestions you have) but we'll never be able to make it less flexible - so it makes sense to start with static ID generation.

What do you think of a stupid-simple approach of just committing a list of transient queries to disk? That way we can track them and on start-up we can just delete all the ones that we were handling. It still has the potential to leak if a node goes down and never comes up, but that was a problem in either approach.
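
A minimal sketch of the cleanup half of that idea, using hypothetical stand-in types rather than the PR's actual classes (ksqlDB's own topic client and cleanup service would be used in practice): on startup, read back the recorded application ids, list topics once, and delete any topic whose name is prefixed by one of them.

// Minimal sketch; TopicClient here is a stand-in interface, not ksqlDB's real API.
import java.util.Set;
import java.util.stream.Collectors;

public final class OrphanedTopicCleanup {

  interface TopicClient {
    Set<String> listTopicNames();
    void deleteTopics(Set<String> topicNames);
  }

  public static void cleanupOrphaned(
      final TopicClient topicClient,
      final Set<String> recordedQueryApplicationIds) {
    // One metadata call on startup, then purely local matching.
    final Set<String> topicNames = topicClient.listTopicNames();
    final Set<String> orphaned = topicNames.stream()
        .filter(topic -> recordedQueryApplicationIds.stream().anyMatch(topic::startsWith))
        .collect(Collectors.toSet());
    topicClient.deleteTopics(orphaned);
  }
}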

@rodesai (Contributor) commented Dec 3, 2020

I agree that going the route of adding a per-node ID should have a KLIP. Adding this would be useful for more than just this type of garbage collection, but at the same time it requires some care from an operator and has potential compatibility concerns. We should define:
- How the id can be used. I think it's important to think ahead about what having an ID could buy us, because it will help us better think through any constraints it needs to have, and assess whether it's worth the compatibility issues raised below. Currently the proposal is to use it for resource tracking and garbage collection. But we could also use the id for:
  - static group membership in streams
  - does it help our HA story to have predictable ids?
  - storing some metadata about each node, for example what its last executed command is, so we can perform first-time-only initialization tasks for a query
- What the requirements are when setting these IDs.
- Compatibility: what happens if the id is not specified? It would be simplest to just require it, but I think that's actually a pretty big requirement to add. And if we support not specifying the ID, we need two behaviours every time the id is used.

That said, if we do have to set an ID it's trivial to set a unique predictable ID in CCloud, so that's not a concern, at least for us.

I do like the idea of keeping a registry of running queries on the local disk as an alternative to the above. It's expected that ksqlDB should have access to stable storage to function optimally, so we're not adding a new requirement here.

@AlanConfluent (Member, Author)

I agree that going the route of adding a per-node ID should have a KLIP. Adding this would be useful for more than just this type of garbage collection, but at the same time requires some care from an operator and has potential compatibility concerns.

I agree that defining such a thing would require answering a lot of the questions you stated above. I think that's probably out of scope for this GC case, since GC is potentially much simpler.

I do like the idea of keeping a registry of running queries on the local disk as an alternative to the above. It's expected that ksqlDB should have access to stable storage to function optimally, so we're not adding a new requirement here.

I considered this and I think it's a reasonable approach. There's one drawback to only using local disk: if the same logical node gets restarted on a new machine, cleanup isn't possible since nothing will have been saved to that machine's disk. Not sure how often that happens in various cloud environments, but it's certainly possible. That said, if we're aiming to take care of the common case and this covers it, then that's fine by me.

@agavra (Contributor) commented Dec 3, 2020

That said, if we're aiming to take care of the common case and this covers it, then that's fine by me.

+1 I think that's what we should be aiming for - like you said earlier, it's mostly to GC things, so if we miss it a few times in exceptional cases that's alright; the user can always clean up these topics on their own (and there were exceptional cases in both proposals). It also makes life easier by avoiding thinking about the hard(er) problems 😅

@AlanConfluent (Member, Author)

I've attempted to address your comments @rodesai @agavra and now write the query ids to disk. Please take another look.

@agavra (Contributor) left a comment

overall lgtm, some inline comments

}
// Find any transient query topics
final Set<String> orphanedQueryApplicationIds = topicNames.stream()
.map(topicName -> queryApplicationIds.stream().filter(topicName::startsWith).findFirst())
Contributor

should we be using startsWith? I thought that in cloud we prefix all the topics with things other than just the transient application id. contains might be a safer bet either way

Member Author

As far as I can tell, nothing is very different in the cloud beyond setting some configs differently when building the query application id. It's Streams that creates these topics, and it just appends to the query application id.

I verified this manually with the transient topic names, so I'm pretty confident startsWith should work.

Contributor

ack

final KafkaTopicClient topicClient = serviceContext.getTopicClient();
final Set<String> topicNames;
try {
topicNames = topicClient.listTopicNames();
Contributor

if I recall, this is actually a somewhat expensive metadata operation (though we may want to confirm with Kafka experts) if we'll be calling it regularly. Should we flip the semantics (i.e. check whether a topic corresponding to an orphaned query exists) instead of the other way around?

Member Author

In theory, it should process just one set of commands at a time during startup (it shouldn't be possible for there to be more than one unprocessed file), and when it does that, it calls this just once. So it makes this list call once on node startup and uses that list of topics to compare against all of the query application ids in the commands file. It would seem more expensive to check, for every queryApplicationId, whether a given topic exists. Not only is that more RPCs, it also might check plenty of completed transient queries that cleaned up with no issue. I considered adding a "done cleaning" entry to the commands so I could find the set of only the orphaned queries, but that seems potentially error prone as well and requires writing another entry. This seemed a little simpler.

Tell me if you'd prefer the "done cleaning" approach.

Contributor

Yeah I agree the existing PR approach is better. At first, I didn't realize it happens just once on startup but since that's the case this is easier and less error-prone.

return;
}
// Find any transient query topics
final Set<String> orphanedQueryApplicationIds = topicNames.stream()
Contributor

Maybe I'm missing something, but shouldn't this filter out any that are currently running? Otherwise won't we delete topics for queries that are running?

EDIT: I noticed that what's passed in is only ever the orphaned queryIDs. Can we javadoc this method to explain that?

Member Author

Yeah, I think we're on the same page now. This is only ever run on startup and so by definition, the query application ids, if found as topic prefixes, would be orphaned. Of course, that list is all transient queries run, not technically just the orphaned ones.

I'll improve the documentation.


@Test
public void shouldCleanup_allApplicationIds() {
when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2, TOPIC3));
Contributor

nit: can we follow given/when/then convention? (it's basically already there but without the comments)

Member Author

Done

* A command which is executed locally and possibly requiring cleanup next time the server is
* restarted.
*/
public class LocalCommand {
Contributor

do you think it makes sense to make this an interface with JsonSubTypes so that we can handle a union of lots of different types of LocalCommands (see ExecutionStep for example) or is that overkill?

Member Author

Yeah, that's probably a little nicer. I'll introduce that. I was debating how general to make this since I don't know how many things will take advantage of it.
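
For reference, a minimal sketch of the suggested JsonSubTypes approach, using Jackson's @JsonTypeInfo/@JsonSubTypes; the class and property names here are illustrative, not the PR's:

// Illustrative only: a polymorphic LocalCommand union handled via Jackson subtypes.
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@type")
@JsonSubTypes({
    @JsonSubTypes.Type(value = TransientQueryLocalCommand.class, name = "transientQuery")
})
interface LocalCommand {
}

// One concrete command type; additional types can be added to the union above.
final class TransientQueryLocalCommand implements LocalCommand {

  private final String queryApplicationId;

  @JsonCreator
  TransientQueryLocalCommand(
      @JsonProperty("queryApplicationId") final String queryApplicationId) {
    this.queryApplicationId = queryApplicationId;
  }

  public String getQueryApplicationId() {
    return queryApplicationId;
  }
}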

return new LocalCommands(directory, ksqlEngine, LocalCommandsFile.createWriteable(file));
}

private void markFileAsProcessed(final File file) {
Contributor

why not just delete the file? Do we ever use these?

throw new IOException("Write permission denied.");
}

final byte[] bytes = MAPPER.writeValueAsBytes(localCommand);
Contributor

just making sure, this writes it in UTF-8 so it's human-readable JSON, right? I think it would be nice to make these files human readable

.map(Optional::get)
.collect(Collectors.toSet());
for (final String queryApplicationId : orphanedQueryApplicationIds) {
cleanupService.addCleanupTask(
Contributor

since the file is only relevant on a new startup, does it need to be added to the cleanup service or can we just have it do this cleanup on startup?

Member Author

We chatted about this offline. This takes advantage of the other cleanup routines done by QueryCleanupService, so it needs access to it. It does this cleanup once on startup.

Co-authored-by: Almog Gavra <almog@confluent.io>
@agavra (Contributor) left a comment

hitting the big green button
