When our MySQL connector is reading the binlog of a MySQL server or cluster, it parses the DDL statements in the log and builds an in-memory model of each table’s schema as it evolves over time. This process is important because the connector generates events for each table using the definition of the table at the time of each event. We can’t use the database’s current schema, since it may have changed since the point in time (or position in the log) where the connector is reading.

Parsing the DDL of MySQL or any other major relational database can seem like a daunting task. Each DBMS usually has a highly customized SQL grammar, and although the data manipulation language (DML) statements are often fairly close to the standards, the data definition language (DDL) statements are usually less so and involve more DBMS-specific features.

So given this, why did we write our own DDL parser for MySQL? Let’s first look at what Debezium needs a DDL parser to do.

Parsing DDL in the Debezium MySQL connector

The MySQL binlog contains various kinds of events. For example, when a row is inserted into a table, the binlog event contains an indirect reference to the table and the values for each column in the table, but there is no information about the columns that make up the table. The only things in the binlog that reference table structures are the SQL DDL statements that MySQL generated when it processed user-supplied DDL statements.

The connector also produces messages using Kafka Connect Schemas, which are simple data structures that define the various names and types of each field, and the way the fields are organized. So, when we generate an event message for the table insert, we first have to have a Kafka Connect Schema object with all the appropriate fields, and then we have to convert the ordered array of column values into a Kafka Connect Struct object using the fields and the individual column values in the table insert event.

Luckily, when we come across a DDL statement we can update our in-memory model and then use this to generate a Schema object. At the same time, we can create a component that will use this Schema object to create a Struct object from the ordered array of column values that appear in the events. All of this can be done once and used for all row events on that table, until we come across another DDL statement that changes the table’s schema, at which point we update our model again.
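As a rough illustration of that "build once, reuse for every row" idea, here is a minimal sketch that uses plain Java collections in place of Kafka Connect's Schema and Struct. The RowConverter class and its method names are hypothetical and are not part of Debezium.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: built once per table definition and then reused
// for every row event until a DDL statement changes the table.
final class RowConverter {
    private final List<String> columnNames;

    RowConverter(List<String> columnNames) {
        this.columnNames = new ArrayList<>(columnNames);
    }

    // Pairs each positional value from a row event with its column name.
    Map<String, Object> convert(Object[] row) {
        if (row.length != columnNames.size()) {
            throw new IllegalArgumentException(
                    "Expected " + columnNames.size() + " values but got " + row.length);
        }
        Map<String, Object> record = new LinkedHashMap<>();
        for (int i = 0; i < row.length; i++) {
            record.put(columnNames.get(i), row[i]);
        }
        return record;
    }
}
```

When a later DDL statement alters the table, a new converter would be built from the updated column list and the old one discarded.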

So all of this requires parsing all of the DDL statements, though for our purposes we only have to understand a small subset of the DDL grammar. We then have to use that subset of statements to update our in-memory model of our tables. And since our in-memory table model is not specific to MySQL, the rest of the functionality to generate Schema objects and components that convert an array of values into Struct objects used in messages is all generic.

Existing DDL libraries

Unfortunately, there aren’t really that many 3rd party open source libraries for parsing DDL statements for MySQL, PostgreSQL, or other popular RDBMSes. JSqlParser is often cited, but it has a single grammar that is a combination of multiple DBMS grammars and therefore is not a strict parser for any specific DBMS. Adding support for other DBMSes by updating the composite grammar would likely be difficult.

Other libraries, such as PrestoDB, define their own SQL grammar and are unable to handle the intricacies and nuances of the MySQL DDL grammar. The Antlr parser generator project has a grammar for MySQL 5.6, but this is limited to a small subset of DML and has no support for DDL or newer 5.7 features. There are older SQL-related grammars for Antlr 3, but these are often massive, suffer from bugs, and are limited to specific DBMSes. The Teiid project is a data virtualization engine that sits atop a wide variety of DBMSes and data sources, and its tooling has a series of DDL parsers that construct ASTs in a special repository (the author actually helped develop these). There are also Ruby libraries, like Square’s MySQL Parser library. There is also a proprietary commercial product.

Our DDL parser framework

Since we couldn’t find a useful 3rd party open source library, we chose to create our own DDL parser framework limited to our needs:

  • Parse DDL statements and update our in-memory model.

  • Focus on consuming those essential statements (e.g., create, alter, and drop tables and views), while completely ignoring other statements without having to parse them.

  • Structure the parser code similarly to the MySQL DDL grammar documentation and use method names that mirror the rules in the grammar. This will make it easier to maintain over time.

  • Allow creation of parsers for PostgreSQL, Oracle, SQLServer, and other DBMSes as needed.

  • Support customization through subclassing: be able to easily override narrow portions of the logic without having to copy lots of code.

  • Make it easy to develop, debug, and test parsers.

The resulting framework includes a tokenizer that converts one or more DDL statements in a string into a rewindable sequence of tokens, where each token represents punctuation, quoted strings, case-insensitive words and symbols, numbers, keywords, comments, and terminating characters (such as ; for MySQL). The DDL parser then walks the token stream looking for patterns using a simple and easy-to-read fluent API, calling methods on itself to process the various sets of tokens. The parser also uses an internal data type parser for processing SQL data type expressions, such as INT, VARCHAR(64), NUMERIC(32,3), TIMESTAMP(8) WITH TIME ZONE.
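To give a feel for what processing a data type expression involves, here is a small sketch that splits such an expression into a name, an optional length and scale, and any trailing words. It uses a regular expression rather than the token-based approach the framework takes, and the SimpleDataType class is hypothetical.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: a regex-based stand-in for a data type parser.
final class SimpleDataType {
    // name, optional "(length[,scale])", then whatever follows (e.g. WITH TIME ZONE)
    private static final Pattern TYPE = Pattern.compile(
            "\\s*([A-Za-z]+)\\s*(?:\\(\\s*(\\d+)\\s*(?:,\\s*(\\d+)\\s*)?\\))?\\s*(.*)");

    final String name;
    final int length;    // -1 when the expression has no length
    final int scale;     // -1 when the expression has no scale
    final String suffix; // trailing words, or "" when there are none

    private SimpleDataType(String name, int length, int scale, String suffix) {
        this.name = name;
        this.length = length;
        this.scale = scale;
        this.suffix = suffix;
    }

    static SimpleDataType parse(String expression) {
        Matcher m = TYPE.matcher(expression);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a data type expression: " + expression);
        }
        int length = m.group(2) == null ? -1 : Integer.parseInt(m.group(2));
        int scale = m.group(3) == null ? -1 : Integer.parseInt(m.group(3));
        return new SimpleDataType(m.group(1).toUpperCase(Locale.ROOT), length, scale,
                m.group(4).trim());
    }
}
```

For example, SimpleDataType.parse("NUMERIC(32,3)") yields the name NUMERIC with length 32 and scale 3. The sketch is deliberately lenient and does not validate type names or catch malformed parentheses.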

The MySqlDdlParser class extends a base class and provides all of the MySQL-specific parsing logic. For example, the DDL statements:

# Create and populate our products using a single insert with many rows
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight FLOAT
);
ALTER TABLE products AUTO_INCREMENT = 101;

# Create and populate the products on hand using multiple inserts
CREATE TABLE products_on_hand (
  product_id INTEGER NOT NULL PRIMARY KEY,
  quantity INTEGER NOT NULL,
  FOREIGN KEY (product_id) REFERENCES products(id)
);

can be easily parsed with:

String ddlStatements = ...
DdlParser parser = new MySqlDdlParser();
Tables tables = new Tables();
parser.parse(ddlStatements, tables);

Here, the Tables object is our in-memory representation of our named table definitions. The parser processes the DDL statements, applying each to the appropriate table definition inside the Tables object.

How it works

Each DdlParser implementation has the following public method that will parse the statements in the supplied String:

    public final void parse(String ddlContent, Tables databaseTables) {
        Tokenizer tokenizer = new DdlTokenizer(!skipComments(), this::determineTokenType);
        TokenStream stream = new TokenStream(ddlContent, tokenizer, false);
        stream.start();
        parse(stream, databaseTables);
    }

Here, the method creates a new TokenStream from the content using a DdlTokenizer that knows how to separate the characters in the string into the various typed token objects. It then calls another parse method that does the bulk of the work:

    public final void parse(TokenStream ddlContent, Tables databaseTables)
                           throws ParsingException, IllegalStateException {
        this.tokens = ddlContent;
        this.databaseTables = databaseTables;
        Marker marker = ddlContent.mark();
        try {
            while (ddlContent.hasNext()) {
                parseNextStatement(ddlContent.mark());
                // Consume the statement terminator if it is still there ...
                tokens.canConsume(DdlTokenizer.STATEMENT_TERMINATOR);
            }
        } catch (ParsingException e) {
            ddlContent.rewind(marker);
            throw e;
        } catch (Throwable t) {
            parsingFailed(ddlContent.nextPosition(),
                          "Unexpected exception (" + t.getMessage() + ") parsing", t);
        }
    }

This sets up some parser state, marks the current starting point, and tries to parse DDL statements until no more can be found. If the parsing logic fails to find a match, it generates a ParsingException with the offending line and column plus a message signaling what was found and what was expected. In such cases, this method rewinds the token stream to the starting marker (in case the caller wishes to try a different parser).
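The mark-and-rewind behavior can be sketched with a much simpler token stream than the one in the framework. The MiniTokenStream class below is hypothetical: it only recognizes words and single punctuation characters, ignores quoting and comments, and uses a plain integer as its marker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a rewindable token stream.
final class MiniTokenStream {
    private final List<String> tokens = new ArrayList<>();
    private int position = 0;

    MiniTokenStream(String ddl) {
        int i = 0;
        while (i < ddl.length()) {
            char c = ddl.charAt(i);
            if (Character.isWhitespace(c)) {
                i++;
            } else if (isWordChar(c)) {
                int start = i;
                while (i < ddl.length() && isWordChar(ddl.charAt(i))) {
                    i++;
                }
                tokens.add(ddl.substring(start, i));
            } else {
                tokens.add(String.valueOf(c)); // punctuation: one token per character
                i++;
            }
        }
    }

    private static boolean isWordChar(char c) {
        return Character.isLetterOrDigit(c) || c == '_';
    }

    boolean hasNext() { return position < tokens.size(); }

    // A marker is simply the current index into the token list.
    int mark() { return position; }

    void rewind(int marker) { position = marker; }

    // True if the next tokens equal the expected words, ignoring case.
    boolean matches(String... expected) {
        if (position + expected.length > tokens.size()) return false;
        for (int i = 0; i < expected.length; i++) {
            if (!tokens.get(position + i).equalsIgnoreCase(expected[i])) return false;
        }
        return true;
    }

    // Like matches, but also advances past the tokens when they match.
    boolean canConsume(String... expected) {
        if (!matches(expected)) return false;
        position += expected.length;
        return true;
    }

    String consume() {
        if (!hasNext()) throw new IllegalStateException("No more tokens");
        return tokens.get(position++);
    }
}
```

A caller can take a marker before handing the stream to a parser and, if that parser fails partway through, call rewind with the marker so that another parser sees the statement from its first token.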

Each time the parseNextStatement method is called, it is passed a marker for the starting position of that statement. Our MySqlDdlParser subclass overrides the parseNextStatement method to use the first token in the statement to determine which of the statement kinds allowed in the MySQL DDL grammar it is dealing with:

    @Override
    protected void parseNextStatement(Marker marker) {
        if (tokens.matches(DdlTokenizer.COMMENT)) {
            parseComment(marker);
        } else if (tokens.matches("CREATE")) {
            parseCreate(marker);
        } else if (tokens.matches("ALTER")) {
            parseAlter(marker);
        } else if (tokens.matches("DROP")) {
            parseDrop(marker);
        } else if (tokens.matches("RENAME")) {
            parseRename(marker);
        } else {
            parseUnknownStatement(marker);
        }
    }

When a matching token is found, the method calls the appropriate method. For example, if the statement begins with CREATE TABLE …​, then the parseCreate method is called with the same marker that identifies the starting position of the statement:

    @Override
    protected void parseCreate(Marker marker) {
        tokens.consume("CREATE");
        if (tokens.matches("TABLE") || tokens.matches("TEMPORARY", "TABLE")) {
            parseCreateTable(marker);
        } else if (tokens.matches("VIEW")) {
            parseCreateView(marker);
        } else if (tokens.matchesAnyOf("DATABASE", "SCHEMA")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("EVENT")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("FUNCTION", "PROCEDURE")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("UNIQUE", "FULLTEXT", "SPATIAL", "INDEX")) {
            parseCreateIndex(marker);
        } else if (tokens.matchesAnyOf("SERVER")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("TABLESPACE")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("TRIGGER")) {
            parseCreateUnknown(marker);
        } else {
            // It could be several possible things (including more
            // elaborate forms of those matches tried above),
            sequentially(this::parseCreateView,
                         this::parseCreateUnknown);
        }
    }

Here, the method first consumes the token with the CREATE literal, and then tries to match the tokens with various patterns of token literals. If a match is found, this method delegates to other more specific parsing methods. Note how the fluent API of the framework makes it quite easy to understand the match patterns.
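The final branch relies on a sequentially helper whose body is not shown in this post. One plausible shape for such a helper, offered only as a sketch and not as Debezium's actual code, is to run each alternative from the same starting point and rewind after every failure. The Alternatives and Cursor names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of trying alternative parse functions in order.
final class Alternatives {

    // Minimal cursor over pre-split tokens; its position doubles as a marker.
    static final class Cursor {
        final String[] tokens;
        int position;

        Cursor(String... tokens) {
            this.tokens = tokens;
        }

        void consume(String expected) {
            if (position >= tokens.length || !tokens[position].equalsIgnoreCase(expected)) {
                throw new IllegalStateException(
                        "Expected " + expected + " at token " + position);
            }
            position++;
        }
    }

    // Returns the index of the first alternative that completes without
    // throwing. The cursor is rewound to the start after each failure.
    @SafeVarargs
    static int firstSuccessful(Cursor cursor, Consumer<Cursor>... alternatives) {
        int marker = cursor.position;
        List<RuntimeException> errors = new ArrayList<>();
        for (int i = 0; i < alternatives.length; i++) {
            try {
                alternatives[i].accept(cursor);
                return i;
            } catch (IllegalStateException e) {
                errors.add(e);
                cursor.position = marker;
            }
        }
        IllegalStateException failure = new IllegalStateException("No alternative matched");
        for (RuntimeException e : errors) {
            failure.addSuppressed(e);
        }
        throw failure;
    }
}
```

Collecting the individual failures means that, if every alternative is rejected, the final error can still report why each one did not match.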

Let’s go one step further. Assuming our DDL statement starts with CREATE TABLE products (, the parser will then invoke the parseCreateTable method, again with the same marker denoting the start of the statement:

    protected void parseCreateTable(Marker start) {
        tokens.canConsume("TEMPORARY");
        tokens.consume("TABLE");
        boolean onlyIfNotExists = tokens.canConsume("IF", "NOT", "EXISTS");
        TableId tableId = parseQualifiedTableName(start);
        if ( tokens.canConsume("LIKE")) {
            TableId originalId = parseQualifiedTableName(start);
            Table original = databaseTables.forTable(originalId);
            if ( original != null ) {
                databaseTables.overwriteTable(tableId, original.columns(),
                                              original.primaryKeyColumnNames());
            }
            consumeRemainingStatement(start);
            debugParsed(start);
            return;
        }
        if (onlyIfNotExists && databaseTables.forTable(tableId) != null) {
            // The table does exist, so we should do nothing ...
            consumeRemainingStatement(start);
            debugParsed(start);
            return;
        }
        TableEditor table = databaseTables.editOrCreateTable(tableId);

        // create_definition ...
        if (tokens.matches('(')) parseCreateDefinitionList(start, table);
        // table_options ...
        parseTableOptions(start, table);
        // partition_options ...
        if (tokens.matches("PARTITION")) {
            parsePartitionOptions(start, table);
        }
        // select_statement
        if (tokens.canConsume("AS") || tokens.canConsume("IGNORE", "AS")
            || tokens.canConsume("REPLACE", "AS")) {
            parseAsSelectStatement(start, table);
        }

        // Update the table definition ...
        databaseTables.overwriteTable(table.create());
        debugParsed(start);
    }

This method tries to mirror the MySQL CREATE TABLE grammar rules, which start with:

CREATE [TEMPORARY] TABLE [IF NOT EXISTS] tbl_name
    (create_definition,...)
    [table_options]
    [partition_options]

CREATE [TEMPORARY] TABLE [IF NOT EXISTS] tbl_name
    [(create_definition,...)]
    [table_options]
    [partition_options]
    select_statement

CREATE [TEMPORARY] TABLE [IF NOT EXISTS] tbl_name
    { LIKE old_tbl_name | (LIKE old_tbl_name) }

create_definition:
    ...

The CREATE literal was already consumed before our parseCreateTable begins, so it first tries to consume the TEMPORARY literal if available, the TABLE literal, the IF NOT EXISTS fragment if available, and then consumes and parses the qualified name of the table. If the statement includes LIKE otherTable, it uses the databaseTables (which is the reference to our Tables object) to overwrite the definition of the named table with that of the referenced table. Otherwise, it obtains an editor for the new table, and then (like the grammar rules) parses a list of create_definition fragments, followed by table_options, partition_options, and possibly a select_statement.
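The same control flow can be tried out in a self-contained form. The sketch below is a heavily simplified, hypothetical MiniCreateTableParser that keeps only column names in a map from table name to column list. It assumes unquoted identifiers, skips everything after each column name, and stops reading after the closing parenthesis of the definition list, so table options and the rest of the grammar are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, heavily simplified sketch: table name -> ordered column names.
final class MiniCreateTableParser {
    private static final Pattern TOKEN = Pattern.compile("\\w+|[^\\s\\w]");
    private static final String[] CONSTRAINT_WORDS = {
        "PRIMARY", "FOREIGN", "UNIQUE", "KEY", "INDEX", "CONSTRAINT", "FULLTEXT", "SPATIAL", "CHECK" };
    private List<String> tokens;
    private int pos;

    // Parses the text that follows the CREATE keyword and updates the model.
    void parseCreateTable(String afterCreate, Map<String, List<String>> tables) {
        tokens = new ArrayList<>();
        Matcher m = TOKEN.matcher(afterCreate);
        while (m.find()) tokens.add(m.group());
        pos = 0;

        canConsume("TEMPORARY");
        consume("TABLE");
        boolean onlyIfNotExists = canConsume("IF", "NOT", "EXISTS");
        String name = next();
        if (canConsume("LIKE")) {
            List<String> original = tables.get(next());
            if (original != null) tables.put(name, new ArrayList<>(original));
            return;
        }
        if (onlyIfNotExists && tables.containsKey(name)) return; // table exists: do nothing
        List<String> columns = new ArrayList<>();
        if (canConsume("(")) {
            do {
                if (!atConstraint()) columns.add(next()); // first word is the column name
                skipToEndOfDefinition();
            } while (canConsume(","));
            consume(")");
        }
        tables.put(name, columns);
    }

    private boolean atConstraint() {
        for (String word : CONSTRAINT_WORDS) if (matches(word)) return true;
        return false;
    }

    // Skips to the next top-level ',' or ')' while tracking nested parentheses.
    private void skipToEndOfDefinition() {
        int depth = 0;
        while (pos < tokens.size()) {
            String t = tokens.get(pos);
            if (depth == 0 && (t.equals(",") || t.equals(")"))) return;
            if (t.equals("(")) depth++;
            if (t.equals(")")) depth--;
            pos++;
        }
    }

    private boolean matches(String... expected) {
        if (pos + expected.length > tokens.size()) return false;
        for (int i = 0; i < expected.length; i++) {
            if (!tokens.get(pos + i).equalsIgnoreCase(expected[i])) return false;
        }
        return true;
    }

    private boolean canConsume(String... expected) {
        if (!matches(expected)) return false;
        pos += expected.length;
        return true;
    }

    private void consume(String expected) {
        if (!canConsume(expected)) {
            throw new IllegalStateException("Expected " + expected + " at token " + pos);
        }
    }

    private String next() {
        if (pos >= tokens.size()) throw new IllegalStateException("Unexpected end of statement");
        return tokens.get(pos++);
    }
}
```

Feeding it the products definition from earlier in this post (minus the leading CREATE) leaves the map with products mapped to id, name, description, and weight.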

Take a look at the full MySqlDdlParser class to see far more details.

Wrap up

This post goes into some detail about why the MySQL connector uses the DDL statements in the binlog, though we only scratched the surface about how the connector does the DDL parsing with its framework, and how that can be reused in future parsers for other DBMS dialects.

Try our tutorial to see the MySQL connector in action, and stay tuned for more connectors, releases, and news.

Randall Hauch

Randall is an open source software developer at Red Hat, and has been working in data integration for almost 20 years. He is the founder of Debezium and has worked on several other open source projects. He lives in Edwardsville, IL, near St. Louis.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.