How to Create a Data Formatting Plugin in VDK

A step-by-step tutorial on how to manipulate a table in your data lake by writing a VDK custom plugin

Angelica Lo Duca
Towards Data Science
7 min read · Oct 12, 2022

Photo by Campaign Creators on Unsplash

Versatile Data Kit (VDK) is a framework for ingesting and manipulating different data sources into a single data lake. I’ve already discussed VDK in my previous article, so for an introduction to it, you can refer there.

In this article, I’ll discuss how to implement a custom plugin in VDK. As a use case, I’ll implement a simple plugin that takes input from an existing table, and performs data formatting on it. To show how the plugin works, I’ll implement only the simple task that takes some strings from a column of a table and substitutes them with another string. The following figure shows the data formatting job I’m going to implement:

Image by Author

In the table on the left, the same country, Italy, appears with two different names, so I’ll perform data formatting to have a homogeneous country value, as shown in the table on the right.
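
Before involving VDK at all, the task can be illustrated in plain Python. The snippet below is only a sketch: the sample rows are the ones used in the test later in this article, and replace_occurrences() is a hypothetical helper, not part of VDK.

```python
# Plain-Python illustration of the formatting task (no VDK involved).
rows = [
    {"city": "Pisa", "country": "Italie"},
    {"city": "Milano", "country": "Italia"},
    {"city": "Paris", "country": "France"},
]


def replace_occurrences(rows, column, occurrences, destination):
    """Return copies of the rows where listed values of `column` become `destination`."""
    targets = set(occurrences)
    return [
        {**row, column: destination} if row[column] in targets else dict(row)
        for row in rows
    ]


formatted = replace_occurrences(rows, "country", ["Italie", "Italia"], "Italy")
for row in formatted:
    print(row["city"], row["country"])
```

The original rows are left untouched, which makes it easy to compare the table before and after the formatting.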

To perform the previously described task, I’ll implement a VDK plugin you can call as follows:

vdk format-column -t my-table -c country -o "Italie,Italia" -d Italy

where:

  • the -t option specifies the table to manipulate
  • the -c option specifies the column to manipulate
  • the -o option specifies the list of occurrences to substitute
  • the -d option specifies the target value.
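
To make the meaning of the four options concrete, here is a minimal stand-in built with Python's standard argparse module. It is not the VDK plugin (which is defined with click later in this article); it only shows how the command line above maps to four string values.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Stand-in parser mirroring the four options of the format-column command."""
    parser = argparse.ArgumentParser(prog="format-column")
    parser.add_argument("-t", "--table", help="The table to manipulate.")
    parser.add_argument("-c", "--column", help="The column to manipulate.")
    parser.add_argument("-o", "--occurrences", help="Comma-separated values to substitute.")
    parser.add_argument("-d", "--destination", help="The target value.")
    return parser


# Same arguments as in the command shown above.
args = build_parser().parse_args(
    ["-t", "my-table", "-c", "country", "-o", "Italie,Italia", "-d", "Italy"]
)
print(vars(args))
```

Note that the occurrences arrive as a single string; splitting it into a list is left to the code that consumes it.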

To implement the custom plugin in VDK, I’ll follow the steps described below:

  1. Creating the Skeleton
  2. Configuring the Plugin
  3. Configuring the Jobs
  4. Testing and Installing the Plugin.

1. Creating the Skeleton

To create the skeleton of the plugin, I use cookiecutter as follows:

cookiecutter https://github.com/tozka/cookiecutter-vdk-plugin

When I run the previous command, I need to answer some questions:

Image by Author

When the procedure is complete, a new directory is created. The following figure shows the tree of the created directory:

Image by Author

The directory contains two main subdirectories, src and tests, as well as three files: README.md, requirements.txt, and setup.py.

First, I edit the README.md file. I can leave the requirements.txt as it is because I won’t add any specific library.

I focus now on the plugin configuration.

2. Configuring the Plugin

I open the setup.py file with an editor. In this file, you can specify the version of the plugin, as well as its name, the required packages, and the entry point directory for the plugin:

import pathlib

import setuptools

__version__ = "0.1.0"

setuptools.setup(
    name="vdk-preprocessing",
    version=__version__,
    url="https://github.com/vmware/versatile-data-kit",
    description="Versatile Data Kit SDK Preprocessing plugin to manipulate an already ingested table.",
    long_description=pathlib.Path("README.md").read_text(),
    long_description_content_type="text/markdown",
    install_requires=["vdk-core"],
    package_dir={"": "src"},
    packages=setuptools.find_namespace_packages(where="src"),
    # Entry point: the module that contains the plugin hooks.
    entry_points={
        "vdk.plugin.run": ["vdk-preprocessing = vdk.plugin.preprocessing.preprocessing_plugin"]
    },
    classifiers=[
        "Development Status :: 4 - Beta",
        "License :: OSI Approved :: Apache Software License",
        "Programming Language :: Python :: 3.7",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
    ],
)
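
The entry_points argument is the part that connects the package to VDK: each string has the form "name = module.path" and is registered under a group, here vdk.plugin.run. As a rough sketch, the snippet below parses that string with the standard importlib.metadata.EntryPoint class. The parse_entry_point() helper is hypothetical, and the snippet only inspects the declaration; it does not load the plugin.

```python
from importlib.metadata import EntryPoint


def parse_entry_point(spec: str, group: str) -> EntryPoint:
    """Split a 'name = module.path' declaration into an EntryPoint object."""
    name, _, value = spec.partition("=")
    return EntryPoint(name=name.strip(), value=value.strip(), group=group)


entry_point = parse_entry_point(
    "vdk-preprocessing = vdk.plugin.preprocessing.preprocessing_plugin",
    group="vdk.plugin.run",
)
print(entry_point.name)   # plugin name as seen by the plugin loader
print(entry_point.value)  # dotted path of the module that holds the hooks
```

If the module path in setup.py does not match the real location of preprocessing_plugin.py under src, the package still installs, but the module cannot be imported when the entry point is loaded.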

Now I can rename the directories to reflect the semantics of my project. The final directory tree should look like the following one:

I’ve renamed plugin_entry.py to preprocessing_plugin.py, then I’ve added a preprocessing_formatting_job directory within preprocessing, where I’ve put a format_column_step.py script.

Now I work on the preprocessing_plugin.py script, which defines the plugin interface. Since I'm implementing a command-line plugin, here I must define the command name of the plugin, as well as the options to pass to the command. The command receives four parameters as input:

@click.command(
    name="format-column",
    help="Execute a SQL query against a configured database and format the table.",
    no_args_is_help=True,
)
@click.option(
    "-t",
    "--table",
    help="The name of the table.",
    default="my_table",
    type=click.STRING,
)
@click.option(
    "-c",
    "--column",
    help="The column to format.",
    default="my_column",
    type=click.STRING,
)
@click.option(
    "-o",
    "--occurrences",
    help="The list of occurrences to modify separated by comma.",
    default="",
    type=click.STRING,
)
@click.option(
    "-d",
    "--destination",
    help="The target value.",
    default="",
    type=click.STRING,
)
@click.pass_context

Note that I need to pass the context to my command function.

VDK uses hookimpl decorators to implement a plugin, so you can refer to the VDK official documentation for further details.

Once the options are defined, I can define the format_column() function, which will receive them:

def format_column(ctx: click.Context, table: str, column: str, occurrences: str, destination: str):
    # Collect the options and forward them to the formatting job as a JSON string.
    args = dict(table=table, column=column, occurrences=occurrences, destination=destination)
    ctx.invoke(
        run,
        data_job_directory=os.path.dirname(format_column_step.__file__),
        arguments=json.dumps(args),
    )

The previous function simply invokes the specific job running the task.
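
The options travel from the command to the job as a JSON string. The following sketch shows that round trip with the standard json module only. Here json.loads() stands in for whatever VDK does before the step reads the values through job_input.get_arguments(), which is an assumption about its internals, and serialize_arguments() is a hypothetical helper.

```python
import json


def serialize_arguments(table: str, column: str, occurrences: str, destination: str) -> str:
    """Build the JSON string passed as the `arguments` parameter."""
    args = dict(table=table, column=column, occurrences=occurrences, destination=destination)
    return json.dumps(args)


payload = serialize_arguments("test_table", "country", "Italie,Italia", "Italy")
print(payload)

# Decoding gives back a plain dictionary of strings.
restored = json.loads(payload)
print(restored["occurrences"])
```

The occurrences are still a single comma-separated string after decoding, so the step is responsible for splitting them.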

Finally, I add the command to VDK:

@hookimpl
def vdk_command_line(root_command: click.Group) -> None:
    root_command.add_command(format_column)

3. Configuring the Jobs

Now it’s time to configure the formatting job. This job is composed of a single step, named format_column_step.py.

I define a new class, named ColumnFormatter, which performs the task. In my case, it searches for all the occurrences in the column of the table passed as an argument, and replaces them with the target value:

from vdk.api.job_input import IJobInput


class ColumnFormatter:
    def __init__(self, job_input: IJobInput):
        self.__job_input = job_input

    def format_column(self, table: str, column: str, occurrences: list, destination: str):
        # Run one UPDATE per occurrence, replacing it with the target value.
        for value in occurrences:
            query = f"UPDATE {table} SET {column} = '{destination}' WHERE {column} = '{value}'; "
            self.__job_input.execute_query(query)

The ColumnFormatter class must hold an IJobInput instance, which it uses to execute the queries.
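
Building the query with an f-string works for simple values, but it breaks if a value contains a quote. As an alternative sketch, the snippet below runs the same substitution directly against an in-memory SQLite database with the standard sqlite3 module, binding the values as parameters and issuing a single UPDATE. It does not use VDK, and it makes no assumption about whether execute_query() accepts parameters; the format_column() function and the identifier check are illustrative only.

```python
import re
import sqlite3

# Table and column names cannot be bound as parameters, so they are validated instead.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def format_column(conn, table, column, occurrences, destination):
    """Replace every listed occurrence in `column` with `destination`; return rows changed."""
    for name in (table, column):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    if not occurrences:
        return 0
    placeholders = ", ".join("?" for _ in occurrences)
    query = f"UPDATE {table} SET {column} = ? WHERE {column} IN ({placeholders})"
    cursor = conn.execute(query, [destination, *occurrences])
    conn.commit()
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test_table (city TEXT, country TEXT)")
conn.executemany(
    "INSERT INTO test_table VALUES (?, ?)",
    [("Pisa", "Italie"), ("Milano", "Italia"), ("Paris", "France")],
)

updated = format_column(conn, "test_table", "country", ["Italie", "Italia"], "Italy")
countries = [row[0] for row in conn.execute("SELECT country FROM test_table ORDER BY rowid")]
print(updated, countries)
```

Using IN with bound parameters keeps the number of statements at one regardless of how many occurrences are passed.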

Now I implement the run() function, to indicate to VDK that this is a step:

def run(job_input: IJobInput) -> None:
    table = job_input.get_arguments().get("table")
    column = job_input.get_arguments().get("column")
    occurrences = job_input.get_arguments().get("occurrences").split(",")
    destination = job_input.get_arguments().get("destination")
    formatter = ColumnFormatter(job_input)
    formatter.format_column(table, column, occurrences, destination)

The run() function simply retrieves the command line arguments and passes them to the ColumnFormatter method format_column().
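
One detail of split(",") is worth keeping in mind: spaces after the commas are preserved, and an empty string produces a list with one empty item. The hypothetical parse_occurrences() helper below is a small sketch of a more tolerant parser; it is not part of the plugin described here.

```python
def parse_occurrences(raw: str) -> list:
    """Split a comma-separated string, trimming whitespace and dropping empty items."""
    return [item.strip() for item in raw.split(",") if item.strip()]


# Plain split keeps the leading space and turns "" into [""].
print("Italie, Italia".split(","))
print("".split(","))

# The helper normalizes both cases.
print(parse_occurrences("Italie, Italia"))
print(parse_occurrences(""))
```

With the plain split, a value such as " Italia" would not match any row, so the UPDATE would silently change nothing for it.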

4. Testing and Installing the Plugin

The plugin is ready. The last step involves writing tests and installing the plugin. I change the default structure of the tests directory since I’ll implement a simple test.

I write the tests as a script contained in the tests directory, as shown in the following figure:

Image by Author

You can write as many tests as you want. In this example, I write just one test, to show how they work. VDK uses pytest to perform tests.

I write the following test:

  • I create a table, using the sqlite-query plugin. I also populate the table with the values of the table shown at the top of this article (columns city and country)

runner = CliEntryBasedTestRunner(sqlite_plugin, preprocessing_plugin)
runner.invoke(
    [
        "sqlite-query",
        "--query",
        "CREATE TABLE test_table (city TEXT, country TEXT)",
    ]
)
mock_sqlite_conf = mock.MagicMock(SQLiteConfiguration)
sqlite_ingest = IngestToSQLite(mock_sqlite_conf)
payload = [
    {"city": "Pisa", "country": "Italie"},
    {"city": "Milano", "country": "Italia"},
    {"city": "Paris", "country": "France"},
]
# db_dir is the path of the SQLite database file; it is assumed to be defined earlier in the test.
sqlite_ingest.ingest_payload(
    payload=payload,
    destination_table="test_table",
    target=db_dir,
)

I have used the CliEntryBasedTestRunner() class provided by VDK to call the sqlite-query plugin.

  • I call the vdk-preprocessing plugin to substitute the occurrences “Italie,Italia” in the country column with the value “Italy”

result = runner.invoke(
    [
        "format-column",
        "--table",
        "test_table",
        "--column",
        "country",
        "--occurrences",
        "Italie,Italia",
        "--destination",
        "Italy",
    ]
)

  • I check the output.

result = runner.invoke(
    [
        "sqlite-query",
        "--query",
        "SELECT country FROM test_table",
    ]
)
output = result.stdout
assert output == (
    "country\n"
    "---------\n"
    "Italy\n"
    "Italy\n"
    "France\n"
)

Now I can run the tests. First, I install the package:

pip install -e /path/to/vdk-preprocessing/

and then from the vdk-preprocessing directory:

pytest

If everything is OK, pytest should report the test as PASSED.
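
The assertion in the test compares the whole stdout string, so it also depends on details such as the length of the dashed separator. As a possible alternative, the sketch below parses a single-column output into a header and a list of values, assuming the output has the shape shown in the test above. The parse_single_column_output() helper is hypothetical and not provided by VDK.

```python
def parse_single_column_output(output: str):
    """Return (header, values) from a one-column table printed as text."""
    lines = [line.strip() for line in output.splitlines() if line.strip()]
    header = lines[0]
    # Skip separator lines made only of dashes, whatever their length.
    values = [line for line in lines[1:] if set(line) != {"-"}]
    return header, values


# Sample output copied from the assertion in the test above.
sample_output = "country\n---------\nItaly\nItaly\nFrance\n"
header, values = parse_single_column_output(sample_output)
print(header, values)
```

A test could then assert on header and values separately, which keeps it valid even if the table rendering changes slightly.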

Summary

Congratulations! You have just learned how to build a custom plugin in VDK! You can find more information about VDK plugins in the VDK official repository. You can also use the plugins already available in VDK!

You can find the code of the example described in this post in my GitHub repository.
