# Import standard libraries
import os
import argparse
import logging

# AzureML libraries
from azureml.core import Dataset, Datastore, Workspace, Run

### SETUP AZURE ML - ADLS CONNECTION ###
# Workspace is resolved under __main__ (run context first, config.json fallback)
config_path = 'config/ml-register-config.csv'
datastore_name = 'cub_pb'


def main(datastore, input_file_name, input_file_path, tags, partition_format, dataset_name):
    """
    Reads a raw dataset from the given datastore path and registers it as a
    tabular dataset in the ML workspace.

    :param datastore: Datastore object the file lives in
    :param input_file_name: Name of the file or dataset to import
    :param input_file_path: Directory path to the file on the datastore,
        where the first directory is assumed to be the name of the datastore
    :param tags: Tags to apply when registering the dataset, if any
    :param partition_format: Partition format of the files, if any
    :param dataset_name: Name under which to register the dataset
    """
    # Join directory and file name with a single '/', tolerating a
    # trailing slash on input_file_path
    path_on_datastore = '/'.join([input_file_path.rstrip('/'), input_file_name])

    logging.info("Datastore Name: %s", datastore.name)
    dataset = Dataset.Tabular.from_parquet_files(
        path=(datastore, path_on_datastore),
        validate=False,
        partition_format=partition_format
    )
    dataset.register(
        workspace=ws,
        name=dataset_name,
        # Fall back to the default project tags when none are supplied
        tags=tags or {"name": "tapshare", "project": "data-prep1"},
        create_new_version=True,
        description='version 2'
    )


def get_config_file(datastore_name, config_path):
    """
    Reads the config file from the given datastore and returns it as a
    pandas DataFrame.

    :param datastore_name: Name of the datastore holding the config file
    :param config_path: Path to the config file on the datastore
    """
    datastore = Datastore.get(ws, datastore_name)
    config_dataset = Dataset.Tabular.from_delimited_files(
        path=(datastore, config_path),
        separator=',',
        header=True
    ).to_pandas_dataframe()
    return config_dataset


if __name__ == "__main__":
    os.makedirs('logs', exist_ok=True)
    logging.basicConfig(filename='./logs/processdata.log', level=logging.INFO)

    # Get run context so we can log back to the workspace
    run = Run.get_context()

    # Get the workspace this job is running under
    try:
        ws = run.experiment.workspace
    except AttributeError:
        logging.info("Offline run, getting workspace from config")
        ws = Workspace.from_config()

    # args = parse_args()
    # logging.info(f"input_file_name argument: {args.input_file_name}")
    # logging.info(f"input_file_path argument: {args.input_file_path}")
    # logging.info(f"tags argument: {args.tags}")
    # logging.info(f"partition_format argument: {args.partition_format}")

    for index, row in get_config_file(
            config_path=config_path,
            datastore_name=datastore_name).iterrows():
        logging.info("Config row: %s", list(row))
        # The 'active' flag may come back from the CSV as a string,
        # so compare explicitly instead of relying on truthiness
        active = str(row.iloc[5]).strip().lower() == 'true'
        if active:
            main(
                datastore=Datastore.get(ws, datastore_name=str(row.iloc[0])),
                # name of the file (or registered dataset)
                input_file_name=str(row.iloc[1]),
                # path to the dataset
                input_file_path=str(row.iloc[2]),
                # tags to use - if any
                tags=None,
                # partition format - if any
                partition_format=str(row.iloc[3]),
                # dataset name to register under
                dataset_name=str(row.iloc[4])
            )
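
# --- Example config file layout ---
# A minimal sketch of what config/ml-register-config.csv is expected to look
# like, inferred from the positional row access above (columns 0..5). The
# header names and sample values below are assumptions, not taken from the
# actual file; only the column order and the trailing 'active' flag are
# implied by the code. Rows whose 'active' column is not 'True' are skipped.
#
# datastore_name,input_file_name,input_file_path,partition_format,dataset_name,active
# cub_pb,data.parquet,raw/tapshare,{PartitionDate:yyyy/MM/dd}/data.parquet,tapshare_raw,True
# cub_pb,archive.parquet,raw/archive,,tapshare_archive,False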