Object mapping between Pydantic and %RegisteredObject
I'm exploring this right now: given a bunch of types defined as Pydantic models, how can I come up with an equivalent %RegisteredObject/%SerialObject and convert to/from them (e.g., to support persistence and match validation behavior as closely as possible)?
People who know Python better than I do (e.g., your average undergraduate from this decade): is this a stupid idea or a cool idea? Has anyone else done this before?
Comments
Is this the kind of behavior you want?
Set pyType = ##class(%IPM.Utils.PydanticModelAdaptor).ExamplePydanticModel() // a convenience method that returns a Pydantic type object with 3 fields defined: id, name, and email
Set object = ##class(%IPM.Utils.PydanticModelAdaptor).%New(pyType)
Set object.id = "1234", object.name = "John Doe", object.email = "john.doe@intersystems.com"
Zwrite object.id, object.name, object.email
// output:
// 1234
// "John Doe"
// "john.doe@intersystems.com"
The above is achievable using the following. You can also implement %JSONExport, %Validate, and so on; one possible %Validate implementation is sketched inside the class below.
Class %IPM.Utils.PydanticModelAdaptor Extends %RegisteredObject
{
/// Probably worthwhile to change the `PyModel` property name to something less likely to conflict with Pydantic field names
/// E.g. PyModel can be the type (aka class) object defined as
/// class UserModel(BaseModel):
/// id: int
/// name: str = Field(..., min_length=3)
/// email: Optional[str] = None
Property PyModel As %SYS.Python;
/// E.g.
/// PyFieldArray("id") = 1234
/// PyFieldArray("name") = "John Doe"
/// PyFieldArray("email") = "john.doe@example.com" or unset
Property PyFieldArray As %String [ MultiDimensional ];
Method %OnNew(pyType As %SYS.Python) As %Status [ Private, ServerOnly = 1 ]
{
Set ..PyModel = pyType
Quit $$$OK
}
Method %DispatchGetProperty(Property As %String) [ ServerOnly = 1 ]
{
Set userFields = ..PyModel."model_fields"
Set fieldInfo = userFields.get(Property)
If '$IsObject(fieldInfo) {
$$$ThrowStatus($$$ERROR($$$GeneralError, "Unknown field name: "_Property))
}
Set fieldType = fieldInfo.annotation
If $Data(..PyFieldArray(Property), output) # 2 {
// TODO check the `fieldType`, raise error if not matching or violates min_length, max_length, etc.
Return output
}
$$$ThrowStatus($$$ERROR($$$GeneralError, "Field not set: "_Property))
}
Method %DispatchSetProperty(Property As %String, Val) [ ServerOnly = 1 ]
{
Set userFields = ..PyModel."model_fields"
Set fieldInfo = userFields.get(Property)
If '$IsObject(fieldInfo) {
$$$ThrowStatus($$$ERROR($$$GeneralError, "Unknown field name: "_Property))
}
Set fieldType = fieldInfo.annotation
// TODO check the `fieldType`, raise error if not matching or violates min_length, max_length, etc.
Set ..PyFieldArray(Property) = Val
}
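/// Sketch of one possible %Validate implementation (illustrative, names assumed):
/// hand the collected field values back to Pydantic and let model_validate()
/// enforce the model's constraints (types, min_length, etc.)
Method %Validate() As %Status
{
    Set sc = $$$OK
    Try {
        // Collect the currently-set fields into a Python dict
        Set fieldDict = ##class(%SYS.Python).Builtins().dict()
        Set field = ""
        For {
            Set field = $Order(..PyFieldArray(field), 1, value)
            Quit:field=""
            Do fieldDict."__setitem__"(field, value)
        }
        // Pydantic v2: model_validate() raises ValidationError on failure,
        // which surfaces here as a catchable exception
        Do ..PyModel."model_validate"(fieldDict)
    } Catch ex {
        Set sc = ex.AsStatus()
    }
    Quit sc
}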
ClassMethod ExamplePydanticModel() [ Language = python ]
{
from pydantic import BaseModel, Field
from typing import Optional
class UserModel(BaseModel):
id: int
name: str = Field(..., min_length=3)
email: Optional[str] = None
return UserModel
}
}

Neat use of Dynamic Dispatch! I was thinking something more like (note - this is very quick and dirty/WIP):
/// Generate a set of ObjectScript classes corresponding to Pydantic models defined in a given Python module.
///
/// Args:
///     sourceModule: Path to the Python module containing Pydantic models.
///     targetPackage: Target package for generated ObjectScript classes.
///     baseClass: Base class for generated ObjectScript classes.
///
/// Significant contributions by Windsurf / Claude 3.7 Sonnet (Thinking)
/// That is to say, if it doesn't work, it's the AI's fault. (Plus mine for being bad at Python.)
ClassMethod Generate(sourceModule = "mcp", targetPackage = "pkg.isc.mcp.types.test", baseClass = "pkg.isc.mcp.types.BaseModel") [ Language = python ]
{
import importlib
import inspect
import traceback
import sys
from pydantic import BaseModel
import iris
import datetime
from typing import Union, Literal
from types import NoneType, UnionType
from logging import getLogger
# Map complex type expressions to ObjectScript types
complex_type_map = {
'dict[str, typing.Any]': '%DynamicObject',
'list[typing.Any]': '%DynamicArray'
}
# Other complex expressions that should be flagged as required properties
complex_required_type_map = {
}
# Map Pydantic field types to ObjectScript types
type_map = {
'str': '%String',
'int': '%Integer',
'float': '%Float',
'bool': '%Boolean',
'datetime.datetime': '%TimeStamp',
'datetime.date': '%Date',
'dict': '%DynamicObject',
'list': '%DynamicArray'
}
def get_all_models(module_name):
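    """Collect every Pydantic model in the module, plus any models referenced by their fields."""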
models = []
processed_models = set() # Keep track of models we've seen to avoid duplicates
def find_models(module_name):
module = importlib.import_module(module_name)
discovered = []
# Find all top-level models in this module
for name, obj in inspect.getmembers(module):
if inspect.isclass(obj) and issubclass(obj, BaseModel) and obj != BaseModel:
if obj.__name__ not in processed_models:
discovered.append(obj)
processed_models.add(obj.__name__)
return discovered
# First find all top-level models in the specified module
module = importlib.import_module(module_name)
top_models = []
for name, obj in inspect.getmembers(module):
if inspect.isclass(obj) and issubclass(obj, BaseModel) and obj != BaseModel:
top_models.append(obj)
processed_models.add(obj.__name__)
models.extend(top_models)
# Now recursively find all referenced models
i = 0
while i < len(models):
current_model = models[i]
i += 1
# Check each field for model references
for field_name, field_info in current_model.__fields__.items():
annotation = field_info.annotation
referenced_models = find_referenced_models(annotation)
for model in referenced_models:
if model.__name__ not in processed_models:
models.append(model)
processed_models.add(model.__name__)
print(f"Added referenced model: {model.__name__}")
return models
def process_model(targetPackage, model):
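    """Create or update the %Dictionary.ClassDefinition for a single Pydantic model."""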
# Format class name with package prefix
class_name = f"{targetPackage}.{model.__name__}"
# Check if class already exists
cls_def = iris.cls('%Dictionary.ClassDefinition')._OpenId(class_name)
if cls_def != "":
print(f"Updating existing class: {class_name}")
else:
# Create new class definition
cls_def = iris.cls('%Dictionary.ClassDefinition')._New()
cls_def.Name = class_name
print(f"Creating new class: {class_name}")
cls_def.Super = baseClass
cls_def.ProcedureBlock = 1
# Add parameter to indicate this is an auto-generated class
cls_def.Parameters.Clear()
auto_gen_param = iris.cls('%Dictionary.ParameterDefinition')._New()
auto_gen_param.Name = "AUTOGENERATED"
auto_gen_param.Default = "1"
auto_gen_param.parent = cls_def
# Clear existing properties - always start from a clean slate
cls_def.Properties.Clear()
# Process model fields to create properties
for field_name, field_info in model.__fields__.items():
# Skip fields that start with underscore
if field_name.startswith('_'):
continue
# Simplify property checking - create it fresh
# The _Save() call will handle merging if it's already defined
prop = iris.cls('%Dictionary.PropertyDefinition')._New()
prop.Name = field_name
prop.parent = cls_def
print(f"Processing field: {field_name}: {field_info.annotation}")
annotation = field_info.annotation
(os_type, collection_type, required) = process_annotation(annotation)
print(f"\tType: {os_type}, Collection type: {collection_type}")
prop.Type = os_type
prop.Collection = collection_type
prop.Required = 1 if required else 0
# Save the class definition
sc = cls_def._Save()
if not iris.cls('%SYSTEM.Status').IsOK(sc):
print(f"Error saving class {class_name}: {iris.cls('%SYSTEM.Status').GetErrorText(sc)}")
def process_annotation(annotation, topLevel = True):
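    """Map a type annotation to (ObjectScript type, collection type, required).

    Handles Optional/Union (both typing.Union and the 3.10+ pipe syntax),
    container types such as list/dict, Literal, and plain named types.
    """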
# Set up logger once
logger = getLogger("Generator")
logger.setLevel("DEBUG")
os_type = ''
collection_type = ''
required = True
logger.debug(f"Processing annotation: {annotation}")
if str(annotation) in complex_type_map:
os_type = complex_type_map[str(annotation)]
return (os_type, collection_type, False)
if str(annotation) in complex_required_type_map:
os_type = complex_required_type_map[str(annotation)]
return (os_type, collection_type, True)
# Check if it's a Union type (Python 3.10+ pipe syntax)
if isinstance(annotation, UnionType):
union_types = annotation.__args__
logger.debug(f"Native union type with args: {union_types}")
# Check if it's an Optional (Union with NoneType)
if (type(None) in union_types) or (NoneType in union_types):
# Get the actual type (filter out None)
actual_type = next(arg for arg in union_types if arg is not type(None) and arg is not NoneType)
logger.debug(f"Optional type detected: {actual_type}")
(os_type, collection_type, required) = process_annotation(actual_type)
required = False
else:
# For regular union types, use a strategy that picks the most flexible type
logger.debug(f"Processing union with multiple types")
# Default to using the last type in the union
for type_arg in union_types:
(os_type, collection_type, required) = process_annotation(type_arg, False)
# Handle typing.Union
elif hasattr(annotation, "__origin__") and annotation.__origin__ is Union:
union_types = annotation.__args__
logger.debug(f"typing.Union with args: {union_types}")
# Check if it's an Optional (Union with NoneType)
if (type(None) in union_types) or (NoneType in union_types):
# Get the actual type (filter out None)
actual_type = next(arg for arg in union_types if arg is not type(None) and arg is not NoneType)
logger.debug(f"Optional type detected: {actual_type}")
(os_type, collection_type, required) = process_annotation(actual_type)
required = False
else:
# For regular union types, use the same strategy as above
logger.debug(f"Processing union with multiple types")
for type_arg in union_types:
(os_type, collection_type, required) = process_annotation(type_arg, False)
# Handle container types (List, Dict, etc.)
elif hasattr(annotation, "__origin__"):
container_type = annotation.__origin__
# Handle Literal separately
if container_type is Literal:
logger.debug(f"Literal type: {annotation}")
os_type = '%String'
elif not topLevel:
# For nested complex types, just fall back to %DynamicArray/%DynamicObject
os_type = type_map.get(annotation.__name__, '%DynamicObject')
else:
type_args = annotation.__args__
logger.debug(f"Container type: {container_type} with args: {type_args}")
# For List[str], type_args would be (str,)
# For Dict[str, int], type_args would be (str, int)
if len(type_args) == 1:
# For a single type, it's a collection
(os_type, collection_type, required) = process_annotation(type_args[0], False)
collection_type = "list"
logger.debug(f"List type with element type: {os_type}")
elif len(type_args) == 2:
# For a key-value pair, it's a dictionary
(os_type, collection_type, required) = process_annotation(type_args[1], False)
collection_type = "array"
logger.debug(f"Dictionary type with value type: {os_type}")
# Handle types with a __name__ attribute (basic types)
elif hasattr(annotation, "__name__"):
type_name = annotation.__name__
os_type = type_map.get(type_name, '%String')
logger.debug(f"Named type: {type_name} -> {os_type}")
# Handle any other types
else:
os_type = type_map.get(str(annotation), '%String')
logger.debug(f"Other type: {annotation} -> {os_type}")
logger.debug(f"Final mapping: {os_type}, collection: {collection_type}, required: {required}")
return (os_type, collection_type, required)
def find_referenced_models(annotation):
"""Find all Pydantic models referenced in this type annotation."""
result = []
# Direct model reference
if inspect.isclass(annotation) and issubclass(annotation, BaseModel) and annotation != BaseModel:
result.append(annotation)
# Check for container types (Union, List, etc.)
elif hasattr(annotation, "__origin__"):
# For Union types, check each argument
if annotation.__origin__ is Union:
for arg in annotation.__args__:
result.extend(find_referenced_models(arg))
# For container types like List, Dict
elif hasattr(annotation, "__args__"):
for arg in annotation.__args__:
result.extend(find_referenced_models(arg))
return result
try:
# Find all Pydantic models in the module
models = get_all_models(sourceModule)
# Add referenced classes to type_map
for model in models:
# Format class name with package prefix
class_name = f"{targetPackage}.{model.__name__}"
type_map[model.__name__] = class_name
print(models)
# Process each model
for model in models:
print(f"\r\n")
process_model(targetPackage, model)
# Compile the whole package
status = iris.cls('%SYSTEM.OBJ').CompilePackage(targetPackage, 'ck')
if not iris.cls('%SYSTEM.Status').IsOK(status):
print(f"Error compiling package {targetPackage}: {iris.cls('%SYSTEM.Status').GetErrorText(status)}")
# Return success
return 1
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
print("Exception caught in Generator.Generate:")
print(''.join(lines))
print(f"Error details: {str(e)}")
return 0
}

There's still a TON of nuances to deal with here, but it's a start at least...
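For anyone who wants to experiment, a hypothetical invocation from embedded Python (the class hosting Generate isn't shown above, so the Generator class name here is purely illustrative):

import iris

# Scan the 'mcp' module and generate classes under pkg.isc.mcp.types.test
iris.cls('pkg.isc.mcp.Generator').Generate('mcp', 'pkg.isc.mcp.types.test', 'pkg.isc.mcp.types.BaseModel')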
I was thinking about something like that too, but wasn't sure how to dynamically construct an ObjectScript class.
Hummm, interesting idea, but I think there is some missing context here.
First, about objects by themselves: we have Embedded Python, which already bridges/binds Python objects to ObjectScript objects. So trying to cast a Python object to a %RegisteredObject may not be the optimal way to go; Embedded Python is already doing that for you.
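For example, a minimal sketch of the existing bridge, run from embedded Python on an IRIS instance (the stream class is just an arbitrary ObjectScript object):

import iris

# An ObjectScript object used directly from Python, no adaptor class needed
stream = iris.cls('%Stream.GlobalCharacter')._New()
stream.Write('hello from Python')
stream.Rewind()
print(stream.Read())  # -> hello from Python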
Second, about Pydantic/ORM: the end goal of this idea is to persist the Pydantic model to a database, right? There are many ways to do that, and I would prefer to stick to the 'pythonic' way of doing things. So, if you want to persist a Pydantic model, I would suggest using SQLAlchemy or SQLModel. They are both great Python ORM libraries and have a lot of features that make it easy to work with databases.
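For instance, a minimal SQLModel sketch (assuming the community sqlalchemy-iris dialect is installed, e.g. pip install sqlmodel sqlalchemy-iris; credentials and namespace are illustrative):

from typing import Optional
from sqlmodel import Field, SQLModel, Session, create_engine

# A SQLModel class is simultaneously a Pydantic model and a mapped table
class UserModel(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(min_length=3)
    email: Optional[str] = None

engine = create_engine("iris://_SYSTEM:SYS@localhost:1972/USER")
SQLModel.metadata.create_all(engine)  # create the table if it doesn't exist

with Session(engine) as session:  # validation and persistence in one step
    session.add(UserModel(name="John Doe", email="john.doe@example.com"))
    session.commit()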
Now, if your second goal is to be able to leverage DTL for Python objects, then I would suggest a VDoc approach. You can find a POC here: https://grongierisc.github.io/interoperability-embedded-python/dtl/
In a nutshell, don't try to bind the Python way of doing things to ObjectScript. Use the best of both worlds: Python for what it is good at, and ObjectScript for what it is good at.
Thank you! This is a really helpful perspective.
Ultimately I'm looking at both persistence and DTL.