
Portfolio Holdings Data: Document Database Load

Updated: Jun 24

The load process for the documents is reasonably simple. It consists of a single Python script that reads a targeted number of JSON documents from the input location and runs a set of functions to extract data mapping to the buckets, scopes, and collections I showed in the previous posts. Below is a recent XML to JSON conversion run:

XML to JSON Conversion Execution
Converted JSON Files
JSON Conversion Status Records from PostgreSQL
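
The conversion step itself is not part of this script; as a rough sketch, converting one of the downloaded XML filings to JSON could look like the following, assuming the xmltodict package and placeholder file paths:

import json

import xmltodict

# Placeholder paths; the real run works through the downloaded NPORT-P filings
xml_path = "primary_doc.xml"
json_path = "primary_doc.json"

with open(xml_path, "rb") as xml_file:
    # xmltodict prefixes XML attributes with "@", which the load script below
    # strips out again with its replace_new() helper
    parsed = xmltodict.parse(xml_file)

with open(json_path, "w") as json_file:
    json.dump(parsed, json_file, indent=2)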

The "filing_ref" and "filing_ref_status" tables in PostgreSQL are used in this process as well tracking which files require loading, and the status of the collection load. The load is non-transactional, but failed loads can be restarted on a document. The script and load outputs are contained below.

Load Python Script

import json
import os
import time
import uuid
from datetime import timedelta
from typing import Dict, List

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions, ClusterTimeoutOptions, QueryOptions

import env.creds as creds
import env.uri as uri
from connect_util import getconn

endpoint = uri.couchbase_server

auth = PasswordAuthenticator(creds.CDC_USER, creds.CDC_PASS)

timeout_opts = ClusterTimeoutOptions(
    kv_timeout=timedelta(seconds=10),
    connect_timeout=timedelta(seconds=10),
    query_timeout=timedelta(seconds=120),
)

cluster = Cluster(
    "couchbase://{}".format(endpoint),
    ClusterOptions(auth, timeout_options=timeout_opts),
)

bucket = cluster.bucket("us-fund-filings")
collection = bucket.default_collection()

# Get DB connection
conn, conn_r = getconn()

def update_database(ref_sid):
    # Mark the filing as persisted in the PostgreSQL status tracking table
    sql_update = (
        f"update inbd_raw.filing_ref_status "
        f"set data_persisted_ind = true, "
        f"record_ts = current_timestamp, data_persisted_ts = current_timestamp "
        f"where filing_ref_sid = {ref_sid}"
    )
    cur = conn_r.cursor()
    upd_stat = cur.execute(sql_update)
    conn_r.commit()
    return upd_stat

def get_query_results(filed_dt: str, filed_cik: str, filing_doc: str) -> int | None:
    # Return the filing_ref_sid for an NPORT-P filing that has not yet been persisted
    sql_query_select = (
        f"select fr.filing_ref_sid as filing_ref_sid "
        f"from inbd_raw.filing_ref fr join inbd_raw.filing_ref_status frs on frs.filing_ref_sid = fr.filing_ref_sid "
        f"where ((fr.form_type = 'NPORT-P' and frs.data_persisted_ind = false) and (fr.cik = '{filed_cik}' "
        f"and fr.date_filed = {filed_dt} and fr.file_name like '%{filing_doc}%'))"
    )
    cur = conn_r.cursor()
    cur.execute(sql_query_select)
    cur_data_val = cur.fetchone()
    if cur_data_val:
        return cur_data_val[0]
    return None

def gen_uuid():
    return str(uuid.uuid4())

def insert_into_collection(
    bucket_scope_str: str, collection_str: str, data_dict: Dict | List
):
    # Insert the document into the given scope/collection under a generated UUID key
    bucket_scope = bucket.scope(bucket_scope_str)
    collection = bucket_scope.collection(collection_str)
    collection.insert(gen_uuid(), data_dict)

def fund_info_doc(
    fund_info: Dict = None,
    reg_cik: str = None,
    series_name: str = None,
    rpt_period_end: str = None,
    rpt_authorized_dt: str = None,
) -> int:
    if fund_info:
        fund_info_dict = dict(replace_new(fund_info, "@", ""))
        fund_info_dict.update(
            {
                "CIK": reg_cik,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection("AssetManagers", "FundParents", fund_info_dict)
    return 0

def doc_exist(
    bucket: str,
    scope: str,
    collection: str,
    reg_cik: str = None,
    reg_lei: str = None,
    series_lei: str = None,
    period_end_dt: str = None,
) -> str | int | None:
    # Check whether a document already exists for the given identifier and period end;
    # returns the document's meta().id if found, 0 if none exist
    global parm_val, col

    if reg_cik:
        col = "CIK"
        parm_val = reg_cik
    elif reg_lei:
        col = "ParentLEI"
        parm_val = reg_lei
    elif series_lei:
        col = "SeriesLEI"
        parm_val = series_lei

    result_count = cluster.query(
        f"select count(meta().id) from `{bucket}`.`{scope}`.`{collection}` WHERE {col}=$1 AND PeriodEndDate=$2",
        QueryOptions(positional_parameters=[parm_val, period_end_dt]),
    )

    for num in result_count.rows():
        r_count = num.get("$1")
        if r_count == 0:
            return r_count

        result_uuid = cluster.query(
            f"select meta().id from `{bucket}`.`{scope}`.`{collection}` WHERE {col}=$1 AND PeriodEndDate=$2",
            QueryOptions(positional_parameters=[parm_val, period_end_dt]),
        )
        for row in result_uuid.rows():
            meta_id = row.get("id")
            if meta_id is not None:
                return meta_id
    return None

def replace_new(data, match, repl) -> Dict | List:
    # Strip a marker from keys/values (e.g. the "@" prefix xmltodict adds to XML
    # attributes) by round-tripping the structure through a JSON string
    if isinstance(data, (dict, list)):
        temp_json = json.dumps(data)
        return json.loads(temp_json.replace(match, repl))

def get_fund_file_list(target_dir: str = None, target_size: int = None) -> List:
    # Walk the target directory and collect up to target_size converted JSON file paths
    if os.path.exists(target_dir):
        target_files: List = []
        for root, dirs, files in os.walk(target_dir):
            for file in files:
                file_path = root + "/" + file
                file_path = str(file_path).replace("\\", "/")
                target_files.append(file_path)
                if len(target_files) == target_size:
                    return target_files
        return target_files
    else:
        print("Path does not exist: " + target_dir)
        exit(1)

def fund_gen_info(
    gen_info: Dict = None,
    reg_lei=None,
    series_name=None,
    rpt_period_end=None,
    rpt_authorized_dt=None,
) -> int:
    if gen_info:
        gen_info_dict = dict(replace_new(gen_info, "@", ""))
        gen_info_dict.update(
            {
                "ParentLEI": reg_lei,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection("Funds", "GeneralInformation", gen_info_dict)

    return 0

def fund_return_info(
    return_info: Dict = None,
    reg_lei=None,
    series_name=None,
    rpt_period_end=None,
    rpt_authorized_dt=None,
) -> int:
    if return_info and isinstance(return_info, Dict):
        return_info_dict = dict(replace_new(return_info, "@", ""))
        return_info_dict.update(
            {
                "ParentLEI": reg_lei,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection("FundReturns", "MonthlyReturns", return_info_dict)

    return 0

def inv_sec_info(
    inv_sec_dict: Dict = None,
    ser_lei=None,
    series_name=None,
    rpt_period_end=None,
    rpt_authorized_dt=None,
) -> int:
    if inv_sec_dict and isinstance(inv_sec_dict, dict):
        inv_sec_dict.update(
            {
                "SeriesLEI": ser_lei,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection("Portfolios", "Investments", inv_sec_dict)

    return 0

def curr_metrics(
    cur_metric=None,
    ser_lei=None,
    series_name=None,
    rpt_period_end=None,
    rpt_authorized_dt=None,
) -> int:
    if cur_metric:
        if isinstance(cur_metric, List):
            cur_metric_list = replace_new(cur_metric, "@", "")
            cur_metric_list.append(
                {
                    "SeriesLEI": ser_lei,
                    "SeriesName": series_name,
                    "PeriodEndDate": rpt_period_end,
                    "ReportAuthorizedDate": rpt_authorized_dt,
                }
            )
            insert_into_collection("Funds", "CurrencyMetrics", cur_metric_list)

        elif isinstance(cur_metric, Dict):
            cur_metric_dict = dict(replace_new(cur_metric, "@", ""))
            cur_metric_dict.update(
                {
                    "SeriesLEI": ser_lei,
                    "SeriesName": series_name,
                    "PeriodEndDate": rpt_period_end,
                    "ReportAuthorizedDate": rpt_authorized_dt,
                }
            )
            insert_into_collection("Funds", "CurrencyMetrics", cur_metric_dict)

    return 0

def credit_risk(
    credit_risk_inv_grade=None,
    credit_risk_noninv_grade=None,
    ser_lei=None,
    series_name=None,
    rpt_period_end=None,
    rpt_authorized_dt=None,
) -> int:
    if credit_risk_inv_grade:
        credit_risk_inv_grade_dict = dict(replace_new(credit_risk_inv_grade, "@", ""))
        credit_risk_inv_grade_dict.update(
            {
                "SeriesLEI": ser_lei,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection("Funds", "CreditRisk", credit_risk_inv_grade_dict)

    if credit_risk_noninv_grade:
        credit_risk_noninv_grade_dict = dict(
            replace_new(credit_risk_noninv_grade, "@", "")
        )
        credit_risk_noninv_grade_dict.update(
            {
                "SeriesLEI": ser_lei,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection("Funds", "CreditRisk", credit_risk_inv_grade_dict)

    return 0

def monthly_returns(
    monthly_return_cats: Dict = None,
    ser_lei=None,
    series_name=None,
    rpt_period_end=None,
    rpt_authorized_dt=None,
) -> int:
    if monthly_return_cats:
        monthly_return_cats_dict = dict(replace_new(monthly_return_cats, "@", ""))
        monthly_return_cats_dict.update(
            {
                "SeriesLEI": ser_lei,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection(
            "FundReturns", "ReturnCategories", monthly_return_cats_dict
        )
    return 0


if __name__ == "__main__":
    try:
        files = get_fund_file_list("/Volumes/ExtShield/SecFundFilings/converted_files", 1000)

        for file in files:
            print(file)
            filing = file.split("/")[-1]
            # break up file name
            filing_doc = filing.split("-")[-2]
            size = len(filing_doc)
            suff = size - 6
            dt_st = size - 8
            initial = dt_st
            file_cik = filing_doc[0:initial]
            file_yr = filing_doc[dt_st:suff]
            filing_suff = filing_doc[suff:size]
            file_str = f"{file_cik}-{file_yr}-{filing_suff}"

            cik_val = filing.split("-")[-3]
            filing_yr = filing.split("-")[0]
            filing_mo = filing.split("-")[1]
            filing_dy = filing.split("-")[2]
            filing_dt = f"'{filing_yr}-{filing_mo}-{filing_dy}'"
            time.sleep(0.25)

            filing_ref_sid = get_query_results(filing_dt, cik_val, file_str)
            if filing_ref_sid:
                f = open(file, "rb")
                json_data = json.load(f)
                i_submission = json_data.get("edgarSubmission")
                i_formData = i_submission.get("formData")
                i_headerData = i_submission.get("headerData")
                i_sigData = i_formData.get("signature")

                i_fund_info = i_formData.get("fundInfo")
                i_return_info = i_fund_info.get("returnInfo")
                i_monthly_fund_returns = i_return_info.get("monthlyTotReturns")
                i_monthly_return_cats = i_return_info.get("monthlyReturnCats")
                i_cur_metrics = i_fund_info.get("curMetrics")

                i_credit_risk_inv_grade = i_fund_info.get("creditSprdRiskInvstGrade")
                i_credit_risk_noninv_grade = i_fund_info.get("creditSprdRiskNonInvstGrade")

                i_inv_sec = i_formData.get("invstOrSecs")

                i_gen_info = i_formData.get("genInfo")
                i_series_name = i_gen_info.get("seriesName")
                i_series_name = i_series_name.replace(" ", "-")
                i_file_num = i_gen_info.get("regFileNumber")
                i_reg_cik = i_gen_info.get("regCik")
                i_reg_lei = i_gen_info.get("regLei")
                i_ser_lei = i_gen_info.get("seriesLei")
                i_rpt_close_dt = i_gen_info.get("repPdEnd")
                i_rpt_period_end = i_gen_info.get("repPdDate")
                i_rpt_authorized_dt = i_sigData.get("ncom:dateSigned")

                if i_cur_metrics:
                    i_cur_metric = i_cur_metrics.get("curMetric")
                    curr_metrics(
                        i_cur_metric, i_ser_lei, i_series_name, i_rpt_period_end, i_rpt_authorized_dt
                    )

                fund_info_doc(
                    i_fund_info, i_reg_cik, i_series_name, i_rpt_period_end, i_rpt_authorized_dt
                )
                fund_gen_info(
                    i_gen_info, i_reg_lei, i_series_name, i_rpt_period_end, i_rpt_authorized_dt
                )
                fund_return_info(
                    i_monthly_fund_returns,
                    i_reg_lei,
                    i_series_name,
                    i_rpt_period_end,
                    i_rpt_authorized_dt,
                )
                monthly_returns(
                    i_monthly_return_cats,
                    i_ser_lei,
                    i_series_name,
                    i_rpt_period_end,
                    i_rpt_authorized_dt,
                )

                if i_inv_sec:
                    invest_sec = i_inv_sec.get("invstOrSec")
                    invest_sec_list = replace_new(invest_sec, "@", "")
                    for inv in invest_sec_list:
                        inv_sec_info(
                            inv, i_ser_lei, i_series_name, i_rpt_period_end, i_rpt_authorized_dt
                        )

                if i_credit_risk_inv_grade and i_credit_risk_noninv_grade:
                    credit_risk(
                        i_credit_risk_inv_grade,
                        i_credit_risk_noninv_grade,
                        i_ser_lei,
                        i_series_name,
                        i_rpt_period_end,
                        i_rpt_authorized_dt,
                    )
                # update postgres that data is written
                update_database(filing_ref_sid)
            else:
                print("No filing found for: " + file + " may already be in database.")
                continue
    except AttributeError:
        # Filings missing their holdings/fund sections raise AttributeError above
        print("Filing has no positions to process")
        exit(1)

Load Processing Status

Output of Load Script
Status Check Query Output from PostgreSQL
Index Updates on Couchbase
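
The indexes being updated are ordinary secondary indexes on the lookup fields the script attaches to each document. As a sketch (the actual index names and definitions on my cluster may differ), they can be created through the same cluster connection the load script opens:

# Hypothetical index definitions covering the fields the doc_exist() check filters on;
# assumes the `cluster` connection created at the top of the load script
cluster.query(
    "CREATE INDEX ix_investments_series_period "
    "ON `us-fund-filings`.`Portfolios`.`Investments`(SeriesLEI, PeriodEndDate)"
).execute()

cluster.query(
    "CREATE INDEX ix_geninfo_lei_period "
    "ON `us-fund-filings`.`Funds`.`GeneralInformation`(ParentLEI, PeriodEndDate)"
).execute()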

One thing to note about the load process is that I do not process wrapper filings for funds that have no holdings. Basically, I only take funds that have holdings; if the wrapper does not list the funds it wraps in its holdings, it is not loaded. There are not many of these, but they pop up here and there.
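
In the script, that case surfaces as the AttributeError caught at the bottom of the main loop. A small helper that states the rule explicitly (hypothetical, not part of the script) might look like this:

def has_holdings(form_data) -> bool:
    # True only if the filing's formData carries a non-empty invstOrSecs section;
    # wrapper filings with no holdings are skipped rather than loaded
    inv_sec = form_data.get("invstOrSecs") if form_data else None
    return bool(inv_sec and inv_sec.get("invstOrSec"))

Inside the per-file loop, this would simply gate the inserts with something like "if not has_holdings(i_formData): continue".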


I am running load processing today (2025-04-11), so the timestamps in some of the screenshots are from the last few days.


On the target data warehouse side, which receives the loaded documents, the configuration is a two-node Couchbase cluster, with one of the nodes handling only queries and indexes. The larger node (10.0.0.46) is a refurbished 8-core machine with 128GB of RAM and an internal SSD.

Couchbase Node Configuration

The built-in Couchbase dashboard also provides cluster and per-node views (below); the indexing service offers a similar display.

Couchbase Cluster Dashboard

The load process is fairly smooth for the most part; failures are usually caused by filings that do not have any holdings listed, plus the occasional host timeout. Currently I have loaded filings from 2018 through 2024 (partial), plus the updates I am processing now. That totals a little over 100 million documents:

Couchbase Bucket of SEC Filings for Portfolios
Scopes and Collections of us-fund-filings Couchbase Bucket
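
Spot-checking those counts from the SDK is straightforward; for example, counting documents in the Investments collection (a sketch reusing the `cluster` connection from the load script):

# Count the individual portfolio positions loaded into Portfolios.Investments
result = cluster.query(
    "SELECT COUNT(*) AS doc_count "
    "FROM `us-fund-filings`.`Portfolios`.`Investments`"
)
for row in result.rows():
    print("Investments documents:", row["doc_count"])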

The load process, as I mentioned in a previous post, splits a single SEC filing across the multiple collections shown previously and below. I separate out asset-manager-specific data for the parent company, as well as fund returns by category and monthly returns. The Funds collections contain general information about the fund, along with credit and currency risk metrics. The schema of each collection mirrors how I sliced the filing into categories. The full set of portfolio holdings from each filing is found in Investments.

Couchbase Collections
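
As a simple illustration of that layout, pulling one fund's holdings for a reporting period out of Portfolios.Investments can be done with the SeriesLEI and PeriodEndDate fields the load script stamps onto each document. This is a sketch: the endpoint, credentials, and parameter values are placeholders, and the selected fields assume the NPORT-P position elements (name, title, balance, valUSD, pctVal) carried through from the XML:

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions, QueryOptions

# Connection mirrors the load script; endpoint and credentials are placeholders
cluster = Cluster(
    "couchbase://<endpoint>",
    ClusterOptions(PasswordAuthenticator("<user>", "<password>")),
)

# Fetch one fund's positions for a period end; the parameter values are placeholders
result = cluster.query(
    "SELECT inv.name, inv.title, inv.balance, inv.valUSD, inv.pctVal "
    "FROM `us-fund-filings`.`Portfolios`.`Investments` AS inv "
    "WHERE inv.SeriesLEI = $1 AND inv.PeriodEndDate = $2",
    QueryOptions(positional_parameters=["<series-lei>", "<period-end-date>"]),
)
for row in result.rows():
    print(row)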

In the next post, I will demonstrate what information you can find in the loaded documents.
