Written by sween, Distinguished Contractor at Integration Required, LLC · May 14 · 7 min read

OMOP Odyssey - GCP Healthcare API Real Time FHIR® to OMOP Transformation (RealTymus)

Real Time FHIR® to OMOP Transformation

In this part of the OMOP Journey, before attempting to challenge Scylla, we reflect on how fortunate we are that the InterSystems OMOP transform uses Bulk FHIR Export as its source payload. This opens up hands-off interoperability with the InterSystems OMOP transform across several FHIR® vendors, this time with the Google Cloud Healthcare API.

Google Cloud Healthcare API FHIR® Export

GCP FHIR® data stores support bulk FHIR import/export from the CLI or the API. The premise is simple and the docs are exhaustive, so we'll save a model the trouble of training on them again and just link them for those interested. The more valuable thing to take from the heading of this paragraph is Google's implementation of the Bulk FHIR Export standard itself.
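
For orientation, a one-off export to GCS from the CLI looks roughly like the sketch below, reusing the project, dataset, store, and bucket names that appear later in this post; check the Healthcare API docs for the current flags.

gcloud healthcare fhir-stores export gcs fhir-omop \
  --project=pidtoo-fhir \
  --dataset=isc \
  --location=us-east4 \
  --gcs-uri=gs://fhir-export-bucket/fhir-export

The Cloud Function later in this post calls the REST equivalent of this command so it can also pass an incremental export window.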

The important differentiators in Google's implementation of FHIR® Export are Resource Change Notifications via Pub/Sub and the ability to specify incremental exports.

Real Time? ⏲

Yes! I'll die on this sword, I guess. It's not only my rap handle, but the mechanics are definitely there to back a good technical argument and be able to say...

"As a new Organization gets created to FHIR, we transform it, and add it to the InterSystems OMOP CDM in the same stroke as a care_site/location."

Walkthrough

I'm trying to keep this short and to the point: it encapsulates how a Pub/Sub notification coupled with a Cloud Function can glue these two solutions together and automate your OMOP ingestion at a granular level.

Step One: Wire Up InterSystems OMOP to AWS Bucket

This step is becoming repetitive in posts in this community, so I will go warp speed through it.

  • Procure AWS S3 Bucket
  • Launch InterSystems OMOP, Add Bucket Configuration
  • Eject Policy from InterSystems OMOP Deployment
  • Apply Policy to the AWS S3 Bucket

 

I dunno, the steps and the image seemed to work out better in my head, but maybe not. Here are the docs, and here is a more in-depth way to get this taken care of earlier in this series, with better examples.
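
For the impatient, the bucket side of the wiring can be sketched from the AWS CLI roughly as follows; the bucket name and region match the Cloud Function config later in this post, and omop-deployment-policy.json is a stand-in for whatever policy document you ejected from the InterSystems OMOP deployment.

aws s3api create-bucket \
  --bucket intersystems-fhir2omop \
  --region us-east-2 \
  --create-bucket-configuration LocationConstraint=us-east-2

aws s3api put-bucket-policy \
  --bucket intersystems-fhir2omop \
  --policy file://omop-deployment-policy.json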

Step Two: Add Pub/Sub Target in Google Cloud Healthcare API

As mentioned previously, a foundational piece of making this work is the excellent feature that notifies on resource changes in the data store. You will find this option in the setup dialog, and it is also available post-configuration. I typically check both options to have as much data in the notification as possible to play with. For instance, with deletes you can include the deleted resource in the notification as well, which is really great for EMPI solutions.
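
If you prefer the CLI to the console dialog, pointing an existing FHIR store at a Pub/Sub topic looks roughly like this (store and topic names taken from the event payload shown later); the full-resource and deleted-resource options mentioned above are the checkboxes in the console dialog.

gcloud healthcare fhir-stores update fhir-omop \
  --project=pidtoo-fhir \
  --dataset=isc \
  --location=us-east4 \
  --pubsub-topic=projects/pidtoo-fhir/topics/fhir-omop-topic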

Step Three: Cloud Function ⭐

The Cloud Function puts in the work, and the SOW for it looks a little like this.

Listen for FHIR resource change Pub/Sub notifications of type Organization on the create method, and export the data store incrementally from the time the event fired. Since the export function only supports a GCS target, read in the created export, build a FHIR export zip file with the ndjson files at the root of the archive, and push the zip file to an AWS bucket.

Restating the second feature that makes this especially great: the ability to export from a specific date and time, meaning we do not need to export the entire dataset. For this we will use the time we received the event, pad it back by a few minutes, in hopes that the export, import, and transform steps will be smaller and, of course, more timely.

 

realtimefhir2omop.py

import os
import time
import zipfile
from datetime import datetime, timedelta, timezone

import boto3
import functions_framework
import google.auth
from google.auth.transport.requests import AuthorizedSession
from google.cloud import storage



# Config
PROJECT_ID = "pidtoo-fhir"
LOCATION = "us-east4"
DATASET_ID = "isc"
FHIR_STORE_ID = "fhir-omop"
GCS_EXPORT_BUCKET = "fhir-export-bucket"
AWS_BUCKET = "intersystems-fhir2omop"
AWS_REGION = "us-east-2"# Trigger FHIR exportdeftrigger_incremental_export(export_time_iso):
    client = storage.Client()
    bucket = client.bucket("fhir-export-bucket")

    blobs = bucket.list_blobs()
    for blob in blobs:
        print(f"Deleting: {blob.name}")
        blob.delete()
    
    credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)

    export_uri = f"gs://{GCS_EXPORT_BUCKET}/fhir-export-{int(time.time())}/"
    export_uri = f"gs://{GCS_EXPORT_BUCKET}/"
    url = (
        f"https://healthcare.googleapis.com/v1/projects/{PROJECT_ID}/locations/{LOCATION}/"
        f"datasets/{DATASET_ID}/fhirStores/{FHIR_STORE_ID}:export"
    )

    body = {
        "gcsDestination": {"uriPrefix": export_uri},
        "since": export_time_iso
    }

    response = authed_session.post(url, json=body)
    print(f"Export response: {response.status_code} - {response.text}")
    return export_uri if response.ok else None


# Poll GCS for export results
def wait_for_ndjson_files(export_uri_prefix):
    client = storage.Client()
    bucket_name = export_uri_prefix.split("/")[2]
    prefix = "/".join(export_uri_prefix.split("/")[3:])
    print(bucket_name)
    print(prefix)

    bucket = client.bucket(bucket_name)
    for _ in range(20):  # Wait up to ~100 seconds for the export files to land
        blobs = list(bucket.list_blobs(prefix=prefix))
        if any(blob.name.endswith("Organization") for blob in blobs):
            return [blob for blob in blobs if blob.name.endswith("Organization")]
        time.sleep(5)
    raise TimeoutError("Export files did not appear in GCS within timeout window")

# Zip .ndjsons into a flat ZIP file
def create_zip_from_blobs(blobs, zip_path):
    client = storage.Client()
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for blob in blobs:
            data = blob.download_as_bytes()
            fname = os.path.basename(blob.name)
            zipf.writestr(fname + ".ndjson", data)

# Upload ZIP to AWS S3
def upload_to_s3(zip_path, s3_key):
    s3 = boto3.client('s3', region_name=AWS_REGION)
    s3.upload_file(zip_path, AWS_BUCKET, "from_gcp_to_omop" + s3_key)
    print(f"Uploaded {zip_path} to s3://{AWS_BUCKET}/from_gcp_to_omop/{s3_key}")


@functions_framework.cloud_event
def receive_pubsub(cloud_event):
    # The CloudEvent payload wraps the Pub/Sub message; its attributes carry the FHIR metadata
    data = cloud_event.data
    print(data)
    if not data:
        return "No data", 400
    method = data['message']['attributes']['action']
    resource_name = data['message']['attributes']['resourceType']
    timestamp = data['message']['publishTime']

    # Parse the publish time string into a timezone-aware datetime
    dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)

    # Subtract 5 minutes
    five_minutes_ago = dt - timedelta(minutes=5)

    # Convert back to ISO 8601 string format with 'Z'
    timestamp = five_minutes_ago.isoformat().replace('+00:00', 'Z')

    print(method)
    print(resource_name)
    print(timestamp)

    if"CreateResource"in method and"Organization"in resource_name:
        print(f"New Organization detected at {timestamp}")
        export_uri = trigger_incremental_export(timestamp)
        if not export_uri:
            return "Export failed", 500
        blobs = wait_for_ndjson_files(export_uri)
        zip_file_path = "/tmp/fhir_export.zip"
        create_zip_from_blobs(blobs, zip_file_path)
        s3_key = f"/export-{int(time.time())}.zip"
        upload_to_s3(zip_file_path, s3_key)
        return"Exported and uploaded", 200return"No relevant event", 204

 

Step Four: What is Happening right now? 🔥

To show what is going on, let's inspect the real-time processing with some screenshots at each point.

FHIR Organization Created

Pub/Sub Event is Published

 

Pub/Sub FHIR Event

{'attributes': {'specversion': '1.0',
                'id': '13999883936448345',
                'source': '//pubsub.googleapis.com/projects/pidtoo-fhir/topics/fhir-omop-topic',
                'type': 'google.cloud.pubsub.topic.v1.messagePublished',
                'datacontenttype': 'application/json',
                'time': '2025-05-13T20:13:20.339Z'},
 'data': {'message': {'attributes': {'action': 'CreateResource',
                                     'lastUpdatedTime': 'Tue, 13 May 2025 20:13:20 UTC',
                                     'payloadType': 'FullResource',
                                     'resourceType': 'Organization',
                                     'storeName': 'projects/pidtoo-fhir/locations/us-east4/datasets/isc/fhirStores/fhir-omop',
                                     'versionId': 'MTc0NzE2NzIwMDEwNzczODAwMA'},
                      'data': 'ewogICJhZGRyZXNzIjogWwogICAgewogICAgICAiY2l0eSI6IC',
                      'messageId': '13999883936448345',
                      'message_id': '13999883936448345',
                      'publishTime': '2025-05-13T20:13:20.339Z',
                      'publish_time': '2025-05-13T20:13:20.339Z'},
          'subscription': 'projects/pidtoo-fhir/subscriptions/eventarc-us-east4-fhir2omop-trigger-sub-855'}}

 

Cloud Function Receives Resource Event from Subscription

Cloud Function Exports the FHIR Store GCS

Cloud Function Creates ZIP from GCS and Pushes to AWS

InterSystems OMOP Transforms FHIR to OMOP

Organization Available as Care Site in CDM
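
A quick sanity check against the CDM makes the "real time" claim concrete. Something like the query below, with the schema name adjusted to whatever your InterSystems OMOP deployment uses, should show the new Organization surfacing as the most recent care_site.

SELECT care_site_id, care_site_name, location_id
FROM OMOPCDM54.care_site
ORDER BY care_site_id DESC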

When did that FHIR resource get transformed to the CDM?

"Now. You're looking at now. Everything that happens now is happening now." (Spaceballs, 1987)

Step Five: Validation Fun ✔

Fun with OBS, and not so much fun with audio.

 

In Conclusion
 

I did something similar last year at the MIT Grand Hack, using the same design pattern, but with the Questionnaire/QuestionnaireResponse resources and Gemini in the middle of things.
Gemini FHIR Agent MIT Grand Hack
