# chromium/chrome/test/enterprise/e2e/connector/reporting_connector_splunk/splunk_server.py

# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


import json
import requests
import time

from datetime import datetime
import xml.etree.ElementTree as ET
from .. import Verifyable, VerifyContent


class SplunkApiService(Verifyable):
  """Checks for Chrome reporting events through the Splunk REST API.

  Requests go to port 8089 of the configured host using basic
  authentication. TLS certificate validation is disabled on every request.
  """
  baseurl = ''
  userName = ''
  password = ''

  def __init__(self, credentials):
    """Stores the connection settings parsed from `credentials`.

    Args:
      credentials: JSON string with the keys 'ip_address', 'username' and
        'password'.
    """
    credentialsJson = json.loads(credentials)
    self.baseurl = 'https://' + credentialsJson['ip_address'] + ':8089'
    self.userName = credentialsJson['username']
    self.password = credentialsJson['password']

  def get_dispatch_state(self, job_id):
    """Returns the upper-cased dispatchState of the search job `job_id`.

    Args:
      job_id: Search id of the job, as a string.

    Returns:
      The dispatch state in upper case, or an empty string when the response
      contains no dispatchState entry.
    """
    # NOTE(review): Splunk documents GET for reading a job; POST is kept here
    # to preserve the existing request behavior.
    response = requests.post(
        self.baseurl + '/services/search/jobs/' + job_id,
        auth=(self.userName, self.password),
        verify=False)
    print(response.text)
    root = ET.fromstring(response.text)
    dispatchState = ""
    for tag in root:
      if "content" not in tag.tag:
        continue
      for tag2 in tag:
        for tag3 in tag2:
          # Elements may lack a 'name' attribute or have no text; neither
          # case should abort the scan.
          if tag3.get('name') == "dispatchState":
            dispatchState = tag3.text or ""
    return dispatchState.upper()

  def get_results(self, job_id):
    """Fetches every result row of the search job `job_id`.

    Results are requested in pages until the server returns an empty page.

    Args:
      job_id: Search id of the job, as a string.

    Returns:
      A list with the entries of the 'results' field of all pages.
    """
    page_size = 50000
    url = self.baseurl + '/services/search/jobs/' + job_id + '/results'
    events = []
    offset = 0

    while True:
      # All arguments of a GET belong in the query string, not in the body.
      params = {'output_mode': 'json', 'count': page_size, 'offset': offset}
      response = requests.get(
          url,
          params=params,
          auth=(self.userName, self.password),
          verify=False)
      page = json.loads(response.text)['results']
      print("Count: " + str(len(page)))
      if not page:  # An empty page means that we got all the results.
        break
      events += page
      # Advance by what was actually received so that no rows are skipped if
      # the server returns fewer rows than requested.
      offset += len(page)
    print("All: " + str(len(events)))
    return events

  def TryVerify(self, content: VerifyContent) -> bool:
    """Verifies that a matching event exists on the Splunk instance.

    Starts a search for badNavigationEvent entries reported by
    `content.device_id` since shortly before `content.timestamp`, waits for
    the search job to finish and checks whether it produced any result.

    Args:
      content: Provides the `device_id` and `timestamp` used in the query.

    Returns:
      True if the search returned at least one event. False if it returned
      none, if no search job could be created, or if the job reported an
      unexpected dispatch state.
    """
    searchQuery = "sourcetype=google:chrome:json"
    # Look back to the event timestamp plus a two minute margin.
    minutes = int((
        (datetime.utcnow() - content.timestamp).total_seconds() / 60) + 2)
    searchQuery += ' earliest=-' + str(minutes) + 'm'
    searchQuery += ' device_id="' + content.device_id + '"'
    searchQuery += ' event="badNavigationEvent"'
    # Remove leading and trailing whitespace from the search
    searchQuery = searchQuery.strip()
    print(searchQuery)

    # If the query doesn't already start with the 'search' operator or another
    # generating command (e.g. "| inputcsv"), then prepend "search " to it.
    if not searchQuery.startswith(('search', '|')):
      searchQuery = 'search ' + searchQuery

    data = {'search': searchQuery, 'max_count': '10000000'}
    # Run the search.
    # disable SSL cert validation.
    response = requests.post(
        self.baseurl + '/services/search/jobs',
        data=data,
        auth=(self.userName, self.password),
        verify=False)

    # Pick the search id explicitly instead of trusting whatever child comes
    # last; an error response has no <sid> element.
    root = ET.fromstring(response.text)
    job_id = None
    for tag in root:
      if tag.tag.endswith('sid'):
        job_id = tag.text
    if not job_id:
      print(response.text)
      return False

    while True:
      status = self.get_dispatch_state(job_id)
      print(status)
      if status == "DONE":
        break
      if status not in ("QUEUED", "PARSING", "RUNNING", "FINALIZING"):
        return False
      # Only wait when the job is still in progress.
      time.sleep(5)

    events = self.get_results(job_id)
    return bool(events)