Tag: pandas

  • Merging initial data load with IP info

    Getting back to my traffic statistics project, I finally had time to experiment with joining the two main collections that hold the information I need.

    First comes the usual code to load the data from Mongo and to inspect the columns that can be used for the join:

    import pandas as pd
    from pymongo import MongoClient
    
    # Connect to MongoDB
    client = MongoClient('localhost', 27017)
    db = client['mydatabase']
    traffic_init_load = db['traffic_init_load']
    unique_ip_info = db['unique_ip_info']
    
    # Read the data from MongoDB
    traffic_init_load_data = traffic_init_load.find()
    unique_ip_info_data = unique_ip_info.find()
    
    # Convert the data to a DataFrame
    traffic_init_load_df = pd.DataFrame(list(traffic_init_load_data))
    unique_ip_info_df = pd.DataFrame(list(unique_ip_info_data))
    
    traffic_init_load_columns = list(traffic_init_load_df.columns)
    unique_ip_info_columns = list(unique_ip_info_df.columns)
    
    # Print the columns
    print(traffic_init_load_columns)
    print(unique_ip_info_columns)
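
    As a quick sanity check (a small sketch with made-up column lists, not the real schemas), the overlap between the two column sets can be computed directly; it shows why the join key has to be matched by name manually:

    ```python
    # Hypothetical column lists standing in for the two collections' schemas
    traffic_cols = ['_id', 'SourceIP', 'DestinationIP', 'Bytes']
    ip_cols = ['_id', 'query', 'country', 'isp', 'message']

    # Columns sharing a name would be automatic merge candidates; here the
    # actual key pair (SourceIP / query) has different names, so it must be
    # given explicitly via left_on / right_on.
    common = sorted(set(traffic_cols) & set(ip_cols))
    print(common)  # only '_id' overlaps, and it is not a usable key
    ```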

    After this, we can safely perform the merge and drop the _id columns:

    merged_df = pd.merge(traffic_init_load_df, unique_ip_info_df, left_on='SourceIP', right_on='query')
    merged_df = merged_df.drop(['_id_x', 'query', '_id_y'], axis=1)
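
    One thing worth knowing here: pd.merge defaults to an inner join, so any traffic row whose SourceIP has no whois record silently disappears. A minimal sketch with toy data (not the real collections) shows how how='left' together with indicator=True flags unmatched rows before you commit to the merge:

    ```python
    import pandas as pd

    # Toy stand-ins for the two collections (not the real data)
    traffic = pd.DataFrame({'SourceIP': ['1.1.1.1', '2.2.2.2'], 'Bytes': [100, 200]})
    ip_info = pd.DataFrame({'query': ['1.1.1.1'], 'country': ['AU']})

    # A left join keeps unmatched rows; indicator=True adds a _merge column
    checked = pd.merge(traffic, ip_info, left_on='SourceIP',
                       right_on='query', how='left', indicator=True)
    missing = checked[checked['_merge'] == 'left_only']
    print(len(missing))  # 1 -> one SourceIP had no whois record
    ```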

    Checking the result again, we see two more columns that are not really needed: index, and message, which was returned by the whois API. We drop those as well:

    merged_df = merged_df.drop(['index', 'message'], axis=1)

    And now we are finally ready to convert the DataFrame and write it back to the database:

    # Convert the DataFrame to a list of dicts, one document per row;
    # to_dict('records') yields plain Python values that PyMongo can store
    records = merged_df.to_dict('records')
    
    # Connect to MongoDB
    client = MongoClient('localhost', 27017)
    db = client['mydatabase']
    base_merged_data = db['base_merged_data']
    
    # Insert the records into the MongoDB collection in one call
    base_merged_data.insert_many(records)

    What I can tell you is that the insert_many method is significantly faster than insert_one: my first attempt with insert_one took 12 minutes, while insert_many brought that down to 3 minutes.
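
    The speedup makes sense, since insert_one pays one round trip per document while insert_many batches them. If the merged frame ever grows very large, inserting in fixed-size chunks keeps each call bounded; a small sketch of the chunking (the helper and batch size are my own, not from PyMongo):

    ```python
    def chunked(seq, size):
        """Yield consecutive slices of at most `size` items."""
        for start in range(0, len(seq), size):
            yield seq[start:start + size]

    docs = [{'n': i} for i in range(2500)]  # stand-in documents
    batches = list(chunked(docs, 1000))
    print([len(b) for b in batches])  # [1000, 1000, 500]

    # Against the real collection this would become:
    # for batch in chunked(records, 1000):
    #     base_merged_data.insert_many(batch, ordered=False)
    ```

    Passing ordered=False lets the server keep inserting remaining documents even if one of them fails.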

    That is all for today,

    Sorin