Portfolio Holdings Data: Document Database Load
- Claude Paugh
- Apr 11
- 6 min read
Updated: Jun 24
The load process for the documents is reasonably simple. It consists of a single Python script that reads a targeted number of JSON documents from the input location and runs a set of functions to extract data mapped to the buckets, scopes, and collections I showed in the previous posts. Below is a recent XML-to-JSON conversion run:



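The conversion itself was covered in earlier posts; as a minimal hand-rolled sketch (not the converter actually used), this shows why XML attribute keys arrive in the JSON with an "@" prefix, which the load script below strips with `replace_new`:

```python
import json
import xml.etree.ElementTree as ET

def xml_to_dict(elem: ET.Element) -> dict:
    """Minimal XML-to-JSON-style conversion: XML attributes become
    keys with an '@' prefix, just like common converters produce."""
    node: dict = {f"@{k}": v for k, v in elem.attrib.items()}
    for child in elem:
        # Leaf elements without attributes collapse to their text
        node[child.tag] = (
            xml_to_dict(child) if (len(child) or child.attrib) else child.text
        )
    return node

doc = ET.fromstring(
    '<fundInfo curCd="USD"><seriesName>Example Fund</seriesName></fundInfo>'
)
print(json.dumps(xml_to_dict(doc)))
# {"@curCd": "USD", "seriesName": "Example Fund"}
```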
The "filing_ref" and "filing_ref_status" tables in PostgreSQL are also used in this process, tracking which files require loading and the status of each collection load. The load is non-transactional, but a failed load can be restarted for a document. The script and load outputs are contained below.
Load Python Script
import json
import os
import time
import uuid
from datetime import timedelta
from typing import Dict, List
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions, ClusterTimeoutOptions, QueryOptions
import env.creds as creds
import env.uri as uri
from connect_util import getconn
endpoint = uri.couchbase_server
auth = PasswordAuthenticator(creds.CDC_USER, creds.CDC_PASS)
timeout_opts = ClusterTimeoutOptions(
    kv_timeout=timedelta(seconds=10),
    connect_timeout=timedelta(seconds=10),
    query_timeout=timedelta(seconds=120),
)
cluster = Cluster(
    "couchbase://{}".format(endpoint),
    ClusterOptions(auth, timeout_options=timeout_opts),
)
bucket = cluster.bucket("us-fund-filings")
collection = bucket.default_collection()
# Get DB connection
conn, conn_r = getconn()
def update_database(ref_sid):
    # Mark the filing as persisted and stamp the status row
    sql_update = (
        f"update inbd_raw.filing_ref_status "
        f"set data_persisted_ind = true, "
        f"record_ts = current_timestamp, data_persisted_ts = current_timestamp "
        f"where filing_ref_sid = {ref_sid}"
    )
    cur = conn_r.cursor()
    cur.execute(sql_update)  # execute() returns None; rowcount holds the count
    conn_r.commit()
    return cur.rowcount
def get_query_results(filed_dt: str, filed_cik: str, filing_doc: str) -> int | None:
    # Find the filing_ref_sid for an unpersisted NPORT-P filing matching this file
    sql_query_select = (
        f"select fr.filing_ref_sid as filing_ref_sid "
        f"from inbd_raw.filing_ref fr join inbd_raw.filing_ref_status frs on frs.filing_ref_sid = fr.filing_ref_sid "
        f"where ((fr.form_type = 'NPORT-P' and frs.data_persisted_ind = false) and (fr.cik = '{filed_cik}' "
        f"and fr.date_filed = {filed_dt} and fr.file_name like '%{filing_doc}%'))"
    )
    cur = conn_r.cursor()
    cur.execute(sql_query_select)
    cur_data_val = cur.fetchone()
    if cur_data_val:
        return cur_data_val[0]
    return None
def gen_uuid():
    return str(uuid.uuid4())

def insert_into_collection(
    bucket_scope_str: str, collection_str: str, data_dict: Dict | List
):
    bucket_scope = bucket.scope(bucket_scope_str)
    collection = bucket_scope.collection(collection_str)
    collection.insert(gen_uuid(), data_dict)
def fund_info_doc(
    fund_info: Dict = None,
    reg_cik: str = None,
    series_name: str = None,
    rpt_period_end: str = None,
    rpt_authorized_dt: str = None,
) -> int:
    if fund_info:
        fund_info_dict = dict(replace_new(fund_info, "@", ""))
        fund_info_dict.update(
            {
                "CIK": reg_cik,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection("AssetManagers", "FundParents", fund_info_dict)
    return 0
def doc_exist(
    bucket: str,
    scope: str,
    collection: str,
    reg_cik: str = None,
    reg_lei: str = None,
    series_lei: str = None,
    period_end_dt: str = None,
) -> str | int | None:
    # Pick the identifying column based on which identifier was supplied
    if reg_cik:
        col, parm_val = "CIK", reg_cik
    elif reg_lei:
        col, parm_val = "ParentLEI", reg_lei
    elif series_lei:
        col, parm_val = "SeriesLEI", series_lei
    result_count = cluster.query(
        f"select count(meta().id) from `{bucket}`.`{scope}`.`{collection}` WHERE {col}=$1 AND PeriodEndDate=$2",
        QueryOptions(positional_parameters=[parm_val, period_end_dt]),
    )
    for num in result_count.rows():
        r_count = num.get("$1")
        if r_count == 0:
            return r_count
        result_uuid = cluster.query(
            f"select meta().id from `{bucket}`.`{scope}`.`{collection}` WHERE {col}=$1 AND PeriodEndDate=$2",
            QueryOptions(positional_parameters=[parm_val, period_end_dt]),
        )
        doc_id = None
        for row in result_uuid.rows():
            doc_id = row.get("id")
        return doc_id
def replace_new(data, match, repl) -> Dict | List:
    # Round-trip through JSON text to strip a character (the "@" attribute
    # prefix left over from the XML conversion) everywhere in the structure
    if isinstance(data, (dict, list)):
        return json.loads(json.dumps(data).replace(match, repl))
def get_fund_file_list(target_dir: str = None, target_size: int = None) -> List:
    if os.path.exists(target_dir):
        target_files: List = []
        for root, dirs, files in os.walk(target_dir):
            for file in files:
                file_path = str(root + "/" + file).replace("\\", "/")
                target_files.append(file_path)
        # Cap the batch at the targeted number of files
        return target_files[:target_size]
    else:
        print("Path does not exist: " + target_dir)
        exit(1)
def fund_gen_info(
    gen_info: Dict = None,
    reg_lei=None,
    series_name=None,
    rpt_period_end=None,
    rpt_authorized_dt=None,
) -> int:
    if gen_info:
        gen_info_dict = dict(replace_new(gen_info, "@", ""))
        gen_info_dict.update(
            {
                "ParentLEI": reg_lei,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection("Funds", "GeneralInformation", gen_info_dict)
    return 0
def fund_return_info(
    return_info: Dict = None,
    reg_lei=None,
    series_name=None,
    rpt_period_end=None,
    rpt_authorized_dt=None,
) -> int:
    if return_info and isinstance(return_info, dict):
        return_info_dict = dict(replace_new(return_info, "@", ""))
        return_info_dict.update(
            {
                "ParentLEI": reg_lei,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection("FundReturns", "MonthlyReturns", return_info_dict)
    return 0
def inv_sec_info(
    inv_sec_dict: Dict = None,
    ser_lei=None,
    series_name=None,
    rpt_period_end=None,
    rpt_authorized_dt=None,
) -> int:
    if inv_sec_dict and isinstance(inv_sec_dict, dict):
        inv_sec_dict.update(
            {
                "SeriesLEI": ser_lei,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection("Portfolios", "Investments", inv_sec_dict)
    return 0
def curr_metrics(
    cur_metric=None,
    ser_lei=None,
    series_name=None,
    rpt_period_end=None,
    rpt_authorized_dt=None,
) -> int:
    if cur_metric:
        if isinstance(cur_metric, list):
            cur_metric_list = replace_new(cur_metric, "@", "")
            cur_metric_list.append(
                {
                    "SeriesLEI": ser_lei,
                    "SeriesName": series_name,
                    "PeriodEndDate": rpt_period_end,
                    "ReportAuthorizedDate": rpt_authorized_dt,
                }
            )
            insert_into_collection("Funds", "CurrencyMetrics", cur_metric_list)
        elif isinstance(cur_metric, dict):
            cur_metric_dict = dict(replace_new(cur_metric, "@", ""))
            cur_metric_dict.update(
                {
                    "SeriesLEI": ser_lei,
                    "SeriesName": series_name,
                    "PeriodEndDate": rpt_period_end,
                    "ReportAuthorizedDate": rpt_authorized_dt,
                }
            )
            insert_into_collection("Funds", "CurrencyMetrics", cur_metric_dict)
    return 0
def credit_risk(
    credit_risk_inv_grade=None,
    credit_risk_noninv_grade=None,
    ser_lei=None,
    series_name=None,
    rpt_period_end=None,
    rpt_authorized_dt=None,
) -> int:
    if credit_risk_inv_grade:
        credit_risk_inv_grade_dict = dict(replace_new(credit_risk_inv_grade, "@", ""))
        credit_risk_inv_grade_dict.update(
            {
                "SeriesLEI": ser_lei,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection("Funds", "CreditRisk", credit_risk_inv_grade_dict)
    if credit_risk_noninv_grade:
        credit_risk_noninv_grade_dict = dict(
            replace_new(credit_risk_noninv_grade, "@", "")
        )
        credit_risk_noninv_grade_dict.update(
            {
                "SeriesLEI": ser_lei,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection("Funds", "CreditRisk", credit_risk_noninv_grade_dict)
    return 0
def monthly_returns(
    monthly_return_cats: Dict = None,
    ser_lei=None,
    series_name=None,
    rpt_period_end=None,
    rpt_authorized_dt=None,
) -> int:
    if monthly_return_cats:
        monthly_return_cats_dict = dict(replace_new(monthly_return_cats, "@", ""))
        monthly_return_cats_dict.update(
            {
                "SeriesLEI": ser_lei,
                "SeriesName": series_name,
                "PeriodEndDate": rpt_period_end,
                "ReportAuthorizedDate": rpt_authorized_dt,
            }
        )
        insert_into_collection(
            "FundReturns", "ReturnCategories", monthly_return_cats_dict
        )
    return 0
if __name__ == "__main__":
    try:
        files = get_fund_file_list("/Volumes/ExtShield/SecFundFilings/converted_files", 1000)
        for file in files:
            print(file)
            filing = file.split("/")[-1]
            # Break the file name into CIK, year, and suffix parts
            filing_doc = filing.split("-")[-2]
            size = len(filing_doc)
            suff = size - 6
            dt_st = size - 8
            initial = dt_st
            file_cik = filing_doc[0:initial]
            file_yr = filing_doc[dt_st:suff]
            filing_suff = filing_doc[suff:size]
            file_str = f"{file_cik}-{file_yr}-{filing_suff}"
            cik_val = filing.split("-")[-3]
            filing_yr = filing.split("-")[0]
            filing_mo = filing.split("-")[1]
            filing_dy = filing.split("-")[2]
            filing_dt = f"'{filing_yr}-{filing_mo}-{filing_dy}'"
            time.sleep(0.25)
            filing_ref_sid = get_query_results(filing_dt, cik_val, file_str)
            if filing_ref_sid:
                with open(file, "rb") as f:
                    json_data = json.load(f)
                i_submission = json_data.get("edgarSubmission")
                i_formData = i_submission.get("formData")
                i_headerData = i_submission.get("headerData")
                i_sigData = i_formData.get("signature")
                i_fund_info = i_formData.get("fundInfo")
                i_return_info = i_fund_info.get("returnInfo")
                i_monthly_fund_returns = i_return_info.get("monthlyTotReturns")
                i_monthly_return_cats = i_return_info.get("monthlyReturnCats")
                i_cur_metrics = i_fund_info.get("curMetrics")
                i_credit_risk_inv_grade = i_fund_info.get("creditSprdRiskInvstGrade")
                i_credit_risk_noninv_grade = i_fund_info.get("creditSprdRiskNonInvstGrade")
                i_inv_sec = i_formData.get("invstOrSecs")
                i_gen_info = i_formData.get("genInfo")
                i_series_name = i_gen_info.get("seriesName").replace(" ", "-")
                i_file_num = i_gen_info.get("regFileNumber")
                i_reg_cik = i_gen_info.get("regCik")
                i_reg_lei = i_gen_info.get("regLei")
                i_ser_lei = i_gen_info.get("seriesLei")
                i_rpt_close_dt = i_gen_info.get("repPdEnd")
                i_rpt_period_end = i_gen_info.get("repPdDate")
                i_rpt_authorized_dt = i_sigData.get("ncom:dateSigned")
                if i_cur_metrics:
                    i_cur_metric = i_cur_metrics.get("curMetric")
                    curr_metrics(
                        i_cur_metric, i_ser_lei, i_series_name, i_rpt_period_end, i_rpt_authorized_dt
                    )
                fund_info_doc(
                    i_fund_info, i_reg_cik, i_series_name, i_rpt_period_end, i_rpt_authorized_dt
                )
                fund_gen_info(
                    i_gen_info, i_reg_lei, i_series_name, i_rpt_period_end, i_rpt_authorized_dt
                )
                fund_return_info(
                    i_monthly_fund_returns,
                    i_reg_lei,
                    i_series_name,
                    i_rpt_period_end,
                    i_rpt_authorized_dt,
                )
                monthly_returns(
                    i_monthly_return_cats,
                    i_ser_lei,
                    i_series_name,
                    i_rpt_period_end,
                    i_rpt_authorized_dt,
                )
                if i_inv_sec:
                    invest_sec = i_inv_sec.get("invstOrSec")
                    invest_sec_list = replace_new(invest_sec, "@", "")
                    for inv in invest_sec_list:
                        inv_sec_info(
                            inv, i_ser_lei, i_series_name, i_rpt_period_end, i_rpt_authorized_dt
                        )
                if i_credit_risk_inv_grade and i_credit_risk_noninv_grade:
                    credit_risk(
                        i_credit_risk_inv_grade,
                        i_credit_risk_noninv_grade,
                        i_ser_lei,
                        i_series_name,
                        i_rpt_period_end,
                        i_rpt_authorized_dt,
                    )
                # Update PostgreSQL so the filing is not re-loaded
                update_database(filing_ref_sid)
            else:
                print("No filing found for: " + file + " may already be in database.")
                continue
    except AttributeError:
        # A missing JSON section (e.g. a filing with no holdings) surfaces here
        print("Filing has no positions to process")
        exit(1)
Load Processing Status



One thing to note about the load process: I do not process wrapper entries for funds that have no holdings. I only take funds that report holdings, and if a wrapper does not list the funds it wraps in its holdings, it is not loaded. There are not many of these, but they pop up here and there.
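That skip can be sketched as a small predicate on the parsed filing, reusing the `invstOrSecs`/`invstOrSec` keys from the load script (`has_holdings` is a hypothetical helper, not part of the script above):

```python
def has_holdings(form_data: dict) -> bool:
    """Return True only when the filing's formData lists actual holdings."""
    inv = form_data.get("invstOrSecs")
    # A wrapper with no holdings either omits the section or leaves it empty
    return bool(inv and inv.get("invstOrSec"))

# A wrapper filing without a holdings section is skipped
print(has_holdings({"genInfo": {}}))  # False
print(has_holdings({"invstOrSecs": {"invstOrSec": [{"name": "AAPL"}]}}))  # True
```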
I am running load processing today (2025-04-11), so the timestamps in some of the screenshots are from the last few days.
On the target side, the data warehouse that receives the documents is a two-node Couchbase cluster, with one node dedicated to handling queries and indexes. The larger node (10.0.0.46) is a refurbished 8-core machine with 128GB of RAM and an internal SSD.

The Couchbase built-in dashboard also provides cluster and per-node views (below); indexing offers a similar display.

The load process usually runs fairly smoothly; failures are typically caused by a filing that has no holdings listed, plus the occasional host timeout. Currently I have loaded filings from 2018 through 2024 (partial), plus the updates I am processing now. That totals a little over 100 million documents:


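Those totals can be re-checked with a per-collection N1QL count. A minimal sketch that builds the statement (`count_query` is a hypothetical helper; run it with the `cluster` object from the load script):

```python
def count_query(bucket: str, scope: str, collection: str) -> str:
    """Build a N1QL statement counting documents in one collection."""
    return (
        f"SELECT COUNT(meta().id) AS doc_count "
        f"FROM `{bucket}`.`{scope}`.`{collection}`"
    )

print(count_query("us-fund-filings", "Portfolios", "Investments"))
# SELECT COUNT(meta().id) AS doc_count FROM `us-fund-filings`.`Portfolios`.`Investments`
```

Executed against the cluster, the result row would be read as `row["doc_count"]` from `cluster.query(...)`.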
The load process, as I mentioned in a previous post, splits a single SEC filing into the multiple collections displayed previously and below. I separate asset-manager-specific data about the parent company, as well as fund returns by category and monthly returns. The Funds collection contains general information about each fund, plus credit and currency risk metrics. The schema of each collection mirrors how I sliced the filing into categories. Entire portfolio filings are found in Investments.
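Collected in one place, the filing sections map to scope/collection targets as follows, mirroring the `insert_into_collection` calls in the load script:

```python
# Filing section -> (scope, collection) targets used by the load script
SECTION_MAP = {
    "fundInfo":          ("AssetManagers", "FundParents"),
    "genInfo":           ("Funds", "GeneralInformation"),
    "monthlyTotReturns": ("FundReturns", "MonthlyReturns"),
    "monthlyReturnCats": ("FundReturns", "ReturnCategories"),
    "curMetric":         ("Funds", "CurrencyMetrics"),
    "creditSprdRisk*":   ("Funds", "CreditRisk"),  # both grade sections
    "invstOrSec":        ("Portfolios", "Investments"),
}

for section, (scope, coll) in SECTION_MAP.items():
    print(f"{section:18} -> {scope}.{coll}")
```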

In the next post, I will demonstrate what information you can find from the loaded documents.
