How to Create a Data Formatting Plugin in VDK
A step-by-step tutorial on how to manipulate a table in your data lake by writing a VDK custom plugin
Versatile Data Kit (VDK) is a framework for ingesting and manipulating different data sources into a single data lake. I introduced VDK in my previous article, so you can refer to it for an overview.
In this article, I’ll discuss how to implement a custom plugin in VDK. As a use case, I’ll implement a simple plugin that takes an existing table as input and performs data formatting on it. To keep the example small, the plugin does only one thing: it looks for some strings in a column of a table and replaces them with another string. The following figure shows the data formatting job I’m going to implement:
In the table on the left, the same country, Italy, appears with two different names, so I’ll perform data formatting to have a homogeneous country value, as shown in the table on the right.
To perform the previously described task, I’ll implement a VDK plugin you can call as follows:
vdk format-column -t my-table -c country -o "Italie,Italia" -d Italy
where:
- the -t option specifies the table to manipulate
- the -c option specifies the column to manipulate
- the -o option specifies the list of occurrences to substitute
- the -d option specifies the target value.
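To make the intended transformation concrete, here is a minimal plain-Python sketch of the substitution on in-memory rows. It does not use VDK at all; the function name replace_occurrences and the sample rows are assumptions made only for illustration.

```python
from typing import Dict, List


def replace_occurrences(
    rows: List[Dict[str, str]],
    column: str,
    occurrences: List[str],
    destination: str,
) -> List[Dict[str, str]]:
    """Return copies of rows where listed values in `column` become `destination`."""
    targets = set(occurrences)
    result = []
    for row in rows:
        new_row = dict(row)  # copy so the input rows are left untouched
        if new_row.get(column) in targets:
            new_row[column] = destination
        result.append(new_row)
    return result


# Hypothetical sample data mirroring the country example
rows = [
    {"city": "Pisa", "country": "Italie"},
    {"city": "Milano", "country": "Italia"},
    {"city": "Paris", "country": "France"},
]
formatted = replace_occurrences(rows, "country", ["Italie", "Italia"], "Italy")
print([row["country"] for row in formatted])  # ['Italy', 'Italy', 'France']
```

The plugin described in the rest of the article performs the same replacement, but with SQL statements against a table in the data lake instead of Python dictionaries.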
To implement the custom plugin in VDK, I’ll follow the steps described below:
- Creating the Skeleton
- Configuring the Plugin
- Configuring the Jobs
- Testing and Installing the Plugin.
1. Creating the Skeleton
To create the skeleton of the plugin, I use cookiecutter as follows:
cookiecutter https://github.com/tozka/cookiecutter-vdk-plugin
When I run the previous command, I need to answer some questions:
When the procedure is complete, a new directory is created. The following figure shows the tree of the created directory:
The new directory contains two main directories, src and tests, as well as three files: README.md, requirements.txt, and setup.py.
First, I edit the README.md file. I can leave requirements.txt as it is because I won’t add any specific library.
I focus now on the plugin configuration.
2. Configuring the Plugin
I open the setup.py file with an editor. In this file, you can specify the version of the plugin, as well as its name, the required packages, and the entry point for the plugin:
import pathlib

import setuptools

__version__ = "0.1.0"

setuptools.setup(
    name="vdk-preprocessing",
    version=__version__,
    url="https://github.com/vmware/versatile-data-kit",
    description="Versatile Data Kit SDK Preprocessing plugin to manipulate an already ingested table.",
    long_description=pathlib.Path("README.md").read_text(),
    long_description_content_type="text/markdown",
    install_requires=["vdk-core"],
    package_dir={"": "src"},
    packages=setuptools.find_namespace_packages(where="src"),
    # Entry point: plugin name mapped to the module that defines the plugin
    entry_points={"vdk.plugin.run": ["vdk-preprocessing = vdk.plugin.preprocessing.preprocessing_plugin"]},
    classifiers=[
        "Development Status :: 4 - Beta",
        "License :: OSI Approved :: Apache Software License",
        "Programming Language :: Python :: 3.7",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
    ],
)
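The entry point string has the form "name = module". As a small illustration, the sketch below uses the standard library's importlib.metadata (Python 3.9 or later is assumed) to show how such a declaration breaks down into a group, a name, and a module path. It only inspects the string; it is not a description of how VDK itself discovers plugins.

```python
from importlib.metadata import EntryPoint

# Same group, name, and module path as declared in setup.py
entry_point = EntryPoint(
    name="vdk-preprocessing",
    value="vdk.plugin.preprocessing.preprocessing_plugin",
    group="vdk.plugin.run",
)

# The value is parsed lazily; nothing is imported at this point
print(entry_point.group)
print(entry_point.name)
print(entry_point.module)
```

The module path must match the package layout under src, which is why the directories and files are renamed in the next step.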
Now I can rename the directories to reflect the semantics of my project. The final directory tree should look like the following one:
I’ve renamed plugin_entry.py to preprocessing_plugin.py, then I’ve added a preprocessing_formatting_job directory within preprocessing, where I’ve put a format_column_step.py script.
Next, I work on the preprocessing_plugin.py script. This script defines the plugin interface. In my case, I’ll implement a command-line plugin, so here I must define the command name of the plugin, as well as the options to pass to the command. The command will receive four parameters as input:
@click.command(
    name="format-column",
    help="Execute a SQL query against a configured database and format the table.",
    no_args_is_help=True,
)
@click.option(
    "-t",
    "--table",
    help="The name of the table.",
    default="my_table",
    type=click.STRING,
)
@click.option(
    "-c",
    "--column",
    help="The column to format.",
    default="my_column",
    type=click.STRING,
)
@click.option(
    "-o",
    "--occurrences",
    help="The list of occurrences to modify separated by comma.",
    default="",
    type=click.STRING,
)
@click.option(
    "-d",
    "--destination",
    help="The target value.",
    default="",
    type=click.STRING,
)
@click.pass_context
Note that I need to pass the context to my command function.
VDK uses hookimpl decorators to implement a plugin, so you can refer to the VDK official documentation for further details.
Once the options are defined, I can define the format_column() function, which will receive them:
def format_column(ctx: click.Context, table: str, column: str, occurrences: str, destination: str):
    args = dict(table=table, column=column, occurrences=occurrences, destination=destination)
    ctx.invoke(
        run,
        data_job_directory=os.path.dirname(format_column_step.__file__),
        arguments=json.dumps(args),
    )
The previous function simply invokes the specific job running the task.
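The path from command-line options to job arguments can be sketched with the standard library alone. In the sketch below, argparse stands in for click (it is not what the plugin uses), the helper names build_parser and to_job_arguments are hypothetical, and the call to VDK's run command is left out. The point is only to show the four options being packed into a JSON string and unpacked again on the receiving side.

```python
import argparse
import json
from typing import List


def build_parser() -> argparse.ArgumentParser:
    """Declare the same four options, with the same defaults, using argparse."""
    parser = argparse.ArgumentParser(prog="format-column")
    parser.add_argument("-t", "--table", default="my_table")
    parser.add_argument("-c", "--column", default="my_column")
    parser.add_argument("-o", "--occurrences", default="")
    parser.add_argument("-d", "--destination", default="")
    return parser


def to_job_arguments(argv: List[str]) -> str:
    """Parse the options and serialize them as a JSON string."""
    options = build_parser().parse_args(argv)
    args = dict(
        table=options.table,
        column=options.column,
        occurrences=options.occurrences,
        destination=options.destination,
    )
    return json.dumps(args)


payload = to_job_arguments(
    ["-t", "my-table", "-c", "country", "-o", "Italie,Italia", "-d", "Italy"]
)

# The receiving side decodes the JSON and splits the occurrences
received = json.loads(payload)
print(received["occurrences"].split(","))  # ['Italie', 'Italia']
```

Because the occurrences travel as a single comma-separated string, the job step is responsible for splitting them into a list.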
Finally, I add the command to VDK:
@hookimpl
def vdk_command_line(root_command: click.Group) -> None:
    root_command.add_command(format_column)
3. Configuring the Jobs
Now it’s time to configure the formatting job. This job is composed of a single step, named format_column_step.py.
I define a new class, named ColumnFormatter, which performs the task. It searches the given column of the given table for each of the occurrences passed as an argument, and replaces them with the target value:
class ColumnFormatter:
    def __init__(self, job_input: IJobInput):
        self.__job_input = job_input

    def format_column(self, table: str, column: str, occurrences: list, destination: str):
        for value in occurrences:
            query = f"UPDATE {table} SET {column} = '{destination}' WHERE {column} = '{value}'; "
            self.__job_input.execute_query(query)
The ColumnFormatter class must hold an IJobInput instance, which it uses to execute the queries.
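To try the formatting logic without a VDK installation, the sketch below runs the same UPDATE statements against an in-memory SQLite database. FakeJobInput is a hypothetical stand-in that exposes only an execute_query() method; it is not part of VDK, and the real IJobInput offers much more.

```python
import sqlite3


class FakeJobInput:
    """Hypothetical stand-in exposing only execute_query(), backed by sqlite3."""

    def __init__(self, connection: sqlite3.Connection):
        self._connection = connection

    def execute_query(self, query: str):
        cursor = self._connection.execute(query)
        self._connection.commit()
        return cursor.fetchall()


class ColumnFormatter:
    def __init__(self, job_input):
        self._job_input = job_input

    def format_column(self, table, column, occurrences, destination):
        # One UPDATE per value to replace, as in the step above
        for value in occurrences:
            query = (
                f"UPDATE {table} SET {column} = '{destination}' "
                f"WHERE {column} = '{value}';"
            )
            self._job_input.execute_query(query)


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE test_table (city TEXT, country TEXT)")
connection.executemany(
    "INSERT INTO test_table VALUES (?, ?)",
    [("Pisa", "Italie"), ("Milano", "Italia"), ("Paris", "France")],
)

job_input = FakeJobInput(connection)
ColumnFormatter(job_input).format_column(
    "test_table", "country", ["Italie", "Italia"], "Italy"
)
rows = job_input.execute_query("SELECT country FROM test_table ORDER BY rowid")
countries = [row[0] for row in rows]
print(countries)  # ['Italy', 'Italy', 'France']
```

Since the values are interpolated directly into the SQL string, a value containing a single quote would break the statement. For anything beyond a tutorial, validating or escaping the inputs would be a sensible addition.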
Now I implement the run() function, to indicate to VDK that this is a step:
def run(job_input: IJobInput) -> None:
    table = job_input.get_arguments().get("table")
    column = job_input.get_arguments().get("column")
    occurrences = job_input.get_arguments().get("occurrences").split(",")
    destination = job_input.get_arguments().get("destination")

    formatter = ColumnFormatter(job_input)
    formatter.format_column(table, column, occurrences, destination)
The run() function simply retrieves the command line arguments and passes them to the format_column() method of ColumnFormatter.
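One detail of the split is worth a closer look. With the default empty string, "".split(",") returns a list containing one empty string, so the loop would still issue one UPDATE matching empty values. Spaces after commas are also kept. The helper below, parse_occurrences, is a hypothetical addition (it is not in the original step) that sketches one way to handle both cases.

```python
from typing import List


def parse_occurrences(raw: str) -> List[str]:
    """Split a comma-separated string, trimming spaces and dropping empty items."""
    return [item.strip() for item in raw.split(",") if item.strip()]


print("".split(","))                        # ['']
print(parse_occurrences(""))                # []
print(parse_occurrences("Italie, Italia"))  # ['Italie', 'Italia']
```

Whether empty strings should be ignored depends on the data: if replacing empty values is a legitimate use case, the plain split is the right behavior.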
4. Testing and Installing the Plugin
The plugin is ready. The last step involves writing tests and installing the plugin. I change the default structure of the tests directory since I’ll implement a simple test.
I write the tests as a script contained in the tests directory, as shown in the following figure:
You can write as many tests as you want. In this example, I write just one test, to show how they work. VDK uses pytest to run tests.
I write the following test:
- I create a table, using the sqlite-query plugin. I also populate the table with the values of the table shown at the top of this article (columns city and country)
runner = CliEntryBasedTestRunner(sqlite_plugin, preprocessing_plugin)

runner.invoke(
    [
        "sqlite-query",
        "--query",
        "CREATE TABLE test_table (city TEXT, country TEXT)",
    ]
)

mock_sqlite_conf = mock.MagicMock(SQLiteConfiguration)
sqlite_ingest = IngestToSQLite(mock_sqlite_conf)
payload = [
    {"city": "Pisa", "country": "Italie"},
    {"city": "Milano", "country": "Italia"},
    {"city": "Paris", "country": "France"},
]

sqlite_ingest.ingest_payload(
    payload=payload,
    destination_table="test_table",
    target=db_dir,
)
I have used the CliEntryBasedTestRunner() class provided by VDK to call the sqlite-query plugin.
- I call the vdk-preprocessing plugin to substitute the occurrences “Italie,Italia” in the country column with the value “Italy”
result = runner.invoke(["format-column", "--table", "test_table", "--column", "country", "--occurrences", "Italie,Italia", "--destination", "Italy"])
- I check the output.
result = runner.invoke(
    [
        "sqlite-query",
        "--query",
        "SELECT country FROM test_table",
    ]
)

output = result.stdout
assert output == (
    "country\n"
    "---------\n"
    "Italy\n"
    "Italy\n"
    "France\n"
)
Now I can run the tests. First, I install the package:
pip install -e /path/to/vdk-preprocessing/
and then, from the vdk-preprocessing directory, I run:
pytest
If everything is OK, the test should be reported as PASSED.
Summary
Congratulations! You have just learned how to build a custom plugin in VDK! You can find more information about VDK plugins in the VDK official repository. You can also use the plugins already available in VDK!
You can find the code of the example described in this post in my GitHub repository.