• Using Google Gemini to optimise my scraping code

    I wanted Gemini to take a look at the code that I posted here.

    Here is his answer:

    Absolutely! Here’s a breakdown of improvements and considerations for refining the provided web scraping code:

    Enhancements:

    • Robust Error Handling: Incorporate try-except blocks around the scraping logic and network calls. This will prevent the script from crashing due to unexpected issues like elements not being found or the website being temporarily down.
    • Optimized Waits: Replace the fixed time.sleep(2) with more precise WebDriverWait conditions. For instance, wait for specific elements that indicate successful loading. This improves efficiency.
    • Reusable Function: Convert the alcopascrap function to accept additional parameters (like wait times) for greater flexibility in various scraping scenarios.
    • Data Cleaning: Within the loop, sanitize the extracted data, removing unnecessary whitespace or non-standard characters to ensure data consistency.
    • Comments: Add clear comments to explain the purpose of different code sections, which is crucial for understanding and maintenance.

    Optimization Considerations

    • Scrolling Logic: Investigate if the website uses infinite scroll or lazy loading. If it does, adjust the scrolling logic to trigger loading of new content only when necessary.
    • Parallelism (Advanced): Consider parallelizing requests for different brands using threading or multiprocessing, especially if the scraping scope is large.

    from bs4 import BeautifulSoup
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.common.exceptions import TimeoutException, NoSuchElementException
    import time
    
    def alcopascrap(brand, max_wait_time=10):
        driver = webdriver.Chrome()  
    
        try:
            driver.get(f"https://www.alcopa-auction.fr/ro/cautare?brand={brand}")
            WebDriverWait(driver, max_wait_time).until(
                EC.element_to_be_clickable((By.ID, "CybotCookiebotDialogBodyButtonAccept"))
            ).click()
    
            # Optimized scrolling (Implementation depends on the site's scrolling mechanism)
    
            list_of_cars = []
    
            ratings = soup.find_all('div',{'data-brand':brand})
            for rating in ratings:
                data = {}
                try: 
                    data['title'] = rating.find('li',{'class':'mb-2 list-group-item border-0 p-0'}).get_text().strip()
                    data['price'] = rating['data-prices']
                    data['details'] = [element.get_text().strip() for element in rating.find_all('li', {'class':'list-group-item border-0 p-0'})]
                    data['link'] = rating.find('a', {'class':'no-decoration'}).get('href')
                    list_of_cars.append(data)
                except NoSuchElementException:
                    print(f"Error processing car data in rating block: {rating}")
    
            # ... (rest of your code)
    
        except TimeoutException:
            print(f"Timeout waiting for elements on page for brand: {brand}")
        finally:
            driver.quit()
    
    alcopascrap('nissan') 
    

    It lacks a couple of lines like:

            html_source = driver.page_source
            soup = BeautifulSoup(html_source, 'html.parser')

    And

            for car in list_of_cars:
                print(car, end='\n')

    But otherwise it works like a charm and it’s faster than my first implementation.
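
    At some point I might also try Gemini’s parallelism suggestion. Since each call opens its own browser, a sketch like this should work (the extra brand names are just examples):

    from concurrent.futures import ThreadPoolExecutor

    brands = ['nissan', 'toyota', 'renault']

    # Each alcopascrap call creates and quits its own Chrome instance,
    # so the workers do not share any state
    with ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(alcopascrap, brands)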

    Use chatbots, they know better most of the time.

    Cheers!

  • Scraping Alcopa auction site using python

    Just for reference, a small example of scraping https://www.alcopa-auction.fr/ for new cars of a specific brand.

    It can be improved a lot, but take it just as a reference.

    from bs4 import BeautifulSoup
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium import webdriver
    import time
    
    # Set up a Chrome browser
    driver = webdriver.Chrome()


    # Scrape the listings for a given brand (the site loads content with JavaScript)
    def alcopascrap(brand):

        driver.get(f"https://www.alcopa-auction.fr/ro/cautare?brand={brand}")

        # Implicit wait used for element lookups (adjust wait time if needed)
        driver.implicitly_wait(180)

        # Accept the cookie consent dialog
        WebDriverWait(driver, 1).until(
            EC.element_to_be_clickable((By.ID, "CybotCookiebotDialogBodyButtonAccept"))
        ).click()

        # Get initial scroll height
        last_height = driver.execute_script("return document.body.scrollHeight")

        while True:
            # Scroll down
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

            # Wait for loading
            time.sleep(2)  # Adjust wait time as needed

            # Calculate new scroll height and compare with previous
            new_height = driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break
            last_height = new_height

        # Get the HTML source with JavaScript modifications
        html_source = driver.page_source

        # Parse the HTML with BeautifulSoup
        soup = BeautifulSoup(html_source, 'html.parser')

        # Create a list to store the data
        list_of_cars = []

        # Use BeautifulSoup to extract the data we need
        ratings = soup.find_all('div', {'data-brand': brand})
        for rating in ratings:
            data = {}
            data['title'] = rating.find('li', {'class': 'mb-2 list-group-item border-0 p-0'}).get_text()
            data['price'] = rating['data-prices']
            data['details'] = [element.get_text().strip() for element in rating.find_all('li', {'class': 'list-group-item border-0 p-0'})]
            data['link'] = rating.find('a', {'class': 'no-decoration'}).get('href')
            list_of_cars.append(data)

        # Print the data
        for car in list_of_cars:
            print(car, end='\n')

        # Close the browser
        driver.quit()
        
    
    alcopascrap('nissan')
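
    By the way, the browser here is not actually headless, since no headless flag is passed. If you want Chrome to run headless, a minimal sketch (with a fairly recent Selenium and Chrome) would be:

    from selenium import webdriver

    options = webdriver.ChromeOptions()
    options.add_argument("--headless=new")  # plain "--headless" on older Chrome builds
    driver = webdriver.Chrome(options=options)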

    As for the code description, here is the one from GitHub Copilot; you be the judge of whether it’s good or not.

    This Python script uses Selenium and BeautifulSoup to scrape car data from the Alcopa Auction website. Here’s a step-by-step breakdown:

    1. Import necessary libraries: BeautifulSoup for parsing HTML, Selenium for browser automation, and time for controlling the script’s timing.
    2. Set up a headless browser using Selenium’s webdriver. This script uses Chrome as the browser.
    3. Define a function alcopascrap(brand), which takes a car brand as an argument and scrapes data for that brand from the website.
    4. The function first navigates to the Alcopa Auction website’s search page for the specified brand.
    5. It waits for the page to load, then clicks the accept button on the cookie consent dialog.
    6. It then enters a loop where it continually scrolls to the bottom of the page, waits for more content to load, and repeats until no more new content is loaded. This is done to ensure all the dynamic content loaded by JavaScript is captured.
    7. After all content is loaded, it gets the page’s HTML source.
    8. It then parses the HTML using BeautifulSoup.
    9. It creates an empty list list_of_cars to store the car data.
    10. It finds all div elements with a data-brand attribute matching the specified brand. For each of these elements, it extracts the car title, price, details, and link, and stores them in a dictionary. Each dictionary is then appended to list_of_cars.
    11. It prints out the data for each car.
    12. Finally, it closes the browser.
    13. The function is then called with ‘nissan’ as the argument, so it scrapes data for Nissan cars.

    This script is a good example of how to use Selenium and BeautifulSoup together to scrape data from a website that uses JavaScript to load content.

  • Using line_profiler in Python

    Currently I am reading one of the chapters from Python Data Science Handbook and saw that there are some magic commands that can be used in IPython.

    One of them gives you the possibility to time the execution and it’s called %timeit. Going further, it seems that there are other commands like %prun, but they are not present in plain old Python, so I asked Gemini, the lovely chatbot, what the alternatives were. Here is his answer:

    Unfortunately, there’s no single command in standard Python that perfectly replicates the functionality of IPython’s %prun magic command. Let me explain why and provide the closest alternatives:

    What %prun Does

    IPython’s %prun offers detailed profiling of a code statement. It shows you:

    • Number of times each function was called.
    • Time spent in each function (cumulative and per-call).
    • Other profiling information.

    Python Equivalents

    Here are the most common alternatives:

    1. cProfile Module:
    • Usage:
      import cProfile
      cProfile.run('my_function()')
    • Provides similar profiling information to %prun.
    • Advantage: More granular control over profiling.
    2. line_profiler Module (External Install):
    • Install: pip install line_profiler
    • Usage: Decorate functions with @profile
    • Provides line-by-line profiling, showing execution time for each code line within a function.
    3. timeit Module:
    • Best for measuring short code snippets’ execution time.
    • Not a full-fledged profiler, but useful for quick timing comparisons.

    Example: Using cProfile

    import cProfile

    def my_function():
        # Code you want to profile
        pass

    if __name__ == '__main__':
        cProfile.run('my_function()')

    Choosing the Right Tool

    • For general profiling: cProfile is a good starting point.
    • For line-by-line analysis: Consider line_profiler.
    • For simple timing: timeit is sufficient.
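
    Since timeit also came up, here is a quick standalone example of it:

    import timeit

    # Time a short snippet; number controls how many times it is run
    elapsed = timeit.timeit("sum(range(1000))", number=10000)
    print(f"{elapsed:.3f} seconds for 10000 runs")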

    So I installed line_profiler with pip and created a test .py file with the example from the book:

    @profile
    def sum_of_lists(N):
        total = 0
        for i in range(5):
            L = [j ^ (j >> i) for j in range(N)]
            total += sum(L)
        return total

    sum_of_lists(1000000)

    It seems that in order to grab the statistics you need to use kernprof, which I luckily found in my Anaconda installation.

    I executed it, with the following output:

    kernprof -l lineprofiler_test.py 
    Wrote profile results to lineprofiler_test.py.lprof
    Inspect results with:
    python -m line_profiler -rmt "lineprofiler_test.py.lprof"
    python -m line_profiler -rmt "lineprofiler_test.py.lprof"
    Timer unit: 1e-06 s
    
    Total time: 0.906589 s
    File: lineprofiler_test.py
    Function: sum_of_lists at line 1
    
    Line #      Hits         Time  Per Hit   % Time  Line Contents
    ==============================================================
         1                                           @profile
         2                                           def sum_of_lists(N):
         3         1         21.0     21.0      0.0      total = 0
         4         6         51.0      8.5      0.0      for i in range(5):
         5         5     862782.0 172556.4     95.2          L = [j ^ (j >> i) for j in range(N)]
         6         5      43735.0   8747.0      4.8          total +=sum(L)
         7         1          0.0      0.0      0.0      return total
    
      0.91 seconds - lineprofiler_test.py:1 - sum_of_lists

    Which is kind of cool, considering that in the past we wrote a lot of scripts without optimising for performance.

    Cheers

  • Merging initial data load with IP info

    Getting back to my traffic statistics project, I managed to play with aggregation of the two main collections that had the information I needed.

    First of all, there is the normal code to load the data from Mongo and also check the columns that can be used for the join, which is available below:

    import pandas as pd
    from pymongo import MongoClient
    
    # Connect to MongoDB
    client = MongoClient('localhost', 27017)
    db = client['mydatabase']
    traffic_init_load = db['traffic_init_load']
    unique_ip_info = db['unique_ip_info']
    
    # Read the data from MongoDB
    traffic_init_load_data = traffic_init_load.find()
    unique_ip_info_data = unique_ip_info.find()
    
    # Convert the data to a DataFrame
    traffic_init_load_df = pd.DataFrame(list(traffic_init_load_data))
    unique_ip_info_df = pd.DataFrame(list(unique_ip_info_data))
    
    traffic_init_load_columns = list(traffic_init_load_df.columns)
    unique_ip_info_columns = list(unique_ip_info_df.columns)
    
    # Print the columns
    print(traffic_init_load_columns)
    print(unique_ip_info_columns)

    After this, we can safely perform the merge and drop the _id columns:

    merged_df = pd.merge(traffic_init_load_df, unique_ip_info_df, left_on='SourceIP', right_on='query')
    merged_df = merged_df.drop(['_id_x', 'query', '_id_y'], axis=1)

    Checking it again, we see that there are also two columns that are not really required: index and message, which was returned by the whois API. We drop those as well:

    merged_df = merged_df.drop(['index', 'message'], axis=1)

    And now we are finally ready to convert it and put it back into the database:

    # Convert DataFrame to a list of records
    records = merged_df.to_records(index=False)
    
    # Connect to MongoDB
    client = MongoClient('localhost', 27017)
    db = client['mydatabase']
    base_merged_data = db['base_merged_data']
    
    # Insert records into MongoDB collection
    dicts = []
    for record in records:
        record_dict = {}
        for field_name, value in zip(merged_df.columns, record):
            record_dict[field_name] = value
        dicts.append(record_dict)
    
    base_merged_data.insert_many(dicts)
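
    Looking at it now, the manual record-building loop above could probably be replaced with pandas’ to_dict, the same way the initial load script did it:

    # Shorter equivalent of the loop above (same merged_df and collection)
    dicts = merged_df.to_dict("records")
    base_merged_data.insert_many(dicts)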

    What I can tell you is that the insert_many method is significantly faster than insert_one. At first I played with insert_one and it took 12 minutes to do the job; with insert_many, that time was reduced to 3 minutes.

    That is all for today,

    Sorin

  • Loading unique IP’s in MongoDB

    Hi,

    So today I played a little bit with the possibility of storing the unique IP addresses in a separate table.

    Since I will use a subscription from ip-api.com, it seems that there is an option to query info by batch processing, with a limit of 100 IPs per payload.

    So, at first glance there are 227200 unique IPs in my dataset. That accounts for 2272 payloads to be queried.

    The code looks more or less like this:

    # Split the unique IPs into chunks of 100 (the API's batch limit)
    unique_ip = temp['SourceIP'].unique()
    unique_list = [unique_ip[i:i + 100] for i in range(0, len(unique_ip), 100)]
    # Build one payload document per chunk
    data = []
    for i in range(len(unique_list)):
        temp_dict = {}
        temp_dict['id'] = i+1
        temp_dict['payload'] = unique_list[i].tolist()
        data.append(temp_dict)

    Once this is constructed, you only need to go through the list element by element and insert it into MongoDB using this code:

    import pymongo
    
    myclient = pymongo.MongoClient("mongodb://localhost:27017/")
    mydb = myclient["mydatabase"]
    mycol = mydb["unique_ip"]
    for i in range(len(data)): 
        mycol.insert_one(data[i])

    The next step will involve taking the collection documents one by one and serving them to the API endpoint.
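
    Something along these lines should do the trick, assuming ip-api.com’s batch endpoint (I’m ignoring the subscription key and rate limits here):

    import requests

    # Send each 100-IP payload to the batch endpoint and collect the answers
    ip_results = []
    for doc in mycol.find():
        response = requests.post("http://ip-api.com/batch", json=doc['payload'])
        response.raise_for_status()
        ip_results.extend(response.json())

    print(len(ip_results), "IPs resolved")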

    Tnx,

    Sorin

  • Loading data to a Mongo database for further processing

    Hi,

    Since I needed my data to be available for further processing in a centralized manner, I decided to store the first draft, as well as further queries to the location API, in a Mongo database.

    Here is the short code snippet that was used for this task:

    import pandas as pd
    
    import pymongo
    
    df = pd.read_csv(r'C://Users//Sorin//Downloads//filter.concat')
    
    # Keep only pppoe0 (WAN) traffic, then the date, IP and port columns
    test = df[df['pppoe0'] == 'pppoe0']
    temp = test.iloc[:, [0,1,2,21, 22, 23, 24]]
    
    # Rename the columns (the header was taken from the first data row) to meaningful names
    col_names = {'Feb': 'Month',
                 '18': 'Day',
                 '09:16:00': 'Hour',
                 '184.105.247.254': 'SourceIP',
                 '86.123.204.222': 'DestinationIP',
                 '48307': 'SourcePort',
                 '447': 'DestinationPort'}

    temp.rename(columns=col_names,
                inplace=True)
    
    
    myclient = pymongo.MongoClient("mongodb://localhost:27017/")
    mydb = myclient["mydatabase"]
    mycol = mydb["traffic_init_load"]
    temp.reset_index(inplace=True)
    data_dict = temp.to_dict("records")
    # Insert collection
    mycol.insert_many(data_dict) 

    From the concatenated file, my interest is strictly related to traffic on pppoe.

    We keep only the columns related to the source and destination, and after that the documents are written to MongoDB.
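
    As a quick sanity check, we can read one document back and count how many were inserted:

    # Verify the load: show one document and the total count
    print(mycol.find_one())
    print(mycol.count_documents({}))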

    That is all.

  • Start of the traffic project

    So, I managed to gather about 1 GB of records from the pfsense installation and grab them from the box (filter.log files that you can find under /var/log).

    And I have a list of 16 logs that I need to concatenate.

    I had a lot of trouble concatenating them, since I tried multiple times to use the writelines() method of the file object.

    The code that worked for me:

    # Rewrite the concatenated log as CSV: keep the three date fields and the
    # filterlog payload (which is already comma separated)
    outputcsv = open('//Users//tudorsorin//Downloads//var//log//firewallrepo//filter.csv','w')
    f = open('//Users//tudorsorin//Downloads//var//log//firewallrepo//filter.concat', 'r')
    lines = f.readlines()
    for line in lines:
        outputcsv.writelines(",".join(line.split(" ")[0:3])+","+line.split(" ")[-1])
    f.close()
    outputcsv.close()

    The idea is that it’s already in CSV format and all you need to do is modify the “header” that normally looks like Feb 20 07:58:18 soaretudorhome filterlog[41546]: into something like Feb,20,07:58:18, and the rest remains the same.

    Surprisingly, if you load it directly into a dataframe using pd.read_csv and don’t force a header, it works, and I have all the data there, with NaN in the fields that are not filled.

    After this is done, we can filter only the traffic that goes over pppoe0, which is the WAN interface, and you can easily do that using temp = df[df['pppoe0'] == 'pppoe0'].

    So far so good. I also took a look at a generic pppoe0 line and came to the conclusion that the columns that interest me are [0,1,2,21,22,23,24], which represent the date, source IP, destination IP and ports (source and destination). You can then filter the dataframe with temp = temp.iloc[:, [0,1,2,21, 22, 23, 24]].
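
    Putting those steps together, the loading part looks roughly like this (path shortened for readability):

    import pandas as pd

    # Load the CSV; without forcing a header, the first row becomes the header
    df = pd.read_csv('filter.csv')

    # Keep only WAN (pppoe0) traffic, then the date, IP and port columns
    temp = df[df['pppoe0'] == 'pppoe0']
    temp = temp.iloc[:, [0, 1, 2, 21, 22, 23, 24]]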

    So finally we have a dataframe that we can work with. Now what remains is to change the table header and try to enhance it with extra info.

    Cheers

    Sorin

  • Microsoft Teams blocked by pfBlockerNG

    Hi,

    One short tip to remember. I’ve been struggling for a while now with the fact that pfBlockerNG was blocking my Teams connection for whatever reason.

    I couldn’t figure out the correct way to fix this until today. I should have known that there isn’t a range of IPs that can be whitelisted to make it work; it’s related to the domain that was blocked.

    This became evident today when I took a look at the Reports tab and the Alerts subtab and filtered by interface.

    In order to fix it, you will need to go to the DNSBL tab and expand the TLD Exclusion List so that you can add the general domain that should be excluded.

    You could also whitelist each subdomain, but since we are talking about Microsoft, I think this is easier.

    The way this works, at least from what I understood, is that it will allow all hostnames under the general domain and only block the ones that are specifically blacklisted.

    That would be all for today,

    Sorin

  • Python Kata on Codewars

    Hi,

    Since I pretty much broke the internet trying to solve the following “kata” with pieces of code, let’s paste it here as well because it makes me proud.

    Here is the link to the kata: https://www.codewars.com/kata/5977ef1f945d45158d00011f

    And here is also my “solution”, which took quite a long time to get right:

    def sep_str(st):
        # Split the string into words, then each word into a list of letters
        test_list = [[letter for letter in element] for element in st.split()]
        # Pad every word with empty strings up to the length of the longest word
        for a in test_list:
            a.extend([""] * (max(map(len, test_list)) - len(a)))
        # Transpose: the i-th sublist holds the i-th letter of every word
        if test_list:
            result = [[test_list[j][i] for j in range(len(test_list))] for i in range(len(test_list[0]))]
        else:
            result = []
        return result
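
    A quick check of what it returns (the shorter word gets padded with empty strings):

    print(sep_str("Hi World"))
    # [['H', 'W'], ['i', 'o'], ['', 'r'], ['', 'l'], ['', 'd']]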

    That is all.

    Cheers!

  • Traffic statistics – new project

    Hi,

    For some time I have wanted to understand how the traffic on my network is actually shaped.

    For that purpose, at first I purchased a Synology router, but it seems it doesn’t have much in the way of traffic logging capabilities, so I kept it and put the following box in front of it.

    It’s a cool toy, but ultimately I wanted to have pfSense installed on it with logging activated, so that I can gather as much data as possible.

    It’s now installed, and hopefully this is the start of some articles related to data manipulation and also, maybe, some administration insights.

    Tnx,

    Sorin