adds support for streaming multi tables #40

Merged
merged 4 commits into main on Mar 15, 2024

Conversation

@brunocalza (Contributor) commented Mar 6, 2024

Summary

This PR adds support for streaming changes from multiple Postgres tables at the same time.

Context

Previously, when you created a vault and started streaming database changes to it, only the changes from one table could be streamed. That was because the table was derived from the vault's name: for example, if the vault was called demo.test, the CLI assumed that test was the table. As a result, streaming multiple tables required running multiple CLI processes, which is not ideal.

This PR changes that behavior: multiple tables can now be selected and their changes streamed to a particular vault, all in a single vaults stream call.

Changes

  • There is no longer any relation between the vault's name and the table. Creating a vault simply creates the vault in the provider and the smart contract. That means the vaults create command no longer accepts the dburi flag, because there's no need to connect to the database at that point.
  • To specify which tables you want to capture changes from, you pass the dburi and tables flags to the vaults stream command. The tables value is a comma-separated list of tables (e.g. t1,t2). You can also provide the window-size flag.
  • Each table can have its own schema; one Parquet file is generated per table and uploaded to the Basin Provider.

Usage

Creating a vault

vaults create --account [ETH_ADDRESS] demo.test

Streaming changes

Suppose you want to stream tables t1 and t2 to the vault demo.test; this is the command:

vaults stream --dburi [DB_URI] --tables t1,t2 --private-key [PRIVATE_KEY] demo.test

Note: There's one caveat with the current implementation. Once a stream command has been called for a particular vault with a particular dburi and tables, there's no way to change that. For example, suppose you now want to add another table: that wouldn't work. I wonder what the ideal flow would be here.

@brunocalza self-assigned this on Mar 7, 2024
@brunocalza marked this pull request as ready for review on Mar 13, 2024

return exists, nil
}
// DatabaseStreamSetup sets up the database for streaming.
type DatabaseStreamSetup struct {
@brunocalza (author) commented:

created this object to make it easier to set up the database for replication. it can:

  • create the publication in Postgres
  • fetch the schemas of the tables in Postgres so that we can create similar tables in DuckDB
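
A rough sketch of what those two steps could look like (not the PR's actual code; the function names, the Column/TableSchema types, and the use of database/sql are illustrative assumptions):

// Illustrative sketch only: CreatePublication and FetchTableSchema are
// hypothetical names, not this repo's actual API.
package streamsetup

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

type Column struct {
	Name string
	Typ  string
}

type TableSchema struct {
	Table   string
	Columns []Column
}

// CreatePublication creates a logical-replication publication covering all
// streamed tables, e.g. CREATE PUBLICATION basin FOR TABLE t1, t2.
func CreatePublication(ctx context.Context, db *sql.DB, name string, tables []string) error {
	stmt := fmt.Sprintf("CREATE PUBLICATION %s FOR TABLE %s", name, strings.Join(tables, ", "))
	_, err := db.ExecContext(ctx, stmt)
	return err
}

// FetchTableSchema reads column names and types from information_schema so a
// similar table can later be created in DuckDB.
func FetchTableSchema(ctx context.Context, db *sql.DB, table string) (TableSchema, error) {
	rows, err := db.QueryContext(ctx,
		"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1 ORDER BY ordinal_position",
		table)
	if err != nil {
		return TableSchema{}, err
	}
	defer rows.Close()

	ts := TableSchema{Table: table}
	for rows.Next() {
		var c Column
		if err := rows.Scan(&c.Name, &c.Typ); err != nil {
			return TableSchema{}, err
		}
		ts.Columns = append(ts.Columns, c)
	}
	return ts, rows.Err()
}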

@@ -32,8 +32,8 @@ import (
var vaultNameRx = regexp.MustCompile(`^([a-zA-Z_][a-zA-Z0-9_]*)[.]([a-zA-Z_][a-zA-Z0-9_]*$)`)

@brunocalza (author) commented:

lots of changes in this file, but in essence:

  • moved the flags from create to stream
  • we're now passing multiple tables and schemas to the database manager
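
In rough terms, the reshaped stream command could look like the sketch below (assuming urfave/cli, which the command code here appears to use; only the flag names come from this PR's description, the rest is illustrative):

// Sketch only: flag names are from the PR description, everything else is an
// assumption.
func newStreamCommand() *cli.Command {
	var privateKey, dburi, tables string
	var winSize int64

	return &cli.Command{
		Name: "stream",
		Flags: []cli.Flag{
			&cli.StringFlag{Name: "private-key", Destination: &privateKey, Required: true},
			&cli.StringFlag{Name: "dburi", Destination: &dburi, Required: true},
			&cli.StringFlag{Name: "tables", Destination: &tables, Required: true},
			&cli.Int64Flag{Name: "window-size", Destination: &winSize},
		},
		Action: func(cCtx *cli.Context) error {
			// --tables is a comma-separated list, e.g. t1,t2
			tableNames := strings.Split(tables, ",")
			_ = tableNames // fetch one schema per table and build the DBManager here
			return nil
		},
	}
}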

// NewDBManager creates a new DBManager.
func NewDBManager(
- dbDir, table string, cols []Column, windowInterval time.Duration, uploader *VaultsUploader,
+ dbDir string, schemas []TableSchema, windowInterval time.Duration, uploader *VaultsUploader,
@brunocalza (author) commented:

passing multiple tables and schemas now
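
Put together, the reshaped constructor looks roughly like this (a sketch based on the diff above; the exact field names are assumptions):

// DBManager now holds one schema per streamed table instead of a single
// table/cols pair (field names are illustrative).
type DBManager struct {
	dbDir          string
	schemas        []TableSchema
	windowInterval time.Duration
	uploader       *VaultsUploader
}

// NewDBManager creates a new DBManager.
func NewDBManager(
	dbDir string, schemas []TableSchema, windowInterval time.Duration, uploader *VaultsUploader,
) *DBManager {
	return &DBManager{
		dbDir:          dbDir,
		schemas:        schemas,
		windowInterval: windowInterval,
		uploader:       uploader,
	}
}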

return true, nil
}
exportedFiles := []string{}
for _, schema := range dbm.schemas {
@brunocalza (author) commented:

for each schema, we check whether the table has rows; if it doesn't, we don't export it
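
A sketch of that per-schema export (assuming a DuckDB connection via database/sql; the explicit db parameter, the row-count check, and the file naming are illustrative, not the PR's exact code):

// Export writes one parquet file per non-empty table and returns the files
// that were actually produced (sketch only).
func (dbm *DBManager) Export(ctx context.Context, db *sql.DB, exportAt string) ([]string, error) {
	exportedFiles := []string{}
	for _, schema := range dbm.schemas {
		// skip tables that received no rows in this window
		var n int64
		if err := db.QueryRowContext(ctx, fmt.Sprintf("SELECT count(*) FROM %s", schema.Table)).Scan(&n); err != nil {
			return nil, err
		}
		if n == 0 {
			continue
		}

		// one parquet file per table
		file := fmt.Sprintf("%s-%s.parquet", exportAt, schema.Table)
		copyStmt := fmt.Sprintf("COPY %s TO '%s' (FORMAT PARQUET)", schema.Table, file)
		if _, err := db.ExecContext(ctx, copyStmt); err != nil {
			return nil, err
		}
		exportedFiles = append(exportedFiles, file)
	}
	return exportedFiles, nil
}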

@@ -214,13 +225,13 @@ func (dbm *DBManager) UploadAll(ctx context.Context) error {
if re.MatchString(fname) {
dbPath := path.Join(dbm.dbDir, fname)
exportAt := dbPath + ".parquet"
- isEmpty, err := dbm.Export(ctx, exportAt)
+ files, err := dbm.Export(ctx, exportAt)
@brunocalza (author) commented:

only the files that were actually exported are returned
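
On the calling side, that means only the returned files get uploaded, roughly as below (uploader.Upload is a stand-in name, not necessarily the real method):

// Sketch: upload only the parquet files Export actually produced.
files, err := dbm.Export(ctx, exportAt)
if err != nil {
	return err
}
for _, f := range files {
	if err := dbm.uploader.Upload(ctx, f); err != nil {
		return err
	}
}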


recordVals := []string{}
// build an insert stmt for each record inside tx
stmts := []string{}
for _, r := range tx.Records {
@brunocalza (author) commented:

a tx can now contain records from different tables, so we have to account for that
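
A minimal sketch of what that implies (the Record shape and the value quoting are simplified; this is not the PR's exact code): each record carries its own table, so the insert statement is built against the record's table rather than a single fixed one.

// Sketch: a replication tx may mix records from several tables, so every
// insert targets the record's own table.
type Record struct {
	Table   string
	Columns []string
	Values  []string
}

func buildInsertStmts(records []Record) []string {
	stmts := []string{}
	for _, r := range records {
		stmts = append(stmts, fmt.Sprintf(
			"INSERT INTO %s (%s) VALUES (%s)",
			r.Table,
			strings.Join(r.Columns, ", "),
			strings.Join(r.Values, ", "),
		))
	}
	return stmts
}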

if column.IsPrimary {
pks = column.Name
stmts := []string{}
for _, schema := range dbm.schemas {
@brunocalza (author) commented:

we now have to create multiple tables, one for each schema
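
Roughly like this (a sketch; any Postgres-to-DuckDB type mapping is omitted, and the helper name is illustrative):

// Sketch: one CREATE TABLE per streamed table, mirrored in the local DuckDB.
func createTableStmts(schemas []TableSchema) []string {
	stmts := []string{}
	for _, schema := range schemas {
		cols := []string{}
		for _, c := range schema.Columns {
			cols = append(cols, fmt.Sprintf("%s %s", c.Name, c.Typ))
		}
		stmts = append(stmts, fmt.Sprintf(
			"CREATE TABLE IF NOT EXISTS %s (%s)",
			schema.Table, strings.Join(cols, ", "),
		))
	}
	return stmts
}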

@dtbuchholz left a comment:

lgtm

) *DBManager {
return &DBManager{
dbDir: dbDir,
- table: table,
- cols: cols,
+ schemas: schemas,
A contributor commented:

👍

@@ -156,7 +133,8 @@ func newVaultCreateCommand() *cli.Command {
}

func newStreamCommand() *cli.Command {
- var privateKey string
+ var privateKey, dburi, tables string
+ var winSize int64
A contributor commented:

should winSize be configurable per table?

@brunocalza (author) replied:

do you think there's a good reason for it? we could, but we would need an export goroutine per table, which sounds more complicated.

A contributor replied:

yes, def sounds complicated for now. maybe @dtbuchholz can tell if people have asked for it or are likely to ask for it?

A contributor replied:

agreed—i think a single window size makes sense as a first feature. when i stream multiple tables, I'm essentially just trying to replicate the full db, so one window size is sufficient.

multiple windows could be useful, i suppose, but that's something we can wait on to hear directly from a dev. personally, i don't think i'd need it unless i was trying to really optimize for something.

@avichalp left a comment:

Looks good!

@brunocalza merged commit ad95dcb into main on Mar 15, 2024
2 checks passed