Streams & Tables drop should terminate their persistent queries #6

Open
louich opened this issue Jun 12, 2019 · 1 comment
louich commented Jun 12, 2019

Goals

  • Fix the error raised on DROP for STREAMS & TABLES instantiated with a CREATE AS SELECT statement: their dependent persistent queries need to be terminated first.
  • Return useful information about the errors that still occur.

Problem observed

  • Every resource instantiated from CREATE AS SELECT fails on DROP because its persistent queries need to be terminated before the drop.
  • So far there is no method defined to TERMINATE the QUERIES.

Expected behavior

Running a DROP on a STREAM or a TABLE SHOULD terminate the underlying persistent query, if present, before the drop, BUT SHOULD NOT manage cascading drops, as this is a resource dependency issue related to Confluent KSQL.

Prerequisites

  • Implemented DESCRIBE query for STREAMS & TABLES information listing.
  • Implemented TERMINATE query for QUERIES termination.

Solution

  1. Run DESCRIBE on the resource

    To check whether there are persistent queries related to a STREAM or a TABLE, the KSQL client should check the resource status with the DESCRIBE query before running a DROP operation.

    As stated in the doc

    Query:

    {
      "ksql": "DESCRIBE [RESOURCE_NAME];",
      "streamsProperties": {}
    }
    

    Result:

    [
        {
            "@type": "sourceDescription",
            "statementText": "DESCRIBE [RESOURCE_NAME];",
            "sourceDescription": {
                "name": "[RESOURCE_NAME]",
                "readQueries": [
                  /* => resource dependencies */
                  /* Other queries listening for data */
                ],
                "writeQueries": [
                    {
                        "sinks": [
                            "[RESOURCE_NAME]"
                        ],
                        "id": "[QUERY_ID]",
                        "queryString": "CREATE ..."
                    }
                ],
                "fields": [
                    {
                        "name": "ROWTIME",
                        "schema": {
                            "type": "BIGINT",
                            "fields": null,
                            "memberSchema": null
                        }
                    },
                    {
                        "name": "ROWKEY",
                        "schema": {
                            "type": "STRING",
                            "fields": null,
                            "memberSchema": null
                        }
                    },
                    {
                        "name": "[CUSTOM_NAME]",
                        "schema": {
                            "type": "[CUSTOM_TYPES]",
                            "fields": "[CUSTOM_FIELDS]",
                            "memberSchema": "[CUSTOM_MEMBER_SCHEMA]"
                        }
                    },
                    ...
                ],
                "type": "[STREAM|TABLE]",
                "key": "[value from the `WITH KEY` option]",
                "timestamp": "",
                "statistics": "",
                "errorStats": "",
                "extended": false,
                "format": "[AVRO|JSON|DELIMITED]",
                "topic": "[THE_TOPIC_RELATED_TO_THE_OUTPUT_DATA]",
                "partitions": 0,
                "replication": 0
            }
        }
    ]

    From a code perspective, this should look like this:

    sourceDescription, err := c.Describe(RESOURCE_NAME)
    if err != nil {
      return err
    }
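For illustration, the DESCRIBE payload shown above could be decoded along these lines. This is a minimal sketch, not the client's actual code: the type names (`Query`, `SourceDescription`, `describeResponse`) and the `parseDescribe` helper are hypothetical, and only the JSON field names are taken from the result above.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Query mirrors one entry of "readQueries" or "writeQueries" (hypothetical type).
type Query struct {
	Sinks       []string `json:"sinks"`
	ID          string   `json:"id"`
	QueryString string   `json:"queryString"`
}

// SourceDescription keeps only the fields the DROP logic needs (hypothetical type).
type SourceDescription struct {
	Name         string  `json:"name"`
	Type         string  `json:"type"`
	ReadQueries  []Query `json:"readQueries"`
	WriteQueries []Query `json:"writeQueries"`
}

// describeResponse is one element of the top-level JSON array.
type describeResponse struct {
	Type              string            `json:"@type"`
	SourceDescription SourceDescription `json:"sourceDescription"`
}

// parseDescribe returns the first "sourceDescription" entry found in body.
func parseDescribe(body []byte) (*SourceDescription, error) {
	var entries []describeResponse
	if err := json.Unmarshal(body, &entries); err != nil {
		return nil, fmt.Errorf("decoding DESCRIBE response: %w", err)
	}
	for i := range entries {
		if entries[i].Type == "sourceDescription" {
			return &entries[i].SourceDescription, nil
		}
	}
	return nil, errors.New("no sourceDescription entry in DESCRIBE response")
}
```

Fields that the drop logic does not read (fields, format, topic, and so on) are simply ignored by the decoder, so the struct can stay small.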
  2. Check whether there are any read queries

    Read queries should be treated as downstream dependencies, so dropping the resource would imply a CASCADE DROP. As this behavior is not intended and is an issue related to KSQL development, the DROP should fail under this condition and report that other resources depend on the current resource.

    Concretely, given the payload received from the operation above, the code should look like this:

    if len(sourceDescription.ReadQueries) > 0 {
      // Report the first sink of the first read query as the blocking dependency.
      dependency := sourceDescription.ReadQueries[0].Sinks[0]
      return fmt.Errorf("could not drop '%s', '%s' needs to be dropped before", RESOURCE_NAME, dependency)
    }
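A possible variation on the read-query check is to report every downstream sink instead of only the first one, and to avoid indexing into an empty sinks array. The sketch below assumes a hypothetical `Query` type and hypothetical helpers `downstreamSinks` and `checkNoReaders`; none of these names come from the existing client.

```go
package main

import (
	"fmt"
	"strings"
)

// Query holds the parts of a read query used here (hypothetical type).
type Query struct {
	ID    string
	Sinks []string
}

// downstreamSinks lists each sink of the read queries once, in first-seen order.
func downstreamSinks(readQueries []Query) []string {
	seen := make(map[string]bool)
	var sinks []string
	for _, q := range readQueries {
		for _, s := range q.Sinks {
			if !seen[s] {
				seen[s] = true
				sinks = append(sinks, s)
			}
		}
	}
	return sinks
}

// checkNoReaders fails when any read query still consumes the resource.
func checkNoReaders(resource string, readQueries []Query) error {
	if len(readQueries) == 0 {
		return nil
	}
	deps := downstreamSinks(readQueries)
	if len(deps) == 0 {
		// Read queries without sinks: report how many there are instead.
		return fmt.Errorf("could not drop '%s': %d read queries still depend on it", resource, len(readQueries))
	}
	return fmt.Errorf("could not drop '%s': drop %s first", resource, strings.Join(deps, ", "))
}
```

Listing all dependencies at once saves the user from discovering them one failed DROP at a time.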
  3. Try to terminate the write queries related to the current resource.

    To determine whether a persistent query is running for a given resource, we would expect the query sinks to be an array containing exclusively the resource name. If this is not the case, the function should return an error stating that some underlying queries need to be looked at before the resource can be dropped.

    On the other hand, if the query is only related to the resource to be dropped, then the query should be terminated.

    Because the writeQueries object is an array, the routine must be applied to all of its entries.

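    for _, q := range sourceDescription.WriteQueries {
      expectedSinks := []string{RESOURCE_NAME}
      // Slices cannot be compared with != in Go, so check length and element explicitly.
      if len(q.Sinks) != 1 || q.Sinks[0] != RESOURCE_NAME {
        return fmt.Errorf("could not drop '%s', the query '%s' should sink only to '%v' but '%v' was found instead", RESOURCE_NAME, q.ID, expectedSinks, q.Sinks)
      }
      // The query writes only to this resource, so it is safe to terminate it.
      err := c.Terminate(q.ID)
      if err != nil {
        return err
      }
    }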
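Go only allows comparing a slice against nil, so the sink check needs an explicit comparison. One way to express it is a small predicate, sketched here under the assumption that a hypothetical helper named `writesOnlyTo` is acceptable in the client:

```go
package main

// writesOnlyTo reports whether sinks holds exactly one entry equal to resource.
// Hypothetical helper: the name is not part of the existing client.
func writesOnlyTo(sinks []string, resource string) bool {
	return len(sinks) == 1 && sinks[0] == resource
}
```

The comparison is exact and case-sensitive; whether names should be normalized before comparing is left open here.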
  4. Drop the resource

    After the preceding steps, it should now be safe to drop the resource. Errors occurring at this point should only be standard errors caused by the network connection or an invalid flow of operations.

    Looking into cm-update-confluent, the final return of the function should remain:

      return c.qTOerr(req)
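Put together, the four steps could be sketched as follows. Everything here is an assumption made for illustration: the `ksqlClient` interface, its `Drop` method (the issue itself ends with `c.qTOerr(req)`), the `dropWithTerminate` function, and the in-memory `fakeClient` used to exercise it. One deliberate difference from the loop above is that all write queries are validated before any of them is terminated, so a failed check does not leave some queries already stopped.

```go
package main

import "fmt"

// Query and SourceDescription are trimmed-down, hypothetical types.
type Query struct {
	ID    string
	Sinks []string
}

type SourceDescription struct {
	ReadQueries  []Query
	WriteQueries []Query
}

// ksqlClient is a hypothetical interface covering the three operations used.
type ksqlClient interface {
	Describe(name string) (*SourceDescription, error)
	Terminate(queryID string) error
	Drop(name string) error
}

// dropWithTerminate describes the resource, refuses to drop it while read
// queries exist, terminates its own write queries, then drops it.
func dropWithTerminate(c ksqlClient, name string) error {
	sd, err := c.Describe(name)
	if err != nil {
		return err
	}
	if n := len(sd.ReadQueries); n > 0 {
		return fmt.Errorf("could not drop '%s': %d read queries still depend on it", name, n)
	}
	// Validate every write query first so nothing is terminated on failure.
	for _, q := range sd.WriteQueries {
		if len(q.Sinks) != 1 || q.Sinks[0] != name {
			return fmt.Errorf("could not drop '%s': query '%s' sinks to %v", name, q.ID, q.Sinks)
		}
	}
	for _, q := range sd.WriteQueries {
		if err := c.Terminate(q.ID); err != nil {
			return err
		}
	}
	return c.Drop(name)
}

// fakeClient records the statements it would have sent (illustration only).
type fakeClient struct {
	desc  *SourceDescription
	calls []string
}

func (f *fakeClient) Describe(name string) (*SourceDescription, error) {
	f.calls = append(f.calls, "DESCRIBE "+name)
	return f.desc, nil
}

func (f *fakeClient) Terminate(queryID string) error {
	f.calls = append(f.calls, "TERMINATE "+queryID)
	return nil
}

func (f *fakeClient) Drop(name string) error {
	f.calls = append(f.calls, "DROP "+name)
	return nil
}
```

With a fake whose description holds a single write query sinking to the resource, `dropWithTerminate` records a DESCRIBE, a TERMINATE and a DROP, in that order.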

louich commented Jun 12, 2019

@Mongey this is what I meant when I had trouble with terraform-provider-ksql, and I hope that merging a patch here will resolve the plugin issues.
