Definition of LLMOps

LLMOps is a branch of MLOps, where MLOps aims to unify ML development (Dev) and ML operations (Ops).
Similarly, LLMOps focuses on automation and monitoring across the LLM application lifecycle, including:

  • integration
  • testing
  • releasing
  • deployment
  • infrastructure management

Do not confuse LLMOps with LLM system design, which is mainly concerned with the broader design of the entire end-to-end application (chaining, grounding, conversation history). LLMOps also enables greater transparency, faster response to such requests, and stronger compliance with an organization’s or industry’s policies.

Examples covered below: data preparation and automation, pipeline orchestration, and predictions and safety.

1. Data Exploration for Tuning a Foundation Model
Key features: Google Cloud, Vertex AI

from utils import authenticate
credentials, PROJECT_ID = authenticate()
REGION = "us-central1"

import vertexai

### initialize the Vertex AI SDK for this project and region
vertexai.init(project=PROJECT_ID,
              location=REGION,
              credentials=credentials)

from google.cloud import bigquery

### BigQuery client used to query the Stack Overflow public dataset
bq_client = bigquery.Client(project=PROJECT_ID,
                            credentials=credentials)

QUERY_TABLES = """
SELECT
  table_name
FROM
  `bigquery-public-data.stackoverflow.INFORMATION_SCHEMA.TABLES`
"""

query_job = bq_client.query(QUERY_TABLES)

for row in query_job:
    for value in row.values():
        print(value)	

INSPECT_QUERY = """
SELECT
    *
FROM
    `bigquery-public-data.stackoverflow.posts_questions`
LIMIT 3
"""

import pandas as pd

query_job = bq_client.query(INSPECT_QUERY)	

stack_overflow_df = query_job\
    .result()\
    .to_arrow()\
    .to_pandas()
stack_overflow_df.head()	


QUERY_ALL = """
SELECT
    *
FROM
    `bigquery-public-data.stackoverflow.posts_questions` q
"""

query_job = bq_client.query(QUERY_ALL)

try:
    stack_overflow_df = query_job\
    .result()\
    .to_arrow()\
    .to_pandas()
except Exception as e:
    print('The DataFrame is too large to load into memory.', e)
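The try/except above fails because the full posts_questions table simply does not fit in memory. A quick way to see the scale (a minimal sketch reusing the same BigQuery client; QUERY_COUNT is just an illustrative name) is to count the rows first:

QUERY_COUNT = """
SELECT
    COUNT(*) AS num_rows
FROM
    `bigquery-public-data.stackoverflow.posts_questions`
"""

### run the count query and load the single-row result
count_df = bq_client.query(QUERY_COUNT).result().to_arrow().to_pandas()
print(count_df)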
	
QUERY = """
SELECT
    CONCAT(q.title, q.body) as input_text,
    a.body AS output_text
FROM
    `bigquery-public-data.stackoverflow.posts_questions` q
JOIN
    `bigquery-public-data.stackoverflow.posts_answers` a
ON
    q.accepted_answer_id = a.id
WHERE
    q.accepted_answer_id IS NOT NULL AND
    REGEXP_CONTAINS(q.tags, "python") AND
    a.creation_date >= "2020-01-01"
LIMIT
    10000
"""

### run the query; this may take a few seconds
query_job = bq_client.query(QUERY)

stack_overflow_df = query_job.result()\
                        .to_arrow()\
                        .to_pandas()

stack_overflow_df.head(2)

INSTRUCTION_TEMPLATE = f"""\
Please answer the following Stackoverflow question on Python. \
Answer it like you are a developer answering Stackoverflow questions.

Stackoverflow question:
"""

stack_overflow_df['input_text_instruct'] = INSTRUCTION_TEMPLATE + ' '\
    + stack_overflow_df['input_text']
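
A quick sanity check (a small sketch) that the instruction really got prepended to the question text:

### peek at one prepared training example (truncated for readability)
print(stack_overflow_df['input_text_instruct'].iloc[0][:300])
print(stack_overflow_df['output_text'].iloc[0][:300])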
	
from sklearn.model_selection import train_test_split

train, evaluation = train_test_split(
    stack_overflow_df,
    ### test_size=0.2 means 20% for evaluation
    ### which then makes train set to be of 80%
    test_size=0.2,
    random_state=42
)	
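
To confirm the 80/20 split did what we expect, a one-line check (sketch):

### verify the split sizes
print(f"train: {len(train)} rows, evaluation: {len(evaluation)} rows")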

import datetime

date = datetime.datetime.now().strftime("%H:%d:%m:%Y")

cols = ['input_text_instruct','output_text']
tune_jsonl = train[cols].to_json(orient="records", lines=True)

training_data_filename = f"tune_data_stack_overflow_python_qa-{date}.jsonl"
							
with open(training_data_filename, "w") as f:
    f.write(tune_jsonl)
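
Section 2 below also expects an evaluation file. A minimal sketch that writes the evaluation split in the same JSONL format (the filename here is an assumption, chosen to mirror the training file):

### write the evaluation split as well (hypothetical filename,
### mirroring the training data format above)
evaluation_jsonl = evaluation[cols].to_json(orient="records", lines=True)

evaluation_data_filename = f"tune_eval_data_stack_overflow_python_qa-{date}.jsonl"

with open(evaluation_data_filename, "w") as f:
    f.write(evaluation_jsonl)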

2. Kubeflow Pipelines to orchestrate and automate a workflow
Key features: Kubeflow Pipelines, pipeline YAML files, reusability


from kfp import dsl
from kfp import compiler

# Ignore FutureWarnings in kfp
import warnings
warnings.filterwarnings("ignore", 
                        category=FutureWarning, 
                        module='kfp.*')
						
### Simple example: component 1
@dsl.component
def say_hello(name: str) -> str:
    hello_text = f'Hello, {name}!'
    
    return hello_text
	
hello_task = say_hello(name="Erwin")
print(hello_task)

### Simple example: component 2
@dsl.component
def how_are_you(hello_text: str) -> str:
    
    how_are_you = f"{hello_text}. How are you?"
    
    return how_are_you
	
how_task = how_are_you(hello_text=hello_task.output)
print(how_task)
print(how_task.output)


### Simple example: pipeline
@dsl.pipeline
def hello_pipeline(recipient: str) -> str:
    
    # notice, just recipient and not recipient.output
    hello_task = say_hello(name=recipient)
    
    # notice .output
    how_task = how_are_you(hello_text=hello_task.output)
    
    # notice .output
    return how_task.output 
	
	
pipeline_output = hello_pipeline(recipient="Erwin")
print(pipeline_output)


compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')
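
Compilation serializes the pipeline into a portable YAML spec, which is what makes it reusable across environments; a quick peek at the generated file (sketch):

### inspect the beginning of the compiled pipeline spec
with open('pipeline.yaml') as f:
    print(f.read()[:800])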

pipeline_arguments = {
    "recipient": "World!",
}

### import `PipelineJob` 
from google.cloud.aiplatform import PipelineJob

job = PipelineJob(
        ### path of the yaml file to execute
        template_path="pipeline.yaml",
        ### name of the pipeline
        display_name="deep_learning_ai_pipeline",
        ### pipeline arguments (inputs)
        ### {"recipient": "World!"} for this example
        parameter_values=pipeline_arguments,
        ### region of execution
        location="us-central1",
        ### root is where temporary files are being 
        ### stored by the execution engine
        pipeline_root="./",
)

### submit for execution
job.submit()

### check to see the status of the job
job.state


### these are the same 
### jsonl files from the previous lab

### time stamps have been removed so that 
### the files are consistent for all learners
TRAINING_DATA_URI = "./tune_data_stack_overflow_python_qa.jsonl" 
EVALUATION_DATA_URI = "./tune_eval_data_stack_overflow_python_qa.jsonl"

### path to the pipeline file to reuse
### the file is provided in your workspace as well
template_path = 'https://us-kfp.pkg.dev/ml-pipeline/\
large-language-model-pipelines/tune-large-model/v2.0.0'

import datetime
date = datetime.datetime.now().strftime("%H:%d:%m:%Y")
MODEL_NAME = f"deep-learning-ai-model-{date}"

TRAINING_STEPS = 200
EVALUATION_INTERVAL = 20

from utils import authenticate
credentials, PROJECT_ID = authenticate() 

REGION = "us-central1"

pipeline_arguments = {
    "model_display_name": MODEL_NAME,
    "location": REGION,
    "large_model_reference": "text-bison@001",
    "project": PROJECT_ID,
    "train_steps": TRAINING_STEPS,
    "dataset_uri": TRAINING_DATA_URI,
    "evaluation_interval": EVALUATION_INTERVAL,
    "evaluation_data_uri": EVAUATION_DATA_URI,
}

pipeline_root "./"

job = PipelineJob(
        ### path of the yaml file to execute
        template_path=template_path,
        ### name of the pipeline
        display_name=f"deep_learning_ai_pipeline-{date}",
        ### pipeline arguments (inputs)
        parameter_values=pipeline_arguments,
        ### region of execution
        location=REGION,
        ### root is where temporary files are being 
        ### stored by the execution engine
        pipeline_root=pipeline_root,
        ### enable_caching=True will save the outputs 
        ### of components for re-use, and will only re-run those
        ### components for which the code or data has changed.
        enable_caching=True,
)

### submit for execution
job.submit()

### check to see the status of the job
job.state
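
Tuning can run for a while, so instead of re-checking job.state by hand, a small polling loop can wait for a terminal state (a minimal sketch; the substring check on the state enum is an assumption and may need adjusting between SDK versions):

import time

### poll the pipeline state every minute until it leaves the pending/running states
while "RUNNING" in str(job.state) or "PENDING" in str(job.state):
    print("Pipeline still running...", job.state)
    time.sleep(60)

print("Final state:", job.state)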

3. Predictions, Prompts and Safety
Key features: Load Balancing, Safety Attributes, Citations

from utils import authenticate
credentials, PROJECT_ID = authenticate() 

REGION = "us-central1"

import vertexai
from vertexai.language_models import TextGenerationModel

vertexai.init(project = PROJECT_ID,
              location = REGION,
              credentials = credentials)

### load balancing: list the tuned model versions and pick one at random
### (random choice stands in for a real load-balancing strategy)
model = TextGenerationModel.from_pretrained("text-bison@001")
list_tuned_models = model.list_tuned_model_names()
for i in list_tuned_models:
    print(i)

import random
tuned_model_select = random.choice(list_tuned_models)

### retrieve a response from the selected tuned model
deployed_model = TextGenerationModel.get_tuned_model(tuned_model_select)
PROMPT = "How can I load a csv file using Pandas?"
### depending on the latency of your prompt
### it can take some time to load
response = deployed_model.predict(PROMPT)
print(response)

from pprint import pprint

### load the first object of the response
output = response._prediction_response[0]

### print the first object of the response
pprint(output)

### load the second object of the response
output = response._prediction_response[0][0]

### print the second object of the response
pprint(output)

### retrieve the "content" key from the second object
final_output = response._prediction_response[0][0]["content"]

### printing "content" key from the second object
print(final_output)

INSTRUCTION = """\
Please answer the following Stackoverflow question on Python.\
Answer it like\
you are a developer answering Stackoverflow questions.\
Question:
"""

QUESTION = "How can I store my TensorFlow checkpoint on\
Google Cloud Storage? Python example?"

PROMPT = f"""
{INSTRUCTION} {QUESTION}
"""

print(PROMPT)

final_response = deployed_model.predict(PROMPT)

output = final_response._prediction_response[0][0]["content"]

print(output)

### safety

### retrieve the "blocked" key from the 
### "safetyAttributes" of the response
blocked = response._prediction_response[0][0]\
['safetyAttributes']['blocked']

print(blocked)

### retrieve the "safetyAttributes" of the response
safety_attributes = response._prediction_response[0][0]\
['safetyAttributes']

pprint(safety_attributes)

### citations

### retrieve the "citations" key from the 
### "citationMetadata" of the response
citation = response._prediction_response[0][0]\
['citationMetadata']['citations']

pprint(citation)
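
To avoid repeating the same indexing for every field, the lookups above can be wrapped in a small helper (a sketch; it relies on the private _prediction_response structure shown above, which may change between SDK versions):

def parse_prediction(response):
    ### sketch: pull content, safety attributes and citations out of
    ### the first prediction, using the same keys demonstrated above
    prediction = response._prediction_response[0][0]
    return {
        "content": prediction["content"],
        "blocked": prediction["safetyAttributes"]["blocked"],
        "safety_attributes": prediction["safetyAttributes"],
        "citations": prediction["citationMetadata"]["citations"],
    }

pprint(parse_prediction(response))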

References

This post was written as an open notebook after completing the DeepLearning.AI short course ‘LLMOps’: Course link

P.S. Written with the help of GPT-3.5