Definition of LLMOps
LLMOps is a branch of MLOps, where MLOps unifies ML development and ML operations.
Similarly, LLMOps focuses on automation and monitoring at all steps of the LLM lifecycle, including:
- integration
- testing
- releasing
- deployment
- infrastructure management
Do not confuse LLMOps with LLM system design: the latter covers the broader design of the entire end-to-end application (chaining, grounding, conversation history). LLMOps also enables greater transparency and faster responses to requests, and it ensures better compliance with an organization's or industry's policies.
Examples covered below: data preparation and automation, workflow orchestration, and predictions with safety.
1. Data Exploration for Tuning a Foundation Model
Key features: Google Cloud, Vertex AI, BigQuery
### authenticate and initialize the Vertex AI SDK
from utils import authenticate
credentials, PROJECT_ID = authenticate()
REGION = "us-central1"

import vertexai
vertexai.init(project=PROJECT_ID,
              location=REGION,
              credentials=credentials)

### BigQuery client for the data exploration queries
from google.cloud import bigquery
bq_client = bigquery.Client(project=PROJECT_ID,
                            credentials=credentials)
### list all tables in the public Stack Overflow dataset
QUERY_TABLES = """
SELECT
  table_name
FROM
  `bigquery-public-data.stackoverflow.INFORMATION_SCHEMA.TABLES`
"""
query_job = bq_client.query(QUERY_TABLES)
for row in query_job:
    for value in row.values():
        print(value)
### preview a few rows of the questions table
INSPECT_QUERY = """
SELECT
  *
FROM
  `bigquery-public-data.stackoverflow.posts_questions`
LIMIT 3
"""
import pandas as pd
query_job = bq_client.query(INSPECT_QUERY)
stack_overflow_df = query_job\
    .result()\
    .to_arrow()\
    .to_pandas()
stack_overflow_df.head()
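As an aside, the .result().to_arrow().to_pandas() chain is one way to materialize results; recent versions of google-cloud-bigquery also expose to_dataframe(), which should yield the same DataFrame (a minimal equivalent, assuming your client library version supports it):
### equivalent shortcut, assuming a recent google-cloud-bigquery
stack_overflow_df = bq_client.query(INSPECT_QUERY).result().to_dataframe()
stack_overflow_df.head()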
### attempt to pull the entire table (expected to fail on memory)
QUERY_ALL = """
SELECT
  *
FROM
  `bigquery-public-data.stackoverflow.posts_questions` q
"""
query_job = bq_client.query(QUERY_ALL)
try:
    stack_overflow_df = query_job\
        .result()\
        .to_arrow()\
        .to_pandas()
except Exception as e:
    print('The DataFrame is too large to load into memory.', e)
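If the full table cannot fit in memory, one option (an illustrative sketch, not part of the course) is to sample rows server-side with BigQuery's TABLESAMPLE clause, so only a fraction of the table is ever downloaded:
### sketch: sample ~1% of rows server-side instead of
### pulling the entire table into memory
QUERY_SAMPLE = """
SELECT
  *
FROM
  `bigquery-public-data.stackoverflow.posts_questions` TABLESAMPLE SYSTEM (1 PERCENT)
"""
sample_df = bq_client.query(QUERY_SAMPLE).result().to_dataframe()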
### join questions with their accepted answers to build
### (input_text, output_text) pairs for tuning
QUERY = """
SELECT
  CONCAT(q.title, q.body) AS input_text,
  a.body AS output_text
FROM
  `bigquery-public-data.stackoverflow.posts_questions` q
JOIN
  `bigquery-public-data.stackoverflow.posts_answers` a
ON
  q.accepted_answer_id = a.id
WHERE
  q.accepted_answer_id IS NOT NULL AND
  REGEXP_CONTAINS(q.tags, "python") AND
  a.creation_date >= "2020-01-01"
LIMIT
  10000
"""
### run the tuning-data query; this may take some seconds
query_job = bq_client.query(QUERY)
stack_overflow_df = query_job.result()\
    .to_arrow()\
    .to_pandas()
stack_overflow_df.head(2)
### prepend the instruction to every question
INSTRUCTION_TEMPLATE = """\
Please answer the following Stackoverflow question on Python. \
Answer it like you are a developer answering Stackoverflow questions.
Stackoverflow question:
"""
stack_overflow_df['input_text_instruct'] = INSTRUCTION_TEMPLATE + ' ' \
    + stack_overflow_df['input_text']
from sklearn.model_selection import train_test_split
train, evaluation = train_test_split(
    stack_overflow_df,
    ### test_size=0.2 means 20% for evaluation,
    ### which leaves 80% for the train set
    test_size=0.2,
    random_state=42
)
### timestamp the file name, then write the training split as JSONL
import datetime
date = datetime.datetime.now().strftime("%H:%d:%m:%Y")
cols = ['input_text_instruct', 'output_text']
tune_jsonl = train[cols].to_json(orient="records", lines=True)
training_data_filename = f"tune_data_stack_overflow_\
python_qa-{date}.jsonl"
with open(training_data_filename, "w") as f:
    f.write(tune_jsonl)
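Each line of the resulting file is one JSON record with the two selected columns, e.g. {"input_text_instruct": "...", "output_text": "..."}. The evaluation split can be written the same way; the sketch below mirrors the training file, and the file name is an assumption based on the evaluation file reused in section 2:
### sketch: write the evaluation split as JSONL too
eval_jsonl = evaluation[cols].to_json(orient="records", lines=True)
evaluation_data_filename = f"tune_eval_data_stack_overflow_\
python_qa-{date}.jsonl"
with open(evaluation_data_filename, "w") as f:
    f.write(eval_jsonl)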
2. Kubeflow Pipelines to orchestrate and automate a workflow
Key features: Kubeflow Pipelines, pipeline YAML files, reusability
from kfp import dsl
from kfp import compiler

### ignore FutureWarnings raised inside kfp
import warnings
warnings.filterwarnings("ignore",
                        category=FutureWarning,
                        module='kfp.*')
### Simple example: component 1
@dsl.component
def say_hello(name: str) -> str:
    hello_text = f'Hello, {name}!'
    return hello_text

hello_task = say_hello(name="Erwin")
print(hello_task)
### Simple example: component 2
@dsl.component
def how_are_you(hello_text: str) -> str:
    how_are_you = f"{hello_text}. How are you?"
    return how_are_you

how_task = how_are_you(hello_text=hello_task.output)
print(how_task)
print(how_task.output)
### Simple example: pipeline
@dsl.pipeline
def hello_pipeline(recipient: str) -> str:
    # notice: just recipient, not recipient.output
    hello_task = say_hello(name=recipient)
    # notice .output
    how_task = how_are_you(hello_text=hello_task.output)
    # notice .output
    return how_task.output

pipeline_output = hello_pipeline(recipient="Erwin")
print(pipeline_output)
compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')
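The compiled pipeline.yaml is a self-contained, reusable artifact. If you want to sanity-check what was produced, you can load it back (a quick inspection sketch, assuming PyYAML is installed; the top-level keys follow the KFP v2 IR schema):
### optional sanity check of the compiled artifact
import yaml
with open('pipeline.yaml') as f:
    spec = yaml.safe_load(f)
print(spec['pipelineInfo'])  ### pipeline metadata, e.g. its name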
pipeline_arguments = {
"recipient": "World!",
}
### import `PipelineJob`
from google.cloud.aiplatform import PipelineJob

job = PipelineJob(
    ### path of the yaml file to execute
    template_path="pipeline.yaml",
    ### name of the pipeline
    display_name="deep_learning_ai_pipeline",
    ### pipeline arguments (inputs)
    ### {"recipient": "World!"} for this example
    parameter_values=pipeline_arguments,
    ### region of execution
    location="us-central1",
    ### root is where temporary files are
    ### stored by the execution engine
    pipeline_root="./",
)

### submit for execution
job.submit()

### check the status of the job
job.state
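job.submit() returns immediately, so job.state will first report a pending or running state. A simple polling loop can wait for a terminal state (an illustrative sketch; the SDK also offers blocking helpers such as job.wait(), if available in your version):
### sketch: poll until the pipeline reaches a terminal state
import time
while job.state.name not in ("PIPELINE_STATE_SUCCEEDED",
                             "PIPELINE_STATE_FAILED",
                             "PIPELINE_STATE_CANCELLED"):
    time.sleep(30)
print(job.state)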
### these are the same jsonl files as in the previous lab;
### time stamps have been removed so that
### the files are consistent for all learners
TRAINING_DATA_URI = "./tune_data_stack_overflow_python_qa.jsonl"
EVALUATION_DATA_URI = "./tune_eval_data_stack_overflow_python_qa.jsonl"

### path to the pipeline file to reuse;
### the file is provided in your workspace as well
template_path = 'https://us-kfp.pkg.dev/ml-pipeline/\
large-language-model-pipelines/tune-large-model/v2.0.0'
import datetime
date = datetime.datetime.now().strftime("%H:%d:%m:%Y")
MODEL_NAME = f"deep-learning-ai-model-{date}"
TRAINING_STEPS = 200
EVALUATION_INTERVAL = 20
from utils import authenticate
credentials, PROJECT_ID = authenticate()
REGION = "us-central1"
pipeline_arguments = {
    "model_display_name": MODEL_NAME,
    "location": REGION,
    "large_model_reference": "text-bison@001",
    "project": PROJECT_ID,
    "train_steps": TRAINING_STEPS,
    "dataset_uri": TRAINING_DATA_URI,
    "evaluation_interval": EVALUATION_INTERVAL,
    "evaluation_data_uri": EVALUATION_DATA_URI,
}

pipeline_root = "./"
job = PipelineJob(
    ### path of the yaml file to execute
    template_path=template_path,
    ### name of the pipeline
    display_name=f"deep_learning_ai_pipeline-{date}",
    ### pipeline arguments (inputs)
    parameter_values=pipeline_arguments,
    ### region of execution
    location=REGION,
    ### root is where temporary files are
    ### stored by the execution engine
    pipeline_root=pipeline_root,
    ### enable_caching=True saves the outputs of components
    ### for re-use, and only re-runs those components
    ### for which the code or data has changed
    enable_caching=True,
)

### submit for execution
job.submit()

### check the status of the job
job.state
3. Predictions, Prompts and Safety
Key features: Load Balancing, Safety Attributes, Citations
from utils import authenticate
credentials, PROJECT_ID = authenticate()
REGION = "us-central1"
import vertexai
from vertexai.language_models import TextGenerationModel

vertexai.init(project=PROJECT_ID,
              location=REGION,
              credentials=credentials)
### load balancing across tuned models
### (here simulated with a random choice)
model = TextGenerationModel.from_pretrained("text-bison@001")
list_tuned_models = model.list_tuned_model_names()
for i in list_tuned_models:
    print(i)

import random
tuned_model_select = random.choice(list_tuned_models)
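random.choice spreads requests uniformly only on average; another common pattern is round-robin, which guarantees an even rotation across the tuned models (a minimal sketch using itertools.cycle):
### sketch: round-robin selection instead of random choice
from itertools import cycle
model_pool = cycle(list_tuned_models)
tuned_model_select = next(model_pool)  ### each call advances to the next model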
### retrieve the selected tuned model to get a response
deployed_model = TextGenerationModel.get_tuned_model(
    tuned_model_select)
PROMPT = "How can I load a csv file using Pandas?"
### depending on the complexity of your prompt,
### the prediction can take some time
response = deployed_model.predict(PROMPT)
print(response)
from pprint import pprint
### the first element holds the list of predictions
output = response._prediction_response[0]
pprint(output)
### index again to get the first prediction dict
output = response._prediction_response[0][0]
pprint(output)
### retrieve the "content" key from the prediction dict
final_output = response._prediction_response[0][0]["content"]
print(final_output)
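Since this indexing into the internal _prediction_response is repeated below, it can be wrapped in a small helper (a convenience sketch; the attribute is private and its layout may change between SDK versions):
### sketch: helper around the internal response layout
def get_prediction(response, key="content"):
    return response._prediction_response[0][0][key]

print(get_prediction(response))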
INSTRUCTION = """\
Please answer the following Stackoverflow question on Python. \
Answer it like you are a developer answering Stackoverflow questions.
Question:
"""
QUESTION = "How can I store my TensorFlow checkpoint on \
Google Cloud Storage? Python example?"
PROMPT = f"""
{INSTRUCTION} {QUESTION}
"""
print(PROMPT)
final_response = deployed_model.predict(PROMPT)
output = final_response._prediction_response[0][0]["content"]
print(output)
### safety
### retrieve the "blocked" key from the
### "safetyAttributes" of the response
blocked = response._prediction_response[0][0]\
    ['safetyAttributes']['blocked']
print(blocked)

### retrieve all "safetyAttributes" of the response
safety_attributes = response._prediction_response[0][0]\
    ['safetyAttributes']
pprint(safety_attributes)
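In production you would typically check the blocked flag before surfacing the model output; a minimal guard, reusing the helper sketched above:
### sketch: only surface the answer if it was not blocked
if not safety_attributes.get('blocked', False):
    print(get_prediction(response))
else:
    print('Response was blocked by the safety filter.')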
### citations
### retrieve the "citations" key from the
### "citationMetadata" of the response
citation = response._prediction_response[0][0]\
    ['citationMetadata']['citations']
pprint(citation)
References
This post is written as an open notebook after completing the DeepLearning.AI short course 'LLMOps': Course link
P.S. Written with the help of GPT-3.5