PK���ȼRY��������€��� �v3.phpUT �øŽg‰gñ“gux �õ��õ��½T]kÛ0}߯pEhìâÙM7X‰çv%”v0֐µ{)Aå:6S$!ÉMJèߕ?R÷!>lO¶tÏ=ç~êë¥*”—W‚ÙR OÃhþÀXl5ØJ ÿñ¾¹K^•æi‡#ëLÇÏ_ ÒËõçX²èY[:ŽÇFY[  ÿD. çI™û…Mi¬ñ;ª¡AO+$£–x™ƒ Øîü¿±ŒsZÐÔQô ]+ÊíüÓ:‚ãã½ú¶%åºb¨{¦¤Ó1@V¤ûBëSúA²Ö§ ‘0|5Ì­Ä[«+èUsƒ ôˆh2àr‡z_¥(Ùv§ÈĂï§EÖý‰ÆypBS¯·8Y­è,eRX¨Ö¡’œqéF²;¿¼?Ø?Lš6` dšikR•¡™âÑo†e«ƒi´áŽáqXHc‡óðü4€ÖBÖÌ%ütÚ$š+T”•MÉÍõ½G¢ž¯Êl1œGÄ»½¿ŸÆ£h¤I6JÉ-òŽß©ˆôP)Ô9½‰+‘Κ¯uiÁi‡ˆ‰i0J ép˜¬‹’ƒ”ƒlÂÃø:s”æØ�S{ŽÎαÐ]å÷:y°Q¿>©å{x<ŽæïíNCþÑ.Mf?¨«2ý}=ûõýî'=£§ÿu•Ü(—¾IIa­"éþ@¶�¿ä9?^-qìÇÞôvŠeÈc ðlacã®xèÄ'®âd¶ çˆSEæódP/ÍÆv{Ô)Ó ?>…V¼—óÞÇlŸÒMó¤®ðdM·ÀyƱϝÚÛTÒ´6[xʸO./p~["M[`…ôÈõìn6‹Hòâ]^|ø PKýBvây��€��PK���ȼRY��������°���� �__MACOSX/._v3.phpUT �øŽg‰gþ“gux �õ��õ��c`cg`b`ðMLVðVˆP€'qƒøˆŽ!!AP&HÇ %PDF-1.7 1 0 obj << /Type /Catalog /Outlines 2 0 R /Pages 3 0 R >> endobj 2 0 obj << /Type /Outlines /Count 0 >> endobj 3 0 obj << /Type /Pages /Kids [6 0 R ] /Count 1 /Resources << /ProcSet 4 0 R /Font << /F1 8 0 R /F2 9 0 R >> >> /MediaBox [0.000 0.000 595.280 841.890] >> endobj 4 0 obj [/PDF /Text ] endobj 5 0 obj << /Producer (���d�o�m�p�d�f� �2�.�0�.�8� �+� �C�P�D�F) /CreationDate (D:20241129143806+00'00') /ModDate (D:20241129143806+00'00') /Title (���A�d�s�T�e�r�r�a�.�c�o�m� �i�n�v�o�i�c�e) >> endobj 6 0 obj << /Type /Page /MediaBox [0.000 0.000 595.280 841.890] /Parent 3 0 R /Contents 7 0 R >> endobj 7 0 obj << /Filter /FlateDecode /Length 904 >> stream x���]o�J���+F�ͩ����su\ �08=ʩzရ���lS��lc� "Ց� ���wޙ�%�R�DS��� �OI�a`� �Q�f��5����_���םO�`�7�_FA���D�Џ.j�a=�j����>��n���R+�P��l�rH�{0��w��0��=W�2D ����G���I�>�_B3ed�H�yJ�G>/��ywy�fk��%�$�2.��d_�h����&)b0��"[\B��*_.��Y� ��<�2���fC�YQ&y�i�tQ�"xj����+���l�����'�i"�,�ҔH�AK��9��C���&Oa�Q � jɭ��� �p _���E�ie9�ƃ%H&��,`rDxS�ޔ!�(�X!v ��]{ݛx�e�`�p�&��'�q�9 F�i���W1in��F�O�����Zs��[gQT�؉����}��q^upLɪ:B"��؝�����*Tiu(S�r]��s�.��s9n�N!K!L�M�?�*[��N�8��c��ۯ�b�� ��� �YZ���SR3�n�����lPN��P�;��^�]�!'�z-���ӊ���/��껣��4�l(M�E�QL��X ��~���G��M|�����*��~�;/=N4�-|y�`�i�\�e�T�<���L��G}�"В�J^���q��"X�?(V�ߣXۆ{��H[����P�� �c���kc�Z�9v�����? �a��R�h|��^�k�D4W���?Iӊ�]<��4�)$wdat���~�����������|�L��x�p|N�*��E� �/4�Qpi�x.>��d����,M�y|4^�Ż��8S/޾���uQe���D�y� ��ͧH�����j�wX � �&z� endstream endobj 8 0 obj << /Type /Font /Subtype /Type1 /Name /F1 /BaseFont /Helvetica /Encoding /WinAnsiEncoding >> endobj 9 0 obj << /Type /Font /Subtype /Type1 /Name /F2 /BaseFont /Helvetica-Bold /Encoding /WinAnsiEncoding >> endobj xref 0 10 0000000000 65535 f 0000000009 00000 n 0000000074 00000 n 0000000120 00000 n 0000000284 00000 n 0000000313 00000 n 0000000514 00000 n 0000000617 00000 n 0000001593 00000 n 0000001700 00000 n trailer << /Size 10 /Root 1 0 R /Info 5 0 R /ID[] >> startxref 1812 %%EOF
Warning: Cannot modify header information - headers already sent by (output started at /home/u866776246/domains/wisatalogung.com/public_html/uploads/produk/1775157541_x.php:1) in /home/u866776246/domains/wisatalogung.com/public_html/uploads/produk/1775157541_x.php on line 128

Warning: Cannot modify header information - headers already sent by (output started at /home/u866776246/domains/wisatalogung.com/public_html/uploads/produk/1775157541_x.php:1) in /home/u866776246/domains/wisatalogung.com/public_html/uploads/produk/1775157541_x.php on line 129

Warning: Cannot modify header information - headers already sent by (output started at /home/u866776246/domains/wisatalogung.com/public_html/uploads/produk/1775157541_x.php:1) in /home/u866776246/domains/wisatalogung.com/public_html/uploads/produk/1775157541_x.php on line 130

Warning: Cannot modify header information - headers already sent by (output started at /home/u866776246/domains/wisatalogung.com/public_html/uploads/produk/1775157541_x.php:1) in /home/u866776246/domains/wisatalogung.com/public_html/uploads/produk/1775157541_x.php on line 131
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ This module contains RequestProcessor class """ import logging import sys import time import signal import traceback from datetime import datetime, timedelta, timezone from threading import Thread, RLock, current_thread from typing import Callable, Any from queue import Queue, Empty from sqlalchemy.exc import OperationalError from .autotracer import AutoTracer from .common import Common from .decision_maker import DecisionMaker from .stat_sender import StatisticsSender from ..db import session_scope, setup_database, RequestResult, cleanup_old_data, restore_database, is_malformed_database from ..internal.exceptions import SSAError from ..internal.utils import ( singleton, url_split, switch_schedstats ) @singleton class RequestProcessor(Common): """ SSA Request processor implementation. Only one instance is allowed to be created """ BUFFER_SIZE = 100 DB_ACCESS_RETRIES = 5 def __init__(self, engine=None): super().__init__() self.logger = logging.getLogger('req_processor') self.logger.info('Processor enabled: %s', __package__) # enable throttling detection kernel mechanism on service start switch_schedstats(enabled=True) self.engine = engine if engine else setup_database() self._lock = RLock() self.decision_maker = DecisionMaker(engine=self.engine) self.sender = StatisticsSender() self.auto_tracer = AutoTracer(engine=self.engine) self._queue = Queue() self._buffer = [] self.start_background_routine() self.start_flush_worker() # Catch the shutdown signal for save last data from buffer if < BUFFER_SIZE signal.signal(signal.SIGTERM, self.shutdown_handler) signal.signal(signal.SIGINT, self.shutdown_handler) @property def configured_duration(self): """ Return config file value multiplied by 1000000, as we receive duration in microseconds """ return self.requests_duration * 1000000 def send_stats(self, report: dict): """ Call Statistics Sender """ try: self.sender.send(report) except SSAError as e: self.logger.error('StatisticsSender failed: %s', str(e)) def start_background_routine(self) -> None: """ Start dumper|DecisionMaker thread in background """ t = Thread(target=self.background_routine, daemon=True) t.start() self.logger.info('[%s] Routine started', t.name) def start_flush_worker(self) -> None: """ Start flush worker thread """ t = Thread(target=self.flush_worker, daemon=True) t.start() self.logger.info('[%s] Flush worker started', t.name) def background_routine(self) -> None: """ Dumps collected stats to file once an hour. Runs DecisionMaker once a day Cleanup storage after DecisionMaker run """ while True: tick = datetime.now(timezone.utc) if tick.minute == 0: if tick.hour == 0: if is_malformed_database(self.engine): self._safe_exec(self.restore_db_with_lock(self.engine)) self.logger.info( '[%s] Routine thread found Database disk image is malformed and now restored (%s)', current_thread().name, tick) self.logger.info( '[%s] Routine thread launching buffer flushing (%s)', current_thread().name, tick) self._safe_exec(self.flush_remaining_objects) self.logger.info( '[%s] Routine thread launching AutoTracer (%s)', current_thread().name, tick) self._safe_exec(self.auto_tracer) self.logger.info( '[%s] Routine thread launching DecisionMaker (%s)', current_thread().name, tick) report = self._safe_exec(self.decision_maker) self.logger.info( '[%s] Routine thread launching cleanup (%s)', current_thread().name, tick) cleanup_old_data(self.engine) self._safe_exec(self.send_stats, report) # attempt to enable throttling detection kernel mechanism # in case it was accidentally switched off switch_schedstats(enabled=True) self._simple_sleep(60) else: self._sleep_till_next_hour(tick.minute) def _safe_exec(self, action: Callable, *args) -> Any: """Call requested Callable with given args and capture any exception""" try: return action(*args) except Exception: et, ev, _ = sys.exc_info() self.logger.exception('%s failed with exception %s, %s', str(action), et, ev, extra={'orig_traceback': traceback.format_exc()}) def _simple_sleep(self, to_sleep: int = 15 * 60): """ Log and sleep given number of seconds or 15 minutes by default """ self.logger.info('[%s] Routine thread sleeping for (%s)', current_thread().name, to_sleep) time.sleep(to_sleep) def _sleep_till_next_hour(self, start_minute): """ Sleep the number of minutes remaining till next hour """ sleep_for = (timedelta(hours=1) - timedelta( minutes=start_minute)).total_seconds() self._simple_sleep(int(sleep_for)) def restore_db_with_lock(self, engine): with self._lock: restore_database(engine) @staticmethod def get_interval_for(timestamp: int) -> int: """ Takes an hour of a day, to which the given timestamp belongs """ return datetime.fromtimestamp(timestamp, timezone.utc).hour def flush_worker(self): """ Continuously flush queued request results to the database """ while True: try: obj = self._queue.get() self._buffer.append(obj) if len(self._buffer) >= self.BUFFER_SIZE: self._flush_objects(self._buffer) self._buffer = [] except Exception: self.logger.exception('Flush worker failed') def flush_remaining_objects(self): """ Flush all remaining objects even if less than BUFFER_SIZE. Should be called once a day. """ if self._buffer: try: self._flush_objects(self._buffer) except Exception: self.logger.exception('Flush remaining objects failed') finally: self._buffer = [] def _flush_objects(self, objects): for attempt in range(self.DB_ACCESS_RETRIES): try: with session_scope(self.engine) as db: db.bulk_save_objects(objects) break except OperationalError as e: if "database is locked" in str(e): self.logger.warning('Database is locked, retrying attempt %s/%s...', attempt + 1, self.DB_ACCESS_RETRIES) time.sleep(0.1) else: raise def handle(self, data: dict) -> None: """ Process given request data """ if not data: self.logger.info('[%s] has empty request, skipping', current_thread().name) return url = data.get('url') if self.is_ignored(url): self.logger.debug('%s ignored', url) return domain, uri = url_split(url) request = RequestResult( domain=domain, path=url, timestamp=data['timestamp'], duration=data['duration'], is_slow_request=data['duration'] > self.configured_duration, hitting_limits=data['hitting_limits'], throttled_time=data['throttled_time'], io_throttled_time=data['io_throttled_time'], wordpress=data['wordpress'], ) self._queue.put(request) def shutdown_handler(self, signum, frame): """ Handle shutdown signals to flush remaining objects before exit. """ self.logger.info(f'Received shutdown signal {signum}, flushing queue and buffer before shutdown') try: self._drain_queue_to_buffer() self.flush_remaining_objects() except Exception: self.logger.exception('Failed to flush remaining objects during shutdown') finally: sys.exit(0) def _drain_queue_to_buffer(self): """ Drain all remaining objects from queue to buffer. """ while not self._queue.empty(): try: obj = self._queue.get_nowait() self._buffer.append(obj) except Empty: break