ETF & Mutual Funds Portfolios: Infrastructure
- Claude Paugh
- Apr 19
- 12 min read
Updated: Jun 24
After I decided to load data from Couchbase into a graph database, I ran some short evaluations: first, of which graph database to use, and second, of the most efficient method to synchronize the data. I ended up with Neo4J, despite trying Memgraph, TigerGraph, and others, because it was the most mature and widely supported option. This was four to five years ago, so the graph database market obviously looks different today. Neo4j was also stable on the limited infrastructure I was using.
I did not try the hosted graph offerings from AWS, Google, Microsoft, or Neo4J, partly because of product maturity and partly because the trial resources were limited; by simple math, I would have had to pay to host the proof of concept.
Hardware Config
My setup consisted of a refurbished PC with an 8-core processor, 128GB of RAM, and 2TB of storage, running Couchbase, Kafka (deployed with Docker), and Neo4J. Due to resource constraints, I moved Neo4J to another PC with 8 cores and 16GB of RAM, which was also my "work" PC.
After deciding on Neo4J, I did some planning on what type of graph would be suitable for presenting portfolio investment holdings. After some research, I decided that a Knowledge Graph would be the best fit for the information I was presenting.
An essential concept in this knowledge graph would be identifying the types of investment portfolios that held various types of assets. I included ETFs, Mutual Funds, Family Offices, and Hedge Funds wherever I could find SEC filings for them.
I also decided to use asset types to identify holdings, e.g. Common Stocks, Preferred Equity, Municipal Bonds, Corporate Bonds, Futures, Forwards, etc. Up to that point, the only data I had been loading into the warehouse was portfolio filings from the SEC.
I went looking for additional sources for currencies, countries, and regions of the world, and eventually for Legal Entity Identifier (LEI) data, which tracks companies and their business relationships. This would add valuable content to the graph, making it possible to associate asset types with their issuers, derivatives with the parties to a transaction, and issued debt with municipality information, along with other drill-through data. I went through a brainstorming process, which is represented below:

I created a new bucket and scope in Couchbase to store the asset types from the portfolio filings, with each type becoming a collection. The results are below:

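Creating the scope and one collection per asset type can also be scripted. The sketch below assumes Couchbase Server 7.0 or later (where scopes and collections are available) and that the Instruments bucket already exists; it only generates the SQL++ DDL text, which could then be run from DataGrip or the Query Workbench. The function names are hypothetical, and the collection list is an illustrative subset taken from the queries in this post, not necessarily the full set.

```python
# Sketch: generate SQL++ DDL for one scope plus one collection per asset type.
# Assumes Couchbase Server 7.0+ (scopes and collections) and an existing bucket.


def quote(identifier: str) -> str:
    """Wrap a SQL++ identifier in backticks (names containing backticks are rejected)."""
    if "`" in identifier:
        raise ValueError(f"unsupported identifier: {identifier!r}")
    return f"`{identifier}`"


def collection_ddl(bucket: str, scope: str, collections: list[str]) -> list[str]:
    """Return a CREATE SCOPE statement followed by one CREATE COLLECTION per name."""
    scope_path = f"{quote(bucket)}.{quote(scope)}"
    statements = [f"CREATE SCOPE {scope_path};"]
    for name in collections:
        statements.append(f"CREATE COLLECTION {scope_path}.{quote(name)};")
    return statements


# Illustrative subset: the collection names that appear in the queries in this post
ASSET_COLLECTIONS = ["Futures", "MunicipalDebt", "MortgageBackedDebt", "Forwards"]

if __name__ == "__main__":
    for stmt in collection_ddl("Instruments", "Assets", ASSET_COLLECTIONS):
        print(stmt)
```

Generating the statements rather than typing them keeps the collection names in one list, which is convenient if the same names are later reused for Kafka topics or graph labels.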
It took some time to analyze and profile the filings data I had already loaded before I could come up with valid logic. It ended up being based on the assetCat, issuerCat, and derivCat fields in the filing document. Some examples are below:
Futures
insert into `Instruments`.`Assets`.`Futures` (KEY UUID())
select distinct * from Investments where PeriodEndDate = '2023-10-31' and assetCat = 'DCO' and
derivativeInfo.futrDeriv.derivCat = 'FUT';
insert into `Instruments`.`Assets`.`Futures` (KEY UUID())
select distinct * from Investments where PeriodEndDate = '2023-10-31' and assetCat = 'DIR' and
derivativeInfo.futrDeriv.derivCat = 'FUT';
insert into `Instruments`.`Assets`.`Futures` (KEY UUID())
select distinct * from Investments where PeriodEndDate = '2023-10-31' and assetCat = 'DFE' and
derivativeInfo.futrDeriv.derivCat = 'FUT';
insert into `Instruments`.`Assets`.`Futures` (KEY UUID())
select distinct * from Investments where PeriodEndDate = '2023-10-31' and assetCat = 'DE' and
derivativeInfo.futrDeriv.derivCat = 'FUT';
Municipal Bonds
insert into `Instruments`.`Assets`.`MunicipalDebt` (KEY UUID())
select distinct * from Investments where PeriodEndDate = '2023-07-31' and assetCat = 'DBT' and issuerCat = 'MUN';
Mortgage Backed Debt
insert into `Instruments`.`Assets`.`MortgageBackedDebt` (KEY UUID())
select distinct * from Investments where PeriodEndDate = '2023-07-31' and assetCat = 'ABS-MBS';
Forwards
insert into `Instruments`.`Assets`.`Forwards` (KEY UUID())
select distinct * from Investments where PeriodEndDate = '2023-07-31' and assetCat = 'DFE' and
derivativeInfo.fwdDeriv.derivCat = 'FWD';
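The routing rules in these queries can also be expressed client-side, which can be handy for checking a sample of documents before running the inserts. The following sketch mirrors only the WHERE clauses shown above (it ignores PeriodEndDate); the function and constant names are hypothetical.

```python
# Sketch: client-side mirror of the SQL++ routing rules shown above.
# Field names (assetCat, issuerCat, derivativeInfo.<kind>.derivCat) come from
# the queries; the rule set is a partial, illustrative subset.
from typing import Optional

FUTURES_ASSET_CATS = {"DCO", "DIR", "DFE", "DE"}


def derivative_category(doc: dict, kind: str) -> Optional[str]:
    """Return derivativeInfo.<kind>.derivCat, or None if any level is missing."""
    info = doc.get("derivativeInfo") or {}
    return (info.get(kind) or {}).get("derivCat")


def classify_holding(doc: dict) -> Optional[str]:
    """Return the target collection name for one holding, or None if unmatched."""
    asset_cat = doc.get("assetCat")
    if asset_cat in FUTURES_ASSET_CATS and derivative_category(doc, "futrDeriv") == "FUT":
        return "Futures"
    if asset_cat == "DFE" and derivative_category(doc, "fwdDeriv") == "FWD":
        return "Forwards"
    if asset_cat == "DBT" and doc.get("issuerCat") == "MUN":
        return "MunicipalDebt"
    if asset_cat == "ABS-MBS":
        return "MortgageBackedDebt"
    return None
```

Since the four Futures queries differ only in assetCat, they could also be collapsed into a single SQL++ statement using assetCat IN ["DCO", "DIR", "DFE", "DE"].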
My intent was to separate by asset type, and I was able to accomplish that. The flow of the analysis looked like this:

Filings can arrive late, so I wanted to make sure I captured changes. Initially, I kept the integration simple: I built SQL++ queries, ran them in DataGrip, and extracted the results in pipe-delimited format. I would then load the data and CREATE or MERGE it into graph nodes that mirrored the asset types. Below are the nodes in the graph:


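Reading the pipe-delimited exports back in is straightforward with Python's csv module, which also copes with quoted fields that contain a pipe. This is a minimal sketch that assumes the export has a header row; the column names and values in the sample are hypothetical, and every value comes back as a string.

```python
# Sketch: parse a pipe-delimited export with a header row into dicts that can be
# passed as query parameters. The sample column names and values are hypothetical.
import csv
import io


def read_pipe_delimited(text: str) -> list[dict]:
    """Parse '|'-separated text (first line = header) into one dict per row."""
    reader = csv.DictReader(io.StringIO(text), delimiter="|")
    return [dict(row) for row in reader]


sample = "SeriesLEI|cusip|valUSD\nABC123|000000000|1500.25\n"
rows = read_pipe_delimited(sample)
print(rows)
```

For a file on disk, open it with newline="" and pass the file object straight to csv.DictReader instead of wrapping a string in io.StringIO.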
After getting off to a decent start with basic extract -> load processing, I considered upgrading the infrastructure to a more hands-off pipeline, so I went looking for tools and extensions that could bridge the gaps more elegantly.
Apache Kafka was getting a lot of attention at the time for its scalability, so I thought it might be a good place to start. Couchbase also had a Kafka connector that could use collections as a source, create topics in Kafka, and essentially publish document additions and changes to those topics. Confluent's REST Proxy provides a REST API for subscribing to Kafka topics and consuming their messages. I created a Python client for the REST Proxy to consume information from the topics and CREATE/MERGE the JSON from each topic into Neo4j. It ended up looking like this:

It worked reasonably well; most of my issues were operational ones with Docker, which is where I deployed Confluent's Apache Kafka cluster. My machines were short on memory and CPU, which was the root of most of the problems. I had also tried the Neo4J connector for Kafka, but at the time (3-4 years ago) it did not always ingest the data correctly, so I abandoned it.
I created Python scripts per the diagram above, including one that contains functions to request records from topics using the REST API. I didn't optimize it, since I was adding one topic at a time. It would have been more efficient to build the URLs generically, passing in the topic and parameters and getting back the final URL. The payload processing could have been optimized as well, which would have reduced the code size significantly.
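Here is a sketch of what generic URL construction could look like for the REST Proxy v2 consumer API: one function returns the create, subscription, and records URLs plus the request bodies for a topic. The default base URL comes from the script below, but the group and instance naming convention is hypothetical (the group names in the script are not derived from the topic name), and the records query uses max_bytes, the parameter name documented for the v2 records endpoint.

```python
# Sketch: derive Confluent REST Proxy (v2) consumer URLs and request bodies from
# a topic name. The "<topic>group" / "REST_<topic>_Consumer" naming is hypothetical.
import json
from dataclasses import dataclass
from urllib.parse import urlencode


@dataclass(frozen=True)
class ConsumerEndpoints:
    create_url: str  # POST: create the consumer instance
    subscription_url: str  # POST: subscribe the instance to topics
    records_url: str  # GET: fetch a batch of records
    create_body: bytes
    subscription_body: bytes


def consumer_endpoints(
    topic: str,
    base: str = "http://10.0.0.46:8082",
    timeout_ms: int = 3000,
    max_bytes: int = 3_000_000,
) -> ConsumerEndpoints:
    """Build every URL and body needed to consume one topic in binary format."""
    group = f"{topic.lower()}group"
    instance = f"REST_{topic}_Consumer"
    instance_url = f"{base}/consumers/{group}/instances/{instance}"
    query = urlencode({"timeout": timeout_ms, "max_bytes": max_bytes})
    create_body = json.dumps(
        {"name": instance, "format": "binary", "auto.offset.reset": "earliest"}
    ).encode()
    return ConsumerEndpoints(
        create_url=f"{base}/consumers/{group}",
        subscription_url=f"{instance_url}/subscription",
        records_url=f"{instance_url}/records?{query}",
        create_body=create_body,
        subscription_body=json.dumps({"topics": [topic]}).encode(),
    )


ep = consumer_endpoints("Futures")
print(ep.records_url)
```

Each per-topic function could then reduce to three calls: requests.post(ep.create_url, ...), requests.post(ep.subscription_url, ...), and requests.get(ep.records_url, ...), all with the shared headers.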
consumer_kafka_rest.py
import base64
import json
import requests
from data_acquisition.dict_utils import _finditem, _renamekey

# Shared headers for the Confluent REST Proxy (v2 API, binary embedded format)
headers = {
    "Cache-Control": "no-cache",
    "Accept": "application/vnd.kafka.binary.v2+json",
    "Content-Type": "application/vnd.kafka.binary.v2+json",
    "Charset": "utf-8",
    "Content-Encoding": "gzip",
    "Accept-Encoding": "gzip",
}

def get_repo():
    # Consumer instance, subscription, and record endpoints for the Repurchase topic
    repo_group_payload = b'{"name": "REST_Repo_Consumer", "format": "binary", "auto.offset.reset": "earliest"}'
    repo_subscription_payload = b'{"topics": ["Repurchase"]}'
    group_url = "http://10.0.0.46:8082/consumers/repogroup"
    sub_url = "http://10.0.0.46:8082/consumers/repogroup/instances/REST_Repo_Consumer/subscription"
    data_url = "http://10.0.0.46:8082/consumers/repogroup/instances/REST_Repo_Consumer/records?timeout=3000&max_bytes=3000000"
    # Fields to pull out of each (possibly nested) filing document
    key_list: list = [
        "PeriodEndDate",
        "SeriesLEI",
        "SeriesName",
        "balance",
        "curCd",
        "cusip",
        "other",
        "securityLending",
        "invCountry",
        "issuerCat",
        "payoffProfile",
        "lei",
        "name",
        "pctVal",
        "centralCounterparty",
        "isCleared",
        "isTriParty",
        "collateralCd",
        "collateralVal",
        "invstCat",
        "principalAmt",
        "principalCd",
        "repurchaseRt",
        "transCat",
        "title",
        "valUSD",
        "issuerConditional",
        "maturityDt",
        "ReportAuthorizedDate",
        "counterpartyInfo",
        "isRestrictedSec",
        "assetCat",
    ]
    # Create the consumer instance; the proxy returns an error_code if that fails
    # (for example, when the instance already exists)
    group_response = requests.request(
        "POST", group_url, headers=headers, data=repo_group_payload
    )
    group_response_json = json.loads(group_response.text)
    group_response_error = group_response_json.get("error_code")
    if not group_response_error:
        group_response_url = json.loads(group_response.text)
        sub_url = group_response_url.get("base_uri")
        sub_url = sub_url.replace("rest-proxy", "10.0.0.46")
        # base_uri is the instance URL; the subscription endpoint sits under it
        sub_url += "/subscription"
        requests.request("POST", sub_url, headers=headers, data=repo_subscription_payload)
    # Fetch a batch of records; in binary format each value is base64-encoded
    data_response = requests.request("GET", data_url, headers=headers)
    data_out = json.loads(data_response.content)
    ret_list: list = []
    for row in data_out:
        # Skip records without a value (e.g. deletions)
        if not row.get("value"):
            continue
        v = base64.b64decode(row.get("value"))
        value = json.loads(v)
        output_payload: dict = {}
        for key in key_list:
            data = _finditem(value, key)
            if data:
                output_payload.update({key: data})
        # Rename source fields to the property names used in the graph
        output_payload = _renamekey(output_payload, "InvestmentLEI", "lei")
        output_payload = _renamekey(
            output_payload, "Transaction Category", "transCat"
        )
        output_payload = _renamekey(output_payload, "NotionalValueUSD", "valUSD")
        output_payload = _renamekey(
            output_payload, "CentralCounterParty", "centralCounterparty"
        )
        output_payload = _renamekey(output_payload, "InvestmentTitle", "title")
        output_payload = _renamekey(output_payload, "Currency", "curCd")
        output_payload = _renamekey(
            output_payload, "RepurchaseRate", "repurchaseRt"
        )
        output_payload = _renamekey(output_payload, "InvestmentName", "name")
        output_payload = _renamekey(output_payload, "PositionType", "payoffProfile")
        output_payload = _renamekey(output_payload, "Country", "invCountry")
        output_payload = _renamekey(
            output_payload, "Collateral Code", "collateralCd"
        )
        output_payload = _renamekey(
            output_payload, "Collateral Value", "collateralVal"
        )
        output_payload = _renamekey(output_payload, "PCT Portfolio", "pctVal")
        output_payload = _renamekey(
            output_payload, "Investment Category", "invstCat"
        )
        output_payload = _renamekey(
            output_payload, "Principal Amount", "principalAmt"
        )
        output_payload = _renamekey(output_payload, "Principal Code", "principalCd")
        output_payload = _renamekey(output_payload, "Maturity Date", "maturityDt")
        ret_list.append(output_payload)
    return ret_list

def get_futures():
    # Consumer instance, subscription, and record endpoints for the Futures topic
    repo_group_payload = b'{"name": "REST_Futures_Consumer", "format": "binary", "auto.offset.reset": "earliest"}'
    repo_subscription_payload = b'{"topics": ["Futures"]}'
    group_url = "http://10.0.0.46:8082/consumers/futuresgroup"
    sub_url = "http://10.0.0.46:8082/consumers/futuresgroup/instances/REST_Futures_Consumer/subscription"
    data_url = "http://10.0.0.46:8082/consumers/futuresgroup/instances/REST_Futures_Consumer/records?timeout=3000&max_bytes=3000000"
    group_response = requests.request(
        "POST", group_url, headers=headers, data=repo_group_payload
    )
    group_response_json = json.loads(group_response.text)
    group_response_error = group_response_json.get("error_code")
    if not group_response_error:
        group_response_url = json.loads(group_response.text)
        sub_url = group_response_url.get("base_uri")
        sub_url = sub_url.replace("rest-proxy", "10.0.0.46")
        # base_uri is the instance URL; the subscription endpoint sits under it
        sub_url += "/subscription"
        sub_response = requests.request(
            "POST", sub_url, headers=headers, data=repo_subscription_payload
        )
    data_response = requests.request("GET", data_url, headers=headers)
    data_out = json.loads(data_response.content)
    ret_list: list = []
    key_list: list = [
        "PeriodEndDate",
        "SeriesLEI",
        "SeriesName",
        "balance",
        "curCd",
        "cusip",
        "other",
        "isRestrictedSec",
        "invCountry",
        "currencyConditional",
        "payoffProfile",
        "lei",
        "name",
        "pctVal",
        "debtSec",
        "identifiers",
        "issuerConditional",
        "valUSD",
        "futrDeriv",
        "optionSwaptionWarrantDeriv",
        "descRefInstrmnt",
        "lei",
        "name",
        "pctVal",
        "title",
        "valUSD",
        "exercisePrice",
        "securityLending",
        "exercisePriceCurCd",
        "expDt",
        "putOrCall",
        "shareNo",
        "unrealizedAppr",
        "writtenOrPur",
        "ReportAuthorizedDate",
        "assetCat",
    ]
    for row in data_out:
        # Skip records without a value (e.g. deletions)
        if not row.get("value"):
            continue
        v = base64.b64decode(row.get("value"))
        value = json.loads(v)
        output_payload: dict = {}
        for key in key_list:
            data = _finditem(value, key)
            if data:
                output_payload.update({key: data})
        output_payload = _renamekey(output_payload, "InvestmentLEI", "lei")
        output_payload = _renamekey(output_payload, "NotionalValueUSD", "valUSD")
        output_payload = _renamekey(output_payload, "InvestmentTitle", "title")
        output_payload = _renamekey(output_payload, "Currency", "curCd")
        output_payload = _renamekey(output_payload, "InvestmentName", "name")
        output_payload = _renamekey(output_payload, "PositionType", "payoffProfile")
        output_payload = _renamekey(output_payload, "Country", "invCountry")
        output_payload = _renamekey(output_payload, "PCT Portfolio", "pctVal")
        output_payload = _renamekey(
            output_payload, "Investment Category", "invstCat"
        )
        output_payload = _renamekey(output_payload, "Maturity Date", "maturityDt")
        ret_list.append(output_payload)
    return ret_list

def get_options():
    # Consumer instance, subscription, and record endpoints for the Options topic
    options_group_payload = b'{"name": "REST_Options_Consumer", "format": "binary", "auto.offset.reset": "earliest"}'
    options_subscription_payload = b'{"topics": ["Options"]}'
    group_url = "http://10.0.0.46:8082/consumers/optionsgroup"
    sub_url = "http://10.0.0.46:8082/consumers/optionsgroup/instances/REST_Options_Consumer/subscription"
    data_url = "http://10.0.0.46:8082/consumers/optionsgroup/instances/REST_Options_Consumer/records?timeout=3000&max_bytes=3000000"
    group_response = requests.request(
        "POST", group_url, headers=headers, data=options_group_payload
    )
    group_response_json = json.loads(group_response.text)
    group_response_error = group_response_json.get("error_code")
    if not group_response_error:
        group_response_url = json.loads(group_response.text)
        sub_url = group_response_url.get("base_uri")
        sub_url = sub_url.replace("rest-proxy", "10.0.0.46")
        # base_uri is the instance URL; the subscription endpoint sits under it
        sub_url += "/subscription"
        sub_response = requests.request(
            "POST", sub_url, headers=headers, data=options_subscription_payload
        )
    data_response = requests.request("GET", data_url, headers=headers)
    data_out = json.loads(data_response.content)
    ret_list: list = []
    key_list: list = [
        "PeriodEndDate",
        "SeriesLEI",
        "SeriesName",
        "balance",
        "curCd",
        "cusip",
        "currencyConditional",
        "invCountry",
        "issuerCat",
        "payoffProfile",
        "lei",
        "name",
        "pctVal",
        "isRestrictedSec",
        "identifiers",
        "issuerConditional",
        "valUSD",
        "optionSwaptionWarrantDeriv",
        "issuerCat",
        "securityLending",
        "ReportAuthorizedDate",
        "assetCat",
    ]
    for row in data_out:
        # Skip records without a value (e.g. deletions)
        if not row.get("value"):
            continue
        v = base64.b64decode(row.get("value"))
        value = json.loads(v)
        output_payload: dict = {}
        for key in key_list:
            data = _finditem(value, key)
            if data:
                output_payload.update({key: data})
        output_payload = _renamekey(output_payload, "InvestmentLEI", "lei")
        output_payload = _renamekey(output_payload, "NotionalValueUSD", "valUSD")
        output_payload = _renamekey(output_payload, "InvestmentTitle", "title")
        output_payload = _renamekey(output_payload, "Currency", "curCd")
        output_payload = _renamekey(output_payload, "InvestmentName", "name")
        output_payload = _renamekey(output_payload, "PositionType", "payoffProfile")
        output_payload = _renamekey(output_payload, "Country", "invCountry")
        output_payload = _renamekey(output_payload, "PCT Portfolio", "pctVal")
        ret_list.append(output_payload)
    return ret_list

def get_swaps():
    # Consumer instance, subscription, and record endpoints for the SWAP topic
    swap_group_payload = b'{"name": "REST_SWAP_Consumer", "format": "binary", "auto.offset.reset": "earliest"}'
    swap_subscription_payload = b'{"topics": ["SWAP"]}'
    group_url = "http://10.0.0.46:8082/consumers/swapgroup"
    sub_url = "http://10.0.0.46:8082/consumers/swapgroup/instances/REST_SWAP_Consumer/subscription"
    data_url = "http://10.0.0.46:8082/consumers/swapgroup/instances/REST_SWAP_Consumer/records?timeout=3000&max_bytes=3000000"
    group_response = requests.request(
        "POST", group_url, headers=headers, data=swap_group_payload
    )
    group_response_json = json.loads(group_response.text)
    group_response_error = group_response_json.get("error_code")
    if not group_response_error:
        group_response_url = json.loads(group_response.text)
        sub_url = group_response_url.get("base_uri")
        sub_url = sub_url.replace("rest-proxy", "10.0.0.46")
        # base_uri is the instance URL; the subscription endpoint sits under it
        sub_url += "/subscription"
        sub_response = requests.request(
            "POST", sub_url, headers=headers, data=swap_subscription_payload
        )
    data_response = requests.request("GET", data_url, headers=headers)
    data_out = json.loads(data_response.content)
    ret_list: list = []
    key_list: list = [
        "PeriodEndDate",
        "SeriesLEI",
        "SeriesName",
        "balance",
        "curCd",
        "cusip",
        "other",
        "isRestrictedSec",
        "invCountry",
        "issuerCat",
        "payoffProfile",
        "lei",
        "name",
        "pctVal",
        "swapDeriv",
        "securityLending",
        "descRefInstrmnt",
        "otherRefInst",
        "fixedPmntDesc",
        "fixedRecDesc",
        "floatingPmntDesc",
        "floatingRecDesc",
        "notionalAmt",
        "otherPmntDesc",
        "otherRecDesc",
        "issuerConditional",
        "identifiers",
        "currencyConditional",
        "valUSD",
        "name",
        "pctVal",
        "title",
        "ReportAuthorizedDate",
        "assetCat",
    ]
    for row in data_out:
        # Skip records without a value (e.g. deletions)
        if not row.get("value"):
            continue
        v = base64.b64decode(row.get("value"))
        value = json.loads(v)
        output_payload: dict = {}
        for key in key_list:
            data = _finditem(value, key)
            if data:
                output_payload.update({key: data})
        output_payload = _renamekey(output_payload, "InvestmentLEI", "lei")
        output_payload = _renamekey(output_payload, "NotionalValueUSD", "valUSD")
        output_payload = _renamekey(output_payload, "InvestmentTitle", "title")
        output_payload = _renamekey(output_payload, "Currency", "curCd")
        output_payload = _renamekey(output_payload, "InvestmentName", "name")
        output_payload = _renamekey(output_payload, "PositionType", "payoffProfile")
        output_payload = _renamekey(output_payload, "Country", "invCountry")
        output_payload = _renamekey(output_payload, "PCT Portfolio", "pctVal")
        output_payload = _renamekey(output_payload, "Maturity Date", "maturityDt")
        ret_list.append(output_payload)
    return ret_list

def get_loan_debt():
    # Consumer instance, subscription, and record endpoints for the LoanDebt topic
    repo_group_payload = b'{"name": "REST_LoanDebt_Consumer", "format": "binary", "auto.offset.reset": "earliest"}'
    repo_subscription_payload = b'{"topics": ["LoanDebt"]}'
    group_url = "http://10.0.0.46:8082/consumers/loangroup"
    sub_url = "http://10.0.0.46:8082/consumers/loangroup/instances/REST_LoanDebt_Consumer/subscription"
    data_url = "http://10.0.0.46:8082/consumers/loangroup/instances/REST_LoanDebt_Consumer/records?timeout=3000&max_bytes=30000000"
    # Reuse one HTTP session for the create/subscribe/fetch sequence
    request_session = requests.session()
    group_response = request_session.request(
        "POST", group_url, headers=headers, data=repo_group_payload
    )
    group_response_json = json.loads(group_response.text)
    group_response_error = group_response_json.get("error_code")
    if not group_response_error:
        group_response_url = json.loads(group_response.text)
        sub_url = group_response_url.get("base_uri")
        sub_url = sub_url.replace("rest-proxy", "10.0.0.46")
        # base_uri is the instance URL; the subscription endpoint sits under it
        sub_url += "/subscription"
        sub_response = request_session.request(
            "POST", sub_url, headers=headers, data=repo_subscription_payload
        )
    data_response = request_session.request("GET", data_url, headers=headers)
    data_out = json.loads(data_response.content)
    request_session.close()
    ret_list: list = []
    key_list: list = [
        "PeriodEndDate",
        "SeriesLEI",
        "SeriesName",
        "balance",
        "curCd",
        "cusip",
        "isin",
        "maturityDt",
        "isRestrictedSec",
        "invCountry",
        "issuerCat",
        "payoffProfile",
        "lei",
        "name",
        "pctVal",
        "annualizedRt",
        "areIntrstPmntsInArrs",
        "couponKind",
        "isDefault",
        "isPaidKind",
        "identifiers",
        "issuerConditional",
        "assetCat",
        "securityLending",
        "currencyConditional",
        "valUSD",
        "debtSec",
        "name",
        "pctVal",
        "title",
        "ReportAuthorizedDate",
    ]
    for row in data_out:
        # Skip records without a value (e.g. deletions)
        if not row.get("value"):
            continue
        v = base64.b64decode(row.get("value"))
        value = json.loads(v)
        output_payload: dict = {}
        for key in key_list:
            data = _finditem(value, key)
            if data:
                output_payload.update({key: data})
        output_payload = _renamekey(output_payload, "InvestmentLEI", "lei")
        output_payload = _renamekey(output_payload, "NotionalValueUSD", "valUSD")
        output_payload = _renamekey(output_payload, "InvestmentTitle", "title")
        output_payload = _renamekey(output_payload, "Currency", "curCd")
        output_payload = _renamekey(output_payload, "InvestmentName", "name")
        output_payload = _renamekey(output_payload, "PositionType", "payoffProfile")
        output_payload = _renamekey(output_payload, "Country", "invCountry")
        output_payload = _renamekey(output_payload, "PCT Portfolio", "pctVal")
        output_payload = _renamekey(
            output_payload, "Investment Category", "invstCat"
        )
        output_payload = _renamekey(output_payload, "Maturity Date", "maturityDt")
        ret_list.append(output_payload)
    return ret_list

def get_corp_debt():
    # Consumer instance, subscription, and record endpoints for the CorporateDebt topic
    corpdebt_group_payload = b'{"name": "REST_LoanDebt_Consumer", "format": "binary", "auto.offset.reset": "earliest"}'
    corpdebt_subscription_payload = b'{"topics": ["CorporateDebt"]}'
    group_url = "http://10.0.0.46:8082/consumers/corpdebtgroup"
    sub_url = "http://10.0.0.46:8082/consumers/corpdebtgroup/instances/REST_LoanDebt_Consumer/subscription"
    data_url = "http://10.0.0.46:8082/consumers/corpdebtgroup/instances/REST_LoanDebt_Consumer/records?timeout=3000&max_bytes=30000000"
    request_session = requests.session()
    group_response = request_session.request(
        "POST", group_url, headers=headers, data=corpdebt_group_payload
    )
    group_response_json = json.loads(group_response.text)
    group_response_error = group_response_json.get("error_code")
    if not group_response_error:
        group_response_url = json.loads(group_response.text)
        sub_url = group_response_url.get("base_uri")
        sub_url = sub_url.replace("rest-proxy", "10.0.0.46")
        # base_uri is the instance URL; the subscription endpoint sits under it
        sub_url += "/subscription"
        sub_response = request_session.request(
            "POST", sub_url, headers=headers, data=corpdebt_subscription_payload
        )
    data_response = request_session.request("GET", data_url, headers=headers)
    data_out = json.loads(data_response.content)
    request_session.close()
    ret_list: list = []
    key_list: list = [
        "PeriodEndDate",
        "SeriesLEI",
        "SeriesName",
        "balance",
        "curCd",
        "cusip",
        "isin",
        "maturityDt",
        "isRestrictedSec",
        "currencyInfos",
        "dbtSecRefInstrument",
        "isContngtConvrtbl",
        "isMandatoryConvrtbl",
        "invCountry",
        "issuerCat",
        "payoffProfile",
        "lei",
        "name",
        "pctVal",
        "annualizedRt",
        "currencyConditional",
        "areIntrstPmntsInArrs",
        "couponKind",
        "isDefault",
        "isPaidKind",
        "identifiers",
        "assetCat",
        "securityLending",
        "valUSD",
        "debtSec",
        "name",
        "pctVal",
        "title",
        "ReportAuthorizedDate",
    ]
    for row in data_out:
        # Only process records that carry a value
        if row.get("value"):
            v = base64.b64decode(row.get("value"))
            value = json.loads(v)
            output_payload: dict = {}
            for key in key_list:
                data = _finditem(value, key)
                if data:
                    output_payload.update({key: data})
            output_payload = _renamekey(output_payload, "InvestmentLEI", "lei")
            output_payload = _renamekey(
                output_payload, "NotionalValueUSD", "valUSD"
            )
            output_payload = _renamekey(output_payload, "InvestmentTitle", "title")
            output_payload = _renamekey(output_payload, "Currency", "curCd")
            output_payload = _renamekey(output_payload, "InvestmentName", "name")
            output_payload = _renamekey(
                output_payload, "PositionType", "payoffProfile"
            )
            output_payload = _renamekey(output_payload, "Country", "invCountry")
            output_payload = _renamekey(output_payload, "PCT Portfolio", "pctVal")
            output_payload = _renamekey(
                output_payload, "Maturity Date", "maturityDt"
            )
            ret_list.append(output_payload)
    return ret_list

def get_common_equity():
    # Consumer instance, subscription, and record endpoints for the CommonEquity topic
    ce_group_payload = b'{"name": "REST_CE_Consumer", "format": "binary", "auto.offset.reset": "earliest"}'
    ce_subscription_payload = b'{"topics": ["CommonEquity"]}'
    group_url = "http://10.0.0.46:8082/consumers/cegroup"
    sub_url = "http://10.0.0.46:8082/consumers/cegroup/instances/REST_CE_Consumer/subscription"
    data_url = "http://10.0.0.46:8082/consumers/cegroup/instances/REST_CE_Consumer/records?timeout=3000&max_bytes=30000000"
    group_response = requests.request(
        "POST", group_url, headers=headers, data=ce_group_payload
    )
    group_response_json = json.loads(group_response.text)
    group_response_error = group_response_json.get("error_code")
    if not group_response_error:
        group_response_url = json.loads(group_response.text)
        sub_url = group_response_url.get("base_uri")
        sub_url = sub_url.replace("rest-proxy", "10.0.0.46")
        # base_uri is the instance URL; the subscription endpoint sits under it
        sub_url += "/subscription"
        sub_response = requests.request(
            "POST", sub_url, headers=headers, data=ce_subscription_payload
        )
    data_response = requests.request("GET", data_url, headers=headers)
    data_out = json.loads(data_response.content)
    ret_list: list = []
    key_list: list = [
        "PeriodEndDate",
        "SeriesLEI",
        "SeriesName",
        "balance",
        "curCd",
        "cusip",
        "identifiers",
        "isRestrictedSec",
        "invCountry",
        "issuerCat",
        "payoffProfile",
        "lei",
        "pctVal",
        "currencyConditional",
        "identifiers",
        "assetCat",
        "payoffProfile",
        "issuerConditional",
        "securityLending",
        "valUSD",
        "name",
        "pctVal",
        "title",
        "ReportAuthorizedDate",
    ]
    for row in data_out:
        # Only process records that carry a value
        if row.get("value"):
            v = base64.b64decode(row.get("value"))
            value = json.loads(v)
            output_payload: dict = {}
            for key in key_list:
                data = _finditem(value, key)
                if data:
                    output_payload.update({key: data})
            output_payload = _renamekey(output_payload, "InvestmentLEI", "lei")
            output_payload = _renamekey(
                output_payload, "NotionalValueUSD", "valUSD"
            )
            output_payload = _renamekey(output_payload, "InvestmentTitle", "title")
            output_payload = _renamekey(output_payload, "Currency", "curCd")
            output_payload = _renamekey(output_payload, "InvestmentName", "name")
            output_payload = _renamekey(
                output_payload, "PositionType", "payoffProfile"
            )
            output_payload = _renamekey(output_payload, "Country", "invCountry")
            output_payload = _renamekey(output_payload, "PCT Portfolio", "pctVal")
            ret_list.append(output_payload)
    return ret_list

def get_ust_debt():
    # Consumer instance, subscription, and record endpoints for the US-Treasury-Debt topic
    ustdebt_group_payload = b'{"name": "REST_UST_Debt_Consumer", "format": "binary", "auto.offset.reset": "earliest"}'
    ustdebt_subscription_payload = b'{"topics": ["US-Treasury-Debt"]}'
    group_url = "http://10.0.0.46:8082/consumers/ustdebtgroup"
    sub_url = "http://10.0.0.46:8082/consumers/ustdebtgroup/instances/REST_UST_Debt_Consumer/subscription"
    data_url = "http://10.0.0.46:8082/consumers/ustdebtgroup/instances/REST_UST_Debt_Consumer/records?timeout=3000&max_bytes=30000000"
    request_session = requests.session()
    group_response = request_session.request(
        "POST", group_url, headers=headers, data=ustdebt_group_payload
    )
    group_response_json = json.loads(group_response.text)
    group_response_error = group_response_json.get("error_code")
    if not group_response_error:
        group_response_url = json.loads(group_response.text)
        sub_url = group_response_url.get("base_uri")
        sub_url = sub_url.replace("rest-proxy", "10.0.0.46")
        # base_uri is the instance URL; the subscription endpoint sits under it
        sub_url += "/subscription"
        sub_response = request_session.request(
            "POST", sub_url, headers=headers, data=ustdebt_subscription_payload
        )
    data_response = request_session.request("GET", data_url, headers=headers)
    data_out = json.loads(data_response.content)
    request_session.close()
    ret_list: list = []
    key_list: list = [
        "PeriodEndDate",
        "SeriesLEI",
        "SeriesName",
        "balance",
        "curCd",
        "cusip",
        "isin",
        "maturityDt",
        "isRestrictedSec",
        "invCountry",
        "issuerCat",
        "payoffProfile",
        "lei",
        "name",
        "pctVal",
        "annualizedRt",
        "areIntrstPmntsInArrs",
        "couponKind",
        "isDefault",
        "isPaidKind",
        "identifiers",
        "assetCat",
        "securityLending",
        "valUSD",
        "debtSec",
        "name",
        "pctVal",
        "title",
        "ReportAuthorizedDate",
    ]
    for row in data_out:
        # Only process records that carry a value
        if row.get("value"):
            v = base64.b64decode(row.get("value"))
            value = json.loads(v)
            output_payload: dict = {}
            for key in key_list:
                data = _finditem(value, key)
                if data:
                    output_payload.update({key: data})
            output_payload = _renamekey(output_payload, "InvestmentLEI", "lei")
            output_payload = _renamekey(
                output_payload, "NotionalValueUSD", "valUSD"
            )
            output_payload = _renamekey(output_payload, "InvestmentTitle", "title")
            output_payload = _renamekey(output_payload, "Currency", "curCd")
            output_payload = _renamekey(output_payload, "InvestmentName", "name")
            output_payload = _renamekey(
                output_payload, "PositionType", "payoffProfile"
            )
            output_payload = _renamekey(output_payload, "Country", "invCountry")
            output_payload = _renamekey(output_payload, "PCT Portfolio", "pctVal")
            output_payload = _renamekey(
                output_payload, "Maturity Date", "maturityDt"
            )
            ret_list.append(output_payload)
    return ret_list
Since I worked one topic at a time, I initially wrote a separate script for each topic. If I were to do it again, I would write a generic client with parameters for each asset type, but this was a side project that I moved through slowly, starting and stopping at different points, so I kept it simple.
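Along those lines, the repeated key_list and _renamekey blocks could collapse into one processor driven by per-asset-type configuration. This is only a sketch: find_key is a hypothetical stand-in for the _finditem helper (whose implementation isn't shown), and the rename map is assembled from the _renamekey calls above, assuming their argument order is (payload, new name, old name).

```python
# Sketch: one generic processor for REST Proxy binary-format records.
# find_key is a hypothetical stand-in for _finditem; its real behavior is not shown.
import base64
import json
from typing import Any, Optional


def find_key(obj: Any, key: str) -> Optional[Any]:
    """Depth-first search for the first non-None value stored under `key`.

    A dict's own entry is checked before its nested values.
    """
    if isinstance(obj, dict):
        if obj.get(key) is not None:
            return obj[key]
        children = list(obj.values())
    elif isinstance(obj, list):
        children = obj
    else:
        return None
    for child in children:
        found = find_key(child, key)
        if found is not None:
            return found
    return None


# Renames shared by every asset type (source field -> graph property name)
COMMON_RENAMES = {
    "lei": "InvestmentLEI", "valUSD": "NotionalValueUSD", "title": "InvestmentTitle",
    "curCd": "Currency", "name": "InvestmentName", "payoffProfile": "PositionType",
    "invCountry": "Country", "pctVal": "PCT Portfolio",
}


def process_records(records: list[dict], keys: list[str],
                    extra_renames: Optional[dict] = None) -> list[dict]:
    """Decode each record's base64 JSON value, pick `keys`, and rename them."""
    renames = {**COMMON_RENAMES, **(extra_renames or {})}
    out = []
    for record in records:
        raw = record.get("value")
        if not raw:  # skip records without a value (e.g. deletions)
            continue
        doc = json.loads(base64.b64decode(raw))
        payload = {}
        for key in dict.fromkeys(keys):  # drop duplicate keys, keep order
            value = find_key(doc, key)
            if value:  # same truthiness check as the per-topic functions
                payload[renames.get(key, key)] = value
        out.append(payload)
    return out
```

Each asset type then becomes a key list plus a small dict of extra renames, for example {"maturityDt": "Maturity Date"} for the debt topics.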
putCE.py
from neo4j import GraphDatabase
from neo4j.exceptions import Neo4jError
from env import uri, creds
from data_acquisition.consumer_kafka_rest import get_common_equity

# URI examples: "neo4j://localhost", "neo4j+s://xxx.databases.neo4j.io"
URI = uri.neo4j_server  # neo4j_server
AUTH = (creds.N4J_USER, creds.N4J_PASS)  # (N4J_USER, N4J_PASS)

try:
    # The with-block closes the driver on exit, so no explicit close() is needed
    with GraphDatabase.driver(URI, auth=AUTH) as driver:
        common_equity_json_data = get_common_equity()
        for row in common_equity_json_data:
            # Top-level fields (already renamed by the consumer function)
            PEDate = row.get("PeriodEndDate")
            SerLEI = row.get("SeriesLEI")
            SerName = row.get("SeriesName")
            RptAuthDt = row.get("ReportAuthorizedDate")
            if not RptAuthDt:
                RptAuthDt = "None"
            AssetCategory = row.get("assetCat")
            RestrictedSec = row.get("isRestrictedSec")
            cusip_value = row.get("cusip")
            PCTPort = row.get("PCT Portfolio")
            InvestmentTitle = row.get("InvestmentTitle")
            InvestmentLei = row.get("InvestmentLEI")
            InvestmentName = row.get("InvestmentName")
            NotionalvalueUSD = row.get("NotionalValueUSD")
            Curr = row.get("Currency")
            Country = row.get("Country")
            PosType = row.get("PositionType")
            # Nested blocks: default every value to "None" when a block is missing
            Identifiers = row.get("identifiers")
            if Identifiers and isinstance(Identifiers, dict):
                other = Identifiers.get("other")
                if other and isinstance(other, dict):
                    otherIdValue = other.get("value")
                    otherIdDesc = other.get("otherDesc")
                else:
                    otherIdValue = "None"
                    otherIdDesc = "None"
                isin = Identifiers.get("isin")
                if isin and isinstance(isin, dict):
                    isin_value = isin.get("value")
                else:
                    isin_value = "None"
                Ticker = Identifiers.get("ticker")
                if Ticker and isinstance(Ticker, dict):
                    ticker_value = Ticker.get("value")
                else:
                    ticker_value = "None"
            else:
                otherIdValue = "None"
                otherIdDesc = "None"
                isin_value = "None"
                ticker_value = "None"
            SecLending = row.get("securityLending")
            if SecLending and isinstance(SecLending, dict):
                Cashcoll = SecLending.get("isCashCollateral")
                Loanedsec = SecLending.get("isLoanByFund")
                Noncashcoll = SecLending.get("isNonCashCollateral")
            else:
                Cashcoll = "None"
                Loanedsec = "None"
                Noncashcoll = "None"
            CurrCond = row.get("currencyConditional")
            if CurrCond and isinstance(CurrCond, dict):
                CondCurrency = CurrCond.get("curCd")
                CondExchangeRt = CurrCond.get("exchangeRt")
            else:
                CondCurrency = "None"
                CondExchangeRt = "None"
            IssuerConditional = row.get("issuerConditional")
            if IssuerConditional and isinstance(IssuerConditional, dict):
                CondIssuerDesc = IssuerConditional.get("desc")
                CondIssuerCat = IssuerConditional.get("issuerCat")
            else:
                CondIssuerDesc = "None"
                CondIssuerCat = "None"
            # One MERGE per row; keyword arguments become the $parameters
            result = driver.execute_query(
                """
                MERGE (:CommonEquity {PeriodEndDate: $PeriodEndDate,
                  SeriesLEI: $SeriesLEI, SeriesName: $SerName, ConditionalIssuerDesc: $Condit_issuer_desc,
                  ConditionalIssuerCategory: $Cond_issuer_cat, ReportAuthorizedDate: $ReportAuthorizedDate,
                  InvestmentLEI: $InvLEI, NotionalValueUSD: $NotionalValUSD,
                  ConditionalCurrency: $CondCurrency, ConditionalFXRate: $CondExchangeRt,
                  InvestmentTitle: $InvestmentTitle, Currency: $Currency, InvestmentName: $InvName, Ticker: $ticker,
                  PositionType: $PosType, Country: $Country, PCTPortfolio: $PCTPort, AssetCategory: $AssetCat,
                  cusip: $Cusip, ISIN: $Isin, RestrictedSecurity: $RestrictedSec
                })
                """,
                PeriodEndDate=str(PEDate),
                SeriesLEI=str(SerLEI),
                SerName=str(SerName),
                ReportAuthorizedDate=str(RptAuthDt),
                CondCurrency=str(CondCurrency),
                CondExchangeRt=str(CondExchangeRt),
                InvLEI=str(InvestmentLei),
                NotionalValUSD=str(NotionalvalueUSD),
                InvestmentTitle=str(InvestmentTitle),
                Currency=str(Curr),
                InvName=str(InvestmentName),
                PosType=str(PosType),
                Country=str(Country),
                Cusip=str(cusip_value),
                Isin=str(isin_value),
                PCTPort=str(PCTPort),
                Condit_issuer_desc=str(CondIssuerDesc),
                Cond_issuer_cat=str(CondIssuerCat),
                AssetCat=str(AssetCategory),
                RestrictedSec=str(RestrictedSec),
                ticker=str(ticker_value),
            )
except Neo4jError as e:
    print(e.message)
    print(e.classification)
This worked well functionally, but the MERGE slowed down record processing on the Neo4J side (the putCE.py script). I had scheduled the REST "client" scripts to run every 5 minutes while loading data into Couchbase, and it went reasonably well, although more slowly than I had hoped because of my resource constraints. I had planned to try streaming, but REST worked well enough that I decided not to go that route.
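On the MERGE slowdown: MERGE has to look for an existing node matching every property in its pattern, so merging on 21 properties with one query per row is expensive. A common alternative is to MERGE on a few identifying properties (ideally backed by an index or constraint), SET the remaining properties, and send rows in batches with UNWIND. The sketch below assumes SeriesLEI, PeriodEndDate, and cusip identify a holding, which is a hypothetical choice; rows must be flat maps with non-null key values, and the function names are illustrative.

```python
# Sketch: batched UNWIND ... MERGE on a hypothetical key, then SET the rest.
# Rows must be flat maps of Neo4j-compatible values with non-null key properties.
from itertools import islice
from typing import Iterable, Iterator

KEY_PROPS = ("SeriesLEI", "PeriodEndDate", "cusip")  # hypothetical holding key


def merge_query(label: str, key_props: tuple[str, ...]) -> str:
    """Build a statement that matches nodes on key_props only and sets the rest."""
    keys = ", ".join(f"{p}: row.{p}" for p in key_props)
    return f"UNWIND $rows AS row MERGE (n:{label} {{{keys}}}) SET n += row"


def batches(rows: Iterable[dict], size: int = 1000) -> Iterator[list[dict]]:
    """Yield successive lists of at most `size` rows."""
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk


def load(driver, rows: Iterable[dict], label: str = "CommonEquity", size: int = 1000) -> int:
    """Run one query per batch through driver.execute_query; return the batch count."""
    query = merge_query(label, KEY_PROPS)
    count = 0
    for chunk in batches(rows, size):
        # Keyword arguments to execute_query are sent as query parameters ($rows)
        driver.execute_query(query, rows=chunk)
        count += 1
    return count
```

With the Neo4j 5.x Python driver, load(driver, rows) could be called inside the same with GraphDatabase.driver(...) block used in putCE.py, replacing the per-row execute_query call.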
I eventually settled on a simple diff -> export -> import process using Python, since it was simple to maintain once it was built. I will cover that in a future post, and expand on the knowledge graph development as well.
